diff --git a/src/DirectPlay8Peer.cpp b/src/DirectPlay8Peer.cpp index 2b04819..d4f44ea 100644 --- a/src/DirectPlay8Peer.cpp +++ b/src/DirectPlay8Peer.cpp @@ -2014,22 +2014,31 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock &l, unsigned int while((peer = get_peer_by_peer_id(peer_id)) != NULL) { - if(!rb_claimed && peer->recv_busy) + if(!rb_claimed) { - /* Another thread is already processing data from this socket. - * - * Only one thread at a time is allowed to handle reads, even when the - * other thread is in the application callback, so that the order of - * messages is preserved. - */ - return; + if(peer->recv_busy) + { + /* Another thread is already processing data from this socket. + * + * Only one thread at a time is allowed to handle reads, even when the + * other thread is in the application callback, so that the order of + * messages is preserved. + */ + return; + } + else{ + /* No other thread is processing data from this peer, we shall + * claim the throne and temporarily disable FD_READ events from it + * to avoid other workers spinning against the recv_busy lock. + */ + + peer->recv_busy = true; + rb_claimed = true; + + peer->disable_events(FD_READ | FD_CLOSE); + } } - peer->recv_busy = true; - rb_claimed = true; - - /* TODO: Mask read events to avoid workers spinning. */ - int r = recv(peer->sock, (char*)(peer->recv_buf) + peer->recv_buf_cur, sizeof(peer->recv_buf) - peer->recv_buf_cur, 0); if(r == 0) { @@ -2179,6 +2188,7 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock &l, unsigned int if(peer != NULL) { + peer->enable_events(FD_READ | FD_CLOSE); peer->recv_busy = false; } } @@ -2220,7 +2230,7 @@ void DirectPlay8Peer::peer_accept(std::unique_lock &l) unsigned int peer_id = next_peer_id++; Peer *peer = new Peer(Peer::PS_ACCEPTED, newfd, addr.sin_addr.s_addr, ntohs(addr.sin_port)); - if(WSAEventSelect(peer->sock, peer->event, FD_READ | FD_WRITE | FD_CLOSE) != 0) + if(!peer->enable_events(FD_READ | FD_WRITE | FD_CLOSE)) { log_printf("WSAEventSelect() failed, dropping peer"); @@ -2248,7 +2258,7 @@ bool DirectPlay8Peer::peer_connect(Peer::PeerState initial_state, uint32_t remot peer->player_id = player_id; - if(WSAEventSelect(peer->sock, peer->event, FD_CONNECT | FD_READ | FD_WRITE | FD_CLOSE) != 0) + if(!peer->enable_events(FD_CONNECT | FD_READ | FD_WRITE | FD_CLOSE)) { closesocket(peer->sock); delete peer; @@ -3306,9 +3316,39 @@ void DirectPlay8Peer::connect_fail(std::unique_lock &l, HRESULT hRes } DirectPlay8Peer::Peer::Peer(enum PeerState state, int sock, uint32_t ip, uint16_t port): - state(state), sock(sock), ip(ip), port(port), recv_busy(false), recv_buf_cur(0), sq(event), next_ack_id(1) + state(state), sock(sock), ip(ip), port(port), recv_busy(false), recv_buf_cur(0), events(0), sq(event), next_ack_id(1) {} +bool DirectPlay8Peer::Peer::enable_events(long events) +{ + if(WSAEventSelect(sock, event, (this->events | events)) != 0) + { + DWORD err = WSAGetLastError(); + log_printf("WSAEventSelect() error: ", win_strerror(err).c_str()); + + return false; + } + + this->events |= events; + + return true; +} + +bool DirectPlay8Peer::Peer::disable_events(long events) +{ + if(WSAEventSelect(sock, event, (this->events & ~events)) != 0) + { + DWORD err = WSAGetLastError(); + log_printf("WSAEventSelect() error: ", win_strerror(err).c_str()); + + return false; + } + + this->events &= ~events; + + return true; +} + DWORD DirectPlay8Peer::Peer::alloc_ack_id() { DWORD id = next_ack_id++; diff --git a/src/DirectPlay8Peer.hpp b/src/DirectPlay8Peer.hpp index 5def1fd..8c7b572 100644 --- a/src/DirectPlay8Peer.hpp +++ b/src/DirectPlay8Peer.hpp @@ -121,6 +121,7 @@ class DirectPlay8Peer: public IDirectPlay8Peer size_t recv_buf_cur; EventObject event; + long events; SendQueue sq; @@ -133,6 +134,9 @@ class DirectPlay8Peer: public IDirectPlay8Peer Peer(enum PeerState state, int sock, uint32_t ip, uint16_t port); + bool enable_events(long events); + bool disable_events(long events); + DWORD alloc_ack_id(); void register_ack(DWORD id, const std::function&, HRESULT)> &callback); }; diff --git a/src/Log.cpp b/src/Log.cpp index 2202700..9c57213 100644 --- a/src/Log.cpp +++ b/src/Log.cpp @@ -82,3 +82,15 @@ void log_printf(const char *fmt, ...) fprintf(log_fh, "\n"); } } + +/* Convert a windows error number to an error message */ +std::string win_strerror(DWORD errnum) +{ + char buf[256]; + memset(buf, '\0', sizeof(buf)); + + FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errnum, 0, buf, (sizeof(buf) - 1), NULL); + buf[strcspn(buf, "\r\n")] = '\0'; + + return buf; +} diff --git a/src/Log.hpp b/src/Log.hpp index f6cc6a2..3ac6548 100644 --- a/src/Log.hpp +++ b/src/Log.hpp @@ -1,10 +1,14 @@ #ifndef DPLITE_LOG_HPP #define DPLITE_LOG_HPP +#include + void log_init(); void log_fini(); bool log_trace_enabled(); void log_printf(const char *fmt, ...); +std::string win_strerror(DWORD errnum); + #endif /* !DPLITE_LOG_HPP */