1
0
mirror of https://github.com/solemnwarning/directplay-lite synced 2024-12-30 16:45:37 +01:00

Use HandleHandlingPool to run DirectPlay8Peer worker threads.

This commit is contained in:
Daniel Collins 2018-09-15 23:24:54 +01:00
parent f06f219d61
commit da3cd1a55f
8 changed files with 156 additions and 118 deletions

View File

@ -13,6 +13,7 @@ SET CPP_OBJS=^
src/COMAPIException.obj^
src/DirectPlay8Address.obj^
src/DirectPlay8Peer.obj^
src/EventObject.obj^
src/HandleHandlingPool.obj^
src/HostEnumerator.obj^
src/network.obj^
@ -32,6 +33,7 @@ SET TEST_OBJS=^
src/COMAPIException.obj^
src/DirectPlay8Address.obj^
src/DirectPlay8Peer.obj^
src/EventObject.obj^
src/HandleHandlingPool.obj^
src/HostEnumerator.obj^
src/network.obj^

View File

@ -20,19 +20,21 @@
fprintf(stderr, "Unimplemented method: " fmt "\n", ## __VA_ARGS__); \
return E_NOTIMPL;
#define THREADS_PER_POOL 4
#define MAX_HANDLES_PER_POOL 16
DirectPlay8Peer::DirectPlay8Peer(std::atomic<unsigned int> *global_refcount):
global_refcount(global_refcount),
local_refcount(0),
state(STATE_NEW),
udp_socket(-1),
listener_socket(-1),
discovery_socket(-1)
discovery_socket(-1),
worker_pool(THREADS_PER_POOL, MAX_HANDLES_PER_POOL),
udp_sq(udp_socket_event)
{
io_event = CreateEvent(NULL, FALSE, FALSE, NULL);
if(io_event == NULL)
{
throw std::runtime_error("Cannot create event object");
}
worker_pool.add_handle(udp_socket_event, [this]() { handle_udp_socket_event(); });
worker_pool.add_handle(other_socket_event, [this]() { handle_other_socket_event(); });
AddRef();
}
@ -44,7 +46,8 @@ DirectPlay8Peer::~DirectPlay8Peer()
Close(0);
}
CloseHandle(io_event);
worker_pool.remove_handle(other_socket_event);
worker_pool.remove_handle(udp_socket_event);
}
HRESULT DirectPlay8Peer::QueryInterface(REFIID riid, void **ppvObject)
@ -321,8 +324,8 @@ HRESULT DirectPlay8Peer::Host(CONST DPN_APPLICATION_DESC* CONST pdnAppDesc, IDir
}
}
if(WSAEventSelect(udp_socket, io_event, FD_READ | FD_WRITE) != 0
|| WSAEventSelect(listener_socket, io_event, FD_ACCEPT) != 0)
if(WSAEventSelect(udp_socket, udp_socket_event, FD_READ | FD_WRITE) != 0
|| WSAEventSelect(listener_socket, other_socket_event, FD_ACCEPT) != 0)
{
return DPNERR_GENERIC;
}
@ -332,15 +335,12 @@ HRESULT DirectPlay8Peer::Host(CONST DPN_APPLICATION_DESC* CONST pdnAppDesc, IDir
discovery_socket = create_discovery_socket();
if(discovery_socket == -1
|| WSAEventSelect(discovery_socket, io_event, FD_READ) != 0)
|| WSAEventSelect(discovery_socket, other_socket_event, FD_READ) != 0)
{
return DPNERR_GENERIC;
}
}
io_run = true;
io_thread = std::thread(&DirectPlay8Peer::io_main, this);
state = STATE_HOSTING;
return S_OK;
@ -512,14 +512,6 @@ HRESULT DirectPlay8Peer::Close(CONST DWORD dwFlags)
return DPNERR_UNINITIALIZED;
}
if(state == STATE_HOSTING || state == STATE_CONNECTED)
{
io_run = false;
SetEvent(io_event);
io_thread.join();
}
CancelAsyncOperation(0, DPNCANCEL_ALL_OPERATIONS);
/* TODO: Wait properly. */
@ -688,80 +680,93 @@ HRESULT DirectPlay8Peer::TerminateSession(void* CONST pvTerminateData, CONST DWO
UNIMPLEMENTED("DirectPlay8Peer::TerminateSession");
}
void DirectPlay8Peer::io_main()
{
while(io_run)
{
WaitForSingleObject(io_event, INFINITE);
io_udp_recv(udp_socket);
io_udp_send(udp_socket, udp_sq);
if(discovery_socket != -1)
{
io_udp_recv(discovery_socket);
io_udp_send(discovery_socket, discovery_sq);
}
io_listener_accept(listener_socket);
std::unique_lock<std::mutex> l(lock);
for(auto p = peers.begin(); p != peers.end();)
{
auto next_p = std::next(p);
if(!io_tcp_recv(&*p) || !io_tcp_send(&*p))
{
/* TODO: Complete outstanding sends (failed), drop player */
closesocket(p->sock);
peers.erase(p);
}
p = next_p;
}
}
}
void DirectPlay8Peer::io_udp_recv(int sock)
void DirectPlay8Peer::handle_udp_socket_event()
{
struct sockaddr_in from_addr;
int fa_len = sizeof(from_addr);
int r = recvfrom(sock, (char*)(recv_buf), sizeof(recv_buf), 0, (struct sockaddr*)(&from_addr), &fa_len);
if(r <= 0)
{
return;
}
unsigned char recv_buf[MAX_PACKET_SIZE];
/* Process message */
std::unique_ptr<PacketDeserialiser> pd;
try {
pd.reset(new PacketDeserialiser(recv_buf, r));
}
catch(const PacketDeserialiser::Error &e)
int r = recvfrom(udp_socket, (char*)(recv_buf), sizeof(recv_buf), 0, (struct sockaddr*)(&from_addr), &fa_len);
if(r > 0)
{
/* Malformed packet received */
return;
}
switch(pd->packet_type())
{
case DPLITE_MSGID_HOST_ENUM_REQUEST:
/* Process message */
std::unique_ptr<PacketDeserialiser> pd;
try {
pd.reset(new PacketDeserialiser(recv_buf, r));
}
catch(const PacketDeserialiser::Error &e)
{
if(state == STATE_HOSTING)
{
handle_host_enum_request(*pd, &from_addr);
}
break;
/* Malformed packet received */
return;
}
default:
/* TODO: Log "unrecognised packet type" */
break;
switch(pd->packet_type())
{
case DPLITE_MSGID_HOST_ENUM_REQUEST:
{
if(state == STATE_HOSTING)
{
handle_host_enum_request(*pd, &from_addr);
}
break;
}
default:
/* TODO: Log "unrecognised packet type" */
break;
}
}
io_udp_send(udp_socket, udp_sq);
}
void DirectPlay8Peer::handle_other_socket_event()
{
if(discovery_socket != -1)
{
struct sockaddr_in from_addr;
int fa_len = sizeof(from_addr);
unsigned char recv_buf[MAX_PACKET_SIZE];
int r = recvfrom(discovery_socket, (char*)(recv_buf), sizeof(recv_buf), 0, (struct sockaddr*)(&from_addr), &fa_len);
if(r > 0)
{
/* Process message */
std::unique_ptr<PacketDeserialiser> pd;
try {
pd.reset(new PacketDeserialiser(recv_buf, r));
}
catch(const PacketDeserialiser::Error &e)
{
/* Malformed packet received */
return;
}
switch(pd->packet_type())
{
case DPLITE_MSGID_HOST_ENUM_REQUEST:
{
if(state == STATE_HOSTING)
{
handle_host_enum_request(*pd, &from_addr);
}
break;
}
default:
/* TODO: Log "unrecognised packet type" */
break;
}
}
}
io_listener_accept(listener_socket);
}
void DirectPlay8Peer::io_udp_send(int sock, SendQueue &sq)

View File

@ -11,6 +11,8 @@
#include <windows.h>
#include "AsyncHandleAllocator.hpp"
#include "EventObject.hpp"
#include "HandleHandlingPool.hpp"
#include "HostEnumerator.hpp"
#include "network.hpp"
#include "packet.hpp"
@ -47,14 +49,12 @@ class DirectPlay8Peer: public IDirectPlay8Peer
int listener_socket; /* TCP listener socket. */
int discovery_socket; /* Discovery UDP sockets, RECIEVES broadcasts only. */
unsigned char recv_buf[MAX_PACKET_SIZE];
EventObject udp_socket_event;
EventObject other_socket_event;
HandleHandlingPool worker_pool;
SendQueue udp_sq;
SendQueue discovery_sq;
HANDLE io_event;
std::thread io_thread;
std::atomic<bool> io_run;
struct Player
{
@ -95,6 +95,8 @@ class DirectPlay8Peer: public IDirectPlay8Peer
unsigned char recv_buf[MAX_PACKET_SIZE];
size_t recv_buf_cur;
EventObject event;
SendQueue sq;
SendQueue::Buffer *sqb;
@ -102,7 +104,7 @@ class DirectPlay8Peer: public IDirectPlay8Peer
size_t send_remain;
Player(int sock, uint32_t ip, uint16_t port):
state(PS_INIT), sock(sock), ip(ip), port(port), recv_buf_cur(0), send_buf(NULL) {}
state(PS_INIT), sock(sock), ip(ip), port(port), recv_buf_cur(0), sq(event), send_buf(NULL) {}
};
std::list<Player> peers;
@ -118,7 +120,9 @@ class DirectPlay8Peer: public IDirectPlay8Peer
void io_main();
void io_udp_recv(int sock);
void handle_udp_socket_event();
void handle_other_socket_event();
void io_udp_send(int sock, SendQueue &q);
void io_listener_accept(int sock);
bool io_tcp_recv(Player *player);

24
src/EventObject.cpp Normal file
View File

@ -0,0 +1,24 @@
#include <winsock2.h>
#include <windows.h>
#include <stdexcept>
#include "EventObject.hpp"
EventObject::EventObject(BOOL bManualReset, BOOL bInitialState)
{
handle = CreateEvent(NULL, bManualReset, bInitialState, NULL);
if(handle == NULL)
{
throw std::runtime_error("Unable to create event object");
}
}
EventObject::~EventObject()
{
CloseHandle(handle);
}
EventObject::operator HANDLE() const
{
return handle;
}

19
src/EventObject.hpp Normal file
View File

@ -0,0 +1,19 @@
#ifndef DPLITE_EVENTOBJECT_HPP
#define DPLITE_EVENTOBJECT_HPP
#include <winsock2.h>
#include <windows.h>
class EventObject
{
private:
HANDLE handle;
public:
EventObject(BOOL bManualReset = FALSE, BOOL bInitialState = FALSE);
~EventObject();
operator HANDLE() const;
};
#endif /* !DPLITE_EVENTOBJECT_HPP */

View File

@ -1,4 +1,6 @@
#include <assert.h>
#include <winsock2.h>
#include <windows.h>
#include "SendQueue.hpp"
@ -18,6 +20,8 @@ void SendQueue::send(SendPriority priority, Buffer *buffer)
high_queue.push_back(buffer);
break;
}
SetEvent(signal_on_queue);
}
SendQueue::Buffer *SendQueue::get_next()

View File

@ -8,6 +8,7 @@
#include <stdlib.h>
#include <utility>
#include <vector>
#include <windows.h>
class SendQueue
{
@ -45,8 +46,10 @@ class SendQueue
Buffer *current;
HANDLE signal_on_queue;
public:
SendQueue(): current(NULL) {}
SendQueue(HANDLE signal_on_queue): current(NULL), signal_on_queue(signal_on_queue) {}
/* No copy c'tor. */
SendQueue(const SendQueue &src) = delete;

View File

@ -4,32 +4,9 @@
#include <vector>
#include <windows.h>
#include "../src/EventObject.hpp"
#include "../src/HandleHandlingPool.hpp"
struct EventObject
{
HANDLE handle;
EventObject(BOOL bManualReset = FALSE, BOOL bInitialState = FALSE)
{
handle = CreateEvent(NULL, bManualReset, bInitialState, NULL);
if(handle == NULL)
{
throw std::runtime_error("Unable to create event object");
}
}
~EventObject()
{
CloseHandle(handle);
}
operator HANDLE() const
{
return handle;
}
};
TEST(HandleHandlingPool, SingleThreadBasic)
{
HandleHandlingPool pool(1, 32);