From c16b73bce43776ae974131ec127f2890126da1b7 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Sun, 26 Nov 2023 20:20:13 +0000 Subject: [PATCH] Initial packet coalescing prototype. This introduces "packet coalescing" for the DOSBox transport - when an application is sending an absurd number of tiny IPX packets in rapid succession we try to batch them up into a container packet to be sent through the DOSBox server. This will hopefully improve how certain games behave when using the DOSBox transport option, but will break compatibility with anything not specifically supporting this special message framing. --- Makefile | 2 +- src/coalesce.c | 259 +++++++++++++++++++++++++++++++++++++++++++++++ src/coalesce.h | 59 +++++++++++ src/ipxwrapper.c | 24 +++++ src/ipxwrapper.h | 2 + src/router.c | 99 ++++++++++++++---- src/winsock.c | 10 +- 7 files changed, 429 insertions(+), 26 deletions(-) create mode 100644 src/coalesce.c create mode 100644 src/coalesce.h diff --git a/Makefile b/Makefile index fbba7f3..7e0811d 100644 --- a/Makefile +++ b/Makefile @@ -86,7 +86,7 @@ dist: all IPXWRAPPER_OBJS := src/ipxwrapper.o src/winsock.o src/ipxwrapper_stubs.o src/log.o src/common.o \ src/interface.o src/interface2.o src/router.o src/ipxwrapper.def src/addrcache.o src/config.o src/addr.o \ - src/firewall.o src/ethernet.o src/funcprof.o + src/firewall.o src/ethernet.o src/funcprof.o src/coalesce.o ipxwrapper.dll: $(IPXWRAPPER_OBJS) echo 'const char *version_string = "$(VERSION)", *compile_time = "'`date`'";' | $(CC) -c -x c -o version.o - diff --git a/src/coalesce.c b/src/coalesce.c new file mode 100644 index 0000000..5ea0173 --- /dev/null +++ b/src/coalesce.c @@ -0,0 +1,259 @@ +/* IPXWrapper - Packet coalescing + * Copyright (C) 2023 Daniel Collins + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published by + * the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +*/ + +#define WINSOCK_API_LINKAGE + +#include +#include +#include +#include + +#include "coalesce.h" +#include "ethernet.h" +#include "interface.h" +#include "ipxwrapper.h" + +struct coalesce_table_key +{ + addr32_t netnum; + addr48_t nodenum; + uint16_t socket; +}; + +typedef struct coalesce_table_key coalesce_table_key; + +struct coalesce_dest +{ + UT_hash_handle hh; + + struct coalesce_dest *prev; + struct coalesce_dest *next; + + coalesce_table_key dest; + bool active; + + uint64_t send_timestamps[IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT]; + + uint64_t payload_timestamp; + unsigned char payload[IPXWRAPPER_COALESCE_PACKET_MAX_SIZE]; + int payload_used; +}; + +typedef struct coalesce_dest coalesce_dest; + +/* coalesce_table provides access to all coalesce_dest structures and should + * be iterated using the 'hh' member. +*/ +static coalesce_dest *coalesce_table = NULL; + +/* coalesce_pending provides access to all active coalesce_dest structures + * which have data waiting to be transmitted, ordered by insertion date from + * oldest to newest. +*/ +static coalesce_dest *coalesce_pending = NULL; + +coalesce_dest *get_coalesce_by_dest(addr32_t netnum, addr48_t nodenum, uint16_t socket) +{ + coalesce_table_key dest = { netnum, nodenum, socket }; + + coalesce_dest *node; + HASH_FIND(hh, coalesce_table, &dest, sizeof(dest), node); + + if(node == NULL) + { + /* TODO: Limit maximum number of nodes, recycle old ones. */ + + node = malloc(sizeof(coalesce_dest)); + if(node == NULL) + { + return NULL; + } + + memset(node, 0, sizeof(*node)); + node->dest = dest; + + HASH_ADD(hh, coalesce_table, dest, sizeof(node->dest), node); + } + + return node; +} + +bool coalesce_register_send(coalesce_dest *node, uint64_t timestamp) +{ + memmove(node->send_timestamps, node->send_timestamps + 1, sizeof(node->send_timestamps) - sizeof(*(node->send_timestamps))); + node->send_timestamps[IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT - 1] = timestamp; + + if(node->send_timestamps[0] < node->send_timestamps[IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT - 1] + && (node->send_timestamps[0] + IPXWRAPPER_COALESCE_PACKET_START_THRESH) >= node->send_timestamps[IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT - 1]) + { + return true; + } + else if(node->active && (node->send_timestamps[0] + IPXWRAPPER_COALESCE_PACKET_STOP_THRESH) < node->send_timestamps[IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT - 1]) + { + return false; + } + else{ + return node->active; + } +} + +bool coalesce_add_data(coalesce_dest *cd, const void *data, int size, uint64_t now) +{ + if(cd->payload_used == 0) + { + novell_ipx_packet header; + + header.checksum = 0xFFFF; + header.hops = 0; + header.type = IPX_MAGIC_COALESCED; + + addr32_out(header.dest_net, cd->dest.netnum); + addr48_out(header.dest_node, cd->dest.nodenum); + header.dest_socket = 0; + + addr32_out(header.src_net, dosbox_local_netnum); + addr48_out(header.src_node, dosbox_local_nodenum); + header.src_socket = 0; + + if((sizeof(header) + size) > IPXWRAPPER_COALESCE_PACKET_MAX_SIZE) + { + return false; + } + + cd->payload_timestamp = now; + DL_APPEND(coalesce_pending, cd); + + memcpy(cd->payload, &header, sizeof(header)); + cd->payload_used = sizeof(header); + } + + if((cd->payload_used + size) <= IPXWRAPPER_COALESCE_PACKET_MAX_SIZE) + { + memcpy((cd->payload + cd->payload_used), data, size); + cd->payload_used += size; + + return true; + } + else{ + return false; + } +} + +void coalesce_flush(coalesce_dest *cd) +{ + assert(cd->payload_used > 0); + + novell_ipx_packet *header = (novell_ipx_packet*)(cd->payload); + header->length = htons(cd->payload_used); + + log_printf(LOG_DEBUG, "Sending coalesced packet (%d bytes)", cd->payload_used); + + if(r_sendto(private_socket, (const void*)(cd->payload), cd->payload_used, 0, (struct sockaddr*)(&dosbox_server_addr), sizeof(dosbox_server_addr)) < 0) + { + log_printf(LOG_ERROR, "Error sending DOSBox IPX packet: %s", w32_error(WSAGetLastError())); + } + + cd->payload_used = 0; + DL_DELETE(coalesce_pending, cd); +} + +DWORD coalesce_send(const void *data, size_t data_size, addr32_t dest_net, addr48_t dest_node, uint16_t dest_socket) +{ + /* We should always be called with an IPX header, even if the + * application is sending zero-byte payloads. + */ + assert(data_size > 0); + + uint64_t now = get_uticks(); + bool queued = false; + + coalesce_dest *cd = get_coalesce_by_dest(dest_net, dest_node, dest_socket); + if(cd != NULL) + { + bool should_coalesce = coalesce_register_send(cd, now); + + if(should_coalesce && !cd->active) + { + IPX_STRING_ADDR(dest_addr, dest_net, dest_node, dest_socket); + log_printf(LOG_WARNING, "High send rate to %s detected, coalescing future packets\n", dest_addr); + + cd->active = true; + } + else if(!should_coalesce && cd->active) + { + IPX_STRING_ADDR(dest_addr, dest_net, dest_node, dest_socket); + log_printf(LOG_INFO, "Send rate to %s has dropped, no longer coalescing packets\n", dest_addr); + + cd->active = false; + } + + if( + should_coalesce + && (cd->payload_used + data_size) > IPXWRAPPER_COALESCE_PACKET_MAX_SIZE + && data_size < (IPXWRAPPER_COALESCE_PACKET_MAX_SIZE / 2)) + { + coalesce_flush(cd); + } + + if(should_coalesce && coalesce_add_data(cd, data, data_size, now)) + { + queued = true; + } + + if(cd->payload_used > 0 && (cd->payload_timestamp + IPXWRAPPER_COALESCE_PACKET_MAX_DELAY) <= now) + { + coalesce_flush(cd); + } + } + + if(!queued && r_sendto(private_socket, (const void*)(data), data_size, 0, (struct sockaddr*)(&dosbox_server_addr), sizeof(dosbox_server_addr)) < 0) + { + DWORD error = WSAGetLastError(); + log_printf(LOG_ERROR, "Error sending DOSBox IPX packet: %s", w32_error(error)); + + return error; + } + + return ERROR_SUCCESS; +} + +void coalesce_flush_waiting(void) +{ + uint64_t now = get_uticks(); + + while(coalesce_pending != NULL + && (coalesce_pending->payload_timestamp + IPXWRAPPER_COALESCE_PACKET_MAX_DELAY) <= now) + { + coalesce_flush(coalesce_pending); + } +} + +void coalesce_cleanup(void) +{ + while(coalesce_pending != NULL) + { + coalesce_flush(coalesce_pending); + } + + while(coalesce_table != NULL) + { + coalesce_dest *cd = coalesce_table; + + HASH_DEL(coalesce_table, cd); + free(cd); + } +} diff --git a/src/coalesce.h b/src/coalesce.h new file mode 100644 index 0000000..229f895 --- /dev/null +++ b/src/coalesce.h @@ -0,0 +1,59 @@ +/* IPXWrapper - Packet coalescing + * Copyright (C) 2023 Daniel Collins + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published by + * the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +*/ + +#ifndef IPXWRAPPER_COALESCE_H +#define IPXWRAPPER_COALESCE_H + +#include +#include +#include + +#include "addr.h" + +/* For each destination IPX address, track the timestamp of the past n send + * operations, we use this to determine how spammy the application is being + * with sendto() calls. +*/ +#define IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT 512 + +/* Start coalescing when the rate of send operations to a single IPX address + * reaches IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT packets over the past + * IPXWRAPPER_COALESCE_PACKET_START_THRESH microseconds. +*/ +#define IPXWRAPPER_COALESCE_PACKET_START_THRESH 2500000 /* 2.5s */ + +/* Stop coalescing when the rate of send operations to a single IPX address + * falls back under IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT over the past + * IPXWRAPPER_COALESCE_PACKET_STOP_THRESH microseconds. +*/ +#define IPXWRAPPER_COALESCE_PACKET_STOP_THRESH 10000000 /* 10s */ + +/* Delay the transmission of a packet for coalescing by no more than + * IPXWRAPPER_COALESCE_PACKET_MAX_DELAY microseconds. +*/ +#define IPXWRAPPER_COALESCE_PACKET_MAX_DELAY 20000 /* 20ms */ + +/* Combine outgoing IPX packets up to IPXWRAPPER_COALESCE_PACKET_MAX_SIZE + * bytes of data in the UDP payload. +*/ +#define IPXWRAPPER_COALESCE_PACKET_MAX_SIZE 1384 + +DWORD coalesce_send(const void *data, size_t data_size, addr32_t dest_net, addr48_t dest_node, uint16_t dest_socket); +void coalesce_flush_waiting(void); +void coalesce_cleanup(void); + +#endif /* !IPXWRAPPER_COALESCE_H */ diff --git a/src/ipxwrapper.c b/src/ipxwrapper.c index 11070c0..55745bf 100644 --- a/src/ipxwrapper.c +++ b/src/ipxwrapper.c @@ -57,6 +57,8 @@ struct FuncStats ipxwrapper_fstats[] = { #undef FPROF_DECL }; +static uint64_t perf_counter_freq = 0; + const unsigned int ipxwrapper_fstats_size = sizeof(ipxwrapper_fstats) / sizeof(*ipxwrapper_fstats); unsigned int send_packets = 0, send_bytes = 0; /* Sent from emulated socket */ @@ -104,6 +106,12 @@ BOOL WINAPI DllMain(HINSTANCE hinstDLL, DWORD fdwReason, LPVOID lpvReserved) { if(fdwReason == DLL_PROCESS_ATTACH) { + LARGE_INTEGER pc_freq; + if(QueryPerformanceFrequency(&pc_freq)) + { + perf_counter_freq = pc_freq.QuadPart; + } + fprof_init(stub_fstats, NUM_STUBS); fprof_init(ipxwrapper_fstats, ipxwrapper_fstats_size); @@ -308,3 +316,19 @@ uint64_t get_ticks(void) return GetTickCount(); } } + +uint64_t get_uticks(void) +{ + LARGE_INTEGER pc_tick; + + if(perf_counter_freq == 0 || !QueryPerformanceCounter(&pc_tick)) + { + /* Fall back to GetTickCount() if there is no high-resolution + * performance counter available. + */ + return get_ticks() * 1000; + } + else{ + return pc_tick.QuadPart / (perf_counter_freq / 1000000); + } +} diff --git a/src/ipxwrapper.h b/src/ipxwrapper.h index 1b7e17c..8c064c3 100644 --- a/src/ipxwrapper.h +++ b/src/ipxwrapper.h @@ -150,6 +150,7 @@ struct ipx_packet { } __attribute__((__packed__)); #define IPX_MAGIC_SPXLOOKUP 1 +#define IPX_MAGIC_COALESCED 2 typedef struct spxlookup_req spxlookup_req_t; @@ -205,6 +206,7 @@ ipx_socket *get_socket_wait_for_ready(SOCKET sockfd, int timeout_ms); void lock_sockets(void); void unlock_sockets(void); uint64_t get_ticks(void); +uint64_t get_uticks(void); void add_self_to_firewall(void); diff --git a/src/router.c b/src/router.c index 53c660b..696f1f9 100644 --- a/src/router.c +++ b/src/router.c @@ -25,6 +25,7 @@ #include #include "router.h" +#include "coalesce.h" #include "common.h" #include "funcprof.h" #include "ipxwrapper.h" @@ -197,6 +198,8 @@ void router_cleanup(void) /* Release resources. */ + coalesce_cleanup(); + if(private_socket != -1) { closesocket(private_socket); @@ -554,29 +557,85 @@ static void _handle_dosbox_recv(novell_ipx_packet *packet, size_t packet_size) return; } - if(min_log_level <= LOG_DEBUG) + if(packet->src_socket == 0 && packet->type == IPX_MAGIC_COALESCED) { - IPX_STRING_ADDR(src_addr, addr32_in(packet->src_net), addr48_in(packet->src_node), packet->src_socket); - IPX_STRING_ADDR(dest_addr, addr32_in(packet->dest_net), addr48_in(packet->dest_node), packet->dest_socket); + /* Sanity check the lengths of each inner packet. */ - log_printf(LOG_DEBUG, "Recieved packet from %s for %s", src_addr, dest_addr); + log_printf(LOG_DEBUG, "Recieved coalesced packet (%zu bytes)", packet_size); + + novell_ipx_packet *inner_packets = (novell_ipx_packet*)(packet->data); + size_t remaining_data = packet_size - sizeof(novell_ipx_packet); + + novell_ipx_packet *end = (novell_ipx_packet*)(packet->data + remaining_data); + + for(novell_ipx_packet *p = inner_packets; p < end;) + { + if(remaining_data < sizeof(novell_ipx_packet) || ntohs(p->length) > remaining_data) + { + /* Doesn't look valid. */ + log_printf(LOG_ERROR, "Recieved invalid IPX packet from DOSBox server, ignoring"); + return; + } + + p = (novell_ipx_packet*)((char*)(p) + ntohs(p->length)); + } + + /* Deliver the inner packets. */ + + for(novell_ipx_packet *p = inner_packets; p < end;) + { + if(min_log_level <= LOG_DEBUG) + { + IPX_STRING_ADDR(src_addr, addr32_in(p->src_net), addr48_in(p->src_node), p->src_socket); + IPX_STRING_ADDR(dest_addr, addr32_in(p->dest_net), addr48_in(p->dest_node), p->dest_socket); + + log_printf(LOG_DEBUG, "Recieved packet from %s for %s", src_addr, dest_addr); + } + + size_t data_size = ntohs(p->length) - sizeof(novell_ipx_packet); + + _deliver_packet( + p->type, + + addr32_in(p->src_net), + addr48_in(p->src_node), + p->src_socket, + + addr32_in(p->dest_net), + addr48_in(p->dest_node), + p->dest_socket, + + p->data, + data_size); + + p = (novell_ipx_packet*)((char*)(p) + ntohs(p->length)); + } } - - size_t data_size = ntohs(packet->length) - sizeof(novell_ipx_packet); - - _deliver_packet( - packet->type, + else{ + if(min_log_level <= LOG_DEBUG) + { + IPX_STRING_ADDR(src_addr, addr32_in(packet->src_net), addr48_in(packet->src_node), packet->src_socket); + IPX_STRING_ADDR(dest_addr, addr32_in(packet->dest_net), addr48_in(packet->dest_node), packet->dest_socket); + + log_printf(LOG_DEBUG, "Recieved packet from %s for %s", src_addr, dest_addr); + } - addr32_in(packet->src_net), - addr48_in(packet->src_node), - packet->src_socket, + size_t data_size = ntohs(packet->length) - sizeof(novell_ipx_packet); - addr32_in(packet->dest_net), - addr48_in(packet->dest_node), - packet->dest_socket, - - packet->data, - data_size); + _deliver_packet( + packet->type, + + addr32_in(packet->src_net), + addr48_in(packet->src_node), + packet->src_socket, + + addr32_in(packet->dest_net), + addr48_in(packet->dest_node), + packet->dest_socket, + + packet->data, + data_size); + } } static bool _do_udp_recv(int fd) @@ -766,6 +825,10 @@ static DWORD router_main(void *arg) if(ipx_encap_type == ENCAP_TYPE_DOSBOX) { + lock_sockets(); + coalesce_flush_waiting(); + unlock_sockets(); + if(dosbox_state == DOSBOX_DISCONNECTED) { uint64_t now = get_ticks(); diff --git a/src/winsock.c b/src/winsock.c index 3bb3fac..6f22f65 100644 --- a/src/winsock.c +++ b/src/winsock.c @@ -25,6 +25,7 @@ #include #include "ipxwrapper.h" +#include "coalesce.h" #include "common.h" #include "interface.h" #include "router.h" @@ -1527,14 +1528,9 @@ static DWORD ipx_send_packet( memcpy(packet->data, data, data_size); - DWORD error = ERROR_SUCCESS; - - if(r_sendto(private_socket, (const void*)(packet), packet_size, 0, (struct sockaddr*)(&dosbox_server_addr), sizeof(dosbox_server_addr)) < 0) + DWORD error = coalesce_send(packet, packet_size, dest_net, dest_node, dest_socket); + if(error == ERROR_SUCCESS) { - error = WSAGetLastError(); - log_printf(LOG_ERROR, "Error sending DOSBox IPX packet: %s", w32_error(error)); - } - else{ __atomic_add_fetch(&send_packets, 1, __ATOMIC_RELAXED); __atomic_add_fetch(&send_bytes, data_size, __ATOMIC_RELAXED); }