From 5ccf66fceb1984d54188e57b3fc341fdb8de5b55 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 16 Oct 2018 23:25:08 +0100 Subject: [PATCH] Implement cancelling of queued asynchronous sends. --- build.bat | 6 +- src/DirectPlay8Peer.cpp | 135 +++++- src/SendQueue.cpp | 99 ++++- src/SendQueue.hpp | 10 + tests/DirectPlay8Peer.cpp | 882 ++++++++++++++++++++++++++++++++++++++ tests/SendQueue.cpp | 514 ++++++++++++++++++++++ 6 files changed, 1634 insertions(+), 12 deletions(-) create mode 100644 tests/SendQueue.cpp diff --git a/build.bat b/build.bat index cd29202..cc87753 100644 --- a/build.bat +++ b/build.bat @@ -25,7 +25,8 @@ SET CPP_OBJS=^ tests/DirectPlay8Peer.obj^ tests/HandleHandlingPool.obj^ tests/PacketDeserialiser.obj^ - tests/PacketSerialiser.obj + tests/PacketSerialiser.obj^ + tests/SendQueue.obj REM .obj files to be compiled from .c source files SET C_OBJS=^ @@ -54,7 +55,8 @@ SET TEST_OBJS=^ tests/DirectPlay8Peer.obj^ tests/HandleHandlingPool.obj^ tests/PacketDeserialiser.obj^ - tests/PacketSerialiser.obj + tests/PacketSerialiser.obj^ + tests/SendQueue.obj SET TEST_LIBS=ws2_32.lib dxguid.lib ole32.lib diff --git a/src/DirectPlay8Peer.cpp b/src/DirectPlay8Peer.cpp index 661a6dc..c2f4431 100644 --- a/src/DirectPlay8Peer.cpp +++ b/src/DirectPlay8Peer.cpp @@ -192,7 +192,74 @@ HRESULT DirectPlay8Peer::CancelAsyncOperation(CONST DPNHANDLE hAsyncHandle, CONS if(dwFlags & DPNCANCEL_PLAYER_SENDS) { /* Cancel sends to player ID in hAsyncHandle */ - UNIMPLEMENTED("DirectPlay8Peer::CancelAsyncOperation"); + + if(hAsyncHandle == local_player_id) + { + /* DirectX permits cancelling pending messages to the local player, but we + * don't queue loopback messages. So just do nothing. + */ + return S_OK; + } + + Peer *peer = get_peer_by_player_id(hAsyncHandle); + if(peer == NULL) + { + /* TODO: Is this the correct error? */ + return DPNERR_INVALIDPLAYER; + } + + for(; peer != NULL; peer = get_peer_by_player_id(hAsyncHandle)) + { + /* The DPNCANCEL_PLAYER_SENDS_* flags are horrible. + * + * Each priority-specific flag includes the DPNCANCEL_PLAYER_SENDS bit, so + * we need to check if ONLY the DPNCANCEL_PLAYER_SENDS bit is set, or if + * the priority-specific bit is set. + */ + + DWORD send_flags = (dwFlags & + ( DPNCANCEL_PLAYER_SENDS_PRIORITY_LOW + | DPNCANCEL_PLAYER_SENDS_PRIORITY_NORMAL + | DPNCANCEL_PLAYER_SENDS_PRIORITY_HIGH)); + + if(send_flags == DPNCANCEL_PLAYER_SENDS || (send_flags & DPNCANCEL_PLAYER_SENDS_PRIORITY_LOW) == DPNCANCEL_PLAYER_SENDS_PRIORITY_LOW) + { + SendQueue::SendOp *sqop = peer->sq.remove_queued_by_priority(SendQueue::SEND_PRI_LOW); + if(sqop != NULL) + { + sqop->invoke_callback(l, DPNERR_USERCANCEL); + delete sqop; + continue; + } + } + + if(send_flags == DPNCANCEL_PLAYER_SENDS || (send_flags & DPNCANCEL_PLAYER_SENDS_PRIORITY_NORMAL) == DPNCANCEL_PLAYER_SENDS_PRIORITY_NORMAL) + { + SendQueue::SendOp *sqop = peer->sq.remove_queued_by_priority(SendQueue::SEND_PRI_MEDIUM); + if(sqop != NULL) + { + sqop->invoke_callback(l, DPNERR_USERCANCEL); + delete sqop; + continue; + } + } + + if(send_flags == DPNCANCEL_PLAYER_SENDS || (send_flags & DPNCANCEL_PLAYER_SENDS_PRIORITY_HIGH) == DPNCANCEL_PLAYER_SENDS_PRIORITY_HIGH) + { + SendQueue::SendOp *sqop = peer->sq.remove_queued_by_priority(SendQueue::SEND_PRI_HIGH); + if(sqop != NULL) + { + sqop->invoke_callback(l, DPNERR_USERCANCEL); + delete sqop; + continue; + } + } + + /* No more queued sends to cancel. */ + break; + } + + return S_OK; } else if(dwFlags & (DPNCANCEL_ENUM | DPNCANCEL_CONNECT | DPNCANCEL_ALL_OPERATIONS)) { @@ -217,7 +284,24 @@ HRESULT DirectPlay8Peer::CancelAsyncOperation(CONST DPNHANDLE hAsyncHandle, CONS if(dwFlags & DPNCANCEL_ALL_OPERATIONS) { - /* TODO: Cancel all sends */ + for(auto p = peers.begin(); p != peers.end();) + { + Peer *peer = p->second; + + SendQueue::SendOp *sqop = peer->sq.remove_queued(); + + if(sqop != NULL) + { + sqop->invoke_callback(l, DPNERR_USERCANCEL); + delete sqop; + + /* Restart in case peers was munged. */ + p = peers.begin(); + } + else{ + ++p; + } + } } return S_OK; @@ -254,7 +338,39 @@ HRESULT DirectPlay8Peer::CancelAsyncOperation(CONST DPNHANDLE hAsyncHandle, CONS } else if((hAsyncHandle & AsyncHandleAllocator::TYPE_MASK) == AsyncHandleAllocator::TYPE_SEND) { - UNIMPLEMENTED("DirectPlay8Peer::CancelAsyncOperation"); + /* Search the send queues for a queued send with the handle. */ + + SendQueue::SendOp *sqop = udp_sq.remove_queued_by_handle(hAsyncHandle); + if(udp_sq.handle_is_pending(hAsyncHandle)) + { + /* Cannot cancel once message has started sending. */ + return DPNERR_CANNOTCANCEL; + } + + for(auto p = peers.begin(); p != peers.end() && sqop == NULL; ++p) + { + Peer *peer = p->second; + + sqop = peer->sq.remove_queued_by_handle(hAsyncHandle); + if(peer->sq.handle_is_pending(hAsyncHandle)) + { + /* Cannot cancel once message has started sending. */ + return DPNERR_CANNOTCANCEL; + } + } + + if(sqop != NULL) + { + /* Queued send was found, make it go away. */ + sqop->invoke_callback(l, DPNERR_USERCANCEL); + delete sqop; + + return S_OK; + } + else{ + /* No pending send with that handle. */ + return DPNERR_INVALIDHANDLE; + } } else{ /* Unrecognised handle type. */ @@ -466,7 +582,7 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST if(dwFlags & DPNSEND_COMPLETEONPROCESS) { - /* Not implemented yet. */ + /* TODO: Implement DPNSEND_COMPLETEONPROCESS */ return DPNERR_GENERIC; } @@ -599,8 +715,11 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST unsigned int *pending = new unsigned int(send_to_peers.size() + send_to_self); HRESULT *result = new HRESULT(S_OK); + DPNHANDLE handle = handle_alloc.new_send(); + *phAsyncHandle = handle; + auto handle_send_complete = - [this, pending, result, pvAsyncContext, dwFlags, prgBufferDesc, cBufferDesc] + [this, pending, result, pvAsyncContext, dwFlags, prgBufferDesc, cBufferDesc, handle] (std::unique_lock &l, HRESULT s_result) { if(s_result != S_OK && *result == S_OK) @@ -615,7 +734,7 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST memset(&sc, 0, sizeof(sc)); sc.dwSize = sizeof(sc); - // sc.hAsyncOp + sc.hAsyncOp = handle; sc.pvUserContext = pvAsyncContext; sc.hResultCode = *result; // sc.dwSendTime @@ -639,8 +758,6 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST } }; - *phAsyncHandle = 0; - if(*pending == 0) { /* Horrible horrible hack to raise a DPNMSG_SEND_COMPLETE if there are no @@ -660,7 +777,7 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST for(auto pi = send_to_peers.begin(); pi != send_to_peers.end(); ++pi) { - (*pi)->sq.send(priority, message, NULL, + (*pi)->sq.send(priority, message, NULL, handle, [handle_send_complete] (std::unique_lock &l, HRESULT s_result) { diff --git a/src/SendQueue.cpp b/src/SendQueue.cpp index 5e69e4c..8c06354 100644 --- a/src/SendQueue.cpp +++ b/src/SendQueue.cpp @@ -4,13 +4,23 @@ #include "SendQueue.hpp" -void SendQueue::send(SendPriority priority, const PacketSerialiser &ps, const struct sockaddr_in *dest_addr, const std::function&, HRESULT)> &callback) +void SendQueue::send(SendPriority priority, const PacketSerialiser &ps, + const struct sockaddr_in *dest_addr, + const std::function&, HRESULT)> &callback) +{ + send(priority, ps, dest_addr, 0, callback); +} + +void SendQueue::send(SendPriority priority, const PacketSerialiser &ps, + const struct sockaddr_in *dest_addr, DPNHANDLE async_handle, + const std::function&, HRESULT)> &callback) { std::pair data = ps.raw_packet(); SendOp *op = new SendOp( data.first, data.second, (const struct sockaddr*)(dest_addr), (dest_addr != NULL ? sizeof(*dest_addr) : 0), + async_handle, callback); switch(priority) @@ -63,12 +73,99 @@ void SendQueue::pop_pending(SendQueue::SendOp *op) current = NULL; } +/* NOTE: The remove_queued() family of methods will ONLY return SendOps which + * have a nonzero async_handle. This is for cancelling application-created SendOps + * without also aborting internal ones. +*/ + +SendQueue::SendOp *SendQueue::remove_queued() +{ + std::list *queues[] = { &high_queue, &medium_queue, &low_queue }; + + for(int i = 0; i < 3; ++i) + { + for(auto it = queues[i]->begin(); it != queues[i]->end(); ++it) + { + SendOp *op = *it; + + if(op->async_handle != 0) + { + queues[i]->erase(it); + return op; + } + } + } + + return NULL; +} + +SendQueue::SendOp *SendQueue::remove_queued_by_handle(DPNHANDLE async_handle) +{ + std::list *queues[] = { &low_queue, &medium_queue, &high_queue }; + + for(int i = 0; i < 3; ++i) + { + for(auto it = queues[i]->begin(); it != queues[i]->end(); ++it) + { + SendOp *op = *it; + + if(op->async_handle != 0 && op->async_handle == async_handle) + { + queues[i]->erase(it); + return op; + } + } + } + + return NULL; +} + +SendQueue::SendOp *SendQueue::remove_queued_by_priority(SendPriority priority) +{ + std::list *queue; + + switch(priority) + { + case SEND_PRI_LOW: + queue = &low_queue; + break; + + case SEND_PRI_MEDIUM: + queue = &medium_queue; + break; + + case SEND_PRI_HIGH: + queue = &high_queue; + break; + } + + for(auto it = queue->begin(); it != queue->end(); ++it) + { + SendOp *op = *it; + + if(op->async_handle != 0) + { + queue->erase(it); + return op; + } + } + + return NULL; +} + +bool SendQueue::handle_is_pending(DPNHANDLE async_handle) +{ + return (current != NULL && current->async_handle == async_handle); +} + SendQueue::SendOp::SendOp(const void *data, size_t data_size, const struct sockaddr *dest_addr, size_t dest_addr_size, + DPNHANDLE async_handle, const std::function&, HRESULT)> &callback): data((const unsigned char*)(data), (const unsigned char*)(data) + data_size), sent_data(0), + async_handle(async_handle), callback(callback) { assert((size_t)(dest_addr_size) <= sizeof(this->dest_addr)); diff --git a/src/SendQueue.hpp b/src/SendQueue.hpp index 9a68631..49c51c4 100644 --- a/src/SendQueue.hpp +++ b/src/SendQueue.hpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -35,9 +36,12 @@ class SendQueue std::function&, HRESULT)> callback; public: + const DPNHANDLE async_handle; + SendOp( const void *data, size_t data_size, const struct sockaddr *dest_addr, size_t dest_addr_size, + DPNHANDLE async_handle, const std::function&, HRESULT)> &callback); std::pair get_data() const; @@ -65,9 +69,15 @@ class SendQueue SendQueue(const SendQueue &src) = delete; void send(SendPriority priority, const PacketSerialiser &ps, const struct sockaddr_in *dest_addr, const std::function&, HRESULT)> &callback); + void send(SendPriority priority, const PacketSerialiser &ps, const struct sockaddr_in *dest_addr, DPNHANDLE async_handle, const std::function&, HRESULT)> &callback); SendOp *get_pending(); void pop_pending(SendOp *op); + + SendOp *remove_queued(); + SendOp *remove_queued_by_handle(DPNHANDLE async_handle); + SendOp *remove_queued_by_priority(SendPriority priority); + bool handle_is_pending(DPNHANDLE async_handle); }; #endif /* !DPLITE_SENDQUEUE_HPP */ diff --git a/tests/DirectPlay8Peer.cpp b/tests/DirectPlay8Peer.cpp index f94674e..18cf7fd 100644 --- a/tests/DirectPlay8Peer.cpp +++ b/tests/DirectPlay8Peer.cpp @@ -4747,6 +4747,888 @@ TEST(DirectPlay8Peer, AsyncSendToHostToNone) testing = false; } +TEST(DirectPlay8Peer, AsyncSendCancelByHandle) +{ + DPNID p1_player_id = -1; + + DPNHANDLE cancel_handle = 0; + bool got_cancel_msg = false; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&cancel_handle, &got_cancel_msg] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_SEND_COMPLETE) + { + DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage); + + if(sc->hResultCode == DPNERR_USERCANCEL) + { + EXPECT_EQ(sc->dwSize, sizeof(*sc)); + EXPECT_EQ(sc->hAsyncOp, cancel_handle); + EXPECT_EQ(sc->pvUserContext, (void*)(0xBCDE)); + EXPECT_EQ(sc->dwSendCompleteFlags, 0); + EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL)); + EXPECT_EQ(sc->dwNumBuffers, 0); + + got_cancel_msg = true; + } + else if(sc->hResultCode != S_OK) + { + ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode; + } + } + + return DPN_OK; + }); + + std::function p1_cb = + [&p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE) + { + DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage); + p1_player_id = cc->dpnidLocal; + } + + return DPN_OK; + }; + + IDP8PeerInstance p1; + + ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK); + + DPN_APPLICATION_DESC connect_to_app; + memset(&connect_to_app, 0, sizeof(connect_to_app)); + + connect_to_app.dwSize = sizeof(connect_to_app); + connect_to_app.guidApplication = APP_GUID_1; + + IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT); + + ASSERT_EQ(p1->Connect( + &connect_to_app, /* pdnAppDesc */ + connect_to_addr, /* pHostAddr */ + NULL, /* pDeviceInfo */ + NULL, /* pdnSecurity */ + NULL, /* pdnCredentials */ + NULL, /* pvUserConnectData */ + 0, /* dwUserConnectDataSize */ + NULL, /* pvPlayerContext */ + NULL, /* pvAsyncContext */ + NULL, /* phAsyncHandle */ + DPNCONNECT_SYNC /* dwFlags */ + ), S_OK); + + /* Give everything a moment to settle. */ + Sleep(250); + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + /* Queue a load of messages we don't care about... */ + + for(int i = 0; i < 1000; ++i) + { + DPNHANDLE send_handle; + ASSERT_EQ(host->SendTo( + p1_player_id, + bd, + 1, + 0, + (void*)(0xABCD), + &send_handle, + 0 + ), DPNSUCCESS_PENDING); + } + + /* ...hopefully stuffing up the send queue enough for us to be able to cancel THIS send + * before it goes out. + */ + + ASSERT_EQ(host->SendTo( + p1_player_id, + bd, + 1, + 0, + (void*)(0xBCDE), + &cancel_handle, + 0 + ), DPNSUCCESS_PENDING); + + ASSERT_EQ(host->CancelAsyncOperation(cancel_handle, 0), S_OK); + + /* Wait for the send buffer to clear out. */ + Sleep(1000); + + EXPECT_TRUE(got_cancel_msg); +} + +TEST(DirectPlay8Peer, AsyncSendCancelPlayerSends) +{ + DPNID p1_player_id = -1; + + DPNHANDLE cancel_handle = 0; + int got_cancel_msg = 0; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&cancel_handle, &got_cancel_msg] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_SEND_COMPLETE) + { + DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage); + + if(sc->hResultCode == DPNERR_USERCANCEL) + { + EXPECT_EQ(sc->dwSize, sizeof(*sc)); + EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD)); + EXPECT_EQ(sc->dwSendCompleteFlags, 0); + EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL)); + EXPECT_EQ(sc->dwNumBuffers, 0); + + ++got_cancel_msg; + } + else if(sc->hResultCode != S_OK) + { + ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode; + } + } + + return DPN_OK; + }); + + std::function p1_cb = + [&p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE) + { + DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage); + p1_player_id = cc->dpnidLocal; + } + + return DPN_OK; + }; + + IDP8PeerInstance p1; + + ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK); + + DPN_APPLICATION_DESC connect_to_app; + memset(&connect_to_app, 0, sizeof(connect_to_app)); + + connect_to_app.dwSize = sizeof(connect_to_app); + connect_to_app.guidApplication = APP_GUID_1; + + IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT); + + ASSERT_EQ(p1->Connect( + &connect_to_app, /* pdnAppDesc */ + connect_to_addr, /* pHostAddr */ + NULL, /* pDeviceInfo */ + NULL, /* pdnSecurity */ + NULL, /* pdnCredentials */ + NULL, /* pvUserConnectData */ + 0, /* dwUserConnectDataSize */ + NULL, /* pvPlayerContext */ + NULL, /* pvAsyncContext */ + NULL, /* phAsyncHandle */ + DPNCONNECT_SYNC /* dwFlags */ + ), S_OK); + + /* Give everything a moment to settle. */ + Sleep(250); + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + /* Queue a load of messages... */ + + for(int i = 0; i < 1000; ++i) + { + DPNHANDLE send_handle; + ASSERT_EQ(host->SendTo( + p1_player_id, + bd, + 1, + 0, + (void*)(0xABCD), + &send_handle, + 0 + ), DPNSUCCESS_PENDING); + } + + /* ...and cancel however many of them are still pending here. */ + + ASSERT_EQ(host->CancelAsyncOperation(p1_player_id, DPNCANCEL_PLAYER_SENDS), S_OK); + + /* Wait for the send buffer to clear out. */ + Sleep(1000); + + EXPECT_TRUE(got_cancel_msg > 0); +} + +TEST(DirectPlay8Peer, AsyncSendCancelPlayerSendsLow) +{ + DPNID p1_player_id = -1; + + DPNHANDLE cancel_handle = 0; + int got_cancel_msg = 0; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&cancel_handle, &got_cancel_msg] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_SEND_COMPLETE) + { + DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage); + + if(sc->hResultCode == DPNERR_USERCANCEL) + { + EXPECT_EQ(sc->dwSize, sizeof(*sc)); + EXPECT_EQ(sc->hAsyncOp, cancel_handle); + EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD)); + EXPECT_EQ(sc->dwSendCompleteFlags, 0); + EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL)); + EXPECT_EQ(sc->dwNumBuffers, 0); + + ++got_cancel_msg; + } + else if(sc->hResultCode != S_OK) + { + ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode; + } + } + + return DPN_OK; + }); + + std::function p1_cb = + [&p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE) + { + DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage); + p1_player_id = cc->dpnidLocal; + } + + return DPN_OK; + }; + + IDP8PeerInstance p1; + + ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK); + + DPN_APPLICATION_DESC connect_to_app; + memset(&connect_to_app, 0, sizeof(connect_to_app)); + + connect_to_app.dwSize = sizeof(connect_to_app); + connect_to_app.guidApplication = APP_GUID_1; + + IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT); + + ASSERT_EQ(p1->Connect( + &connect_to_app, /* pdnAppDesc */ + connect_to_addr, /* pHostAddr */ + NULL, /* pDeviceInfo */ + NULL, /* pdnSecurity */ + NULL, /* pdnCredentials */ + NULL, /* pvUserConnectData */ + 0, /* dwUserConnectDataSize */ + NULL, /* pvPlayerContext */ + NULL, /* pvAsyncContext */ + NULL, /* phAsyncHandle */ + DPNCONNECT_SYNC /* dwFlags */ + ), S_OK); + + /* Give everything a moment to settle. */ + Sleep(250); + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + /* Queue a load of messages... */ + + for(int i = 0; i < 1000; ++i) + { + DPNHANDLE send_handle; + ASSERT_EQ(host->SendTo( + p1_player_id, + bd, + 1, + 0, + (void*)(0xABCD), + &send_handle, + DPNSEND_PRIORITY_HIGH + ), DPNSUCCESS_PENDING); + } + + /* ...and hopefully cancel this one before it goes out. */ + + ASSERT_EQ(host->SendTo( + p1_player_id, + bd, + 1, + 0, + (void*)(0xABCD), + &cancel_handle, + DPNSEND_PRIORITY_LOW + ), DPNSUCCESS_PENDING); + + ASSERT_EQ(host->CancelAsyncOperation(p1_player_id, DPNCANCEL_PLAYER_SENDS_PRIORITY_LOW), S_OK); + + /* Wait for the send buffer to clear out. */ + Sleep(1000); + + EXPECT_EQ(got_cancel_msg, 1); +} + +TEST(DirectPlay8Peer, AsyncSendCancelPlayerSendsNormal) +{ + DPNID p1_player_id = -1; + + DPNHANDLE cancel_handle = 0; + int got_cancel_msg = 0; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&cancel_handle, &got_cancel_msg] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_SEND_COMPLETE) + { + DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage); + + if(sc->hResultCode == DPNERR_USERCANCEL) + { + EXPECT_EQ(sc->dwSize, sizeof(*sc)); + EXPECT_EQ(sc->hAsyncOp, cancel_handle); + EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD)); + EXPECT_EQ(sc->dwSendCompleteFlags, 0); + EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL)); + EXPECT_EQ(sc->dwNumBuffers, 0); + + ++got_cancel_msg; + } + else if(sc->hResultCode != S_OK) + { + ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode; + } + } + + return DPN_OK; + }); + + std::function p1_cb = + [&p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE) + { + DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage); + p1_player_id = cc->dpnidLocal; + } + + return DPN_OK; + }; + + IDP8PeerInstance p1; + + ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK); + + DPN_APPLICATION_DESC connect_to_app; + memset(&connect_to_app, 0, sizeof(connect_to_app)); + + connect_to_app.dwSize = sizeof(connect_to_app); + connect_to_app.guidApplication = APP_GUID_1; + + IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT); + + ASSERT_EQ(p1->Connect( + &connect_to_app, /* pdnAppDesc */ + connect_to_addr, /* pHostAddr */ + NULL, /* pDeviceInfo */ + NULL, /* pdnSecurity */ + NULL, /* pdnCredentials */ + NULL, /* pvUserConnectData */ + 0, /* dwUserConnectDataSize */ + NULL, /* pvPlayerContext */ + NULL, /* pvAsyncContext */ + NULL, /* phAsyncHandle */ + DPNCONNECT_SYNC /* dwFlags */ + ), S_OK); + + /* Give everything a moment to settle. */ + Sleep(250); + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + /* Queue a load of messages... */ + + for(int i = 0; i < 1000; ++i) + { + DPNHANDLE send_handle; + ASSERT_EQ(host->SendTo( + p1_player_id, + bd, + 1, + 0, + (void*)(0xABCD), + &send_handle, + DPNSEND_PRIORITY_HIGH + ), DPNSUCCESS_PENDING); + } + + /* ...and hopefully cancel this one before it goes out. */ + + ASSERT_EQ(host->SendTo( + p1_player_id, + bd, + 1, + 0, + (void*)(0xABCD), + &cancel_handle, + 0 + ), DPNSUCCESS_PENDING); + + ASSERT_EQ(host->CancelAsyncOperation(p1_player_id, DPNCANCEL_PLAYER_SENDS_PRIORITY_NORMAL), S_OK); + + /* Wait for the send buffer to clear out. */ + Sleep(1000); + + EXPECT_EQ(got_cancel_msg, 1); +} + +TEST(DirectPlay8Peer, AsyncSendCancelPlayerSendsHigh) +{ + DPNID p1_player_id = -1; + int got_cancel_msg = 0; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&got_cancel_msg] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_SEND_COMPLETE) + { + DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage); + + if(sc->hResultCode == DPNERR_USERCANCEL) + { + EXPECT_EQ(sc->dwSize, sizeof(*sc)); + EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD)); + EXPECT_EQ(sc->dwSendCompleteFlags, 0); + EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL)); + EXPECT_EQ(sc->dwNumBuffers, 0); + + ++got_cancel_msg; + } + else if(sc->hResultCode != S_OK) + { + ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode; + } + } + + return DPN_OK; + }); + + std::function p1_cb = + [&p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE) + { + DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage); + p1_player_id = cc->dpnidLocal; + } + + return DPN_OK; + }; + + IDP8PeerInstance p1; + + ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK); + + DPN_APPLICATION_DESC connect_to_app; + memset(&connect_to_app, 0, sizeof(connect_to_app)); + + connect_to_app.dwSize = sizeof(connect_to_app); + connect_to_app.guidApplication = APP_GUID_1; + + IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT); + + ASSERT_EQ(p1->Connect( + &connect_to_app, /* pdnAppDesc */ + connect_to_addr, /* pHostAddr */ + NULL, /* pDeviceInfo */ + NULL, /* pdnSecurity */ + NULL, /* pdnCredentials */ + NULL, /* pvUserConnectData */ + 0, /* dwUserConnectDataSize */ + NULL, /* pvPlayerContext */ + NULL, /* pvAsyncContext */ + NULL, /* phAsyncHandle */ + DPNCONNECT_SYNC /* dwFlags */ + ), S_OK); + + /* Give everything a moment to settle. */ + Sleep(250); + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + /* Queue a load of messages... */ + + for(int i = 0; i < 1000; ++i) + { + DPNHANDLE send_handle; + ASSERT_EQ(host->SendTo( + p1_player_id, + bd, + 1, + 0, + (void*)(0xABCD), + &send_handle, + DPNSEND_PRIORITY_HIGH + ), DPNSUCCESS_PENDING); + } + + /* ...and cancel however many haven't gone out yet. */ + + ASSERT_EQ(host->CancelAsyncOperation(p1_player_id, DPNCANCEL_PLAYER_SENDS_PRIORITY_HIGH), S_OK); + + /* Wait for the send buffer to clear out. */ + Sleep(1000); + + EXPECT_NE(got_cancel_msg, 0); +} + +TEST(DirectPlay8Peer, AsyncSendCancelPlayerSendsOtherHandle) +{ + DPNID host_player_id = -1, p1_player_id = -1; + + DPNHANDLE cancel_handle = 0; + int got_cancel_msg = 0; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&host_player_id, &cancel_handle, &got_cancel_msg] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CREATE_PLAYER) + { + DPNMSG_CREATE_PLAYER *cp = (DPNMSG_CREATE_PLAYER*)(pMessage); + + if(host_player_id == -1) + { + host_player_id = cp->dpnidPlayer; + } + } + else if(dwMessageType == DPN_MSGID_SEND_COMPLETE) + { + DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage); + + if(sc->hResultCode == DPNERR_USERCANCEL) + { + ++got_cancel_msg; + } + else if(sc->hResultCode != S_OK) + { + ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode; + } + } + + return DPN_OK; + }); + + std::function p1_cb = + [&p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE) + { + DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage); + p1_player_id = cc->dpnidLocal; + } + + return DPN_OK; + }; + + IDP8PeerInstance p1; + + ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK); + + DPN_APPLICATION_DESC connect_to_app; + memset(&connect_to_app, 0, sizeof(connect_to_app)); + + connect_to_app.dwSize = sizeof(connect_to_app); + connect_to_app.guidApplication = APP_GUID_1; + + IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT); + + ASSERT_EQ(p1->Connect( + &connect_to_app, /* pdnAppDesc */ + connect_to_addr, /* pHostAddr */ + NULL, /* pDeviceInfo */ + NULL, /* pdnSecurity */ + NULL, /* pdnCredentials */ + NULL, /* pvUserConnectData */ + 0, /* dwUserConnectDataSize */ + NULL, /* pvPlayerContext */ + NULL, /* pvAsyncContext */ + NULL, /* phAsyncHandle */ + DPNCONNECT_SYNC /* dwFlags */ + ), S_OK); + + /* Give everything a moment to settle. */ + Sleep(250); + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + /* Queue a load of messages... */ + + for(int i = 0; i < 1000; ++i) + { + DPNHANDLE send_handle; + ASSERT_EQ(host->SendTo( + p1_player_id, + bd, + 1, + 0, + (void*)(0xABCD), + &send_handle, + 0 + ), DPNSUCCESS_PENDING); + } + + /* ...and cancel none of them here. */ + + ASSERT_EQ(host->CancelAsyncOperation(host_player_id, DPNCANCEL_PLAYER_SENDS), S_OK); + + /* Wait for the send buffer to clear out. */ + Sleep(1000); + + EXPECT_EQ(got_cancel_msg, 0); +} + +TEST(DirectPlay8Peer, AsyncSendCancelAllOperations) +{ + DPNID p1_player_id = -1; + + DPNHANDLE cancel_handle = 0; + int got_cancel_msg = 0; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&cancel_handle, &got_cancel_msg] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_SEND_COMPLETE) + { + DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage); + + if(sc->hResultCode == DPNERR_USERCANCEL) + { + EXPECT_EQ(sc->dwSize, sizeof(*sc)); + EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD)); + EXPECT_EQ(sc->dwSendCompleteFlags, 0); + EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL)); + EXPECT_EQ(sc->dwNumBuffers, 0); + + ++got_cancel_msg; + } + else if(sc->hResultCode != S_OK) + { + ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode; + } + } + + return DPN_OK; + }); + + std::function p1_cb = + [&p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE) + { + DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage); + p1_player_id = cc->dpnidLocal; + } + + return DPN_OK; + }; + + IDP8PeerInstance p1; + + ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK); + + DPN_APPLICATION_DESC connect_to_app; + memset(&connect_to_app, 0, sizeof(connect_to_app)); + + connect_to_app.dwSize = sizeof(connect_to_app); + connect_to_app.guidApplication = APP_GUID_1; + + IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT); + + ASSERT_EQ(p1->Connect( + &connect_to_app, /* pdnAppDesc */ + connect_to_addr, /* pHostAddr */ + NULL, /* pDeviceInfo */ + NULL, /* pdnSecurity */ + NULL, /* pdnCredentials */ + NULL, /* pvUserConnectData */ + 0, /* dwUserConnectDataSize */ + NULL, /* pvPlayerContext */ + NULL, /* pvAsyncContext */ + NULL, /* phAsyncHandle */ + DPNCONNECT_SYNC /* dwFlags */ + ), S_OK); + + /* Give everything a moment to settle. */ + Sleep(250); + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + /* Queue a load of messages... */ + + for(int i = 0; i < 1000; ++i) + { + DPNHANDLE send_handle; + ASSERT_EQ(host->SendTo( + p1_player_id, + bd, + 1, + 0, + (void*)(0xABCD), + &send_handle, + 0 + ), DPNSUCCESS_PENDING); + } + + /* ...and cancel however many of them are still pending here. */ + + ASSERT_EQ(host->CancelAsyncOperation(0, DPNCANCEL_ALL_OPERATIONS), S_OK); + + /* Wait for the send buffer to clear out. */ + Sleep(1000); + + EXPECT_TRUE(got_cancel_msg > 0); +} + +TEST(DirectPlay8Peer, AsyncSendCancelByClose) +{ + DPNID p1_player_id = -1; + int got_cancel_msg = 0; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&got_cancel_msg] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_SEND_COMPLETE) + { + DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage); + + if(sc->hResultCode == DPNERR_USERCANCEL) + { + EXPECT_EQ(sc->dwSize, sizeof(*sc)); + EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD)); + EXPECT_EQ(sc->dwSendCompleteFlags, 0); + EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL)); + EXPECT_EQ(sc->dwNumBuffers, 0); + + ++got_cancel_msg; + } + else if(sc->hResultCode != S_OK) + { + ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode; + } + } + + return DPN_OK; + }); + + std::function p1_cb = + [&p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE) + { + DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage); + p1_player_id = cc->dpnidLocal; + } + + return DPN_OK; + }; + + IDP8PeerInstance p1; + + ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK); + + DPN_APPLICATION_DESC connect_to_app; + memset(&connect_to_app, 0, sizeof(connect_to_app)); + + connect_to_app.dwSize = sizeof(connect_to_app); + connect_to_app.guidApplication = APP_GUID_1; + + IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT); + + ASSERT_EQ(p1->Connect( + &connect_to_app, /* pdnAppDesc */ + connect_to_addr, /* pHostAddr */ + NULL, /* pDeviceInfo */ + NULL, /* pdnSecurity */ + NULL, /* pdnCredentials */ + NULL, /* pvUserConnectData */ + 0, /* dwUserConnectDataSize */ + NULL, /* pvPlayerContext */ + NULL, /* pvAsyncContext */ + NULL, /* phAsyncHandle */ + DPNCONNECT_SYNC /* dwFlags */ + ), S_OK); + + /* Give everything a moment to settle. */ + Sleep(250); + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + /* Queue a load of messages... */ + + for(int i = 0; i < 1000; ++i) + { + DPNHANDLE send_handle; + ASSERT_EQ(host->SendTo( + p1_player_id, + bd, + 1, + 0, + (void*)(0xABCD), + &send_handle, + 0 + ), DPNSUCCESS_PENDING); + } + + /* ...and cancel however many of them are still pending here. */ + + host->Close(DPNCLOSE_IMMEDIATE); + + /* Wait for the send buffer to clear out. */ + Sleep(1000); + + EXPECT_TRUE(got_cancel_msg > 0); +} + TEST(DirectPlay8Peer, SyncSendToPeerToHost) { std::atomic testing(false); diff --git a/tests/SendQueue.cpp b/tests/SendQueue.cpp new file mode 100644 index 0000000..966ae4d --- /dev/null +++ b/tests/SendQueue.cpp @@ -0,0 +1,514 @@ +#include +#include +#include + +#include "../src/EventObject.hpp" +#include "../src/packet.hpp" +#include "../src/SendQueue.hpp" + +class SendQueueTest: public ::testing::Test { + protected: + EventObject event; + SendQueue sq; + + SendQueueTest(): sq(event) {} + ~SendQueueTest() {} + + bool event_signalled() + { + return (WaitForSingleObject(event, 0) == WAIT_OBJECT_0); + } + + uint32_t sqop_ptype(SendQueue::SendOp *sqop) + { + std::pair sqop_data = sqop->get_data(); + + PacketDeserialiser pd(sqop_data.first, sqop_data.second); + return pd.packet_type(); + } +}; + +TEST_F(SendQueueTest, SendSingleLow) +{ + EXPECT_FALSE(event_signalled()); + + sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(0), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + /* Event should signal once after calling send() */ + EXPECT_TRUE(event_signalled()); + EXPECT_FALSE(event_signalled()); + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), sqop); + + EXPECT_EQ(sqop_ptype(sqop), 0); + + sq.pop_pending(sqop); + delete sqop; + } + + EXPECT_EQ(sq.get_pending(), (SendQueue::SendOp*)(NULL)); +} + +TEST_F(SendQueueTest, SendMultiLow) +{ + EXPECT_FALSE(event_signalled()); + + sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(2), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + /* Event should signal once after each batch of calls to send() */ + EXPECT_TRUE(event_signalled()); + EXPECT_FALSE(event_signalled()); + + sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(0), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + /* Event should signal once after each batch of calls to send() */ + EXPECT_TRUE(event_signalled()); + EXPECT_FALSE(event_signalled()); + + /* Check we get the messages in the right order. */ + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), sqop); + + EXPECT_EQ(sqop_ptype(sqop), 1); + + sq.pop_pending(sqop); + delete sqop; + } + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), sqop); + + EXPECT_EQ(sqop_ptype(sqop), 2); + + sq.pop_pending(sqop); + delete sqop; + } + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), sqop); + + EXPECT_EQ(sqop_ptype(sqop), 0); + + sq.pop_pending(sqop); + delete sqop; + } + + EXPECT_EQ(sq.get_pending(), (SendQueue::SendOp*)(NULL)); +} + +TEST_F(SendQueueTest, SendMultiMedium) +{ + EXPECT_FALSE(event_signalled()); + + sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(1), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + /* Event should signal once after each batch of calls to send() */ + EXPECT_TRUE(event_signalled()); + EXPECT_FALSE(event_signalled()); + + /* Check we get the messages in the right order. */ + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), sqop); + + EXPECT_EQ(sqop_ptype(sqop), 1); + + sq.pop_pending(sqop); + delete sqop; + } + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), sqop); + + EXPECT_EQ(sqop_ptype(sqop), 2); + + sq.pop_pending(sqop); + delete sqop; + } + + EXPECT_EQ(sq.get_pending(), (SendQueue::SendOp*)(NULL)); +} + +TEST_F(SendQueueTest, SendMultiHigh) +{ + EXPECT_FALSE(event_signalled()); + + sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(1), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(2), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + /* Event should signal once after each batch of calls to send() */ + EXPECT_TRUE(event_signalled()); + EXPECT_FALSE(event_signalled()); + + /* Check we get the messages in the right order. */ + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), sqop); + + EXPECT_EQ(sqop_ptype(sqop), 1); + + sq.pop_pending(sqop); + delete sqop; + } + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), sqop); + + EXPECT_EQ(sqop_ptype(sqop), 2); + + sq.pop_pending(sqop); + delete sqop; + } + + EXPECT_EQ(sq.get_pending(), (SendQueue::SendOp*)(NULL)); +} + +TEST_F(SendQueueTest, SendMultiPriorities) +{ + EXPECT_FALSE(event_signalled()); + + sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(3), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(4), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(5), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(6), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + /* Event should signal once after each batch of calls to send() */ + EXPECT_TRUE(event_signalled()); + EXPECT_FALSE(event_signalled()); + + /* Check we get the messages in the right order. */ + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), sqop); + + EXPECT_EQ(sqop_ptype(sqop), 3); + + sq.pop_pending(sqop); + delete sqop; + } + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), sqop); + + EXPECT_EQ(sqop_ptype(sqop), 6); + + sq.pop_pending(sqop); + delete sqop; + } + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), sqop); + + EXPECT_EQ(sqop_ptype(sqop), 2); + + sq.pop_pending(sqop); + delete sqop; + } + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), sqop); + + EXPECT_EQ(sqop_ptype(sqop), 5); + + sq.pop_pending(sqop); + delete sqop; + } + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), sqop); + + EXPECT_EQ(sqop_ptype(sqop), 1); + + sq.pop_pending(sqop); + delete sqop; + } + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), sqop); + + EXPECT_EQ(sqop_ptype(sqop), 4); + + sq.pop_pending(sqop); + delete sqop; + } + + EXPECT_EQ(sq.get_pending(), (SendQueue::SendOp*)(NULL)); +} + +TEST_F(SendQueueTest, RemoveQueued) +{ + sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL, 1, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL, 2, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(3), NULL, 3, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + { + SendQueue::SendOp *sqop = sq.remove_queued(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sqop_ptype(sqop), 3); + + delete sqop; + } + + { + SendQueue::SendOp *sqop = sq.remove_queued(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sqop_ptype(sqop), 2); + + delete sqop; + } + + { + SendQueue::SendOp *sqop = sq.remove_queued(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sqop_ptype(sqop), 1); + + delete sqop; + } + + EXPECT_EQ(sq.remove_queued(), (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), (SendQueue::SendOp*)(NULL)); +} + +TEST_F(SendQueueTest, RemoveQueuedPending) +{ + sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL, 1, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL, 2, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(3), NULL, 3, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sqop_ptype(sqop), 3); + + sq.pop_pending(sqop); + delete sqop; + } + + { + SendQueue::SendOp *sqop = sq.remove_queued(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sqop_ptype(sqop), 2); + + delete sqop; + } + + { + SendQueue::SendOp *sqop = sq.get_pending(); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sqop_ptype(sqop), 1); + + sq.pop_pending(sqop); + delete sqop; + } + + EXPECT_EQ(sq.remove_queued(), (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.get_pending(), (SendQueue::SendOp*)(NULL)); +} + +TEST_F(SendQueueTest, RemoveQueuedNoHandle) +{ + sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(3), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + EXPECT_EQ(sq.remove_queued(), (SendQueue::SendOp*)(NULL)); +} + +TEST_F(SendQueueTest, RemoveQueuedByHandle) +{ + sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL, 1, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL, 2, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(3), NULL, 3, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + { + SendQueue::SendOp *sqop = sq.remove_queued_by_handle(1); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sqop_ptype(sqop), 1); + + delete sqop; + + EXPECT_EQ(sq.remove_queued_by_handle(1), (SendQueue::SendOp*)(NULL)); + } + + { + SendQueue::SendOp *sqop = sq.remove_queued_by_handle(2); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sqop_ptype(sqop), 2); + + delete sqop; + + EXPECT_EQ(sq.remove_queued_by_handle(2), (SendQueue::SendOp*)(NULL)); + } + + { + SendQueue::SendOp *sqop = sq.remove_queued_by_handle(3); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sqop_ptype(sqop), 3); + + delete sqop; + + EXPECT_EQ(sq.remove_queued_by_handle(3), (SendQueue::SendOp*)(NULL)); + } +} + +TEST_F(SendQueueTest, RemoveQueuedByPriority) +{ + sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL, 1, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL, 2, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(3), NULL, 3, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + { + SendQueue::SendOp *sqop = sq.remove_queued_by_priority(SendQueue::SEND_PRI_LOW); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.remove_queued_by_priority(SendQueue::SEND_PRI_LOW), (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sqop_ptype(sqop), 1); + + delete sqop; + } + + { + SendQueue::SendOp *sqop = sq.remove_queued_by_priority(SendQueue::SEND_PRI_MEDIUM); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.remove_queued_by_priority(SendQueue::SEND_PRI_MEDIUM), (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sqop_ptype(sqop), 2); + + delete sqop; + } + + { + SendQueue::SendOp *sqop = sq.remove_queued_by_priority(SendQueue::SEND_PRI_HIGH); + ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sq.remove_queued_by_priority(SendQueue::SEND_PRI_HIGH), (SendQueue::SendOp*)(NULL)); + + EXPECT_EQ(sqop_ptype(sqop), 3); + + delete sqop; + } +} + +TEST_F(SendQueueTest, RemoveQueuedByPriorityNoHandle) +{ + sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(3), NULL, + [](std::unique_lock &l, HRESULT result) { return 0; }); + + EXPECT_EQ(sq.remove_queued_by_priority(SendQueue::SEND_PRI_LOW), (SendQueue::SendOp*)(NULL)); + EXPECT_EQ(sq.remove_queued_by_priority(SendQueue::SEND_PRI_MEDIUM), (SendQueue::SendOp*)(NULL)); + EXPECT_EQ(sq.remove_queued_by_priority(SendQueue::SEND_PRI_HIGH), (SendQueue::SendOp*)(NULL)); +}