1
0
mirror of https://github.com/solemnwarning/directplay-lite synced 2024-12-30 16:45:37 +01:00

Don't poll for TCP recv() events when processing one.

Clearing FD_RECV and FD_CLOSE from the event flags when processing a
recv() event will prevent the other worker threads associated with that
socket from waking up repeatedly, checking recv_busy, then sleeping
without having done anything, wasting time and potentially preventing
them from servicing their other sockets.
This commit is contained in:
Daniel Collins 2018-10-06 14:07:51 +01:00
parent f000152a7a
commit fffbfe3ce4
4 changed files with 76 additions and 16 deletions

View File

@ -2014,22 +2014,31 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock<std::mutex> &l, unsigned int
while((peer = get_peer_by_peer_id(peer_id)) != NULL)
{
if(!rb_claimed && peer->recv_busy)
if(!rb_claimed)
{
/* Another thread is already processing data from this socket.
*
* Only one thread at a time is allowed to handle reads, even when the
* other thread is in the application callback, so that the order of
* messages is preserved.
*/
return;
if(peer->recv_busy)
{
/* Another thread is already processing data from this socket.
*
* Only one thread at a time is allowed to handle reads, even when the
* other thread is in the application callback, so that the order of
* messages is preserved.
*/
return;
}
else{
/* No other thread is processing data from this peer, we shall
* claim the throne and temporarily disable FD_READ events from it
* to avoid other workers spinning against the recv_busy lock.
*/
peer->recv_busy = true;
rb_claimed = true;
peer->disable_events(FD_READ | FD_CLOSE);
}
}
peer->recv_busy = true;
rb_claimed = true;
/* TODO: Mask read events to avoid workers spinning. */
int r = recv(peer->sock, (char*)(peer->recv_buf) + peer->recv_buf_cur, sizeof(peer->recv_buf) - peer->recv_buf_cur, 0);
if(r == 0)
{
@ -2179,6 +2188,7 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock<std::mutex> &l, unsigned int
if(peer != NULL)
{
peer->enable_events(FD_READ | FD_CLOSE);
peer->recv_busy = false;
}
}
@ -2220,7 +2230,7 @@ void DirectPlay8Peer::peer_accept(std::unique_lock<std::mutex> &l)
unsigned int peer_id = next_peer_id++;
Peer *peer = new Peer(Peer::PS_ACCEPTED, newfd, addr.sin_addr.s_addr, ntohs(addr.sin_port));
if(WSAEventSelect(peer->sock, peer->event, FD_READ | FD_WRITE | FD_CLOSE) != 0)
if(!peer->enable_events(FD_READ | FD_WRITE | FD_CLOSE))
{
log_printf("WSAEventSelect() failed, dropping peer");
@ -2248,7 +2258,7 @@ bool DirectPlay8Peer::peer_connect(Peer::PeerState initial_state, uint32_t remot
peer->player_id = player_id;
if(WSAEventSelect(peer->sock, peer->event, FD_CONNECT | FD_READ | FD_WRITE | FD_CLOSE) != 0)
if(!peer->enable_events(FD_CONNECT | FD_READ | FD_WRITE | FD_CLOSE))
{
closesocket(peer->sock);
delete peer;
@ -3306,9 +3316,39 @@ void DirectPlay8Peer::connect_fail(std::unique_lock<std::mutex> &l, HRESULT hRes
}
DirectPlay8Peer::Peer::Peer(enum PeerState state, int sock, uint32_t ip, uint16_t port):
state(state), sock(sock), ip(ip), port(port), recv_busy(false), recv_buf_cur(0), sq(event), next_ack_id(1)
state(state), sock(sock), ip(ip), port(port), recv_busy(false), recv_buf_cur(0), events(0), sq(event), next_ack_id(1)
{}
bool DirectPlay8Peer::Peer::enable_events(long events)
{
if(WSAEventSelect(sock, event, (this->events | events)) != 0)
{
DWORD err = WSAGetLastError();
log_printf("WSAEventSelect() error: ", win_strerror(err).c_str());
return false;
}
this->events |= events;
return true;
}
bool DirectPlay8Peer::Peer::disable_events(long events)
{
if(WSAEventSelect(sock, event, (this->events & ~events)) != 0)
{
DWORD err = WSAGetLastError();
log_printf("WSAEventSelect() error: ", win_strerror(err).c_str());
return false;
}
this->events &= ~events;
return true;
}
DWORD DirectPlay8Peer::Peer::alloc_ack_id()
{
DWORD id = next_ack_id++;

View File

@ -121,6 +121,7 @@ class DirectPlay8Peer: public IDirectPlay8Peer
size_t recv_buf_cur;
EventObject event;
long events;
SendQueue sq;
@ -133,6 +134,9 @@ class DirectPlay8Peer: public IDirectPlay8Peer
Peer(enum PeerState state, int sock, uint32_t ip, uint16_t port);
bool enable_events(long events);
bool disable_events(long events);
DWORD alloc_ack_id();
void register_ack(DWORD id, const std::function<void(std::unique_lock<std::mutex>&, HRESULT)> &callback);
};

View File

@ -82,3 +82,15 @@ void log_printf(const char *fmt, ...)
fprintf(log_fh, "\n");
}
}
/* Convert a windows error number to an error message */
std::string win_strerror(DWORD errnum)
{
char buf[256];
memset(buf, '\0', sizeof(buf));
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errnum, 0, buf, (sizeof(buf) - 1), NULL);
buf[strcspn(buf, "\r\n")] = '\0';
return buf;
}

View File

@ -1,10 +1,14 @@
#ifndef DPLITE_LOG_HPP
#define DPLITE_LOG_HPP
#include <string>
void log_init();
void log_fini();
bool log_trace_enabled();
void log_printf(const char *fmt, ...);
std::string win_strerror(DWORD errnum);
#endif /* !DPLITE_LOG_HPP */