diff --git a/src/DirectPlay8Peer.cpp b/src/DirectPlay8Peer.cpp index d4f44ea..513e762 100644 --- a/src/DirectPlay8Peer.cpp +++ b/src/DirectPlay8Peer.cpp @@ -1474,7 +1474,6 @@ HRESULT DirectPlay8Peer::EnumHosts(PDPN_APPLICATION_DESC CONST pApplicationDesc, std::unique_lock l(lock); host_enums.erase(handle); - l.unlock(); })); return DPNSUCCESS_PENDING; @@ -1972,36 +1971,79 @@ void DirectPlay8Peer::io_peer_send(std::unique_lock &l, unsigned int Peer *peer; SendQueue::SendOp *sqop; - while((peer = get_peer_by_peer_id(peer_id)) != NULL && (sqop = peer->sq.get_pending()) != NULL) + while((peer = get_peer_by_peer_id(peer_id)) != NULL) { - std::pair d = sqop->get_pending_data(); - - int s = send(peer->sock, (const char*)(d.first), d.second, 0); - if(s < 0) + if((sqop = peer->sq.get_pending()) != NULL) { - DWORD err = WSAGetLastError(); + std::pair d = sqop->get_pending_data(); - if(err == WSAEWOULDBLOCK) + int s = send(peer->sock, (const char*)(d.first), d.second, 0); + + if(s < 0) { - break; + DWORD err = WSAGetLastError(); + + if(err == WSAEWOULDBLOCK) + { + /* Send buffer full. Try again later. */ + break; + } + else{ + /* Write error. */ + + log_printf("Write error on peer %u: %s", peer_id, win_strerror(err).c_str()); + log_printf("Closing connection"); + + peer_destroy(l, peer_id, DPNERR_CONNECTIONLOST, DPNDESTROYPLAYERREASON_CONNECTIONLOST); + return; + } } - else{ - /* TODO */ - return; + + sqop->inc_sent_data(s); + + if(s == d.second) + { + peer->sq.pop_pending(sqop); + + if(peer->sq.get_pending() != NULL) + { + /* There is another message in the send queue. + * + * Wake another worker to dispatch it in case we have to + * block within the application for a while. + */ + SetEvent(peer->event); + } + + sqop->invoke_callback(l, S_OK); + delete sqop; } } - - sqop->inc_sent_data(s); - - if(s == d.second) - { - peer->sq.pop_pending(sqop); + else{ + if(peer->state == Peer::PS_CLOSING && peer->send_open) + { + /* Peer is in a closing state and send queue has been cleared. + * + * Begin a graceful shutdown from our end to ensure the peer + * receives any informational messages about the close. It will do + * a hard close once it receives our EOF. + */ + + if(shutdown(peer->sock, SD_SEND) != 0) + { + DWORD err = WSAGetLastError(); + log_printf( + "shutdown(SD_SEND) on peer %u failed: %s", + peer_id, win_strerror(err)); + + peer_destroy(l, peer_id, DPNERR_CONNECTIONLOST, DPNDESTROYPLAYERREASON_CONNECTIONLOST); + return; + } + + peer->send_open = false; + } - /* TODO: Poke event, send/recv mutexes per TCP socket. */ - - sqop->invoke_callback(l, S_OK); - - delete sqop; + break; } } } @@ -2042,11 +2084,14 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock &l, unsigned int int r = recv(peer->sock, (char*)(peer->recv_buf) + peer->recv_buf_cur, sizeof(peer->recv_buf) - peer->recv_buf_cur, 0); if(r == 0) { - /* Connection closed */ - /* TODO */ - break; + /* When the remote end initiates a graceful close, it will no longer + * process anything we send it. Just close the connection. + */ + + peer_destroy(l, peer_id, DPNERR_CONNECTIONLOST, DPNDESTROYPLAYERREASON_NORMAL); + return; } - else if(r == -1) + else if(r < 0) { DWORD err = WSAGetLastError(); @@ -2057,15 +2102,19 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock &l, unsigned int } else{ /* Read error. */ - /* TODO */ - break; + + log_printf("Read error on peer %u: %s", peer_id, win_strerror(err).c_str()); + log_printf("Closing connection"); + + peer_destroy(l, peer_id, DPNERR_CONNECTIONLOST, DPNDESTROYPLAYERREASON_CONNECTIONLOST); + return; } } if(peer->state == Peer::PS_CLOSING) { /* When a peer is in PS_CLOSING, we keep the socket open until the send queue has - * been flushed and discard anything we read until we get EOF. + * been flushed and discard anything we read until we get EOF. */ continue; @@ -2081,8 +2130,13 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock &l, unsigned int if(full_packet_size > MAX_PACKET_SIZE) { /* Malformed packet received - TCP stream invalid! */ - /* TODO */ - abort(); + + log_printf( + "Received over-size packet from peer %u, dropping connection", + peer_id); + + peer_destroy(l, peer_id, DPNERR_CONNECTIONLOST, DPNDESTROYPLAYERREASON_CONNECTIONLOST); + return; } if(peer->recv_buf_cur >= full_packet_size) @@ -2096,8 +2150,13 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock &l, unsigned int catch(const PacketDeserialiser::Error &e) { /* Malformed packet received - TCP stream invalid! */ - /* TODO */ - abort(); + + log_printf( + "Received malformed packet (%s) from peer %u, dropping connection", + e.what(), peer_id); + + peer_destroy(l, peer_id, DPNERR_CONNECTIONLOST, DPNDESTROYPLAYERREASON_CONNECTIONLOST); + return; } switch(pd->packet_type()) @@ -2213,20 +2272,53 @@ void DirectPlay8Peer::peer_accept(std::unique_lock &l) return; } else{ - /* TODO */ - abort(); + /* Hopefully this is temporary and doesn't go into a tight loop... */ + log_printf("Incoming connection failed: %s", win_strerror(err).c_str()); + return; } } + struct linger li; + li.l_onoff = 0; + li.l_linger = 0; + + if(setsockopt(newfd, SOL_SOCKET, SO_LINGER, (char*)(&li), sizeof(li)) != 0) + { + DWORD err = WSAGetLastError(); + log_printf("Failed to set SO_LINGER parameters on accepted connection: %s", win_strerror(err).c_str()); + + /* Not fatal, since this probably won't matter in production. */ + } + u_long non_blocking = 1; if(ioctlsocket(newfd, FIONBIO, &non_blocking) != 0) { - log_printf("ioctlsocket() failed, dropping peer"); + DWORD err = WSAGetLastError(); + log_printf("Failed to set accepted connection to non-blocking mode: %s", win_strerror(err).c_str()); + log_printf("Closing connection"); closesocket(newfd); return; } + /* Set SO_LINGER so that closesocket() does a hard close, immediately removing the socket + * address from the connection table. + * + * If this isn't done, then we are able to immediately bind() new sockets to the same + * local address (as we may when the port isn't specified), but then outgoing connections + * made from it will fail with WSAEADDRINUSE until the background close completes. + */ + + struct linger no_linger; + no_linger.l_onoff = 1; + no_linger.l_linger = 0; + + if(setsockopt(newfd, SOL_SOCKET, SO_LINGER, (char*)(&no_linger), sizeof(no_linger)) != 0) + { + DWORD err = WSAGetLastError(); + log_printf("Failed to set SO_LINGER on accepted connection: %s", win_strerror(err).c_str()); + } + unsigned int peer_id = next_peer_id++; Peer *peer = new Peer(Peer::PS_ACCEPTED, newfd, addr.sin_addr.s_addr, ntohs(addr.sin_port)); @@ -2286,69 +2378,82 @@ bool DirectPlay8Peer::peer_connect(Peer::PeerState initial_state, uint32_t remot return true; } -void DirectPlay8Peer::peer_destroy(std::unique_lock &l, unsigned int peer_id, HRESULT outstanding_op_result) +void DirectPlay8Peer::peer_destroy(std::unique_lock &l, unsigned int peer_id, HRESULT outstanding_op_result, DWORD destroy_player_reason) { - Peer *peer; + Peer *peer = get_peer_by_peer_id(peer_id); + assert(peer != NULL); - while((peer = get_peer_by_peer_id(peer_id)) != NULL) + /* Cancel any outstanding sends and notify the callbacks. */ + + for(SendQueue::SendOp *sqop; (sqop = peer->sq.get_pending()) != NULL;) { - /* Cancel any outstanding sends and notify the callbacks. */ + peer->sq.pop_pending(sqop); - SendQueue::SendOp *sqop; - if((sqop = peer->sq.get_pending()) != NULL) - { - peer->sq.pop_pending(sqop); - - sqop->invoke_callback(l, outstanding_op_result); - - delete sqop; - - /* Return to the start in case the peer has gone away while the lock was - * released to run the callback. - */ - continue; - } + sqop->invoke_callback(l, outstanding_op_result); + delete sqop; - /* Fail any outstanding acks and notify the callbacks. */ - - if(!peer->pending_acks.empty()) - { - auto ai = peer->pending_acks.begin(); - - std::function&, HRESULT)> callback = ai->second; - peer->pending_acks.erase(ai); - - callback(l, outstanding_op_result); - - /* Return to the start in case the peer has gone away while the lock was - * released to run the callback. - */ - continue; - } - - if(peer->state == Peer::PS_CONNECTED) - { - /* TODO: DPN_MSGID_DESTROY_PLAYER? */ - player_to_peer_id.erase(peer->player_id); - } - - worker_pool->remove_handle(peer->event); - - closesocket(peer->sock); - - peers.erase(peer_id); - delete peer; - - break; + RENEW_PEER_OR_RETURN(); } + + /* Fail any outstanding acks and notify the callbacks. */ + + while(!peer->pending_acks.empty()) + { + auto ai = peer->pending_acks.begin(); + + std::function&, HRESULT)> callback = ai->second; + peer->pending_acks.erase(ai); + + callback(l, outstanding_op_result); + + RENEW_PEER_OR_RETURN(); + } + + if(peer->state == Peer::PS_CONNECTED) + { + DPNMSG_DESTROY_PLAYER dp; + memset(&dp, 0, sizeof(dp)); + + dp.dwSize = sizeof(dp); + dp.dpnidPlayer = peer->player_id; + dp.pvPlayerContext = peer->player_ctx; + dp.dwReason = destroy_player_reason; + + peer->state = Peer::PS_CLOSING; + + l.unlock(); + message_handler(message_handler_ctx, DPN_MSGID_DESTROY_PLAYER, &dp); + l.lock(); + + RENEW_PEER_OR_RETURN(); + + player_to_peer_id.erase(peer->player_id); + } + else if(state == STATE_CONNECTING && (peer->state == Peer::PS_CONNECTING_HOST || peer->state == Peer::PS_REQUESTING_HOST)) + { + connect_fail(l, outstanding_op_result, NULL, 0); + RENEW_PEER_OR_RETURN(); + } + else if(state == STATE_CONNECTING && (peer->state == Peer::PS_CONNECTING_PEER || peer->state == Peer::PS_REQUESTING_PEER)) + { + connect_fail(l, DPNERR_PLAYERNOTREACHABLE, NULL, 0); + RENEW_PEER_OR_RETURN(); + } + + worker_pool->remove_handle(peer->event); + + closesocket(peer->sock); + + peers.erase(peer_id); + delete peer; } /* Immediately close all sockets and erase all peers. */ -void DirectPlay8Peer::close_everything_now(std::unique_lock &l) +void DirectPlay8Peer::close_everything_now(std::unique_lock &l, HRESULT outstanding_op_result, DWORD destroy_player_reason) { while(!peers.empty()) { - peer_destroy(l, peers.begin()->first, DPNERR_GENERIC); + peer_destroy(l, peers.begin()->first, outstanding_op_result, destroy_player_reason); } if(discovery_socket != -1) @@ -2520,6 +2625,8 @@ void DirectPlay8Peer::handle_host_connect_request(std::unique_lock & connect_host_fail, NULL, [](std::unique_lock &l, HRESULT result) {}); + + peer->state = Peer::PS_CLOSING; }; if(state != STATE_HOSTING) @@ -2710,7 +2817,8 @@ void DirectPlay8Peer::handle_host_connect_ok(std::unique_lock &l, un if(peer->state != Peer::PS_REQUESTING_HOST) { - /* TODO: LOG ME */ + log_printf("Received unexpected DPLITE_MSGID_CONNECT_HOST_OK from peer %u, in state %u", + peer_id, (unsigned)(peer->state)); return; } @@ -2886,6 +2994,8 @@ void DirectPlay8Peer::handle_connect_peer(std::unique_lock &l, unsig connect_peer_fail, NULL, [](std::unique_lock &l, HRESULT result) {}); + + peer->state = Peer::PS_CLOSING; }; if(state != STATE_CONNECTED) @@ -2968,7 +3078,8 @@ void DirectPlay8Peer::handle_connect_peer_ok(std::unique_lock &l, un if(peer->state != Peer::PS_REQUESTING_PEER) { - /* TODO: LOG ME */ + log_printf("Received unexpected DPLITE_MSGID_CONNECT_PEER_OK from peer %u, in state %u", + peer_id, (unsigned)(peer->state)); return; } @@ -3094,9 +3205,17 @@ void DirectPlay8Peer::handle_playerinfo(std::unique_lock &l, unsigne std::pair data = pd.get_data(2); DWORD ack_id = pd.get_dword(3); - if(peer->state != Peer::PS_CONNECTED || player_id != peer->player_id) + if(peer->state != Peer::PS_CONNECTED) { - /* TODO: LOG ME */ + log_printf("Received unexpected DPLITE_MSGID_PLAYERINFO from peer %u, in state %u", + peer_id, (unsigned)(peer->state)); + return; + } + + if(player_id != peer->player_id) + { + log_printf("Received unexpected DPLITE_MSGID_PLAYERINFO from peer %u for player %u", + peer_id, (unsigned)(player_id)); return; } @@ -3237,7 +3356,6 @@ void DirectPlay8Peer::connect_check(std::unique_lock &l) /* Search for any outgoing connections we have initiated that haven't * completed or failed yet. */ - for(auto p = peers.begin(); p != peers.end(); ++p) { switch(p->second->state) @@ -3289,10 +3407,10 @@ void DirectPlay8Peer::connect_fail(std::unique_lock &l, HRESULT hRes { assert(state == STATE_CONNECTING); - close_everything_now(l); - state = STATE_CONNECT_FAILED; + close_everything_now(l, DPNERR_GENERIC, DPNDESTROYPLAYERREASON_CONNECTIONLOST); + DPNMSG_CONNECT_COMPLETE cc; memset(&cc, 0, sizeof(cc)); diff --git a/src/DirectPlay8Peer.hpp b/src/DirectPlay8Peer.hpp index 8c7b572..a43e58f 100644 --- a/src/DirectPlay8Peer.hpp +++ b/src/DirectPlay8Peer.hpp @@ -124,6 +124,7 @@ class DirectPlay8Peer: public IDirectPlay8Peer long events; SendQueue sq; + bool send_open; /* Some messages require confirmation of success/failure from the other * peer. Each of these is assigned a rolling (per peer) ID, the callback @@ -187,9 +188,9 @@ class DirectPlay8Peer: public IDirectPlay8Peer void peer_accept(std::unique_lock &l); bool peer_connect(Peer::PeerState initial_state, uint32_t remote_ip, uint16_t remote_port, DPNID player_id = 0); - void peer_destroy(std::unique_lock &l, unsigned int peer_id, HRESULT outstanding_op_result); + void peer_destroy(std::unique_lock &l, unsigned int peer_id, HRESULT outstanding_op_result, DWORD destroy_player_reason); - void close_everything_now(std::unique_lock &l); + void close_everything_now(std::unique_lock &l, HRESULT outstanding_op_result, DWORD destroy_player_reason); void handle_host_enum_request(std::unique_lock &l, const PacketDeserialiser &pd, const struct sockaddr_in *from_addr); void handle_host_connect_request(std::unique_lock &l, unsigned int peer_id, const PacketDeserialiser &pd); diff --git a/src/network.cpp b/src/network.cpp index e592d68..29a9807 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -106,6 +106,24 @@ int create_client_socket(uint32_t local_ipaddr, uint16_t local_port) return -1; } + /* Set SO_LINGER so that closesocket() does a hard close, immediately removing the socket + * address from the connection table. + * + * If this isn't done, then we are able to immediately bind() new sockets to the same + * local address (as we may when the port isn't specified), but then outgoing connections + * made from it will fail with WSAEADDRINUSE until the background close completes. + */ + + struct linger no_linger; + no_linger.l_onoff = 1; + no_linger.l_linger = 0; + + if(setsockopt(sock, SOL_SOCKET, SO_LINGER, (char*)(&no_linger), sizeof(no_linger)) == -1) + { + closesocket(sock); + return -1; + } + struct sockaddr_in l_addr; l_addr.sin_family = AF_INET; l_addr.sin_addr.s_addr = local_ipaddr;