mirror of https://github.com/internetarchive/warcprox.git, synced 2025-01-18 13:22:09 +01:00

commit c13fd9a40e (parent 9cce03dc16)

    have --profile profile proxy threads as well as warc writer threads

Changed files include setup.py (2 lines changed).
setup.py

@@ -51,7 +51,7 @@ except:
 setuptools.setup(
         name='warcprox',
-        version='2.2.1b2.dev116',
+        version='2.2.1b2.dev117',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',
@@ -92,7 +92,7 @@ class RequestBlockedByRule(Exception):
     def __str__(self):
         return "%s: %s" % (self.__class__.__name__, self.msg)
 
-# monkey-patch log level TRACE
+# monkey-patch log levels TRACE and NOTICE
 TRACE = 5
 import logging
 def _logging_trace(msg, *args, **kwargs):
@@ -104,6 +104,17 @@ logging.trace = _logging_trace
 logging.Logger.trace = _logger_trace
 logging.addLevelName(TRACE, 'TRACE')
 
+NOTICE = (logging.INFO + logging.WARN) // 2
+import logging
+def _logging_notice(msg, *args, **kwargs):
+    logging.root.notice(msg, *args, **kwargs)
+def _logger_notice(self, msg, *args, **kwargs):
+    if self.isEnabledFor(NOTICE):
+        self._log(NOTICE, msg, args, **kwargs)
+logging.notice = _logging_notice
+logging.Logger.notice = _logger_notice
+logging.addLevelName(NOTICE, 'NOTICE')
+
 import warcprox.controller as controller
 import warcprox.playback as playback
 import warcprox.dedup as dedup
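The hunk above registers a custom NOTICE level halfway between INFO and WARNING, mirroring the existing TRACE monkey-patch. A minimal standalone sketch of how a level registered this way behaves (not taken from warcprox; the logger name is arbitrary):

import logging

# register NOTICE the same way the hunk above does: (20 + 30) // 2 == 25
NOTICE = (logging.INFO + logging.WARN) // 2
logging.addLevelName(NOTICE, 'NOTICE')

logging.basicConfig(level=logging.INFO, format='%(levelname)s %(message)s')
logger = logging.getLogger('example')

# with the Logger.notice() patch applied this would read logger.notice(...);
# a plain Logger.log() call is equivalent and needs no monkey-patching
logger.log(NOTICE, 'profiling %s writer threads', 3)   # "NOTICE profiling 3 writer threads"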
@@ -254,6 +254,9 @@ class WarcproxController(object):
                 # last_mem_dbg = datetime.datetime.utcnow()
 
                 time.sleep(0.5)
+
+                if self.options.profile:
+                    self._dump_profiling()
         except:
             self.logger.critical(
                     "shutting down in response to fatal exception",
@@ -262,3 +265,23 @@ class WarcproxController(object):
         finally:
             self.shutdown()
+
+    def _dump_profiling(self):
+        import pstats
+        import tempfile
+        import os
+        import io
+        with tempfile.TemporaryDirectory() as tmpdir:
+            files = []
+            for th_id, profiler in self.proxy.profilers.items():
+                file = os.path.join(tmpdir, '%s.dat' % th_id)
+                profiler.dump_stats(file)
+                files.append(file)
+
+            buf = io.StringIO()
+            stats = pstats.Stats(*files, stream=buf)
+            stats.sort_stats('cumulative')
+            stats.print_stats(0.1)
+            self.logger.notice(
+                    'aggregate performance profile of %s proxy threads:\n%s',
+                    len(files), buf.getvalue())
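_dump_profiling() merges the per-thread profilers collected by PooledMitmProxy into one report by dumping each cProfile.Profile to a file and passing all the file names to pstats.Stats, which accepts multiple sources and aggregates them. A self-contained sketch of that aggregation pattern, with made-up worker names:

import cProfile
import io
import os
import pstats
import tempfile

def aggregate_profiles(profilers):
    '''Merge several cProfile.Profile objects into a single pstats report.'''
    with tempfile.TemporaryDirectory() as tmpdir:
        files = []
        for name, profiler in profilers.items():
            path = os.path.join(tmpdir, '%s.dat' % name)
            profiler.dump_stats(path)            # marshalled stats for one profiler
            files.append(path)
        buf = io.StringIO()
        stats = pstats.Stats(*files, stream=buf) # pstats.Stats accepts many sources
        stats.sort_stats('cumulative')
        stats.print_stats(0.1)                   # keep the top 10% of entries
        return buf.getvalue()

# example: profile two toy workloads and print one combined report
profilers = {}
for name, n in [('worker-a', 10000), ('worker-b', 20000)]:
    profiler = cProfile.Profile()
    profiler.enable()
    sum(i * i for i in range(n))
    profiler.disable()
    profilers[name] = profiler
print(aggregate_profiles(profilers))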
@@ -126,6 +126,9 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
             help=argparse.SUPPRESS)
     arg_parser.add_argument('--profile', action='store_true', default=False,
             help=argparse.SUPPRESS)
+    arg_parser.add_argument(
+            '--writer-threads', dest='writer_threads', type=int, default=None,
+            help=argparse.SUPPRESS)
     arg_parser.add_argument(
             '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy',
             default=None, help=(
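The new --writer-threads option is hidden (help=argparse.SUPPRESS) and defaults to None, so init_controller() below can tell "not given" apart from an explicit count. A rough sketch of that behavior, using a stand-in value for proxy.max_threads:

import argparse

arg_parser = argparse.ArgumentParser()
arg_parser.add_argument(
        '--writer-threads', dest='writer_threads', type=int, default=None,
        help=argparse.SUPPRESS)

args = arg_parser.parse_args(['--writer-threads', '3'])
print(args.writer_threads)                              # 3: explicit count wins

args = arg_parser.parse_args([])
max_threads = 100                                       # stand-in for proxy.max_threads
print(args.writer_threads or int(max_threads ** 0.5))   # 10: falls back to the sqrt rule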
@@ -257,12 +260,14 @@ def init_controller(args):
     # number of warc writer threads = sqrt(proxy.max_threads)
     # I came up with this out of thin air because it strikes me as reasonable
     # 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45
+    num_writer_threads = args.writer_threads or int(proxy.max_threads ** 0.5)
+    logging.debug('initializing %d warc writer threads', num_writer_threads)
     warc_writer_threads = [
             warcprox.writerthread.WarcWriterThread(
                 name='WarcWriterThread%03d' % i, recorded_url_q=recorded_url_q,
                 writer_pool=writer_pool, dedup_db=dedup_db,
                 listeners=listeners, options=options)
-            for i in range(int(proxy.max_threads ** 0.5))]
+            for i in range(num_writer_threads)]
 
     if args.rethinkdb_servers:
         svcreg = doublethink.ServiceRegistry(rr)
@@ -541,14 +541,33 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
     # This value is passed as the "backlog" argument to listen(2). The default
     # value from socketserver.TCPServer is 5. Increasing this value is part of
     # the solution to client connections being closed suddenly and this message
-    # appearing in kernel log on linux: "TCP: request_sock_TCP: # Possible SYN
-    # flooding on port 8000. Sending cookies. Check SNMP # counters." I think
+    # appearing in kernel log on linux: "TCP: request_sock_TCP: Possible SYN
+    # flooding on port 8000. Sending cookies. Check SNMP counters." I think
     # this comes into play because we don't always accept(2) immediately (see
     # PooledMixIn.get_request()).
     # See also https://blog.dubbelboer.com/2012/04/09/syn-cookies.html
     request_queue_size = 4096
 
-    def process_request_thread(self, request, client_address):
+    def __init__(self, max_threads, options=warcprox.Options()):
+        PooledMixIn.__init__(self, max_threads)
+
+        if options.profile:
+            self.profilers = {}
+            self.process_request_thread = self._profile_process_request_thread
+        else:
+            self.profilers = {}
+            self.process_request_thread = self._process_request_thread
+
+    def _profile_process_request_thread(self, request, client_address):
+        if not threading.current_thread().ident in self.profilers:
+            import cProfile
+            self.profilers[threading.current_thread().ident] = cProfile.Profile()
+        profiler = self.profilers[threading.current_thread().ident]
+        profiler.enable()
+        self._process_request_thread(request, client_address)
+        profiler.disable()
+
+    def _process_request_thread(self, request, client_address):
         '''
         This an almost verbatim copy/paste of
         socketserver.ThreadingMixIn.process_request_thread.
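PooledMitmProxy now keeps one cProfile.Profile per proxy thread, keyed by threading.current_thread().ident, and only swaps in the profiling wrapper when --profile is on, so the normal path pays no overhead. A minimal standalone sketch of that per-thread profiler pattern (the worker function is invented for illustration):

import cProfile
import threading

profilers = {}    # thread ident -> cProfile.Profile, as in PooledMitmProxy.profilers

def handle(work):
    # stand-in for _process_request_thread(); any per-request work goes here
    sum(i * i for i in range(work))

def profiled_handle(work):
    ident = threading.current_thread().ident
    if ident not in profilers:
        profilers[ident] = cProfile.Profile()
    profiler = profilers[ident]
    profiler.enable()
    try:
        handle(work)
    finally:
        profiler.disable()

threads = [threading.Thread(target=profiled_handle, args=(50000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('collected profiles for %d threads' % len(profilers))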
@@ -425,7 +425,8 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy):
             self.logger.info(
                     "max_threads=%s set by command line option",
                     options.max_threads)
-        warcprox.mitmproxy.PooledMitmProxy.__init__(self, options.max_threads)
+        warcprox.mitmproxy.PooledMitmProxy.__init__(
+                self, options.max_threads, options)
         SingleThreadedWarcProxy.__init__(
                 self, ca, recorded_url_q, stats_db, options)
@@ -33,7 +33,6 @@ import time
 from datetime import datetime
 from hanzo import warctools
 import warcprox
-import cProfile
 import sys
 
 class WarcWriterThread(threading.Thread):
@@ -59,7 +58,21 @@ class WarcWriterThread(threading.Thread):
 
     def run(self):
         if self.options.profile:
-            cProfile.runctx('self._run()', globals(), locals(), sort='cumulative')
+            import cProfile
+            import pstats
+            import io
+            profiler = cProfile.Profile()
+
+            profiler.enable()
+            self._run()
+            profiler.disable()
+
+            buf = io.StringIO()
+            stats = pstats.Stats(profiler, stream=buf)
+            stats.sort_stats('cumulative')
+            stats.print_stats(0.1)
+            self.logger.notice(
+                    '%s performance profile:\n%s', self, buf.getvalue())
         else:
             self._run()
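The rewritten run() drops cProfile.runctx(..., sort='cumulative'), which prints to stdout, in favor of an explicit Profile whose stats are rendered into a StringIO and routed through the logger. The same enable/run/disable-then-report shape can be wrapped as a small context manager; this helper is a sketch, not part of warcprox, and it logs at INFO because the NOTICE level is warcprox's own monkey-patch:

import contextlib
import cProfile
import io
import logging
import pstats

@contextlib.contextmanager
def profiled(logger, label):
    '''Profile the enclosed block and log a cumulative-time report.'''
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        yield
    finally:
        profiler.disable()
        buf = io.StringIO()
        stats = pstats.Stats(profiler, stream=buf)
        stats.sort_stats('cumulative')
        stats.print_stats(0.1)                  # top 10% of entries only
        logger.info('%s performance profile:\n%s', label, buf.getvalue())

logging.basicConfig(level=logging.INFO)
with profiled(logging.getLogger('example'), 'toy workload'):
    sum(i * i for i in range(100000))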