From 45b4fd19bdad1a4c171704d736e14a66ca5f6a1d Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Fri, 21 Sep 2018 01:03:49 +0100 Subject: [PATCH] Basic message sending. --- src/DirectPlay8Peer.cpp | 168 +++++++++++++++++++++++++++++++++++++- src/DirectPlay8Peer.hpp | 2 + src/Messages.hpp | 9 ++ tests/DirectPlay8Peer.cpp | 2 +- 4 files changed, 176 insertions(+), 5 deletions(-) diff --git a/src/DirectPlay8Peer.cpp b/src/DirectPlay8Peer.cpp index 8ff47d4..2cbd2f1 100644 --- a/src/DirectPlay8Peer.cpp +++ b/src/DirectPlay8Peer.cpp @@ -330,7 +330,111 @@ HRESULT DirectPlay8Peer::Connect(CONST DPN_APPLICATION_DESC* CONST pdnAppDesc, I HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST prgBufferDesc, CONST DWORD cBufferDesc, CONST DWORD dwTimeOut, void* CONST pvAsyncContext, DPNHANDLE* CONST phAsyncHandle, CONST DWORD dwFlags) { - UNIMPLEMENTED("DirectPlay8Peer::SendTo"); + std::unique_lock l(lock); + + switch(state) + { + case STATE_NEW: return DPNERR_UNINITIALIZED; + case STATE_INITIALISED: return DPNERR_NOTREADY; + case STATE_HOSTING: break; + case STATE_CONNECTING: return DPNERR_NOTREADY; + case STATE_CONNECT_FAILED: return DPNERR_NOTREADY; + case STATE_CONNECTED: break; + } + + std::vector payload; + + for(DWORD i = 0; i < cBufferDesc; ++i) + { + payload.reserve(payload.size() + prgBufferDesc[i].dwBufferSize); + payload.insert(payload.end(), + (const unsigned char*)(prgBufferDesc[i].pBufferData), + (const unsigned char*)(prgBufferDesc[i].pBufferData) + prgBufferDesc[i].dwBufferSize); + } + + PacketSerialiser message(DPLITE_MSGID_MESSAGE); + + message.append_dword(local_player_id); + message.append_data(payload.data(), payload.size()); + message.append_dword(dwFlags & (DPNSEND_GUARANTEED | DPNSEND_COALESCE | DPNSEND_COMPLETEONPROCESS)); + + SendQueue::SendPriority priority = SendQueue::SEND_PRI_MEDIUM; + if(dwFlags & DPNSEND_PRIORITY_HIGH) + { + priority = SendQueue::SEND_PRI_HIGH; + } + else if(dwFlags & DPNSEND_PRIORITY_LOW) + { + priority = SendQueue::SEND_PRI_LOW; + } + + Peer *target_peer = get_peer_by_player_id(dpnid); + if(target_peer == NULL) + { + return DPNERR_INVALIDPLAYER; + } + + if(dwFlags & DPNSEND_SYNC) + { + bool done = false; + std::mutex d_mutex; + std::condition_variable d_cv; + HRESULT result; + + target_peer->sq.send(priority, message, NULL, + [&done, &d_mutex, &d_cv, &result] + (std::unique_lock &l, HRESULT s_result) + { + result = s_result; + + std::unique_lock dl(d_mutex); + done = true; + dl.unlock(); + + d_cv.notify_one(); + }); + + l.unlock(); + + std::unique_lock dl(d_mutex); + d_cv.wait(dl, [&done]() { return done; }); + + return result; + } + else{ + *phAsyncHandle = 0; + + target_peer->sq.send(priority, message, NULL, + [this, dwFlags, pvAsyncContext, prgBufferDesc, cBufferDesc] + (std::unique_lock &l, HRESULT s_result) + { + DPNMSG_SEND_COMPLETE sc; + memset(&sc, 0, sizeof(sc)); + + sc.dwSize = sizeof(sc); + // sc.hAsyncOp + sc.pvUserContext = pvAsyncContext; + sc.hResultCode = s_result; + // sc.dwSendTime + // sc.dwFirstFrameRTT + // sc.dwFirstRetryCount + sc.dwSendCompleteFlags = + (dwFlags & DPNSEND_GUARANTEED ? DPNRECEIVE_GUARANTEED : 0) + | (dwFlags & DPNSEND_COALESCE ? DPNRECEIVE_COALESCED : 0); + + if(dwFlags & DPNSEND_NOCOPY) + { + sc.pBuffers = (DPN_BUFFER_DESC*)(prgBufferDesc); + sc.dwNumBuffers = cBufferDesc; + } + + l.unlock(); + message_handler(message_handler_ctx, DPN_MSGID_SEND_COMPLETE, &sc); + l.lock(); + }); + + return DPNSUCCESS_PENDING; + } } HRESULT DirectPlay8Peer::GetSendQueueInfo(CONST DPNID dpnid, DWORD* CONST pdwNumMsgs, DWORD* CONST pdwNumBytes, CONST DWORD dwFlags) @@ -796,7 +900,10 @@ HRESULT DirectPlay8Peer::DestroyPeer(CONST DPNID dpnidClient, CONST void* CONST HRESULT DirectPlay8Peer::ReturnBuffer(CONST DPNHANDLE hBufferHandle, CONST DWORD dwFlags) { - UNIMPLEMENTED("DirectPlay8Peer::ReturnBuffer"); + unsigned char *buffer = (unsigned char*)(hBufferHandle); + delete[] buffer; + + return S_OK; } HRESULT DirectPlay8Peer::GetPlayerContext(CONST DPNID dpnid,PVOID* CONST ppvPlayerContext, CONST DWORD dwFlags) @@ -1154,11 +1261,12 @@ void DirectPlay8Peer::io_peer_send(std::unique_lock &l, unsigned int void DirectPlay8Peer::io_peer_recv(std::unique_lock &l, unsigned int peer_id) { Peer *peer; - std::unique_lock rl; + + bool rb_claimed = false; while((peer = get_peer_by_peer_id(peer_id)) != NULL) { - if(peer->recv_busy) + if(!rb_claimed && peer->recv_busy) { /* Another thread is already processing data from this socket. * @@ -1170,6 +1278,7 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock &l, unsigned int } peer->recv_busy = true; + rb_claimed = true; /* TODO: Mask read events to avoid workers spinning. */ @@ -1254,6 +1363,12 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock &l, unsigned int break; } + case DPLITE_MSGID_MESSAGE: + { + handle_message(l, *pd); + break; + } + default: /* TODO: Log "unrecognised packet type" */ break; @@ -1844,6 +1959,51 @@ void DirectPlay8Peer::handle_host_connect_fail(std::unique_lock &l, connect_fail(l, hResultCode, pvApplicationReplyData, dwApplicationReplyDataSize); } +void DirectPlay8Peer::handle_message(std::unique_lock &l, const PacketDeserialiser &pd) +{ + try { + DWORD from_player_id = pd.get_dword(0); + std::pair payload = pd.get_data(1); + DWORD flags = pd.get_dword(2); + + Peer *peer = get_peer_by_player_id(from_player_id); + if(peer == NULL) + { + return; + } + + unsigned char *payload_copy = new unsigned char[payload.second]; + memcpy(payload_copy, payload.first, payload.second); + + DPNMSG_RECEIVE r; + memset(&r, 0, sizeof(r)); + + static_assert(sizeof(DPNHANDLE) >= sizeof(unsigned char*), + "DPNHANDLE must be large enough to take a pointer"); + + r.dwSize = sizeof(r); + r.dpnidSender = from_player_id; + r.pvPlayerContext = peer->player_ctx; + r.pReceiveData = payload_copy; + r.dwReceiveDataSize = payload.second; + r.hBufferHandle = (DPNHANDLE)(payload_copy); + // r.dwReceiveFlags + + l.unlock(); + HRESULT r_result = message_handler(message_handler_ctx, DPN_MSGID_RECEIVE, &r); + l.lock(); + + if(r_result != DPNSUCCESS_PENDING) + { + delete[] payload_copy; + } + } + catch(const PacketDeserialiser::Error &e) + { + /* TODO: LOG ME */ + } +} + /* Check if we have finished connecting and should enter STATE_CONNECTED. * * This is called after processing either of: diff --git a/src/DirectPlay8Peer.hpp b/src/DirectPlay8Peer.hpp index 8abb87b..d3b1d21 100644 --- a/src/DirectPlay8Peer.hpp +++ b/src/DirectPlay8Peer.hpp @@ -172,6 +172,8 @@ class DirectPlay8Peer: public IDirectPlay8Peer void handle_host_connect_request(std::unique_lock &l, unsigned int peer_id, const PacketDeserialiser &pd); void handle_host_connect_ok(std::unique_lock &l, unsigned int peer_id, const PacketDeserialiser &pd); void handle_host_connect_fail(std::unique_lock &l, unsigned int peer_id, const PacketDeserialiser &pd); + void handle_message(std::unique_lock &l, const PacketDeserialiser &pd); + void connect_check(std::unique_lock &l); void connect_fail(std::unique_lock &l, HRESULT hResultCode, const void *pvApplicationReplyData, DWORD dwApplicationReplyDataSize); diff --git a/src/Messages.hpp b/src/Messages.hpp index 5f24801..b382924 100644 --- a/src/Messages.hpp +++ b/src/Messages.hpp @@ -62,4 +62,13 @@ * DATA | NULL - Response data */ +#define DPLITE_MSGID_MESSAGE 6 + +/* Message sent via SendTo() by application. + * + * DWORD - Player ID of sender + * DATA - Message payload + * DWORD - Flags (DPNSEND_GUARANTEED, DPNSEND_COALESCE, DPNSEND_COMPLETEONPROCESS) +*/ + #endif /* !DPLITE_MESSAGES_HPP */ diff --git a/tests/DirectPlay8Peer.cpp b/tests/DirectPlay8Peer.cpp index ea513b5..0687cb6 100644 --- a/tests/DirectPlay8Peer.cpp +++ b/tests/DirectPlay8Peer.cpp @@ -7,7 +7,7 @@ #include "../src/DirectPlay8Address.hpp" #include "../src/DirectPlay8Peer.hpp" -#define INSTANTIATE_FROM_COM +// #define INSTANTIATE_FROM_COM #define PORT 42895