diff --git a/build.bat b/build.bat index e831dfe..2562457 100644 --- a/build.bat +++ b/build.bat @@ -13,6 +13,7 @@ SET CPP_OBJS=^ src/COMAPIException.obj^ src/DirectPlay8Address.obj^ src/DirectPlay8Peer.obj^ + src/EventObject.obj^ src/HandleHandlingPool.obj^ src/HostEnumerator.obj^ src/network.obj^ @@ -32,6 +33,7 @@ SET TEST_OBJS=^ src/COMAPIException.obj^ src/DirectPlay8Address.obj^ src/DirectPlay8Peer.obj^ + src/EventObject.obj^ src/HandleHandlingPool.obj^ src/HostEnumerator.obj^ src/network.obj^ diff --git a/src/DirectPlay8Peer.cpp b/src/DirectPlay8Peer.cpp index 0ad56a2..a2c6cf9 100644 --- a/src/DirectPlay8Peer.cpp +++ b/src/DirectPlay8Peer.cpp @@ -20,19 +20,21 @@ fprintf(stderr, "Unimplemented method: " fmt "\n", ## __VA_ARGS__); \ return E_NOTIMPL; +#define THREADS_PER_POOL 4 +#define MAX_HANDLES_PER_POOL 16 + DirectPlay8Peer::DirectPlay8Peer(std::atomic *global_refcount): global_refcount(global_refcount), local_refcount(0), state(STATE_NEW), udp_socket(-1), listener_socket(-1), - discovery_socket(-1) + discovery_socket(-1), + worker_pool(THREADS_PER_POOL, MAX_HANDLES_PER_POOL), + udp_sq(udp_socket_event) { - io_event = CreateEvent(NULL, FALSE, FALSE, NULL); - if(io_event == NULL) - { - throw std::runtime_error("Cannot create event object"); - } + worker_pool.add_handle(udp_socket_event, [this]() { handle_udp_socket_event(); }); + worker_pool.add_handle(other_socket_event, [this]() { handle_other_socket_event(); }); AddRef(); } @@ -44,7 +46,8 @@ DirectPlay8Peer::~DirectPlay8Peer() Close(0); } - CloseHandle(io_event); + worker_pool.remove_handle(other_socket_event); + worker_pool.remove_handle(udp_socket_event); } HRESULT DirectPlay8Peer::QueryInterface(REFIID riid, void **ppvObject) @@ -321,8 +324,8 @@ HRESULT DirectPlay8Peer::Host(CONST DPN_APPLICATION_DESC* CONST pdnAppDesc, IDir } } - if(WSAEventSelect(udp_socket, io_event, FD_READ | FD_WRITE) != 0 - || WSAEventSelect(listener_socket, io_event, FD_ACCEPT) != 0) + if(WSAEventSelect(udp_socket, udp_socket_event, FD_READ | FD_WRITE) != 0 + || WSAEventSelect(listener_socket, other_socket_event, FD_ACCEPT) != 0) { return DPNERR_GENERIC; } @@ -332,15 +335,12 @@ HRESULT DirectPlay8Peer::Host(CONST DPN_APPLICATION_DESC* CONST pdnAppDesc, IDir discovery_socket = create_discovery_socket(); if(discovery_socket == -1 - || WSAEventSelect(discovery_socket, io_event, FD_READ) != 0) + || WSAEventSelect(discovery_socket, other_socket_event, FD_READ) != 0) { return DPNERR_GENERIC; } } - io_run = true; - io_thread = std::thread(&DirectPlay8Peer::io_main, this); - state = STATE_HOSTING; return S_OK; @@ -512,14 +512,6 @@ HRESULT DirectPlay8Peer::Close(CONST DWORD dwFlags) return DPNERR_UNINITIALIZED; } - if(state == STATE_HOSTING || state == STATE_CONNECTED) - { - io_run = false; - SetEvent(io_event); - - io_thread.join(); - } - CancelAsyncOperation(0, DPNCANCEL_ALL_OPERATIONS); /* TODO: Wait properly. */ @@ -688,80 +680,93 @@ HRESULT DirectPlay8Peer::TerminateSession(void* CONST pvTerminateData, CONST DWO UNIMPLEMENTED("DirectPlay8Peer::TerminateSession"); } -void DirectPlay8Peer::io_main() -{ - while(io_run) - { - WaitForSingleObject(io_event, INFINITE); - - io_udp_recv(udp_socket); - io_udp_send(udp_socket, udp_sq); - - if(discovery_socket != -1) - { - io_udp_recv(discovery_socket); - io_udp_send(discovery_socket, discovery_sq); - } - - io_listener_accept(listener_socket); - - std::unique_lock l(lock); - - for(auto p = peers.begin(); p != peers.end();) - { - auto next_p = std::next(p); - - if(!io_tcp_recv(&*p) || !io_tcp_send(&*p)) - { - /* TODO: Complete outstanding sends (failed), drop player */ - closesocket(p->sock); - peers.erase(p); - } - - p = next_p; - } - } -} - -void DirectPlay8Peer::io_udp_recv(int sock) +void DirectPlay8Peer::handle_udp_socket_event() { struct sockaddr_in from_addr; int fa_len = sizeof(from_addr); - int r = recvfrom(sock, (char*)(recv_buf), sizeof(recv_buf), 0, (struct sockaddr*)(&from_addr), &fa_len); - if(r <= 0) - { - return; - } + unsigned char recv_buf[MAX_PACKET_SIZE]; - /* Process message */ - std::unique_ptr pd; - - try { - pd.reset(new PacketDeserialiser(recv_buf, r)); - } - catch(const PacketDeserialiser::Error &e) + int r = recvfrom(udp_socket, (char*)(recv_buf), sizeof(recv_buf), 0, (struct sockaddr*)(&from_addr), &fa_len); + if(r > 0) { - /* Malformed packet received */ - return; - } - - switch(pd->packet_type()) - { - case DPLITE_MSGID_HOST_ENUM_REQUEST: + /* Process message */ + std::unique_ptr pd; + + try { + pd.reset(new PacketDeserialiser(recv_buf, r)); + } + catch(const PacketDeserialiser::Error &e) { - if(state == STATE_HOSTING) - { - handle_host_enum_request(*pd, &from_addr); - } - - break; + /* Malformed packet received */ + return; } - default: - /* TODO: Log "unrecognised packet type" */ - break; + switch(pd->packet_type()) + { + case DPLITE_MSGID_HOST_ENUM_REQUEST: + { + if(state == STATE_HOSTING) + { + handle_host_enum_request(*pd, &from_addr); + } + + break; + } + + default: + /* TODO: Log "unrecognised packet type" */ + break; + } } + + io_udp_send(udp_socket, udp_sq); +} + +void DirectPlay8Peer::handle_other_socket_event() +{ + if(discovery_socket != -1) + { + struct sockaddr_in from_addr; + int fa_len = sizeof(from_addr); + + unsigned char recv_buf[MAX_PACKET_SIZE]; + + int r = recvfrom(discovery_socket, (char*)(recv_buf), sizeof(recv_buf), 0, (struct sockaddr*)(&from_addr), &fa_len); + if(r > 0) + { + /* Process message */ + std::unique_ptr pd; + + try { + pd.reset(new PacketDeserialiser(recv_buf, r)); + } + catch(const PacketDeserialiser::Error &e) + { + /* Malformed packet received */ + return; + } + + switch(pd->packet_type()) + { + case DPLITE_MSGID_HOST_ENUM_REQUEST: + { + if(state == STATE_HOSTING) + { + handle_host_enum_request(*pd, &from_addr); + } + + break; + } + + default: + /* TODO: Log "unrecognised packet type" */ + break; + } + } + } + + io_listener_accept(listener_socket); } void DirectPlay8Peer::io_udp_send(int sock, SendQueue &sq) diff --git a/src/DirectPlay8Peer.hpp b/src/DirectPlay8Peer.hpp index daaaf18..1d1a3ec 100644 --- a/src/DirectPlay8Peer.hpp +++ b/src/DirectPlay8Peer.hpp @@ -11,6 +11,8 @@ #include #include "AsyncHandleAllocator.hpp" +#include "EventObject.hpp" +#include "HandleHandlingPool.hpp" #include "HostEnumerator.hpp" #include "network.hpp" #include "packet.hpp" @@ -47,14 +49,12 @@ class DirectPlay8Peer: public IDirectPlay8Peer int listener_socket; /* TCP listener socket. */ int discovery_socket; /* Discovery UDP sockets, RECIEVES broadcasts only. */ - unsigned char recv_buf[MAX_PACKET_SIZE]; + EventObject udp_socket_event; + EventObject other_socket_event; + + HandleHandlingPool worker_pool; SendQueue udp_sq; - SendQueue discovery_sq; - - HANDLE io_event; - std::thread io_thread; - std::atomic io_run; struct Player { @@ -95,6 +95,8 @@ class DirectPlay8Peer: public IDirectPlay8Peer unsigned char recv_buf[MAX_PACKET_SIZE]; size_t recv_buf_cur; + EventObject event; + SendQueue sq; SendQueue::Buffer *sqb; @@ -102,7 +104,7 @@ class DirectPlay8Peer: public IDirectPlay8Peer size_t send_remain; Player(int sock, uint32_t ip, uint16_t port): - state(PS_INIT), sock(sock), ip(ip), port(port), recv_buf_cur(0), send_buf(NULL) {} + state(PS_INIT), sock(sock), ip(ip), port(port), recv_buf_cur(0), sq(event), send_buf(NULL) {} }; std::list peers; @@ -118,7 +120,9 @@ class DirectPlay8Peer: public IDirectPlay8Peer void io_main(); - void io_udp_recv(int sock); + void handle_udp_socket_event(); + void handle_other_socket_event(); + void io_udp_send(int sock, SendQueue &q); void io_listener_accept(int sock); bool io_tcp_recv(Player *player); diff --git a/src/EventObject.cpp b/src/EventObject.cpp new file mode 100644 index 0000000..9827aad --- /dev/null +++ b/src/EventObject.cpp @@ -0,0 +1,24 @@ +#include +#include +#include + +#include "EventObject.hpp" + +EventObject::EventObject(BOOL bManualReset, BOOL bInitialState) +{ + handle = CreateEvent(NULL, bManualReset, bInitialState, NULL); + if(handle == NULL) + { + throw std::runtime_error("Unable to create event object"); + } +} + +EventObject::~EventObject() +{ + CloseHandle(handle); +} + +EventObject::operator HANDLE() const +{ + return handle; +} diff --git a/src/EventObject.hpp b/src/EventObject.hpp new file mode 100644 index 0000000..76ae335 --- /dev/null +++ b/src/EventObject.hpp @@ -0,0 +1,19 @@ +#ifndef DPLITE_EVENTOBJECT_HPP +#define DPLITE_EVENTOBJECT_HPP + +#include +#include + +class EventObject +{ + private: + HANDLE handle; + + public: + EventObject(BOOL bManualReset = FALSE, BOOL bInitialState = FALSE); + ~EventObject(); + + operator HANDLE() const; +}; + +#endif /* !DPLITE_EVENTOBJECT_HPP */ diff --git a/src/SendQueue.cpp b/src/SendQueue.cpp index d495104..4fdc3af 100644 --- a/src/SendQueue.cpp +++ b/src/SendQueue.cpp @@ -1,4 +1,6 @@ #include +#include +#include #include "SendQueue.hpp" @@ -18,6 +20,8 @@ void SendQueue::send(SendPriority priority, Buffer *buffer) high_queue.push_back(buffer); break; } + + SetEvent(signal_on_queue); } SendQueue::Buffer *SendQueue::get_next() diff --git a/src/SendQueue.hpp b/src/SendQueue.hpp index 3a16ff9..62072aa 100644 --- a/src/SendQueue.hpp +++ b/src/SendQueue.hpp @@ -8,6 +8,7 @@ #include #include #include +#include class SendQueue { @@ -45,8 +46,10 @@ class SendQueue Buffer *current; + HANDLE signal_on_queue; + public: - SendQueue(): current(NULL) {} + SendQueue(HANDLE signal_on_queue): current(NULL), signal_on_queue(signal_on_queue) {} /* No copy c'tor. */ SendQueue(const SendQueue &src) = delete; diff --git a/tests/HandleHandlingPool.cpp b/tests/HandleHandlingPool.cpp index f877f4e..2f5affd 100644 --- a/tests/HandleHandlingPool.cpp +++ b/tests/HandleHandlingPool.cpp @@ -4,32 +4,9 @@ #include #include +#include "../src/EventObject.hpp" #include "../src/HandleHandlingPool.hpp" -struct EventObject -{ - HANDLE handle; - - EventObject(BOOL bManualReset = FALSE, BOOL bInitialState = FALSE) - { - handle = CreateEvent(NULL, bManualReset, bInitialState, NULL); - if(handle == NULL) - { - throw std::runtime_error("Unable to create event object"); - } - } - - ~EventObject() - { - CloseHandle(handle); - } - - operator HANDLE() const - { - return handle; - } -}; - TEST(HandleHandlingPool, SingleThreadBasic) { HandleHandlingPool pool(1, 32);