1
0
mirror of https://github.com/solemnwarning/directplay-lite synced 2024-12-30 16:45:37 +01:00

Implement cancelling of queued asynchronous sends.

This commit is contained in:
Daniel Collins 2018-10-16 23:25:08 +01:00
parent 5c8f19d5a8
commit 5ccf66fceb
6 changed files with 1634 additions and 12 deletions

View File

@ -25,7 +25,8 @@ SET CPP_OBJS=^
tests/DirectPlay8Peer.obj^
tests/HandleHandlingPool.obj^
tests/PacketDeserialiser.obj^
tests/PacketSerialiser.obj
tests/PacketSerialiser.obj^
tests/SendQueue.obj
REM .obj files to be compiled from .c source files
SET C_OBJS=^
@ -54,7 +55,8 @@ SET TEST_OBJS=^
tests/DirectPlay8Peer.obj^
tests/HandleHandlingPool.obj^
tests/PacketDeserialiser.obj^
tests/PacketSerialiser.obj
tests/PacketSerialiser.obj^
tests/SendQueue.obj
SET TEST_LIBS=ws2_32.lib dxguid.lib ole32.lib

View File

@ -192,7 +192,74 @@ HRESULT DirectPlay8Peer::CancelAsyncOperation(CONST DPNHANDLE hAsyncHandle, CONS
if(dwFlags & DPNCANCEL_PLAYER_SENDS)
{
/* Cancel sends to player ID in hAsyncHandle */
UNIMPLEMENTED("DirectPlay8Peer::CancelAsyncOperation");
if(hAsyncHandle == local_player_id)
{
/* DirectX permits cancelling pending messages to the local player, but we
* don't queue loopback messages. So just do nothing.
*/
return S_OK;
}
Peer *peer = get_peer_by_player_id(hAsyncHandle);
if(peer == NULL)
{
/* TODO: Is this the correct error? */
return DPNERR_INVALIDPLAYER;
}
for(; peer != NULL; peer = get_peer_by_player_id(hAsyncHandle))
{
/* The DPNCANCEL_PLAYER_SENDS_* flags are horrible.
*
* Each priority-specific flag includes the DPNCANCEL_PLAYER_SENDS bit, so
* we need to check if ONLY the DPNCANCEL_PLAYER_SENDS bit is set, or if
* the priority-specific bit is set.
*/
DWORD send_flags = (dwFlags &
( DPNCANCEL_PLAYER_SENDS_PRIORITY_LOW
| DPNCANCEL_PLAYER_SENDS_PRIORITY_NORMAL
| DPNCANCEL_PLAYER_SENDS_PRIORITY_HIGH));
if(send_flags == DPNCANCEL_PLAYER_SENDS || (send_flags & DPNCANCEL_PLAYER_SENDS_PRIORITY_LOW) == DPNCANCEL_PLAYER_SENDS_PRIORITY_LOW)
{
SendQueue::SendOp *sqop = peer->sq.remove_queued_by_priority(SendQueue::SEND_PRI_LOW);
if(sqop != NULL)
{
sqop->invoke_callback(l, DPNERR_USERCANCEL);
delete sqop;
continue;
}
}
if(send_flags == DPNCANCEL_PLAYER_SENDS || (send_flags & DPNCANCEL_PLAYER_SENDS_PRIORITY_NORMAL) == DPNCANCEL_PLAYER_SENDS_PRIORITY_NORMAL)
{
SendQueue::SendOp *sqop = peer->sq.remove_queued_by_priority(SendQueue::SEND_PRI_MEDIUM);
if(sqop != NULL)
{
sqop->invoke_callback(l, DPNERR_USERCANCEL);
delete sqop;
continue;
}
}
if(send_flags == DPNCANCEL_PLAYER_SENDS || (send_flags & DPNCANCEL_PLAYER_SENDS_PRIORITY_HIGH) == DPNCANCEL_PLAYER_SENDS_PRIORITY_HIGH)
{
SendQueue::SendOp *sqop = peer->sq.remove_queued_by_priority(SendQueue::SEND_PRI_HIGH);
if(sqop != NULL)
{
sqop->invoke_callback(l, DPNERR_USERCANCEL);
delete sqop;
continue;
}
}
/* No more queued sends to cancel. */
break;
}
return S_OK;
}
else if(dwFlags & (DPNCANCEL_ENUM | DPNCANCEL_CONNECT | DPNCANCEL_ALL_OPERATIONS))
{
@ -217,7 +284,24 @@ HRESULT DirectPlay8Peer::CancelAsyncOperation(CONST DPNHANDLE hAsyncHandle, CONS
if(dwFlags & DPNCANCEL_ALL_OPERATIONS)
{
/* TODO: Cancel all sends */
for(auto p = peers.begin(); p != peers.end();)
{
Peer *peer = p->second;
SendQueue::SendOp *sqop = peer->sq.remove_queued();
if(sqop != NULL)
{
sqop->invoke_callback(l, DPNERR_USERCANCEL);
delete sqop;
/* Restart in case peers was munged. */
p = peers.begin();
}
else{
++p;
}
}
}
return S_OK;
@ -254,7 +338,39 @@ HRESULT DirectPlay8Peer::CancelAsyncOperation(CONST DPNHANDLE hAsyncHandle, CONS
}
else if((hAsyncHandle & AsyncHandleAllocator::TYPE_MASK) == AsyncHandleAllocator::TYPE_SEND)
{
UNIMPLEMENTED("DirectPlay8Peer::CancelAsyncOperation");
/* Search the send queues for a queued send with the handle. */
SendQueue::SendOp *sqop = udp_sq.remove_queued_by_handle(hAsyncHandle);
if(udp_sq.handle_is_pending(hAsyncHandle))
{
/* Cannot cancel once message has started sending. */
return DPNERR_CANNOTCANCEL;
}
for(auto p = peers.begin(); p != peers.end() && sqop == NULL; ++p)
{
Peer *peer = p->second;
sqop = peer->sq.remove_queued_by_handle(hAsyncHandle);
if(peer->sq.handle_is_pending(hAsyncHandle))
{
/* Cannot cancel once message has started sending. */
return DPNERR_CANNOTCANCEL;
}
}
if(sqop != NULL)
{
/* Queued send was found, make it go away. */
sqop->invoke_callback(l, DPNERR_USERCANCEL);
delete sqop;
return S_OK;
}
else{
/* No pending send with that handle. */
return DPNERR_INVALIDHANDLE;
}
}
else{
/* Unrecognised handle type. */
@ -466,7 +582,7 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST
if(dwFlags & DPNSEND_COMPLETEONPROCESS)
{
/* Not implemented yet. */
/* TODO: Implement DPNSEND_COMPLETEONPROCESS */
return DPNERR_GENERIC;
}
@ -599,8 +715,11 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST
unsigned int *pending = new unsigned int(send_to_peers.size() + send_to_self);
HRESULT *result = new HRESULT(S_OK);
DPNHANDLE handle = handle_alloc.new_send();
*phAsyncHandle = handle;
auto handle_send_complete =
[this, pending, result, pvAsyncContext, dwFlags, prgBufferDesc, cBufferDesc]
[this, pending, result, pvAsyncContext, dwFlags, prgBufferDesc, cBufferDesc, handle]
(std::unique_lock<std::mutex> &l, HRESULT s_result)
{
if(s_result != S_OK && *result == S_OK)
@ -615,7 +734,7 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST
memset(&sc, 0, sizeof(sc));
sc.dwSize = sizeof(sc);
// sc.hAsyncOp
sc.hAsyncOp = handle;
sc.pvUserContext = pvAsyncContext;
sc.hResultCode = *result;
// sc.dwSendTime
@ -639,8 +758,6 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST
}
};
*phAsyncHandle = 0;
if(*pending == 0)
{
/* Horrible horrible hack to raise a DPNMSG_SEND_COMPLETE if there are no
@ -660,7 +777,7 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST
for(auto pi = send_to_peers.begin(); pi != send_to_peers.end(); ++pi)
{
(*pi)->sq.send(priority, message, NULL,
(*pi)->sq.send(priority, message, NULL, handle,
[handle_send_complete]
(std::unique_lock<std::mutex> &l, HRESULT s_result)
{

View File

@ -4,13 +4,23 @@
#include "SendQueue.hpp"
void SendQueue::send(SendPriority priority, const PacketSerialiser &ps, const struct sockaddr_in *dest_addr, const std::function<void(std::unique_lock<std::mutex>&, HRESULT)> &callback)
void SendQueue::send(SendPriority priority, const PacketSerialiser &ps,
const struct sockaddr_in *dest_addr,
const std::function<void(std::unique_lock<std::mutex>&, HRESULT)> &callback)
{
send(priority, ps, dest_addr, 0, callback);
}
void SendQueue::send(SendPriority priority, const PacketSerialiser &ps,
const struct sockaddr_in *dest_addr, DPNHANDLE async_handle,
const std::function<void(std::unique_lock<std::mutex>&, HRESULT)> &callback)
{
std::pair<const void*, size_t> data = ps.raw_packet();
SendOp *op = new SendOp(
data.first, data.second,
(const struct sockaddr*)(dest_addr), (dest_addr != NULL ? sizeof(*dest_addr) : 0),
async_handle,
callback);
switch(priority)
@ -63,12 +73,99 @@ void SendQueue::pop_pending(SendQueue::SendOp *op)
current = NULL;
}
/* NOTE: The remove_queued() family of methods will ONLY return SendOps which
* have a nonzero async_handle. This is for cancelling application-created SendOps
* without also aborting internal ones.
*/
SendQueue::SendOp *SendQueue::remove_queued()
{
std::list<SendOp*> *queues[] = { &high_queue, &medium_queue, &low_queue };
for(int i = 0; i < 3; ++i)
{
for(auto it = queues[i]->begin(); it != queues[i]->end(); ++it)
{
SendOp *op = *it;
if(op->async_handle != 0)
{
queues[i]->erase(it);
return op;
}
}
}
return NULL;
}
SendQueue::SendOp *SendQueue::remove_queued_by_handle(DPNHANDLE async_handle)
{
std::list<SendOp*> *queues[] = { &low_queue, &medium_queue, &high_queue };
for(int i = 0; i < 3; ++i)
{
for(auto it = queues[i]->begin(); it != queues[i]->end(); ++it)
{
SendOp *op = *it;
if(op->async_handle != 0 && op->async_handle == async_handle)
{
queues[i]->erase(it);
return op;
}
}
}
return NULL;
}
SendQueue::SendOp *SendQueue::remove_queued_by_priority(SendPriority priority)
{
std::list<SendOp*> *queue;
switch(priority)
{
case SEND_PRI_LOW:
queue = &low_queue;
break;
case SEND_PRI_MEDIUM:
queue = &medium_queue;
break;
case SEND_PRI_HIGH:
queue = &high_queue;
break;
}
for(auto it = queue->begin(); it != queue->end(); ++it)
{
SendOp *op = *it;
if(op->async_handle != 0)
{
queue->erase(it);
return op;
}
}
return NULL;
}
bool SendQueue::handle_is_pending(DPNHANDLE async_handle)
{
return (current != NULL && current->async_handle == async_handle);
}
SendQueue::SendOp::SendOp(const void *data, size_t data_size,
const struct sockaddr *dest_addr, size_t dest_addr_size,
DPNHANDLE async_handle,
const std::function<void(std::unique_lock<std::mutex>&, HRESULT)> &callback):
data((const unsigned char*)(data), (const unsigned char*)(data) + data_size),
sent_data(0),
async_handle(async_handle),
callback(callback)
{
assert((size_t)(dest_addr_size) <= sizeof(this->dest_addr));

View File

@ -4,6 +4,7 @@
#include <winsock2.h>
#include <functional>
#include <dplay8.h>
#include <list>
#include <mutex>
#include <set>
@ -35,9 +36,12 @@ class SendQueue
std::function<void(std::unique_lock<std::mutex>&, HRESULT)> callback;
public:
const DPNHANDLE async_handle;
SendOp(
const void *data, size_t data_size,
const struct sockaddr *dest_addr, size_t dest_addr_size,
DPNHANDLE async_handle,
const std::function<void(std::unique_lock<std::mutex>&, HRESULT)> &callback);
std::pair<const void*, size_t> get_data() const;
@ -65,9 +69,15 @@ class SendQueue
SendQueue(const SendQueue &src) = delete;
void send(SendPriority priority, const PacketSerialiser &ps, const struct sockaddr_in *dest_addr, const std::function<void(std::unique_lock<std::mutex>&, HRESULT)> &callback);
void send(SendPriority priority, const PacketSerialiser &ps, const struct sockaddr_in *dest_addr, DPNHANDLE async_handle, const std::function<void(std::unique_lock<std::mutex>&, HRESULT)> &callback);
SendOp *get_pending();
void pop_pending(SendOp *op);
SendOp *remove_queued();
SendOp *remove_queued_by_handle(DPNHANDLE async_handle);
SendOp *remove_queued_by_priority(SendPriority priority);
bool handle_is_pending(DPNHANDLE async_handle);
};
#endif /* !DPLITE_SENDQUEUE_HPP */

View File

@ -4747,6 +4747,888 @@ TEST(DirectPlay8Peer, AsyncSendToHostToNone)
testing = false;
}
TEST(DirectPlay8Peer, AsyncSendCancelByHandle)
{
DPNID p1_player_id = -1;
DPNHANDLE cancel_handle = 0;
bool got_cancel_msg = false;
SessionHost host(APP_GUID_1, L"Session 1", PORT,
[&cancel_handle, &got_cancel_msg]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_SEND_COMPLETE)
{
DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage);
if(sc->hResultCode == DPNERR_USERCANCEL)
{
EXPECT_EQ(sc->dwSize, sizeof(*sc));
EXPECT_EQ(sc->hAsyncOp, cancel_handle);
EXPECT_EQ(sc->pvUserContext, (void*)(0xBCDE));
EXPECT_EQ(sc->dwSendCompleteFlags, 0);
EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL));
EXPECT_EQ(sc->dwNumBuffers, 0);
got_cancel_msg = true;
}
else if(sc->hResultCode != S_OK)
{
ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode;
}
}
return DPN_OK;
});
std::function<HRESULT(DWORD,PVOID)> p1_cb =
[&p1_player_id]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE)
{
DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage);
p1_player_id = cc->dpnidLocal;
}
return DPN_OK;
};
IDP8PeerInstance p1;
ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK);
DPN_APPLICATION_DESC connect_to_app;
memset(&connect_to_app, 0, sizeof(connect_to_app));
connect_to_app.dwSize = sizeof(connect_to_app);
connect_to_app.guidApplication = APP_GUID_1;
IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT);
ASSERT_EQ(p1->Connect(
&connect_to_app, /* pdnAppDesc */
connect_to_addr, /* pHostAddr */
NULL, /* pDeviceInfo */
NULL, /* pdnSecurity */
NULL, /* pdnCredentials */
NULL, /* pvUserConnectData */
0, /* dwUserConnectDataSize */
NULL, /* pvPlayerContext */
NULL, /* pvAsyncContext */
NULL, /* phAsyncHandle */
DPNCONNECT_SYNC /* dwFlags */
), S_OK);
/* Give everything a moment to settle. */
Sleep(250);
DPN_BUFFER_DESC bd[] = {
{ 12, (BYTE*)("Hello, world") },
};
/* Queue a load of messages we don't care about... */
for(int i = 0; i < 1000; ++i)
{
DPNHANDLE send_handle;
ASSERT_EQ(host->SendTo(
p1_player_id,
bd,
1,
0,
(void*)(0xABCD),
&send_handle,
0
), DPNSUCCESS_PENDING);
}
/* ...hopefully stuffing up the send queue enough for us to be able to cancel THIS send
* before it goes out.
*/
ASSERT_EQ(host->SendTo(
p1_player_id,
bd,
1,
0,
(void*)(0xBCDE),
&cancel_handle,
0
), DPNSUCCESS_PENDING);
ASSERT_EQ(host->CancelAsyncOperation(cancel_handle, 0), S_OK);
/* Wait for the send buffer to clear out. */
Sleep(1000);
EXPECT_TRUE(got_cancel_msg);
}
TEST(DirectPlay8Peer, AsyncSendCancelPlayerSends)
{
DPNID p1_player_id = -1;
DPNHANDLE cancel_handle = 0;
int got_cancel_msg = 0;
SessionHost host(APP_GUID_1, L"Session 1", PORT,
[&cancel_handle, &got_cancel_msg]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_SEND_COMPLETE)
{
DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage);
if(sc->hResultCode == DPNERR_USERCANCEL)
{
EXPECT_EQ(sc->dwSize, sizeof(*sc));
EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD));
EXPECT_EQ(sc->dwSendCompleteFlags, 0);
EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL));
EXPECT_EQ(sc->dwNumBuffers, 0);
++got_cancel_msg;
}
else if(sc->hResultCode != S_OK)
{
ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode;
}
}
return DPN_OK;
});
std::function<HRESULT(DWORD,PVOID)> p1_cb =
[&p1_player_id]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE)
{
DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage);
p1_player_id = cc->dpnidLocal;
}
return DPN_OK;
};
IDP8PeerInstance p1;
ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK);
DPN_APPLICATION_DESC connect_to_app;
memset(&connect_to_app, 0, sizeof(connect_to_app));
connect_to_app.dwSize = sizeof(connect_to_app);
connect_to_app.guidApplication = APP_GUID_1;
IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT);
ASSERT_EQ(p1->Connect(
&connect_to_app, /* pdnAppDesc */
connect_to_addr, /* pHostAddr */
NULL, /* pDeviceInfo */
NULL, /* pdnSecurity */
NULL, /* pdnCredentials */
NULL, /* pvUserConnectData */
0, /* dwUserConnectDataSize */
NULL, /* pvPlayerContext */
NULL, /* pvAsyncContext */
NULL, /* phAsyncHandle */
DPNCONNECT_SYNC /* dwFlags */
), S_OK);
/* Give everything a moment to settle. */
Sleep(250);
DPN_BUFFER_DESC bd[] = {
{ 12, (BYTE*)("Hello, world") },
};
/* Queue a load of messages... */
for(int i = 0; i < 1000; ++i)
{
DPNHANDLE send_handle;
ASSERT_EQ(host->SendTo(
p1_player_id,
bd,
1,
0,
(void*)(0xABCD),
&send_handle,
0
), DPNSUCCESS_PENDING);
}
/* ...and cancel however many of them are still pending here. */
ASSERT_EQ(host->CancelAsyncOperation(p1_player_id, DPNCANCEL_PLAYER_SENDS), S_OK);
/* Wait for the send buffer to clear out. */
Sleep(1000);
EXPECT_TRUE(got_cancel_msg > 0);
}
TEST(DirectPlay8Peer, AsyncSendCancelPlayerSendsLow)
{
DPNID p1_player_id = -1;
DPNHANDLE cancel_handle = 0;
int got_cancel_msg = 0;
SessionHost host(APP_GUID_1, L"Session 1", PORT,
[&cancel_handle, &got_cancel_msg]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_SEND_COMPLETE)
{
DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage);
if(sc->hResultCode == DPNERR_USERCANCEL)
{
EXPECT_EQ(sc->dwSize, sizeof(*sc));
EXPECT_EQ(sc->hAsyncOp, cancel_handle);
EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD));
EXPECT_EQ(sc->dwSendCompleteFlags, 0);
EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL));
EXPECT_EQ(sc->dwNumBuffers, 0);
++got_cancel_msg;
}
else if(sc->hResultCode != S_OK)
{
ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode;
}
}
return DPN_OK;
});
std::function<HRESULT(DWORD,PVOID)> p1_cb =
[&p1_player_id]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE)
{
DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage);
p1_player_id = cc->dpnidLocal;
}
return DPN_OK;
};
IDP8PeerInstance p1;
ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK);
DPN_APPLICATION_DESC connect_to_app;
memset(&connect_to_app, 0, sizeof(connect_to_app));
connect_to_app.dwSize = sizeof(connect_to_app);
connect_to_app.guidApplication = APP_GUID_1;
IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT);
ASSERT_EQ(p1->Connect(
&connect_to_app, /* pdnAppDesc */
connect_to_addr, /* pHostAddr */
NULL, /* pDeviceInfo */
NULL, /* pdnSecurity */
NULL, /* pdnCredentials */
NULL, /* pvUserConnectData */
0, /* dwUserConnectDataSize */
NULL, /* pvPlayerContext */
NULL, /* pvAsyncContext */
NULL, /* phAsyncHandle */
DPNCONNECT_SYNC /* dwFlags */
), S_OK);
/* Give everything a moment to settle. */
Sleep(250);
DPN_BUFFER_DESC bd[] = {
{ 12, (BYTE*)("Hello, world") },
};
/* Queue a load of messages... */
for(int i = 0; i < 1000; ++i)
{
DPNHANDLE send_handle;
ASSERT_EQ(host->SendTo(
p1_player_id,
bd,
1,
0,
(void*)(0xABCD),
&send_handle,
DPNSEND_PRIORITY_HIGH
), DPNSUCCESS_PENDING);
}
/* ...and hopefully cancel this one before it goes out. */
ASSERT_EQ(host->SendTo(
p1_player_id,
bd,
1,
0,
(void*)(0xABCD),
&cancel_handle,
DPNSEND_PRIORITY_LOW
), DPNSUCCESS_PENDING);
ASSERT_EQ(host->CancelAsyncOperation(p1_player_id, DPNCANCEL_PLAYER_SENDS_PRIORITY_LOW), S_OK);
/* Wait for the send buffer to clear out. */
Sleep(1000);
EXPECT_EQ(got_cancel_msg, 1);
}
TEST(DirectPlay8Peer, AsyncSendCancelPlayerSendsNormal)
{
DPNID p1_player_id = -1;
DPNHANDLE cancel_handle = 0;
int got_cancel_msg = 0;
SessionHost host(APP_GUID_1, L"Session 1", PORT,
[&cancel_handle, &got_cancel_msg]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_SEND_COMPLETE)
{
DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage);
if(sc->hResultCode == DPNERR_USERCANCEL)
{
EXPECT_EQ(sc->dwSize, sizeof(*sc));
EXPECT_EQ(sc->hAsyncOp, cancel_handle);
EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD));
EXPECT_EQ(sc->dwSendCompleteFlags, 0);
EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL));
EXPECT_EQ(sc->dwNumBuffers, 0);
++got_cancel_msg;
}
else if(sc->hResultCode != S_OK)
{
ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode;
}
}
return DPN_OK;
});
std::function<HRESULT(DWORD,PVOID)> p1_cb =
[&p1_player_id]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE)
{
DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage);
p1_player_id = cc->dpnidLocal;
}
return DPN_OK;
};
IDP8PeerInstance p1;
ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK);
DPN_APPLICATION_DESC connect_to_app;
memset(&connect_to_app, 0, sizeof(connect_to_app));
connect_to_app.dwSize = sizeof(connect_to_app);
connect_to_app.guidApplication = APP_GUID_1;
IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT);
ASSERT_EQ(p1->Connect(
&connect_to_app, /* pdnAppDesc */
connect_to_addr, /* pHostAddr */
NULL, /* pDeviceInfo */
NULL, /* pdnSecurity */
NULL, /* pdnCredentials */
NULL, /* pvUserConnectData */
0, /* dwUserConnectDataSize */
NULL, /* pvPlayerContext */
NULL, /* pvAsyncContext */
NULL, /* phAsyncHandle */
DPNCONNECT_SYNC /* dwFlags */
), S_OK);
/* Give everything a moment to settle. */
Sleep(250);
DPN_BUFFER_DESC bd[] = {
{ 12, (BYTE*)("Hello, world") },
};
/* Queue a load of messages... */
for(int i = 0; i < 1000; ++i)
{
DPNHANDLE send_handle;
ASSERT_EQ(host->SendTo(
p1_player_id,
bd,
1,
0,
(void*)(0xABCD),
&send_handle,
DPNSEND_PRIORITY_HIGH
), DPNSUCCESS_PENDING);
}
/* ...and hopefully cancel this one before it goes out. */
ASSERT_EQ(host->SendTo(
p1_player_id,
bd,
1,
0,
(void*)(0xABCD),
&cancel_handle,
0
), DPNSUCCESS_PENDING);
ASSERT_EQ(host->CancelAsyncOperation(p1_player_id, DPNCANCEL_PLAYER_SENDS_PRIORITY_NORMAL), S_OK);
/* Wait for the send buffer to clear out. */
Sleep(1000);
EXPECT_EQ(got_cancel_msg, 1);
}
TEST(DirectPlay8Peer, AsyncSendCancelPlayerSendsHigh)
{
DPNID p1_player_id = -1;
int got_cancel_msg = 0;
SessionHost host(APP_GUID_1, L"Session 1", PORT,
[&got_cancel_msg]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_SEND_COMPLETE)
{
DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage);
if(sc->hResultCode == DPNERR_USERCANCEL)
{
EXPECT_EQ(sc->dwSize, sizeof(*sc));
EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD));
EXPECT_EQ(sc->dwSendCompleteFlags, 0);
EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL));
EXPECT_EQ(sc->dwNumBuffers, 0);
++got_cancel_msg;
}
else if(sc->hResultCode != S_OK)
{
ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode;
}
}
return DPN_OK;
});
std::function<HRESULT(DWORD,PVOID)> p1_cb =
[&p1_player_id]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE)
{
DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage);
p1_player_id = cc->dpnidLocal;
}
return DPN_OK;
};
IDP8PeerInstance p1;
ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK);
DPN_APPLICATION_DESC connect_to_app;
memset(&connect_to_app, 0, sizeof(connect_to_app));
connect_to_app.dwSize = sizeof(connect_to_app);
connect_to_app.guidApplication = APP_GUID_1;
IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT);
ASSERT_EQ(p1->Connect(
&connect_to_app, /* pdnAppDesc */
connect_to_addr, /* pHostAddr */
NULL, /* pDeviceInfo */
NULL, /* pdnSecurity */
NULL, /* pdnCredentials */
NULL, /* pvUserConnectData */
0, /* dwUserConnectDataSize */
NULL, /* pvPlayerContext */
NULL, /* pvAsyncContext */
NULL, /* phAsyncHandle */
DPNCONNECT_SYNC /* dwFlags */
), S_OK);
/* Give everything a moment to settle. */
Sleep(250);
DPN_BUFFER_DESC bd[] = {
{ 12, (BYTE*)("Hello, world") },
};
/* Queue a load of messages... */
for(int i = 0; i < 1000; ++i)
{
DPNHANDLE send_handle;
ASSERT_EQ(host->SendTo(
p1_player_id,
bd,
1,
0,
(void*)(0xABCD),
&send_handle,
DPNSEND_PRIORITY_HIGH
), DPNSUCCESS_PENDING);
}
/* ...and cancel however many haven't gone out yet. */
ASSERT_EQ(host->CancelAsyncOperation(p1_player_id, DPNCANCEL_PLAYER_SENDS_PRIORITY_HIGH), S_OK);
/* Wait for the send buffer to clear out. */
Sleep(1000);
EXPECT_NE(got_cancel_msg, 0);
}
TEST(DirectPlay8Peer, AsyncSendCancelPlayerSendsOtherHandle)
{
DPNID host_player_id = -1, p1_player_id = -1;
DPNHANDLE cancel_handle = 0;
int got_cancel_msg = 0;
SessionHost host(APP_GUID_1, L"Session 1", PORT,
[&host_player_id, &cancel_handle, &got_cancel_msg]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_CREATE_PLAYER)
{
DPNMSG_CREATE_PLAYER *cp = (DPNMSG_CREATE_PLAYER*)(pMessage);
if(host_player_id == -1)
{
host_player_id = cp->dpnidPlayer;
}
}
else if(dwMessageType == DPN_MSGID_SEND_COMPLETE)
{
DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage);
if(sc->hResultCode == DPNERR_USERCANCEL)
{
++got_cancel_msg;
}
else if(sc->hResultCode != S_OK)
{
ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode;
}
}
return DPN_OK;
});
std::function<HRESULT(DWORD,PVOID)> p1_cb =
[&p1_player_id]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE)
{
DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage);
p1_player_id = cc->dpnidLocal;
}
return DPN_OK;
};
IDP8PeerInstance p1;
ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK);
DPN_APPLICATION_DESC connect_to_app;
memset(&connect_to_app, 0, sizeof(connect_to_app));
connect_to_app.dwSize = sizeof(connect_to_app);
connect_to_app.guidApplication = APP_GUID_1;
IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT);
ASSERT_EQ(p1->Connect(
&connect_to_app, /* pdnAppDesc */
connect_to_addr, /* pHostAddr */
NULL, /* pDeviceInfo */
NULL, /* pdnSecurity */
NULL, /* pdnCredentials */
NULL, /* pvUserConnectData */
0, /* dwUserConnectDataSize */
NULL, /* pvPlayerContext */
NULL, /* pvAsyncContext */
NULL, /* phAsyncHandle */
DPNCONNECT_SYNC /* dwFlags */
), S_OK);
/* Give everything a moment to settle. */
Sleep(250);
DPN_BUFFER_DESC bd[] = {
{ 12, (BYTE*)("Hello, world") },
};
/* Queue a load of messages... */
for(int i = 0; i < 1000; ++i)
{
DPNHANDLE send_handle;
ASSERT_EQ(host->SendTo(
p1_player_id,
bd,
1,
0,
(void*)(0xABCD),
&send_handle,
0
), DPNSUCCESS_PENDING);
}
/* ...and cancel none of them here. */
ASSERT_EQ(host->CancelAsyncOperation(host_player_id, DPNCANCEL_PLAYER_SENDS), S_OK);
/* Wait for the send buffer to clear out. */
Sleep(1000);
EXPECT_EQ(got_cancel_msg, 0);
}
TEST(DirectPlay8Peer, AsyncSendCancelAllOperations)
{
DPNID p1_player_id = -1;
DPNHANDLE cancel_handle = 0;
int got_cancel_msg = 0;
SessionHost host(APP_GUID_1, L"Session 1", PORT,
[&cancel_handle, &got_cancel_msg]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_SEND_COMPLETE)
{
DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage);
if(sc->hResultCode == DPNERR_USERCANCEL)
{
EXPECT_EQ(sc->dwSize, sizeof(*sc));
EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD));
EXPECT_EQ(sc->dwSendCompleteFlags, 0);
EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL));
EXPECT_EQ(sc->dwNumBuffers, 0);
++got_cancel_msg;
}
else if(sc->hResultCode != S_OK)
{
ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode;
}
}
return DPN_OK;
});
std::function<HRESULT(DWORD,PVOID)> p1_cb =
[&p1_player_id]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE)
{
DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage);
p1_player_id = cc->dpnidLocal;
}
return DPN_OK;
};
IDP8PeerInstance p1;
ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK);
DPN_APPLICATION_DESC connect_to_app;
memset(&connect_to_app, 0, sizeof(connect_to_app));
connect_to_app.dwSize = sizeof(connect_to_app);
connect_to_app.guidApplication = APP_GUID_1;
IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT);
ASSERT_EQ(p1->Connect(
&connect_to_app, /* pdnAppDesc */
connect_to_addr, /* pHostAddr */
NULL, /* pDeviceInfo */
NULL, /* pdnSecurity */
NULL, /* pdnCredentials */
NULL, /* pvUserConnectData */
0, /* dwUserConnectDataSize */
NULL, /* pvPlayerContext */
NULL, /* pvAsyncContext */
NULL, /* phAsyncHandle */
DPNCONNECT_SYNC /* dwFlags */
), S_OK);
/* Give everything a moment to settle. */
Sleep(250);
DPN_BUFFER_DESC bd[] = {
{ 12, (BYTE*)("Hello, world") },
};
/* Queue a load of messages... */
for(int i = 0; i < 1000; ++i)
{
DPNHANDLE send_handle;
ASSERT_EQ(host->SendTo(
p1_player_id,
bd,
1,
0,
(void*)(0xABCD),
&send_handle,
0
), DPNSUCCESS_PENDING);
}
/* ...and cancel however many of them are still pending here. */
ASSERT_EQ(host->CancelAsyncOperation(0, DPNCANCEL_ALL_OPERATIONS), S_OK);
/* Wait for the send buffer to clear out. */
Sleep(1000);
EXPECT_TRUE(got_cancel_msg > 0);
}
TEST(DirectPlay8Peer, AsyncSendCancelByClose)
{
DPNID p1_player_id = -1;
int got_cancel_msg = 0;
SessionHost host(APP_GUID_1, L"Session 1", PORT,
[&got_cancel_msg]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_SEND_COMPLETE)
{
DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage);
if(sc->hResultCode == DPNERR_USERCANCEL)
{
EXPECT_EQ(sc->dwSize, sizeof(*sc));
EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD));
EXPECT_EQ(sc->dwSendCompleteFlags, 0);
EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL));
EXPECT_EQ(sc->dwNumBuffers, 0);
++got_cancel_msg;
}
else if(sc->hResultCode != S_OK)
{
ADD_FAILURE() << "Unexpected hResultCode: " << sc->hResultCode;
}
}
return DPN_OK;
});
std::function<HRESULT(DWORD,PVOID)> p1_cb =
[&p1_player_id]
(DWORD dwMessageType, PVOID pMessage)
{
if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE)
{
DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage);
p1_player_id = cc->dpnidLocal;
}
return DPN_OK;
};
IDP8PeerInstance p1;
ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK);
DPN_APPLICATION_DESC connect_to_app;
memset(&connect_to_app, 0, sizeof(connect_to_app));
connect_to_app.dwSize = sizeof(connect_to_app);
connect_to_app.guidApplication = APP_GUID_1;
IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT);
ASSERT_EQ(p1->Connect(
&connect_to_app, /* pdnAppDesc */
connect_to_addr, /* pHostAddr */
NULL, /* pDeviceInfo */
NULL, /* pdnSecurity */
NULL, /* pdnCredentials */
NULL, /* pvUserConnectData */
0, /* dwUserConnectDataSize */
NULL, /* pvPlayerContext */
NULL, /* pvAsyncContext */
NULL, /* phAsyncHandle */
DPNCONNECT_SYNC /* dwFlags */
), S_OK);
/* Give everything a moment to settle. */
Sleep(250);
DPN_BUFFER_DESC bd[] = {
{ 12, (BYTE*)("Hello, world") },
};
/* Queue a load of messages... */
for(int i = 0; i < 1000; ++i)
{
DPNHANDLE send_handle;
ASSERT_EQ(host->SendTo(
p1_player_id,
bd,
1,
0,
(void*)(0xABCD),
&send_handle,
0
), DPNSUCCESS_PENDING);
}
/* ...and cancel however many of them are still pending here. */
host->Close(DPNCLOSE_IMMEDIATE);
/* Wait for the send buffer to clear out. */
Sleep(1000);
EXPECT_TRUE(got_cancel_msg > 0);
}
TEST(DirectPlay8Peer, SyncSendToPeerToHost)
{
std::atomic<bool> testing(false);

514
tests/SendQueue.cpp Normal file
View File

@ -0,0 +1,514 @@
#include <winsock2.h>
#include <gtest/gtest.h>
#include <windows.h>
#include "../src/EventObject.hpp"
#include "../src/packet.hpp"
#include "../src/SendQueue.hpp"
class SendQueueTest: public ::testing::Test {
protected:
EventObject event;
SendQueue sq;
SendQueueTest(): sq(event) {}
~SendQueueTest() {}
bool event_signalled()
{
return (WaitForSingleObject(event, 0) == WAIT_OBJECT_0);
}
uint32_t sqop_ptype(SendQueue::SendOp *sqop)
{
std::pair<const void*, size_t> sqop_data = sqop->get_data();
PacketDeserialiser pd(sqop_data.first, sqop_data.second);
return pd.packet_type();
}
};
TEST_F(SendQueueTest, SendSingleLow)
{
EXPECT_FALSE(event_signalled());
sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(0), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
/* Event should signal once after calling send() */
EXPECT_TRUE(event_signalled());
EXPECT_FALSE(event_signalled());
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), sqop);
EXPECT_EQ(sqop_ptype(sqop), 0);
sq.pop_pending(sqop);
delete sqop;
}
EXPECT_EQ(sq.get_pending(), (SendQueue::SendOp*)(NULL));
}
TEST_F(SendQueueTest, SendMultiLow)
{
EXPECT_FALSE(event_signalled());
sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(2), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
/* Event should signal once after each batch of calls to send() */
EXPECT_TRUE(event_signalled());
EXPECT_FALSE(event_signalled());
sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(0), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
/* Event should signal once after each batch of calls to send() */
EXPECT_TRUE(event_signalled());
EXPECT_FALSE(event_signalled());
/* Check we get the messages in the right order. */
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), sqop);
EXPECT_EQ(sqop_ptype(sqop), 1);
sq.pop_pending(sqop);
delete sqop;
}
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), sqop);
EXPECT_EQ(sqop_ptype(sqop), 2);
sq.pop_pending(sqop);
delete sqop;
}
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), sqop);
EXPECT_EQ(sqop_ptype(sqop), 0);
sq.pop_pending(sqop);
delete sqop;
}
EXPECT_EQ(sq.get_pending(), (SendQueue::SendOp*)(NULL));
}
TEST_F(SendQueueTest, SendMultiMedium)
{
EXPECT_FALSE(event_signalled());
sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(1), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
/* Event should signal once after each batch of calls to send() */
EXPECT_TRUE(event_signalled());
EXPECT_FALSE(event_signalled());
/* Check we get the messages in the right order. */
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), sqop);
EXPECT_EQ(sqop_ptype(sqop), 1);
sq.pop_pending(sqop);
delete sqop;
}
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), sqop);
EXPECT_EQ(sqop_ptype(sqop), 2);
sq.pop_pending(sqop);
delete sqop;
}
EXPECT_EQ(sq.get_pending(), (SendQueue::SendOp*)(NULL));
}
TEST_F(SendQueueTest, SendMultiHigh)
{
EXPECT_FALSE(event_signalled());
sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(1), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(2), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
/* Event should signal once after each batch of calls to send() */
EXPECT_TRUE(event_signalled());
EXPECT_FALSE(event_signalled());
/* Check we get the messages in the right order. */
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), sqop);
EXPECT_EQ(sqop_ptype(sqop), 1);
sq.pop_pending(sqop);
delete sqop;
}
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), sqop);
EXPECT_EQ(sqop_ptype(sqop), 2);
sq.pop_pending(sqop);
delete sqop;
}
EXPECT_EQ(sq.get_pending(), (SendQueue::SendOp*)(NULL));
}
TEST_F(SendQueueTest, SendMultiPriorities)
{
EXPECT_FALSE(event_signalled());
sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(3), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(4), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(5), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(6), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
/* Event should signal once after each batch of calls to send() */
EXPECT_TRUE(event_signalled());
EXPECT_FALSE(event_signalled());
/* Check we get the messages in the right order. */
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), sqop);
EXPECT_EQ(sqop_ptype(sqop), 3);
sq.pop_pending(sqop);
delete sqop;
}
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), sqop);
EXPECT_EQ(sqop_ptype(sqop), 6);
sq.pop_pending(sqop);
delete sqop;
}
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), sqop);
EXPECT_EQ(sqop_ptype(sqop), 2);
sq.pop_pending(sqop);
delete sqop;
}
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), sqop);
EXPECT_EQ(sqop_ptype(sqop), 5);
sq.pop_pending(sqop);
delete sqop;
}
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), sqop);
EXPECT_EQ(sqop_ptype(sqop), 1);
sq.pop_pending(sqop);
delete sqop;
}
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), sqop);
EXPECT_EQ(sqop_ptype(sqop), 4);
sq.pop_pending(sqop);
delete sqop;
}
EXPECT_EQ(sq.get_pending(), (SendQueue::SendOp*)(NULL));
}
TEST_F(SendQueueTest, RemoveQueued)
{
sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL, 1,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL, 2,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(3), NULL, 3,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
{
SendQueue::SendOp *sqop = sq.remove_queued();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sqop_ptype(sqop), 3);
delete sqop;
}
{
SendQueue::SendOp *sqop = sq.remove_queued();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sqop_ptype(sqop), 2);
delete sqop;
}
{
SendQueue::SendOp *sqop = sq.remove_queued();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sqop_ptype(sqop), 1);
delete sqop;
}
EXPECT_EQ(sq.remove_queued(), (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), (SendQueue::SendOp*)(NULL));
}
TEST_F(SendQueueTest, RemoveQueuedPending)
{
sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL, 1,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL, 2,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(3), NULL, 3,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sqop_ptype(sqop), 3);
sq.pop_pending(sqop);
delete sqop;
}
{
SendQueue::SendOp *sqop = sq.remove_queued();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sqop_ptype(sqop), 2);
delete sqop;
}
{
SendQueue::SendOp *sqop = sq.get_pending();
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sqop_ptype(sqop), 1);
sq.pop_pending(sqop);
delete sqop;
}
EXPECT_EQ(sq.remove_queued(), (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.get_pending(), (SendQueue::SendOp*)(NULL));
}
TEST_F(SendQueueTest, RemoveQueuedNoHandle)
{
sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(3), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
EXPECT_EQ(sq.remove_queued(), (SendQueue::SendOp*)(NULL));
}
TEST_F(SendQueueTest, RemoveQueuedByHandle)
{
sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL, 1,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL, 2,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(3), NULL, 3,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
{
SendQueue::SendOp *sqop = sq.remove_queued_by_handle(1);
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sqop_ptype(sqop), 1);
delete sqop;
EXPECT_EQ(sq.remove_queued_by_handle(1), (SendQueue::SendOp*)(NULL));
}
{
SendQueue::SendOp *sqop = sq.remove_queued_by_handle(2);
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sqop_ptype(sqop), 2);
delete sqop;
EXPECT_EQ(sq.remove_queued_by_handle(2), (SendQueue::SendOp*)(NULL));
}
{
SendQueue::SendOp *sqop = sq.remove_queued_by_handle(3);
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sqop_ptype(sqop), 3);
delete sqop;
EXPECT_EQ(sq.remove_queued_by_handle(3), (SendQueue::SendOp*)(NULL));
}
}
TEST_F(SendQueueTest, RemoveQueuedByPriority)
{
sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL, 1,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL, 2,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_HIGH, PacketSerialiser(3), NULL, 3,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
{
SendQueue::SendOp *sqop = sq.remove_queued_by_priority(SendQueue::SEND_PRI_LOW);
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.remove_queued_by_priority(SendQueue::SEND_PRI_LOW), (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sqop_ptype(sqop), 1);
delete sqop;
}
{
SendQueue::SendOp *sqop = sq.remove_queued_by_priority(SendQueue::SEND_PRI_MEDIUM);
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.remove_queued_by_priority(SendQueue::SEND_PRI_MEDIUM), (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sqop_ptype(sqop), 2);
delete sqop;
}
{
SendQueue::SendOp *sqop = sq.remove_queued_by_priority(SendQueue::SEND_PRI_HIGH);
ASSERT_NE(sqop, (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.remove_queued_by_priority(SendQueue::SEND_PRI_HIGH), (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sqop_ptype(sqop), 3);
delete sqop;
}
}
TEST_F(SendQueueTest, RemoveQueuedByPriorityNoHandle)
{
sq.send(SendQueue::SEND_PRI_LOW, PacketSerialiser(1), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(2), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
sq.send(SendQueue::SEND_PRI_MEDIUM, PacketSerialiser(3), NULL,
[](std::unique_lock<std::mutex> &l, HRESULT result) { return 0; });
EXPECT_EQ(sq.remove_queued_by_priority(SendQueue::SEND_PRI_LOW), (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.remove_queued_by_priority(SendQueue::SEND_PRI_MEDIUM), (SendQueue::SendOp*)(NULL));
EXPECT_EQ(sq.remove_queued_by_priority(SendQueue::SEND_PRI_HIGH), (SendQueue::SendOp*)(NULL));
}