From 5c44ce26e00a8ce23babaec471b7f97729c4122a Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 31 Oct 2018 23:31:25 +0000 Subject: [PATCH] Send messages and track statistics in soak test. --- tests/soak-peer-client.cpp | 224 +++++++++++++++++++++++++++++++------ tests/soak-peer-server.cpp | 20 +++- 2 files changed, 205 insertions(+), 39 deletions(-) diff --git a/tests/soak-peer-client.cpp b/tests/soak-peer-client.cpp index cd7def0..da87529 100644 --- a/tests/soak-peer-client.cpp +++ b/tests/soak-peer-client.cpp @@ -1,5 +1,7 @@ #include #include +#include +#include #include #include #include @@ -8,25 +10,44 @@ #include #include #include +#include +#include #include /* Durations specified in milliseconds. */ -#define TEST_DURATION (60 * 60 * 1000) -#define MEMORY_STATS_INTERVAL (60 * 1000) +#define TEST_DURATION (8 * 60 * 60 * 1000) +#define STATS_INTERVAL (30 * 1000) + +static const int MESSAGE_SIZES[] = { 17, 33, 257, 1025, 4097, 8193, 16385, 70000 }; static const GUID APP_GUID = { 0x8723c2c6, 0x0b89, 0x4ea0, { 0xad, 0xe8, 0xec, 0x53, 0x66, 0x51, 0x68, 0x9f } }; +struct message_header { + int64_t timestamp_us; + int64_t msg_size_idx; +}; + static int64_t pc_freq; static int64_t now_ms(); +static int64_t now_us(); static int64_t start_time; static int64_t usage_time; static IDirectPlay8Peer *instance; static bool disconnected; +static int inflight_messages; + +static std::atomic msg_num_complete; +static std::atomic msg_total_send_us; +static std::atomic msg_total_rtt_us; +static std::atomic msg_total_recv_bytes; +static std::atomic msg_stats_start; static HRESULT CALLBACK callback(PVOID pvUserContext, DWORD dwMessageType, PVOID pMessage); -static void print_usage(); -static void timed_printf(const char *fmt, ...); +static void print_stats(); +static void timed_fprintf(FILE *fh, const char *fmt, ...); + +#define timed_printf(...) timed_fprintf(stdout, __VA_ARGS__) int main(int argc, char **argv) { @@ -39,7 +60,7 @@ int main(int argc, char **argv) HRESULT res = CoInitialize(NULL); if(res != S_OK) { - fprintf(stderr, "CoInitialize failed with HRESULT %08x\n", (unsigned)(res)); + timed_fprintf(stderr, "CoInitialize failed with HRESULT %08x", (unsigned)(res)); return 1; } @@ -51,11 +72,12 @@ int main(int argc, char **argv) int64_t reconstruct_interval = (120 * 1000); int64_t reinitialise_interval = (30 * 1000); + inflight_messages = 1; start_time = now_ms(); int64_t end_time = start_time + TEST_DURATION; - print_usage(); + msg_num_complete = 0; while(now_ms() < end_time) { @@ -64,12 +86,10 @@ int main(int argc, char **argv) res = CoCreateInstance(CLSID_DirectPlay8Peer, NULL, CLSCTX_INPROC_SERVER, IID_IDirectPlay8Peer, (void**)(&instance)); if(res != S_OK) { - fprintf(stderr, "Failed to construct DirectPlay8Peer instance (HRESULT %08x)\n", (unsigned)(res)); + timed_fprintf(stderr, "Failed to construct DirectPlay8Peer instance (HRESULT %08x)", (unsigned)(res)); return 1; } - print_usage(); - int64_t construct_time = now_ms(); int64_t destruct_time = construct_time + reconstruct_interval; reconstruct_interval *= 2; @@ -79,6 +99,10 @@ int main(int argc, char **argv) timed_printf("Initialising DirectPlay8Peer instance..."); disconnected = false; + msg_num_complete = 0; + msg_total_send_us = 0; + msg_total_rtt_us = 0; + msg_total_recv_bytes = 0; int64_t initialise_time = now_ms(); int64_t close_time = now_ms() + reinitialise_interval; @@ -87,12 +111,10 @@ int main(int argc, char **argv) res = instance->Initialize(NULL, &callback, 0); if(res != S_OK) { - fprintf(stderr, "IDirectPlay8Peer::Initialize failed with HRESULT %08x\n", (unsigned)(res)); + timed_fprintf(stderr, "IDirectPlay8Peer::Initialize failed with HRESULT %08x", (unsigned)(res)); return 1; } - print_usage(); - bool connected = false; while(!disconnected && now_ms() < close_time && now_ms() < destruct_time && now_ms() < end_time) @@ -111,14 +133,14 @@ int main(int argc, char **argv) res = CoCreateInstance(CLSID_DirectPlay8Address, NULL, CLSCTX_INPROC_SERVER, IID_IDirectPlay8Address, (void**)(&enum_address)); if(res != S_OK) { - fprintf(stderr, "Failed to construct DirectPlay8Address instance (HRESULT %08x)\n", (unsigned)(res)); + timed_fprintf(stderr, "Failed to construct DirectPlay8Address instance (HRESULT %08x)", (unsigned)(res)); return 1; } res = enum_address->SetSP(&CLSID_DP8SP_TCPIP); if(res != S_OK) { - fprintf(stderr, "IDirectPlay8Address::SetSP failed with HRESULT %08x\n", (unsigned)(res)); + timed_fprintf(stderr, "IDirectPlay8Address::SetSP failed with HRESULT %08x", (unsigned)(res)); return 1; } @@ -127,7 +149,7 @@ int main(int argc, char **argv) res = instance->EnumHosts(&app_desc, NULL, enum_address, NULL, 0, 0, 0, 0, &connect_address, NULL, DPNENUMHOSTS_SYNC); if(res != S_OK) { - fprintf(stderr, "IDirectPlay8Peer::EnumHosts failed with HRESULT %08x\n", (unsigned)(res)); + timed_fprintf(stderr, "IDirectPlay8Peer::EnumHosts failed with HRESULT %08x", (unsigned)(res)); return 1; } @@ -148,17 +170,18 @@ int main(int argc, char **argv) NULL, /* pdnCredentials */ NULL, /* pvUserConnectData */ 0, /* dwUserConnectDataSize */ - NULL, /* pvPlayerContext */ + (void*)(0xAAAA), /* pvPlayerContext */ NULL, /* pvAsyncContext */ NULL, /* phAsyncHandle */ DPNCONNECT_SYNC); /* dwFlags */ if(res != S_OK) { - fprintf(stderr, "IDirectPlay8Peer::Connect failed with HRESULT %08x\n", (unsigned)(res)); - return 1; + timed_fprintf(stderr, "IDirectPlay8Peer::Connect failed with HRESULT %08x", (unsigned)(res)); + continue; } connected = true; + msg_stats_start = now_ms(); connect_address->Release(); } @@ -173,7 +196,7 @@ int main(int argc, char **argv) if(now_ms() >= usage_time) { - print_usage(); + print_stats(); } } @@ -185,13 +208,11 @@ int main(int argc, char **argv) res = instance->Close(hard_close ? DPNCLOSE_IMMEDIATE : 0); if(res != S_OK) { - fprintf(stderr, "IDirectPlay8Peer::Close() failed with HRESULT %08x\n", (unsigned int)(res)); + timed_fprintf(stderr, "IDirectPlay8Peer::Close() failed with HRESULT %08x", (unsigned int)(res)); return 1; } hard_close = !hard_close; - - print_usage(); } timed_printf("Destroying DirectPlay8Peer instance..."); @@ -199,7 +220,7 @@ int main(int argc, char **argv) instance->Release(); instance = NULL; - print_usage(); + inflight_messages *= 2; } CoUninitialize(); @@ -208,15 +229,27 @@ int main(int argc, char **argv) } static int64_t now_ms() +{ + return now_us() / 1000; +} + +static int64_t now_us() { LARGE_INTEGER ticks; QueryPerformanceCounter(&ticks); - return ticks.QuadPart / (pc_freq / 1000); + return ticks.QuadPart / (pc_freq / 1000000); } static HRESULT CALLBACK callback(PVOID pvUserContext, DWORD dwMessageType, PVOID pMessage) { + /* Pool of int64_t values used to pass the start time of any asyncronous SendTo() operation + * back into the callback to time how long it took to complete. + */ + static const int SEND_BEGIN_SIZE = 1024; + static int64_t send_begin_buf[SEND_BEGIN_SIZE]; + static std::atomic send_begin_idx; + switch(dwMessageType) { case DPN_MSGID_ENUM_HOSTS_RESPONSE: @@ -241,8 +274,41 @@ static HRESULT CALLBACK callback(PVOID pvUserContext, DWORD dwMessageType, PVOID case DPN_MSGID_CREATE_PLAYER: { DPNMSG_CREATE_PLAYER *cp = (DPNMSG_CREATE_PLAYER*)(pMessage); + + if(cp->pvPlayerContext == (void*)(0xAAAA)) + { + /* Ignore our own player. */ + break; + } + timed_printf("New player ID: %u", (unsigned)(cp->dpnidPlayer)); + for(int i = 0; i < inflight_messages; ++i) + { + int this_msg_size_idx = 0; + std::vector buf(MESSAGE_SIZES[this_msg_size_idx]); + + assert(buf.size() >= sizeof(struct message_header)); + struct message_header *header = (struct message_header*)(buf.data()); + + int64_t now = now_us(); + + header->timestamp_us = now; + header->msg_size_idx = this_msg_size_idx; + + DPN_BUFFER_DESC bd = { buf.size(), (BYTE*)(buf.data()) }; + + int64_t *now_p = &(send_begin_buf[++send_begin_idx % SEND_BEGIN_SIZE]); + *now_p = now; + + DPNHANDLE s_handle; + HRESULT res = instance->SendTo(cp->dpnidPlayer, &bd, 1, 0, now_p, &s_handle, 0); + if(res != DPNSUCCESS_PENDING) + { + timed_fprintf(stderr, "IDirectPlay8Peer::SendTo() failed with HRESULT %08x", (unsigned int)(res)); + } + } + break; } @@ -254,6 +320,67 @@ static HRESULT CALLBACK callback(PVOID pvUserContext, DWORD dwMessageType, PVOID break; } + case DPN_MSGID_SEND_COMPLETE: + { + DPNMSG_SEND_COMPLETE *sc = (DPNMSG_SEND_COMPLETE*)(pMessage); + + int64_t now = now_us(); + int64_t then = *(int64_t*)(sc->pvUserContext); + + msg_total_send_us += now - then; + + break; + } + + case DPN_MSGID_RECEIVE: + { + DPNMSG_RECEIVE *r = (DPNMSG_RECEIVE*)(pMessage); + + int64_t now = now_us(); + + assert(r->dwReceiveDataSize >= sizeof(struct message_header)); + struct message_header *r_header = (struct message_header*)(r->pReceiveData); + + int64_t rtt = now - r_header->timestamp_us; + assert(rtt >= 0); + + msg_total_rtt_us += rtt; + msg_total_recv_bytes += r->dwReceiveDataSize; + ++msg_num_complete; + + { + int this_msg_size_idx = r_header->msg_size_idx + 1; + if(this_msg_size_idx >= sizeof(MESSAGE_SIZES) / sizeof(*MESSAGE_SIZES)) + { + this_msg_size_idx = 0; + } + + std::vector buf(MESSAGE_SIZES[this_msg_size_idx]); + + assert(buf.size() >= sizeof(struct message_header)); + struct message_header *header = (struct message_header*)(buf.data()); + + int64_t now = now_us(); + + header->timestamp_us = now; + header->msg_size_idx = this_msg_size_idx; + + DPN_BUFFER_DESC bd = { buf.size(), (BYTE*)(buf.data()) }; + + int64_t *now_p = &(send_begin_buf[++send_begin_idx % SEND_BEGIN_SIZE]); + *now_p = now; + + DPNHANDLE s_handle; + HRESULT res = instance->SendTo(r->dpnidSender, &bd, 1, 0, now_p, &s_handle, 0); + if(res != DPNSUCCESS_PENDING) + { + timed_fprintf(stderr, "IDirectPlay8Peer::SendTo() failed with HRESULT %08x", (unsigned int)(res)); + } + } + + break; + } + default: { break; @@ -263,40 +390,63 @@ static HRESULT CALLBACK callback(PVOID pvUserContext, DWORD dwMessageType, PVOID return S_OK; } -static void print_usage() +static void print_stats() { - static SIZE_T peak_usage = 0; - PROCESS_MEMORY_COUNTERS_EX mc; mc.cb = sizeof(mc); GetProcessMemoryInfo(GetCurrentProcess(), (PROCESS_MEMORY_COUNTERS*)(&mc), sizeof(mc)); - if(peak_usage < mc.PrivateUsage) + timed_printf("memory usage = %u KiB", (unsigned)(mc.PrivateUsage / 1024)); + + /* Take copies of the msg_XXX variables so another thread can't change them under us. + * Slight inaccuracies from the variables going out of sync as they are copied is fine, but + * we don't want any division by zero errors. + */ + + uint64_t l_msg_num_complete = msg_num_complete; + uint64_t l_msg_total_send_us = msg_total_send_us; + uint64_t l_msg_total_rtt_us = msg_total_rtt_us; + uint64_t l_msg_total_recv_bytes = msg_total_recv_bytes; + int64_t l_msg_stats_start = msg_stats_start; + + int stats_period_s = (now_ms() - l_msg_stats_start) / 1000; + + if(l_msg_num_complete > 0 && stats_period_s > 0) { - peak_usage = mc.PrivateUsage; + /* Reset the stats so they don't smooth out to be almost static over a long run. */ + + msg_num_complete = 0; + msg_total_send_us = 0; + msg_total_rtt_us = 0; + msg_total_recv_bytes = 0; + msg_stats_start = now_ms(); + + unsigned send_avg = l_msg_total_send_us / l_msg_num_complete; + unsigned rtt_avg = l_msg_total_rtt_us / l_msg_num_complete; + unsigned bps = l_msg_total_recv_bytes / stats_period_s; + + timed_printf("concurrent = %d, msg/sec = %u, send us = %u, rtt us = %u, KiB/s = %u", + inflight_messages, (unsigned)(l_msg_num_complete / stats_period_s), send_avg, rtt_avg, (bps / 1024)); } - timed_printf("Current memory usage: %u bytes, peak usage: %u bytes", - (unsigned)(mc.PrivateUsage), (unsigned)(peak_usage)); - - usage_time = now_ms() + MEMORY_STATS_INTERVAL; + usage_time = now_ms() + STATS_INTERVAL; } -static void timed_printf(const char *fmt, ...) +static void timed_fprintf(FILE *fh, const char *fmt, ...) { static std::mutex lock; std::unique_lock l(lock); int64_t now = now_ms(); - printf("[T+%06u.%03us] ", + fprintf(fh, "[T+%06u.%03us] ", (unsigned)((now - start_time) / 1000), (unsigned)((now - start_time) % 1000)); va_list argv; va_start(argv, fmt); - vprintf(fmt, argv); + vfprintf(fh, fmt, argv); va_end(argv); - printf("\n"); + fprintf(fh, "\n"); } diff --git a/tests/soak-peer-server.cpp b/tests/soak-peer-server.cpp index d33f15b..07f4543 100644 --- a/tests/soak-peer-server.cpp +++ b/tests/soak-peer-server.cpp @@ -11,8 +11,8 @@ #include /* Durations specified in milliseconds. */ -#define TEST_DURATION (60 * 60 * 1000) -#define MEMORY_STATS_INTERVAL (60 * 1000) +#define TEST_DURATION (8 * 60 * 60 * 1000) +#define MEMORY_STATS_INTERVAL (30 * 1000) static const GUID APP_GUID = { 0x8723c2c6, 0x0b89, 0x4ea0, { 0xad, 0xe8, 0xec, 0x53, 0x66, 0x51, 0x68, 0x9f } }; @@ -199,6 +199,22 @@ static HRESULT CALLBACK callback(PVOID pvUserContext, DWORD dwMessageType, PVOID break; } + case DPN_MSGID_RECEIVE: + { + DPNMSG_RECEIVE *r = (DPNMSG_RECEIVE*)(pMessage); + + DPN_BUFFER_DESC bd = { r->dwReceiveDataSize, r->pReceiveData }; + + DPNHANDLE s_handle; + HRESULT res = instance->SendTo(r->dpnidSender, &bd, 1, 0, NULL, &s_handle, 0); + if(res != DPNSUCCESS_PENDING) + { + fprintf(stderr, "IDirectPlay8Peer::SendTo() failed with HRESULT %08x\n", (unsigned int)(res)); + } + + break; + } + default: { break;