diff --git a/warcprox/main.py b/warcprox/main.py index 0854cee..bcff5b0 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -78,7 +78,9 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): default=None, help='kafka broker list for capture feed') arg_parser.add_argument('--kafka-capture-feed-topic', dest='kafka_capture_feed_topic', default=None, help='kafka capture feed topic') - arg_parser.add_argument('--queue-size', dest='queue_size', default=1000, + arg_parser.add_argument('--queue-size', dest='queue_size', default=500, + help=argparse.SUPPRESS) + arg_parser.add_argument('--max-threads', dest='max_threads', default=500, help=argparse.SUPPRESS) arg_parser.add_argument('--profile', action='store_true', default=False, help=argparse.SUPPRESS) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 532c221..3df9f33 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -1,5 +1,3 @@ -# vim:set sw=4 et: - from __future__ import absolute_import try: @@ -17,14 +15,16 @@ import logging import ssl import warcprox import threading +import datetime class MitmProxyHandler(http_server.BaseHTTPRequestHandler): logger = logging.getLogger("warcprox.mitmproxy.MitmProxyHandler") def __init__(self, request, client_address, server): - threading.current_thread.name = 'MitmProxyHandler-thread(tid={})'.format(warcprox.gettid()) + threading.current_thread().name = 'MitmProxyHandler(tid={},started={},client={}:{})'.format(warcprox.gettid(), datetime.datetime.utcnow().isoformat(), client_address[0], client_address[1]) self.is_connect = False self._headers_buffer = [] + request.settimeout(60) # XXX what value should this have? http_server.BaseHTTPRequestHandler.__init__(self, request, client_address, server) def _determine_host_port(self): @@ -52,7 +52,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): def _connect_to_host(self): # Connect to destination self._proxy_sock = socket.socket() - self._proxy_sock.settimeout(60) + self._proxy_sock.settimeout(60) # XXX what value should this have? self._proxy_sock.connect((self.hostname, int(self.port))) # Wrap socket if SSL is required @@ -146,4 +146,6 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): if item.startswith('do_'): return self.do_COMMAND + def log_error(self, fmt, *args): + self.logger.warn(fmt, *args) diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 93107b1..2b83564 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -35,6 +35,7 @@ from hanzo import warctools from certauth.certauth import CertificateAuthority import warcprox import datetime +import concurrent.futures class ProxyingRecorder(object): """ @@ -294,10 +295,6 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True) raise - def log_error(self, fmt, *args): - # logging better handled elsewhere? - pass - def log_message(self, fmt, *args): # logging better handled elsewhere? pass @@ -385,5 +382,14 @@ class SingleThreadedWarcProxy(http_server.HTTPServer): def handle_error(self, request, client_address): self.logger.warn("exception processing request %s from %s", request, client_address, exc_info=True) -class WarcProxy(socketserver.ThreadingMixIn, SingleThreadedWarcProxy): - pass +class PooledMixIn(socketserver.ThreadingMixIn): + def process_request(self, request, client_address): + if hasattr(self, 'pool') and self.pool: + self.pool.submit(self.process_request_thread, request, client_address) + else: + socketserver.ThreadingMixIn.process_request(self, request, client_address) + +class WarcProxy(PooledMixIn, SingleThreadedWarcProxy): + def __init__(self, *args, **kwargs): + SingleThreadedWarcProxy.__init__(self, *args, **kwargs) + self.pool = concurrent.futures.ThreadPoolExecutor(max_workers=self.options.max_threads or 500)