1
0
mirror of https://github.com/solemnwarning/directplay-lite synced 2024-12-30 16:45:37 +01:00

SendTo() support for DPNID_ALL_PLAYERS_GROUP, DPNSEND_NOLOOPBACK.

This commit is contained in:
Daniel Collins 2018-09-24 21:43:25 +01:00
parent 2635fd04cd
commit f19678217f
2 changed files with 1250 additions and 29 deletions

View File

@ -342,6 +342,12 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST
case STATE_CONNECTED: break;
}
if(dwFlags & DPNSEND_COMPLETEONPROCESS)
{
/* Not implemented yet. */
return DPNERR_GENERIC;
}
std::vector<unsigned char> payload;
for(DWORD i = 0; i < cBufferDesc; ++i)
@ -368,45 +374,120 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST
priority = SendQueue::SEND_PRI_LOW;
}
Peer *target_peer = get_peer_by_player_id(dpnid);
if(target_peer == NULL)
std::list<Peer*> send_to_peers;
bool send_to_self = false;
if(dpnid == DPNID_ALL_PLAYERS_GROUP)
{
return DPNERR_INVALIDPLAYER;
if(!(dwFlags & DPNSEND_NOLOOPBACK))
{
send_to_self = true;
}
for(auto pi = peers.begin(); pi != peers.end(); ++pi)
{
if(pi->second->state == Peer::PS_CONNECTED)
{
send_to_peers.push_back(pi->second);
}
}
}
else{
if(dpnid == local_player_id)
{
send_to_self = true;
}
else{
Peer *target_peer = get_peer_by_player_id(dpnid);
if(target_peer == NULL)
{
return DPNERR_INVALIDPLAYER;
}
send_to_peers.push_back(target_peer);
}
}
if(dwFlags & DPNSEND_SYNC)
{
bool done = false;
unsigned int pending = send_to_peers.size();
std::mutex d_mutex;
std::condition_variable d_cv;
HRESULT result;
HRESULT result = S_OK;
target_peer->sq.send(priority, message, NULL,
[&done, &d_mutex, &d_cv, &result]
(std::unique_lock<std::mutex> &l, HRESULT s_result)
for(auto pi = send_to_peers.begin(); pi != send_to_peers.end(); ++pi)
{
(*pi)->sq.send(priority, message, NULL,
[&pending, &d_mutex, &d_cv, &result]
(std::unique_lock<std::mutex> &l, HRESULT s_result)
{
if(s_result != S_OK && result == S_OK)
{
/* Error code from the first failure wins. */
result = s_result;
}
std::unique_lock<std::mutex> dl(d_mutex);
if(--pending == 0)
{
dl.unlock();
d_cv.notify_one();
}
});
}
if(send_to_self)
{
/* TODO: Should the processing of this block a DPNSEND_SYNC send? */
unsigned char *payload_copy = new unsigned char[payload.size()];
memcpy(payload_copy, payload.data(), payload.size());
DPNMSG_RECEIVE r;
memset(&r, 0, sizeof(r));
r.dwSize = sizeof(r);
r.dpnidSender = local_player_id;
r.pvPlayerContext = local_player_ctx;
r.pReceiveData = payload_copy;
r.dwReceiveDataSize = payload.size();
r.hBufferHandle = (DPNHANDLE)(payload_copy);
r.dwReceiveFlags = (dwFlags & DPNSEND_GUARANTEED ? DPNRECEIVE_GUARANTEED : 0)
| (dwFlags & DPNSEND_COALESCE ? DPNRECEIVE_COALESCED : 0);
l.unlock();
HRESULT r_result = message_handler(message_handler_ctx, DPN_MSGID_RECEIVE, &r);
if(r_result != DPNSUCCESS_PENDING)
{
result = s_result;
std::unique_lock<std::mutex> dl(d_mutex);
done = true;
dl.unlock();
d_cv.notify_one();
});
l.unlock();
delete[] payload_copy;
}
}
else{
l.unlock();
}
std::unique_lock<std::mutex> dl(d_mutex);
d_cv.wait(dl, [&done]() { return done; });
d_cv.wait(dl, [&pending]() { return (pending == 0); });
return result;
}
else{
*phAsyncHandle = 0;
unsigned int *pending = new unsigned int(send_to_peers.size() + send_to_self);
HRESULT *result = new HRESULT(S_OK);
target_peer->sq.send(priority, message, NULL,
[this, dwFlags, pvAsyncContext, prgBufferDesc, cBufferDesc]
auto handle_send_complete =
[this, pending, result, pvAsyncContext, dwFlags, prgBufferDesc, cBufferDesc]
(std::unique_lock<std::mutex> &l, HRESULT s_result)
{
if(s_result != S_OK && *result == S_OK)
{
/* Error code from the first failure wins. */
*result = s_result;
}
if(--(*pending) == 0)
{
DPNMSG_SEND_COMPLETE sc;
memset(&sc, 0, sizeof(sc));
@ -414,13 +495,12 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST
sc.dwSize = sizeof(sc);
// sc.hAsyncOp
sc.pvUserContext = pvAsyncContext;
sc.hResultCode = s_result;
sc.hResultCode = *result;
// sc.dwSendTime
// sc.dwFirstFrameRTT
// sc.dwFirstRetryCount
sc.dwSendCompleteFlags =
(dwFlags & DPNSEND_GUARANTEED ? DPNRECEIVE_GUARANTEED : 0)
| (dwFlags & DPNSEND_COALESCE ? DPNRECEIVE_COALESCED : 0);
sc.dwSendCompleteFlags = (dwFlags & DPNSEND_GUARANTEED ? DPNRECEIVE_GUARANTEED : 0)
| (dwFlags & DPNSEND_COALESCE ? DPNRECEIVE_COALESCED : 0);
if(dwFlags & DPNSEND_NOCOPY)
{
@ -428,10 +508,83 @@ HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST
sc.dwNumBuffers = cBufferDesc;
}
delete result;
delete pending;
l.unlock();
message_handler(message_handler_ctx, DPN_MSGID_SEND_COMPLETE, &sc);
l.lock();
}
};
*phAsyncHandle = 0;
if(*pending == 0)
{
/* Horrible horrible hack to raise a DPNMSG_SEND_COMPLETE if there are no
* targets for the message.
*/
++(*pending);
std::thread t([this, handle_send_complete]()
{
std::unique_lock<std::mutex> l(lock);
handle_send_complete(l, S_OK);
});
t.detach();
}
for(auto pi = send_to_peers.begin(); pi != send_to_peers.end(); ++pi)
{
(*pi)->sq.send(priority, message, NULL,
[handle_send_complete]
(std::unique_lock<std::mutex> &l, HRESULT s_result)
{
handle_send_complete(l, s_result);
});
}
if(send_to_self)
{
size_t payload_size = payload.size();
unsigned char *payload_copy = new unsigned char[payload_size];
memcpy(payload_copy, payload.data(), payload_size);
/* TODO: Do this in a properly managed worker thread. */
std::thread t([this, payload_size, payload_copy, handle_send_complete, dwFlags]()
{
std::unique_lock<std::mutex> l(lock);
DPNMSG_RECEIVE r;
memset(&r, 0, sizeof(r));
r.dwSize = sizeof(r);
r.dpnidSender = local_player_id;
r.pvPlayerContext = local_player_ctx;
r.pReceiveData = payload_copy;
r.dwReceiveDataSize = payload_size;
r.hBufferHandle = (DPNHANDLE)(payload_copy);
r.dwReceiveFlags = (dwFlags & DPNSEND_GUARANTEED ? DPNRECEIVE_GUARANTEED : 0)
| (dwFlags & DPNSEND_COALESCE ? DPNRECEIVE_COALESCED : 0);
l.unlock();
HRESULT r_result = message_handler(message_handler_ctx, DPN_MSGID_RECEIVE, &r);
l.lock();
if(r_result != DPNSUCCESS_PENDING)
{
delete[] payload_copy;
}
handle_send_complete(l, S_OK);
});
t.detach();
}
return DPNSUCCESS_PENDING;
}

File diff suppressed because it is too large Load Diff