diff --git a/Makefile b/Makefile index fbba7f3..7e0811d 100644 --- a/Makefile +++ b/Makefile @@ -86,7 +86,7 @@ dist: all IPXWRAPPER_OBJS := src/ipxwrapper.o src/winsock.o src/ipxwrapper_stubs.o src/log.o src/common.o \ src/interface.o src/interface2.o src/router.o src/ipxwrapper.def src/addrcache.o src/config.o src/addr.o \ - src/firewall.o src/ethernet.o src/funcprof.o + src/firewall.o src/ethernet.o src/funcprof.o src/coalesce.o ipxwrapper.dll: $(IPXWRAPPER_OBJS) echo 'const char *version_string = "$(VERSION)", *compile_time = "'`date`'";' | $(CC) -c -x c -o version.o - diff --git a/src/coalesce.c b/src/coalesce.c new file mode 100644 index 0000000..5ea0173 --- /dev/null +++ b/src/coalesce.c @@ -0,0 +1,259 @@ +/* IPXWrapper - Packet coalescing + * Copyright (C) 2023 Daniel Collins + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published by + * the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +*/ + +#define WINSOCK_API_LINKAGE + +#include +#include +#include +#include + +#include "coalesce.h" +#include "ethernet.h" +#include "interface.h" +#include "ipxwrapper.h" + +struct coalesce_table_key +{ + addr32_t netnum; + addr48_t nodenum; + uint16_t socket; +}; + +typedef struct coalesce_table_key coalesce_table_key; + +struct coalesce_dest +{ + UT_hash_handle hh; + + struct coalesce_dest *prev; + struct coalesce_dest *next; + + coalesce_table_key dest; + bool active; + + uint64_t send_timestamps[IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT]; + + uint64_t payload_timestamp; + unsigned char payload[IPXWRAPPER_COALESCE_PACKET_MAX_SIZE]; + int payload_used; +}; + +typedef struct coalesce_dest coalesce_dest; + +/* coalesce_table provides access to all coalesce_dest structures and should + * be iterated using the 'hh' member. +*/ +static coalesce_dest *coalesce_table = NULL; + +/* coalesce_pending provides access to all active coalesce_dest structures + * which have data waiting to be transmitted, ordered by insertion date from + * oldest to newest. +*/ +static coalesce_dest *coalesce_pending = NULL; + +coalesce_dest *get_coalesce_by_dest(addr32_t netnum, addr48_t nodenum, uint16_t socket) +{ + coalesce_table_key dest = { netnum, nodenum, socket }; + + coalesce_dest *node; + HASH_FIND(hh, coalesce_table, &dest, sizeof(dest), node); + + if(node == NULL) + { + /* TODO: Limit maximum number of nodes, recycle old ones. */ + + node = malloc(sizeof(coalesce_dest)); + if(node == NULL) + { + return NULL; + } + + memset(node, 0, sizeof(*node)); + node->dest = dest; + + HASH_ADD(hh, coalesce_table, dest, sizeof(node->dest), node); + } + + return node; +} + +bool coalesce_register_send(coalesce_dest *node, uint64_t timestamp) +{ + memmove(node->send_timestamps, node->send_timestamps + 1, sizeof(node->send_timestamps) - sizeof(*(node->send_timestamps))); + node->send_timestamps[IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT - 1] = timestamp; + + if(node->send_timestamps[0] < node->send_timestamps[IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT - 1] + && (node->send_timestamps[0] + IPXWRAPPER_COALESCE_PACKET_START_THRESH) >= node->send_timestamps[IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT - 1]) + { + return true; + } + else if(node->active && (node->send_timestamps[0] + IPXWRAPPER_COALESCE_PACKET_STOP_THRESH) < node->send_timestamps[IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT - 1]) + { + return false; + } + else{ + return node->active; + } +} + +bool coalesce_add_data(coalesce_dest *cd, const void *data, int size, uint64_t now) +{ + if(cd->payload_used == 0) + { + novell_ipx_packet header; + + header.checksum = 0xFFFF; + header.hops = 0; + header.type = IPX_MAGIC_COALESCED; + + addr32_out(header.dest_net, cd->dest.netnum); + addr48_out(header.dest_node, cd->dest.nodenum); + header.dest_socket = 0; + + addr32_out(header.src_net, dosbox_local_netnum); + addr48_out(header.src_node, dosbox_local_nodenum); + header.src_socket = 0; + + if((sizeof(header) + size) > IPXWRAPPER_COALESCE_PACKET_MAX_SIZE) + { + return false; + } + + cd->payload_timestamp = now; + DL_APPEND(coalesce_pending, cd); + + memcpy(cd->payload, &header, sizeof(header)); + cd->payload_used = sizeof(header); + } + + if((cd->payload_used + size) <= IPXWRAPPER_COALESCE_PACKET_MAX_SIZE) + { + memcpy((cd->payload + cd->payload_used), data, size); + cd->payload_used += size; + + return true; + } + else{ + return false; + } +} + +void coalesce_flush(coalesce_dest *cd) +{ + assert(cd->payload_used > 0); + + novell_ipx_packet *header = (novell_ipx_packet*)(cd->payload); + header->length = htons(cd->payload_used); + + log_printf(LOG_DEBUG, "Sending coalesced packet (%d bytes)", cd->payload_used); + + if(r_sendto(private_socket, (const void*)(cd->payload), cd->payload_used, 0, (struct sockaddr*)(&dosbox_server_addr), sizeof(dosbox_server_addr)) < 0) + { + log_printf(LOG_ERROR, "Error sending DOSBox IPX packet: %s", w32_error(WSAGetLastError())); + } + + cd->payload_used = 0; + DL_DELETE(coalesce_pending, cd); +} + +DWORD coalesce_send(const void *data, size_t data_size, addr32_t dest_net, addr48_t dest_node, uint16_t dest_socket) +{ + /* We should always be called with an IPX header, even if the + * application is sending zero-byte payloads. + */ + assert(data_size > 0); + + uint64_t now = get_uticks(); + bool queued = false; + + coalesce_dest *cd = get_coalesce_by_dest(dest_net, dest_node, dest_socket); + if(cd != NULL) + { + bool should_coalesce = coalesce_register_send(cd, now); + + if(should_coalesce && !cd->active) + { + IPX_STRING_ADDR(dest_addr, dest_net, dest_node, dest_socket); + log_printf(LOG_WARNING, "High send rate to %s detected, coalescing future packets\n", dest_addr); + + cd->active = true; + } + else if(!should_coalesce && cd->active) + { + IPX_STRING_ADDR(dest_addr, dest_net, dest_node, dest_socket); + log_printf(LOG_INFO, "Send rate to %s has dropped, no longer coalescing packets\n", dest_addr); + + cd->active = false; + } + + if( + should_coalesce + && (cd->payload_used + data_size) > IPXWRAPPER_COALESCE_PACKET_MAX_SIZE + && data_size < (IPXWRAPPER_COALESCE_PACKET_MAX_SIZE / 2)) + { + coalesce_flush(cd); + } + + if(should_coalesce && coalesce_add_data(cd, data, data_size, now)) + { + queued = true; + } + + if(cd->payload_used > 0 && (cd->payload_timestamp + IPXWRAPPER_COALESCE_PACKET_MAX_DELAY) <= now) + { + coalesce_flush(cd); + } + } + + if(!queued && r_sendto(private_socket, (const void*)(data), data_size, 0, (struct sockaddr*)(&dosbox_server_addr), sizeof(dosbox_server_addr)) < 0) + { + DWORD error = WSAGetLastError(); + log_printf(LOG_ERROR, "Error sending DOSBox IPX packet: %s", w32_error(error)); + + return error; + } + + return ERROR_SUCCESS; +} + +void coalesce_flush_waiting(void) +{ + uint64_t now = get_uticks(); + + while(coalesce_pending != NULL + && (coalesce_pending->payload_timestamp + IPXWRAPPER_COALESCE_PACKET_MAX_DELAY) <= now) + { + coalesce_flush(coalesce_pending); + } +} + +void coalesce_cleanup(void) +{ + while(coalesce_pending != NULL) + { + coalesce_flush(coalesce_pending); + } + + while(coalesce_table != NULL) + { + coalesce_dest *cd = coalesce_table; + + HASH_DEL(coalesce_table, cd); + free(cd); + } +} diff --git a/src/coalesce.h b/src/coalesce.h new file mode 100644 index 0000000..229f895 --- /dev/null +++ b/src/coalesce.h @@ -0,0 +1,59 @@ +/* IPXWrapper - Packet coalescing + * Copyright (C) 2023 Daniel Collins + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published by + * the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +*/ + +#ifndef IPXWRAPPER_COALESCE_H +#define IPXWRAPPER_COALESCE_H + +#include +#include +#include + +#include "addr.h" + +/* For each destination IPX address, track the timestamp of the past n send + * operations, we use this to determine how spammy the application is being + * with sendto() calls. +*/ +#define IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT 512 + +/* Start coalescing when the rate of send operations to a single IPX address + * reaches IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT packets over the past + * IPXWRAPPER_COALESCE_PACKET_START_THRESH microseconds. +*/ +#define IPXWRAPPER_COALESCE_PACKET_START_THRESH 2500000 /* 2.5s */ + +/* Stop coalescing when the rate of send operations to a single IPX address + * falls back under IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT over the past + * IPXWRAPPER_COALESCE_PACKET_STOP_THRESH microseconds. +*/ +#define IPXWRAPPER_COALESCE_PACKET_STOP_THRESH 10000000 /* 10s */ + +/* Delay the transmission of a packet for coalescing by no more than + * IPXWRAPPER_COALESCE_PACKET_MAX_DELAY microseconds. +*/ +#define IPXWRAPPER_COALESCE_PACKET_MAX_DELAY 20000 /* 20ms */ + +/* Combine outgoing IPX packets up to IPXWRAPPER_COALESCE_PACKET_MAX_SIZE + * bytes of data in the UDP payload. +*/ +#define IPXWRAPPER_COALESCE_PACKET_MAX_SIZE 1384 + +DWORD coalesce_send(const void *data, size_t data_size, addr32_t dest_net, addr48_t dest_node, uint16_t dest_socket); +void coalesce_flush_waiting(void); +void coalesce_cleanup(void); + +#endif /* !IPXWRAPPER_COALESCE_H */ diff --git a/src/ipxwrapper.c b/src/ipxwrapper.c index 11070c0..55745bf 100644 --- a/src/ipxwrapper.c +++ b/src/ipxwrapper.c @@ -57,6 +57,8 @@ struct FuncStats ipxwrapper_fstats[] = { #undef FPROF_DECL }; +static uint64_t perf_counter_freq = 0; + const unsigned int ipxwrapper_fstats_size = sizeof(ipxwrapper_fstats) / sizeof(*ipxwrapper_fstats); unsigned int send_packets = 0, send_bytes = 0; /* Sent from emulated socket */ @@ -104,6 +106,12 @@ BOOL WINAPI DllMain(HINSTANCE hinstDLL, DWORD fdwReason, LPVOID lpvReserved) { if(fdwReason == DLL_PROCESS_ATTACH) { + LARGE_INTEGER pc_freq; + if(QueryPerformanceFrequency(&pc_freq)) + { + perf_counter_freq = pc_freq.QuadPart; + } + fprof_init(stub_fstats, NUM_STUBS); fprof_init(ipxwrapper_fstats, ipxwrapper_fstats_size); @@ -308,3 +316,19 @@ uint64_t get_ticks(void) return GetTickCount(); } } + +uint64_t get_uticks(void) +{ + LARGE_INTEGER pc_tick; + + if(perf_counter_freq == 0 || !QueryPerformanceCounter(&pc_tick)) + { + /* Fall back to GetTickCount() if there is no high-resolution + * performance counter available. + */ + return get_ticks() * 1000; + } + else{ + return pc_tick.QuadPart / (perf_counter_freq / 1000000); + } +} diff --git a/src/ipxwrapper.h b/src/ipxwrapper.h index 1b7e17c..8c064c3 100644 --- a/src/ipxwrapper.h +++ b/src/ipxwrapper.h @@ -150,6 +150,7 @@ struct ipx_packet { } __attribute__((__packed__)); #define IPX_MAGIC_SPXLOOKUP 1 +#define IPX_MAGIC_COALESCED 2 typedef struct spxlookup_req spxlookup_req_t; @@ -205,6 +206,7 @@ ipx_socket *get_socket_wait_for_ready(SOCKET sockfd, int timeout_ms); void lock_sockets(void); void unlock_sockets(void); uint64_t get_ticks(void); +uint64_t get_uticks(void); void add_self_to_firewall(void); diff --git a/src/router.c b/src/router.c index 53c660b..696f1f9 100644 --- a/src/router.c +++ b/src/router.c @@ -25,6 +25,7 @@ #include #include "router.h" +#include "coalesce.h" #include "common.h" #include "funcprof.h" #include "ipxwrapper.h" @@ -197,6 +198,8 @@ void router_cleanup(void) /* Release resources. */ + coalesce_cleanup(); + if(private_socket != -1) { closesocket(private_socket); @@ -554,29 +557,85 @@ static void _handle_dosbox_recv(novell_ipx_packet *packet, size_t packet_size) return; } - if(min_log_level <= LOG_DEBUG) + if(packet->src_socket == 0 && packet->type == IPX_MAGIC_COALESCED) { - IPX_STRING_ADDR(src_addr, addr32_in(packet->src_net), addr48_in(packet->src_node), packet->src_socket); - IPX_STRING_ADDR(dest_addr, addr32_in(packet->dest_net), addr48_in(packet->dest_node), packet->dest_socket); + /* Sanity check the lengths of each inner packet. */ - log_printf(LOG_DEBUG, "Recieved packet from %s for %s", src_addr, dest_addr); + log_printf(LOG_DEBUG, "Recieved coalesced packet (%zu bytes)", packet_size); + + novell_ipx_packet *inner_packets = (novell_ipx_packet*)(packet->data); + size_t remaining_data = packet_size - sizeof(novell_ipx_packet); + + novell_ipx_packet *end = (novell_ipx_packet*)(packet->data + remaining_data); + + for(novell_ipx_packet *p = inner_packets; p < end;) + { + if(remaining_data < sizeof(novell_ipx_packet) || ntohs(p->length) > remaining_data) + { + /* Doesn't look valid. */ + log_printf(LOG_ERROR, "Recieved invalid IPX packet from DOSBox server, ignoring"); + return; + } + + p = (novell_ipx_packet*)((char*)(p) + ntohs(p->length)); + } + + /* Deliver the inner packets. */ + + for(novell_ipx_packet *p = inner_packets; p < end;) + { + if(min_log_level <= LOG_DEBUG) + { + IPX_STRING_ADDR(src_addr, addr32_in(p->src_net), addr48_in(p->src_node), p->src_socket); + IPX_STRING_ADDR(dest_addr, addr32_in(p->dest_net), addr48_in(p->dest_node), p->dest_socket); + + log_printf(LOG_DEBUG, "Recieved packet from %s for %s", src_addr, dest_addr); + } + + size_t data_size = ntohs(p->length) - sizeof(novell_ipx_packet); + + _deliver_packet( + p->type, + + addr32_in(p->src_net), + addr48_in(p->src_node), + p->src_socket, + + addr32_in(p->dest_net), + addr48_in(p->dest_node), + p->dest_socket, + + p->data, + data_size); + + p = (novell_ipx_packet*)((char*)(p) + ntohs(p->length)); + } } - - size_t data_size = ntohs(packet->length) - sizeof(novell_ipx_packet); - - _deliver_packet( - packet->type, + else{ + if(min_log_level <= LOG_DEBUG) + { + IPX_STRING_ADDR(src_addr, addr32_in(packet->src_net), addr48_in(packet->src_node), packet->src_socket); + IPX_STRING_ADDR(dest_addr, addr32_in(packet->dest_net), addr48_in(packet->dest_node), packet->dest_socket); + + log_printf(LOG_DEBUG, "Recieved packet from %s for %s", src_addr, dest_addr); + } - addr32_in(packet->src_net), - addr48_in(packet->src_node), - packet->src_socket, + size_t data_size = ntohs(packet->length) - sizeof(novell_ipx_packet); - addr32_in(packet->dest_net), - addr48_in(packet->dest_node), - packet->dest_socket, - - packet->data, - data_size); + _deliver_packet( + packet->type, + + addr32_in(packet->src_net), + addr48_in(packet->src_node), + packet->src_socket, + + addr32_in(packet->dest_net), + addr48_in(packet->dest_node), + packet->dest_socket, + + packet->data, + data_size); + } } static bool _do_udp_recv(int fd) @@ -766,6 +825,10 @@ static DWORD router_main(void *arg) if(ipx_encap_type == ENCAP_TYPE_DOSBOX) { + lock_sockets(); + coalesce_flush_waiting(); + unlock_sockets(); + if(dosbox_state == DOSBOX_DISCONNECTED) { uint64_t now = get_ticks(); diff --git a/src/winsock.c b/src/winsock.c index 3bb3fac..6f22f65 100644 --- a/src/winsock.c +++ b/src/winsock.c @@ -25,6 +25,7 @@ #include #include "ipxwrapper.h" +#include "coalesce.h" #include "common.h" #include "interface.h" #include "router.h" @@ -1527,14 +1528,9 @@ static DWORD ipx_send_packet( memcpy(packet->data, data, data_size); - DWORD error = ERROR_SUCCESS; - - if(r_sendto(private_socket, (const void*)(packet), packet_size, 0, (struct sockaddr*)(&dosbox_server_addr), sizeof(dosbox_server_addr)) < 0) + DWORD error = coalesce_send(packet, packet_size, dest_net, dest_node, dest_socket); + if(error == ERROR_SUCCESS) { - error = WSAGetLastError(); - log_printf(LOG_ERROR, "Error sending DOSBox IPX packet: %s", w32_error(error)); - } - else{ __atomic_add_fetch(&send_packets, 1, __ATOMIC_RELAXED); __atomic_add_fetch(&send_bytes, data_size, __ATOMIC_RELAXED); }