From 25341f2488c65dc6c01821bb353b9ee7775e36f1 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Thu, 20 Sep 2018 00:58:41 +0100 Subject: [PATCH] Milestone: Connect a single peer! --- src/DirectPlay8Peer.cpp | 1266 +++++++++++++++++++++++++++++++------ src/DirectPlay8Peer.hpp | 87 ++- src/EventObject.hpp | 3 + src/Messages.hpp | 36 ++ src/SendQueue.cpp | 64 +- src/SendQueue.hpp | 40 +- src/network.cpp | 43 ++ src/network.hpp | 1 + src/packet.cpp | 2 +- src/packet.hpp | 2 +- tests/DirectPlay8Peer.cpp | 10 +- 11 files changed, 1310 insertions(+), 244 deletions(-) diff --git a/src/DirectPlay8Peer.cpp b/src/DirectPlay8Peer.cpp index a2c6cf9..8ff47d4 100644 --- a/src/DirectPlay8Peer.cpp +++ b/src/DirectPlay8Peer.cpp @@ -1,14 +1,17 @@ #include +#include #include #include #include #include +#include #include #include #include #include #include #include +#include #include "COMAPIException.hpp" #include "DirectPlay8Address.hpp" @@ -20,9 +23,20 @@ fprintf(stderr, "Unimplemented method: " fmt "\n", ## __VA_ARGS__); \ return E_NOTIMPL; +#define RENEW_PEER_OR_RETURN() \ + peer = get_peer_by_peer_id(peer_id); \ + if(peer == NULL) \ + { \ + return; \ + } + #define THREADS_PER_POOL 4 #define MAX_HANDLES_PER_POOL 16 +/* Ephemeral port range as defined by IANA. */ +static const int AUTO_PORT_MIN = 49152; +static const int AUTO_PORT_MAX = 65535; + DirectPlay8Peer::DirectPlay8Peer(std::atomic *global_refcount): global_refcount(global_refcount), local_refcount(0), @@ -183,7 +197,135 @@ HRESULT DirectPlay8Peer::CancelAsyncOperation(CONST DPNHANDLE hAsyncHandle, CONS HRESULT DirectPlay8Peer::Connect(CONST DPN_APPLICATION_DESC* CONST pdnAppDesc, IDirectPlay8Address* CONST pHostAddr, IDirectPlay8Address* CONST pDeviceInfo, CONST DPN_SECURITY_DESC* CONST pdnSecurity, CONST DPN_SECURITY_CREDENTIALS* CONST pdnCredentials, CONST void* CONST pvUserConnectData, CONST DWORD dwUserConnectDataSize, void* CONST pvPlayerContext, void* CONST pvAsyncContext, DPNHANDLE* CONST phAsyncHandle, CONST DWORD dwFlags) { - UNIMPLEMENTED("DirectPlay8Peer::Connect"); + std::unique_lock l(lock); + + switch(state) + { + case STATE_NEW: return DPNERR_UNINITIALIZED; + case STATE_INITIALISED: break; + case STATE_HOSTING: return DPNERR_HOSTING; + case STATE_CONNECTING: return DPNERR_CONNECTING; + case STATE_CONNECTED: return DPNERR_ALREADYCONNECTED; + } + + application_guid = pdnAppDesc->guidApplication; + instance_guid = pdnAppDesc->guidInstance; + + connect_req_data.clear(); + + if(dwUserConnectDataSize > 0) + { + connect_req_data.reserve(dwUserConnectDataSize); + connect_req_data.insert(connect_req_data.end(), + (const unsigned char*)(pvUserConnectData), + (const unsigned char*)(pvUserConnectData) + dwUserConnectDataSize); + } + + local_player_ctx = pvPlayerContext; + + uint32_t l_ipaddr = htonl(INADDR_ANY); + uint16_t l_port = 0; + + /* TODO: Get bind address/port from pDeviceInfo, if specified. */ + + uint32_t r_ipaddr; + uint16_t r_port; + + { + wchar_t buf[128]; + DWORD bsize = sizeof(buf); + DWORD type; + + pHostAddr->GetComponentByName(DPNA_KEY_HOSTNAME, buf, &bsize, &type); + InetPtonW(AF_INET, buf, &r_ipaddr); + } + + { + DWORD buf; + DWORD bsize = sizeof(buf); + DWORD type; + + pHostAddr->GetComponentByName(DPNA_KEY_PORT, &buf, &bsize, &type); + r_port = buf; + } + + if(l_port == 0) + { + for(int p = AUTO_PORT_MIN; p <= AUTO_PORT_MAX; ++p) + { + /* TODO: Only continue if creation failed due to address conflict. */ + + udp_socket = create_udp_socket(l_ipaddr, p); + if(udp_socket == -1) + { + continue; + } + + listener_socket = create_listener_socket(l_ipaddr, p); + if(listener_socket == -1) + { + closesocket(udp_socket); + udp_socket = -1; + + continue; + } + + local_ip = l_ipaddr; + local_port = p; + + break; + } + + if(udp_socket == -1) + { + return DPNERR_GENERIC; + } + } + else{ + udp_socket = create_udp_socket(l_ipaddr, l_port); + if(udp_socket == -1) + { + return DPNERR_GENERIC; + } + + listener_socket = create_listener_socket(l_ipaddr, l_port); + if(listener_socket == -1) + { + closesocket(udp_socket); + udp_socket = -1; + + return DPNERR_GENERIC; + } + + local_ip = l_ipaddr; + local_port = l_port; + } + + connect_ctx = pvAsyncContext; + connect_handle = (dwFlags & DPNCONNECT_SYNC) ? 0 : handle_alloc.new_connect(); + + state = STATE_CONNECTING; + + if(!peer_connect(Peer::PS_CONNECTING_HOST, r_ipaddr, r_port)) + { + closesocket(listener_socket); + listener_socket = -1; + + closesocket(udp_socket); + udp_socket = -1; + + return DPNERR_GENERIC; + } + + if(dwFlags & DPNCONNECT_SYNC) + { + connect_cv.wait(l, [this]() { return (state != STATE_CONNECTING && state != STATE_CONNECT_FAILED); }); + return connect_result; + } + else{ + *phAsyncHandle = connect_handle; + return DPNSUCCESS_PENDING; + } } HRESULT DirectPlay8Peer::SendTo(CONST DPNID dpnid, CONST DPN_BUFFER_DESC* CONST prgBufferDesc, CONST DWORD cBufferDesc, CONST DWORD dwTimeOut, void* CONST pvAsyncContext, DPNHANDLE* CONST phAsyncHandle, CONST DWORD dwFlags) @@ -203,6 +345,7 @@ HRESULT DirectPlay8Peer::Host(CONST DPN_APPLICATION_DESC* CONST pdnAppDesc, IDir case STATE_NEW: return DPNERR_UNINITIALIZED; case STATE_INITIALISED: break; case STATE_HOSTING: return DPNERR_ALREADYCONNECTED; + case STATE_CONNECTING: return DPNERR_CONNECTING; case STATE_CONNECTED: return DPNERR_ALREADYCONNECTED; } @@ -276,21 +419,17 @@ HRESULT DirectPlay8Peer::Host(CONST DPN_APPLICATION_DESC* CONST pdnAppDesc, IDir if(port == 0) { - /* Ephemeral port range as defined by IANA. */ - const int AUTO_PORT_MIN = 49152; - const int AUTO_PORT_MAX = 65535; - for(int p = AUTO_PORT_MIN; p <= AUTO_PORT_MAX; ++p) { /* TODO: Only continue if creation failed due to address conflict. */ - udp_socket = create_udp_socket(ipaddr, port); + udp_socket = create_udp_socket(ipaddr, p); if(udp_socket == -1) { continue; } - listener_socket = create_listener_socket(ipaddr, port); + listener_socket = create_listener_socket(ipaddr, p); if(listener_socket == -1) { closesocket(udp_socket); @@ -299,6 +438,9 @@ HRESULT DirectPlay8Peer::Host(CONST DPN_APPLICATION_DESC* CONST pdnAppDesc, IDir continue; } + local_ip = ipaddr; + local_port = p; + break; } @@ -322,6 +464,9 @@ HRESULT DirectPlay8Peer::Host(CONST DPN_APPLICATION_DESC* CONST pdnAppDesc, IDir return DPNERR_GENERIC; } + + local_ip = ipaddr; + local_port = port; } if(WSAEventSelect(udp_socket, udp_socket_event, FD_READ | FD_WRITE) != 0 @@ -341,8 +486,27 @@ HRESULT DirectPlay8Peer::Host(CONST DPN_APPLICATION_DESC* CONST pdnAppDesc, IDir } } + next_player_id = 1; + host_player_id = next_player_id++; + + local_player_id = host_player_id; + local_player_ctx = pvPlayerContext; + state = STATE_HOSTING; + /* Send DPNMSG_CREATE_PLAYER for local player. */ + + DPNMSG_CREATE_PLAYER cp; + memset(&cp, 0, sizeof(cp)); + + cp.dwSize = sizeof(cp); + cp.dpnidPlayer = local_player_id; + cp.pvPlayerContext = local_player_ctx; + + message_handler(message_handler_ctx, DPN_MSGID_CREATE_PLAYER, &cp); + + local_player_ctx = cp.pvPlayerContext; + return S_OK; } @@ -366,7 +530,7 @@ HRESULT DirectPlay8Peer::GetApplicationDesc(DPN_APPLICATION_DESC* CONST pAppDesc pAppDescBuffer->guidInstance = instance_guid; pAppDescBuffer->guidApplication = application_guid; pAppDescBuffer->dwMaxPlayers = max_players; - pAppDescBuffer->dwCurrentPlayers = other_player_ids.size() + 1; + pAppDescBuffer->dwCurrentPlayers = player_to_peer_id.size() + 1; if(!password.empty()) { @@ -410,7 +574,7 @@ HRESULT DirectPlay8Peer::SetApplicationDesc(CONST DPN_APPLICATION_DESC* CONST pa { if(state == STATE_HOSTING) { - if(pad->dwMaxPlayers > 0 && pad->dwMaxPlayers <= other_player_ids.size()) + if(pad->dwMaxPlayers > 0 && pad->dwMaxPlayers <= player_to_peer_id.size()) { /* Can't set dwMaxPlayers below current player count. */ return DPNERR_INVALIDPARAM; @@ -637,7 +801,32 @@ HRESULT DirectPlay8Peer::ReturnBuffer(CONST DPNHANDLE hBufferHandle, CONST DWORD HRESULT DirectPlay8Peer::GetPlayerContext(CONST DPNID dpnid,PVOID* CONST ppvPlayerContext, CONST DWORD dwFlags) { - UNIMPLEMENTED("DirectPlay8Peer::GetPlayerContext"); + std::unique_lock l(lock); + + switch(state) + { + case STATE_NEW: return DPNERR_UNINITIALIZED; + case STATE_INITIALISED: return DPNERR_NOTREADY; + case STATE_HOSTING: break; + case STATE_CONNECTING: return DPNERR_NOTREADY; + case STATE_CONNECT_FAILED: return DPNERR_NOTREADY; + case STATE_CONNECTED: break; + } + + if(dpnid == local_player_id) + { + *ppvPlayerContext = local_player_ctx; + return S_OK; + } + + Peer *peer = get_peer_by_player_id(dpnid); + if(peer != NULL) + { + *ppvPlayerContext = peer->player_ctx; + return S_OK; + } + + return DPNERR_INVALIDPLAYER; } HRESULT DirectPlay8Peer::GetGroupContext(CONST DPNID dpnid,PVOID* CONST ppvGroupContext, CONST DWORD dwFlags) @@ -680,8 +869,34 @@ HRESULT DirectPlay8Peer::TerminateSession(void* CONST pvTerminateData, CONST DWO UNIMPLEMENTED("DirectPlay8Peer::TerminateSession"); } +DirectPlay8Peer::Peer *DirectPlay8Peer::get_peer_by_peer_id(unsigned int peer_id) +{ + auto pi = peers.find(peer_id); + if(pi != peers.end()) + { + return pi->second; + } + else{ + return NULL; + } +} + +DirectPlay8Peer::Peer *DirectPlay8Peer::get_peer_by_player_id(DPNID player_id) +{ + auto i = player_to_peer_id.find(player_id); + if(i != player_to_peer_id.end()) + { + return get_peer_by_peer_id(i->second); + } + else{ + return NULL; + } +} + void DirectPlay8Peer::handle_udp_socket_event() { + std::unique_lock l(lock); + struct sockaddr_in from_addr; int fa_len = sizeof(from_addr); @@ -706,11 +921,7 @@ void DirectPlay8Peer::handle_udp_socket_event() { case DPLITE_MSGID_HOST_ENUM_REQUEST: { - if(state == STATE_HOSTING) - { - handle_host_enum_request(*pd, &from_addr); - } - + handle_host_enum_request(l, *pd, &from_addr); break; } @@ -720,11 +931,13 @@ void DirectPlay8Peer::handle_udp_socket_event() } } - io_udp_send(udp_socket, udp_sq); + io_udp_send(l); } void DirectPlay8Peer::handle_other_socket_event() { + std::unique_lock l(lock); + if(discovery_socket != -1) { struct sockaddr_in from_addr; @@ -751,11 +964,7 @@ void DirectPlay8Peer::handle_other_socket_event() { case DPLITE_MSGID_HOST_ENUM_REQUEST: { - if(state == STATE_HOSTING) - { - handle_host_enum_request(*pd, &from_addr); - } - + handle_host_enum_request(l, *pd, &from_addr); break; } @@ -766,42 +975,324 @@ void DirectPlay8Peer::handle_other_socket_event() } } - io_listener_accept(listener_socket); + peer_accept(l); } -void DirectPlay8Peer::io_udp_send(int sock, SendQueue &sq) +void DirectPlay8Peer::io_peer_triggered(unsigned int peer_id) { - SendQueue::Buffer *sqb; + std::unique_lock l(lock); - while((sqb = sq.get_next()) != NULL) + Peer *peer = get_peer_by_peer_id(peer_id); + if(peer == NULL) { - std::pair data = sqb->get_data(); - std::pair addr = sqb->get_dest_addr(); + /* The peer has gone away since this event was raised. Discard. */ + return; + } + + if(peer->state == Peer::PS_CONNECTING_HOST || peer->state == Peer::PS_CONNECTING_PEER) + { + assert(state == STATE_CONNECTING); + io_peer_connected(l, peer_id); + } + else{ + io_peer_send(l, peer_id); + io_peer_recv(l, peer_id); + } +} + +void DirectPlay8Peer::io_udp_send(std::unique_lock &l) +{ + SendQueue::SendOp *sqop; + + while(udp_socket != -1 && (sqop = udp_sq.get_pending()) != NULL) + { + std::pair data = sqop->get_data(); + std::pair addr = sqop->get_dest_addr(); - int s = sendto(sock, (const char*)(data.first), data.second, 0, addr.first, addr.second); + int s = sendto(udp_socket, (const char*)(data.first), data.second, 0, addr.first, addr.second); if(s == -1) { DWORD err = WSAGetLastError(); - if(err != WSAEWOULDBLOCK) + if(err == WSAEWOULDBLOCK) { - /* TODO: More specific error codes */ - sq.complete(sqb, DPNERR_GENERIC); + /* Socket send buffer is full. */ + return; + } + else{ + /* TODO: LOG ME */ } - - break; } - sq.complete(sqb, S_OK); + udp_sq.pop_pending(sqop); + + /* Wake up another worker to continue dealing with this socket in case we wind up + * blocking for a long time in application code within the callback. + */ + SetEvent(udp_socket_event); + + /* TODO: More specific error codes */ + sqop->invoke_callback(l, (s < 0 ? DPNERR_GENERIC : S_OK)); + + delete sqop; } } -void DirectPlay8Peer::io_listener_accept(int sock) +void DirectPlay8Peer::io_peer_connected(std::unique_lock &l, unsigned int peer_id) { + Peer *peer = get_peer_by_peer_id(peer_id); + assert(peer != NULL); + + int error; + int esize = sizeof(error); + + if(getsockopt(peer->sock, SOL_SOCKET, SO_ERROR, (char*)(&error), &esize) != 0) + { + /* TODO: LOG ME */ + connect_fail(l, DPNERR_GENERIC, NULL, 0); + } + + if(error == 0) + { + /* TCP connection established. */ + + if(peer->state == Peer::PS_CONNECTING_HOST) + { + PacketSerialiser connect_host(DPLITE_MSGID_CONNECT_HOST); + + if(instance_guid != GUID_NULL) + { + connect_host.append_guid(instance_guid); + } + else{ + connect_host.append_null(); + } + + connect_host.append_guid(application_guid); + + if(!password.empty()) + { + connect_host.append_wstring(password); + } + else{ + connect_host.append_null(); + } + + if(!connect_req_data.empty()) + { + connect_host.append_data(connect_req_data.data(), connect_req_data.size()); + } + else{ + connect_host.append_null(); + } + + peer->sq.send(SendQueue::SEND_PRI_MEDIUM, + connect_host, + NULL, + [](std::unique_lock &l, HRESULT result){}); + + peer->state = Peer::PS_REQUESTING_HOST; + } + else if(peer->state == Peer::PS_CONNECTING_PEER) + { + /* TODO: Send DPLITE_MSGID_CONNECT_PEER message. */ + } + } + else{ + /* TCP connection failed. */ + + if(peer->state == Peer::PS_CONNECTING_HOST) + { + connect_fail(l, DPNERR_NOCONNECTION, NULL, 0); + } + else if(peer->state == Peer::PS_CONNECTING_PEER) + { + connect_fail(l, DPNERR_PLAYERNOTREACHABLE, NULL, 0); + } + } +} + +void DirectPlay8Peer::io_peer_send(std::unique_lock &l, unsigned int peer_id) +{ + Peer *peer; + SendQueue::SendOp *sqop; + + while((peer = get_peer_by_peer_id(peer_id)) != NULL && (sqop = peer->sq.get_pending()) != NULL) + { + std::pair d = sqop->get_pending_data(); + + int s = send(peer->sock, (const char*)(d.first), d.second, 0); + if(s < 0) + { + DWORD err = WSAGetLastError(); + + if(err == WSAEWOULDBLOCK) + { + break; + } + else{ + /* TODO */ + return; + } + } + + sqop->inc_sent_data(s); + + if(s == d.second) + { + peer->sq.pop_pending(sqop); + + /* TODO: Poke event, send/recv mutexes per TCP socket. */ + + sqop->invoke_callback(l, S_OK); + + delete sqop; + } + } +} + +void DirectPlay8Peer::io_peer_recv(std::unique_lock &l, unsigned int peer_id) +{ + Peer *peer; + std::unique_lock rl; + + while((peer = get_peer_by_peer_id(peer_id)) != NULL) + { + if(peer->recv_busy) + { + /* Another thread is already processing data from this socket. + * + * Only one thread at a time is allowed to handle reads, even when the + * other thread is in the application callback, so that the order of + * messages is preserved. + */ + return; + } + + peer->recv_busy = true; + + /* TODO: Mask read events to avoid workers spinning. */ + + int r = recv(peer->sock, (char*)(peer->recv_buf) + peer->recv_buf_cur, sizeof(peer->recv_buf) - peer->recv_buf_cur, 0); + if(r == 0) + { + /* Connection closed */ + /* TODO */ + break; + } + else if(r == -1) + { + DWORD err = WSAGetLastError(); + + if(err == WSAEWOULDBLOCK) + { + /* Nothing to read */ + break; + } + else{ + /* Read error. */ + /* TODO */ + break; + } + } + + if(peer->state == Peer::PS_CLOSING) + { + /* When a peer is in PS_CLOSING, we keep the socket open until the send queue has + * been flushed and discard anything we read until we get EOF. + */ + + continue; + } + + peer->recv_buf_cur += r; + + while(peer->recv_buf_cur >= sizeof(TLVChunk)) + { + TLVChunk *header = (TLVChunk*)(peer->recv_buf); + size_t full_packet_size = sizeof(TLVChunk) + header->value_length; + + if(full_packet_size > MAX_PACKET_SIZE) + { + /* Malformed packet received - TCP stream invalid! */ + /* TODO */ + abort(); + } + + if(peer->recv_buf_cur >= full_packet_size) + { + /* Process message */ + std::unique_ptr pd; + + try { + pd.reset(new PacketDeserialiser(peer->recv_buf, full_packet_size)); + } + catch(const PacketDeserialiser::Error &e) + { + /* Malformed packet received - TCP stream invalid! */ + /* TODO */ + abort(); + } + + switch(pd->packet_type()) + { + case DPLITE_MSGID_CONNECT_HOST: + { + handle_host_connect_request(l, peer_id, *pd); + break; + } + + case DPLITE_MSGID_CONNECT_HOST_OK: + { + handle_host_connect_ok(l, peer_id, *pd); + break; + } + + case DPLITE_MSGID_CONNECT_HOST_FAIL: + { + handle_host_connect_fail(l, peer_id, *pd); + break; + } + + default: + /* TODO: Log "unrecognised packet type" */ + break; + } + + RENEW_PEER_OR_RETURN(); + + /* Message at the front of the buffer has been dealt with, shift any + * remaining data beyond it to the front and truncate it. + */ + + memmove(peer->recv_buf, peer->recv_buf + full_packet_size, + peer->recv_buf_cur - full_packet_size); + peer->recv_buf_cur -= full_packet_size; + } + else{ + /* Haven't read the full message yet. */ + break; + } + } + } + + if(peer != NULL) + { + peer->recv_busy = false; + } +} + +void DirectPlay8Peer::peer_accept(std::unique_lock &l) +{ + if(listener_socket == -1) + { + return; + } + struct sockaddr_in addr; int addrlen = sizeof(addr); - int newfd = accept(sock, (struct sockaddr*)(&addr), &addrlen); + int newfd = accept(listener_socket, (struct sockaddr*)(&addr), &addrlen); if(newfd == -1) { DWORD err = WSAGetLastError(); @@ -823,154 +1314,137 @@ void DirectPlay8Peer::io_listener_accept(int sock) return; } - peers.emplace_back(newfd, addr.sin_addr.s_addr, ntohs(addr.sin_port)); + unsigned int peer_id = next_peer_id++; + Peer *peer = new Peer(Peer::PS_ACCEPTED, newfd, addr.sin_addr.s_addr, ntohs(addr.sin_port)); + + peers.insert(std::make_pair(peer_id, peer)); + + if(WSAEventSelect(peer->sock, peer->event, FD_READ | FD_WRITE | FD_CLOSE) != 0) + { + /* TODO */ + } + + worker_pool.add_handle(peer->event, [this, peer_id]() { io_peer_triggered(peer_id); }); } -bool DirectPlay8Peer::io_tcp_recv(Player *player) +bool DirectPlay8Peer::peer_connect(Peer::PeerState initial_state, uint32_t remote_ip, uint16_t remote_port) { - int r = recv(player->sock, (char*)(player->recv_buf) + player->recv_buf_cur, sizeof(player->recv_buf) - player->recv_buf_cur, 0); - if(r == 0) + int p_sock = create_client_socket(local_ip, local_port); + if(p_sock == -1) { - /* Connection closed */ return false; } - else if(r == -1) + + unsigned int peer_id = next_peer_id++; + Peer *peer = new Peer(initial_state, p_sock, remote_ip, remote_port); + + if(WSAEventSelect(peer->sock, peer->event, FD_CONNECT | FD_READ | FD_WRITE | FD_CLOSE) != 0) { - DWORD err = WSAGetLastError(); + closesocket(peer->sock); + delete peer; - if(err == WSAEWOULDBLOCK) - { - /* Nothing to read */ - return true; - } - else{ - /* Read error. */ - return false; - } + return false; } - player->recv_buf_cur += r; + struct sockaddr_in r_addr; + r_addr.sin_family = AF_INET; + r_addr.sin_addr.s_addr = remote_ip; + r_addr.sin_port = htons(remote_port); - if(player->recv_buf_cur >= sizeof(TLVChunk)) + if(connect(peer->sock, (struct sockaddr*)(&r_addr), sizeof(r_addr)) != -1 || WSAGetLastError() != WSAEWOULDBLOCK) { - TLVChunk *header = (TLVChunk*)(player->recv_buf); - size_t full_packet_size = sizeof(TLVChunk) + header->value_length; + closesocket(peer->sock); + delete peer; - if(full_packet_size > MAX_PACKET_SIZE) + return false; + } + + peers.insert(std::make_pair(peer_id, peer)); + + worker_pool.add_handle(peer->event, [this, peer_id]() { io_peer_triggered(peer_id); }); + + return true; +} + +void DirectPlay8Peer::peer_destroy(std::unique_lock &l, unsigned int peer_id, HRESULT outstanding_op_result) +{ + Peer *peer; + + while((peer = get_peer_by_peer_id(peer_id)) != NULL) + { + SendQueue::SendOp *sqop; + if((sqop = peer->sq.get_pending()) != NULL) { - /* Malformed packet received */ - return false; - } - - if(player->recv_buf_cur >= full_packet_size) - { - /* Process message */ - std::unique_ptr pd; + peer->sq.pop_pending(sqop); - try { - pd.reset(new PacketDeserialiser(player->recv_buf, full_packet_size)); - } - catch(const PacketDeserialiser::Error &e) - { - /* Malformed packet received */ - return false; - } + sqop->invoke_callback(l, outstanding_op_result); - /* Message at the front of the buffer has been dealt with, shift any - * remaining data beyond it to the front and truncate it. + delete sqop; + + /* Return to the start in case the peer has gone away while the lock was + * released to run the callback. */ - - memmove(player->recv_buf, player->recv_buf + full_packet_size, - player->recv_buf_cur - full_packet_size); - player->recv_buf_cur -= full_packet_size; + continue; } + + if(peer->state == Peer::PS_CONNECTED) + { + /* TODO: DPN_MSGID_DESTROY_PLAYER? */ + player_to_peer_id.erase(peer->player_id); + } + + worker_pool.remove_handle(peer->event); + + closesocket(peer->sock); + + peers.erase(peer_id); + delete peer; + + break; } - - return true; } -bool DirectPlay8Peer::io_tcp_send(Player *player) +/* Immediately close all sockets and erase all peers. */ +void DirectPlay8Peer::close_everything_now(std::unique_lock &l) { - SendQueue::Buffer *sqb = player->sq.get_next(); - - while(player->send_buf != NULL || sqb != NULL) + while(!peers.empty()) { - if(player->sqb == NULL) - { - std::pair sqb_data = sqb->get_data(); - - player->send_buf = (const unsigned char*)(sqb_data.first); - player->send_remain = sqb_data.second; - } - - int s = send(player->sock, (const char*)(player->send_buf), player->send_remain, 0); - if(s < 0) - { - DWORD err = WSAGetLastError(); - - if(err == WSAEWOULDBLOCK) - { - break; - } - else{ - /* TODO: Better error codes */ - player->sq.complete(sqb, DPNERR_GENERIC); - return false; - } - } - - if((size_t)(s) == player->send_remain) - { - player->send_buf = NULL; - - player->sq.complete(sqb, S_OK); - sqb = player->sq.get_next(); - } - else{ - player->send_buf += s; - player->send_remain -= s; - } + peer_destroy(l, peers.begin()->first, DPNERR_GENERIC); } - return true; + if(discovery_socket != -1) + { + WSAEventSelect(discovery_socket, other_socket_event, 0); + + closesocket(discovery_socket); + discovery_socket = -1; + } + + if(listener_socket != -1) + { + WSAEventSelect(listener_socket, other_socket_event, 0); + + closesocket(listener_socket); + listener_socket = -1; + } + + if(udp_socket != -1) + { + WSAEventSelect(udp_socket, udp_socket_event, 0); + + closesocket(udp_socket); + udp_socket = -1; + } } -class SQB_TODO: public SendQueue::Buffer -{ - private: - PFNDPNMESSAGEHANDLER message_handler; - PVOID message_handler_ctx; - - DPNMSG_ENUM_HOSTS_QUERY query; - - public: - SQB_TODO(const void *data, size_t data_size, const struct sockaddr_in *dest_addr, - PFNDPNMESSAGEHANDLER message_handler, PVOID message_handler_ctx, - DPNMSG_ENUM_HOSTS_QUERY query): - Buffer(data, data_size, (const struct sockaddr*)(dest_addr), sizeof(*dest_addr)), - message_handler(message_handler), - message_handler_ctx(message_handler_ctx), - query(query) {} - - virtual void complete(HRESULT result) override - { - if(query.pvResponseData != NULL) - { - DPNMSG_RETURN_BUFFER rb; - memset(&rb, 0, sizeof(rb)); - - rb.dwSize = sizeof(rb); - rb.hResultCode = result; - rb.pvBuffer = query.pvResponseData; - rb.pvUserContext = query.pvResponseContext; - - message_handler(message_handler_ctx, DPN_MSGID_RETURN_BUFFER, &rb); - } - } -}; - -void DirectPlay8Peer::handle_host_enum_request(const PacketDeserialiser &pd, const struct sockaddr_in *from_addr) +void DirectPlay8Peer::handle_host_enum_request(std::unique_lock &l, const PacketDeserialiser &pd, const struct sockaddr_in *from_addr) { + if(state != STATE_HOSTING) + { + /* TODO */ + return; + } + if(!pd.is_null(0)) { GUID r_application_guid = pd.get_guid(0); @@ -984,60 +1458,488 @@ void DirectPlay8Peer::handle_host_enum_request(const PacketDeserialiser &pd, con } } - DPNMSG_ENUM_HOSTS_QUERY query; - memset(&query, 0, sizeof(query)); + DPNMSG_ENUM_HOSTS_QUERY ehq; + memset(&ehq, 0, sizeof(ehq)); - query.dwSize = sizeof(query); - query.pAddressSender = NULL; // TODO - query.pAddressDevice = NULL; // TODO + ehq.dwSize = sizeof(ehq); + ehq.pAddressSender = NULL; // TODO + ehq.pAddressDevice = NULL; // TODO if(!pd.is_null(1)) { std::pair data = pd.get_data(1); - query.pvReceivedData = (void*)(data.first); /* TODO: Make a non-const copy? */ - query.dwReceivedDataSize = data.second; + ehq.pvReceivedData = (void*)(data.first); /* TODO: Make a non-const copy? */ + ehq.dwReceivedDataSize = data.second; } - query.dwMaxResponseDataSize = 9999; // TODO + ehq.dwMaxResponseDataSize = 9999; // TODO DWORD req_tick = pd.get_dword(2); - if(message_handler(message_handler_ctx, DPN_MSGID_ENUM_HOSTS_QUERY, &query) == DPN_OK) + l.unlock(); + HRESULT ehq_result = message_handler(message_handler_ctx, DPN_MSGID_ENUM_HOSTS_QUERY, &ehq); + l.lock(); + + std::vector response_data_buffer; + if(ehq.dwResponseDataSize > 0) { - PacketSerialiser ps(DPLITE_MSGID_HOST_ENUM_RESPONSE); + response_data_buffer.reserve(ehq.dwResponseDataSize); + response_data_buffer.insert(response_data_buffer.end(), + (const unsigned char*)(ehq.pvResponseData), + (const unsigned char*)(ehq.pvResponseData) + ehq.dwResponseDataSize); - ps.append_dword(password.empty() ? 0 : DPNSESSION_REQUIREPASSWORD); - ps.append_guid(instance_guid); - ps.append_guid(application_guid); - ps.append_dword(max_players); - ps.append_dword(other_player_ids.size() + 1); - ps.append_wstring(session_name); + DPNMSG_RETURN_BUFFER rb; + memset(&rb, 0, sizeof(rb)); + + rb.dwSize = sizeof(rb); + rb.hResultCode = S_OK; + rb.pvBuffer = ehq.pvResponseData; + rb.pvUserContext = ehq.pvResponseContext; + + l.unlock(); + message_handler(message_handler_ctx, DPN_MSGID_RETURN_BUFFER, &rb); + l.lock(); + + ehq.pvResponseData = response_data_buffer.data(); + } + + if(state != STATE_HOSTING) + { + return; + } + + if(ehq_result == DPN_OK) + { + PacketSerialiser host_enum_response(DPLITE_MSGID_HOST_ENUM_RESPONSE); + + host_enum_response.append_dword(password.empty() ? 0 : DPNSESSION_REQUIREPASSWORD); + host_enum_response.append_guid(instance_guid); + host_enum_response.append_guid(application_guid); + host_enum_response.append_dword(max_players); + host_enum_response.append_dword(player_to_peer_id.size() + 1); + host_enum_response.append_wstring(session_name); if(!application_data.empty()) { - ps.append_data(application_data.data(), application_data.size()); + host_enum_response.append_data(application_data.data(), application_data.size()); } else{ - ps.append_null(); + host_enum_response.append_null(); } - if(query.pvResponseData != NULL && query.dwResponseDataSize != 0) + if(ehq.dwResponseDataSize > 0) { - ps.append_data(query.pvResponseData, query.dwResponseDataSize); + host_enum_response.append_data(ehq.pvResponseData, ehq.dwResponseDataSize); } else{ - ps.append_null(); + host_enum_response.append_null(); } - ps.append_dword(req_tick); + host_enum_response.append_dword(req_tick); - std::pair raw_pkt = ps.raw_packet(); - - udp_sq.send(SendQueue::SEND_PRI_MEDIUM, new SQB_TODO(raw_pkt.first, raw_pkt.second, from_addr, - message_handler, message_handler_ctx, query)); + udp_sq.send(SendQueue::SEND_PRI_MEDIUM, + host_enum_response, + from_addr, + [](std::unique_lock &l, HRESULT result){}); } else{ /* Application rejected the DPNMSG_ENUM_HOSTS_QUERY message. */ + /* TODO */ } } + +void DirectPlay8Peer::handle_host_connect_request(std::unique_lock &l, unsigned int peer_id, const PacketDeserialiser &pd) +{ + Peer *peer = get_peer_by_peer_id(peer_id); + assert(peer != NULL); + + if(peer->state != Peer::PS_ACCEPTED) + { + /* TODO */ + return; + } + + auto send_fail = [&peer](DWORD error, void *data, size_t data_size) + { + PacketSerialiser connect_host_fail(DPLITE_MSGID_CONNECT_HOST_FAIL); + connect_host_fail.append_dword(error); + + if(data_size > 0) + { + connect_host_fail.append_data(data, data_size); + } + else{ + connect_host_fail.append_null(); + } + + peer->sq.send(SendQueue::SEND_PRI_MEDIUM, + connect_host_fail, + NULL, + [](std::unique_lock &l, HRESULT result) {}); + }; + + if(state != STATE_HOSTING) + { + send_fail(DPNERR_NOTHOST, NULL, 0); + return; + } + + if(!pd.is_null(0) && pd.get_guid(0) != instance_guid) + { + send_fail(DPNERR_INVALIDINSTANCE, NULL, 0); + return; + } + + if(pd.get_guid(1) != application_guid) + { + send_fail(DPNERR_INVALIDAPPLICATION, NULL, 0); + return; + } + + std::wstring req_password = pd.is_null(2) ? L"" : pd.get_wstring(2); + + if(req_password != password) + { + send_fail(DPNERR_INVALIDPASSWORD, NULL, 0); + return; + } + + DPNMSG_INDICATE_CONNECT ic; + memset(&ic, 0, sizeof(ic)); + + ic.dwSize = sizeof(ic); + + if(!pd.is_null(3)) + { + std::pair d = pd.get_data(3); + + ic.pvUserConnectData = (void*)(d.first); /* TODO: Take non-const copy? */ + ic.dwUserConnectDataSize = d.second; + } + + ic.pAddressPlayer = NULL; /* TODO */ + ic.pAddressDevice = NULL; /* TODO */ + + peer->state = Peer::PS_INDICATING; + + l.unlock(); + HRESULT ic_result = message_handler(message_handler_ctx, DPN_MSGID_INDICATE_CONNECT, &ic); + l.lock(); + + std::vector reply_data_buffer; + if(ic.dwReplyDataSize > 0) + { + reply_data_buffer.reserve(ic.dwReplyDataSize); + reply_data_buffer.insert(reply_data_buffer.end(), + (const unsigned char*)(ic.pvReplyData), + (const unsigned char*)(ic.pvReplyData) + ic.dwReplyDataSize); + + DPNMSG_RETURN_BUFFER rb; + memset(&rb, 0, sizeof(rb)); + + rb.dwSize = sizeof(rb); + rb.hResultCode = S_OK; /* TODO: Size check */ + rb.pvBuffer = ic.pvReplyData; + rb.pvUserContext = ic.pvReplyContext; + + l.unlock(); + message_handler(message_handler_ctx, DPN_MSGID_RETURN_BUFFER, &rb); + l.lock(); + + ic.pvReplyData = reply_data_buffer.data(); + } + + RENEW_PEER_OR_RETURN(); + + if(ic_result == DPN_OK) + { + /* Connection accepted by application. */ + + peer->player_id = next_player_id++; + peer->player_ctx = ic.pvPlayerContext; + + player_to_peer_id[peer->player_id] = peer_id; + + peer->state = Peer::PS_CONNECTED; + + PacketSerialiser connect_host_ok(DPLITE_MSGID_CONNECT_HOST_OK); + connect_host_ok.append_guid(instance_guid); + connect_host_ok.append_dword(host_player_id); + connect_host_ok.append_dword(peer->player_id); + + connect_host_ok.append_dword(0); /* TODO: Other peers */ + + if(ic.dwReplyDataSize > 0) + { + connect_host_ok.append_data(ic.pvReplyData, ic.dwReplyDataSize); + } + else{ + connect_host_ok.append_null(); + } + + peer->sq.send(SendQueue::SEND_PRI_MEDIUM, + connect_host_ok, + NULL, + [](std::unique_lock &l, HRESULT result) {}); + + DPNMSG_CREATE_PLAYER cp; + memset(&cp, 0, sizeof(cp)); + + cp.dwSize = sizeof(cp); + cp.dpnidPlayer = peer->player_id; + cp.pvPlayerContext = peer->player_ctx; + + l.unlock(); + message_handler(message_handler_ctx, DPN_MSGID_CREATE_PLAYER, &cp); + l.lock(); + + RENEW_PEER_OR_RETURN(); + + peer->player_ctx = cp.pvPlayerContext; + } + else{ + /* Connection rejected by application. */ + send_fail(DPNERR_HOSTREJECTEDCONNECTION, ic.pvReplyData, ic.dwReplyDataSize); + } +} + +/* Successful response to DPLITE_MSGID_CONNECT_HOST from host. + * + * GUID - Instance GUID + * DWORD - Player ID of current host + * DWORD - Player ID assigned to receiving client + * DWORD - Number of other peers (total - 2) + * + * For each peer: + * DWORD - Player ID + * DWORD - IPv4 address (network byte order) + * DWORD - Port (host byte order) + * + * DATA | NULL - Response data +*/ + +void DirectPlay8Peer::handle_host_connect_ok(std::unique_lock &l, unsigned int peer_id, const PacketDeserialiser &pd) +{ + Peer *peer = get_peer_by_peer_id(peer_id); + assert(peer != NULL); + + if(peer->state != Peer::PS_REQUESTING_HOST) + { + /* TODO: LOG ME */ + return; + } + + assert(state == STATE_CONNECTING); + + instance_guid = pd.get_guid(0); + + host_player_id = pd.get_dword(1); + + peer->player_id = host_player_id; + player_to_peer_id[peer->player_id] = peer_id; + + local_player_id = pd.get_dword(2); + + DWORD n_other_peers = pd.get_dword(3); + + for(DWORD n = 0; n < n_other_peers; ++n) + { + DPNID player_id = pd.get_dword(4 + (n * 3)); + uint32_t player_ipaddr = pd.get_dword(5 + (n * 3)); + uint16_t player_port = pd.get_dword(6 + (n * 3)); + + /* TODO: Setup connections to other peers. */ + abort(); + } + + connect_reply_data.clear(); + + if(!pd.is_null(4 + (n_other_peers * 3))) + { + std::pair d = pd.get_data(4 + (n_other_peers * 3)); + + if(d.second > 0) + { + connect_reply_data.reserve(d.second); + connect_reply_data.insert(connect_reply_data.end(), + (unsigned const char*)(d.first), + (unsigned const char*)(d.first) + d.second); + } + } + + peer->state = Peer::PS_CONNECTED; + + { + DPNMSG_CREATE_PLAYER cp; + memset(&cp, 0, sizeof(cp)); + + cp.dwSize = sizeof(cp); + cp.dpnidPlayer = local_player_id; + cp.pvPlayerContext = local_player_ctx; + + l.unlock(); + message_handler(message_handler_ctx, DPN_MSGID_CREATE_PLAYER, &cp); + l.lock(); + + local_player_ctx = cp.pvPlayerContext; + } + + { + DPNMSG_CREATE_PLAYER cp; + memset(&cp, 0, sizeof(cp)); + + cp.dwSize = sizeof(cp); + cp.dpnidPlayer = peer->player_id; + cp.pvPlayerContext = NULL; + + l.unlock(); + message_handler(message_handler_ctx, DPN_MSGID_CREATE_PLAYER, &cp); + l.lock(); + + RENEW_PEER_OR_RETURN(); + + peer->player_ctx = cp.pvPlayerContext; + } + + connect_check(l); +} + +void DirectPlay8Peer::handle_host_connect_fail(std::unique_lock &l, unsigned int peer_id, const PacketDeserialiser &pd) +{ + Peer *peer = get_peer_by_peer_id(peer_id); + assert(peer != NULL); + + if(peer->state != Peer::PS_REQUESTING_HOST) + { + /* TODO: LOG ME */ + return; + } + + assert(state == STATE_CONNECTING); + + DWORD hResultCode = DPNERR_GENERIC; + const void *pvApplicationReplyData = NULL; + DWORD dwApplicationReplyDataSize = 0; + + try { + hResultCode = pd.get_dword(0); + + if(!pd.is_null(1)) + { + std::pair d = pd.get_data(1); + + if(d.second > 0) + { + pvApplicationReplyData = d.first; + dwApplicationReplyDataSize = d.second; + } + } + } + catch(const PacketDeserialiser::Error &e) + { + /* TODO: LOG ME */ + } + + connect_fail(l, hResultCode, pvApplicationReplyData, dwApplicationReplyDataSize); +} + +/* Check if we have finished connecting and should enter STATE_CONNECTED. + * + * This is called after processing either of: + * + * DPLITE_MSGID_CONNECT_HOST_OK + * DPLITE_MSGID_CONNECT_PEER_OK + * + * If there are no outgoing connections still outstanding, then we have + * successfully connected to every peer in the session at the point the server + * accepted us and we should proceed. +*/ +void DirectPlay8Peer::connect_check(std::unique_lock &l) +{ + assert(state == STATE_CONNECTING); + + /* Search for any outgoing connections we have initiated that haven't + * completed or failed yet. + */ + + for(auto p = peers.begin(); p != peers.end(); ++p) + { + switch(p->second->state) + { + case Peer::PS_CONNECTING_HOST: + case Peer::PS_REQUESTING_HOST: + case Peer::PS_CONNECTING_PEER: + case Peer::PS_REQUESTING_PEER: + return; + + default: + break; + } + } + + /* No outgoing connections in progress, proceed to STATE_CONNECTED! */ + + state = STATE_CONNECTED; + + DPNMSG_CONNECT_COMPLETE cc; + memset(&cc, 0, sizeof(cc)); + + cc.dwSize = sizeof(cc); + cc.hAsyncOp = connect_handle; + cc.pvUserContext = connect_ctx; + cc.hResultCode = S_OK; + cc.dpnidLocal = local_player_id; + + if(!connect_reply_data.empty()) + { + cc.pvApplicationReplyData = connect_reply_data.data(); + cc.dwApplicationReplyDataSize = connect_reply_data.size(); + } + + l.unlock(); + message_handler(message_handler_ctx, DPN_MSGID_CONNECT_COMPLETE, &cc); + l.lock(); + + /* Signal the pending synchronous Connect() call (if any) to return. */ + + connect_result = S_OK; + connect_cv.notify_all(); +} + +/* Fail a pending Connect operation and return to STATE_INITIALISED. + * ... +*/ +void DirectPlay8Peer::connect_fail(std::unique_lock &l, HRESULT hResultCode, const void *pvApplicationReplyData, DWORD dwApplicationReplyDataSize) +{ + assert(state == STATE_CONNECTING); + + close_everything_now(l); + + state = STATE_CONNECT_FAILED; + + DPNMSG_CONNECT_COMPLETE cc; + memset(&cc, 0, sizeof(cc)); + + cc.dwSize = sizeof(cc); + cc.hAsyncOp = connect_handle; + cc.pvUserContext = connect_ctx; + + cc.hResultCode = hResultCode; + cc.pvApplicationReplyData = (void*)(pvApplicationReplyData); /* TODO: Take non-const copy? */ + cc.dwApplicationReplyDataSize = dwApplicationReplyDataSize; + + l.unlock(); + message_handler(message_handler_ctx, DPN_MSGID_CONNECT_COMPLETE, &cc); + l.lock(); + + /* Signal the pending synchronous Connect() call (if any) to return. */ + + connect_result = hResultCode; + state = STATE_INITIALISED; + connect_cv.notify_all(); +} + +DirectPlay8Peer::Peer::Peer(enum PeerState state, int sock, uint32_t ip, uint16_t port): + state(state), sock(sock), ip(ip), port(port), recv_busy(false), recv_buf_cur(0), sq(event) +{} diff --git a/src/DirectPlay8Peer.hpp b/src/DirectPlay8Peer.hpp index 1d1a3ec..8abb87b 100644 --- a/src/DirectPlay8Peer.hpp +++ b/src/DirectPlay8Peer.hpp @@ -31,6 +31,8 @@ class DirectPlay8Peer: public IDirectPlay8Peer STATE_NEW, STATE_INITIALISED, STATE_HOSTING, + STATE_CONNECTING, + STATE_CONNECT_FAILED, STATE_CONNECTED, } state; @@ -45,6 +47,10 @@ class DirectPlay8Peer: public IDirectPlay8Peer std::wstring password; std::vector application_data; + /* Local IP and port for all our sockets, except discovery_socket. */ + uint32_t local_ip; + uint16_t local_port; + int udp_socket; /* UDP socket, used for general send/recv operations. */ int listener_socket; /* TCP listener socket. */ int discovery_socket; /* Discovery UDP sockets, RECIEVES broadcasts only. */ @@ -56,17 +62,29 @@ class DirectPlay8Peer: public IDirectPlay8Peer SendQueue udp_sq; - struct Player + struct Peer { - enum PlayerState { + enum PeerState { /* Peer has connected to us, we're waiting for the initial message from it. */ - PS_INIT, + PS_ACCEPTED, - /* We are the host and the peer has sent the initial connect request, we are waiting + /* We have started connecting to the host, waiting for async connect() to complete. */ + PS_CONNECTING_HOST, + + /* TCP connection to host open, waiting for response to DPLITE_MSGID_CONNECT_HOST. */ + PS_REQUESTING_HOST, + + /* We have started connecting to a peer, waiting for async connect() to complete. */ + PS_CONNECTING_PEER, + + /* TCP connection to peer open, waiting for response to DPLITE_MSGID_CONNECT_PEER. */ + PS_REQUESTING_PEER, + + /* We are the host and the peer has sent DPLITE_MSGID_CONNECT_HOST, we are waiting * for the application to process DPN_MSGID_INDICATE_CONNECT before we either add the * player to the session or reject it. */ - PS_CONNECTING, + PS_INDICATING, /* This is a fully-fledged peer. */ PS_CONNECTED, @@ -79,7 +97,7 @@ class DirectPlay8Peer: public IDirectPlay8Peer PS_CLOSING, }; - enum PlayerState state; + enum PeerState state; /* This is the TCP socket to the peer, we may have connected to it, or it * may have connected to us depending who joined the session first, that @@ -90,25 +108,30 @@ class DirectPlay8Peer: public IDirectPlay8Peer uint32_t ip; /* IPv4 address, network byte order. */ uint16_t port; /* TCP and UDP port, host byte order. */ - DPNID id; /* Player ID, not initialised before state PS_CONNECTED. */ + DPNID player_id; /* Player ID, not initialised before state PS_CONNECTED. */ + void *player_ctx; /* Player context, not initialised before state PS_CONNECTED. */ + bool recv_busy; unsigned char recv_buf[MAX_PACKET_SIZE]; size_t recv_buf_cur; EventObject event; SendQueue sq; - SendQueue::Buffer *sqb; - const unsigned char *send_buf; - size_t send_remain; - - Player(int sock, uint32_t ip, uint16_t port): - state(PS_INIT), sock(sock), ip(ip), port(port), recv_buf_cur(0), sq(event), send_buf(NULL) {} + Peer(enum PeerState state, int sock, uint32_t ip, uint16_t port); }; - std::list peers; - std::map::iterator> other_player_ids; + DPNID local_player_id; + void *local_player_ctx; + + DPNID next_player_id; + DPNID host_player_id; + + unsigned int next_peer_id; + std::map peers; + + std::map player_to_peer_id; /* Serialises access to: * @@ -118,17 +141,39 @@ class DirectPlay8Peer: public IDirectPlay8Peer */ std::mutex lock; - void io_main(); + std::condition_variable connect_cv; + + void *connect_ctx; + DPNHANDLE connect_handle; + std::vector connect_req_data; + + HRESULT connect_result; + std::vector connect_reply_data; + + Peer *get_peer_by_peer_id(unsigned int peer_id); + Peer *get_peer_by_player_id(DPNID player_id); void handle_udp_socket_event(); + void io_udp_send(std::unique_lock &l); void handle_other_socket_event(); - void io_udp_send(int sock, SendQueue &q); - void io_listener_accept(int sock); - bool io_tcp_recv(Player *player); - bool io_tcp_send(Player *player); + void io_peer_triggered(unsigned int peer_id); + void io_peer_connected(std::unique_lock &l, unsigned int peer_id); + void io_peer_send(std::unique_lock &l, unsigned int peer_id); + void io_peer_recv(std::unique_lock &l, unsigned int peer_id); - void handle_host_enum_request(const PacketDeserialiser &pd, const struct sockaddr_in *from_addr); + void peer_accept(std::unique_lock &l); + bool peer_connect(Peer::PeerState initial_state, uint32_t remote_ip, uint16_t remote_port); + void peer_destroy(std::unique_lock &l, unsigned int peer_id, HRESULT outstanding_op_result); + + void close_everything_now(std::unique_lock &l); + + void handle_host_enum_request(std::unique_lock &l, const PacketDeserialiser &pd, const struct sockaddr_in *from_addr); + void handle_host_connect_request(std::unique_lock &l, unsigned int peer_id, const PacketDeserialiser &pd); + void handle_host_connect_ok(std::unique_lock &l, unsigned int peer_id, const PacketDeserialiser &pd); + void handle_host_connect_fail(std::unique_lock &l, unsigned int peer_id, const PacketDeserialiser &pd); + void connect_check(std::unique_lock &l); + void connect_fail(std::unique_lock &l, HRESULT hResultCode, const void *pvApplicationReplyData, DWORD dwApplicationReplyDataSize); public: DirectPlay8Peer(std::atomic *global_refcount); diff --git a/src/EventObject.hpp b/src/EventObject.hpp index 76ae335..131f358 100644 --- a/src/EventObject.hpp +++ b/src/EventObject.hpp @@ -7,6 +7,9 @@ class EventObject { private: + /* No copy c'tor. */ + EventObject(const EventObject&) = delete; + HANDLE handle; public: diff --git a/src/Messages.hpp b/src/Messages.hpp index 8cc2d62..5f24801 100644 --- a/src/Messages.hpp +++ b/src/Messages.hpp @@ -26,4 +26,40 @@ * DWORD - Tick count from DPLITE_MSGID_HOST_ENUM_REQUEST */ +#define DPLITE_MSGID_CONNECT_HOST 3 + +/* Initial Connect() request to host. + * + * GUID | NULL - Instance GUID + * GUID - Application GUID + * WSTRING | NULL - Password + * DATA | NULL - Request data +*/ + +#define DPLITE_MSGID_CONNECT_HOST_OK 4 + +/* Successful response to DPLITE_MSGID_CONNECT_HOST from host. + * + * GUID - Instance GUID + * DWORD - Player ID of current host + * DWORD - Player ID assigned to receiving client + * DWORD - Number of other peers (total - 2) + * + * For each peer: + * DWORD - Player ID + * DWORD - IPv4 address (network byte order) + * DWORD - Port (host byte order) + * + * DATA | NULL - Response data +*/ + +#define DPLITE_MSGID_CONNECT_HOST_FAIL 5 + +/* Negative response to DPLITE_MSGID_CONNECT_HOST from host. + * Host will close the connection after sending this. + * + * DWORD - Error code (DPNERR_HOSTREJECTEDCONNECTION, DPNERR_INVALIDAPPLICATION, etc) + * DATA | NULL - Response data +*/ + #endif /* !DPLITE_MESSAGES_HPP */ diff --git a/src/SendQueue.cpp b/src/SendQueue.cpp index 4fdc3af..5e69e4c 100644 --- a/src/SendQueue.cpp +++ b/src/SendQueue.cpp @@ -4,27 +4,34 @@ #include "SendQueue.hpp" -void SendQueue::send(SendPriority priority, Buffer *buffer) +void SendQueue::send(SendPriority priority, const PacketSerialiser &ps, const struct sockaddr_in *dest_addr, const std::function&, HRESULT)> &callback) { + std::pair data = ps.raw_packet(); + + SendOp *op = new SendOp( + data.first, data.second, + (const struct sockaddr*)(dest_addr), (dest_addr != NULL ? sizeof(*dest_addr) : 0), + callback); + switch(priority) { case SEND_PRI_LOW: - low_queue.push_back(buffer); + low_queue.push_back(op); break; case SEND_PRI_MEDIUM: - medium_queue.push_back(buffer); + medium_queue.push_back(op); break; case SEND_PRI_HIGH: - high_queue.push_back(buffer); + high_queue.push_back(op); break; } SetEvent(signal_on_queue); } -SendQueue::Buffer *SendQueue::get_next() +SendQueue::SendOp *SendQueue::get_pending() { if(current != NULL) { @@ -50,33 +57,48 @@ SendQueue::Buffer *SendQueue::get_next() return current; } -void SendQueue::complete(SendQueue::Buffer *buffer, HRESULT result) +void SendQueue::pop_pending(SendQueue::SendOp *op) { - assert(buffer == current); - + assert(op == current); current = NULL; - - buffer->complete(result); - delete buffer; } -SendQueue::Buffer::Buffer(const void *data, size_t data_size, const struct sockaddr *dest_addr, int dest_addr_len): - data((const unsigned char*)(data), (const unsigned char*)(data) + data_size) +SendQueue::SendOp::SendOp(const void *data, size_t data_size, + const struct sockaddr *dest_addr, size_t dest_addr_size, + const std::function&, HRESULT)> &callback): + + data((const unsigned char*)(data), (const unsigned char*)(data) + data_size), + sent_data(0), + callback(callback) { - assert((size_t)(dest_addr_len) <= sizeof(this->dest_addr)); + assert((size_t)(dest_addr_size) <= sizeof(this->dest_addr)); - memcpy(&(this->dest_addr), dest_addr, dest_addr_len); - this->dest_addr_len = dest_addr_len; + memcpy(&(this->dest_addr), dest_addr, dest_addr_size); + this->dest_addr_size = dest_addr_size; } -SendQueue::Buffer::~Buffer() {} - -std::pair SendQueue::Buffer::get_data() +std::pair SendQueue::SendOp::get_data() const { return std::make_pair(data.data(), data.size()); } -std::pair SendQueue::Buffer::get_dest_addr() +std::pair SendQueue::SendOp::get_dest_addr() const { - return std::make_pair((struct sockaddr*)(&dest_addr), (int)(dest_addr_len)); + return std::make_pair((const struct sockaddr*)(&dest_addr), dest_addr_size); +} + +void SendQueue::SendOp::inc_sent_data(size_t sent) +{ + sent_data += sent; + assert(sent_data <= data.size()); +} + +std::pair SendQueue::SendOp::get_pending_data() const +{ + return std::make_pair(data.data() + sent_data, data.size() - sent_data); +} + +void SendQueue::SendOp::invoke_callback(std::unique_lock &l, HRESULT result) const +{ + callback(l, result); } diff --git a/src/SendQueue.hpp b/src/SendQueue.hpp index 62072aa..9a68631 100644 --- a/src/SendQueue.hpp +++ b/src/SendQueue.hpp @@ -3,13 +3,17 @@ #include +#include #include +#include #include #include #include #include #include +#include "packet.hpp" + class SendQueue { public: @@ -19,32 +23,38 @@ class SendQueue SEND_PRI_HIGH = 4, }; - class Buffer { + class SendOp + { private: std::vector data; + size_t sent_data; struct sockaddr_storage dest_addr; - int dest_addr_len; + size_t dest_addr_size; - protected: - Buffer(const void *data, size_t data_size, const struct sockaddr *dest_addr = NULL, int dest_addr_len = 0); + std::function&, HRESULT)> callback; public: - virtual ~Buffer(); + SendOp( + const void *data, size_t data_size, + const struct sockaddr *dest_addr, size_t dest_addr_size, + const std::function&, HRESULT)> &callback); - std::pair get_data(); + std::pair get_data() const; + std::pair get_dest_addr() const; - std::pair get_dest_addr(); + void inc_sent_data(size_t sent); + std::pair get_pending_data() const; - virtual void complete(HRESULT result) = 0; + void invoke_callback(std::unique_lock &l, HRESULT result) const; }; private: - std::list low_queue; - std::list medium_queue; - std::list high_queue; + std::list low_queue; + std::list medium_queue; + std::list high_queue; - Buffer *current; + SendOp *current; HANDLE signal_on_queue; @@ -54,10 +64,10 @@ class SendQueue /* No copy c'tor. */ SendQueue(const SendQueue &src) = delete; - void send(SendPriority priority, Buffer *buffer); + void send(SendPriority priority, const PacketSerialiser &ps, const struct sockaddr_in *dest_addr, const std::function&, HRESULT)> &callback); - Buffer *get_next(); - void complete(Buffer *buffer, HRESULT result); + SendOp *get_pending(); + void pop_pending(SendOp *op); }; #endif /* !DPLITE_SENDQUEUE_HPP */ diff --git a/src/network.cpp b/src/network.cpp index 272d852..eb2e776 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -55,6 +55,13 @@ int create_listener_socket(uint32_t ipaddr, uint16_t port) return -1; } + BOOL reuse = TRUE; + if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char*)(&reuse), sizeof(BOOL)) == -1) + { + closesocket(sock); + return -1; + } + struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_addr.s_addr = ipaddr; @@ -75,6 +82,42 @@ int create_listener_socket(uint32_t ipaddr, uint16_t port) return sock; } +int create_client_socket(uint32_t local_ipaddr, uint16_t local_port) +{ + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock == -1) + { + return -1; + } + + u_long non_blocking = 1; + if(ioctlsocket(sock, FIONBIO, &non_blocking) != 0) + { + closesocket(sock); + return -1; + } + + BOOL reuse = TRUE; + if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char*)(&reuse), sizeof(BOOL)) == -1) + { + closesocket(sock); + return -1; + } + + struct sockaddr_in l_addr; + l_addr.sin_family = AF_INET; + l_addr.sin_addr.s_addr = local_ipaddr; + l_addr.sin_port = htons(local_port); + + if(bind(sock, (struct sockaddr*)(&l_addr), sizeof(l_addr)) == -1) + { + closesocket(sock); + return -1; + } + + return sock; +} + int create_discovery_socket() { int sock = socket(AF_INET, SOCK_DGRAM, 0); diff --git a/src/network.hpp b/src/network.hpp index 13201e0..f22c53e 100644 --- a/src/network.hpp +++ b/src/network.hpp @@ -10,6 +10,7 @@ int create_udp_socket(uint32_t ipaddr, uint16_t port); int create_listener_socket(uint32_t ipaddr, uint16_t port); +int create_client_socket(uint32_t local_ipaddr, uint16_t local_port); int create_discovery_socket(); #endif /* !DPLITE_NETWORK_HPP */ diff --git a/src/packet.cpp b/src/packet.cpp index ca48622..748810e 100644 --- a/src/packet.cpp +++ b/src/packet.cpp @@ -24,7 +24,7 @@ PacketSerialiser::PacketSerialiser(uint32_t type) sbuf.insert(sbuf.begin(), (unsigned char*)(&header), (unsigned char*)(&header + 1)); } -std::pair PacketSerialiser::raw_packet() +std::pair PacketSerialiser::raw_packet() const { return std::make_pair(sbuf.data(), sbuf.size()); } diff --git a/src/packet.hpp b/src/packet.hpp index df95735..e428f5c 100644 --- a/src/packet.hpp +++ b/src/packet.hpp @@ -24,7 +24,7 @@ class PacketSerialiser public: PacketSerialiser(uint32_t type); - std::pair raw_packet(); + std::pair raw_packet() const; void append_null(); void append_dword(DWORD value); diff --git a/tests/DirectPlay8Peer.cpp b/tests/DirectPlay8Peer.cpp index 9e2b2ff..1df0b07 100644 --- a/tests/DirectPlay8Peer.cpp +++ b/tests/DirectPlay8Peer.cpp @@ -7,7 +7,7 @@ #include "../src/DirectPlay8Address.hpp" #include "../src/DirectPlay8Peer.hpp" -#define INSTANTIATE_FROM_COM +// #define INSTANTIATE_FROM_COM #define PORT 42895 @@ -1078,6 +1078,8 @@ TEST(DirectPlay8Peer, ConnectSync) EXPECT_EQ(ic->pvPlayerContext, (void*)(NULL)); /* TODO: Check pAddressPlayer, pAddressDevice */ + + ic->pvPlayerContext = (void*)(0xB441); } break; @@ -1091,7 +1093,7 @@ TEST(DirectPlay8Peer, ConnectSync) p1_player_id = cp->dpnidPlayer; EXPECT_EQ(cp->dwSize, sizeof(DPNMSG_CREATE_PLAYER)); - EXPECT_EQ(cp->pvPlayerContext, (void*)(0)); + EXPECT_EQ(cp->pvPlayerContext, (void*)(0xB441)); cp->pvPlayerContext = (void*)(0xFEED); } @@ -1349,6 +1351,8 @@ TEST(DirectPlay8Peer, ConnectAsync) EXPECT_EQ(ic->pvPlayerContext, (void*)(NULL)); /* TODO: Check pAddressPlayer, pAddressDevice */ + + ic->pvPlayerContext = (void*)(0xB441); } break; @@ -1362,7 +1366,7 @@ TEST(DirectPlay8Peer, ConnectAsync) p1_player_id = cp->dpnidPlayer; EXPECT_EQ(cp->dwSize, sizeof(DPNMSG_CREATE_PLAYER)); - EXPECT_EQ(cp->pvPlayerContext, (void*)(0)); + EXPECT_EQ(cp->pvPlayerContext, (void*)(0xB441)); cp->pvPlayerContext = (void*)(0xFEED); }