1
0
mirror of https://github.com/solemnwarning/ipxwrapper synced 2024-12-30 16:45:37 +01:00

Merge branch 'coalesce'

This commit is contained in:
Daniel Collins 2023-12-10 13:00:43 +00:00
commit 7c31b89929
11 changed files with 626 additions and 69 deletions

View File

@ -86,7 +86,7 @@ dist: all
IPXWRAPPER_OBJS := src/ipxwrapper.o src/winsock.o src/ipxwrapper_stubs.o src/log.o src/common.o \
src/interface.o src/interface2.o src/router.o src/ipxwrapper.def src/addrcache.o src/config.o src/addr.o \
src/firewall.o src/ethernet.o src/funcprof.o
src/firewall.o src/ethernet.o src/funcprof.o src/coalesce.o
ipxwrapper.dll: $(IPXWRAPPER_OBJS)
echo 'const char *version_string = "$(VERSION)", *compile_time = "'`date`'";' | $(CC) -c -x c -o version.o -

293
src/coalesce.c Normal file
View File

@ -0,0 +1,293 @@
/* IPXWrapper - Packet coalescing
* Copyright (C) 2023 Daniel Collins <solemnwarning@solemnwarning.net>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#define WINSOCK_API_LINKAGE
#include <uthash.h>
#include <utlist.h>
#include <winsock2.h>
#include <windows.h>
#include "coalesce.h"
#include "ethernet.h"
#include "interface.h"
#include "ipxwrapper.h"
struct coalesce_table_key
{
addr32_t netnum;
addr48_t nodenum;
uint16_t socket;
};
typedef struct coalesce_table_key coalesce_table_key;
struct coalesce_dest
{
UT_hash_handle hh;
struct coalesce_dest *prev;
struct coalesce_dest *next;
coalesce_table_key dest;
bool active;
uint64_t send_timestamps[IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT];
uint64_t payload_timestamp;
unsigned char payload[IPXWRAPPER_COALESCE_PACKET_MAX_SIZE];
int payload_used;
};
typedef struct coalesce_dest coalesce_dest;
/* coalesce_table provides access to all coalesce_dest structures and should
* be iterated using the 'hh' member.
*/
static coalesce_dest *coalesce_table = NULL;
/* coalesce_pending provides access to all active coalesce_dest structures
* which have data waiting to be transmitted, ordered by insertion date from
* oldest to newest.
*/
static coalesce_dest *coalesce_pending = NULL;
coalesce_dest *get_coalesce_by_dest(addr32_t netnum, addr48_t nodenum, uint16_t socket)
{
FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_get_coalesce_by_dest]));
if(!main_config.dosbox_coalesce)
{
/* Skip coalescing if disabled. */
return NULL;
}
coalesce_table_key dest = { netnum, nodenum, socket };
coalesce_dest *node;
HASH_FIND(hh, coalesce_table, &dest, sizeof(dest), node);
if(node == NULL)
{
FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_get_coalesce_by_dest_new]));
/* TODO: Limit maximum number of nodes, recycle old ones. */
node = malloc(sizeof(coalesce_dest));
if(node == NULL)
{
return NULL;
}
memset(node, 0, sizeof(*node));
node->dest = dest;
HASH_ADD(hh, coalesce_table, dest, sizeof(node->dest), node);
}
return node;
}
bool coalesce_register_send(coalesce_dest *node, uint64_t timestamp)
{
FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_coalesce_register_send]));
memmove(node->send_timestamps, node->send_timestamps + 1, sizeof(node->send_timestamps) - sizeof(*(node->send_timestamps)));
node->send_timestamps[IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT - 1] = timestamp;
if((node->send_timestamps[0] + IPXWRAPPER_COALESCE_PACKET_START_THRESH) >= timestamp)
{
return true;
}
else if(node->active && (node->send_timestamps[0] + IPXWRAPPER_COALESCE_PACKET_STOP_THRESH) < timestamp)
{
return false;
}
else{
return node->active;
}
}
bool coalesce_add_data(coalesce_dest *cd, const void *data, int size, uint64_t now)
{
FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_coalesce_add_data]));
if(cd->payload_used == 0)
{
FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_coalesce_add_data_init]));
novell_ipx_packet header;
header.checksum = 0xFFFF;
header.hops = 0;
header.type = IPX_MAGIC_COALESCED;
addr32_out(header.dest_net, cd->dest.netnum);
addr48_out(header.dest_node, cd->dest.nodenum);
header.dest_socket = 0;
addr32_out(header.src_net, dosbox_local_netnum);
addr48_out(header.src_node, dosbox_local_nodenum);
header.src_socket = 0;
if((sizeof(header) + size) > IPXWRAPPER_COALESCE_PACKET_MAX_SIZE)
{
return false;
}
cd->payload_timestamp = now;
DL_APPEND(coalesce_pending, cd);
memcpy(cd->payload, &header, sizeof(header));
cd->payload_used = sizeof(header);
}
if((cd->payload_used + size) <= IPXWRAPPER_COALESCE_PACKET_MAX_SIZE)
{
memcpy((cd->payload + cd->payload_used), data, size);
cd->payload_used += size;
return true;
}
else{
return false;
}
}
void coalesce_flush(coalesce_dest *cd)
{
FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_coalesce_flush]));
assert(cd->payload_used > 0);
novell_ipx_packet *header = (novell_ipx_packet*)(cd->payload);
header->length = htons(cd->payload_used);
log_printf(LOG_DEBUG, "Sending coalesced packet (%d bytes)", cd->payload_used);
if(r_sendto(private_socket, (const void*)(cd->payload), cd->payload_used, 0, (struct sockaddr*)(&dosbox_server_addr), sizeof(dosbox_server_addr)) < 0)
{
log_printf(LOG_ERROR, "Error sending DOSBox IPX packet: %s", w32_error(WSAGetLastError()));
}
else{
__atomic_add_fetch(&send_packets_udp, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&send_bytes_udp, cd->payload_used, __ATOMIC_RELAXED);
}
cd->payload_used = 0;
DL_DELETE(coalesce_pending, cd);
}
DWORD coalesce_send(const void *data, size_t data_size, addr32_t dest_net, addr48_t dest_node, uint16_t dest_socket)
{
FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_coalesce_send]));
/* We should always be called with an IPX header, even if the
* application is sending zero-byte payloads.
*/
assert(data_size > 0);
uint64_t now = get_uticks();
bool queued = false;
coalesce_dest *cd = get_coalesce_by_dest(dest_net, dest_node, dest_socket);
if(cd != NULL)
{
FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_coalesce_send_cd]));
bool should_coalesce = coalesce_register_send(cd, now);
if(should_coalesce && !cd->active)
{
IPX_STRING_ADDR(dest_addr, dest_net, dest_node, dest_socket);
log_printf(LOG_WARNING, "High send rate to %s detected, coalescing future packets\n", dest_addr);
cd->active = true;
}
else if(!should_coalesce && cd->active)
{
IPX_STRING_ADDR(dest_addr, dest_net, dest_node, dest_socket);
log_printf(LOG_INFO, "Send rate to %s has dropped, no longer coalescing packets\n", dest_addr);
cd->active = false;
}
if(
should_coalesce
&& (cd->payload_used + data_size) > IPXWRAPPER_COALESCE_PACKET_MAX_SIZE
&& data_size < (IPXWRAPPER_COALESCE_PACKET_MAX_SIZE / 2))
{
coalesce_flush(cd);
}
if(should_coalesce && coalesce_add_data(cd, data, data_size, now))
{
queued = true;
}
if(cd->payload_used > 0 && (cd->payload_timestamp + IPXWRAPPER_COALESCE_PACKET_MAX_DELAY) <= now)
{
coalesce_flush(cd);
}
}
if(!queued)
{
FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_coalesce_send_immediate]));
if(r_sendto(private_socket, (const void*)(data), data_size, 0, (struct sockaddr*)(&dosbox_server_addr), sizeof(dosbox_server_addr)) < 0)
{
DWORD error = WSAGetLastError();
log_printf(LOG_ERROR, "Error sending DOSBox IPX packet: %s", w32_error(error));
return error;
}
else{
__atomic_add_fetch(&send_packets_udp, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&send_bytes_udp, data_size, __ATOMIC_RELAXED);
}
}
return ERROR_SUCCESS;
}
void coalesce_flush_waiting(void)
{
uint64_t now = get_uticks();
while(coalesce_pending != NULL
&& (coalesce_pending->payload_timestamp + IPXWRAPPER_COALESCE_PACKET_MAX_DELAY) <= now)
{
coalesce_flush(coalesce_pending);
}
}
void coalesce_cleanup(void)
{
while(coalesce_pending != NULL)
{
coalesce_flush(coalesce_pending);
}
while(coalesce_table != NULL)
{
coalesce_dest *cd = coalesce_table;
HASH_DEL(coalesce_table, cd);
free(cd);
}
}

59
src/coalesce.h Normal file
View File

@ -0,0 +1,59 @@
/* IPXWrapper - Packet coalescing
* Copyright (C) 2023 Daniel Collins <solemnwarning@solemnwarning.net>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#ifndef IPXWRAPPER_COALESCE_H
#define IPXWRAPPER_COALESCE_H
#include <stdbool.h>
#include <stdint.h>
#include <windows.h>
#include "addr.h"
/* For each destination IPX address, track the timestamp of the past n send
* operations, we use this to determine how spammy the application is being
* with sendto() calls.
*/
#define IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT 512
/* Start coalescing when the rate of send operations to a single IPX address
* reaches IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT packets over the past
* IPXWRAPPER_COALESCE_PACKET_START_THRESH microseconds.
*/
#define IPXWRAPPER_COALESCE_PACKET_START_THRESH 2500000 /* 2.5s */
/* Stop coalescing when the rate of send operations to a single IPX address
* falls back under IPXWRAPPER_COALESCE_PACKET_TRACK_COUNT over the past
* IPXWRAPPER_COALESCE_PACKET_STOP_THRESH microseconds.
*/
#define IPXWRAPPER_COALESCE_PACKET_STOP_THRESH 10000000 /* 10s */
/* Delay the transmission of a packet for coalescing by no more than
* IPXWRAPPER_COALESCE_PACKET_MAX_DELAY microseconds.
*/
#define IPXWRAPPER_COALESCE_PACKET_MAX_DELAY 20000 /* 20ms */
/* Combine outgoing IPX packets up to IPXWRAPPER_COALESCE_PACKET_MAX_SIZE
* bytes of data in the UDP payload.
*/
#define IPXWRAPPER_COALESCE_PACKET_MAX_SIZE 1384
DWORD coalesce_send(const void *data, size_t data_size, addr32_t dest_net, addr48_t dest_node, uint16_t dest_socket);
void coalesce_flush_waiting(void);
void coalesce_cleanup(void);
#endif /* !IPXWRAPPER_COALESCE_H */

View File

@ -37,6 +37,7 @@ main_config_t get_main_config(void)
config.dosbox_server_addr = NULL;
config.dosbox_server_port = 213;
config.dosbox_coalesce = false;
HKEY reg = reg_open_main(false);
@ -64,6 +65,7 @@ main_config_t get_main_config(void)
config.dosbox_server_addr = reg_get_string(reg, "dosbox_server_addr", "");
config.dosbox_server_port = reg_get_dword(reg, "dosbox_server_port", config.dosbox_server_port);
config.dosbox_coalesce = reg_get_dword(reg, "dosbox_coalesce", config.dosbox_coalesce);
/* Check for valid frame_type */
@ -96,7 +98,8 @@ bool set_main_config(const main_config_t *config)
&& reg_set_dword(reg, "profile", config->profile)
&& reg_set_string(reg, "dosbox_server_addr", config->dosbox_server_addr)
&& reg_set_dword(reg, "dosbox_server_port", config->dosbox_server_port);
&& reg_set_dword(reg, "dosbox_server_port", config->dosbox_server_port)
&& reg_set_dword(reg, "dosbox_coalesce", config->dosbox_coalesce);
reg_close(reg);

View File

@ -50,6 +50,7 @@ typedef struct main_config {
char *dosbox_server_addr;
uint16_t dosbox_server_port;
bool dosbox_coalesce;
enum ipx_log_level log_level;
bool profile;

View File

@ -53,6 +53,7 @@ enum {
ID_DOSBOX_SERVER_ADDR = 51,
ID_DOSBOX_SERVER_PORT = 52,
ID_DOSBOX_COALESCE = 53,
ID_DOSBOX_FW_EXCEPT = 55,
ID_IPXWRAPPER_PORT = 61,
@ -133,6 +134,7 @@ static struct {
HWND dosbox_server_addr;
HWND dosbox_server_port_lbl;
HWND dosbox_server_port;
HWND dosbox_coalesce;
HWND dosbox_fw_except;
HWND box_ipx_options;
@ -207,6 +209,21 @@ static LRESULT CALLBACK main_wproc(HWND window, UINT msg, WPARAM wp, LPARAM lp)
break;
}
case ID_DOSBOX_COALESCE: {
bool coalesce = get_checkbox(wh.dosbox_coalesce);
if(coalesce)
{
int result = MessageBox(NULL, "Packet coalescing requires all players to be using IPXWrapper 0.7.1 or later.\nAre you sure you want to enable it?", "Warning", MB_YESNO | MB_TASKMODAL | MB_ICONWARNING);
if(result != IDYES)
{
set_checkbox(wh.dosbox_coalesce, false);
}
}
break;
}
case ID_OPT_LOG_DEBUG: {
main_window_update();
break;
@ -570,6 +587,7 @@ static bool save_config()
}
main_config.dosbox_server_port = port;
main_config.dosbox_coalesce = get_checkbox(wh.dosbox_coalesce);
main_config.fw_except = get_checkbox(wh.dosbox_fw_except);
}
else if(main_config.encap_type == ENCAP_TYPE_PCAP)
@ -760,6 +778,7 @@ static void main_window_init()
wh.dosbox_server_port_lbl = create_STATIC(wh.box_dosbox_options, "DOSBox IPX server port");
wh.dosbox_server_port = create_child(wh.box_dosbox_options, "EDIT", "", WS_TABSTOP, WS_EX_CLIENTEDGE, ID_DOSBOX_SERVER_PORT);
wh.dosbox_coalesce = create_checkbox(wh.box_dosbox_options, "Coalesce packets when saturated", ID_DOSBOX_COALESCE);
wh.dosbox_fw_except = create_checkbox(wh.box_dosbox_options, "Automatically create Windows Firewall exceptions", ID_DOSBOX_FW_EXCEPT);
/* Initialise controls. */
@ -771,6 +790,7 @@ static void main_window_init()
sprintf(port_s, "%hu", main_config.dosbox_server_port);
SetWindowText(wh.dosbox_server_port, port_s);
set_checkbox(wh.dosbox_coalesce, main_config.dosbox_coalesce);
set_checkbox(wh.dosbox_fw_except, main_config.fw_except);
}
@ -938,6 +958,9 @@ static void main_window_init()
box_dosbox_options_y += text_h; /* Padding. */
MoveWindow(wh.dosbox_coalesce, BOX_SIDE_PAD, box_dosbox_options_y, width - 20, text_h, TRUE);
box_dosbox_options_y += text_h;
MoveWindow(wh.dosbox_fw_except, BOX_SIDE_PAD, box_dosbox_options_y, width - 20, text_h, TRUE);
box_dosbox_options_y += text_h;

View File

@ -57,11 +57,16 @@ struct FuncStats ipxwrapper_fstats[] = {
#undef FPROF_DECL
};
static uint64_t perf_counter_freq = 0;
const unsigned int ipxwrapper_fstats_size = sizeof(ipxwrapper_fstats) / sizeof(*ipxwrapper_fstats);
unsigned int send_packets = 0, send_bytes = 0; /* Sent from emulated socket */
unsigned int recv_packets = 0, recv_bytes = 0; /* Forwarded to emulated socket */
unsigned int send_packets_udp = 0, send_bytes_udp = 0; /* Sent over UDP transport */
unsigned int recv_packets_udp = 0, recv_bytes_udp = 0; /* Received over UDP transport */
static void init_cs(CRITICAL_SECTION *cs)
{
if(!InitializeCriticalSectionAndSpinCount(cs, 0x80000000))
@ -82,8 +87,17 @@ static void report_packet_stats(void)
unsigned int my_recv_packets = __atomic_exchange_n(&recv_packets, 0, __ATOMIC_RELAXED);
unsigned int my_recv_bytes = __atomic_exchange_n(&recv_bytes, 0, __ATOMIC_RELAXED);
unsigned int my_send_packets_udp = __atomic_exchange_n(&send_packets_udp, 0, __ATOMIC_RELAXED);
unsigned int my_send_bytes_udp = __atomic_exchange_n(&send_bytes_udp, 0, __ATOMIC_RELAXED);
unsigned int my_recv_packets_udp = __atomic_exchange_n(&recv_packets_udp, 0, __ATOMIC_RELAXED);
unsigned int my_recv_bytes_udp = __atomic_exchange_n(&recv_bytes_udp, 0, __ATOMIC_RELAXED);
log_printf(LOG_INFO, "IPX sockets sent %u packets (%u bytes)", my_send_packets, my_send_bytes);
log_printf(LOG_INFO, "IPX sockets received %u packets (%u bytes)", my_recv_packets, my_recv_bytes);
log_printf(LOG_INFO, "UDP sockets sent %u packets (%u bytes)", my_send_packets_udp, my_send_bytes_udp);
log_printf(LOG_INFO, "UDP sockets received %u packets (%u bytes)", my_recv_packets_udp, my_recv_bytes_udp);
}
static DWORD WINAPI prof_thread_main(LPVOID lpParameter)
@ -104,6 +118,12 @@ BOOL WINAPI DllMain(HINSTANCE hinstDLL, DWORD fdwReason, LPVOID lpvReserved)
{
if(fdwReason == DLL_PROCESS_ATTACH)
{
LARGE_INTEGER pc_freq;
if(QueryPerformanceFrequency(&pc_freq))
{
perf_counter_freq = pc_freq.QuadPart;
}
fprof_init(stub_fstats, NUM_STUBS);
fprof_init(ipxwrapper_fstats, ipxwrapper_fstats_size);
@ -111,6 +131,7 @@ BOOL WINAPI DllMain(HINSTANCE hinstDLL, DWORD fdwReason, LPVOID lpvReserved)
log_printf(LOG_INFO, "IPXWrapper %s", version_string);
log_printf(LOG_INFO, "Compiled at %s", compile_time);
log_printf(LOG_INFO, "Performance counter: %lld Hz", perf_counter_freq);
if(!getenv("SystemRoot"))
{
@ -308,3 +329,19 @@ uint64_t get_ticks(void)
return GetTickCount();
}
}
uint64_t get_uticks(void)
{
LARGE_INTEGER pc_tick;
if(perf_counter_freq == 0 || !QueryPerformanceCounter(&pc_tick))
{
/* Fall back to GetTickCount() if there is no high-resolution
* performance counter available.
*/
return get_ticks() * 1000;
}
else{
return pc_tick.QuadPart / (perf_counter_freq / 1000000);
}
}

View File

@ -150,6 +150,7 @@ struct ipx_packet {
} __attribute__((__packed__));
#define IPX_MAGIC_SPXLOOKUP 1
#define IPX_MAGIC_COALESCED 2
typedef struct spxlookup_req spxlookup_req_t;
@ -200,11 +201,15 @@ enum {
extern unsigned int send_packets, send_bytes; /* Sent from emulated socket */
extern unsigned int recv_packets, recv_bytes; /* Forwarded to emulated socket */
extern unsigned int send_packets_udp, send_bytes_udp; /* Sent over UDP transport */
extern unsigned int recv_packets_udp, recv_bytes_udp; /* Received over UDP transport */
ipx_socket *get_socket(SOCKET sockfd);
ipx_socket *get_socket_wait_for_ready(SOCKET sockfd, int timeout_ms);
void lock_sockets(void);
void unlock_sockets(void);
uint64_t get_ticks(void);
uint64_t get_uticks(void);
void add_self_to_firewall(void);

View File

@ -3,3 +3,19 @@ FPROF_DECL(_handle_udp_recv)
FPROF_DECL(_handle_dosbox_recv)
FPROF_DECL(_handle_pcap_frame)
FPROF_DECL(lock_sockets)
FPROF_DECL(ioctlsocket_recv_pump)
FPROF_DECL(ioctlsocket_accumulate)
FPROF_DECL(recv_pump_select)
FPROF_DECL(recv_pump_find_slot)
FPROF_DECL(recv_pump_recv)
FPROF_DECL(recv_pump_reclaim_socket)
FPROF_DECL(get_coalesce_by_dest)
FPROF_DECL(get_coalesce_by_dest_new)
FPROF_DECL(coalesce_register_send)
FPROF_DECL(coalesce_add_data)
FPROF_DECL(coalesce_add_data_init)
FPROF_DECL(coalesce_flush)
FPROF_DECL(coalesce_send)
FPROF_DECL(coalesce_send_cd)
FPROF_DECL(coalesce_send_immediate)

View File

@ -25,6 +25,7 @@
#include <Win32-Extensions.h>
#include "router.h"
#include "coalesce.h"
#include "common.h"
#include "funcprof.h"
#include "ipxwrapper.h"
@ -34,6 +35,9 @@
#define IPX_SOCK_ECHO 2
/* Maximum number of packets to dispatch per iteration of the router loop. */
#define MAX_RECV_PER_LOOP 50
static bool router_running = false;
static WSAEVENT router_event = WSA_INVALID_EVENT;
static HANDLE router_thread = NULL;
@ -197,6 +201,8 @@ void router_cleanup(void)
/* Release resources. */
coalesce_cleanup();
if(private_socket != -1)
{
closesocket(private_socket);
@ -554,32 +560,88 @@ static void _handle_dosbox_recv(novell_ipx_packet *packet, size_t packet_size)
return;
}
if(min_log_level <= LOG_DEBUG)
if(packet->src_socket == 0 && packet->type == IPX_MAGIC_COALESCED)
{
IPX_STRING_ADDR(src_addr, addr32_in(packet->src_net), addr48_in(packet->src_node), packet->src_socket);
IPX_STRING_ADDR(dest_addr, addr32_in(packet->dest_net), addr48_in(packet->dest_node), packet->dest_socket);
/* Sanity check the lengths of each inner packet. */
log_printf(LOG_DEBUG, "Recieved packet from %s for %s", src_addr, dest_addr);
log_printf(LOG_DEBUG, "Recieved coalesced packet (%zu bytes)", packet_size);
novell_ipx_packet *inner_packets = (novell_ipx_packet*)(packet->data);
size_t remaining_data = packet_size - sizeof(novell_ipx_packet);
novell_ipx_packet *end = (novell_ipx_packet*)(packet->data + remaining_data);
for(novell_ipx_packet *p = inner_packets; p < end;)
{
if(remaining_data < sizeof(novell_ipx_packet) || ntohs(p->length) > remaining_data)
{
/* Doesn't look valid. */
log_printf(LOG_ERROR, "Recieved invalid IPX packet from DOSBox server, ignoring");
return;
}
p = (novell_ipx_packet*)((char*)(p) + ntohs(p->length));
}
/* Deliver the inner packets. */
for(novell_ipx_packet *p = inner_packets; p < end;)
{
if(min_log_level <= LOG_DEBUG)
{
IPX_STRING_ADDR(src_addr, addr32_in(p->src_net), addr48_in(p->src_node), p->src_socket);
IPX_STRING_ADDR(dest_addr, addr32_in(p->dest_net), addr48_in(p->dest_node), p->dest_socket);
log_printf(LOG_DEBUG, "Recieved packet from %s for %s", src_addr, dest_addr);
}
size_t data_size = ntohs(p->length) - sizeof(novell_ipx_packet);
_deliver_packet(
p->type,
addr32_in(p->src_net),
addr48_in(p->src_node),
p->src_socket,
addr32_in(p->dest_net),
addr48_in(p->dest_node),
p->dest_socket,
p->data,
data_size);
p = (novell_ipx_packet*)((char*)(p) + ntohs(p->length));
}
}
size_t data_size = ntohs(packet->length) - sizeof(novell_ipx_packet);
_deliver_packet(
packet->type,
else{
if(min_log_level <= LOG_DEBUG)
{
IPX_STRING_ADDR(src_addr, addr32_in(packet->src_net), addr48_in(packet->src_node), packet->src_socket);
IPX_STRING_ADDR(dest_addr, addr32_in(packet->dest_net), addr48_in(packet->dest_node), packet->dest_socket);
log_printf(LOG_DEBUG, "Recieved packet from %s for %s", src_addr, dest_addr);
}
addr32_in(packet->src_net),
addr48_in(packet->src_node),
packet->src_socket,
size_t data_size = ntohs(packet->length) - sizeof(novell_ipx_packet);
addr32_in(packet->dest_net),
addr48_in(packet->dest_node),
packet->dest_socket,
packet->data,
data_size);
_deliver_packet(
packet->type,
addr32_in(packet->src_net),
addr48_in(packet->src_node),
packet->src_socket,
addr32_in(packet->dest_net),
addr48_in(packet->dest_node),
packet->dest_socket,
packet->data,
data_size);
}
}
static bool _do_udp_recv(int fd)
static int _do_udp_recv(int fd)
{
struct sockaddr_in addr;
int addrlen = sizeof(addr);
@ -588,14 +650,19 @@ static bool _do_udp_recv(int fd)
int len = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr*)(&addr), &addrlen);
if(len == -1)
{
if(WSAGetLastError() == WSAEWOULDBLOCK || WSAGetLastError() == WSAECONNRESET)
{
return true;
}
DWORD err = WSAGetLastError();
return false;
switch(err)
{
case WSAEWOULDBLOCK: return 0;
case WSAECONNRESET: return 1;
default: return -1;
}
}
__atomic_add_fetch(&recv_packets_udp, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&recv_bytes_udp, len, __ATOMIC_RELAXED);
if(ipx_encap_type == ENCAP_TYPE_DOSBOX)
{
if(addr.sin_family != dosbox_server_addr.sin_family
@ -603,7 +670,7 @@ static bool _do_udp_recv(int fd)
|| addr.sin_port != dosbox_server_addr.sin_port)
{
/* Ignore packet from wrong address. */
return true;
return 1;
}
if(dosbox_state == DOSBOX_REGISTERING)
@ -623,7 +690,7 @@ static bool _do_udp_recv(int fd)
_handle_udp_recv((ipx_packet*)(buf), len, addr);
}
return true;
return 1;
}
static void _handle_pcap_frame(u_char *user, const struct pcap_pkthdr *pkt_header, const u_char *pkt_data)
@ -766,6 +833,10 @@ static DWORD router_main(void *arg)
if(ipx_encap_type == ENCAP_TYPE_DOSBOX)
{
lock_sockets();
coalesce_flush_waiting();
unlock_sockets();
if(dosbox_state == DOSBOX_DISCONNECTED)
{
uint64_t now = get_ticks();
@ -819,14 +890,43 @@ static DWORD router_main(void *arg)
}
else if(ipx_encap_type == ENCAP_TYPE_DOSBOX)
{
if(!_do_udp_recv(private_socket))
int status = 0;
for(int i = 0; i < MAX_RECV_PER_LOOP; ++i)
{
status = _do_udp_recv(private_socket);
if(status <= 0)
{
break;
}
}
if(status < 0)
{
exit_status = 1;
break;
}
}
else{
if(!_do_udp_recv(shared_socket) || !_do_udp_recv(private_socket))
int status = 0;
for(int i = 0; i < MAX_RECV_PER_LOOP; ++i)
{
int s1 = _do_udp_recv(shared_socket);
int s2 = _do_udp_recv(private_socket);
if(s1 < 0 || s2 < 0)
{
status = -1;
break;
}
else if(s1 == 0 && s2 == 0)
{
break;
}
}
if(status < 0)
{
exit_status = 1;
break;

View File

@ -25,6 +25,7 @@
#include <wsnwlink.h>
#include "ipxwrapper.h"
#include "coalesce.h"
#include "common.h"
#include "interface.h"
#include "router.h"
@ -770,23 +771,18 @@ static BOOL reclaim_socket(ipx_socket *sockptr, int lookup_fd)
static int recv_pump(ipx_socket *sockptr, BOOL block)
{
int fd = sockptr->fd;
u_long available = -1;
if(!block)
{
fd_set read_fds;
FD_ZERO(&read_fds);
FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_recv_pump_select]));
FD_SET(fd, &read_fds);
struct timeval timeout = { 0, 0 };
int r = r_select(-1, &read_fds, NULL, NULL, &timeout);
if(r == -1)
if(r_ioctlsocket(fd, FIONREAD, &available) != 0)
{
unlock_sockets();
return -1;
}
else if(r == 0)
else if(available == 0)
{
/* No packet waiting in underlying recv buffer. */
return 0;
@ -797,15 +793,19 @@ static int recv_pump(ipx_socket *sockptr, BOOL block)
int recv_slot = -1;
for(int i = 0; i < RECV_QUEUE_MAX_PACKETS; ++i)
{
if(queue->sizes[i] == IPX_RECV_QUEUE_FREE)
FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_recv_pump_find_slot]));
for(int i = 0; i < RECV_QUEUE_MAX_PACKETS; ++i)
{
queue->sizes[i] = IPX_RECV_QUEUE_LOCKED;
recv_queue_adjust_refcount(queue, 1);
recv_slot = i;
break;
if(queue->sizes[i] == IPX_RECV_QUEUE_FREE)
{
queue->sizes[i] = IPX_RECV_QUEUE_LOCKED;
recv_queue_adjust_refcount(queue, 1);
recv_slot = i;
break;
}
}
}
@ -817,9 +817,19 @@ static int recv_pump(ipx_socket *sockptr, BOOL block)
unlock_sockets();
int r = r_recv(fd, (char*)(queue->data[recv_slot]), MAX_PKT_SIZE, 0);
int r;
{
FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_recv_pump_recv]));
r = r_recv(fd, (char*)(queue->data[recv_slot]), MAX_PKT_SIZE, 0);
}
if(!reclaim_socket(sockptr, fd))
bool ok;
{
FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_recv_pump_reclaim_socket]));
ok = reclaim_socket(sockptr, fd);
}
if(!ok)
{
/* The application closed the socket while we were in the recv() call.
* Just discard our handle, let the queue be destroyed.
@ -1375,7 +1385,14 @@ static int send_packet(const ipx_packet *packet, int len, struct sockaddr *addr,
log_printf(LOG_DEBUG, "Sending packet from %s to %s (%s:%hu)", src_addr, dest_addr, inet_ntoa(v4->sin_addr), ntohs(v4->sin_port));
}
return (r_sendto(private_socket, (char*)packet, len, 0, addr, addrlen) == len);
int r = r_sendto(private_socket, (char*)packet, len, 0, addr, addrlen);
if(r == len)
{
__atomic_add_fetch(&send_packets_udp, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&send_bytes_udp, len, __ATOMIC_RELAXED);
}
return r;
}
static DWORD ipx_send_packet(
@ -1527,14 +1544,9 @@ static DWORD ipx_send_packet(
memcpy(packet->data, data, data_size);
DWORD error = ERROR_SUCCESS;
if(r_sendto(private_socket, (const void*)(packet), packet_size, 0, (struct sockaddr*)(&dosbox_server_addr), sizeof(dosbox_server_addr)) < 0)
DWORD error = coalesce_send(packet, packet_size, dest_net, dest_node, dest_socket);
if(error == ERROR_SUCCESS)
{
error = WSAGetLastError();
log_printf(LOG_ERROR, "Error sending DOSBox IPX packet: %s", w32_error(error));
}
else{
__atomic_add_fetch(&send_packets, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&send_bytes, data_size, __ATOMIC_RELAXED);
}
@ -1810,28 +1822,36 @@ int PASCAL ioctlsocket(SOCKET fd, long cmd, u_long *argp)
if(cmd == FIONREAD && !(sock->flags & IPX_IS_SPX))
{
while(1)
{
int r = recv_pump(sock, FALSE);
if(r < 0)
{
/* Error in recv_pump() */
return -1;
}
FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_ioctlsocket_recv_pump]));
if(r == 0)
while(1)
{
/* No more packets ready to read from underlying socket. */
break;
int r = recv_pump(sock, FALSE);
if(r < 0)
{
/* Error in recv_pump() */
return -1;
}
if(r == 0)
{
/* No more packets ready to read from underlying socket. */
break;
}
}
}
unsigned long accumulated_packet_data = 0;
for(int i = 0; i < sock->recv_queue->n_ready; ++i)
{
const ipx_packet *packet = (const ipx_packet*)(sock->recv_queue->data[ sock->recv_queue->ready[i] ]);
accumulated_packet_data += packet->size;
FPROF_RECORD_SCOPE(&(ipxwrapper_fstats[IPXWRAPPER_FSTATS_ioctlsocket_accumulate]));
for(int i = 0; i < sock->recv_queue->n_ready; ++i)
{
const ipx_packet *packet = (const ipx_packet*)(sock->recv_queue->data[ sock->recv_queue->ready[i] ]);
accumulated_packet_data += packet->size;
}
}
unlock_sockets();