diff --git a/Makefile b/Makefile index fbba7f3..7e0811d 100644 --- a/Makefile +++ b/Makefile @@ -86,7 +86,7 @@ dist: all IPXWRAPPER_OBJS := src/ipxwrapper.o src/winsock.o src/ipxwrapper_stubs.o src/log.o src/common.o \ src/interface.o src/interface2.o src/router.o src/ipxwrapper.def src/addrcache.o src/config.o src/addr.o \ - src/firewall.o src/ethernet.o src/funcprof.o + src/firewall.o src/ethernet.o src/funcprof.o src/coalesce.o ipxwrapper.dll: $(IPXWRAPPER_OBJS) echo 'const char *version_string = "$(VERSION)", *compile_time = "'`date`'";' | $(CC) -c -x c -o version.o - diff --git a/src/coalesce.c b/src/coalesce.c new file mode 100644 index 0000000..16feba6 --- /dev/null +++ b/src/coalesce.c @@ -0,0 +1,293 @@ +/* IPXWrapper - Packet coalescing + * Copyright (C) 2023 Daniel Collins + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published by + * the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +*/ + +#define WINSOCK_API_LINKAGE + +#include +#include +#include +#include + +#include "coalesce.h" +#include "ethernet.h" +#include "interface.h" +#include "ipxwrapper.h" + +struct coalesce_table_key +{ + addr32_t netnum; + addr48_t nodenum; + uint16_t socket; +}; + +typedef struct coalesce_table_key coalesce_table_key; + +struct coalesce_dest +{ + UT_hash_handle hh; + + struct coalesce_dest *prev; + struct coalesce_dest *next; + + coalesce_table_key dest; + bool active; + + uint64_t send_timestamps[IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT]; + + uint64_t payload_timestamp; + unsigned char payload[IPXWRAPPER_COALESCE_PACKET_MAX_SIZE]; + int payload_used; +}; + +typedef struct coalesce_dest coalesce_dest; + +/* coalesce_table provides access to all coalesce_dest structures and should + * be iterated using the 'hh' member. +*/ +static coalesce_dest *coalesce_table = NULL; + +/* coalesce_pending provides access to all active coalesce_dest structures + * which have data waiting to be transmitted, ordered by insertion date from + * oldest to newest. +*/ +static coalesce_dest *coalesce_pending = NULL; + +coalesce_dest *get_coalesce_by_dest(addr32_t netnum, addr48_t nodenum, uint16_t socket) +{ + FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_get_coalesce_by_dest])); + + if(!main_config.dosbox_coalesce) + { + /* Skip coalescing if disabled. */ + return NULL; + } + + coalesce_table_key dest = { netnum, nodenum, socket }; + + coalesce_dest *node; + HASH_FIND(hh, coalesce_table, &dest, sizeof(dest), node); + + if(node == NULL) + { + FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_get_coalesce_by_dest_new])); + + /* TODO: Limit maximum number of nodes, recycle old ones. */ + + node = malloc(sizeof(coalesce_dest)); + if(node == NULL) + { + return NULL; + } + + memset(node, 0, sizeof(*node)); + node->dest = dest; + + HASH_ADD(hh, coalesce_table, dest, sizeof(node->dest), node); + } + + return node; +} + +bool coalesce_register_send(coalesce_dest *node, uint64_t timestamp) +{ + FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_coalesce_register_send])); + + memmove(node->send_timestamps, node->send_timestamps + 1, sizeof(node->send_timestamps) - sizeof(*(node->send_timestamps))); + node->send_timestamps[IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT - 1] = timestamp; + + if((node->send_timestamps[0] + IPXWRAPPER_COALESCE_PACKET_START_THRESH) >= timestamp) + { + return true; + } + else if(node->active && (node->send_timestamps[0] + IPXWRAPPER_COALESCE_PACKET_STOP_THRESH) < timestamp) + { + return false; + } + else{ + return node->active; + } +} + +bool coalesce_add_data(coalesce_dest *cd, const void *data, int size, uint64_t now) +{ + FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_coalesce_add_data])); + + if(cd->payload_used == 0) + { + FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_coalesce_add_data_init])); + + novell_ipx_packet header; + + header.checksum = 0xFFFF; + header.hops = 0; + header.type = IPX_MAGIC_COALESCED; + + addr32_out(header.dest_net, cd->dest.netnum); + addr48_out(header.dest_node, cd->dest.nodenum); + header.dest_socket = 0; + + addr32_out(header.src_net, dosbox_local_netnum); + addr48_out(header.src_node, dosbox_local_nodenum); + header.src_socket = 0; + + if((sizeof(header) + size) > IPXWRAPPER_COALESCE_PACKET_MAX_SIZE) + { + return false; + } + + cd->payload_timestamp = now; + DL_APPEND(coalesce_pending, cd); + + memcpy(cd->payload, &header, sizeof(header)); + cd->payload_used = sizeof(header); + } + + if((cd->payload_used + size) <= IPXWRAPPER_COALESCE_PACKET_MAX_SIZE) + { + memcpy((cd->payload + cd->payload_used), data, size); + cd->payload_used += size; + + return true; + } + else{ + return false; + } +} + +void coalesce_flush(coalesce_dest *cd) +{ + FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_coalesce_flush])); + + assert(cd->payload_used > 0); + + novell_ipx_packet *header = (novell_ipx_packet*)(cd->payload); + header->length = htons(cd->payload_used); + + log_printf(LOG_DEBUG, "Sending coalesced packet (%d bytes)", cd->payload_used); + + if(r_sendto(private_socket, (const void*)(cd->payload), cd->payload_used, 0, (struct sockaddr*)(&dosbox_server_addr), sizeof(dosbox_server_addr)) < 0) + { + log_printf(LOG_ERROR, "Error sending DOSBox IPX packet: %s", w32_error(WSAGetLastError())); + } + else{ + __atomic_add_fetch(&send_packets_udp, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&send_bytes_udp, cd->payload_used, __ATOMIC_RELAXED); + } + + cd->payload_used = 0; + DL_DELETE(coalesce_pending, cd); +} + +DWORD coalesce_send(const void *data, size_t data_size, addr32_t dest_net, addr48_t dest_node, uint16_t dest_socket) +{ + FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_coalesce_send])); + + /* We should always be called with an IPX header, even if the + * application is sending zero-byte payloads. + */ + assert(data_size > 0); + + uint64_t now = get_uticks(); + bool queued = false; + + coalesce_dest *cd = get_coalesce_by_dest(dest_net, dest_node, dest_socket); + if(cd != NULL) + { + FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_coalesce_send_cd])); + + bool should_coalesce = coalesce_register_send(cd, now); + + if(should_coalesce && !cd->active) + { + IPX_STRING_ADDR(dest_addr, dest_net, dest_node, dest_socket); + log_printf(LOG_WARNING, "High send rate to %s detected, coalescing future packets\n", dest_addr); + + cd->active = true; + } + else if(!should_coalesce && cd->active) + { + IPX_STRING_ADDR(dest_addr, dest_net, dest_node, dest_socket); + log_printf(LOG_INFO, "Send rate to %s has dropped, no longer coalescing packets\n", dest_addr); + + cd->active = false; + } + + if( + should_coalesce + && (cd->payload_used + data_size) > IPXWRAPPER_COALESCE_PACKET_MAX_SIZE + && data_size < (IPXWRAPPER_COALESCE_PACKET_MAX_SIZE / 2)) + { + coalesce_flush(cd); + } + + if(should_coalesce && coalesce_add_data(cd, data, data_size, now)) + { + queued = true; + } + + if(cd->payload_used > 0 && (cd->payload_timestamp + IPXWRAPPER_COALESCE_PACKET_MAX_DELAY) <= now) + { + coalesce_flush(cd); + } + } + + if(!queued) + { + FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_coalesce_send_immediate])); + + if(r_sendto(private_socket, (const void*)(data), data_size, 0, (struct sockaddr*)(&dosbox_server_addr), sizeof(dosbox_server_addr)) < 0) + { + DWORD error = WSAGetLastError(); + log_printf(LOG_ERROR, "Error sending DOSBox IPX packet: %s", w32_error(error)); + + return error; + } + else{ + __atomic_add_fetch(&send_packets_udp, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&send_bytes_udp, data_size, __ATOMIC_RELAXED); + } + } + + return ERROR_SUCCESS; +} + +void coalesce_flush_waiting(void) +{ + uint64_t now = get_uticks(); + + while(coalesce_pending != NULL + && (coalesce_pending->payload_timestamp + IPXWRAPPER_COALESCE_PACKET_MAX_DELAY) <= now) + { + coalesce_flush(coalesce_pending); + } +} + +void coalesce_cleanup(void) +{ + while(coalesce_pending != NULL) + { + coalesce_flush(coalesce_pending); + } + + while(coalesce_table != NULL) + { + coalesce_dest *cd = coalesce_table; + + HASH_DEL(coalesce_table, cd); + free(cd); + } +} diff --git a/src/coalesce.h b/src/coalesce.h new file mode 100644 index 0000000..229f895 --- /dev/null +++ b/src/coalesce.h @@ -0,0 +1,59 @@ +/* IPXWrapper - Packet coalescing + * Copyright (C) 2023 Daniel Collins + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published by + * the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +*/ + +#ifndef IPXWRAPPER_COALESCE_H +#define IPXWRAPPER_COALESCE_H + +#include +#include +#include + +#include "addr.h" + +/* For each destination IPX address, track the timestamp of the past n send + * operations, we use this to determine how spammy the application is being + * with sendto() calls. +*/ +#define IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT 512 + +/* Start coalescing when the rate of send operations to a single IPX address + * reaches IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT packets over the past + * IPXWRAPPER_COALESCE_PACKET_START_THRESH microseconds. +*/ +#define IPXWRAPPER_COALESCE_PACKET_START_THRESH 2500000 /* 2.5s */ + +/* Stop coalescing when the rate of send operations to a single IPX address + * falls back under IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT over the past + * IPXWRAPPER_COALESCE_PACKET_STOP_THRESH microseconds. +*/ +#define IPXWRAPPER_COALESCE_PACKET_STOP_THRESH 10000000 /* 10s */ + +/* Delay the transmission of a packet for coalescing by no more than + * IPXWRAPPER_COALESCE_PACKET_MAX_DELAY microseconds. +*/ +#define IPXWRAPPER_COALESCE_PACKET_MAX_DELAY 20000 /* 20ms */ + +/* Combine outgoing IPX packets up to IPXWRAPPER_COALESCE_PACKET_MAX_SIZE + * bytes of data in the UDP payload. +*/ +#define IPXWRAPPER_COALESCE_PACKET_MAX_SIZE 1384 + +DWORD coalesce_send(const void *data, size_t data_size, addr32_t dest_net, addr48_t dest_node, uint16_t dest_socket); +void coalesce_flush_waiting(void); +void coalesce_cleanup(void); + +#endif /* !IPXWRAPPER_COALESCE_H */ diff --git a/src/config.c b/src/config.c index 15e6e21..cb2597b 100644 --- a/src/config.c +++ b/src/config.c @@ -37,6 +37,7 @@ main_config_t get_main_config(void) config.dosbox_server_addr = NULL; config.dosbox_server_port = 213; + config.dosbox_coalesce = false; HKEY reg = reg_open_main(false); @@ -64,6 +65,7 @@ main_config_t get_main_config(void) config.dosbox_server_addr = reg_get_string(reg, "dosbox_server_addr", ""); config.dosbox_server_port = reg_get_dword(reg, "dosbox_server_port", config.dosbox_server_port); + config.dosbox_coalesce = reg_get_dword(reg, "dosbox_coalesce", config.dosbox_coalesce); /* Check for valid frame_type */ @@ -96,7 +98,8 @@ bool set_main_config(const main_config_t *config) && reg_set_dword(reg, "profile", config->profile) && reg_set_string(reg, "dosbox_server_addr", config->dosbox_server_addr) - && reg_set_dword(reg, "dosbox_server_port", config->dosbox_server_port); + && reg_set_dword(reg, "dosbox_server_port", config->dosbox_server_port) + && reg_set_dword(reg, "dosbox_coalesce", config->dosbox_coalesce); reg_close(reg); diff --git a/src/config.h b/src/config.h index b815f31..6fd8570 100644 --- a/src/config.h +++ b/src/config.h @@ -50,6 +50,7 @@ typedef struct main_config { char *dosbox_server_addr; uint16_t dosbox_server_port; + bool dosbox_coalesce; enum ipx_log_level log_level; bool profile; diff --git a/src/ipxconfig.cpp b/src/ipxconfig.cpp index e70da19..a20abf9 100644 --- a/src/ipxconfig.cpp +++ b/src/ipxconfig.cpp @@ -53,6 +53,7 @@ enum { ID_DOSBOX_SERVER_ADDR = 51, ID_DOSBOX_SERVER_PORT = 52, + ID_DOSBOX_COALESCE = 53, ID_DOSBOX_FW_EXCEPT = 55, ID_IPXWRAPPER_PORT = 61, @@ -133,6 +134,7 @@ static struct { HWND dosbox_server_addr; HWND dosbox_server_port_lbl; HWND dosbox_server_port; + HWND dosbox_coalesce; HWND dosbox_fw_except; HWND box_ipx_options; @@ -207,6 +209,21 @@ static LRESULT CALLBACK main_wproc(HWND window, UINT msg, WPARAM wp, LPARAM lp) break; } + case ID_DOSBOX_COALESCE: { + bool coalesce = get_checkbox(wh.dosbox_coalesce); + + if(coalesce) + { + int result = MessageBox(NULL, "Packet coalescing requires all players to be using IPXWrapper 0.7.1 or later.\nAre you sure you want to enable it?", "Warning", MB_YESNO | MB_TASKMODAL | MB_ICONWARNING); + if(result != IDYES) + { + set_checkbox(wh.dosbox_coalesce, false); + } + } + + break; + } + case ID_OPT_LOG_DEBUG: { main_window_update(); break; @@ -570,6 +587,7 @@ static bool save_config() } main_config.dosbox_server_port = port; + main_config.dosbox_coalesce = get_checkbox(wh.dosbox_coalesce); main_config.fw_except = get_checkbox(wh.dosbox_fw_except); } else if(main_config.encap_type == ENCAP_TYPE_PCAP) @@ -760,6 +778,7 @@ static void main_window_init() wh.dosbox_server_port_lbl = create_STATIC(wh.box_dosbox_options, "DOSBox IPX server port"); wh.dosbox_server_port = create_child(wh.box_dosbox_options, "EDIT", "", WS_TABSTOP, WS_EX_CLIENTEDGE, ID_DOSBOX_SERVER_PORT); + wh.dosbox_coalesce = create_checkbox(wh.box_dosbox_options, "Coalesce packets when saturated", ID_DOSBOX_COALESCE); wh.dosbox_fw_except = create_checkbox(wh.box_dosbox_options, "Automatically create Windows Firewall exceptions", ID_DOSBOX_FW_EXCEPT); /* Initialise controls. */ @@ -771,6 +790,7 @@ static void main_window_init() sprintf(port_s, "%hu", main_config.dosbox_server_port); SetWindowText(wh.dosbox_server_port, port_s); + set_checkbox(wh.dosbox_coalesce, main_config.dosbox_coalesce); set_checkbox(wh.dosbox_fw_except, main_config.fw_except); } @@ -938,6 +958,9 @@ static void main_window_init() box_dosbox_options_y += text_h; /* Padding. */ + MoveWindow(wh.dosbox_coalesce, BOX_SIDE_PAD, box_dosbox_options_y, width - 20, text_h, TRUE); + box_dosbox_options_y += text_h; + MoveWindow(wh.dosbox_fw_except, BOX_SIDE_PAD, box_dosbox_options_y, width - 20, text_h, TRUE); box_dosbox_options_y += text_h; diff --git a/src/ipxwrapper.c b/src/ipxwrapper.c index 11070c0..a6d7c54 100644 --- a/src/ipxwrapper.c +++ b/src/ipxwrapper.c @@ -57,11 +57,16 @@ struct FuncStats ipxwrapper_fstats[] = { #undef FPROF_DECL }; +static uint64_t perf_counter_freq = 0; + const unsigned int ipxwrapper_fstats_size = sizeof(ipxwrapper_fstats) / sizeof(*ipxwrapper_fstats); unsigned int send_packets = 0, send_bytes = 0; /* Sent from emulated socket */ unsigned int recv_packets = 0, recv_bytes = 0; /* Forwarded to emulated socket */ +unsigned int send_packets_udp = 0, send_bytes_udp = 0; /* Sent over UDP transport */ +unsigned int recv_packets_udp = 0, recv_bytes_udp = 0; /* Received over UDP transport */ + static void init_cs(CRITICAL_SECTION *cs) { if(!InitializeCriticalSectionAndSpinCount(cs, 0x80000000)) @@ -82,8 +87,17 @@ static void report_packet_stats(void) unsigned int my_recv_packets = __atomic_exchange_n(&recv_packets, 0, __ATOMIC_RELAXED); unsigned int my_recv_bytes = __atomic_exchange_n(&recv_bytes, 0, __ATOMIC_RELAXED); + unsigned int my_send_packets_udp = __atomic_exchange_n(&send_packets_udp, 0, __ATOMIC_RELAXED); + unsigned int my_send_bytes_udp = __atomic_exchange_n(&send_bytes_udp, 0, __ATOMIC_RELAXED); + + unsigned int my_recv_packets_udp = __atomic_exchange_n(&recv_packets_udp, 0, __ATOMIC_RELAXED); + unsigned int my_recv_bytes_udp = __atomic_exchange_n(&recv_bytes_udp, 0, __ATOMIC_RELAXED); + log_printf(LOG_INFO, "IPX sockets sent %u packets (%u bytes)", my_send_packets, my_send_bytes); log_printf(LOG_INFO, "IPX sockets received %u packets (%u bytes)", my_recv_packets, my_recv_bytes); + + log_printf(LOG_INFO, "UDP sockets sent %u packets (%u bytes)", my_send_packets_udp, my_send_bytes_udp); + log_printf(LOG_INFO, "UDP sockets received %u packets (%u bytes)", my_recv_packets_udp, my_recv_bytes_udp); } static DWORD WINAPI prof_thread_main(LPVOID lpParameter) @@ -104,6 +118,12 @@ BOOL WINAPI DllMain(HINSTANCE hinstDLL, DWORD fdwReason, LPVOID lpvReserved) { if(fdwReason == DLL_PROCESS_ATTACH) { + LARGE_INTEGER pc_freq; + if(QueryPerformanceFrequency(&pc_freq)) + { + perf_counter_freq = pc_freq.QuadPart; + } + fprof_init(stub_fstats, NUM_STUBS); fprof_init(ipxwrapper_fstats, ipxwrapper_fstats_size); @@ -111,6 +131,7 @@ BOOL WINAPI DllMain(HINSTANCE hinstDLL, DWORD fdwReason, LPVOID lpvReserved) log_printf(LOG_INFO, "IPXWrapper %s", version_string); log_printf(LOG_INFO, "Compiled at %s", compile_time); + log_printf(LOG_INFO, "Performance counter: %lld Hz", perf_counter_freq); if(!getenv("SystemRoot")) { @@ -308,3 +329,19 @@ uint64_t get_ticks(void) return GetTickCount(); } } + +uint64_t get_uticks(void) +{ + LARGE_INTEGER pc_tick; + + if(perf_counter_freq == 0 || !QueryPerformanceCounter(&pc_tick)) + { + /* Fall back to GetTickCount() if there is no high-resolution + * performance counter available. + */ + return get_ticks() * 1000; + } + else{ + return pc_tick.QuadPart / (perf_counter_freq / 1000000); + } +} diff --git a/src/ipxwrapper.h b/src/ipxwrapper.h index 1b7e17c..559e1eb 100644 --- a/src/ipxwrapper.h +++ b/src/ipxwrapper.h @@ -150,6 +150,7 @@ struct ipx_packet { } __attribute__((__packed__)); #define IPX_MAGIC_SPXLOOKUP 1 +#define IPX_MAGIC_COALESCED 2 typedef struct spxlookup_req spxlookup_req_t; @@ -200,11 +201,15 @@ enum { extern unsigned int send_packets, send_bytes; /* Sent from emulated socket */ extern unsigned int recv_packets, recv_bytes; /* Forwarded to emulated socket */ +extern unsigned int send_packets_udp, send_bytes_udp; /* Sent over UDP transport */ +extern unsigned int recv_packets_udp, recv_bytes_udp; /* Received over UDP transport */ + ipx_socket *get_socket(SOCKET sockfd); ipx_socket *get_socket_wait_for_ready(SOCKET sockfd, int timeout_ms); void lock_sockets(void); void unlock_sockets(void); uint64_t get_ticks(void); +uint64_t get_uticks(void); void add_self_to_firewall(void); diff --git a/src/ipxwrapper_prof_defs.h b/src/ipxwrapper_prof_defs.h index 052f52a..47c2229 100644 --- a/src/ipxwrapper_prof_defs.h +++ b/src/ipxwrapper_prof_defs.h @@ -3,3 +3,19 @@ FPROF_DECL(_handle_udp_recv) FPROF_DECL(_handle_dosbox_recv) FPROF_DECL(_handle_pcap_frame) FPROF_DECL(lock_sockets) +FPROF_DECL(ioctlsocket_recv_pump) +FPROF_DECL(ioctlsocket_accumulate) +FPROF_DECL(recv_pump_select) +FPROF_DECL(recv_pump_find_slot) +FPROF_DECL(recv_pump_recv) +FPROF_DECL(recv_pump_reclaim_socket) + +FPROF_DECL(get_coalesce_by_dest) +FPROF_DECL(get_coalesce_by_dest_new) +FPROF_DECL(coalesce_register_send) +FPROF_DECL(coalesce_add_data) +FPROF_DECL(coalesce_add_data_init) +FPROF_DECL(coalesce_flush) +FPROF_DECL(coalesce_send) +FPROF_DECL(coalesce_send_cd) +FPROF_DECL(coalesce_send_immediate) diff --git a/src/router.c b/src/router.c index 53c660b..0bec648 100644 --- a/src/router.c +++ b/src/router.c @@ -25,6 +25,7 @@ #include #include "router.h" +#include "coalesce.h" #include "common.h" #include "funcprof.h" #include "ipxwrapper.h" @@ -34,6 +35,9 @@ #define IPX_SOCK_ECHO 2 +/* Maximum number of packets to dispatch per iteration of the router loop. */ +#define MAX_RECV_PER_LOOP 50 + static bool router_running = false; static WSAEVENT router_event = WSA_INVALID_EVENT; static HANDLE router_thread = NULL; @@ -197,6 +201,8 @@ void router_cleanup(void) /* Release resources. */ + coalesce_cleanup(); + if(private_socket != -1) { closesocket(private_socket); @@ -554,32 +560,88 @@ static void _handle_dosbox_recv(novell_ipx_packet *packet, size_t packet_size) return; } - if(min_log_level <= LOG_DEBUG) + if(packet->src_socket == 0 && packet->type == IPX_MAGIC_COALESCED) { - IPX_STRING_ADDR(src_addr, addr32_in(packet->src_net), addr48_in(packet->src_node), packet->src_socket); - IPX_STRING_ADDR(dest_addr, addr32_in(packet->dest_net), addr48_in(packet->dest_node), packet->dest_socket); + /* Sanity check the lengths of each inner packet. */ - log_printf(LOG_DEBUG, "Recieved packet from %s for %s", src_addr, dest_addr); + log_printf(LOG_DEBUG, "Recieved coalesced packet (%zu bytes)", packet_size); + + novell_ipx_packet *inner_packets = (novell_ipx_packet*)(packet->data); + size_t remaining_data = packet_size - sizeof(novell_ipx_packet); + + novell_ipx_packet *end = (novell_ipx_packet*)(packet->data + remaining_data); + + for(novell_ipx_packet *p = inner_packets; p < end;) + { + if(remaining_data < sizeof(novell_ipx_packet) || ntohs(p->length) > remaining_data) + { + /* Doesn't look valid. */ + log_printf(LOG_ERROR, "Recieved invalid IPX packet from DOSBox server, ignoring"); + return; + } + + p = (novell_ipx_packet*)((char*)(p) + ntohs(p->length)); + } + + /* Deliver the inner packets. */ + + for(novell_ipx_packet *p = inner_packets; p < end;) + { + if(min_log_level <= LOG_DEBUG) + { + IPX_STRING_ADDR(src_addr, addr32_in(p->src_net), addr48_in(p->src_node), p->src_socket); + IPX_STRING_ADDR(dest_addr, addr32_in(p->dest_net), addr48_in(p->dest_node), p->dest_socket); + + log_printf(LOG_DEBUG, "Recieved packet from %s for %s", src_addr, dest_addr); + } + + size_t data_size = ntohs(p->length) - sizeof(novell_ipx_packet); + + _deliver_packet( + p->type, + + addr32_in(p->src_net), + addr48_in(p->src_node), + p->src_socket, + + addr32_in(p->dest_net), + addr48_in(p->dest_node), + p->dest_socket, + + p->data, + data_size); + + p = (novell_ipx_packet*)((char*)(p) + ntohs(p->length)); + } } - - size_t data_size = ntohs(packet->length) - sizeof(novell_ipx_packet); - - _deliver_packet( - packet->type, + else{ + if(min_log_level <= LOG_DEBUG) + { + IPX_STRING_ADDR(src_addr, addr32_in(packet->src_net), addr48_in(packet->src_node), packet->src_socket); + IPX_STRING_ADDR(dest_addr, addr32_in(packet->dest_net), addr48_in(packet->dest_node), packet->dest_socket); + + log_printf(LOG_DEBUG, "Recieved packet from %s for %s", src_addr, dest_addr); + } - addr32_in(packet->src_net), - addr48_in(packet->src_node), - packet->src_socket, + size_t data_size = ntohs(packet->length) - sizeof(novell_ipx_packet); - addr32_in(packet->dest_net), - addr48_in(packet->dest_node), - packet->dest_socket, - - packet->data, - data_size); + _deliver_packet( + packet->type, + + addr32_in(packet->src_net), + addr48_in(packet->src_node), + packet->src_socket, + + addr32_in(packet->dest_net), + addr48_in(packet->dest_node), + packet->dest_socket, + + packet->data, + data_size); + } } -static bool _do_udp_recv(int fd) +static int _do_udp_recv(int fd) { struct sockaddr_in addr; int addrlen = sizeof(addr); @@ -588,14 +650,19 @@ static bool _do_udp_recv(int fd) int len = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr*)(&addr), &addrlen); if(len == -1) { - if(WSAGetLastError() == WSAEWOULDBLOCK || WSAGetLastError() == WSAECONNRESET) - { - return true; - } + DWORD err = WSAGetLastError(); - return false; + switch(err) + { + case WSAEWOULDBLOCK: return 0; + case WSAECONNRESET: return 1; + default: return -1; + } } + __atomic_add_fetch(&recv_packets_udp, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&recv_bytes_udp, len, __ATOMIC_RELAXED); + if(ipx_encap_type == ENCAP_TYPE_DOSBOX) { if(addr.sin_family != dosbox_server_addr.sin_family @@ -603,7 +670,7 @@ static bool _do_udp_recv(int fd) || addr.sin_port != dosbox_server_addr.sin_port) { /* Ignore packet from wrong address. */ - return true; + return 1; } if(dosbox_state == DOSBOX_REGISTERING) @@ -623,7 +690,7 @@ static bool _do_udp_recv(int fd) _handle_udp_recv((ipx_packet*)(buf), len, addr); } - return true; + return 1; } static void _handle_pcap_frame(u_char *user, const struct pcap_pkthdr *pkt_header, const u_char *pkt_data) @@ -766,6 +833,10 @@ static DWORD router_main(void *arg) if(ipx_encap_type == ENCAP_TYPE_DOSBOX) { + lock_sockets(); + coalesce_flush_waiting(); + unlock_sockets(); + if(dosbox_state == DOSBOX_DISCONNECTED) { uint64_t now = get_ticks(); @@ -819,14 +890,43 @@ static DWORD router_main(void *arg) } else if(ipx_encap_type == ENCAP_TYPE_DOSBOX) { - if(!_do_udp_recv(private_socket)) + int status = 0; + + for(int i = 0; i < MAX_RECV_PER_LOOP; ++i) + { + status = _do_udp_recv(private_socket); + if(status <= 0) + { + break; + } + } + + if(status < 0) { exit_status = 1; break; } } else{ - if(!_do_udp_recv(shared_socket) || !_do_udp_recv(private_socket)) + int status = 0; + + for(int i = 0; i < MAX_RECV_PER_LOOP; ++i) + { + int s1 = _do_udp_recv(shared_socket); + int s2 = _do_udp_recv(private_socket); + + if(s1 < 0 || s2 < 0) + { + status = -1; + break; + } + else if(s1 == 0 && s2 == 0) + { + break; + } + } + + if(status < 0) { exit_status = 1; break; diff --git a/src/winsock.c b/src/winsock.c index 3bb3fac..186739d 100644 --- a/src/winsock.c +++ b/src/winsock.c @@ -25,6 +25,7 @@ #include #include "ipxwrapper.h" +#include "coalesce.h" #include "common.h" #include "interface.h" #include "router.h" @@ -770,23 +771,18 @@ static BOOL reclaim_socket(ipx_socket *sockptr, int lookup_fd) static int recv_pump(ipx_socket *sockptr, BOOL block) { int fd = sockptr->fd; + u_long available = -1; if(!block) { - fd_set read_fds; - FD_ZERO(&read_fds); + FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_recv_pump_select])); - FD_SET(fd, &read_fds); - - struct timeval timeout = { 0, 0 }; - - int r = r_select(-1, &read_fds, NULL, NULL, &timeout); - if(r == -1) + if(r_ioctlsocket(fd, FIONREAD, &available) != 0) { unlock_sockets(); return -1; } - else if(r == 0) + else if(available == 0) { /* No packet waiting in underlying recv buffer. */ return 0; @@ -797,15 +793,19 @@ static int recv_pump(ipx_socket *sockptr, BOOL block) int recv_slot = -1; - for(int i = 0; i < RECV_QUEUE_MAX_PACKETS; ++i) { - if(queue->sizes[i] == IPX_RECV_QUEUE_FREE) + FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_recv_pump_find_slot])); + + for(int i = 0; i < RECV_QUEUE_MAX_PACKETS; ++i) { - queue->sizes[i] = IPX_RECV_QUEUE_LOCKED; - recv_queue_adjust_refcount(queue, 1); - - recv_slot = i; - break; + if(queue->sizes[i] == IPX_RECV_QUEUE_FREE) + { + queue->sizes[i] = IPX_RECV_QUEUE_LOCKED; + recv_queue_adjust_refcount(queue, 1); + + recv_slot = i; + break; + } } } @@ -817,9 +817,19 @@ static int recv_pump(ipx_socket *sockptr, BOOL block) unlock_sockets(); - int r = r_recv(fd, (char*)(queue->data[recv_slot]), MAX_PKT_SIZE, 0); + int r; + { + FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_recv_pump_recv])); + r = r_recv(fd, (char*)(queue->data[recv_slot]), MAX_PKT_SIZE, 0); + } - if(!reclaim_socket(sockptr, fd)) + bool ok; + { + FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_recv_pump_reclaim_socket])); + ok = reclaim_socket(sockptr, fd); + } + + if(!ok) { /* The application closed the socket while we were in the recv() call. * Just discard our handle, let the queue be destroyed. @@ -1375,7 +1385,14 @@ static int send_packet(const ipx_packet *packet, int len, struct sockaddr *addr, log_printf(LOG_DEBUG, "Sending packet from %s to %s (%s:%hu)", src_addr, dest_addr, inet_ntoa(v4->sin_addr), ntohs(v4->sin_port)); } - return (r_sendto(private_socket, (char*)packet, len, 0, addr, addrlen) == len); + int r = r_sendto(private_socket, (char*)packet, len, 0, addr, addrlen); + if(r == len) + { + __atomic_add_fetch(&send_packets_udp, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&send_bytes_udp, len, __ATOMIC_RELAXED); + } + + return r; } static DWORD ipx_send_packet( @@ -1527,14 +1544,9 @@ static DWORD ipx_send_packet( memcpy(packet->data, data, data_size); - DWORD error = ERROR_SUCCESS; - - if(r_sendto(private_socket, (const void*)(packet), packet_size, 0, (struct sockaddr*)(&dosbox_server_addr), sizeof(dosbox_server_addr)) < 0) + DWORD error = coalesce_send(packet, packet_size, dest_net, dest_node, dest_socket); + if(error == ERROR_SUCCESS) { - error = WSAGetLastError(); - log_printf(LOG_ERROR, "Error sending DOSBox IPX packet: %s", w32_error(error)); - } - else{ __atomic_add_fetch(&send_packets, 1, __ATOMIC_RELAXED); __atomic_add_fetch(&send_bytes, data_size, __ATOMIC_RELAXED); } @@ -1810,28 +1822,36 @@ int PASCAL ioctlsocket(SOCKET fd, long cmd, u_long *argp) if(cmd == FIONREAD && !(sock->flags & IPX_IS_SPX)) { - while(1) { - int r = recv_pump(sock, FALSE); - if(r < 0) - { - /* Error in recv_pump() */ - return -1; - } + FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_ioctlsocket_recv_pump])); - if(r == 0) + while(1) { - /* No more packets ready to read from underlying socket. */ - break; + int r = recv_pump(sock, FALSE); + if(r < 0) + { + /* Error in recv_pump() */ + return -1; + } + + if(r == 0) + { + /* No more packets ready to read from underlying socket. */ + break; + } } } unsigned long accumulated_packet_data = 0; - for(int i = 0; i < sock->recv_queue->n_ready; ++i) { - const ipx_packet *packet = (const ipx_packet*)(sock->recv_queue->data[ sock->recv_queue->ready[i] ]); - accumulated_packet_data += packet->size; + FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_ioctlsocket_accumulate])); + + for(int i = 0; i < sock->recv_queue->n_ready; ++i) + { + const ipx_packet *packet = (const ipx_packet*)(sock->recv_queue->data[ sock->recv_queue->ready[i] ]); + accumulated_packet_data += packet->size; + } } unlock_sockets();