From 8d46ae9e9b1266c6af4876aef9ec802bb0b8ab9e Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 27 Nov 2018 18:50:44 +0000 Subject: [PATCH] IDirectPlay8Peer: Complete asynchronous sends in the worker pool. Starting threads is expensive, far too expensive to be doing it on every asynchronous SendTo() call. --- src/DirectPlay8Peer.cpp | 36 +++++++++++++++++++++++++++++++----- src/DirectPlay8Peer.hpp | 7 +++++++ 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/src/DirectPlay8Peer.cpp b/src/DirectPlay8Peer.cpp index d4f5a71..bfe5536 100644 --- a/src/DirectPlay8Peer.cpp +++ b/src/DirectPlay8Peer.cpp @@ -124,6 +124,7 @@ HRESULT DirectPlay8Peer::Initialize(PVOID CONST pvUserContext, CONST PFNDPNMESSA worker_pool->add_handle(udp_socket_event, [this]() { handle_udp_socket_event(); }); worker_pool->add_handle(other_socket_event, [this]() { handle_other_socket_event(); }); + worker_pool->add_handle(work_ready, [this]() { handle_work(); }); state = STATE_INITIALISED; @@ -832,9 +833,7 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST unsigned char *payload_copy = new unsigned char[payload_size]; memcpy(payload_copy, payload.data(), payload_size); - /* TODO: Do this in a properly managed worker thread. */ - - std::thread t([this, payload_size, payload_copy, handle_send_complete, dwFlags]() + queue_work([this, payload_size, payload_copy, handle_send_complete, dwFlags]() { std::unique_lock l(lock); @@ -861,8 +860,6 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST handle_send_complete(l, S_OK); }); - - t.detach(); } return DPNSUCCESS_PENDING; @@ -3189,6 +3186,35 @@ void DirectPlay8Peer::handle_other_socket_event() peer_accept(l); } +void DirectPlay8Peer::queue_work(const std::function &work) +{ + work_queue.push(work); + SetEvent(work_ready); +} + +void DirectPlay8Peer::handle_work() +{ + std::unique_lock l(lock); + + if(!work_queue.empty()) + { + std::function work = work_queue.front(); + work_queue.pop(); + + if(!work_queue.empty()) + { + /* Wake up another thread, in case we are heavily loaded and the pool isn't + * keeping up with the events from queue_work() + */ + SetEvent(work_ready); + } + + l.unlock(); + + work(); + } +} + void DirectPlay8Peer::io_peer_triggered(unsigned int peer_id) { std::unique_lock l(lock); diff --git a/src/DirectPlay8Peer.hpp b/src/DirectPlay8Peer.hpp index b112d05..4c32f9f 100644 --- a/src/DirectPlay8Peer.hpp +++ b/src/DirectPlay8Peer.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -69,6 +70,9 @@ class DirectPlay8Peer: public IDirectPlay8Peer HandleHandlingPool *worker_pool; + std::queue< std::function > work_queue; + EventObject work_ready; + SendQueue udp_sq; struct Peer @@ -207,6 +211,9 @@ class DirectPlay8Peer: public IDirectPlay8Peer void io_udp_send(std::unique_lock &l); void handle_other_socket_event(); + void queue_work(const std::function &work); + void handle_work(); + void io_peer_triggered(unsigned int peer_id); void io_peer_connected(std::unique_lock &l, unsigned int peer_id); void io_peer_send(std::unique_lock &l, unsigned int peer_id);