1
0
mirror of https://github.com/solemnwarning/directplay-lite synced 2024-12-30 16:45:37 +01:00

Basic message sending.

This commit is contained in:
Daniel Collins 2018-09-21 01:03:49 +01:00
parent e8fdf2ecb1
commit 45b4fd19bd
4 changed files with 176 additions and 5 deletions

View File

@ -330,7 +330,111 @@ HRESULT DirectPlay8Peer::Connect(CONST DPN_APPLICATION_DESC* CONST pdnAppDesc, I
HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST prgBufferDesc, CONST DWORD cBufferDesc, CONST DWORD dwTimeOut, void* CONST pvAsyncContext, DPNHANDLE* CONST phAsyncHandle, CONST DWORD dwFlags)
{
UNIMPLEMENTED("DirectPlay8Peer::SendTo");
std::unique_lock<std::mutex> l(lock);
switch(state)
{
case STATE_NEW: return DPNERR_UNINITIALIZED;
case STATE_INITIALISED: return DPNERR_NOTREADY;
case STATE_HOSTING: break;
case STATE_CONNECTING: return DPNERR_NOTREADY;
case STATE_CONNECT_FAILED: return DPNERR_NOTREADY;
case STATE_CONNECTED: break;
}
std::vector<unsigned char> payload;
for(DWORD i = 0; i < cBufferDesc; ++i)
{
payload.reserve(payload.size() + prgBufferDesc[i].dwBufferSize);
payload.insert(payload.end(),
(const unsigned char*)(prgBufferDesc[i].pBufferData),
(const unsigned char*)(prgBufferDesc[i].pBufferData) + prgBufferDesc[i].dwBufferSize);
}
PacketSerialiser message(DPLITE_MSGID_MESSAGE);
message.append_dword(local_player_id);
message.append_data(payload.data(), payload.size());
message.append_dword(dwFlags & (DPNSEND_GUARANTEED | DPNSEND_COALESCE | DPNSEND_COMPLETEONPROCESS));
SendQueue::SendPriority priority = SendQueue::SEND_PRI_MEDIUM;
if(dwFlags & DPNSEND_PRIORITY_HIGH)
{
priority = SendQueue::SEND_PRI_HIGH;
}
else if(dwFlags & DPNSEND_PRIORITY_LOW)
{
priority = SendQueue::SEND_PRI_LOW;
}
Peer *target_peer = get_peer_by_player_id(dpnid);
if(target_peer == NULL)
{
return DPNERR_INVALIDPLAYER;
}
if(dwFlags & DPNSEND_SYNC)
{
bool done = false;
std::mutex d_mutex;
std::condition_variable d_cv;
HRESULT result;
target_peer->sq.send(priority, message, NULL,
[&done, &d_mutex, &d_cv, &result]
(std::unique_lock<std::mutex> &l, HRESULT s_result)
{
result = s_result;
std::unique_lock<std::mutex> dl(d_mutex);
done = true;
dl.unlock();
d_cv.notify_one();
});
l.unlock();
std::unique_lock<std::mutex> dl(d_mutex);
d_cv.wait(dl, [&done]() { return done; });
return result;
}
else{
*phAsyncHandle = 0;
target_peer->sq.send(priority, message, NULL,
[this, dwFlags, pvAsyncContext, prgBufferDesc, cBufferDesc]
(std::unique_lock<std::mutex> &l, HRESULT s_result)
{
DPNMSG_SEND_COMPLETE sc;
memset(&sc, 0, sizeof(sc));
sc.dwSize = sizeof(sc);
// sc.hAsyncOp
sc.pvUserContext = pvAsyncContext;
sc.hResultCode = s_result;
// sc.dwSendTime
// sc.dwFirstFrameRTT
// sc.dwFirstRetryCount
sc.dwSendCompleteFlags =
(dwFlags & DPNSEND_GUARANTEED ? DPNRECEIVE_GUARANTEED : 0)
| (dwFlags & DPNSEND_COALESCE ? DPNRECEIVE_COALESCED : 0);
if(dwFlags & DPNSEND_NOCOPY)
{
sc.pBuffers = (DPN_BUFFER_DESC*)(prgBufferDesc);
sc.dwNumBuffers = cBufferDesc;
}
l.unlock();
message_handler(message_handler_ctx, DPN_MSGID_SEND_COMPLETE, &sc);
l.lock();
});
return DPNSUCCESS_PENDING;
}
}
HRESULT DirectPlay8Peer::GetSendQueueInfo(CONST DPNID dpnid, DWORD* CONST pdwNumMsgs, DWORD* CONST pdwNumBytes, CONST DWORD dwFlags)
@ -796,7 +900,10 @@ HRESULT DirectPlay8Peer::DestroyPeer(CONST DPNID dpnidClient, CONST void* CONST
HRESULT DirectPlay8Peer::ReturnBuffer(CONST DPNHANDLE hBufferHandle, CONST DWORD dwFlags)
{
UNIMPLEMENTED("DirectPlay8Peer::ReturnBuffer");
unsigned char *buffer = (unsigned char*)(hBufferHandle);
delete[] buffer;
return S_OK;
}
HRESULT DirectPlay8Peer::GetPlayerContext(CONST DPNID dpnid,PVOID* CONST ppvPlayerContext, CONST DWORD dwFlags)
@ -1154,11 +1261,12 @@ void DirectPlay8Peer::io_peer_send(std::unique_lock<std::mutex> &l, unsigned int
void DirectPlay8Peer::io_peer_recv(std::unique_lock<std::mutex> &l, unsigned int peer_id)
{
Peer *peer;
std::unique_lock<std::mutex> rl;
bool rb_claimed = false;
while((peer = get_peer_by_peer_id(peer_id)) != NULL)
{
if(peer->recv_busy)
if(!rb_claimed && peer->recv_busy)
{
/* Another thread is already processing data from this socket.
*
@ -1170,6 +1278,7 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock<std::mutex> &l, unsigned int
}
peer->recv_busy = true;
rb_claimed = true;
/* TODO: Mask read events to avoid workers spinning. */
@ -1254,6 +1363,12 @@ void DirectPlay8Peer::io_peer_recv(std::unique_lock<std::mutex> &l, unsigned int
break;
}
case DPLITE_MSGID_MESSAGE:
{
handle_message(l, *pd);
break;
}
default:
/* TODO: Log "unrecognised packet type" */
break;
@ -1844,6 +1959,51 @@ void DirectPlay8Peer::handle_host_connect_fail(std::unique_lock<std::mutex> &l,
connect_fail(l, hResultCode, pvApplicationReplyData, dwApplicationReplyDataSize);
}
void DirectPlay8Peer::handle_message(std::unique_lock<std::mutex> &l, const PacketDeserialiser &pd)
{
try {
DWORD from_player_id = pd.get_dword(0);
std::pair<const void*, size_t> payload = pd.get_data(1);
DWORD flags = pd.get_dword(2);
Peer *peer = get_peer_by_player_id(from_player_id);
if(peer == NULL)
{
return;
}
unsigned char *payload_copy = new unsigned char[payload.second];
memcpy(payload_copy, payload.first, payload.second);
DPNMSG_RECEIVE r;
memset(&r, 0, sizeof(r));
static_assert(sizeof(DPNHANDLE) >= sizeof(unsigned char*),
"DPNHANDLE must be large enough to take a pointer");
r.dwSize = sizeof(r);
r.dpnidSender = from_player_id;
r.pvPlayerContext = peer->player_ctx;
r.pReceiveData = payload_copy;
r.dwReceiveDataSize = payload.second;
r.hBufferHandle = (DPNHANDLE)(payload_copy);
// r.dwReceiveFlags
l.unlock();
HRESULT r_result = message_handler(message_handler_ctx, DPN_MSGID_RECEIVE, &r);
l.lock();
if(r_result != DPNSUCCESS_PENDING)
{
delete[] payload_copy;
}
}
catch(const PacketDeserialiser::Error &e)
{
/* TODO: LOG ME */
}
}
/* Check if we have finished connecting and should enter STATE_CONNECTED.
*
* This is called after processing either of:

View File

@ -172,6 +172,8 @@ class DirectPlay8Peer: public IDirectPlay8Peer
void handle_host_connect_request(std::unique_lock<std::mutex> &l, unsigned int peer_id, const PacketDeserialiser &pd);
void handle_host_connect_ok(std::unique_lock<std::mutex> &l, unsigned int peer_id, const PacketDeserialiser &pd);
void handle_host_connect_fail(std::unique_lock<std::mutex> &l, unsigned int peer_id, const PacketDeserialiser &pd);
void handle_message(std::unique_lock<std::mutex> &l, const PacketDeserialiser &pd);
void connect_check(std::unique_lock<std::mutex> &l);
void connect_fail(std::unique_lock<std::mutex> &l, HRESULT hResultCode, const void *pvApplicationReplyData, DWORD dwApplicationReplyDataSize);

View File

@ -62,4 +62,13 @@
* DATA | NULL - Response data
*/
#define DPLITE_MSGID_MESSAGE 6
/* Message sent via SendTo() by application.
*
* DWORD - Player ID of sender
* DATA - Message payload
* DWORD - Flags (DPNSEND_GUARANTEED, DPNSEND_COALESCE, DPNSEND_COMPLETEONPROCESS)
*/
#endif /* !DPLITE_MESSAGES_HPP */

View File

@ -7,7 +7,7 @@
#include "../src/DirectPlay8Address.hpp"
#include "../src/DirectPlay8Peer.hpp"
#define INSTANTIATE_FROM_COM
// #define INSTANTIATE_FROM_COM
#define PORT 42895