From 15b53ae001ed47c6b454ccb9969d88960fe0f10b Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Sun, 15 Oct 2023 12:56:21 +0100 Subject: [PATCH] Fix handling of FIONREAD socket ioctl. When there are no packets waiting, ioctlsocket() should return success and set *argp to zero. When there are multiple packets waiting, ioctlsocket() should set *argp to the accumulated size of all received payloads. As far as I can tell, there's no way to inspect beyond the first packet queued on a socket, so we must receive and queue all packets from the socket to be able to know how much is available. The recv pipeline has been reworked to queue packets in this manner, and select() will try to emulate the normal behaviour we previously got for free by passing the socket fd straight through in readfds. I don't *think* WSAAsyncSelect() needs any changes since the socket will still raise window messages as appropriate before we have any opportunity to shunt the packet into the receive queue. --- src/ipxwrapper.h | 52 ++++++ src/ipxwrapper_stubs.txt | 2 +- src/winsock.c | 336 ++++++++++++++++++++++++++++++++------- 3 files changed, 332 insertions(+), 58 deletions(-) diff --git a/src/ipxwrapper.h b/src/ipxwrapper.h index ef02c4e..91fe6ab 100644 --- a/src/ipxwrapper.h +++ b/src/ipxwrapper.h @@ -60,6 +60,55 @@ typedef struct ipx_socket ipx_socket; typedef struct ipx_packet ipx_packet; +#define RECV_QUEUE_MAX_PACKETS 32 + +#define IPX_RECV_QUEUE_FREE -1 +#define IPX_RECV_QUEUE_LOCKED -2 + +/* Any AF_IPX IPX socket has an associated recv_queue. + * + * When a recv_pump() operation is running, the sockets lock has to be released + * in case the recv() blocks, which means the socket could be closed before it + * regains the lock. + * + * An ipx_recv_queue isn't destroyed until the refcount reaches zero. The + * ipx_socket holds one reference and each in-progress recv_pump() also holds a + * reference while the sockets lock isn't held. + * + * Access to the refcount is protected by refcount_lock. + * + * data[] holds an array of buffers for each queued packet, the status and size + * of which is indicated by the sizes[] array. + * + * If sizes[x] is IPX_RECV_QUEUE_FREE, the buffer is available to be claimed by + * a recv_pump() operation, which then sets it to IPX_RECV_QUEUE_LOCKED until + * it completes, which prevents a recv_pump() in another thread from trying to + * use the same receive buffer. Once a packet is read in, sizes[x] is set to + * the size of the packet and x is added to the end of the ready[] array. + * + * When a read is requested, the packet will be read from the data[] index + * stored in ready[0], and unless MSG_PEEK was used, that slot will then be + * released (sizes[x] set to IPX_RECV_QUEUE_FREE) and any subsequent slots in + * read[] will be advanced for the next read to pick up from ready[0]. + * + * Access to the ready, n_ready and sizes members is only permitted when a + * thread holds the main sockets lock. +*/ + +struct ipx_recv_queue +{ + CRITICAL_SECTION refcount_lock; + int refcount; + + int ready[RECV_QUEUE_MAX_PACKETS]; + int n_ready; + + unsigned char data[RECV_QUEUE_MAX_PACKETS][MAX_PKT_SIZE]; + int sizes[RECV_QUEUE_MAX_PACKETS]; +}; + +typedef struct ipx_recv_queue ipx_recv_queue; + struct ipx_socket { SOCKET fd; @@ -79,6 +128,8 @@ struct ipx_socket { /* Address used with connect call, only set when IPX_CONNECTED is */ struct sockaddr_ipx remote_addr; + struct ipx_recv_queue *recv_queue; + UT_hash_handle hh; }; @@ -165,5 +216,6 @@ int PASCAL r_getpeername(SOCKET fd, struct sockaddr *addr, int *addrlen); int PASCAL r_listen(SOCKET s, int backlog); SOCKET PASCAL r_accept(SOCKET s, struct sockaddr *addr, int *addrlen); int PASCAL r_WSAAsyncSelect(SOCKET s, HWND hWnd, unsigned int wMsg, long lEvent); +int WSAAPI r_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const PTIMEVAL timeout); #endif /* !IPXWRAPPER_H */ diff --git a/src/ipxwrapper_stubs.txt b/src/ipxwrapper_stubs.txt index a1eea56..e4acacb 100644 --- a/src/ipxwrapper_stubs.txt +++ b/src/ipxwrapper_stubs.txt @@ -7,7 +7,7 @@ htonl:4 ntohl:4 htons:4 ntohs:4 -select:4 +r_select:4 r_listen:4 r_accept:4 WSACreateEvent:4 diff --git a/src/winsock.c b/src/winsock.c index 2c920f1..e6c17c5 100644 --- a/src/winsock.c +++ b/src/winsock.c @@ -303,6 +303,26 @@ INT WINAPI WSHEnumProtocols(LPINT protocols, LPWSTR ign, LPVOID buf, LPDWORD bsp return do_EnumProtocols(protocols, buf, bsptr, false); } +static int recv_queue_adjust_refcount(ipx_recv_queue *recv_queue, int adj) +{ + EnterCriticalSection(&(recv_queue->refcount_lock)); + int new_refcount = (recv_queue->refcount += adj); + LeaveCriticalSection(&(recv_queue->refcount_lock)); + + return new_refcount; +} + +static void release_recv_queue(ipx_recv_queue *recv_queue) +{ + int new_refcount = recv_queue_adjust_refcount(recv_queue, -1); + + if(new_refcount == 0) + { + DeleteCriticalSection(&(recv_queue->refcount_lock)); + free(recv_queue); + } +} + SOCKET WSAAPI socket(int af, int type, int protocol) { log_printf(LOG_DEBUG, "socket(%d, %d, %d)", af, type, protocol); @@ -318,10 +338,38 @@ SOCKET WSAAPI socket(int af, int type, int protocol) return -1; } + ipx_recv_queue *recv_queue = malloc(sizeof(ipx_recv_queue)); + if(recv_queue == NULL) + { + free(nsock); + + WSASetLastError(ERROR_OUTOFMEMORY); + return -1; + } + + if(!InitializeCriticalSectionAndSpinCount(&(recv_queue->refcount_lock), 0x80000000)) + { + log_printf(LOG_ERROR, "Failed to initialise critical section: %s", w32_error(GetLastError())); + WSASetLastError(GetLastError()); + + free(recv_queue); + free(nsock); + return -1; + } + + recv_queue->refcount = 1; + recv_queue->n_ready = 0; + + for(int i = 0; i < RECV_QUEUE_MAX_PACKETS; ++i) + { + recv_queue->sizes[i] = IPX_RECV_QUEUE_FREE; + } + if((nsock->fd = r_socket(AF_INET, SOCK_DGRAM, 0)) == -1) { log_printf(LOG_ERROR, "Cannot create UDP socket: %s", w32_error(WSAGetLastError())); + release_recv_queue(recv_queue); free(nsock); return -1; } @@ -329,6 +377,8 @@ SOCKET WSAAPI socket(int af, int type, int protocol) nsock->flags = IPX_SEND | IPX_RECV | IPX_RECV_BCAST; nsock->s_ptype = (protocol ? protocol - NSPROTO_IPX : 0); + nsock->recv_queue = recv_queue; + log_printf(LOG_INFO, "IPX socket created (fd = %d)", nsock->fd); lock_sockets(); @@ -384,6 +434,8 @@ SOCKET WSAAPI socket(int af, int type, int protocol) nsock->flags |= IPX_IS_SPXII; } + nsock->recv_queue = NULL; + log_printf(LOG_INFO, "SPX socket created (fd = %d)", nsock->fd); lock_sockets(); @@ -425,6 +477,11 @@ int WSAAPI closesocket(SOCKET sockfd) log_printf(LOG_INFO, "Socket %d (%s) closed", sockfd, (sock->flags & IPX_IS_SPX ? "SPX" : "IPX")); + if(sock->recv_queue != NULL) + { + release_recv_queue(sock->recv_queue); + } + if(sock->flags & IPX_BOUND) { CloseHandle(sock->sock_mut); @@ -687,6 +744,121 @@ int WSAAPI getsockname(SOCKET fd, struct sockaddr *addr, int *addrlen) } } +static BOOL reclaim_socket(ipx_socket *sockptr, int lookup_fd) +{ + /* Reclaim the lock, ensure the socket hasn't been + * closed by the application (naughty!) while we were + * waiting. + */ + + ipx_socket *reclaim_sock = get_socket(lookup_fd); + if(sockptr != reclaim_sock) + { + log_printf(LOG_DEBUG, "Application closed socket while inside a WinSock call!"); + + if(reclaim_sock) + { + unlock_sockets(); + } + + return FALSE; + } + + return TRUE; +} + +static int recv_pump(ipx_socket *sockptr, BOOL block) +{ + int fd = sockptr->fd; + + if(!block) + { + fd_set read_fds; + FD_ZERO(&read_fds); + + FD_SET(fd, &read_fds); + + struct timeval timeout = { 0, 0 }; + + int r = r_select(-1, &read_fds, NULL, NULL, &timeout); + if(r == -1) + { + unlock_sockets(); + return -1; + } + else if(r == 0) + { + /* No packet waiting in underlying recv buffer. */ + return 0; + } + } + + ipx_recv_queue *queue = sockptr->recv_queue; + + int recv_slot = -1; + + for(int i = 0; i < RECV_QUEUE_MAX_PACKETS; ++i) + { + if(queue->sizes[i] == IPX_RECV_QUEUE_FREE) + { + queue->sizes[i] = IPX_RECV_QUEUE_LOCKED; + recv_queue_adjust_refcount(queue, 1); + + recv_slot = i; + break; + } + } + + if(recv_slot < 0) + { + /* No free recv_queue slots. */ + return 0; + } + + unlock_sockets(); + + int r = r_recv(fd, (char*)(queue->data[recv_slot]), MAX_PKT_SIZE, 0); + + if(!reclaim_socket(sockptr, fd)) + { + /* The application closed the socket while we were in the recv() call. + * Just discard our handle, let the queue be destroyed. + */ + + release_recv_queue(queue); + WSASetLastError(WSAENOTSOCK); + return -1; + } + + if(r == -1) + { + queue->sizes[recv_slot] = IPX_RECV_QUEUE_FREE; + release_recv_queue(queue); + unlock_sockets(); + return -1; + } + + struct ipx_packet *packet = (struct ipx_packet*)(queue->data[recv_slot]); + + if(r < sizeof(ipx_packet) - 1 || r != packet->size + sizeof(ipx_packet) - 1) + { + log_printf(LOG_ERROR, "Invalid packet received on loopback port!"); + + queue->sizes[recv_slot] = IPX_RECV_QUEUE_FREE; + release_recv_queue(queue); + + WSASetLastError(WSAEWOULDBLOCK); + unlock_sockets(); + return -1; + } + + queue->sizes[recv_slot] = r; + queue->ready[queue->n_ready] = recv_slot; + ++(queue->n_ready); + + return 1; +} + /* Recieve a packet from an IPX socket * addr must be NULL or a region of memory big enough for a sockaddr_ipx * @@ -694,40 +866,31 @@ int WSAAPI getsockname(SOCKET fd, struct sockaddr *addr, int *addrlen) * The size of the packet will be returned on success, even if it was truncated */ static int recv_packet(ipx_socket *sockptr, char *buf, int bufsize, int flags, struct sockaddr_ipx_ext *addr, int addrlen) { - SOCKET fd = sockptr->fd; - int is_bound = sockptr->flags & IPX_BOUND; - int extended_addr = sockptr->flags & IPX_EXT_ADDR; - - unlock_sockets(); - - if(!is_bound) { + if(!(sockptr->flags & IPX_BOUND)) + { + unlock_sockets(); + WSASetLastError(WSAEINVAL); return -1; } - char *recvbuf = malloc(MAX_PKT_SIZE); - if(!recvbuf) { - WSASetLastError(ERROR_OUTOFMEMORY); - return -1; - } - - struct ipx_packet *packet = (struct ipx_packet*)(recvbuf); - - int rval = r_recv(fd, recvbuf, MAX_PKT_SIZE, flags); - if(rval == -1) { - free(recvbuf); - return -1; - } - - if(rval < sizeof(ipx_packet) - 1 || rval != packet->size + sizeof(ipx_packet) - 1) + /* Loop here in case some crazy application does concurrent recv() calls + * and they race between putting packets on the queue and handling them. + */ + while(sockptr->recv_queue->n_ready < 1) { - log_printf(LOG_ERROR, "Invalid packet received on loopback port!"); - - free(recvbuf); - WSASetLastError(WSAEWOULDBLOCK); - return -1; + if(recv_pump(sockptr, TRUE) < 0) + { + /* Socket closed or recv() error. */ + return -1; + } } + int slot = sockptr->recv_queue->ready[0]; + + struct ipx_packet *packet = (struct ipx_packet*)(sockptr->recv_queue->data[slot]); + assert(sockptr->recv_queue->sizes[slot] >= 0); + if(min_log_level <= LOG_DEBUG) { IPX_STRING_ADDR(addr_s, addr32_in(packet->src_net), addr48_in(packet->src_node), packet->src_socket); @@ -741,7 +904,7 @@ static int recv_packet(ipx_socket *sockptr, char *buf, int bufsize, int flags, s memcpy(addr->sa_nodenum, packet->src_node, 6); addr->sa_socket = packet->src_socket; - if(extended_addr) { + if(sockptr->flags & IPX_EXT_ADDR) { if(addrlen >= sizeof(struct sockaddr_ipx_ext)) { addr->sa_ptype = packet->ptype; addr->sa_flags = 0; @@ -774,8 +937,17 @@ static int recv_packet(ipx_socket *sockptr, char *buf, int bufsize, int flags, s } memcpy(buf, packet->data, packet->size <= bufsize ? packet->size : bufsize); - rval = packet->size; - free(recvbuf); + int rval = packet->size; + + if((flags & MSG_PEEK) == 0) + { + sockptr->recv_queue->sizes[slot] = IPX_RECV_QUEUE_FREE; + + --(sockptr->recv_queue->n_ready); + memmove(&(sockptr->recv_queue->ready[0]), &(sockptr->recv_queue->ready[1]), (sockptr->recv_queue->n_ready * sizeof(int))); + } + + unlock_sockets(); return rval; } @@ -1625,39 +1797,33 @@ int PASCAL ioctlsocket(SOCKET fd, long cmd, u_long *argp) if(cmd == FIONREAD && !(sock->flags & IPX_IS_SPX)) { - /* Test to see if data is waiting. */ - - fd_set fdset; - struct timeval tv = {0,0}; - - FD_ZERO(&fdset); - FD_SET(sock->fd, &fdset); - - int r = select(1, &fdset, NULL, NULL, &tv); - - if(r == -1) + while(1) { - unlock_sockets(); - return -1; - } - else if(r == 0) - { - *(unsigned long*)(argp) = 0; + int r = recv_pump(sock, FALSE); + if(r < 0) + { + /* Error in recv_pump() */ + return -1; + } - unlock_sockets(); - return -1; + if(r == 0) + { + /* No more packets ready to read from underlying socket. */ + break; + } } - /* Get the size of the packet. */ + unsigned long accumulated_packet_data = 0; - char tmp_buf; - - if((r = recv_packet(sock, &tmp_buf, 1, MSG_PEEK, NULL, 0)) == -1) + for(int i = 0; i < sock->recv_queue->n_ready; ++i) { - return -1; + const ipx_packet *packet = (const ipx_packet*)(sock->recv_queue->data[ sock->recv_queue->ready[i] ]); + accumulated_packet_data += packet->size; } - *(unsigned long*)(argp) = r; + unlock_sockets(); + + *(unsigned long*)(argp) = accumulated_packet_data; return 0; } @@ -1898,7 +2064,7 @@ static int _connect_spx(ipx_socket *sock, struct sockaddr_ipx *ipxaddr) .tv_usec = ((wait_until - now) % 1000) * 1000 }; - if(select(1, &fdset, NULL, NULL, &tv) == -1) + if(r_select(1, &fdset, NULL, NULL, &tv) == -1) { closesocket(lookup_fd); free(packet); @@ -2011,7 +2177,7 @@ static int _connect_spx(ipx_socket *sock, struct sockaddr_ipx *ipxaddr) FD_ZERO(&e_fdset); FD_SET(sock->fd, &e_fdset); - if(select(1, NULL, &w_fdset, &e_fdset, NULL) == 1 && FD_ISSET(sock->fd, &w_fdset)) + if(r_select(1, NULL, &w_fdset, &e_fdset, NULL) == 1 && FD_ISSET(sock->fd, &w_fdset)) { goto CONNECTED; } @@ -2493,3 +2659,59 @@ int PASCAL WSAAsyncSelect(SOCKET s, HWND hWnd, unsigned int wMsg, long lEvent) return r_WSAAsyncSelect(s, hWnd, wMsg, lEvent); } + +int WSAAPI select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const PTIMEVAL timeout) +{ + const struct timeval TIMEOUT_IMMEDIATE = { 0, 0 }; + const struct timeval *use_timeout = timeout; + + fd_set force_read_fds; + FD_ZERO(&force_read_fds); + + for(unsigned int i = 0; i < readfds->fd_count; ++i) + { + int fd = readfds->fd_array[i]; + + ipx_socket *sockptr = get_socket(fd); + if(sockptr != NULL) + { + if(sockptr->flags & IPX_IS_SPX) + { + unlock_sockets(); + continue; + } + + if(sockptr->recv_queue->n_ready > 0) + { + /* There is data in the receive queue for this socket, but the + * underlying socket isn't necessarily readable, so we reduce the + * select() timeout to zero to ensure it returns immediately and + * inject this fd back into readfds at the end if necessary. + */ + + FD_SET(fd, &force_read_fds); + use_timeout = &TIMEOUT_IMMEDIATE; + } + + unlock_sockets(); + } + } + + int r = r_select(nfds, readfds, writefds, exceptfds, (const PTIMEVAL)(use_timeout)); + + if(r >= 0) + { + for(unsigned int i = 0; i < force_read_fds.fd_count; ++i) + { + int fd = force_read_fds.fd_array[i]; + + if(!FD_ISSET(fd, readfds)) + { + FD_SET(fd, readfds); + ++r; + } + } + } + + return r; +}