From 2a0c8c28c9f84f759fc33a88506b11842f408798 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 10 May 2017 18:01:56 +0000 Subject: [PATCH] improvements to run-benchmark.py, primarily to actually make multiple requests in parallel --- benchmarks/run-benchmarks.py | 195 ++++++++++++++++++++++++----------- setup.py | 4 +- warcprox/main.py | 21 +++- warcprox/mitmproxy.py | 5 +- 4 files changed, 156 insertions(+), 69 deletions(-) diff --git a/benchmarks/run-benchmarks.py b/benchmarks/run-benchmarks.py index 114081a..7e2889c 100755 --- a/benchmarks/run-benchmarks.py +++ b/benchmarks/run-benchmarks.py @@ -41,14 +41,16 @@ import threading # https://medium.com/@generativist/a-simple-streaming-http-server-in-aiohttp-4233dbc173c7 async def do_get(request): - # return aiohttp.web.Response(text='foo=%s' % request.match_info.get('foo')) n = int(request.match_info.get('n')) response = aiohttp.web.StreamResponse( - status=200, reason='OK', headers={'Content-Type': 'text/plain'}) + status=200, reason='OK', headers={ + 'Content-Type': 'text/plain', 'Content-Length': str(n)}) await response.prepare(request) - for i in range(n): - for i in range(10): - response.write(b'x' * 99 + b'\n') + for i in range(n // 80): + response.write(b'x' * 79 + b'\n') + await response.drain() + if n % 80 > 0: + response.write(b'x' * (n % 80 - 1) + b'\n') await response.drain() return response @@ -98,27 +100,69 @@ def start_servers(): app.make_handler(access_log=None), '127.0.0.1', 4443, ssl=sslc) loop.run_until_complete(https) -async def benchmarking_client(base_url, n, proxy=None): +async def fetch(session, url, proxy=None): + # logging.info('sending request to %s', url) + n_bytes = 0 + async with session.get(url, proxy=proxy) as response: + assert response.status == 200 + while True: + chunk = await response.content.read(2**16) + n_bytes += len(chunk) + if not chunk: + break + # logging.info('finished receiving response from %s', url) + return n_bytes + +async def benchmarking_client(base_url, duration, proxy=None): + start = time.time() + connector = aiohttp.TCPConnector(verify_ssl=False) + n = 1000 n_urls = 0 n_bytes = 0 - for i in range(n): - url = '%s/%s' % (base_url, i) - connector = aiohttp.TCPConnector(verify_ssl=False) - async with aiohttp.ClientSession(connector=connector) as session: - async with session.get(url, proxy=proxy) as response: - assert response.status == 200 - while True: - chunk = await response.content.read(2**16) - n_bytes += len(chunk) - if not chunk: - n_urls += 1 - break - return n_urls, n_bytes + outstanding_requests = set() + async with aiohttp.ClientSession(connector=connector) as session: + while True: + if (time.time() - start < duration + and len(outstanding_requests) < 100): + url = '%s/%s' % (base_url, n) + n += 1000 + # task = asyncio.get_event_loop().create_task(fetch(session, url)) + future = asyncio.ensure_future(fetch(session, url, proxy)) + outstanding_requests.add(future) + # logging.info('scheduled future fetch of %s', url) + else: + done, pending = await asyncio.wait( + outstanding_requests, + return_when=asyncio.FIRST_COMPLETED) + for future in done: + outstanding_requests.remove(future) + n_urls += 1 + n_bytes += future.result() + if time.time() - start >= duration and not pending: + return n_urls, n_bytes, time.time() - start def build_arg_parser(tmpdir, prog=os.path.basename(sys.argv[0])): + desc = ''' +Warcprox benchmarker. Runs simple http and https servers and uses them to +benchmark warcprox. Runs 4 benchmarks: + + 1. baseline http (no warcprox) + 2. baseline https (no warcprox) + 3. http with warcprox + 4. https with warcprox + +Each of these runs for a predetermined amount of time, which is 1/4 of the time +specified by the --time option. + +Uses a temporary directory for warcs and other files. Otherwise, most warcprox +options can be specified on the command line. Useful for comparing performance +with different options. + +Benchmarking code uses asyncio/aiohttp and requires python 3.5 or later. +''' arg_parser = argparse.ArgumentParser( - prog=prog, description='warcprox benchmarker', - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + prog=prog, description=desc, + formatter_class=warcprox.main.BetterArgumentDefaultsHelpFormatter) arg_parser.add_argument( '-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records') @@ -178,23 +222,41 @@ def build_arg_parser(tmpdir, prog=os.path.basename(sys.argv[0])): '--kafka-capture-feed-topic', dest='kafka_capture_feed_topic', default=None, help='kafka capture feed topic') arg_parser.add_argument( - '--queue-size', dest='queue_size', type=int, default=1) + '--queue-size', dest='queue_size', type=int, default=1, help=( + 'max size of the queue of urls waiting to be processed by ' + 'the warc writer thread')) arg_parser.add_argument( - '--max-threads', dest='max_threads', type=int) + '--max-threads', dest='max_threads', type=int, help=( + 'number of proxy server threads (if not specified, chosen based ' + 'on system resource limits')) arg_parser.add_argument( '--version', action='version', version='warcprox %s' % warcprox.__version__) arg_parser.add_argument( - '-v', '--verbose', dest='verbose', action='store_true') - arg_parser.add_argument('--trace', dest='trace', action='store_true') - arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true') + '-v', '--verbose', dest='verbose', action='store_true', + help='verbose logging') + arg_parser.add_argument( + '--trace', dest='trace', action='store_true', + help='trace-level logging') + arg_parser.add_argument( + '--profile', dest='profile', action='store_true', default=False, + help='profile the warc writer thread') + arg_parser.add_argument( + '--time', dest='time', type=float, default=20.0, help=( + 'time to spend running benchmarks; total allotment will be ' + 'divided among the 4 benchmark cases')) + arg_parser.add_argument( + '--skip-baseline', dest='skip_baseline', action='store_true', + help='skip the baseline bechmarks') return arg_parser if __name__ == '__main__': # see https://github.com/pyca/cryptography/issues/2911 cryptography.hazmat.backends.openssl.backend.activate_builtin_random() - with tempfile.TemporaryDirectory() as tmpdir: + # with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = tempfile.mkdtemp() + if True: arg_parser = build_arg_parser(tmpdir) args = arg_parser.parse_args(args=sys.argv[1:]) @@ -202,17 +264,18 @@ if __name__ == '__main__': loglevel = warcprox.TRACE elif args.verbose: loglevel = logging.DEBUG - elif args.quiet: - loglevel = logging.WARNING else: loglevel = logging.INFO logging.basicConfig( stream=sys.stdout, level=loglevel, format=( '%(asctime)s %(process)d %(levelname)s %(threadName)s ' - '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')) + '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) ' + '%(message)s')) logging.getLogger('warcprox').setLevel(loglevel + 5) + logging.info('using temp dir %s', tmpdir) + args.playback_port = None args.address = '127.0.0.1' args.port = 0 @@ -222,10 +285,6 @@ if __name__ == '__main__': if args.rethinkdb_servers: args.rethinkdb_db = 'benchmarks_{:%Y%m%d%H%M%S}' % ( datetime.datetime.utcnow()) - warcprox_controller = warcprox.main.init_controller(args) - warcprox_controller_thread = threading.Thread( - target=warcprox_controller.run_until_shutdown) - warcprox_controller_thread.start() start_servers() logging.info( @@ -234,44 +293,54 @@ if __name__ == '__main__': loop = asyncio.get_event_loop() - logging.info("===== baseline benchmark starting (no proxy) =====") - start = time.time() - n_urls, n_bytes = loop.run_until_complete( - benchmarking_client('http://127.0.0.1:4080', 100)) - finish = time.time() - logging.info( - 'http baseline (no proxy): n_urls=%s n_bytes=%s in %.1f sec', - n_urls, n_bytes, finish - start) + logging.info('===== baseline benchmark starting (no proxy) =====') + if not args.skip_baseline: + n_urls, n_bytes, elapsed = loop.run_until_complete( + benchmarking_client( + 'http://127.0.0.1:4080', args.time / 4.0)) + logging.info( + 'http baseline (no proxy): n_urls=%s n_bytes=%s in %.1f ' + 'sec', n_urls, n_bytes, elapsed) - start = time.time() - n_urls, n_bytes = loop.run_until_complete( - benchmarking_client('https://127.0.0.1:4443', 100)) - finish = time.time() - logging.info( - 'https baseline (no proxy): n_urls=%s n_bytes=%s in %.1f sec', - n_urls, n_bytes, finish - start) - logging.info("===== baseline benchmark finished =====") + n_urls, n_bytes, elapsed = loop.run_until_complete( + benchmarking_client( + 'https://127.0.0.1:4443', args.time / 4.0)) + logging.info( + 'https baseline (no proxy): n_urls=%s n_bytes=%s in %.1f ' + 'sec', n_urls, n_bytes, elapsed) + else: + logging.info('SKIPPED') + logging.info('===== baseline benchmark finished =====') - proxy = "http://%s:%s" % ( + if args.skip_baseline: + t = args.time / 2.0 + else: + t = args.time / 4.0 + + warcprox_controller = warcprox.main.init_controller(args) + warcprox_controller_thread = threading.Thread( + target=warcprox_controller.run_until_shutdown) + warcprox_controller_thread.start() + + proxy = 'http://%s:%s' % ( warcprox_controller.proxy.server_address[0], warcprox_controller.proxy.server_address[1]) - logging.info("===== warcprox benchmark starting =====") - start = time.time() - n_urls, n_bytes = loop.run_until_complete( - benchmarking_client('http://127.0.0.1:4080', 100, proxy)) - finish = time.time() + logging.info('===== warcprox benchmark starting =====') + n_urls, n_bytes, elapsed = loop.run_until_complete( + benchmarking_client('http://127.0.0.1:4080', t, proxy)) logging.info( 'http: n_urls=%s n_bytes=%s in %.1f sec', - n_urls, n_bytes, finish - start) + n_urls, n_bytes, elapsed) - start = time.time() - n_urls, n_bytes = loop.run_until_complete( - benchmarking_client('https://127.0.0.1:4443', 100, proxy)) - finish = time.time() + n_urls, n_bytes, elapsed = loop.run_until_complete( + benchmarking_client('https://127.0.0.1:4443', t, proxy)) logging.info( 'https: n_urls=%s n_bytes=%s in %.1f sec', - n_urls, n_bytes, finish - start) - logging.info("===== warcprox benchmark finished =====") + n_urls, n_bytes, elapsed) + start = time.time() warcprox_controller.stop.set() warcprox_controller_thread.join() + logging.info( + 'waited %.1f sec for warcprox to finish', time.time() - start) + logging.info('===== warcprox benchmark finished =====') diff --git a/setup.py b/setup.py index 7018bfe..98f3d65 100755 --- a/setup.py +++ b/setup.py @@ -37,7 +37,7 @@ class PyTest(setuptools.command.test.test): sys.exit(errno) deps = [ - 'certauth>=1.1.0', + 'certauth>=1.1.6', 'warctools', 'kafka-python>=1.0.1', 'urlcanon>=0.1.dev16', @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.1b1.dev78', + version='2.1b1.dev79', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/main.py b/warcprox/main.py index 59de37e..166ce0c 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -43,10 +43,27 @@ import re import doublethink import cryptography.hazmat.backends.openssl +class BetterArgumentDefaultsHelpFormatter( + argparse.ArgumentDefaultsHelpFormatter, + argparse.RawDescriptionHelpFormatter): + ''' + HelpFormatter with these properties: + + - formats option help like argparse.ArgumentDefaultsHelpFormatter except + that it - omits the default value for arguments with action='store_const' + - like argparse.RawDescriptionHelpFormatter, does not reformat description + string + ''' + def _get_help_string(self, action): + if isinstance(action, argparse._StoreConstAction): + return action.help + else: + return super()._get_help_string(action) + def _build_arg_parser(prog=os.path.basename(sys.argv[0])): arg_parser = argparse.ArgumentParser(prog=prog, description='warcprox - WARC writing MITM HTTP/S proxy', - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + formatter_class=BetterArgumentDefaultsHelpFormatter) arg_parser.add_argument('-p', '--port', dest='port', default='8000', type=int, help='port to listen on') arg_parser.add_argument('-b', '--address', dest='address', @@ -272,7 +289,7 @@ def ensure_rethinkdb_tables(): ''' arg_parser = argparse.ArgumentParser( prog=os.path.basename(sys.argv[0]), - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + formatter_class=BetterArgumentDefaultsHelpFormatter) arg_parser.add_argument( '--rethinkdb-servers', dest='rethinkdb_servers', default='localhost', help='rethinkdb servers e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 09c2b2d..c768b8a 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -260,9 +260,10 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): return self._remote_server_sock def _transition_to_ssl(self): + certfile = self.server.ca.cert_for_host(self.hostname) self.request = self.connection = ssl.wrap_socket( - self.connection, server_side=True, - certfile=self.server.ca.cert_for_host(self.hostname)) + self.connection, server_side=True, certfile=certfile) + # logging.info('self.hostname=%s certfile=%s', self.hostname, certfile) def do_CONNECT(self): '''