1
0
mirror of https://github.com/solemnwarning/directplay-lite synced 2024-12-30 16:45:37 +01:00

Milestone: Connect a single peer!

This commit is contained in:
Daniel Collins 2018-09-20 00:58:41 +01:00
parent 98b37cf76c
commit 25341f2488
11 changed files with 1310 additions and 244 deletions

File diff suppressed because it is too large Load Diff

View File

@ -31,6 +31,8 @@ class DirectPlay8Peer: public IDirectPlay8Peer
STATE_NEW, STATE_NEW,
STATE_INITIALISED, STATE_INITIALISED,
STATE_HOSTING, STATE_HOSTING,
STATE_CONNECTING,
STATE_CONNECT_FAILED,
STATE_CONNECTED, STATE_CONNECTED,
} state; } state;
@ -45,6 +47,10 @@ class DirectPlay8Peer: public IDirectPlay8Peer
std::wstring password; std::wstring password;
std::vector<unsigned char> application_data; std::vector<unsigned char> application_data;
/* Local IP and port for all our sockets, except discovery_socket. */
uint32_t local_ip;
uint16_t local_port;
int udp_socket; /* UDP socket, used for general send/recv operations. */ int udp_socket; /* UDP socket, used for general send/recv operations. */
int listener_socket; /* TCP listener socket. */ int listener_socket; /* TCP listener socket. */
int discovery_socket; /* Discovery UDP sockets, RECIEVES broadcasts only. */ int discovery_socket; /* Discovery UDP sockets, RECIEVES broadcasts only. */
@ -56,17 +62,29 @@ class DirectPlay8Peer: public IDirectPlay8Peer
SendQueue udp_sq; SendQueue udp_sq;
struct Player struct Peer
{ {
enum PlayerState { enum PeerState {
/* Peer has connected to us, we're waiting for the initial message from it. */ /* Peer has connected to us, we're waiting for the initial message from it. */
PS_INIT, PS_ACCEPTED,
/* We are the host and the peer has sent the initial connect request, we are waiting /* We have started connecting to the host, waiting for async connect() to complete. */
PS_CONNECTING_HOST,
/* TCP connection to host open, waiting for response to DPLITE_MSGID_CONNECT_HOST. */
PS_REQUESTING_HOST,
/* We have started connecting to a peer, waiting for async connect() to complete. */
PS_CONNECTING_PEER,
/* TCP connection to peer open, waiting for response to DPLITE_MSGID_CONNECT_PEER. */
PS_REQUESTING_PEER,
/* We are the host and the peer has sent DPLITE_MSGID_CONNECT_HOST, we are waiting
* for the application to process DPN_MSGID_INDICATE_CONNECT before we either add the * for the application to process DPN_MSGID_INDICATE_CONNECT before we either add the
* player to the session or reject it. * player to the session or reject it.
*/ */
PS_CONNECTING, PS_INDICATING,
/* This is a fully-fledged peer. */ /* This is a fully-fledged peer. */
PS_CONNECTED, PS_CONNECTED,
@ -79,7 +97,7 @@ class DirectPlay8Peer: public IDirectPlay8Peer
PS_CLOSING, PS_CLOSING,
}; };
enum PlayerState state; enum PeerState state;
/* This is the TCP socket to the peer, we may have connected to it, or it /* This is the TCP socket to the peer, we may have connected to it, or it
* may have connected to us depending who joined the session first, that * may have connected to us depending who joined the session first, that
@ -90,25 +108,30 @@ class DirectPlay8Peer: public IDirectPlay8Peer
uint32_t ip; /* IPv4 address, network byte order. */ uint32_t ip; /* IPv4 address, network byte order. */
uint16_t port; /* TCP and UDP port, host byte order. */ uint16_t port; /* TCP and UDP port, host byte order. */
DPNID id; /* Player ID, not initialised before state PS_CONNECTED. */ DPNID player_id; /* Player ID, not initialised before state PS_CONNECTED. */
void *player_ctx; /* Player context, not initialised before state PS_CONNECTED. */
bool recv_busy;
unsigned char recv_buf[MAX_PACKET_SIZE]; unsigned char recv_buf[MAX_PACKET_SIZE];
size_t recv_buf_cur; size_t recv_buf_cur;
EventObject event; EventObject event;
SendQueue sq; SendQueue sq;
SendQueue::Buffer *sqb;
const unsigned char *send_buf; Peer(enum PeerState state, int sock, uint32_t ip, uint16_t port);
size_t send_remain;
Player(int sock, uint32_t ip, uint16_t port):
state(PS_INIT), sock(sock), ip(ip), port(port), recv_buf_cur(0), sq(event), send_buf(NULL) {}
}; };
std::list<Player> peers; DPNID local_player_id;
std::map<DPNID, std::list<Player>::iterator> other_player_ids; void *local_player_ctx;
DPNID next_player_id;
DPNID host_player_id;
unsigned int next_peer_id;
std::map<unsigned int, Peer*> peers;
std::map<DPNID, unsigned int> player_to_peer_id;
/* Serialises access to: /* Serialises access to:
* *
@ -118,17 +141,39 @@ class DirectPlay8Peer: public IDirectPlay8Peer
*/ */
std::mutex lock; std::mutex lock;
void io_main(); std::condition_variable connect_cv;
void *connect_ctx;
DPNHANDLE connect_handle;
std::vector<unsigned char> connect_req_data;
HRESULT connect_result;
std::vector<unsigned char> connect_reply_data;
Peer *get_peer_by_peer_id(unsigned int peer_id);
Peer *get_peer_by_player_id(DPNID player_id);
void handle_udp_socket_event(); void handle_udp_socket_event();
void io_udp_send(std::unique_lock<std::mutex> &l);
void handle_other_socket_event(); void handle_other_socket_event();
void io_udp_send(int sock, SendQueue &q); void io_peer_triggered(unsigned int peer_id);
void io_listener_accept(int sock); void io_peer_connected(std::unique_lock<std::mutex> &l, unsigned int peer_id);
bool io_tcp_recv(Player *player); void io_peer_send(std::unique_lock<std::mutex> &l, unsigned int peer_id);
bool io_tcp_send(Player *player); void io_peer_recv(std::unique_lock<std::mutex> &l, unsigned int peer_id);
void handle_host_enum_request(const PacketDeserialiser &pd, const struct sockaddr_in *from_addr); void peer_accept(std::unique_lock<std::mutex> &l);
bool peer_connect(Peer::PeerState initial_state, uint32_t remote_ip, uint16_t remote_port);
void peer_destroy(std::unique_lock<std::mutex> &l, unsigned int peer_id, HRESULT outstanding_op_result);
void close_everything_now(std::unique_lock<std::mutex> &l);
void handle_host_enum_request(std::unique_lock<std::mutex> &l, const PacketDeserialiser &pd, const struct sockaddr_in *from_addr);
void handle_host_connect_request(std::unique_lock<std::mutex> &l, unsigned int peer_id, const PacketDeserialiser &pd);
void handle_host_connect_ok(std::unique_lock<std::mutex> &l, unsigned int peer_id, const PacketDeserialiser &pd);
void handle_host_connect_fail(std::unique_lock<std::mutex> &l, unsigned int peer_id, const PacketDeserialiser &pd);
void connect_check(std::unique_lock<std::mutex> &l);
void connect_fail(std::unique_lock<std::mutex> &l, HRESULT hResultCode, const void *pvApplicationReplyData, DWORD dwApplicationReplyDataSize);
public: public:
DirectPlay8Peer(std::atomic<unsigned int> *global_refcount); DirectPlay8Peer(std::atomic<unsigned int> *global_refcount);

View File

@ -7,6 +7,9 @@
class EventObject class EventObject
{ {
private: private:
/* No copy c'tor. */
EventObject(const EventObject&) = delete;
HANDLE handle; HANDLE handle;
public: public:

View File

@ -26,4 +26,40 @@
* DWORD - Tick count from DPLITE_MSGID_HOST_ENUM_REQUEST * DWORD - Tick count from DPLITE_MSGID_HOST_ENUM_REQUEST
*/ */
#define DPLITE_MSGID_CONNECT_HOST 3
/* Initial Connect() request to host.
*
* GUID | NULL - Instance GUID
* GUID - Application GUID
* WSTRING | NULL - Password
* DATA | NULL - Request data
*/
#define DPLITE_MSGID_CONNECT_HOST_OK 4
/* Successful response to DPLITE_MSGID_CONNECT_HOST from host.
*
* GUID - Instance GUID
* DWORD - Player ID of current host
* DWORD - Player ID assigned to receiving client
* DWORD - Number of other peers (total - 2)
*
* For each peer:
* DWORD - Player ID
* DWORD - IPv4 address (network byte order)
* DWORD - Port (host byte order)
*
* DATA | NULL - Response data
*/
#define DPLITE_MSGID_CONNECT_HOST_FAIL 5
/* Negative response to DPLITE_MSGID_CONNECT_HOST from host.
* Host will close the connection after sending this.
*
* DWORD - Error code (DPNERR_HOSTREJECTEDCONNECTION, DPNERR_INVALIDAPPLICATION, etc)
* DATA | NULL - Response data
*/
#endif /* !DPLITE_MESSAGES_HPP */ #endif /* !DPLITE_MESSAGES_HPP */

View File

@ -4,27 +4,34 @@
#include "SendQueue.hpp" #include "SendQueue.hpp"
void SendQueue::send(SendPriority priority, Buffer *buffer) void SendQueue::send(SendPriority priority, const PacketSerialiser &ps, const struct sockaddr_in *dest_addr, const std::function<void(std::unique_lock<std::mutex>&, HRESULT)> &callback)
{ {
std::pair<const void*, size_t> data = ps.raw_packet();
SendOp *op = new SendOp(
data.first, data.second,
(const struct sockaddr*)(dest_addr), (dest_addr != NULL ? sizeof(*dest_addr) : 0),
callback);
switch(priority) switch(priority)
{ {
case SEND_PRI_LOW: case SEND_PRI_LOW:
low_queue.push_back(buffer); low_queue.push_back(op);
break; break;
case SEND_PRI_MEDIUM: case SEND_PRI_MEDIUM:
medium_queue.push_back(buffer); medium_queue.push_back(op);
break; break;
case SEND_PRI_HIGH: case SEND_PRI_HIGH:
high_queue.push_back(buffer); high_queue.push_back(op);
break; break;
} }
SetEvent(signal_on_queue); SetEvent(signal_on_queue);
} }
SendQueue::Buffer *SendQueue::get_next() SendQueue::SendOp *SendQueue::get_pending()
{ {
if(current != NULL) if(current != NULL)
{ {
@ -50,33 +57,48 @@ SendQueue::Buffer *SendQueue::get_next()
return current; return current;
} }
void SendQueue::complete(SendQueue::Buffer *buffer, HRESULT result) void SendQueue::pop_pending(SendQueue::SendOp *op)
{ {
assert(buffer == current); assert(op == current);
current = NULL; current = NULL;
buffer->complete(result);
delete buffer;
} }
SendQueue::Buffer::Buffer(const void *data, size_t data_size, const struct sockaddr *dest_addr, int dest_addr_len): SendQueue::SendOp::SendOp(const void *data, size_t data_size,
data((const unsigned char*)(data), (const unsigned char*)(data) + data_size) const struct sockaddr *dest_addr, size_t dest_addr_size,
const std::function<void(std::unique_lock<std::mutex>&, HRESULT)> &callback):
data((const unsigned char*)(data), (const unsigned char*)(data) + data_size),
sent_data(0),
callback(callback)
{ {
assert((size_t)(dest_addr_len) <= sizeof(this->dest_addr)); assert((size_t)(dest_addr_size) <= sizeof(this->dest_addr));
memcpy(&(this->dest_addr), dest_addr, dest_addr_len); memcpy(&(this->dest_addr), dest_addr, dest_addr_size);
this->dest_addr_len = dest_addr_len; this->dest_addr_size = dest_addr_size;
} }
SendQueue::Buffer::~Buffer() {} std::pair<const void*, size_t> SendQueue::SendOp::get_data() const
std::pair<const void*, size_t> SendQueue::Buffer::get_data()
{ {
return std::make_pair<const void*, size_t>(data.data(), data.size()); return std::make_pair<const void*, size_t>(data.data(), data.size());
} }
std::pair<const struct sockaddr*, int> SendQueue::Buffer::get_dest_addr() std::pair<const struct sockaddr*, size_t> SendQueue::SendOp::get_dest_addr() const
{ {
return std::make_pair((struct sockaddr*)(&dest_addr), (int)(dest_addr_len)); return std::make_pair((const struct sockaddr*)(&dest_addr), dest_addr_size);
}
void SendQueue::SendOp::inc_sent_data(size_t sent)
{
sent_data += sent;
assert(sent_data <= data.size());
}
std::pair<const void*, size_t> SendQueue::SendOp::get_pending_data() const
{
return std::make_pair<const void*, size_t>(data.data() + sent_data, data.size() - sent_data);
}
void SendQueue::SendOp::invoke_callback(std::unique_lock<std::mutex> &l, HRESULT result) const
{
callback(l, result);
} }

View File

@ -3,13 +3,17 @@
#include <winsock2.h> #include <winsock2.h>
#include <functional>
#include <list> #include <list>
#include <mutex>
#include <set> #include <set>
#include <stdlib.h> #include <stdlib.h>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include <windows.h> #include <windows.h>
#include "packet.hpp"
class SendQueue class SendQueue
{ {
public: public:
@ -19,32 +23,38 @@ class SendQueue
SEND_PRI_HIGH = 4, SEND_PRI_HIGH = 4,
}; };
class Buffer { class SendOp
{
private: private:
std::vector<unsigned char> data; std::vector<unsigned char> data;
size_t sent_data;
struct sockaddr_storage dest_addr; struct sockaddr_storage dest_addr;
int dest_addr_len; size_t dest_addr_size;
protected: std::function<void(std::unique_lock<std::mutex>&, HRESULT)> callback;
Buffer(const void *data, size_t data_size, const struct sockaddr *dest_addr = NULL, int dest_addr_len = 0);
public: public:
virtual ~Buffer(); SendOp(
const void *data, size_t data_size,
const struct sockaddr *dest_addr, size_t dest_addr_size,
const std::function<void(std::unique_lock<std::mutex>&, HRESULT)> &callback);
std::pair<const void*, size_t> get_data(); std::pair<const void*, size_t> get_data() const;
std::pair<const struct sockaddr*, size_t> get_dest_addr() const;
std::pair<const struct sockaddr*, int> get_dest_addr(); void inc_sent_data(size_t sent);
std::pair<const void*, size_t> get_pending_data() const;
virtual void complete(HRESULT result) = 0; void invoke_callback(std::unique_lock<std::mutex> &l, HRESULT result) const;
}; };
private: private:
std::list<Buffer*> low_queue; std::list<SendOp*> low_queue;
std::list<Buffer*> medium_queue; std::list<SendOp*> medium_queue;
std::list<Buffer*> high_queue; std::list<SendOp*> high_queue;
Buffer *current; SendOp *current;
HANDLE signal_on_queue; HANDLE signal_on_queue;
@ -54,10 +64,10 @@ class SendQueue
/* No copy c'tor. */ /* No copy c'tor. */
SendQueue(const SendQueue &src) = delete; SendQueue(const SendQueue &src) = delete;
void send(SendPriority priority, Buffer *buffer); void send(SendPriority priority, const PacketSerialiser &ps, const struct sockaddr_in *dest_addr, const std::function<void(std::unique_lock<std::mutex>&, HRESULT)> &callback);
Buffer *get_next(); SendOp *get_pending();
void complete(Buffer *buffer, HRESULT result); void pop_pending(SendOp *op);
}; };
#endif /* !DPLITE_SENDQUEUE_HPP */ #endif /* !DPLITE_SENDQUEUE_HPP */

View File

@ -55,6 +55,13 @@ int create_listener_socket(uint32_t ipaddr, uint16_t port)
return -1; return -1;
} }
BOOL reuse = TRUE;
if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char*)(&reuse), sizeof(BOOL)) == -1)
{
closesocket(sock);
return -1;
}
struct sockaddr_in addr; struct sockaddr_in addr;
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_addr.s_addr = ipaddr; addr.sin_addr.s_addr = ipaddr;
@ -75,6 +82,42 @@ int create_listener_socket(uint32_t ipaddr, uint16_t port)
return sock; return sock;
} }
int create_client_socket(uint32_t local_ipaddr, uint16_t local_port)
{
int sock = socket(AF_INET, SOCK_STREAM, 0);
if(sock == -1)
{
return -1;
}
u_long non_blocking = 1;
if(ioctlsocket(sock, FIONBIO, &non_blocking) != 0)
{
closesocket(sock);
return -1;
}
BOOL reuse = TRUE;
if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char*)(&reuse), sizeof(BOOL)) == -1)
{
closesocket(sock);
return -1;
}
struct sockaddr_in l_addr;
l_addr.sin_family = AF_INET;
l_addr.sin_addr.s_addr = local_ipaddr;
l_addr.sin_port = htons(local_port);
if(bind(sock, (struct sockaddr*)(&l_addr), sizeof(l_addr)) == -1)
{
closesocket(sock);
return -1;
}
return sock;
}
int create_discovery_socket() int create_discovery_socket()
{ {
int sock = socket(AF_INET, SOCK_DGRAM, 0); int sock = socket(AF_INET, SOCK_DGRAM, 0);

View File

@ -10,6 +10,7 @@
int create_udp_socket(uint32_t ipaddr, uint16_t port); int create_udp_socket(uint32_t ipaddr, uint16_t port);
int create_listener_socket(uint32_t ipaddr, uint16_t port); int create_listener_socket(uint32_t ipaddr, uint16_t port);
int create_client_socket(uint32_t local_ipaddr, uint16_t local_port);
int create_discovery_socket(); int create_discovery_socket();
#endif /* !DPLITE_NETWORK_HPP */ #endif /* !DPLITE_NETWORK_HPP */

View File

@ -24,7 +24,7 @@ PacketSerialiser::PacketSerialiser(uint32_t type)
sbuf.insert(sbuf.begin(), (unsigned char*)(&header), (unsigned char*)(&header + 1)); sbuf.insert(sbuf.begin(), (unsigned char*)(&header), (unsigned char*)(&header + 1));
} }
std::pair<const void*, size_t> PacketSerialiser::raw_packet() std::pair<const void*, size_t> PacketSerialiser::raw_packet() const
{ {
return std::make_pair<const void*, size_t>(sbuf.data(), sbuf.size()); return std::make_pair<const void*, size_t>(sbuf.data(), sbuf.size());
} }

View File

@ -24,7 +24,7 @@ class PacketSerialiser
public: public:
PacketSerialiser(uint32_t type); PacketSerialiser(uint32_t type);
std::pair<const void*, size_t> raw_packet(); std::pair<const void*, size_t> raw_packet() const;
void append_null(); void append_null();
void append_dword(DWORD value); void append_dword(DWORD value);

View File

@ -7,7 +7,7 @@
#include "../src/DirectPlay8Address.hpp" #include "../src/DirectPlay8Address.hpp"
#include "../src/DirectPlay8Peer.hpp" #include "../src/DirectPlay8Peer.hpp"
#define INSTANTIATE_FROM_COM // #define INSTANTIATE_FROM_COM
#define PORT 42895 #define PORT 42895
@ -1078,6 +1078,8 @@ TEST(DirectPlay8Peer, ConnectSync)
EXPECT_EQ(ic->pvPlayerContext, (void*)(NULL)); EXPECT_EQ(ic->pvPlayerContext, (void*)(NULL));
/* TODO: Check pAddressPlayer, pAddressDevice */ /* TODO: Check pAddressPlayer, pAddressDevice */
ic->pvPlayerContext = (void*)(0xB441);
} }
break; break;
@ -1091,7 +1093,7 @@ TEST(DirectPlay8Peer, ConnectSync)
p1_player_id = cp->dpnidPlayer; p1_player_id = cp->dpnidPlayer;
EXPECT_EQ(cp->dwSize, sizeof(DPNMSG_CREATE_PLAYER)); EXPECT_EQ(cp->dwSize, sizeof(DPNMSG_CREATE_PLAYER));
EXPECT_EQ(cp->pvPlayerContext, (void*)(0)); EXPECT_EQ(cp->pvPlayerContext, (void*)(0xB441));
cp->pvPlayerContext = (void*)(0xFEED); cp->pvPlayerContext = (void*)(0xFEED);
} }
@ -1349,6 +1351,8 @@ TEST(DirectPlay8Peer, ConnectAsync)
EXPECT_EQ(ic->pvPlayerContext, (void*)(NULL)); EXPECT_EQ(ic->pvPlayerContext, (void*)(NULL));
/* TODO: Check pAddressPlayer, pAddressDevice */ /* TODO: Check pAddressPlayer, pAddressDevice */
ic->pvPlayerContext = (void*)(0xB441);
} }
break; break;
@ -1362,7 +1366,7 @@ TEST(DirectPlay8Peer, ConnectAsync)
p1_player_id = cp->dpnidPlayer; p1_player_id = cp->dpnidPlayer;
EXPECT_EQ(cp->dwSize, sizeof(DPNMSG_CREATE_PLAYER)); EXPECT_EQ(cp->dwSize, sizeof(DPNMSG_CREATE_PLAYER));
EXPECT_EQ(cp->pvPlayerContext, (void*)(0)); EXPECT_EQ(cp->pvPlayerContext, (void*)(0xB441));
cp->pvPlayerContext = (void*)(0xFEED); cp->pvPlayerContext = (void*)(0xFEED);
} }