diff --git a/src/DirectPlay8Peer.cpp b/src/DirectPlay8Peer.cpp index b794c56..c46d591 100644 --- a/src/DirectPlay8Peer.cpp +++ b/src/DirectPlay8Peer.cpp @@ -342,6 +342,12 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST case STATE_CONNECTED: break; } + if(dwFlags & DPNSEND_COMPLETEONPROCESS) + { + /* Not implemented yet. */ + return DPNERR_GENERIC; + } + std::vector payload; for(DWORD i = 0; i < cBufferDesc; ++i) @@ -368,45 +374,120 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST priority = SendQueue::SEND_PRI_LOW; } - Peer *target_peer = get_peer_by_player_id(dpnid); - if(target_peer == NULL) + std::list send_to_peers; + bool send_to_self = false; + + if(dpnid == DPNID_ALL_PLAYERS_GROUP) { - return DPNERR_INVALIDPLAYER; + if(!(dwFlags & DPNSEND_NOLOOPBACK)) + { + send_to_self = true; + } + + for(auto pi = peers.begin(); pi != peers.end(); ++pi) + { + if(pi->second->state == Peer::PS_CONNECTED) + { + send_to_peers.push_back(pi->second); + } + } + } + else{ + if(dpnid == local_player_id) + { + send_to_self = true; + } + else{ + Peer *target_peer = get_peer_by_player_id(dpnid); + if(target_peer == NULL) + { + return DPNERR_INVALIDPLAYER; + } + + send_to_peers.push_back(target_peer); + } } if(dwFlags & DPNSEND_SYNC) { - bool done = false; + unsigned int pending = send_to_peers.size(); std::mutex d_mutex; std::condition_variable d_cv; - HRESULT result; + HRESULT result = S_OK; - target_peer->sq.send(priority, message, NULL, - [&done, &d_mutex, &d_cv, &result] - (std::unique_lock &l, HRESULT s_result) + for(auto pi = send_to_peers.begin(); pi != send_to_peers.end(); ++pi) + { + (*pi)->sq.send(priority, message, NULL, + [&pending, &d_mutex, &d_cv, &result] + (std::unique_lock &l, HRESULT s_result) + { + if(s_result != S_OK && result == S_OK) + { + /* Error code from the first failure wins. */ + result = s_result; + } + + std::unique_lock dl(d_mutex); + + if(--pending == 0) + { + dl.unlock(); + d_cv.notify_one(); + } + }); + } + + if(send_to_self) + { + /* TODO: Should the processing of this block a DPNSEND_SYNC send? */ + + unsigned char *payload_copy = new unsigned char[payload.size()]; + memcpy(payload_copy, payload.data(), payload.size()); + + DPNMSG_RECEIVE r; + memset(&r, 0, sizeof(r)); + + r.dwSize = sizeof(r); + r.dpnidSender = local_player_id; + r.pvPlayerContext = local_player_ctx; + r.pReceiveData = payload_copy; + r.dwReceiveDataSize = payload.size(); + r.hBufferHandle = (DPNHANDLE)(payload_copy); + r.dwReceiveFlags = (dwFlags & DPNSEND_GUARANTEED ? DPNRECEIVE_GUARANTEED : 0) + | (dwFlags & DPNSEND_COALESCE ? DPNRECEIVE_COALESCED : 0); + + l.unlock(); + + HRESULT r_result = message_handler(message_handler_ctx, DPN_MSGID_RECEIVE, &r); + if(r_result != DPNSUCCESS_PENDING) { - result = s_result; - - std::unique_lock dl(d_mutex); - done = true; - dl.unlock(); - - d_cv.notify_one(); - }); - - l.unlock(); + delete[] payload_copy; + } + } + else{ + l.unlock(); + } std::unique_lock dl(d_mutex); - d_cv.wait(dl, [&done]() { return done; }); + d_cv.wait(dl, [&pending]() { return (pending == 0); }); return result; } else{ - *phAsyncHandle = 0; + unsigned int *pending = new unsigned int(send_to_peers.size() + send_to_self); + HRESULT *result = new HRESULT(S_OK); - target_peer->sq.send(priority, message, NULL, - [this, dwFlags, pvAsyncContext, prgBufferDesc, cBufferDesc] + auto handle_send_complete = + [this, pending, result, pvAsyncContext, dwFlags, prgBufferDesc, cBufferDesc] (std::unique_lock &l, HRESULT s_result) + { + if(s_result != S_OK && *result == S_OK) + { + /* Error code from the first failure wins. */ + *result = s_result; + } + + if(--(*pending) == 0) { DPNMSG_SEND_COMPLETE sc; memset(&sc, 0, sizeof(sc)); @@ -414,13 +495,12 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST sc.dwSize = sizeof(sc); // sc.hAsyncOp sc.pvUserContext = pvAsyncContext; - sc.hResultCode = s_result; + sc.hResultCode = *result; // sc.dwSendTime // sc.dwFirstFrameRTT // sc.dwFirstRetryCount - sc.dwSendCompleteFlags = - (dwFlags & DPNSEND_GUARANTEED ? DPNRECEIVE_GUARANTEED : 0) - | (dwFlags & DPNSEND_COALESCE ? DPNRECEIVE_COALESCED : 0); + sc.dwSendCompleteFlags = (dwFlags & DPNSEND_GUARANTEED ? DPNRECEIVE_GUARANTEED : 0) + | (dwFlags & DPNSEND_COALESCE ? DPNRECEIVE_COALESCED : 0); if(dwFlags & DPNSEND_NOCOPY) { @@ -428,10 +508,83 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST sc.dwNumBuffers = cBufferDesc; } + delete result; + delete pending; + l.unlock(); message_handler(message_handler_ctx, DPN_MSGID_SEND_COMPLETE, &sc); l.lock(); + } + }; + + *phAsyncHandle = 0; + + if(*pending == 0) + { + /* Horrible horrible hack to raise a DPNMSG_SEND_COMPLETE if there are no + * targets for the message. + */ + + ++(*pending); + + std::thread t([this, handle_send_complete]() + { + std::unique_lock l(lock); + handle_send_complete(l, S_OK); }); + + t.detach(); + } + + for(auto pi = send_to_peers.begin(); pi != send_to_peers.end(); ++pi) + { + (*pi)->sq.send(priority, message, NULL, + [handle_send_complete] + (std::unique_lock &l, HRESULT s_result) + { + handle_send_complete(l, s_result); + }); + } + + if(send_to_self) + { + size_t payload_size = payload.size(); + + unsigned char *payload_copy = new unsigned char[payload_size]; + memcpy(payload_copy, payload.data(), payload_size); + + /* TODO: Do this in a properly managed worker thread. */ + + std::thread t([this, payload_size, payload_copy, handle_send_complete, dwFlags]() + { + std::unique_lock l(lock); + + DPNMSG_RECEIVE r; + memset(&r, 0, sizeof(r)); + + r.dwSize = sizeof(r); + r.dpnidSender = local_player_id; + r.pvPlayerContext = local_player_ctx; + r.pReceiveData = payload_copy; + r.dwReceiveDataSize = payload_size; + r.hBufferHandle = (DPNHANDLE)(payload_copy); + r.dwReceiveFlags = (dwFlags & DPNSEND_GUARANTEED ? DPNRECEIVE_GUARANTEED : 0) + | (dwFlags & DPNSEND_COALESCE ? DPNRECEIVE_COALESCED : 0); + + l.unlock(); + HRESULT r_result = message_handler(message_handler_ctx, DPN_MSGID_RECEIVE, &r); + l.lock(); + + if(r_result != DPNSUCCESS_PENDING) + { + delete[] payload_copy; + } + + handle_send_complete(l, S_OK); + }); + + t.detach(); + } return DPNSUCCESS_PENDING; } diff --git a/tests/DirectPlay8Peer.cpp b/tests/DirectPlay8Peer.cpp index 0687cb6..33510b7 100644 --- a/tests/DirectPlay8Peer.cpp +++ b/tests/DirectPlay8Peer.cpp @@ -1593,7 +1593,7 @@ TEST(DirectPlay8Peer, ConnectAsyncFail) EXPECT_EQ(p1_seq, 1); } -TEST(DirectPlay8Peer, SendToPeerToHost) +TEST(DirectPlay8Peer, AsyncSendToPeerToHost) { std::atomic testing(false); @@ -1759,7 +1759,503 @@ TEST(DirectPlay8Peer, SendToPeerToHost) testing = false; } -TEST(DirectPlay8Peer, SendToHostToPeer) +TEST(DirectPlay8Peer, AsyncSendToPeerToSelf) +{ + std::atomic testing(false); + + std::atomic host_seq(0), p1_seq(0); + DPNID host_player_id = -1, p1_player_id = -1; + DPNHANDLE send_handle; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&testing, &host_seq, &host_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CREATE_PLAYER && host_player_id == -1) + { + DPNMSG_CREATE_PLAYER *cp = (DPNMSG_CREATE_PLAYER*)(pMessage); + host_player_id = cp->dpnidPlayer; + } + + if(!testing) + { + return DPN_OK; + } + + int seq = ++host_seq; + + switch(seq) + { + default: + ADD_FAILURE() << "Unexpected message of type " << dwMessageType <<", sequence " << seq; + break; + } + + return DPN_OK; + }); + + std::function p1_cb = + [&testing, &p1_seq, &p1_player_id, &send_handle] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE) + { + DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage); + p1_player_id = cc->dpnidLocal; + } + + if(!testing) + { + return DPN_OK; + } + + int seq = ++p1_seq; + + switch(seq) + { + case 1: + case 2: + { + EXPECT_TRUE(dwMessageType == DPN_MSGID_SEND_COMPLETE || dwMessageType == DPN_MSGID_RECEIVE); + + if(dwMessageType == DPN_MSGID_SEND_COMPLETE) + { + DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage); + + EXPECT_EQ(sc->dwSize, sizeof(*sc)); + EXPECT_EQ(sc->hAsyncOp, send_handle); + EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD)); + EXPECT_EQ(sc->hResultCode, DPN_OK); + EXPECT_EQ(sc->dwSendCompleteFlags, 0); + EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL)); + EXPECT_EQ(sc->dwNumBuffers, 0); + } + else if(dwMessageType == DPN_MSGID_RECEIVE) + { + DPNMSG_RECEIVE *r = (DPNMSG_RECEIVE*)(pMessage); + + EXPECT_EQ(r->dwSize, sizeof(*r)); + EXPECT_EQ(r->dpnidSender, p1_player_id); + EXPECT_EQ(r->pvPlayerContext, (void*)(NULL)); + EXPECT_EQ(r->dwReceiveFlags, 0); + + EXPECT_EQ( + std::string((const char*)(r->pReceiveData), r->dwReceiveDataSize), + std::string("Hello, world")); + } + + break; + } + + default: + ADD_FAILURE() << "Unexpected message of type " << dwMessageType <<", sequence " << seq; + break; + } + + return DPN_OK; + }; + + IDP8PeerInstance p1; + + ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK); + + DPN_APPLICATION_DESC connect_to_app; + memset(&connect_to_app, 0, sizeof(connect_to_app)); + + connect_to_app.dwSize = sizeof(connect_to_app); + connect_to_app.guidApplication = APP_GUID_1; + + IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT); + + ASSERT_EQ(p1->Connect( + &connect_to_app, /* pdnAppDesc */ + connect_to_addr, /* pHostAddr */ + NULL, /* pDeviceInfo */ + NULL, /* pdnSecurity */ + NULL, /* pdnCredentials */ + NULL, /* pvUserConnectData */ + 0, /* dwUserConnectDataSize */ + NULL, /* pvPlayerContext */ + NULL, /* pvAsyncContext */ + NULL, /* phAsyncHandle */ + DPNCONNECT_SYNC /* dwFlags */ + ), S_OK); + + /* Give everything a moment to settle. */ + Sleep(250); + + testing = true; + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + ASSERT_EQ(p1->SendTo( + p1_player_id, + bd, + 1, + 0, + (void*)(0xABCD), + &send_handle, + 0 + ), DPNSUCCESS_PENDING); + + /* Let the message get through any any resultant messages happen. */ + Sleep(250); + + EXPECT_EQ(host_seq, 0); + EXPECT_EQ(p1_seq, 2); + + testing = false; +} + +TEST(DirectPlay8Peer, AsyncSendToPeerToAll) +{ + std::atomic testing(false); + + std::atomic host_seq(0), p1_seq(0); + DPNID host_player_id = -1, p1_player_id = -1; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&testing, &host_seq, &host_player_id, &p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CREATE_PLAYER) + { + DPNMSG_CREATE_PLAYER *cp = (DPNMSG_CREATE_PLAYER*)(pMessage); + + if(host_player_id == -1) + { + host_player_id = cp->dpnidPlayer; + cp->pvPlayerContext = (void*)(0x0001); + } + else{ + cp->pvPlayerContext = (void*)(0x0002); + } + } + + if(!testing) + { + return DPN_OK; + } + + int seq = ++host_seq; + + switch(seq) + { + case 1: + { + EXPECT_EQ(dwMessageType, DPN_MSGID_RECEIVE); + + if(dwMessageType == DPN_MSGID_RECEIVE) + { + DPNMSG_RECEIVE *r = (DPNMSG_RECEIVE*)(pMessage); + + EXPECT_EQ(r->dwSize, sizeof(*r)); + EXPECT_EQ(r->dpnidSender, p1_player_id); + EXPECT_EQ(r->pvPlayerContext, (void*)(0x0002)); + EXPECT_EQ(r->dwReceiveFlags, 0); + + EXPECT_EQ( + std::string((const char*)(r->pReceiveData), r->dwReceiveDataSize), + std::string("Hello, world")); + } + + break; + } + + default: + ADD_FAILURE() << "Unexpected message of type " << dwMessageType <<", sequence " << seq; + break; + } + + return DPN_OK; + }); + + DPNHANDLE send_handle; + + std::function p1_cb = + [&testing, &p1_seq, &p1_player_id, &send_handle] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE) + { + DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage); + p1_player_id = cc->dpnidLocal; + } + + if(!testing) + { + return DPN_OK; + } + + int seq = ++p1_seq; + + switch(seq) + { + case 1: + case 2: + { + EXPECT_TRUE(dwMessageType == DPN_MSGID_SEND_COMPLETE || dwMessageType == DPN_MSGID_RECEIVE); + + if(dwMessageType == DPN_MSGID_SEND_COMPLETE) + { + DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage); + + EXPECT_EQ(sc->dwSize, sizeof(*sc)); + EXPECT_EQ(sc->hAsyncOp, send_handle); + EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD)); + EXPECT_EQ(sc->hResultCode, DPN_OK); + EXPECT_EQ(sc->dwSendCompleteFlags, 0); + EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL)); + EXPECT_EQ(sc->dwNumBuffers, 0); + } + else if(dwMessageType == DPN_MSGID_RECEIVE) + { + DPNMSG_RECEIVE *r = (DPNMSG_RECEIVE*)(pMessage); + + EXPECT_EQ(r->dwSize, sizeof(*r)); + EXPECT_EQ(r->dpnidSender, p1_player_id); + EXPECT_EQ(r->pvPlayerContext, (void*)(NULL)); + EXPECT_EQ(r->dwReceiveFlags, 0); + + EXPECT_EQ( + std::string((const char*)(r->pReceiveData), r->dwReceiveDataSize), + std::string("Hello, world")); + } + + break; + } + + default: + ADD_FAILURE() << "Unexpected message of type " << dwMessageType <<", sequence " << seq; + break; + } + + return DPN_OK; + }; + + IDP8PeerInstance p1; + + ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK); + + DPN_APPLICATION_DESC connect_to_app; + memset(&connect_to_app, 0, sizeof(connect_to_app)); + + connect_to_app.dwSize = sizeof(connect_to_app); + connect_to_app.guidApplication = APP_GUID_1; + + IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT); + + ASSERT_EQ(p1->Connect( + &connect_to_app, /* pdnAppDesc */ + connect_to_addr, /* pHostAddr */ + NULL, /* pDeviceInfo */ + NULL, /* pdnSecurity */ + NULL, /* pdnCredentials */ + NULL, /* pvUserConnectData */ + 0, /* dwUserConnectDataSize */ + NULL, /* pvPlayerContext */ + NULL, /* pvAsyncContext */ + NULL, /* phAsyncHandle */ + DPNCONNECT_SYNC /* dwFlags */ + ), S_OK); + + /* Give everything a moment to settle. */ + Sleep(250); + + testing = true; + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + ASSERT_EQ(p1->SendTo( + DPNID_ALL_PLAYERS_GROUP, + bd, + 1, + 0, + (void*)(0xABCD), + &send_handle, + 0 + ), DPNSUCCESS_PENDING); + + /* Let the message get through any any resultant messages happen. */ + Sleep(250); + + EXPECT_EQ(host_seq, 1); + EXPECT_EQ(p1_seq, 2); + + testing = false; +} + +TEST(DirectPlay8Peer, AsyncSendToPeerToAllButSelf) +{ + std::atomic testing(false); + + std::atomic host_seq(0), p1_seq(0); + DPNID host_player_id = -1, p1_player_id = -1; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&testing, &host_seq, &host_player_id, &p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CREATE_PLAYER) + { + DPNMSG_CREATE_PLAYER *cp = (DPNMSG_CREATE_PLAYER*)(pMessage); + + if(host_player_id == -1) + { + host_player_id = cp->dpnidPlayer; + cp->pvPlayerContext = (void*)(0x0001); + } + else{ + cp->pvPlayerContext = (void*)(0x0002); + } + } + + if(!testing) + { + return DPN_OK; + } + + int seq = ++host_seq; + + switch(seq) + { + case 1: + { + EXPECT_EQ(dwMessageType, DPN_MSGID_RECEIVE); + + if(dwMessageType == DPN_MSGID_RECEIVE) + { + DPNMSG_RECEIVE *r = (DPNMSG_RECEIVE*)(pMessage); + + EXPECT_EQ(r->dwSize, sizeof(*r)); + EXPECT_EQ(r->dpnidSender, p1_player_id); + EXPECT_EQ(r->pvPlayerContext, (void*)(0x0002)); + EXPECT_EQ(r->dwReceiveFlags, 0); + + EXPECT_EQ( + std::string((const char*)(r->pReceiveData), r->dwReceiveDataSize), + std::string("Hello, world")); + } + + break; + } + + default: + ADD_FAILURE() << "Unexpected message of type " << dwMessageType <<", sequence " << seq; + break; + } + + return DPN_OK; + }); + + DPNHANDLE send_handle; + + std::function p1_cb = + [&testing, &p1_seq, &p1_player_id, &send_handle] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE) + { + DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage); + p1_player_id = cc->dpnidLocal; + } + + if(!testing) + { + return DPN_OK; + } + + int seq = ++p1_seq; + + switch(seq) + { + case 1: + { + EXPECT_EQ(dwMessageType, DPN_MSGID_SEND_COMPLETE); + + if(dwMessageType == DPN_MSGID_SEND_COMPLETE) + { + DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage); + + EXPECT_EQ(sc->dwSize, sizeof(*sc)); + EXPECT_EQ(sc->hAsyncOp, send_handle); + EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD)); + EXPECT_EQ(sc->hResultCode, DPN_OK); + EXPECT_EQ(sc->dwSendCompleteFlags, 0); + EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL)); + EXPECT_EQ(sc->dwNumBuffers, 0); + } + + break; + } + + default: + ADD_FAILURE() << "Unexpected message of type " << dwMessageType <<", sequence " << seq; + break; + } + + return DPN_OK; + }; + + IDP8PeerInstance p1; + + ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK); + + DPN_APPLICATION_DESC connect_to_app; + memset(&connect_to_app, 0, sizeof(connect_to_app)); + + connect_to_app.dwSize = sizeof(connect_to_app); + connect_to_app.guidApplication = APP_GUID_1; + + IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT); + + ASSERT_EQ(p1->Connect( + &connect_to_app, /* pdnAppDesc */ + connect_to_addr, /* pHostAddr */ + NULL, /* pDeviceInfo */ + NULL, /* pdnSecurity */ + NULL, /* pdnCredentials */ + NULL, /* pvUserConnectData */ + 0, /* dwUserConnectDataSize */ + NULL, /* pvPlayerContext */ + NULL, /* pvAsyncContext */ + NULL, /* phAsyncHandle */ + DPNCONNECT_SYNC /* dwFlags */ + ), S_OK); + + /* Give everything a moment to settle. */ + Sleep(250); + + testing = true; + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + ASSERT_EQ(p1->SendTo( + DPNID_ALL_PLAYERS_GROUP, + bd, + 1, + 0, + (void*)(0xABCD), + &send_handle, + DPNSEND_NOLOOPBACK + ), DPNSUCCESS_PENDING); + + /* Let the message get through any any resultant messages happen. */ + Sleep(250); + + EXPECT_EQ(host_seq, 1); + EXPECT_EQ(p1_seq, 1); + + testing = false; +} + +TEST(DirectPlay8Peer, AsyncSendToHostToPeer) { std::atomic testing(false); @@ -1916,7 +2412,89 @@ TEST(DirectPlay8Peer, SendToHostToPeer) testing = false; } -TEST(DirectPlay8Peer, SendToSync) +TEST(DirectPlay8Peer, AsyncSendToHostToNone) +{ + std::atomic testing(false); + + std::atomic host_seq(0); + DPNID host_player_id = -1; + DPNHANDLE send_handle; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&testing, &host_seq, &host_player_id, &send_handle] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CREATE_PLAYER && host_player_id == -1) + { + DPNMSG_CREATE_PLAYER *cp = (DPNMSG_CREATE_PLAYER*)(pMessage); + host_player_id = cp->dpnidPlayer; + } + + if(!testing) + { + return DPN_OK; + } + + int seq = ++host_seq; + + switch(seq) + { + case 1: + { + EXPECT_EQ(dwMessageType, DPN_MSGID_SEND_COMPLETE); + + if(dwMessageType == DPN_MSGID_SEND_COMPLETE) + { + DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage); + + EXPECT_EQ(sc->dwSize, sizeof(*sc)); + EXPECT_EQ(sc->hAsyncOp, send_handle); + EXPECT_EQ(sc->pvUserContext, (void*)(0xABCD)); + EXPECT_EQ(sc->hResultCode, DPN_OK); + EXPECT_EQ(sc->dwSendCompleteFlags, 0); + EXPECT_EQ(sc->pBuffers, (DPN_BUFFER_DESC*)(NULL)); + EXPECT_EQ(sc->dwNumBuffers, 0); + } + + break; + } + + default: + ADD_FAILURE() << "Unexpected message of type " << dwMessageType <<", sequence " << seq; + break; + } + + return DPN_OK; + }); + + /* Give everything a moment to settle. */ + Sleep(250); + + testing = true; + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + ASSERT_EQ(host->SendTo( + DPNID_ALL_PLAYERS_GROUP, + bd, + 1, + 0, + (void*)(0xABCD), + &send_handle, + DPNSEND_NOLOOPBACK + ), DPNSUCCESS_PENDING); + + /* Let the message get through any any resultant messages happen. */ + Sleep(250); + + EXPECT_EQ(host_seq, 1); + + testing = false; +} + +TEST(DirectPlay8Peer, SyncSendToPeerToHost) { std::atomic testing(false); @@ -2046,3 +2624,493 @@ TEST(DirectPlay8Peer, SendToSync) testing = false; } + +TEST(DirectPlay8Peer, SyncSendToPeerToSelf) +{ + std::atomic testing(false); + + std::atomic host_seq(0), p1_seq(0); + DPNID host_player_id = -1, p1_player_id = -1; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&testing, &host_seq, &host_player_id, &p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CREATE_PLAYER && host_player_id == -1) + { + DPNMSG_CREATE_PLAYER *cp = (DPNMSG_CREATE_PLAYER*)(pMessage); + host_player_id = cp->dpnidPlayer; + } + + if(!testing) + { + return DPN_OK; + } + + int seq = ++host_seq; + + switch(seq) + { + default: + ADD_FAILURE() << "Unexpected message of type " << dwMessageType <<", sequence " << seq; + break; + } + + return DPN_OK; + }); + + std::function p1_cb = + [&testing, &p1_seq, &p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE) + { + DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage); + p1_player_id = cc->dpnidLocal; + } + + if(!testing) + { + return DPN_OK; + } + + int seq = ++p1_seq; + + switch(seq) + { + case 1: + { + EXPECT_EQ(dwMessageType, DPN_MSGID_RECEIVE); + + if(dwMessageType == DPN_MSGID_RECEIVE) + { + DPNMSG_RECEIVE *r = (DPNMSG_RECEIVE*)(pMessage); + + EXPECT_EQ(r->dwSize, sizeof(*r)); + EXPECT_EQ(r->dpnidSender, p1_player_id); + EXPECT_EQ(r->pvPlayerContext, (void*)(NULL)); + EXPECT_EQ(r->dwReceiveFlags, 0); + + EXPECT_EQ( + std::string((const char*)(r->pReceiveData), r->dwReceiveDataSize), + std::string("Hello, world")); + } + + break; + } + + default: + ADD_FAILURE() << "Unexpected message of type " << dwMessageType <<", sequence " << seq; + break; + } + + return DPN_OK; + }; + + IDP8PeerInstance p1; + + ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK); + + DPN_APPLICATION_DESC connect_to_app; + memset(&connect_to_app, 0, sizeof(connect_to_app)); + + connect_to_app.dwSize = sizeof(connect_to_app); + connect_to_app.guidApplication = APP_GUID_1; + + IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT); + + ASSERT_EQ(p1->Connect( + &connect_to_app, /* pdnAppDesc */ + connect_to_addr, /* pHostAddr */ + NULL, /* pDeviceInfo */ + NULL, /* pdnSecurity */ + NULL, /* pdnCredentials */ + NULL, /* pvUserConnectData */ + 0, /* dwUserConnectDataSize */ + NULL, /* pvPlayerContext */ + NULL, /* pvAsyncContext */ + NULL, /* phAsyncHandle */ + DPNCONNECT_SYNC /* dwFlags */ + ), S_OK); + + /* Give everything a moment to settle. */ + Sleep(250); + + testing = true; + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + ASSERT_EQ(p1->SendTo( + p1_player_id, + bd, + 1, + 0, + NULL, + NULL, + DPNSEND_SYNC + ), DPN_OK); + + /* Let the message get through any any resultant messages happen. */ + Sleep(250); + + EXPECT_EQ(host_seq, 0); + EXPECT_EQ(p1_seq, 1); + + testing = false; +} + +TEST(DirectPlay8Peer, SyncSendToPeerToAll) +{ + std::atomic testing(false); + + std::atomic host_seq(0), p1_seq(0); + DPNID host_player_id = -1, p1_player_id = -1; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&testing, &host_seq, &host_player_id, &p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CREATE_PLAYER && host_player_id == -1) + { + DPNMSG_CREATE_PLAYER *cp = (DPNMSG_CREATE_PLAYER*)(pMessage); + host_player_id = cp->dpnidPlayer; + } + + if(!testing) + { + return DPN_OK; + } + + int seq = ++host_seq; + + switch(seq) + { + case 1: + { + EXPECT_EQ(dwMessageType, DPN_MSGID_RECEIVE); + + if(dwMessageType == DPN_MSGID_RECEIVE) + { + DPNMSG_RECEIVE *r = (DPNMSG_RECEIVE*)(pMessage); + + EXPECT_EQ(r->dwSize, sizeof(*r)); + EXPECT_EQ(r->dpnidSender, p1_player_id); + EXPECT_EQ(r->pvPlayerContext, (void*)(NULL)); + EXPECT_EQ(r->dwReceiveFlags, 0); + + EXPECT_EQ( + std::string((const char*)(r->pReceiveData), r->dwReceiveDataSize), + std::string("Hello, world")); + } + + break; + } + + default: + ADD_FAILURE() << "Unexpected message of type " << dwMessageType <<", sequence " << seq; + break; + } + + return DPN_OK; + }); + + std::function p1_cb = + [&testing, &p1_seq, &p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE) + { + DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage); + p1_player_id = cc->dpnidLocal; + } + + if(!testing) + { + return DPN_OK; + } + + int seq = ++p1_seq; + + switch(seq) + { + case 1: + { + EXPECT_EQ(dwMessageType, DPN_MSGID_RECEIVE); + + if(dwMessageType == DPN_MSGID_RECEIVE) + { + DPNMSG_RECEIVE *r = (DPNMSG_RECEIVE*)(pMessage); + + EXPECT_EQ(r->dwSize, sizeof(*r)); + EXPECT_EQ(r->dpnidSender, p1_player_id); + EXPECT_EQ(r->pvPlayerContext, (void*)(NULL)); + EXPECT_EQ(r->dwReceiveFlags, 0); + + EXPECT_EQ( + std::string((const char*)(r->pReceiveData), r->dwReceiveDataSize), + std::string("Hello, world")); + } + + break; + } + + default: + ADD_FAILURE() << "Unexpected message of type " << dwMessageType <<", sequence " << seq; + break; + } + + return DPN_OK; + }; + + IDP8PeerInstance p1; + + ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK); + + DPN_APPLICATION_DESC connect_to_app; + memset(&connect_to_app, 0, sizeof(connect_to_app)); + + connect_to_app.dwSize = sizeof(connect_to_app); + connect_to_app.guidApplication = APP_GUID_1; + + IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT); + + ASSERT_EQ(p1->Connect( + &connect_to_app, /* pdnAppDesc */ + connect_to_addr, /* pHostAddr */ + NULL, /* pDeviceInfo */ + NULL, /* pdnSecurity */ + NULL, /* pdnCredentials */ + NULL, /* pvUserConnectData */ + 0, /* dwUserConnectDataSize */ + NULL, /* pvPlayerContext */ + NULL, /* pvAsyncContext */ + NULL, /* phAsyncHandle */ + DPNCONNECT_SYNC /* dwFlags */ + ), S_OK); + + /* Give everything a moment to settle. */ + Sleep(250); + + testing = true; + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + ASSERT_EQ(p1->SendTo( + DPNID_ALL_PLAYERS_GROUP, + bd, + 1, + 0, + NULL, + NULL, + DPNSEND_SYNC + ), DPN_OK); + + /* Let the message get through any any resultant messages happen. */ + Sleep(250); + + EXPECT_EQ(host_seq, 1); + EXPECT_EQ(p1_seq, 1); + + testing = false; +} + +TEST(DirectPlay8Peer, SyncSendToPeerToAllButSelf) +{ + std::atomic testing(false); + + std::atomic host_seq(0), p1_seq(0); + DPNID host_player_id = -1, p1_player_id = -1; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&testing, &host_seq, &host_player_id, &p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CREATE_PLAYER && host_player_id == -1) + { + DPNMSG_CREATE_PLAYER *cp = (DPNMSG_CREATE_PLAYER*)(pMessage); + host_player_id = cp->dpnidPlayer; + } + + if(!testing) + { + return DPN_OK; + } + + int seq = ++host_seq; + + switch(seq) + { + case 1: + { + EXPECT_EQ(dwMessageType, DPN_MSGID_RECEIVE); + + if(dwMessageType == DPN_MSGID_RECEIVE) + { + DPNMSG_RECEIVE *r = (DPNMSG_RECEIVE*)(pMessage); + + EXPECT_EQ(r->dwSize, sizeof(*r)); + EXPECT_EQ(r->dpnidSender, p1_player_id); + EXPECT_EQ(r->pvPlayerContext, (void*)(NULL)); + EXPECT_EQ(r->dwReceiveFlags, 0); + + EXPECT_EQ( + std::string((const char*)(r->pReceiveData), r->dwReceiveDataSize), + std::string("Hello, world")); + } + + break; + } + + default: + ADD_FAILURE() << "Unexpected message of type " << dwMessageType <<", sequence " << seq; + break; + } + + return DPN_OK; + }); + + std::function p1_cb = + [&testing, &p1_seq, &p1_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CONNECT_COMPLETE) + { + DPNMSG_CONNECT_COMPLETE *cc = (DPNMSG_CONNECT_COMPLETE*)(pMessage); + p1_player_id = cc->dpnidLocal; + } + + if(!testing) + { + return DPN_OK; + } + + int seq = ++p1_seq; + + switch(seq) + { + default: + ADD_FAILURE() << "Unexpected message of type " << dwMessageType <<", sequence " << seq; + break; + } + + return DPN_OK; + }; + + IDP8PeerInstance p1; + + ASSERT_EQ(p1->Initialize(&p1_cb, &callback_shim, 0), S_OK); + + DPN_APPLICATION_DESC connect_to_app; + memset(&connect_to_app, 0, sizeof(connect_to_app)); + + connect_to_app.dwSize = sizeof(connect_to_app); + connect_to_app.guidApplication = APP_GUID_1; + + IDP8AddressInstance connect_to_addr(L"127.0.0.1", PORT); + + ASSERT_EQ(p1->Connect( + &connect_to_app, /* pdnAppDesc */ + connect_to_addr, /* pHostAddr */ + NULL, /* pDeviceInfo */ + NULL, /* pdnSecurity */ + NULL, /* pdnCredentials */ + NULL, /* pvUserConnectData */ + 0, /* dwUserConnectDataSize */ + NULL, /* pvPlayerContext */ + NULL, /* pvAsyncContext */ + NULL, /* phAsyncHandle */ + DPNCONNECT_SYNC /* dwFlags */ + ), S_OK); + + /* Give everything a moment to settle. */ + Sleep(250); + + testing = true; + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + ASSERT_EQ(p1->SendTo( + DPNID_ALL_PLAYERS_GROUP, + bd, + 1, + 0, + NULL, + NULL, + DPNSEND_SYNC | DPNSEND_NOLOOPBACK + ), DPN_OK); + + /* Let the message get through any any resultant messages happen. */ + Sleep(250); + + EXPECT_EQ(host_seq, 1); + EXPECT_EQ(p1_seq, 0); + + testing = false; +} + +TEST(DirectPlay8Peer, SyncSendToHostToNone) +{ + std::atomic testing(false); + + std::atomic host_seq(0); + DPNID host_player_id = -1; + + SessionHost host(APP_GUID_1, L"Session 1", PORT, + [&testing, &host_seq, &host_player_id] + (DWORD dwMessageType, PVOID pMessage) + { + if(dwMessageType == DPN_MSGID_CREATE_PLAYER && host_player_id == -1) + { + DPNMSG_CREATE_PLAYER *cp = (DPNMSG_CREATE_PLAYER*)(pMessage); + host_player_id = cp->dpnidPlayer; + } + + if(!testing) + { + return DPN_OK; + } + + int seq = ++host_seq; + + switch(seq) + { + default: + ADD_FAILURE() << "Unexpected message of type " << dwMessageType <<", sequence " << seq; + break; + } + + return DPN_OK; + }); + + /* Give everything a moment to settle. */ + Sleep(250); + + testing = true; + + DPN_BUFFER_DESC bd[] = { + { 12, (BYTE*)("Hello, world") }, + }; + + ASSERT_EQ(host->SendTo( + DPNID_ALL_PLAYERS_GROUP, + bd, + 1, + 0, + NULL, + NULL, + DPNSEND_SYNC | DPNSEND_NOLOOPBACK + ), DPN_OK); + + /* Let the message get through any any resultant messages happen. */ + Sleep(250); + + EXPECT_EQ(host_seq, 0); + + testing = false; +}