diff --git a/benchmarks/run-benchmarks.py b/benchmarks/run-benchmarks.py index a05db59..8267806 100755 --- a/benchmarks/run-benchmarks.py +++ b/benchmarks/run-benchmarks.py @@ -160,90 +160,9 @@ with different options. Benchmarking code uses asyncio/aiohttp and requires python 3.5 or later. ''' - arg_parser = argparse.ArgumentParser( - prog=prog, description=desc, - formatter_class=warcprox.main.BetterArgumentDefaultsHelpFormatter) + arg_parser = warcprox.main._build_arg_parser() + arg_parser.description = desc - ### these warcprox options are not configurable for the benchmarks - # arg_parser.add_argument('-p', '--port', dest='port', default='8000', - # type=int, help='port to listen on') - # arg_parser.add_argument('-b', '--address', dest='address', - # default='localhost', help='address to listen on') - # arg_parser.add_argument('-c', '--cacert', dest='cacert', - # default='./{0}-warcprox-ca.pem'.format(socket.gethostname()), - # help='CA certificate file; if file does not exist, it will be created') - # arg_parser.add_argument('--certs-dir', dest='certs_dir', - # default='./{0}-warcprox-ca'.format(socket.gethostname()), - # help='where to store and load generated certificates') - # arg_parser.add_argument('-d', '--dir', dest='directory', - # default='./warcs', help='where to write warcs') - - arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', - help='write gzip-compressed warc records') - arg_parser.add_argument('-n', '--prefix', dest='prefix', - default='WARCPROX', help='WARC filename prefix') - arg_parser.add_argument( - '-s', '--size', dest='rollover_size', default=1000*1000*1000, - type=int, help='WARC file rollover size threshold in bytes') - arg_parser.add_argument('--rollover-idle-time', - dest='rollover_idle_time', default=None, type=int, - help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)") - try: - hash_algos = hashlib.algorithms_guaranteed - except AttributeError: - hash_algos = hashlib.algorithms - arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm', - default='sha1', help='digest algorithm, one of {}'.format(', '.join(hash_algos))) - arg_parser.add_argument('--base32', dest='base32', action='store_true', - default=False, help='write digests in Base32 instead of hex') - arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD', - action='append', help='only record requests with the given http method(s) (can be used more than once)') - arg_parser.add_argument('--stats-db-file', dest='stats_db_file', - default='./warcprox.sqlite', help='persistent statistics database file; empty string or /dev/null disables statistics tracking') - arg_parser.add_argument('-P', '--playback-port', dest='playback_port', - type=int, default=None, help='port to listen on for instant playback') - arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', - default='./warcprox-playback-index.db', - help='playback index database file (only used if --playback-port is specified)') - group = arg_parser.add_mutually_exclusive_group() - group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', - default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') - group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', - help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') - arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox', - help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)') - arg_parser.add_argument('--rethinkdb-big-table', - dest='rethinkdb_big_table', action='store_true', default=False, - help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)') - arg_parser.add_argument( - '--rethinkdb-big-table-name', dest='rethinkdb_big_table_name', - default='captures', help=argparse.SUPPRESS) - arg_parser.add_argument('--queue-size', dest='queue_size', type=int, - default=500, help=argparse.SUPPRESS) - arg_parser.add_argument('--max-threads', dest='max_threads', type=int, - help=argparse.SUPPRESS) - arg_parser.add_argument('--profile', action='store_true', default=False, - help=argparse.SUPPRESS) - arg_parser.add_argument( - '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy', - default=None, help=( - 'host:port of tor socks proxy, used only to connect to ' - '.onion sites')) - arg_parser.add_argument( - '--plugin', metavar='PLUGIN_CLASS', dest='plugins', - action='append', help=( - 'Qualified name of plugin class, e.g. "mypkg.mymod.MyClass". ' - 'May be used multiple times to register multiple plugins. ' - 'Plugin classes are loaded from the regular python module ' - 'search path. They will be instantiated with no arguments and ' - 'must have a method `notify(self, recorded_url, records)` ' - 'which will be called for each url, after warc records have ' - 'been written.')) - arg_parser.add_argument('--version', action='version', - version="warcprox {}".format(warcprox.__version__)) - arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') - arg_parser.add_argument('--trace', dest='trace', action='store_true') - arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true') arg_parser.add_argument( '--requests', dest='requests', type=int, default=200, help='number of urls to fetch') @@ -253,6 +172,15 @@ Benchmarking code uses asyncio/aiohttp and requires python 3.5 or later. arg_parser.add_argument( '--skip-baseline', dest='skip_baseline', action='store_true', help='skip the baseline bechmarks') + + # filter out options that are not configurable for the benchmarks + filtered = [] + for action in arg_parser._action_groups[1]._group_actions: + if action.dest not in ( + 'port', 'address', 'cacert', 'certs_dir', 'directory'): + filtered.append(action) + arg_parser._action_groups[1]._group_actions = filtered + return arg_parser if __name__ == '__main__': diff --git a/setup.py b/setup.py index 677dda8..5f8ff09 100755 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ except: setuptools.setup( name='warcprox', - version='2.2.1b2.dev115', + version='2.2.1b2.dev123', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index e8c140b..db6a657 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -238,6 +238,9 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): raise Exception('bad path') headers = b'HTTP/1.1 200 OK\r\n' + actual_headers + b'\r\n' logging.info('headers=%r payload=%r', headers, payload) + elif self.path == '/empty-response': + headers = b'' + payload = b'' else: payload = b'404 Not Found\n' headers = (b'HTTP/1.1 404 Not Found\r\n' @@ -1661,6 +1664,20 @@ def test_long_warcprox_meta( with pytest.raises(StopIteration): next(rec_iter) +def test_empty_response( + warcprox_, http_daemon, https_daemon, archiving_proxies, + playback_proxies): + + url = 'http://localhost:%s/empty-response' % http_daemon.server_port + response = requests.get(url, proxies=archiving_proxies, verify=False) + assert response.status_code == 502 + # this is the reason in python >= 3.5 but not in 3.4 and 2.7 + # assert response.reason == 'Remote end closed connection without response' + + url = 'https://localhost:%s/empty-response' % https_daemon.server_port + response = requests.get(url, proxies=archiving_proxies, verify=False) + assert response.status_code == 502 + def test_payload_digest(warcprox_, http_daemon): ''' Tests that digest is of RFC2616 "entity body" diff --git a/warcprox/__init__.py b/warcprox/__init__.py index e50a415..6ac4fff 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -92,7 +92,7 @@ class RequestBlockedByRule(Exception): def __str__(self): return "%s: %s" % (self.__class__.__name__, self.msg) -# monkey-patch log level TRACE +# monkey-patch log levels TRACE and NOTICE TRACE = 5 import logging def _logging_trace(msg, *args, **kwargs): @@ -104,6 +104,17 @@ logging.trace = _logging_trace logging.Logger.trace = _logger_trace logging.addLevelName(TRACE, 'TRACE') +NOTICE = (logging.INFO + logging.WARN) // 2 +import logging +def _logging_notice(msg, *args, **kwargs): + logging.root.notice(msg, *args, **kwargs) +def _logger_notice(self, msg, *args, **kwargs): + if self.isEnabledFor(NOTICE): + self._log(NOTICE, msg, args, **kwargs) +logging.notice = _logging_notice +logging.Logger.notice = _logger_notice +logging.addLevelName(NOTICE, 'NOTICE') + import warcprox.controller as controller import warcprox.playback as playback import warcprox.dedup as dedup diff --git a/warcprox/controller.py b/warcprox/controller.py index 42f71de..11291c4 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -44,7 +44,7 @@ class WarcproxController(object): Create warcprox controller. If supplied, `proxy` should be an instance of WarcProxy, and - `warc_writer_threads` should be an list of WarcWriterThread instances. + `warc_writer_threads` should be a list of WarcWriterThread instances. If not supplied, they are created with default values. If supplied, playback_proxy should be an instance of PlaybackProxy. If @@ -254,6 +254,9 @@ class WarcproxController(object): # last_mem_dbg = datetime.datetime.utcnow() time.sleep(0.5) + + if self.options.profile: + self._dump_profiling() except: self.logger.critical( "shutting down in response to fatal exception", @@ -262,3 +265,36 @@ class WarcproxController(object): finally: self.shutdown() + def _dump_profiling(self): + import pstats, tempfile, os, io + with tempfile.TemporaryDirectory() as tmpdir: + # proxy threads + files = [] + for th_id, profiler in self.proxy.profilers.items(): + file = os.path.join(tmpdir, '%s.dat' % th_id) + profiler.dump_stats(file) + files.append(file) + + buf = io.StringIO() + stats = pstats.Stats(*files, stream=buf) + stats.sort_stats('cumulative') + stats.print_stats(0.1) + self.logger.notice( + 'aggregate performance profile of %s proxy threads:\n%s', + len(files), buf.getvalue()) + + # warc writer threads + files = [] + for wwt in self.warc_writer_threads: + file = os.path.join(tmpdir, '%s.dat' % wwt.ident) + wwt.profiler.dump_stats(file) + files.append(file) + + buf = io.StringIO() + stats = pstats.Stats(*files, stream=buf) + stats.sort_stats('cumulative') + stats.print_stats(0.1) + self.logger.notice( + 'aggregate performance profile of %s warc writer threads:\n%s', + len(self.warc_writer_threads), buf.getvalue()) + diff --git a/warcprox/main.py b/warcprox/main.py index a2fca9c..5968038 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -147,6 +147,9 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): help=argparse.SUPPRESS) arg_parser.add_argument('--profile', action='store_true', default=False, help=argparse.SUPPRESS) + arg_parser.add_argument( + '--writer-threads', dest='writer_threads', type=int, default=None, + help=argparse.SUPPRESS) arg_parser.add_argument( '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy', default=None, help=( @@ -273,12 +276,14 @@ def init_controller(args): # number of warc writer threads = sqrt(proxy.max_threads) # I came up with this out of thin air because it strikes me as reasonable # 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45 + num_writer_threads = args.writer_threads or int(proxy.max_threads ** 0.5) + logging.debug('initializing %d warc writer threads', num_writer_threads) warc_writer_threads = [ warcprox.writerthread.WarcWriterThread( name='WarcWriterThread%03d' % i, recorded_url_q=recorded_url_q, writer_pool=writer_pool, dedup_db=dedup_db, listeners=listeners, options=options) - for i in range(int(proxy.max_threads ** 0.5))] + for i in range(num_writer_threads)] if args.rethinkdb_services_url: parsed = doublethink.parse_rethinkdb_url( diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 2c34bcd..8d950fa 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -363,9 +363,12 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): try: return self._proxy_request() - except: - self.logger.error("exception proxying request", exc_info=True) - raise + except Exception as e: + self.logger.error( + 'error from remote server(?) %r: %r', + self.requestline, e, exc_info=True) + self.send_error(502, str(e)) + return def _proxy_request(self, extra_response_headers={}): ''' @@ -425,10 +428,6 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): buf = prox_rec_res.read(65536) self.log_request(prox_rec_res.status, prox_rec_res.recorder.len) - except Exception as e: - self.logger.error( - "%r proxying %s %s", e, self.command, self.url, - exc_info=True) finally: # Let's close off the remote end if prox_rec_res: @@ -541,14 +540,32 @@ class PooledMitmProxy(PooledMixIn, MitmProxy): # This value is passed as the "backlog" argument to listen(2). The default # value from socketserver.TCPServer is 5. Increasing this value is part of # the solution to client connections being closed suddenly and this message - # appearing in kernel log on linux: "TCP: request_sock_TCP: # Possible SYN - # flooding on port 8000. Sending cookies. Check SNMP # counters." I think + # appearing in kernel log on linux: "TCP: request_sock_TCP: Possible SYN + # flooding on port 8000. Sending cookies. Check SNMP counters." I think # this comes into play because we don't always accept(2) immediately (see # PooledMixIn.get_request()). # See also https://blog.dubbelboer.com/2012/04/09/syn-cookies.html request_queue_size = 4096 - def process_request_thread(self, request, client_address): + def __init__(self, max_threads, options=warcprox.Options()): + PooledMixIn.__init__(self, max_threads) + self.profilers = {} + + if options.profile: + self.process_request_thread = self._profile_process_request_thread + else: + self.process_request_thread = self._process_request_thread + + def _profile_process_request_thread(self, request, client_address): + if not threading.current_thread().ident in self.profilers: + import cProfile + self.profilers[threading.current_thread().ident] = cProfile.Profile() + profiler = self.profilers[threading.current_thread().ident] + profiler.enable() + self._process_request_thread(request, client_address) + profiler.disable() + + def _process_request_thread(self, request, client_address): ''' This an almost verbatim copy/paste of socketserver.ThreadingMixIn.process_request_thread. diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index f1de01e..12aac74 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -311,7 +311,10 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): self.server.recorded_url_q.put(rec_custom) self.send_response(204, 'OK') else: - self.send_error(400, 'Bad request') + self.send_error(400, message='Bad request', explain=( + 'Bad request. WARC-Type, Content-Length, and Content-Type ' + 'request headers required for WARCPROX_WRITE_RECORD ' + 'request.')) self.end_headers() except: @@ -425,7 +428,8 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy): self.logger.info( "max_threads=%s set by command line option", options.max_threads) - warcprox.mitmproxy.PooledMitmProxy.__init__(self, options.max_threads) + warcprox.mitmproxy.PooledMitmProxy.__init__( + self, options.max_threads, options) SingleThreadedWarcProxy.__init__( self, ca, recorded_url_q, stats_db, options) diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index a8a6ef7..3f42fc1 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -33,7 +33,6 @@ import time from datetime import datetime from hanzo import warctools import warcprox -import cProfile import sys class WarcWriterThread(threading.Thread): @@ -59,7 +58,11 @@ class WarcWriterThread(threading.Thread): def run(self): if self.options.profile: - cProfile.runctx('self._run()', globals(), locals(), sort='cumulative') + import cProfile + self.profiler = cProfile.Profile() + self.profiler.enable() + self._run() + self.profiler.disable() else: self._run()