mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
improvements to run-benchmark.py, primarily to actually make multiple requests in parallel
This commit is contained in:
parent
eea582c6db
commit
2a0c8c28c9
@ -41,14 +41,16 @@ import threading
|
|||||||
|
|
||||||
# https://medium.com/@generativist/a-simple-streaming-http-server-in-aiohttp-4233dbc173c7
|
# https://medium.com/@generativist/a-simple-streaming-http-server-in-aiohttp-4233dbc173c7
|
||||||
async def do_get(request):
|
async def do_get(request):
|
||||||
# return aiohttp.web.Response(text='foo=%s' % request.match_info.get('foo'))
|
|
||||||
n = int(request.match_info.get('n'))
|
n = int(request.match_info.get('n'))
|
||||||
response = aiohttp.web.StreamResponse(
|
response = aiohttp.web.StreamResponse(
|
||||||
status=200, reason='OK', headers={'Content-Type': 'text/plain'})
|
status=200, reason='OK', headers={
|
||||||
|
'Content-Type': 'text/plain', 'Content-Length': str(n)})
|
||||||
await response.prepare(request)
|
await response.prepare(request)
|
||||||
for i in range(n):
|
for i in range(n // 80):
|
||||||
for i in range(10):
|
response.write(b'x' * 79 + b'\n')
|
||||||
response.write(b'x' * 99 + b'\n')
|
await response.drain()
|
||||||
|
if n % 80 > 0:
|
||||||
|
response.write(b'x' * (n % 80 - 1) + b'\n')
|
||||||
await response.drain()
|
await response.drain()
|
||||||
|
|
||||||
return response
|
return response
|
||||||
@ -98,27 +100,69 @@ def start_servers():
|
|||||||
app.make_handler(access_log=None), '127.0.0.1', 4443, ssl=sslc)
|
app.make_handler(access_log=None), '127.0.0.1', 4443, ssl=sslc)
|
||||||
loop.run_until_complete(https)
|
loop.run_until_complete(https)
|
||||||
|
|
||||||
async def benchmarking_client(base_url, n, proxy=None):
|
async def fetch(session, url, proxy=None):
    '''
    Fetches `url` and returns the number of response body bytes received.

    The body is read in chunks of up to 64 KiB and discarded; only its length
    is accumulated, so memory use does not grow with the response size.

    Args:
        session: object whose `get(url, proxy=...)` returns an async context
            manager yielding a response with `status` and `content.read(n)`
            (presumably an aiohttp.ClientSession; confirm against caller)
        url: url to request
        proxy: passed through unchanged to `session.get()`

    Returns:
        total number of body bytes read

    Raises:
        RuntimeError: if the response status is not 200
    '''
    n_bytes = 0
    async with session.get(url, proxy=proxy) as response:
        # Explicit check rather than `assert`: asserts are stripped under
        # `python -O`, which would let error responses be counted as
        # successful fetches.
        if response.status != 200:
            raise RuntimeError(
                    'unexpected response status %s from %s' % (
                        response.status, url))
        while True:
            chunk = await response.content.read(2**16)
            if not chunk:
                # empty read signals end of the response body
                break
            n_bytes += len(chunk)
    return n_bytes
|
||||||
|
|
||||||
|
async def benchmarking_client(base_url, duration, proxy=None):
|
||||||
|
start = time.time()
|
||||||
|
connector = aiohttp.TCPConnector(verify_ssl=False)
|
||||||
|
n = 1000
|
||||||
n_urls = 0
|
n_urls = 0
|
||||||
n_bytes = 0
|
n_bytes = 0
|
||||||
for i in range(n):
|
outstanding_requests = set()
|
||||||
url = '%s/%s' % (base_url, i)
|
async with aiohttp.ClientSession(connector=connector) as session:
|
||||||
connector = aiohttp.TCPConnector(verify_ssl=False)
|
while True:
|
||||||
async with aiohttp.ClientSession(connector=connector) as session:
|
if (time.time() - start < duration
|
||||||
async with session.get(url, proxy=proxy) as response:
|
and len(outstanding_requests) < 100):
|
||||||
assert response.status == 200
|
url = '%s/%s' % (base_url, n)
|
||||||
while True:
|
n += 1000
|
||||||
chunk = await response.content.read(2**16)
|
# task = asyncio.get_event_loop().create_task(fetch(session, url))
|
||||||
n_bytes += len(chunk)
|
future = asyncio.ensure_future(fetch(session, url, proxy))
|
||||||
if not chunk:
|
outstanding_requests.add(future)
|
||||||
n_urls += 1
|
# logging.info('scheduled future fetch of %s', url)
|
||||||
break
|
else:
|
||||||
return n_urls, n_bytes
|
done, pending = await asyncio.wait(
|
||||||
|
outstanding_requests,
|
||||||
|
return_when=asyncio.FIRST_COMPLETED)
|
||||||
|
for future in done:
|
||||||
|
outstanding_requests.remove(future)
|
||||||
|
n_urls += 1
|
||||||
|
n_bytes += future.result()
|
||||||
|
if time.time() - start >= duration and not pending:
|
||||||
|
return n_urls, n_bytes, time.time() - start
|
||||||
|
|
||||||
def build_arg_parser(tmpdir, prog=os.path.basename(sys.argv[0])):
|
def build_arg_parser(tmpdir, prog=os.path.basename(sys.argv[0])):
|
||||||
|
desc = '''
|
||||||
|
Warcprox benchmarker. Runs simple http and https servers and uses them to
|
||||||
|
benchmark warcprox. Runs 4 benchmarks:
|
||||||
|
|
||||||
|
1. baseline http (no warcprox)
|
||||||
|
2. baseline https (no warcprox)
|
||||||
|
3. http with warcprox
|
||||||
|
4. https with warcprox
|
||||||
|
|
||||||
|
Each of these runs for a predetermined amount of time, which is 1/4 of the time
|
||||||
|
specified by the --time option.
|
||||||
|
|
||||||
|
Uses a temporary directory for warcs and other files. Otherwise, most warcprox
|
||||||
|
options can be specified on the command line. Useful for comparing performance
|
||||||
|
with different options.
|
||||||
|
|
||||||
|
Benchmarking code uses asyncio/aiohttp and requires python 3.5 or later.
|
||||||
|
'''
|
||||||
arg_parser = argparse.ArgumentParser(
|
arg_parser = argparse.ArgumentParser(
|
||||||
prog=prog, description='warcprox benchmarker',
|
prog=prog, description=desc,
|
||||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
formatter_class=warcprox.main.BetterArgumentDefaultsHelpFormatter)
|
||||||
arg_parser.add_argument(
|
arg_parser.add_argument(
|
||||||
'-z', '--gzip', dest='gzip', action='store_true',
|
'-z', '--gzip', dest='gzip', action='store_true',
|
||||||
help='write gzip-compressed warc records')
|
help='write gzip-compressed warc records')
|
||||||
@ -178,23 +222,41 @@ def build_arg_parser(tmpdir, prog=os.path.basename(sys.argv[0])):
|
|||||||
'--kafka-capture-feed-topic', dest='kafka_capture_feed_topic',
|
'--kafka-capture-feed-topic', dest='kafka_capture_feed_topic',
|
||||||
default=None, help='kafka capture feed topic')
|
default=None, help='kafka capture feed topic')
|
||||||
arg_parser.add_argument(
|
arg_parser.add_argument(
|
||||||
'--queue-size', dest='queue_size', type=int, default=1)
|
'--queue-size', dest='queue_size', type=int, default=1, help=(
|
||||||
|
'max size of the queue of urls waiting to be processed by '
|
||||||
|
'the warc writer thread'))
|
||||||
arg_parser.add_argument(
|
arg_parser.add_argument(
|
||||||
'--max-threads', dest='max_threads', type=int)
|
'--max-threads', dest='max_threads', type=int, help=(
|
||||||
|
'number of proxy server threads (if not specified, chosen based '
|
||||||
|
'on system resource limits'))
|
||||||
arg_parser.add_argument(
|
arg_parser.add_argument(
|
||||||
'--version', action='version',
|
'--version', action='version',
|
||||||
version='warcprox %s' % warcprox.__version__)
|
version='warcprox %s' % warcprox.__version__)
|
||||||
arg_parser.add_argument(
|
arg_parser.add_argument(
|
||||||
'-v', '--verbose', dest='verbose', action='store_true')
|
'-v', '--verbose', dest='verbose', action='store_true',
|
||||||
arg_parser.add_argument('--trace', dest='trace', action='store_true')
|
help='verbose logging')
|
||||||
arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
|
arg_parser.add_argument(
|
||||||
|
'--trace', dest='trace', action='store_true',
|
||||||
|
help='trace-level logging')
|
||||||
|
arg_parser.add_argument(
|
||||||
|
'--profile', dest='profile', action='store_true', default=False,
|
||||||
|
help='profile the warc writer thread')
|
||||||
|
arg_parser.add_argument(
|
||||||
|
'--time', dest='time', type=float, default=20.0, help=(
|
||||||
|
'time to spend running benchmarks; total allotment will be '
|
||||||
|
'divided among the 4 benchmark cases'))
|
||||||
|
arg_parser.add_argument(
|
||||||
|
'--skip-baseline', dest='skip_baseline', action='store_true',
|
||||||
|
help='skip the baseline bechmarks')
|
||||||
return arg_parser
|
return arg_parser
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# see https://github.com/pyca/cryptography/issues/2911
|
# see https://github.com/pyca/cryptography/issues/2911
|
||||||
cryptography.hazmat.backends.openssl.backend.activate_builtin_random()
|
cryptography.hazmat.backends.openssl.backend.activate_builtin_random()
|
||||||
|
|
||||||
with tempfile.TemporaryDirectory() as tmpdir:
|
# with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
tmpdir = tempfile.mkdtemp()
|
||||||
|
if True:
|
||||||
arg_parser = build_arg_parser(tmpdir)
|
arg_parser = build_arg_parser(tmpdir)
|
||||||
args = arg_parser.parse_args(args=sys.argv[1:])
|
args = arg_parser.parse_args(args=sys.argv[1:])
|
||||||
|
|
||||||
@ -202,17 +264,18 @@ if __name__ == '__main__':
|
|||||||
loglevel = warcprox.TRACE
|
loglevel = warcprox.TRACE
|
||||||
elif args.verbose:
|
elif args.verbose:
|
||||||
loglevel = logging.DEBUG
|
loglevel = logging.DEBUG
|
||||||
elif args.quiet:
|
|
||||||
loglevel = logging.WARNING
|
|
||||||
else:
|
else:
|
||||||
loglevel = logging.INFO
|
loglevel = logging.INFO
|
||||||
|
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
stream=sys.stdout, level=loglevel, format=(
|
stream=sys.stdout, level=loglevel, format=(
|
||||||
'%(asctime)s %(process)d %(levelname)s %(threadName)s '
|
'%(asctime)s %(process)d %(levelname)s %(threadName)s '
|
||||||
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s'))
|
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) '
|
||||||
|
'%(message)s'))
|
||||||
logging.getLogger('warcprox').setLevel(loglevel + 5)
|
logging.getLogger('warcprox').setLevel(loglevel + 5)
|
||||||
|
|
||||||
|
logging.info('using temp dir %s', tmpdir)
|
||||||
|
|
||||||
args.playback_port = None
|
args.playback_port = None
|
||||||
args.address = '127.0.0.1'
|
args.address = '127.0.0.1'
|
||||||
args.port = 0
|
args.port = 0
|
||||||
@ -222,10 +285,6 @@ if __name__ == '__main__':
|
|||||||
if args.rethinkdb_servers:
|
if args.rethinkdb_servers:
|
||||||
args.rethinkdb_db = 'benchmarks_{:%Y%m%d%H%M%S}' % (
|
args.rethinkdb_db = 'benchmarks_{:%Y%m%d%H%M%S}' % (
|
||||||
datetime.datetime.utcnow())
|
datetime.datetime.utcnow())
|
||||||
warcprox_controller = warcprox.main.init_controller(args)
|
|
||||||
warcprox_controller_thread = threading.Thread(
|
|
||||||
target=warcprox_controller.run_until_shutdown)
|
|
||||||
warcprox_controller_thread.start()
|
|
||||||
|
|
||||||
start_servers()
|
start_servers()
|
||||||
logging.info(
|
logging.info(
|
||||||
@ -234,44 +293,54 @@ if __name__ == '__main__':
|
|||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
logging.info("===== baseline benchmark starting (no proxy) =====")
|
logging.info('===== baseline benchmark starting (no proxy) =====')
|
||||||
start = time.time()
|
if not args.skip_baseline:
|
||||||
n_urls, n_bytes = loop.run_until_complete(
|
n_urls, n_bytes, elapsed = loop.run_until_complete(
|
||||||
benchmarking_client('http://127.0.0.1:4080', 100))
|
benchmarking_client(
|
||||||
finish = time.time()
|
'http://127.0.0.1:4080', args.time / 4.0))
|
||||||
logging.info(
|
logging.info(
|
||||||
'http baseline (no proxy): n_urls=%s n_bytes=%s in %.1f sec',
|
'http baseline (no proxy): n_urls=%s n_bytes=%s in %.1f '
|
||||||
n_urls, n_bytes, finish - start)
|
'sec', n_urls, n_bytes, elapsed)
|
||||||
|
|
||||||
start = time.time()
|
n_urls, n_bytes, elapsed = loop.run_until_complete(
|
||||||
n_urls, n_bytes = loop.run_until_complete(
|
benchmarking_client(
|
||||||
benchmarking_client('https://127.0.0.1:4443', 100))
|
'https://127.0.0.1:4443', args.time / 4.0))
|
||||||
finish = time.time()
|
logging.info(
|
||||||
logging.info(
|
'https baseline (no proxy): n_urls=%s n_bytes=%s in %.1f '
|
||||||
'https baseline (no proxy): n_urls=%s n_bytes=%s in %.1f sec',
|
'sec', n_urls, n_bytes, elapsed)
|
||||||
n_urls, n_bytes, finish - start)
|
else:
|
||||||
logging.info("===== baseline benchmark finished =====")
|
logging.info('SKIPPED')
|
||||||
|
logging.info('===== baseline benchmark finished =====')
|
||||||
|
|
||||||
proxy = "http://%s:%s" % (
|
if args.skip_baseline:
|
||||||
|
t = args.time / 2.0
|
||||||
|
else:
|
||||||
|
t = args.time / 4.0
|
||||||
|
|
||||||
|
warcprox_controller = warcprox.main.init_controller(args)
|
||||||
|
warcprox_controller_thread = threading.Thread(
|
||||||
|
target=warcprox_controller.run_until_shutdown)
|
||||||
|
warcprox_controller_thread.start()
|
||||||
|
|
||||||
|
proxy = 'http://%s:%s' % (
|
||||||
warcprox_controller.proxy.server_address[0],
|
warcprox_controller.proxy.server_address[0],
|
||||||
warcprox_controller.proxy.server_address[1])
|
warcprox_controller.proxy.server_address[1])
|
||||||
logging.info("===== warcprox benchmark starting =====")
|
logging.info('===== warcprox benchmark starting =====')
|
||||||
start = time.time()
|
n_urls, n_bytes, elapsed = loop.run_until_complete(
|
||||||
n_urls, n_bytes = loop.run_until_complete(
|
benchmarking_client('http://127.0.0.1:4080', t, proxy))
|
||||||
benchmarking_client('http://127.0.0.1:4080', 100, proxy))
|
|
||||||
finish = time.time()
|
|
||||||
logging.info(
|
logging.info(
|
||||||
'http: n_urls=%s n_bytes=%s in %.1f sec',
|
'http: n_urls=%s n_bytes=%s in %.1f sec',
|
||||||
n_urls, n_bytes, finish - start)
|
n_urls, n_bytes, elapsed)
|
||||||
|
|
||||||
start = time.time()
|
n_urls, n_bytes, elapsed = loop.run_until_complete(
|
||||||
n_urls, n_bytes = loop.run_until_complete(
|
benchmarking_client('https://127.0.0.1:4443', t, proxy))
|
||||||
benchmarking_client('https://127.0.0.1:4443', 100, proxy))
|
|
||||||
finish = time.time()
|
|
||||||
logging.info(
|
logging.info(
|
||||||
'https: n_urls=%s n_bytes=%s in %.1f sec',
|
'https: n_urls=%s n_bytes=%s in %.1f sec',
|
||||||
n_urls, n_bytes, finish - start)
|
n_urls, n_bytes, elapsed)
|
||||||
logging.info("===== warcprox benchmark finished =====")
|
|
||||||
|
|
||||||
|
start = time.time()
|
||||||
warcprox_controller.stop.set()
|
warcprox_controller.stop.set()
|
||||||
warcprox_controller_thread.join()
|
warcprox_controller_thread.join()
|
||||||
|
logging.info(
|
||||||
|
'waited %.1f sec for warcprox to finish', time.time() - start)
|
||||||
|
logging.info('===== warcprox benchmark finished =====')
|
||||||
|
4
setup.py
4
setup.py
@ -37,7 +37,7 @@ class PyTest(setuptools.command.test.test):
|
|||||||
sys.exit(errno)
|
sys.exit(errno)
|
||||||
|
|
||||||
deps = [
|
deps = [
|
||||||
'certauth>=1.1.0',
|
'certauth>=1.1.6',
|
||||||
'warctools',
|
'warctools',
|
||||||
'kafka-python>=1.0.1',
|
'kafka-python>=1.0.1',
|
||||||
'urlcanon>=0.1.dev16',
|
'urlcanon>=0.1.dev16',
|
||||||
@ -51,7 +51,7 @@ except:
|
|||||||
|
|
||||||
setuptools.setup(
|
setuptools.setup(
|
||||||
name='warcprox',
|
name='warcprox',
|
||||||
version='2.1b1.dev78',
|
version='2.1b1.dev79',
|
||||||
description='WARC writing MITM HTTP/S proxy',
|
description='WARC writing MITM HTTP/S proxy',
|
||||||
url='https://github.com/internetarchive/warcprox',
|
url='https://github.com/internetarchive/warcprox',
|
||||||
author='Noah Levitt',
|
author='Noah Levitt',
|
||||||
|
@ -43,10 +43,27 @@ import re
|
|||||||
import doublethink
|
import doublethink
|
||||||
import cryptography.hazmat.backends.openssl
|
import cryptography.hazmat.backends.openssl
|
||||||
|
|
||||||
|
class BetterArgumentDefaultsHelpFormatter(
        argparse.ArgumentDefaultsHelpFormatter,
        argparse.RawDescriptionHelpFormatter):
    '''
    HelpFormatter with these properties:

    - formats option help like argparse.ArgumentDefaultsHelpFormatter, except
      that it omits the default value for arguments with action='store_const'
      (and its subclasses, e.g. 'store_true' and 'store_false')
    - like argparse.RawDescriptionHelpFormatter, does not reformat the
      description string
    '''
    def _get_help_string(self, action):
        '''
        Returns the help text to display for `action`.

        For store-const style actions the help text is returned as is, with
        no "(default: ...)" suffix. Every other action is delegated to the
        parent formatter.
        '''
        if isinstance(action, argparse._StoreConstAction):
            return action.help
        return super()._get_help_string(action)
|
||||||
|
|
||||||
def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
|
def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
|
||||||
arg_parser = argparse.ArgumentParser(prog=prog,
|
arg_parser = argparse.ArgumentParser(prog=prog,
|
||||||
description='warcprox - WARC writing MITM HTTP/S proxy',
|
description='warcprox - WARC writing MITM HTTP/S proxy',
|
||||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
formatter_class=BetterArgumentDefaultsHelpFormatter)
|
||||||
arg_parser.add_argument('-p', '--port', dest='port', default='8000',
|
arg_parser.add_argument('-p', '--port', dest='port', default='8000',
|
||||||
type=int, help='port to listen on')
|
type=int, help='port to listen on')
|
||||||
arg_parser.add_argument('-b', '--address', dest='address',
|
arg_parser.add_argument('-b', '--address', dest='address',
|
||||||
@ -272,7 +289,7 @@ def ensure_rethinkdb_tables():
|
|||||||
'''
|
'''
|
||||||
arg_parser = argparse.ArgumentParser(
|
arg_parser = argparse.ArgumentParser(
|
||||||
prog=os.path.basename(sys.argv[0]),
|
prog=os.path.basename(sys.argv[0]),
|
||||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
formatter_class=BetterArgumentDefaultsHelpFormatter)
|
||||||
arg_parser.add_argument(
|
arg_parser.add_argument(
|
||||||
'--rethinkdb-servers', dest='rethinkdb_servers', default='localhost',
|
'--rethinkdb-servers', dest='rethinkdb_servers', default='localhost',
|
||||||
help='rethinkdb servers e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
|
help='rethinkdb servers e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
|
||||||
|
@ -260,9 +260,10 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
|
|||||||
return self._remote_server_sock
|
return self._remote_server_sock
|
||||||
|
|
||||||
def _transition_to_ssl(self):
|
def _transition_to_ssl(self):
|
||||||
|
certfile = self.server.ca.cert_for_host(self.hostname)
|
||||||
self.request = self.connection = ssl.wrap_socket(
|
self.request = self.connection = ssl.wrap_socket(
|
||||||
self.connection, server_side=True,
|
self.connection, server_side=True, certfile=certfile)
|
||||||
certfile=self.server.ca.cert_for_host(self.hostname))
|
# logging.info('self.hostname=%s certfile=%s', self.hostname, certfile)
|
||||||
|
|
||||||
def do_CONNECT(self):
|
def do_CONNECT(self):
|
||||||
'''
|
'''
|
||||||
|
Loading…
x
Reference in New Issue
Block a user