1
0
mirror of https://github.com/solemnwarning/directplay-lite synced 2024-12-30 16:45:37 +01:00

Don't leak peer sockets and handle I/O errors on them.

This commit is contained in:
Daniel Collins 2018-10-06 22:06:05 +01:00
parent fffbfe3ce4
commit 61e8d23da8
3 changed files with 234 additions and 97 deletions

View File

@ -1474,7 +1474,6 @@ HRESULT DirectPlay8Peer::EnumHosts(PDPN_APPLICATION_DESC CONST pApplicationDesc,
std::unique_lock<std::mutex> l(lock);
host_enums.erase(handle);
l.unlock();
}));
return DPNSUCCESS_PENDING;
@ -1972,36 +1971,79 @@ void DirectPlay8Peer::io_peer_send(std::unique_lock<std::mutex> &l, unsigned int
Peer *peer;
SendQueue::SendOp *sqop;
while((peer = get_peer_by_peer_id(peer_id)) != NULL && (sqop = peer->sq.get_pending()) != NULL)
while((peer = get_peer_by_peer_id(peer_id)) != NULL)
{
std::pair<const void*, size_t> d = sqop->get_pending_data();
int s = send(peer->sock, (const char*)(d.first), d.second, 0);
if(s < 0)
if((sqop = peer->sq.get_pending()) != NULL)
{
DWORD err = WSAGetLastError();
std::pair<const void*, size_t> d = sqop->get_pending_data();
if(err == WSAEWOULDBLOCK)
int s = send(peer->sock, (const char*)(d.first), d.second, 0);
if(s < 0)
{
break;
DWORD err = WSAGetLastError();
if(err == WSAEWOULDBLOCK)
{
/* Send buffer full. Try again later. */
break;
}
else{
/* Write error. */
log_printf("Write error on peer %u: %s", peer_id, win_strerror(err).c_str());
log_printf("Closing connection");
peer_destroy(l, peer_id, DPNERR_CONNECTIONLOST, DPNDESTROYPLAYERREASON_CONNECTIONLOST);
return;
}
}
else{
/* TODO */
return;
sqop->inc_sent_data(s);
if(s == d.second)
{
peer->sq.pop_pending(sqop);
if(peer->sq.get_pending() != NULL)
{
/* There is another message in the send queue.
*
* Wake another worker to dispatch it in case we have to
* block within the application for a while.
*/
SetEvent(peer->event);
}
sqop->invoke_callback(l, S_OK);
delete sqop;
}
}
sqop->inc_sent_data(s);
if(s == d.second)
{
peer->sq.pop_pending(sqop);
else{
if(peer->state == Peer::PS_CLOSING && peer->send_open)
{
/* Peer is in a closing state and send queue has been cleared.
*
* Begin a graceful shutdown from our end to ensure the peer
* receives any informational messages about the close. It will do
* a hard close once it receives our EOF.
*/
if(shutdown(peer->sock, SD_SEND) != 0)
{
DWORD err = WSAGetLastError();
log_printf(
"shutdown(SD_SEND) on peer %u failed: %s",
peer_id, win_strerror(err));
peer_destroy(l, peer_id, DPNERR_CONNECTIONLOST, DPNDESTROYPLAYERREASON_CONNECTIONLOST);
return;
}
peer->send_open = false;
}
/* TODO: Poke event, send/recv mutexes per TCP socket. */
sqop->invoke_callback(l, S_OK);
delete sqop;
break;
}
}
}
@ -2042,11 +2084,14 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock<std::mutex> &l, unsigned int
int r = recv(peer->sock, (char*)(peer->recv_buf) + peer->recv_buf_cur, sizeof(peer->recv_buf) - peer->recv_buf_cur, 0);
if(r == 0)
{
/* Connection closed */
/* TODO */
break;
/* When the remote end initiates a graceful close, it will no longer
* process anything we send it. Just close the connection.
*/
peer_destroy(l, peer_id, DPNERR_CONNECTIONLOST, DPNDESTROYPLAYERREASON_NORMAL);
return;
}
else if(r == -1)
else if(r < 0)
{
DWORD err = WSAGetLastError();
@ -2057,15 +2102,19 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock<std::mutex> &l, unsigned int
}
else{
/* Read error. */
/* TODO */
break;
log_printf("Read error on peer %u: %s", peer_id, win_strerror(err).c_str());
log_printf("Closing connection");
peer_destroy(l, peer_id, DPNERR_CONNECTIONLOST, DPNDESTROYPLAYERREASON_CONNECTIONLOST);
return;
}
}
if(peer->state == Peer::PS_CLOSING)
{
/* When a peer is in PS_CLOSING, we keep the socket open until the send queue has
* been flushed and discard anything we read until we get EOF.
* been flushed and discard anything we read until we get EOF.
*/
continue;
@ -2081,8 +2130,13 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock<std::mutex> &l, unsigned int
if(full_packet_size > MAX_PACKET_SIZE)
{
/* Malformed packet received - TCP stream invalid! */
/* TODO */
abort();
log_printf(
"Received over-size packet from peer %u, dropping connection",
peer_id);
peer_destroy(l, peer_id, DPNERR_CONNECTIONLOST, DPNDESTROYPLAYERREASON_CONNECTIONLOST);
return;
}
if(peer->recv_buf_cur >= full_packet_size)
@ -2096,8 +2150,13 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock<std::mutex> &l, unsigned int
catch(const PacketDeserialiser::Error &e)
{
/* Malformed packet received - TCP stream invalid! */
/* TODO */
abort();
log_printf(
"Received malformed packet (%s) from peer %u, dropping connection",
e.what(), peer_id);
peer_destroy(l, peer_id, DPNERR_CONNECTIONLOST, DPNDESTROYPLAYERREASON_CONNECTIONLOST);
return;
}
switch(pd->packet_type())
@ -2213,20 +2272,53 @@ void DirectPlay8Peer::peer_accept(std::unique_lock<std::mutex> &l)
return;
}
else{
/* TODO */
abort();
/* Hopefully this is temporary and doesn't go into a tight loop... */
log_printf("Incoming connection failed: %s", win_strerror(err).c_str());
return;
}
}
struct linger li;
li.l_onoff = 0;
li.l_linger = 0;
if(setsockopt(newfd, SOL_SOCKET, SO_LINGER, (char*)(&li), sizeof(li)) != 0)
{
DWORD err = WSAGetLastError();
log_printf("Failed to set SO_LINGER parameters on accepted connection: %s", win_strerror(err).c_str());
/* Not fatal, since this probably won't matter in production. */
}
u_long non_blocking = 1;
if(ioctlsocket(newfd, FIONBIO, &non_blocking) != 0)
{
log_printf("ioctlsocket() failed, dropping peer");
DWORD err = WSAGetLastError();
log_printf("Failed to set accepted connection to non-blocking mode: %s", win_strerror(err).c_str());
log_printf("Closing connection");
closesocket(newfd);
return;
}
/* Set SO_LINGER so that closesocket() does a hard close, immediately removing the socket
* address from the connection table.
*
* If this isn't done, then we are able to immediately bind() new sockets to the same
* local address (as we may when the port isn't specified), but then outgoing connections
* made from it will fail with WSAEADDRINUSE until the background close completes.
*/
struct linger no_linger;
no_linger.l_onoff = 1;
no_linger.l_linger = 0;
if(setsockopt(newfd, SOL_SOCKET, SO_LINGER, (char*)(&no_linger), sizeof(no_linger)) != 0)
{
DWORD err = WSAGetLastError();
log_printf("Failed to set SO_LINGER on accepted connection: %s", win_strerror(err).c_str());
}
unsigned int peer_id = next_peer_id++;
Peer *peer = new Peer(Peer::PS_ACCEPTED, newfd, addr.sin_addr.s_addr, ntohs(addr.sin_port));
@ -2286,69 +2378,82 @@ bool DirectPlay8Peer::peer_connect(Peer::PeerState initial_state, uint32_t remot
return true;
}
void DirectPlay8Peer::peer_destroy(std::unique_lock<std::mutex> &l, unsigned int peer_id, HRESULT outstanding_op_result)
void DirectPlay8Peer::peer_destroy(std::unique_lock<std::mutex> &l, unsigned int peer_id, HRESULT outstanding_op_result, DWORD destroy_player_reason)
{
Peer *peer;
Peer *peer = get_peer_by_peer_id(peer_id);
assert(peer != NULL);
while((peer = get_peer_by_peer_id(peer_id)) != NULL)
/* Cancel any outstanding sends and notify the callbacks. */
for(SendQueue::SendOp *sqop; (sqop = peer->sq.get_pending()) != NULL;)
{
/* Cancel any outstanding sends and notify the callbacks. */
peer->sq.pop_pending(sqop);
SendQueue::SendOp *sqop;
if((sqop = peer->sq.get_pending()) != NULL)
{
peer->sq.pop_pending(sqop);
sqop->invoke_callback(l, outstanding_op_result);
delete sqop;
/* Return to the start in case the peer has gone away while the lock was
* released to run the callback.
*/
continue;
}
sqop->invoke_callback(l, outstanding_op_result);
delete sqop;
/* Fail any outstanding acks and notify the callbacks. */
if(!peer->pending_acks.empty())
{
auto ai = peer->pending_acks.begin();
std::function<void(std::unique_lock<std::mutex>&, HRESULT)> callback = ai->second;
peer->pending_acks.erase(ai);
callback(l, outstanding_op_result);
/* Return to the start in case the peer has gone away while the lock was
* released to run the callback.
*/
continue;
}
if(peer->state == Peer::PS_CONNECTED)
{
/* TODO: DPN_MSGID_DESTROY_PLAYER? */
player_to_peer_id.erase(peer->player_id);
}
worker_pool->remove_handle(peer->event);
closesocket(peer->sock);
peers.erase(peer_id);
delete peer;
break;
RENEW_PEER_OR_RETURN();
}
/* Fail any outstanding acks and notify the callbacks. */
while(!peer->pending_acks.empty())
{
auto ai = peer->pending_acks.begin();
std::function<void(std::unique_lock<std::mutex>&, HRESULT)> callback = ai->second;
peer->pending_acks.erase(ai);
callback(l, outstanding_op_result);
RENEW_PEER_OR_RETURN();
}
if(peer->state == Peer::PS_CONNECTED)
{
DPNMSG_DESTROY_PLAYER dp;
memset(&dp, 0, sizeof(dp));
dp.dwSize = sizeof(dp);
dp.dpnidPlayer = peer->player_id;
dp.pvPlayerContext = peer->player_ctx;
dp.dwReason = destroy_player_reason;
peer->state = Peer::PS_CLOSING;
l.unlock();
message_handler(message_handler_ctx, DPN_MSGID_DESTROY_PLAYER, &dp);
l.lock();
RENEW_PEER_OR_RETURN();
player_to_peer_id.erase(peer->player_id);
}
else if(state == STATE_CONNECTING && (peer->state == Peer::PS_CONNECTING_HOST || peer->state == Peer::PS_REQUESTING_HOST))
{
connect_fail(l, outstanding_op_result, NULL, 0);
RENEW_PEER_OR_RETURN();
}
else if(state == STATE_CONNECTING && (peer->state == Peer::PS_CONNECTING_PEER || peer->state == Peer::PS_REQUESTING_PEER))
{
connect_fail(l, DPNERR_PLAYERNOTREACHABLE, NULL, 0);
RENEW_PEER_OR_RETURN();
}
worker_pool->remove_handle(peer->event);
closesocket(peer->sock);
peers.erase(peer_id);
delete peer;
}
/* Immediately close all sockets and erase all peers. */
void DirectPlay8Peer::close_everything_now(std::unique_lock<std::mutex> &l)
void DirectPlay8Peer::close_everything_now(std::unique_lock<std::mutex> &l, HRESULT outstanding_op_result, DWORD destroy_player_reason)
{
while(!peers.empty())
{
peer_destroy(l, peers.begin()->first, DPNERR_GENERIC);
peer_destroy(l, peers.begin()->first, outstanding_op_result, destroy_player_reason);
}
if(discovery_socket != -1)
@ -2520,6 +2625,8 @@ void DirectPlay8Peer::handle_host_connect_request(std::unique_lock<std::mutex> &
connect_host_fail,
NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) {});
peer->state = Peer::PS_CLOSING;
};
if(state != STATE_HOSTING)
@ -2710,7 +2817,8 @@ void DirectPlay8Peer::handle_host_connect_ok(std::unique_lock<std::mutex> &l, un
if(peer->state != Peer::PS_REQUESTING_HOST)
{
/* TODO: LOG ME */
log_printf("Received unexpected DPLITE_MSGID_CONNECT_HOST_OK from peer %u, in state %u",
peer_id, (unsigned)(peer->state));
return;
}
@ -2886,6 +2994,8 @@ void DirectPlay8Peer::handle_connect_peer(std::unique_lock<std::mutex> &l, unsig
connect_peer_fail,
NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) {});
peer->state = Peer::PS_CLOSING;
};
if(state != STATE_CONNECTED)
@ -2968,7 +3078,8 @@ void DirectPlay8Peer::handle_connect_peer_ok(std::unique_lock<std::mutex> &l, un
if(peer->state != Peer::PS_REQUESTING_PEER)
{
/* TODO: LOG ME */
log_printf("Received unexpected DPLITE_MSGID_CONNECT_PEER_OK from peer %u, in state %u",
peer_id, (unsigned)(peer->state));
return;
}
@ -3094,9 +3205,17 @@ void DirectPlay8Peer::handle_playerinfo(std::unique_lock<std::mutex> &l, unsigne
std::pair<const void*, size_t> data = pd.get_data(2);
DWORD ack_id = pd.get_dword(3);
if(peer->state != Peer::PS_CONNECTED || player_id != peer->player_id)
if(peer->state != Peer::PS_CONNECTED)
{
/* TODO: LOG ME */
log_printf("Received unexpected DPLITE_MSGID_PLAYERINFO from peer %u, in state %u",
peer_id, (unsigned)(peer->state));
return;
}
if(player_id != peer->player_id)
{
log_printf("Received unexpected DPLITE_MSGID_PLAYERINFO from peer %u for player %u",
peer_id, (unsigned)(player_id));
return;
}
@ -3237,7 +3356,6 @@ void DirectPlay8Peer::connect_check(std::unique_lock<std::mutex> &l)
/* Search for any outgoing connections we have initiated that haven't
* completed or failed yet.
*/
for(auto p = peers.begin(); p != peers.end(); ++p)
{
switch(p->second->state)
@ -3289,10 +3407,10 @@ void DirectPlay8Peer::connect_fail(std::unique_lock<std::mutex> &l, HRESULT hRes
{
assert(state == STATE_CONNECTING);
close_everything_now(l);
state = STATE_CONNECT_FAILED;
close_everything_now(l, DPNERR_GENERIC, DPNDESTROYPLAYERREASON_CONNECTIONLOST);
DPNMSG_CONNECT_COMPLETE cc;
memset(&cc, 0, sizeof(cc));

View File

@ -124,6 +124,7 @@ class DirectPlay8Peer: public IDirectPlay8Peer
long events;
SendQueue sq;
bool send_open;
/* Some messages require confirmation of success/failure from the other
* peer. Each of these is assigned a rolling (per peer) ID, the callback
@ -187,9 +188,9 @@ class DirectPlay8Peer: public IDirectPlay8Peer
void peer_accept(std::unique_lock<std::mutex> &l);
bool peer_connect(Peer::PeerState initial_state, uint32_t remote_ip, uint16_t remote_port, DPNID player_id = 0);
void peer_destroy(std::unique_lock<std::mutex> &l, unsigned int peer_id, HRESULT outstanding_op_result);
void peer_destroy(std::unique_lock<std::mutex> &l, unsigned int peer_id, HRESULT outstanding_op_result, DWORD destroy_player_reason);
void close_everything_now(std::unique_lock<std::mutex> &l);
void close_everything_now(std::unique_lock<std::mutex> &l, HRESULT outstanding_op_result, DWORD destroy_player_reason);
void handle_host_enum_request(std::unique_lock<std::mutex> &l, const PacketDeserialiser &pd, const struct sockaddr_in *from_addr);
void handle_host_connect_request(std::unique_lock<std::mutex> &l, unsigned int peer_id, const PacketDeserialiser &pd);

View File

@ -106,6 +106,24 @@ int create_client_socket(uint32_t local_ipaddr, uint16_t local_port)
return -1;
}
/* Set SO_LINGER so that closesocket() does a hard close, immediately removing the socket
* address from the connection table.
*
* If this isn't done, then we are able to immediately bind() new sockets to the same
* local address (as we may when the port isn't specified), but then outgoing connections
* made from it will fail with WSAEADDRINUSE until the background close completes.
*/
struct linger no_linger;
no_linger.l_onoff = 1;
no_linger.l_linger = 0;
if(setsockopt(sock, SOL_SOCKET, SO_LINGER, (char*)(&no_linger), sizeof(no_linger)) == -1)
{
closesocket(sock);
return -1;
}
struct sockaddr_in l_addr;
l_addr.sin_family = AF_INET;
l_addr.sin_addr.s_addr = local_ipaddr;