1
0
mirror of https://github.com/solemnwarning/directplay-lite synced 2024-12-30 16:45:37 +01:00

Implement HandleHandlingPool class.

This will run all the main I/O threads in DirectPlay8Peer, only
the host enumeration will be outside of it at this time.
This commit is contained in:
Daniel Collins 2018-09-15 21:38:01 +01:00
parent 26c038a24d
commit f06f219d61
4 changed files with 690 additions and 0 deletions

View File

@ -13,12 +13,14 @@ SET CPP_OBJS=^
src/COMAPIException.obj^
src/DirectPlay8Address.obj^
src/DirectPlay8Peer.obj^
src/HandleHandlingPool.obj^
src/HostEnumerator.obj^
src/network.obj^
src/packet.obj^
src/SendQueue.obj^
tests/DirectPlay8Address.obj^
tests/DirectPlay8Peer.obj^
tests/HandleHandlingPool.obj^
tests/PacketDeserialiser.obj^
tests/PacketSerialiser.obj
@ -30,12 +32,14 @@ SET TEST_OBJS=^
src/COMAPIException.obj^
src/DirectPlay8Address.obj^
src/DirectPlay8Peer.obj^
src/HandleHandlingPool.obj^
src/HostEnumerator.obj^
src/network.obj^
src/packet.obj^
src/SendQueue.obj^
tests/DirectPlay8Address.obj^
tests/DirectPlay8Peer.obj^
tests/HandleHandlingPool.obj^
tests/PacketDeserialiser.obj^
tests/PacketSerialiser.obj

330
src/HandleHandlingPool.cpp Normal file
View File

@ -0,0 +1,330 @@
#include <algorithm>
#include <exception>
#include <stdexcept>
#include <stdlib.h>
#include "HandleHandlingPool.hpp"
/* Construct an empty pool. No worker threads are started until the first handle is added.
 *
 * threads_per_pool     - Number of worker threads to spawn for each block of handles.
 * max_handles_per_pool - Number of caller-supplied handles in each block.
 *
 * The max_handles_per_pool member stores the given value plus one, as each block also begins
 * with a slot for the spin_workers event.
 *
 * Throws std::invalid_argument if either parameter is out of range, or std::runtime_error if the
 * spin_workers event object cannot be created.
*/
HandleHandlingPool::HandleHandlingPool(size_t threads_per_pool, size_t max_handles_per_pool):
	threads_per_pool(threads_per_pool),
	max_handles_per_pool(max_handles_per_pool + 1),
	spin_workers(NULL),
	stopping(false),
	pending_writer(false)  /* A default-constructed std::atomic is NOT initialised before C++20. */
{
	/* NOTE: Within this body the names refer to the constructor parameters rather than the
	 * members they shadow, so max_handles_per_pool here is the value WITHOUT the extra slot
	 * reserved for spin_workers.
	*/

	if(threads_per_pool < 1)
	{
		throw std::invalid_argument("threads_per_pool must be >= 1");
	}

	if(max_handles_per_pool < 1 || max_handles_per_pool >= MAXIMUM_WAIT_OBJECTS)
	{
		throw std::invalid_argument("max_handles_per_pool must be >= 1 and < MAXIMUM_WAIT_OBJECTS");
	}

	/* Manual reset event, initially non-signalled. */
	spin_workers = CreateEvent(NULL, TRUE, FALSE, NULL);
	if(spin_workers == NULL)
	{
		throw std::runtime_error("Unable to create event object");
	}
}
/* Stop and join every worker thread, then release the spin_workers event. */
HandleHandlingPool::~HandleHandlingPool()
{
	/* Raise the stop flag first, then kick every worker out of its wait so it sees it. */
	stopping = true;
	SetEvent(spin_workers);

	std::unique_lock<std::mutex> workers_guard(workers_lock);

	/* Each exiting worker removes itself from active_workers and signals workers_cv. */
	while(!active_workers.empty())
	{
		workers_cv.wait(workers_guard);
	}

	/* Every worker except the last one to exit was joined by the worker which exited after it;
	 * the final one (if any thread was ever spawned) is left for us.
	*/
	if(join_worker.joinable())
	{
		join_worker.join();
	}

	CloseHandle(spin_workers);
}
/* Register a HANDLE with the pool, along with the callback to invoke when it becomes signalled.
 *
 * If the last block of handles has a free slot, the handle is appended to it. Otherwise a new
 * block is started (beginning with spin_workers) and worker threads are spawned to service it.
 *
 * On failure the handles/callbacks arrays are restored to the state they were in before the call
 * and the original exception is propagated to the caller.
*/
void HandleHandlingPool::add_handle(HANDLE handle, const std::function<void()> &callback)
{
	/* See HandleHandlingPool.hpp for an explanation of this sequence. */

	std::unique_lock<std::mutex> pwl(pending_writer_lock);
	pending_writer = true;
	SetEvent(spin_workers);

	std::unique_lock<std::shared_mutex> wal(wait_lock);
	ResetEvent(spin_workers);
	pending_writer = false;
	pwl.unlock();

	/* Notify early and move any waiting workers up against wait_lock in case one of the below
	 * operations fails, else they may be left deadlocked.
	*/
	pending_writer_cv.notify_all();

	/* Index the next element will be stored at. If we start a new block, this is also the
	 * base index of that block.
	*/
	const size_t base_index = handles.size();

	/* Undo any partial insertion. Both arrays are always cut back to the same length so they
	 * can never fall out of step with each other, whichever push_back() call failed.
	*/
	auto rollback = [this, base_index]()
	{
		while(handles.size() > base_index)
		{
			handles.pop_back();
		}

		while(callbacks.size() > base_index)
		{
			callbacks.pop_back();
		}
	};

	if((base_index % max_handles_per_pool) != 0)
	{
		/* There is a free slot in the last block, which already has worker threads. */

		try {
			handles.push_back(handle);
			callbacks.push_back(callback);
		}
		catch(...)
		{
			rollback();
			throw; /* Rethrow the original object; "throw e" would slice it. */
		}

		return;
	}

	/* There aren't any free slots for this handle in the currently running worker threads,
	 * start a new block beginning with spin_workers.
	*/

	try {
		handles.push_back(spin_workers);
		callbacks.push_back([](){ abort(); }); /* Callback should never be executed. */

		handles.push_back(handle);
		callbacks.push_back(callback);
	}
	catch(...)
	{
		rollback();
		throw;
	}

	/* Calculate how many threads we need to spawn for there to be threads_per_pool workers for
	 * this pool.
	 *
	 * This may be less than threads_per_pool, if the block we just created previously existed,
	 * then some handles were removed causing it to go away - threads will exit once they detect
	 * they have nothing to wait for, but we may also add more handles before they catch up.
	 *
	 * Workers check if they have anything to do and remove themselves from active_workers
	 * while holding wait_lock, so there is no race between us counting the workers and them
	 * going away.
	*/

	std::unique_lock<std::mutex> wo_l(workers_lock);

	size_t threads_to_spawn = threads_per_pool;

	for(Worker *existing : active_workers)
	{
		if(existing->base_index == base_index && threads_to_spawn > 0)
		{
			--threads_to_spawn;
		}
	}

	/* Spawn the new worker threads.
	 *
	 * The worker data lives on the heap so that a pointer to it can be handed to the worker
	 * main, allowing the worker to remove itself from active_workers when the time comes,
	 * and so that its thread member can be modified while the pointer sits in the set.
	 *
	 * The thread won't attempt to do anything (e.g. exit) until after acquiring wait_lock,
	 * which we hold exclusively, so it won't touch its thread handle before it is done being
	 * initialised.
	*/

	for(size_t i = 0; i < threads_to_spawn; ++i)
	{
		Worker *w = NULL;

		try {
			/* Allocate inside the try block so that an allocation failure takes the
			 * same cleanup path as a failure to start the thread.
			*/
			w = new Worker(base_index);

			active_workers.insert(w);

			try {
				w->thread = std::thread(&HandleHandlingPool::worker_main, this, w);
			}
			catch(...)
			{
				active_workers.erase(w);
				throw;
			}
		}
		catch(...)
		{
			delete w;

			if(i == 0)
			{
				/* This is the first worker we tried spawning, fail the whole
				 * operation.
				*/
				rollback();
				throw;
			}

			/* Otherwise carry on; the block simply ends up with fewer workers. */
		}
	}
}
/* Unregister a HANDLE from the pool. Does nothing if the handle isn't registered. */
void HandleHandlingPool::remove_handle(HANDLE handle)
{
	/* Writer handshake - see HandleHandlingPool.hpp for an explanation of this sequence. */

	std::unique_lock<std::mutex> writer_guard(pending_writer_lock);
	pending_writer = true;
	SetEvent(spin_workers);

	std::unique_lock<std::shared_mutex> exclusive(wait_lock);
	ResetEvent(spin_workers);
	pending_writer = false;
	writer_guard.unlock();

	/* Locate the slot holding the handle to be removed. */

	const auto found = std::find(handles.begin(), handles.end(), handle);
	if(found == handles.end())
	{
		/* Not one of ours. */
		return;
	}

	const size_t slot = static_cast<size_t>(found - handles.begin());

	/* A trailing spin_workers means the previous remove_handle() call emptied the last block.
	 * Drop it now, so the worker threads for that block will exit when they next wake up.
	 *
	 * Deferring this until the NEXT removal means a block's workers are only torn down after
	 * its last handle and then another handle have been removed, reducing thrash if a single
	 * handle is added/removed at the boundary. The cost is that one group of idle threads may
	 * be kept around for no reason.
	*/
	if(handles.back() == spin_workers)
	{
		handles.pop_back();
		callbacks.pop_back();
	}

	/* Move the final handle/callback pair into the vacated slot and shrink both arrays. */

	handles[slot]   = handles.back();
	callbacks[slot] = callbacks.back();

	handles.pop_back();
	callbacks.pop_back();

	pending_writer_cv.notify_all();
}
/* Main loop of a worker thread.
*
* w is the Worker record created for this thread by add_handle(). The thread services the block
* of handles starting at w->base_index, whose first element is spin_workers.
*
* The loop ends (via worker_exit()) when the block no longer exists, when the pool is being
* destroyed, or when the wait fails.
*/
void HandleHandlingPool::worker_main(HandleHandlingPool::Worker *w)
{
while(1)
{
/* Step aside if a thread is trying to take wait_lock exclusively. The writer holds
* pending_writer_lock for as long as pending_writer is set, so locking it here holds us
* back until the writer has acquired wait_lock.
*/
if(pending_writer)
{
std::unique_lock<std::mutex> pwl(pending_writer_lock);
pending_writer_cv.wait(pwl, [this]() { return !pending_writer; });
}
/* Shared lock: handles/callbacks cannot be modified while we hold this. */
std::shared_lock<std::shared_mutex> l(wait_lock);
if(handles.size() <= w->base_index)
{
/* No handles to wait on. Exit. */
worker_exit(w);
return;
}
/* Number of handles to wait for. We wait from base_index to the end of the handles
* array or the end of our block, whichever is closest.
*/
size_t num_handles = std::min((handles.size() - w->base_index), max_handles_per_pool);
/* Wait for ANY one handle in the range (bWaitAll is FALSE), with no timeout. */
DWORD wait_res = WaitForMultipleObjects(num_handles, &(handles[w->base_index]), FALSE, INFINITE);
/* The destructor sets stopping before signalling spin_workers to wake us. */
if(stopping)
{
worker_exit(w);
return;
}
/* A result of exactly WAIT_OBJECT_0 is the first handle in our range, which is
* spin_workers - that has no real callback, so we just go around the loop again.
* Anything from WAIT_OBJECT_0 + 1 up to the end of the range is a registered handle.
*/
if(wait_res >= (WAIT_OBJECT_0 + 1) && wait_res < (WAIT_OBJECT_0 + num_handles))
{
/* Take a copy of the callback functor so we can release all of our locks
* without worrying about it disappearing from under itself if handles are
* added/removed while its executing.
*/
size_t wait_index = (w->base_index + wait_res) - WAIT_OBJECT_0;
std::function<void()> callback = callbacks[wait_index];
l.unlock();
callback();
}
else if(wait_res == WAIT_FAILED)
{
/* Some system error while waiting... invalid handle?
* Only thing we can do is go quietly. Or maybe not so quietly... abort?
*/
worker_exit(w);
return;
}
}
}
/* Retire the calling worker thread.
 *
 * Only the worker thread which owns w may call this, and it MUST return from its thread function
 * as soon as this method returns. The Worker object is destroyed here, so w must not be used
 * afterwards.
*/
void HandleHandlingPool::worker_exit(HandleHandlingPool::Worker *w)
{
	{
		std::lock_guard<std::mutex> workers_guard(workers_lock);

		/* At most one exited-but-unjoined thread may exist at a time. If there already is
		 * one, reap it before we take over its place.
		*/
		if(join_worker.joinable())
		{
			join_worker.join();
		}

		/* Hand our own thread handle over (swapped rather than assigned, as MSVC doesn't
		 * like assignment here) and unregister ourselves.
		*/
		w->thread.swap(join_worker);

		active_workers.erase(w);
		delete w;
	}

	/* Wake the destructor, if it is waiting for the workers to finish. This thread will be
	 * joined by the next worker to exit, or by the destructor if we are the last one.
	*/
	workers_cv.notify_one();
}
/* Create the record for a worker servicing the block of handles which starts at base_index.
 * The thread member is left empty for the caller to fill in.
*/
HandleHandlingPool::Worker::Worker(size_t base_index):
	base_index(base_index),
	thread()
{
}

152
src/HandleHandlingPool.hpp Normal file
View File

@ -0,0 +1,152 @@
#ifndef DPLITE_HANDLEHANDLINGPOOL_HPP
#define DPLITE_HANDLEHANDLINGPOOL_HPP
#include <winsock2.h>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <thread>
#include <vector>
#include <windows.h>
/* This class maintains a pool of threads to wait on HANDLEs and invoke callback functors when the
* HANDLEs become signalled.
*
* For up to every max_handles_per_pool HANDLEs registered, threads_per_pool worker threads will be
* created to wait on them. Each block of HANDLEs will be assigned to their own pool of threads but
* may move between them when handles are removed.
*
* Handles may be added or removed at any point, although this is not a heavily optimised path and
* will temporarily block all worker threads from waiting on their handles.
*
* Note that the same callback may be invoked in multiple threads concurrently if the linked HANDLE
* is signalled multiple times in quick succession or is a manual reset event. Ensure your
* callbacks can handle this and do not block, as this will prevent other HANDLEs managed by the
* same thread from being invoked.
*/
class HandleHandlingPool
{
private:
/* Bookkeeping for a single worker thread. */
struct Worker
{
/* Index into handles of the first element (spin_workers) of this worker's block. */
const size_t base_index;
/* The worker's thread, assigned by add_handle() right after the thread is started. */
std::thread thread;
Worker(size_t base_index);
};
/* Number of worker threads to run for each block of handles. */
const size_t threads_per_pool;
/* Number of elements in each block of handles. This is the value passed to the constructor
* plus one, to account for the spin_workers slot at the start of each block.
*/
const size_t max_handles_per_pool;
/* spin_workers is a MANUAL RESET event object, we set this to signalled whenever
* we need all the worker threads to exit their WaitForMultipleObjects() calls.
*/
HANDLE spin_workers;
/* Set by the destructor to tell the worker threads to exit. */
std::atomic<bool> stopping;
/* Array of handles to wait for, and the callbacks to invoke when they become
* signalled.
*
* These are interlaced with spin_workers every multiple of max_handles_per_pool
* elements, so each thread may pass a range directly into WaitForMultipleObjects()
* and will be woken by WAIT_OBJECT_0 when we signal spin_workers.
*
* Slots in callbacks which correspond to spin_workers have functors which must
* never be called to simplify the implementation.
*/
std::vector<HANDLE> handles;
std::vector< std::function<void()> > callbacks;
/* This shared_mutex protects access to handles/callbacks.
*
* Multiple workers may hold it, and will do so before they start waiting for
* events until they enter a callback method.
*
* One thread may hold it exclusively in order to add or remove a handle, this
* will block any workers from waiting for events.
*/
std::shared_mutex wait_lock;
/* pending_writer ensures worker threads cannot starve writers which are trying to
* exclusively take wait_lock.
*
* When a thread wants to take the lock exclusively, they:
*
* 1) Claim pending_writer_lock
* 2) Set pending_writer
* 3) Set spin_workers to signalled
* 4) Exclusively lock wait_lock
*
* Then, any worker threads will do the following:
*
* 5) If currently waiting, return from WaitForMultipleObjects() and then release
* its shared lock on wait_lock.
*
* 6) Check pending_writer, if true, wait on pending_writer_cv for it to be false.
*
* This doesn't really need to be mutex protected, and there is still a possible
* race between the worker releasing pending_writer_lock and re-acquiring
* wait_lock and another thread attempting to lock it exclusively again, but
* then it will just spin around the loop again due to spin_workers.
*
* Once all workers have relinquished wait_lock, the thread which has
* pending_writer_lock will do the following:
*
* 7) Reset spin_workers to non-signalled
* 8) Clear pending_writer
* 9) Release pending_writer_lock
*
* At this point, the writer is free to mess around with the handles, before
* releasing wait_lock and signalling pending_writer_cv. All worker threads will
* now be eligible to wake up and resume operations.
*/
std::atomic<bool> pending_writer;
std::mutex pending_writer_lock;
std::condition_variable pending_writer_cv;
/* active_workers contains a pointer to the Worker object for each running or
* starting worker thread.
*
* join_worker, if joinable, is a handle to the last worker thread which exited.
*
* Worker threads are responsible for removing themselves from the active_workers
* set when they exit and placing themselves into join_worker. If join_worker is
* already set to a valid thread, the new exiting thread must wait on it. The final
* thread is joined by our destructor.
*
* This ensures that we can only ever have one zombie thread at a time and all
* threads have been joined before our destructor returns.
*
* workers_lock serialises access to active_workers AND join_worker; any thread
* must lock it before performing ANY operation on either.
*
* workers_cv is signalled by worker threads when they exit, after they have placed
* themselves into join_worker and are about to exit. This is used by the destructor
* to wait for zero active_workers.
*/
std::set<Worker*> active_workers;
std::thread join_worker;
std::mutex workers_lock;
std::condition_variable workers_cv;
/* Thread function of each worker; w is the record created for it by add_handle(). */
void worker_main(HandleHandlingPool::Worker *w);
/* Called by a worker thread on itself, immediately before it returns from worker_main(). */
void worker_exit(HandleHandlingPool::Worker *w);
public:
/* Create an empty pool.
*
* threads_per_pool must be >= 1 and max_handles_per_pool must be >= 1 and less than
* MAXIMUM_WAIT_OBJECTS, otherwise std::invalid_argument is thrown. std::runtime_error is
* thrown if the internal event object cannot be created.
*/
HandleHandlingPool(size_t threads_per_pool, size_t max_handles_per_pool);
/* Signals all worker threads to stop and waits for them to exit. */
~HandleHandlingPool();
/* Start waiting on handle. A copy of callback is stored, and is invoked from a worker
* thread each time a wait on the handle is satisfied.
*/
void add_handle(HANDLE handle, const std::function<void()> &callback);
/* Stop waiting on handle. Does nothing if the handle was never added. */
void remove_handle(HANDLE handle);
};
#endif /* !DPLITE_HANDLEHANDLINGPOOL_HPP */

View File

@ -0,0 +1,204 @@
#include <winsock2.h>
#include <atomic>
#include <gtest/gtest.h>
#include <vector>
#include <windows.h>
#include "../src/HandleHandlingPool.hpp"
/* Owning wrapper around a Win32 event object, for use in the tests below.
 *
 * The event is created by the constructor (which throws std::runtime_error on failure) and closed
 * by the destructor. Instances convert implicitly to HANDLE so they can be passed straight to
 * Win32 functions and HandleHandlingPool.
*/
struct EventObject
{
	HANDLE handle;

	EventObject(BOOL bManualReset = FALSE, BOOL bInitialState = FALSE)
	{
		handle = CreateEvent(NULL, bManualReset, bInitialState, NULL);
		if(handle == NULL)
		{
			throw std::runtime_error("Unable to create event object");
		}
	}

	/* Not copyable: a copy would hold the same HANDLE and close it a second time when it is
	 * destroyed.
	*/
	EventObject(const EventObject&) = delete;
	EventObject &operator=(const EventObject&) = delete;

	~EventObject()
	{
		CloseHandle(handle);
	}

	operator HANDLE() const
	{
		return handle;
	}
};
/* One worker thread servicing three auto-reset events in a single block.
 *
 * e1 starts signalled and re-signals itself until it has fired 4 times, e2 is signalled once by
 * the test and re-signals itself once, e3 is never signalled.
*/
TEST(HandleHandlingPool, SingleThreadBasic)
{
	EventObject e1(FALSE, TRUE);
	std::atomic<int> e1_counter(0);

	EventObject e2(FALSE, FALSE);
	std::atomic<int> e2_counter(0);

	EventObject e3(FALSE, FALSE);
	std::atomic<int> e3_counter(0);

	/* The pool is declared AFTER the events and counters so that it is destroyed (and its
	 * worker threads joined) BEFORE the handles they wait on are closed and the counters
	 * referenced by the callbacks go out of scope.
	*/
	HandleHandlingPool pool(1, 32);

	pool.add_handle(e1, [&e1, &e1_counter]() { if(++e1_counter < 4) { SetEvent(e1); } });
	pool.add_handle(e2, [&e2, &e2_counter]() { if(++e2_counter < 2) { SetEvent(e2); } });
	pool.add_handle(e3, [&e3_counter]() { ++e3_counter; });

	SetEvent(e2);

	/* Give the worker time to process everything. */
	Sleep(100);

	EXPECT_EQ(e1_counter, 4);
	EXPECT_EQ(e2_counter, 2);
	EXPECT_EQ(e3_counter, 0);
}
/* Four worker threads servicing three auto-reset events in a single block.
 *
 * e1 starts signalled and re-signals itself until it has fired 4 times, e2 is signalled once by
 * the test and re-signals itself once, e3 is never signalled.
*/
TEST(HandleHandlingPool, MultiThreadBasic)
{
	EventObject e1(FALSE, TRUE);
	std::atomic<int> e1_counter(0);

	EventObject e2(FALSE, FALSE);
	std::atomic<int> e2_counter(0);

	EventObject e3(FALSE, FALSE);
	std::atomic<int> e3_counter(0);

	/* The pool is declared AFTER the events and counters so that it is destroyed (and its
	 * worker threads joined) BEFORE the handles they wait on are closed and the counters
	 * referenced by the callbacks go out of scope.
	*/
	HandleHandlingPool pool(4, 32);

	pool.add_handle(e1, [&e1, &e1_counter]() { if(++e1_counter < 4) { SetEvent(e1); } });
	pool.add_handle(e2, [&e2, &e2_counter]() { if(++e2_counter < 2) { SetEvent(e2); } });
	pool.add_handle(e3, [&e3_counter]() { ++e3_counter; });

	SetEvent(e2);

	/* Give the workers time to process everything. */
	Sleep(100);

	EXPECT_EQ(e1_counter, 4);
	EXPECT_EQ(e2_counter, 2);
	EXPECT_EQ(e3_counter, 0);
}
/* With one handle per block, each handle gets its own group of four worker threads. Check that
 * blocking every worker of two groups does not stop the third group from servicing its handle.
*/
TEST(HandleHandlingPool, ThreadAssignment)
{
	EventObject e1(TRUE, TRUE);
	std::atomic<int> e1_counter(0);

	EventObject e2(FALSE, TRUE);
	std::atomic<int> e2_counter(0);

	EventObject e3(FALSE, FALSE);
	std::atomic<int> e3_counter(0);

	/* Held by the test to keep the e1/e2 callbacks blocked until we choose to release them. */
	std::mutex mutex;
	mutex.lock();

	/* The pool is declared AFTER the events, counters and mutex so that it is destroyed (and
	 * its worker threads joined) BEFORE the handles they wait on are closed and the objects
	 * referenced by the callbacks go out of scope.
	*/
	HandleHandlingPool pool(4, 1);

	pool.add_handle(e1, [&mutex, &e1_counter]() { ++e1_counter; mutex.lock(); mutex.unlock(); });
	pool.add_handle(e2, [&mutex, &e2, &e2_counter]() { if(++e2_counter < 20) { SetEvent(e2); } mutex.lock(); mutex.unlock(); });
	pool.add_handle(e3, [&e3_counter]() { ++e3_counter; });

	Sleep(100);

	/* We are holding the mutex, so the e1/e2 callbacks should have been invoked once by each
	 * worker thread, and still be sleeping until we released the mutex. The thread allocation
	 * for e3 should still be free.
	*/

	EXPECT_EQ(e1_counter, 4);
	EXPECT_EQ(e2_counter, 4);
	EXPECT_EQ(e3_counter, 0);

	SetEvent(e3);

	Sleep(100);

	/* Now e3 should have fired once and returned. */

	EXPECT_EQ(e1_counter, 4);
	EXPECT_EQ(e2_counter, 4);
	EXPECT_EQ(e3_counter, 1);

	/* e1 is a manual reset event; clear it so its callback isn't invoked again once the
	 * workers are released.
	*/
	ResetEvent(e1);
	mutex.unlock();

	Sleep(100);

	/* Since we released the mutex, the pending e1/e2 callbacks should return and the e2
	 * callback should keep signalling itself until it reaches 20 calls.
	*/

	EXPECT_EQ(e1_counter, 4);
	EXPECT_EQ(e2_counter, 20);
	EXPECT_EQ(e3_counter, 1);
}
/* Check that a handle stops being serviced once removed, and the remaining ones still work. */
TEST(HandleHandlingPool, RemoveHandle)
{
	EventObject e1(FALSE, FALSE);
	std::atomic<int> e1_counter(0);

	EventObject e2(FALSE, FALSE);
	std::atomic<int> e2_counter(0);

	EventObject e3(FALSE, FALSE);
	std::atomic<int> e3_counter(0);

	/* The pool is declared AFTER the events and counters so that it is destroyed (and its
	 * worker threads joined) BEFORE the handles they wait on are closed and the counters
	 * referenced by the callbacks go out of scope.
	*/
	HandleHandlingPool pool(4, 1);

	pool.add_handle(e1, [&e1_counter]() { ++e1_counter; });
	pool.add_handle(e2, [&e2_counter]() { ++e2_counter; });
	pool.add_handle(e3, [&e3_counter]() { ++e3_counter; });

	SetEvent(e1);
	SetEvent(e2);
	SetEvent(e3);

	Sleep(100);

	/* All three handles are registered, so each callback should have fired once. */

	EXPECT_EQ(e1_counter, 1);
	EXPECT_EQ(e2_counter, 1);
	EXPECT_EQ(e3_counter, 1);

	pool.remove_handle(e2);

	SetEvent(e1);
	SetEvent(e2);
	SetEvent(e3);

	Sleep(100);

	/* e2 is no longer registered, so only e1 and e3 should have fired again. */

	EXPECT_EQ(e1_counter, 2);
	EXPECT_EQ(e2_counter, 1);
	EXPECT_EQ(e3_counter, 2);
}
/* Hammer the pool to try to provoke a race or crash: 4 blocks of 16 handles, 8 worker threads per
 * block, every handle re-signalling itself until it has fired 10,000 times.
*/
TEST(HandleHandlingPool, Stress)
{
	std::vector<EventObject> events(64);
	std::vector< std::atomic<int> > counters(64);

	HandleHandlingPool pool(8, 16);

	for(int slot = 0; slot < 64; ++slot)
	{
		/* Count the invocation, then re-arm the event unless the target has been reached. */
		auto on_signal = [slot, &counters, &events]()
		{
			if(++counters[slot] < 10000)
			{
				SetEvent(events[slot]);
			}
		};

		pool.add_handle(events[slot], on_signal);

		/* Kick off the chain for this handle. */
		SetEvent(events[slot]);
	}

	/* Is 5s enough to handle 640,000 events across 32 threads? Probably! */
	Sleep(5000);

	for(int slot = 0; slot < 64; ++slot)
	{
		EXPECT_EQ(counters[slot], 10000);
	}
}