From b851354647da0465cdb415939b4535207459251d Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 21 Nov 2023 20:24:56 +0000 Subject: [PATCH] WIP: Use overlapped I/O for sending packets. This was an experiment to see if using overlapped WSASendTo() would improve performance over vanilla sendto() when sending an obscene number of tiny packets like Atomic Bomberman does (#14). Turns out this actually makes performance *worse*. The initial WSASendTo() calls take even longer than the sendto() calls did, not counting any time spent afterwards dispatching the completion callback and cleaning up. Back to the drawing board. --- Makefile | 2 +- src/ipxwrapper.c | 29 +++ src/ipxwrapper.h | 4 + src/ipxwrapper_stubs.txt | 1 + src/router.c | 3 +- src/sender.c | 424 +++++++++++++++++++++++++++++++++++++++ src/sender.h | 24 +++ src/winsock.c | 3 +- 8 files changed, 487 insertions(+), 3 deletions(-) create mode 100644 src/sender.c create mode 100644 src/sender.h diff --git a/Makefile b/Makefile index fbba7f3..5650478 100644 --- a/Makefile +++ b/Makefile @@ -86,7 +86,7 @@ dist: all IPXWRAPPER_OBJS := src/ipxwrapper.o src/winsock.o src/ipxwrapper_stubs.o src/log.o src/common.o \ src/interface.o src/interface2.o src/router.o src/ipxwrapper.def src/addrcache.o src/config.o src/addr.o \ - src/firewall.o src/ethernet.o src/funcprof.o + src/firewall.o src/ethernet.o src/funcprof.o src/sender.o ipxwrapper.dll: $(IPXWRAPPER_OBJS) echo 'const char *version_string = "$(VERSION)", *compile_time = "'`date`'";' | $(CC) -c -x c -o version.o - diff --git a/src/ipxwrapper.c b/src/ipxwrapper.c index 11070c0..07fe3bd 100644 --- a/src/ipxwrapper.c +++ b/src/ipxwrapper.c @@ -62,6 +62,11 @@ const unsigned int ipxwrapper_fstats_size = sizeof(ipxwrapper_fstats) / sizeof(* unsigned int send_packets = 0, send_bytes = 0; /* Sent from emulated socket */ unsigned int recv_packets = 0, recv_bytes = 0; /* Forwarded to emulated socket */ +#define NUM_SEND_THREADS 4 + +SenderQueue *send_queue = NULL; +static SenderThread *send_threads[NUM_SEND_THREADS] = { NULL }; + static void init_cs(CRITICAL_SECTION *cs) { if(!InitializeCriticalSectionAndSpinCount(cs, 0x80000000)) @@ -147,6 +152,21 @@ BOOL WINAPI DllMain(HINSTANCE hinstDLL, DWORD fdwReason, LPVOID lpvReserved) return FALSE; } + send_queue = SenderQueue_new(); + if(send_queue == NULL) + { + abort(); + } + + for(int i = 0; i < NUM_SEND_THREADS; ++i) + { + send_threads[i] = SenderThread_new(send_queue); + if(send_threads[i] == NULL) + { + abort(); + } + } + router_init(); if(main_config.profile) @@ -208,6 +228,15 @@ BOOL WINAPI DllMain(HINSTANCE hinstDLL, DWORD fdwReason, LPVOID lpvReserved) router_cleanup(); + for(int i = 0; i < NUM_SEND_THREADS; ++i) + { + SenderThread_destroy(send_threads[i]); + send_threads[i] = NULL; + } + + SenderQueue_destroy(send_queue); + send_queue = NULL; + WSACleanup(); DeleteCriticalSection(&sockets_cs); diff --git a/src/ipxwrapper.h b/src/ipxwrapper.h index 1b7e17c..46e2a82 100644 --- a/src/ipxwrapper.h +++ b/src/ipxwrapper.h @@ -29,6 +29,7 @@ #include "config.h" #include "funcprof.h" #include "router.h" +#include "sender.h" /* The standard Windows driver (in XP) only allows 1467 bytes anyway */ #define MAX_DATA_SIZE 8192 @@ -200,6 +201,8 @@ enum { extern unsigned int send_packets, send_bytes; /* Sent from emulated socket */ extern unsigned int recv_packets, recv_bytes; /* Forwarded to emulated socket */ +extern SenderQueue *send_queue; + ipx_socket *get_socket(SOCKET sockfd); ipx_socket *get_socket_wait_for_ready(SOCKET sockfd, int timeout_ms); void lock_sockets(void); @@ -228,6 +231,7 @@ int PASCAL r_getpeername(SOCKET fd, struct sockaddr *addr, int *addrlen); int PASCAL r_listen(SOCKET s, int backlog); SOCKET PASCAL r_accept(SOCKET s, struct sockaddr *addr, int *addrlen); int PASCAL r_WSAAsyncSelect(SOCKET s, HWND hWnd, unsigned int wMsg, long lEvent); +int PASCAL r_WSASendTo(SOCKET s, LPWSABUF lpBuffers, DWORD dwBufferCount, LPDWORD lpNumberOfBytesSent, DWORD dwFlags, const struct sockaddr FAR *lpTo, int iToLen, LPWSAOVERLAPPED lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine); int WSAAPI r_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const PTIMEVAL timeout); #endif /* !IPXWRAPPER_H */ diff --git a/src/ipxwrapper_stubs.txt b/src/ipxwrapper_stubs.txt index 9471f51..52bc097 100644 --- a/src/ipxwrapper_stubs.txt +++ b/src/ipxwrapper_stubs.txt @@ -37,6 +37,7 @@ r_getpeername ws2_32.dll getpeername 12 inet_ntoa ws2_32.dll inet_ntoa 4 __WSAFDIsSet ws2_32.dll __WSAFDIsSet 8 r_WSAAsyncSelect ws2_32.dll WSAAsyncSelect 16 +r_WSASendTo ws2_32.dll WSASendTo 36 gethostbyname ws2_32.dll gethostbyname 4 pcap_open wpcap.dll pcap_open diff --git a/src/router.c b/src/router.c index 53c660b..0ef12c5 100644 --- a/src/router.c +++ b/src/router.c @@ -343,7 +343,8 @@ static void _deliver_packet( send_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); send_addr.sin_port = sock->port; - if(sendto(private_socket, (void*)(packet), packet_size, 0, (struct sockaddr*)(&send_addr), sizeof(send_addr)) == -1) + // if(sendto(private_socket, (void*)(packet), packet_size, 0, (struct sockaddr*)(&send_addr), sizeof(send_addr)) == -1) + if(!SenderQueue_send(send_queue, private_socket, NULL, 0, packet, packet_size, (struct sockaddr*)(&send_addr), sizeof(send_addr))) { log_printf(LOG_ERROR, "Error relaying packet: %s", w32_error(WSAGetLastError())); } diff --git a/src/sender.c b/src/sender.c new file mode 100644 index 0000000..58eec17 --- /dev/null +++ b/src/sender.c @@ -0,0 +1,424 @@ +#define WINSOCK_API_LINKAGE + +#include +#include +#include +#include + +#include "common.h" +#include "ipxwrapper.h" +#include "sender.h" + +#define SENDER_MAX_BUFFERS 32 + +#define SENDER_MAX_HEADER_SIZE 32 +#define SENDER_MAX_BUFFER_SIZE 65536 +#define SENDER_MAX_ADDR_SIZE sizeof(struct sockaddr_storage) + +struct SenderBuffer +{ + unsigned char header_data[SENDER_MAX_HEADER_SIZE]; + unsigned int header_size; + + unsigned char *data; + unsigned int data_buffer_size; + unsigned int data_size; + + int sock; + unsigned char addr[SENDER_MAX_ADDR_SIZE]; + unsigned int addrlen; + + WSABUF sendbufs[2]; + unsigned int num_sendbufs; + + WSAOVERLAPPED overlapped; + SenderQueue *queue; + + struct SenderBuffer *prev; + struct SenderBuffer *next; +}; + +typedef struct SenderBuffer SenderBuffer; + +struct SenderQueue +{ + CRITICAL_SECTION lock; + + unsigned int total_buffers; + + SenderBuffer *free_buffers; + HANDLE free_buffer_event; + + SenderBuffer *queued_buffers; + HANDLE queued_buffer_event; +}; + +struct SenderThread +{ + SenderQueue *queue; + HANDLE stop_thread_event; + + HANDLE thread; +}; + +static SenderBuffer *_SenderQueue_get_free_buffer(SenderQueue *queue); +static void _SenderQueue_put_free_buffer(SenderQueue *queue, SenderBuffer *buffer); +static SenderBuffer *_SenderQueue_get_queued_buffer(SenderQueue *queue); +static void _SenderQueue_put_queued_buffer(SenderQueue *queue, SenderBuffer *buffer); + +static DWORD WINAPI _SenderThread_main(LPVOID lpParameter); +static void WINAPI _SenderThread_send_completion(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped, DWORD dwFlags); + +SenderQueue *SenderQueue_new(void) +{ + SenderQueue *self = malloc(sizeof(SenderQueue)); + if(self == NULL) + { + log_printf(LOG_ERROR, "Unable to allocate SenderQueue structure"); + return NULL; + } + + self->total_buffers = 0; + self->free_buffers = NULL; + self->queued_buffers = NULL; + + if(!InitializeCriticalSectionAndSpinCount(&(self->lock), 0x80000000)) + { + log_printf(LOG_ERROR, "Failed to initialise critical section: %s", w32_error(GetLastError())); + + free(self); + return NULL; + } + + self->free_buffer_event = CreateEvent(NULL, FALSE, FALSE, NULL); + if(self->free_buffer_event == NULL) + { + log_printf(LOG_ERROR, + "Unable to create free_buffer_event event object: %s", + w32_error(GetLastError())); + + DeleteCriticalSection(&(self->lock)); + free(self); + return NULL; + } + + self->queued_buffer_event = CreateEvent(NULL, FALSE, FALSE, NULL); + if(self->queued_buffer_event == NULL) + { + log_printf(LOG_ERROR, + "Unable to create queued_buffer_event event object: %s", + w32_error(GetLastError())); + + CloseHandle(self->free_buffer_event); + DeleteCriticalSection(&(self->lock)); + free(self); + return NULL; + } + + return self; +} + +void SenderQueue_destroy(SenderQueue *self) +{ + if(self == NULL) + { + return; + } + + while(self->free_buffers != NULL) + { + SenderBuffer *buffer = self->free_buffers; + + DL_DELETE(self->free_buffers, buffer); + free(buffer); + } + + while(self->queued_buffers != NULL) + { + SenderBuffer *buffer = self->queued_buffers; + + DL_DELETE(self->queued_buffers, buffer); + free(buffer); + } + + CloseHandle(self->queued_buffer_event); + CloseHandle(self->free_buffer_event); + DeleteCriticalSection(&(self->lock)); + free(self); +} + +bool SenderQueue_send(struct SenderQueue *queue, int sock, const void *header_data, unsigned int header_size, const void *data, unsigned int data_size, const struct sockaddr *addr, int addrlen) +{ + assert(header_size <= SENDER_MAX_HEADER_SIZE); + assert(data_size <= SENDER_MAX_BUFFER_SIZE); + assert(addrlen <= SENDER_MAX_ADDR_SIZE); + + SenderBuffer *buffer = _SenderQueue_get_free_buffer(queue); + if(!buffer) + { + return false; + } + + if(buffer->data_buffer_size < data_size) + { + void *newbuf = realloc(buffer->data, data_size); + if(newbuf == NULL) + { + log_printf(LOG_ERROR, "Unable to allocate %u byte data buffer", data_size); + + _SenderQueue_put_free_buffer(queue, buffer); + return false; + } + + buffer->data = newbuf; + buffer->data_buffer_size = data_size; + } + + buffer->sock = sock; + + memcpy(buffer->header_data, header_data, header_size); + buffer->header_size = header_size; + + memcpy(buffer->data, data, data_size); + buffer->data_size = data_size; + + memcpy(buffer->addr, addr, addrlen); + buffer->addrlen = addrlen; + + buffer->num_sendbufs = 0; + + if(buffer->header_size > 0) + { + buffer->sendbufs[buffer->num_sendbufs].buf = (char*)(buffer->header_data); + buffer->sendbufs[buffer->num_sendbufs].len = buffer->header_size; + + ++(buffer->num_sendbufs); + } + + buffer->sendbufs[buffer->num_sendbufs].buf = (char*)(buffer->data); + buffer->sendbufs[buffer->num_sendbufs].len = buffer->data_size; + ++(buffer->num_sendbufs); + + _SenderQueue_put_queued_buffer(queue, buffer); + + return true; +} + +static SenderBuffer *_SenderQueue_get_free_buffer(SenderQueue *self) +{ + SenderBuffer *buffer = NULL; + + while(buffer == NULL) + { + EnterCriticalSection(&(self->lock)); + + if(self->free_buffers != NULL) + { + buffer = self->free_buffers; + DL_DELETE(self->free_buffers, buffer); + } + else if(self->total_buffers < SENDER_MAX_BUFFERS) + { + /* Allocate a new buffer. */ + + buffer = malloc(sizeof(SenderBuffer)); + if(buffer == NULL) + { + LeaveCriticalSection(&(self->lock)); + return NULL; + } + + buffer->data = NULL; + buffer->data_buffer_size = 0; + + buffer->queue = self; + + buffer->prev = NULL; + buffer->next = NULL; + + ++(self->total_buffers); + } + + LeaveCriticalSection(&(self->lock)); + + if(buffer == NULL) + { + /* wait for buffer to be available */ + WaitForSingleObject(self->free_buffer_event, INFINITE); + } + } + + return buffer; +} + +static void _SenderQueue_put_free_buffer(SenderQueue *queue, SenderBuffer *buffer) +{ + EnterCriticalSection(&(queue->lock)); + DL_APPEND(queue->free_buffers, buffer); + LeaveCriticalSection(&(queue->lock)); + + SetEvent(queue->free_buffer_event); +} + +static SenderBuffer *_SenderQueue_get_queued_buffer(SenderQueue *queue) +{ + EnterCriticalSection(&(queue->lock)); + + SenderBuffer *buffer = NULL; + + if(queue->queued_buffers != NULL) + { + buffer = queue->queued_buffers; + DL_DELETE(queue->queued_buffers, buffer); + } + + LeaveCriticalSection(&(queue->lock)); + + return buffer; +} + +static void _SenderQueue_put_queued_buffer(SenderQueue *queue, SenderBuffer *buffer) +{ + EnterCriticalSection(&(queue->lock)); + DL_APPEND(queue->queued_buffers, buffer); + LeaveCriticalSection(&(queue->lock)); + + SetEvent(queue->queued_buffer_event); +} + +SenderThread *SenderThread_new(SenderQueue *queue) +{ + SenderThread *self = malloc(sizeof(SenderThread)); + if(self == NULL) + { + log_printf(LOG_ERROR, "Unable to allocate SenderThread structure\n"); + return NULL; + } + + self->queue = queue; + + self->stop_thread_event = CreateEvent(NULL, TRUE, FALSE, NULL); + if(self->stop_thread_event == NULL) + { + log_printf(LOG_ERROR, + "Unable to create stop_thread_event event object: %s", + w32_error(GetLastError())); + + free(self); + return NULL; + } + + self->thread = CreateThread( + NULL, /* lpThreadAttributes */ + 0, /* dwStackSize */ + &_SenderThread_main, /* lpStartAddress */ + self, /* lpParameter */ + 0, /* dwCreationFlags */ + NULL); /* lpThreadId */ + + if(self->thread == NULL) + { + log_printf(LOG_ERROR, + "Unable to create SenderThread thread: %s", + w32_error(GetLastError())); + + CloseHandle(self->stop_thread_event); + free(self); + + return NULL; + } + + return self; +} + +void SenderThread_destroy(SenderThread *self) +{ + if(self == NULL) + { + return; + } + + SetEvent(self->stop_thread_event); + WaitForSingleObject(self->thread, INFINITE); + + CloseHandle(self->thread); + CloseHandle(self->stop_thread_event); + free(self); +} + +static DWORD WINAPI _SenderThread_main(LPVOID lpParameter) +{ + SenderThread *self = (SenderThread*)(lpParameter); + + while(true) + { + HANDLE handles[] = { + self->stop_thread_event, + self->queue->queued_buffer_event, + }; + + //DWORD result = WaitForMultipleObjects(2, handles, FALSE, INFINITE); + DWORD result = WaitForMultipleObjectsEx(2, handles, FALSE, INFINITE, TRUE); + + if(result == WAIT_OBJECT_0) + { + break; + } + else if(result == (WAIT_OBJECT_0 + 1)) + { + log_printf(LOG_DEBUG, "_SenderThread_main woke to send packet"); + + SenderBuffer *buffer; + + while((buffer = _SenderQueue_get_queued_buffer(self->queue)) != NULL) + { + log_printf(LOG_DEBUG, "_SenderThread_main sending a packet"); + + /* TODO: Investigate using overlapped I/O */ + + memset(&(buffer->overlapped), 0, sizeof(buffer->overlapped)); + buffer->overlapped.hEvent = buffer; + + DWORD bytes_sent; + int send_result = r_WSASendTo(buffer->sock, buffer->sendbufs, buffer->num_sendbufs, &bytes_sent, 0, (struct sockaddr*)(buffer->addr), buffer->addrlen, &(buffer->overlapped), &_SenderThread_send_completion); + + if(send_result != 0) + { + DWORD error = WSAGetLastError(); + + if(error == WSA_IO_PENDING) + { + /* Send operation is running asyncronously. */ + continue; + } + else{ + log_printf(LOG_ERROR, "WSASendTo failed (error code %u)", (unsigned)(WSAGetLastError())); + } + } + + _SenderQueue_put_free_buffer(self->queue, buffer); + } + } + else if(result == WAIT_IO_COMPLETION) + { + /* no-op */ + } + else{ + log_printf(LOG_ERROR, "_SenderThread_main result %u GetLastError %u", (unsigned)(result), (unsigned)(GetLastError())); + } + } + + return 0; +} + +static void WINAPI _SenderThread_send_completion(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped, DWORD dwFlags) +{ + SenderBuffer *buffer = (SenderBuffer*)(lpOverlapped->hEvent); + + if(dwErrorCode != 0) + { + log_printf(LOG_ERROR, "WSASendTo failed (error code %u)", (unsigned)(dwErrorCode)); + } + + _SenderQueue_put_free_buffer(buffer->queue, buffer); +} diff --git a/src/sender.h b/src/sender.h new file mode 100644 index 0000000..2b0c81f --- /dev/null +++ b/src/sender.h @@ -0,0 +1,24 @@ +#ifndef IPXWRAPPER_SENDER_H +#define IPXWRAPPER_SENDER_H + +#include +#include + +struct SenderQueue; +typedef struct SenderQueue SenderQueue; + +struct SenderThread; +typedef struct SenderThread SenderThread; + +SenderQueue *SenderQueue_new(void); +void SenderQueue_destroy(SenderQueue *self); + +bool SenderQueue_send(SenderQueue *queue, int sock, + const void *header_data, unsigned int header_size, + const void *data, unsigned int data_size, + const struct sockaddr *addr, int addrlen); + +SenderThread *SenderThread_new(SenderQueue *queue); +void SenderThread_destroy(SenderThread *self); + +#endif /* !IPXWRAPPER_SENDER_H */ diff --git a/src/winsock.c b/src/winsock.c index 3bb3fac..dd3c88b 100644 --- a/src/winsock.c +++ b/src/winsock.c @@ -1529,7 +1529,8 @@ static DWORD ipx_send_packet( DWORD error = ERROR_SUCCESS; - if(r_sendto(private_socket, (const void*)(packet), packet_size, 0, (struct sockaddr*)(&dosbox_server_addr), sizeof(dosbox_server_addr)) < 0) + // if(r_sendto(private_socket, (const void*)(packet), packet_size, 0, (struct sockaddr*)(&dosbox_server_addr), sizeof(dosbox_server_addr)) < 0) + if(!SenderQueue_send(send_queue, private_socket, NULL, 0, packet, packet_size, (struct sockaddr*)(&dosbox_server_addr), sizeof(dosbox_server_addr))) { error = WSAGetLastError(); log_printf(LOG_ERROR, "Error sending DOSBox IPX packet: %s", w32_error(error));