From f06f219d61c84d10848369af4b2e993c3dac63d4 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Sat, 15 Sep 2018 21:38:01 +0100 Subject: [PATCH] Implement HandleHandlingPool class. This will run all the main I/O threads in DirectPlay8Peer, only the host enumeration will be outside of it at this time. --- build.bat | 4 + src/HandleHandlingPool.cpp | 330 +++++++++++++++++++++++++++++++++++ src/HandleHandlingPool.hpp | 152 ++++++++++++++++ tests/HandleHandlingPool.cpp | 204 ++++++++++++++++++++++ 4 files changed, 690 insertions(+) create mode 100644 src/HandleHandlingPool.cpp create mode 100644 src/HandleHandlingPool.hpp create mode 100644 tests/HandleHandlingPool.cpp diff --git a/build.bat b/build.bat index 8f8b782..e831dfe 100644 --- a/build.bat +++ b/build.bat @@ -13,12 +13,14 @@ SET CPP_OBJS=^ src/COMAPIException.obj^ src/DirectPlay8Address.obj^ src/DirectPlay8Peer.obj^ + src/HandleHandlingPool.obj^ src/HostEnumerator.obj^ src/network.obj^ src/packet.obj^ src/SendQueue.obj^ tests/DirectPlay8Address.obj^ tests/DirectPlay8Peer.obj^ + tests/HandleHandlingPool.obj^ tests/PacketDeserialiser.obj^ tests/PacketSerialiser.obj @@ -30,12 +32,14 @@ SET TEST_OBJS=^ src/COMAPIException.obj^ src/DirectPlay8Address.obj^ src/DirectPlay8Peer.obj^ + src/HandleHandlingPool.obj^ src/HostEnumerator.obj^ src/network.obj^ src/packet.obj^ src/SendQueue.obj^ tests/DirectPlay8Address.obj^ tests/DirectPlay8Peer.obj^ + tests/HandleHandlingPool.obj^ tests/PacketDeserialiser.obj^ tests/PacketSerialiser.obj diff --git a/src/HandleHandlingPool.cpp b/src/HandleHandlingPool.cpp new file mode 100644 index 0000000..0ec23c0 --- /dev/null +++ b/src/HandleHandlingPool.cpp @@ -0,0 +1,330 @@ +#include +#include +#include +#include + +#include "HandleHandlingPool.hpp" + +HandleHandlingPool::HandleHandlingPool(size_t threads_per_pool, size_t max_handles_per_pool): + threads_per_pool(threads_per_pool), + max_handles_per_pool(max_handles_per_pool + 1), + stopping(false) +{ + if(threads_per_pool < 1) + { + throw std::invalid_argument("threads_per_pool must be >= 1"); + } + + if(max_handles_per_pool < 1 || max_handles_per_pool >= MAXIMUM_WAIT_OBJECTS) + { + throw std::invalid_argument("max_handles_per_pool must be >= 1 and < MAXIMUM_WAIT_OBJECTS"); + } + + spin_workers = CreateEvent(NULL, TRUE, FALSE, NULL); + if(spin_workers == NULL) + { + throw std::runtime_error("Unable to create event object"); + } +} + +HandleHandlingPool::~HandleHandlingPool() +{ + /* Signal all active workers to stop and wait for any to exit. */ + + stopping = true; + SetEvent(spin_workers); + + std::unique_lock wo_l(workers_lock); + workers_cv.wait(wo_l, [this]() { return active_workers.empty(); }); + + /* Reap the last thread which exited, if any were spawned. */ + + if(join_worker.joinable()) + { + join_worker.join(); + } + + CloseHandle(spin_workers); +} + +void HandleHandlingPool::add_handle(HANDLE handle, const std::function &callback) +{ + /* See HandleHandlingPool.hpp for an explanation of this sequence. */ + + std::unique_lock pwl(pending_writer_lock); + + pending_writer = true; + SetEvent(spin_workers); + + std::unique_lock wal(wait_lock); + + ResetEvent(spin_workers); + pending_writer = false; + + pwl.unlock(); + + /* Notify early and move any waiting workers up against wait_lock in case one of the below + * operations fails, else they may be left deadlocked. + */ + pending_writer_cv.notify_all(); + + if((handles.size() % max_handles_per_pool) == 0) + { + /* There aren't any free slots for this handle in the currently running worker + * threads, start a new block beginning with spin_workers. + */ + + size_t base_index = handles.size(); + + handles.push_back(spin_workers); + try { + callbacks.push_back([](){ abort(); }); /* Callback should never be executed. */ + + handles.push_back(handle); + try { + callbacks.push_back(callback); + } + catch(const std::exception &e) + { + handles.pop_back(); + throw e; + } + } + catch(const std::exception &e) + { + handles.pop_back(); + throw e; + } + + /* Calculate how many threads we need to spawn for there to be threads_per_pool + * workers for this pool. + * + * This may be less than threads_per_pool, if the block we just created previously + * existed, then some handles were removed causing it to go away - threads will exit + * once they detect they have nothing to wait for, but we may also add more handles + * before they catch up. + * + * Workers check if they have anything to do and remove themselves from + * active_workers while holding wait_lock, so there is no race between us counting + * the workers and them going away. + */ + + std::unique_lock wo_l(workers_lock); + + size_t threads_to_spawn = threads_per_pool; + + for(auto w = active_workers.begin(); w != active_workers.end(); w++) + { + if((*w)->base_index == base_index) + { + --threads_to_spawn; + } + } + + /* Spawn the new worker threads. + * + * We need to create the worker data on the heap so that: + * + * a) We can pass a reference to it into the worker main so that it may remove + * itself from active_workers when the time comes. + * + * b) The reference in active_workers itself is const, so that thread may be + * changed by the thread starting/exiting. + * + * The thread won't attempt to do anything (e.g. exit) until after acquiring + * wait_lock, so it won't attempt to do anything with its thread handle before it + * is done being initialised. + */ + + for(size_t i = 0; i < threads_to_spawn; ++i) + { + Worker *w = new Worker(base_index); + + try { + active_workers.insert(w); + + try { + w->thread = std::thread(&HandleHandlingPool::worker_main, this, w); + } + catch(const std::exception &e) + { + active_workers.erase(w); + throw e; + } + } + catch(const std::exception &e) + { + delete w; + + if(i == 0) + { + /* This is the first worker we tried spawning, fail the + * whole operation. + */ + handles.pop_back(); callbacks.pop_back(); + handles.pop_back(); callbacks.pop_back(); + + throw e; + } + } + } + } + else{ + handles.push_back(handle); + + try { + callbacks.push_back(callback); + } + catch(const std::exception &e) + { + handles.pop_back(); + throw e; + } + } +} + +void HandleHandlingPool::remove_handle(HANDLE handle) +{ + /* See HandleHandlingPool.hpp for an explanation of this sequence. */ + + std::unique_lock pwl(pending_writer_lock); + + pending_writer = true; + SetEvent(spin_workers); + + std::unique_lock wal(wait_lock); + + ResetEvent(spin_workers); + pending_writer = false; + + pwl.unlock(); + + /* Scan handles to find the index of the handle to be removed. */ + + size_t remove_index = 0; + while(remove_index < handles.size() && handles[remove_index] != handle) + { + ++remove_index; + } + + if(remove_index >= handles.size()) + { + /* Couldn't find the handle. */ + return; + } + + /* If the last handle is spin_workers, then the last call to remove_handle() removed the + * last handle in the last block. Now we remove it and the worker threads for that block + * will exit when they wake up. + * + * Doing it this way around means we destroy worker threads after removing its last handle + * and then another handle, reducing thrash if a single handle is added/removed at the + * boundary. Downside is we may keep one group of idle threads around for no reason. + */ + + if(handles.back() == spin_workers) + { + handles.pop_back(); + callbacks.pop_back(); + } + + /* Replace it with the last handle. */ + + handles[remove_index] = handles.back(); + handles.pop_back(); + + callbacks[remove_index] = callbacks.back(); + callbacks.pop_back(); + + pending_writer_cv.notify_all(); +} + +void HandleHandlingPool::worker_main(HandleHandlingPool::Worker *w) +{ + while(1) + { + if(pending_writer) + { + std::unique_lock pwl(pending_writer_lock); + pending_writer_cv.wait(pwl, [this]() { return !pending_writer; }); + } + + std::shared_lock l(wait_lock); + + if(handles.size() <= w->base_index) + { + /* No handles to wait on. Exit. */ + worker_exit(w); + return; + } + + /* Number of handles to wait for. We wait from base_index to the end of the handles + * array or the end of our block, whichever is closest. + */ + size_t num_handles = std::min((handles.size() - w->base_index), max_handles_per_pool); + + DWORD wait_res = WaitForMultipleObjects(num_handles, &(handles[w->base_index]), FALSE, INFINITE); + + if(stopping) + { + worker_exit(w); + return; + } + + if(wait_res >= (WAIT_OBJECT_0 + 1) && wait_res < (WAIT_OBJECT_0 + num_handles)) + { + /* Take a copy of the callback functor so we can release all of our locks + * without worrying about it disappearing from under itself if handles are + * added/removed while its executing. + */ + size_t wait_index = (w->base_index + wait_res) - WAIT_OBJECT_0; + std::function callback = callbacks[wait_index]; + + l.unlock(); + + callback(); + } + else if(wait_res == WAIT_FAILED) + { + /* Some system error while waiting... invalid handle? + * Only thing we can do is go quietly. Or maybe not so quietly... abort? + */ + worker_exit(w); + return; + } + } +} + +/* Exit a worker thread. + * + * This MUST only be called by the worker thread which is about to exit, which + * MUST exit as soon as this method returns. +*/ +void HandleHandlingPool::worker_exit(HandleHandlingPool::Worker *w) +{ + std::unique_lock wo_l(workers_lock); + + if(join_worker.joinable()) + { + /* Only one zombie thread is allowed at a time. If one exists, we must reap it + * before taking its place. + */ + join_worker.join(); + } + + /* MSVC doesn't like assignment here for some reason. */ + join_worker.swap(w->thread); + + active_workers.erase(w); + delete w; + + wo_l.unlock(); + workers_cv.notify_one(); + + /* This thread is now ready to be reaped by the next thread to exit, or the destructor, if + * we are the last one. + */ +} + +HandleHandlingPool::Worker::Worker(size_t base_index): + base_index(base_index) {} diff --git a/src/HandleHandlingPool.hpp b/src/HandleHandlingPool.hpp new file mode 100644 index 0000000..97d7804 --- /dev/null +++ b/src/HandleHandlingPool.hpp @@ -0,0 +1,152 @@ +#ifndef DPLITE_HANDLEHANDLINGPOOL_HPP +#define DPLITE_HANDLEHANDLINGPOOL_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* This class maintains a pool of threads to wait on HANDLEs and invoke callback functors when the + * HANDLEs become signalled. + * + * For up to every max_handles_per_pool HANDLEs registered, threads_per_pool worker threads will be + * created to wait on them. Each block of HANDLEs will be assigned to their own pool of threads but + * may move between them when handles are removed. + * + * Handles may be added or removed at any point, although this is not a heavily optimised path and + * will temporarily block all worker threads from waiting on their handles. + * + * Note that the same callback may be invoked in multiple threads concurrently if the linked HANDLE + * is signalled multiple times in quick sucession or is a manual reset event. Ensure your callbacks + * can handle this and do not block, as this will prevent other HANDLEs managed by the same thread + * from being invoked. +*/ + +class HandleHandlingPool +{ + private: + struct Worker + { + const size_t base_index; + std::thread thread; + + Worker(size_t base_index); + }; + + const size_t threads_per_pool; + const size_t max_handles_per_pool; + + /* spin_workers is a MANUAL RESET event object, we set this to signalled whenever + * we need all the worker threads to exit their WaitForMultipleObjects() calls. + */ + + HANDLE spin_workers; + std::atomic stopping; + + /* Array of handles to wait for, and the callbacks to invoke when they become + * signalled. + * + * These are interlaced with spin_workers every multiple of max_handles_per_pool + * elements, so each thread may pass a range directly into WaitForMultipleObjects() + * and will be woken by WAIT_OBJECT_0 when we signal spin_workers. + * + * Slots in callbacks which correspond to spin_workers have functors which must + * never be called to simplify the implementation. + */ + + std::vector handles; + std::vector< std::function > callbacks; + + /* This shared_mutex protects access to handles/callbacks. + * + * Multiple workers may hold it, and will do so before they start waiting for + * events until they enter a callback method. + * + * One thread may hold it exclusively in order to add or remove a handle, this + * will block any workers from waiting for events. + */ + + std::shared_mutex wait_lock; + + /* pending_writer ensures worker threads cannot starve writers which are trying to + * exclusively take wait_lock. + * + * When a thread wants to take the lock exclusively, they: + * + * 1) Claim pending_writer_lock + * 2) Set pending_writer + * 3) Set spin_workers to signalled + * 4) Exclusively lock wait_lock + * + * Then, any worker threads will do the following: + * + * 4) If currently waiting, return from WaitForMultipleObjects() and then release + * its shared lock on wait_lock. + * + * 5) Check pending_writer, if true, wait on pending_writer_cv for it to be false. + * + * This doesn't really need to be mutex protected, and there is still a possible + * race between the worker releasing pending_writer_lock and re-acquiring + * wait_lock and another thread attempting to lock it exclusively again, but + * then it will just spin around the loop again due to spin_workers. + * + * Once all workers have relinquished wait_lock, the thread which has + * pending_writer_lock will do the following: + * + * 6) Reset spin_workers to non-signalled + * 7) Clear pending_writer + * 8) Release pending_writer_lock + * + * At this point, the writer is free to mess around with the handles, before + * releasing wait_lock and signalling pending_writer_cv. All worker threads will + * now be eligible to wake up and resume operations. + */ + + std::atomic pending_writer; + std::mutex pending_writer_lock; + std::condition_variable pending_writer_cv; + + /* active_workers contains a pointer to the Worker object for each running or + * starting worker thread. + * + * join_worker, if joinable, is a handle to the last worker thread which exited. + * + * Worker threads are responsible for removing themselves from the active_workers + * set when they exit and placing themselves into join_worker. If join_worker is + * already set to a valid thread, the new exiting thread must wait on it. The final + * thread is joined by our destructor. + * + * This ensures that we can only ever have one zombie thread at a time and all + * threads have been joined before our destructor returns. + * + * workers_lock serialises access to active_workers AND join_worker; any thread + * must lock it before performing ANY operation on either. + * + * workers_cv is signalled by worker threads when they exit, after they have placed + * themselves into join_worker and are about to exit. This is used by the destructor + * to wait for zero active_workers. + */ + + std::set active_workers; + std::thread join_worker; + std::mutex workers_lock; + std::condition_variable workers_cv; + + void worker_main(HandleHandlingPool::Worker *w); + void worker_exit(HandleHandlingPool::Worker *w); + + public: + HandleHandlingPool(size_t threads_per_pool, size_t max_handles_per_pool); + ~HandleHandlingPool(); + + void add_handle(HANDLE handle, const std::function &callback); + void remove_handle(HANDLE handle); +}; + +#endif /* !DPLITE_HANDLEHANDLINGPOOL_HPP */ diff --git a/tests/HandleHandlingPool.cpp b/tests/HandleHandlingPool.cpp new file mode 100644 index 0000000..f877f4e --- /dev/null +++ b/tests/HandleHandlingPool.cpp @@ -0,0 +1,204 @@ +#include +#include +#include +#include +#include + +#include "../src/HandleHandlingPool.hpp" + +struct EventObject +{ + HANDLE handle; + + EventObject(BOOL bManualReset = FALSE, BOOL bInitialState = FALSE) + { + handle = CreateEvent(NULL, bManualReset, bInitialState, NULL); + if(handle == NULL) + { + throw std::runtime_error("Unable to create event object"); + } + } + + ~EventObject() + { + CloseHandle(handle); + } + + operator HANDLE() const + { + return handle; + } +}; + +TEST(HandleHandlingPool, SingleThreadBasic) +{ + HandleHandlingPool pool(1, 32); + + EventObject e1(FALSE, TRUE); + std::atomic e1_counter(0); + + EventObject e2(FALSE, FALSE); + std::atomic e2_counter(0); + + EventObject e3(FALSE, FALSE); + std::atomic e3_counter(0); + + pool.add_handle(e1, [&e1, &e1_counter]() { if(++e1_counter < 4) { SetEvent(e1); } }); + pool.add_handle(e2, [&e2, &e2_counter]() { if(++e2_counter < 2) { SetEvent(e2); } }); + pool.add_handle(e3, [&e3_counter]() { ++e3_counter; }); + + SetEvent(e2); + + Sleep(100); + + EXPECT_EQ(e1_counter, 4); + EXPECT_EQ(e2_counter, 2); + EXPECT_EQ(e3_counter, 0); +} + +TEST(HandleHandlingPool, MultiThreadBasic) +{ + HandleHandlingPool pool(4, 32); + + EventObject e1(FALSE, TRUE); + std::atomic e1_counter(0); + + EventObject e2(FALSE, FALSE); + std::atomic e2_counter(0); + + EventObject e3(FALSE, FALSE); + std::atomic e3_counter(0); + + pool.add_handle(e1, [&e1, &e1_counter]() { if(++e1_counter < 4) { SetEvent(e1); } }); + pool.add_handle(e2, [&e2, &e2_counter]() { if(++e2_counter < 2) { SetEvent(e2); } }); + pool.add_handle(e3, [&e3_counter]() { ++e3_counter; }); + + SetEvent(e2); + + Sleep(100); + + EXPECT_EQ(e1_counter, 4); + EXPECT_EQ(e2_counter, 2); + EXPECT_EQ(e3_counter, 0); +} + +TEST(HandleHandlingPool, ThreadAssignment) +{ + HandleHandlingPool pool(4, 1); + + EventObject e1(TRUE, TRUE); + std::atomic e1_counter(0); + + EventObject e2(FALSE, TRUE); + std::atomic e2_counter(0); + + EventObject e3(FALSE, FALSE); + std::atomic e3_counter(0); + + std::mutex mutex; + mutex.lock(); + + pool.add_handle(e1, [&mutex, &e1_counter]() { ++e1_counter; mutex.lock(); mutex.unlock(); }); + pool.add_handle(e2, [&mutex, &e2, &e2_counter]() { if(++e2_counter < 20) { SetEvent(e2); } mutex.lock(); mutex.unlock(); }); + pool.add_handle(e3, [&e3_counter]() { ++e3_counter; }); + + Sleep(100); + + /* We are holding the mutex, so the e1/e2 callbacks should have been invoked once by each + * worker thread, and still be sleeping until we released the mutex. The thread allocation + * for e3 should still be free. + */ + + EXPECT_EQ(e1_counter, 4); + EXPECT_EQ(e2_counter, 4); + EXPECT_EQ(e3_counter, 0); + + SetEvent(e3); + + Sleep(100); + + /* Now e3 should have fired once and returned. */ + + EXPECT_EQ(e1_counter, 4); + EXPECT_EQ(e2_counter, 4); + EXPECT_EQ(e3_counter, 1); + + ResetEvent(e1); + mutex.unlock(); + + Sleep(100); + + /* Since we released the mutex, the pending e1/e2 callbacks should return and the e2 + * callback should keep signalling itself until it reaches 20 calls. + */ + + EXPECT_EQ(e1_counter, 4); + EXPECT_EQ(e2_counter, 20); + EXPECT_EQ(e3_counter, 1); +} + +TEST(HandleHandlingPool, RemoveHandle) +{ + HandleHandlingPool pool(4, 1); + + EventObject e1(FALSE, FALSE); + std::atomic e1_counter(0); + + EventObject e2(FALSE, FALSE); + std::atomic e2_counter(0); + + EventObject e3(FALSE, FALSE); + std::atomic e3_counter(0); + + pool.add_handle(e1, [&e1_counter]() { ++e1_counter; }); + pool.add_handle(e2, [&e2_counter]() { ++e2_counter; }); + pool.add_handle(e3, [&e3_counter]() { ++e3_counter; }); + + SetEvent(e1); + SetEvent(e2); + SetEvent(e3); + + Sleep(100); + + EXPECT_EQ(e1_counter, 1); + EXPECT_EQ(e2_counter, 1); + EXPECT_EQ(e3_counter, 1); + + pool.remove_handle(e2); + + SetEvent(e1); + SetEvent(e2); + SetEvent(e3); + + Sleep(100); + + EXPECT_EQ(e1_counter, 2); + EXPECT_EQ(e2_counter, 1); + EXPECT_EQ(e3_counter, 2); +} + +TEST(HandleHandlingPool, Stress) +{ + /* Stress test to see if we can trigger a race/crash - 4 groups of 16 handles, each with 8 + * worker threads, and each handle signalling 10,000 times. + */ + + std::vector events(64); + std::vector< std::atomic > counters(64); + + HandleHandlingPool pool(8, 16); + + for(int i = 0; i < 64; ++i) + { + pool.add_handle(events[i], [i, &counters, &events]() { if(++counters[i] < 10000) { SetEvent(events[i]); } }); + SetEvent(events[i]); + } + + /* Is 5s enough to handle 640,000 events across 32 threads? Probably! */ + Sleep(5000); + + for(int i = 0; i < 64; ++i) + { + EXPECT_EQ(counters[i], 10000); + } +}