diff --git a/setup.py b/setup.py index 9e5d580..90a7719 100755 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.2.1b2.dev116', + version='2.2.1b2.dev117', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/__init__.py b/warcprox/__init__.py index e50a415..6ac4fff 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -92,7 +92,7 @@ class RequestBlockedByRule(Exception): def __str__(self): return "%s: %s" % (self.__class__.__name__, self.msg) -# monkey-patch log level TRACE +# monkey-patch log levels TRACE and NOTICE TRACE = 5 import logging def _logging_trace(msg, *args, **kwargs): @@ -104,6 +104,17 @@ logging.trace = _logging_trace logging.Logger.trace = _logger_trace logging.addLevelName(TRACE, 'TRACE') +NOTICE = (logging.INFO + logging.WARN) // 2 +import logging +def _logging_notice(msg, *args, **kwargs): + logging.root.notice(msg, *args, **kwargs) +def _logger_notice(self, msg, *args, **kwargs): + if self.isEnabledFor(NOTICE): + self._log(NOTICE, msg, args, **kwargs) +logging.notice = _logging_notice +logging.Logger.notice = _logger_notice +logging.addLevelName(NOTICE, 'NOTICE') + import warcprox.controller as controller import warcprox.playback as playback import warcprox.dedup as dedup diff --git a/warcprox/controller.py b/warcprox/controller.py index 42f71de..217f519 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -254,6 +254,9 @@ class WarcproxController(object): # last_mem_dbg = datetime.datetime.utcnow() time.sleep(0.5) + + if self.options.profile: + self._dump_profiling() except: self.logger.critical( "shutting down in response to fatal exception", @@ -262,3 +265,23 @@ class WarcproxController(object): finally: self.shutdown() + def _dump_profiling(self): + import pstats + import tempfile + import os + import io + with tempfile.TemporaryDirectory() as tmpdir: + files = [] + for th_id, profiler in self.proxy.profilers.items(): + file = os.path.join(tmpdir, '%s.dat' % th_id) + profiler.dump_stats(file) + files.append(file) + + buf = io.StringIO() + stats = pstats.Stats(*files, stream=buf) + stats.sort_stats('cumulative') + stats.print_stats(0.1) + self.logger.notice( + 'aggregate performance profile of %s proxy threads:\n%s', + len(files), buf.getvalue()) + diff --git a/warcprox/main.py b/warcprox/main.py index 1e6aaf8..462da8e 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -126,6 +126,9 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): help=argparse.SUPPRESS) arg_parser.add_argument('--profile', action='store_true', default=False, help=argparse.SUPPRESS) + arg_parser.add_argument( + '--writer-threads', dest='writer_threads', type=int, default=None, + help=argparse.SUPPRESS) arg_parser.add_argument( '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy', default=None, help=( @@ -257,12 +260,14 @@ def init_controller(args): # number of warc writer threads = sqrt(proxy.max_threads) # I came up with this out of thin air because it strikes me as reasonable # 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45 + num_writer_threads = args.writer_threads or int(proxy.max_threads ** 0.5) + logging.debug('initializing %d warc writer threads', num_writer_threads) warc_writer_threads = [ warcprox.writerthread.WarcWriterThread( name='WarcWriterThread%03d' % i, recorded_url_q=recorded_url_q, writer_pool=writer_pool, dedup_db=dedup_db, listeners=listeners, options=options) - for i in range(int(proxy.max_threads ** 0.5))] + for i in range(num_writer_threads)] if args.rethinkdb_servers: svcreg = doublethink.ServiceRegistry(rr) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 2c34bcd..d1ac12e 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -541,14 +541,33 @@ class PooledMitmProxy(PooledMixIn, MitmProxy): # This value is passed as the "backlog" argument to listen(2). The default # value from socketserver.TCPServer is 5. Increasing this value is part of # the solution to client connections being closed suddenly and this message - # appearing in kernel log on linux: "TCP: request_sock_TCP: # Possible SYN - # flooding on port 8000. Sending cookies. Check SNMP # counters." I think + # appearing in kernel log on linux: "TCP: request_sock_TCP: Possible SYN + # flooding on port 8000. Sending cookies. Check SNMP counters." I think # this comes into play because we don't always accept(2) immediately (see # PooledMixIn.get_request()). # See also https://blog.dubbelboer.com/2012/04/09/syn-cookies.html request_queue_size = 4096 - def process_request_thread(self, request, client_address): + def __init__(self, max_threads, options=warcprox.Options()): + PooledMixIn.__init__(self, max_threads) + + if options.profile: + self.profilers = {} + self.process_request_thread = self._profile_process_request_thread + else: + self.profilers + self.process_request_thread = self._process_request_thread + + def _profile_process_request_thread(self, request, client_address): + if not threading.current_thread().ident in self.profilers: + import cProfile + self.profilers[threading.current_thread().ident] = cProfile.Profile() + profiler = self.profilers[threading.current_thread().ident] + profiler.enable() + self._process_request_thread(request, client_address) + profiler.disable() + + def _process_request_thread(self, request, client_address): ''' This an almost verbatim copy/paste of socketserver.ThreadingMixIn.process_request_thread. diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index f1de01e..3331221 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -425,7 +425,8 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy): self.logger.info( "max_threads=%s set by command line option", options.max_threads) - warcprox.mitmproxy.PooledMitmProxy.__init__(self, options.max_threads) + warcprox.mitmproxy.PooledMitmProxy.__init__( + self, options.max_threads, options) SingleThreadedWarcProxy.__init__( self, ca, recorded_url_q, stats_db, options) diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index a8a6ef7..1041a30 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -33,7 +33,6 @@ import time from datetime import datetime from hanzo import warctools import warcprox -import cProfile import sys class WarcWriterThread(threading.Thread): @@ -59,7 +58,21 @@ class WarcWriterThread(threading.Thread): def run(self): if self.options.profile: - cProfile.runctx('self._run()', globals(), locals(), sort='cumulative') + import cProfile + import pstats + import io + profiler = cProfile.Profile() + + profiler.enable() + self._run() + profiler.disable() + + buf = io.StringIO() + stats = pstats.Stats(profiler, stream=buf) + stats.sort_stats('cumulative') + stats.print_stats(0.1) + self.logger.notice( + '%s performance profile:\n%s', self, buf.getvalue()) else: self._run()