diff --git a/setup.py b/setup.py index 31481ea..9e87a09 100755 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.0b2.dev31', + version='2.0b2.dev32', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/single-threaded-proxy.py b/tests/single-threaded-proxy.py index 5954fd1..dd5e709 100755 --- a/tests/single-threaded-proxy.py +++ b/tests/single-threaded-proxy.py @@ -1,27 +1,25 @@ #!/usr/bin/env python -# -# tests/single-threaded-proxy.py - single-threaded recording proxy, useful for -# debugging -# -# Copyright (C) 2015-2016 Internet Archive -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. -# +""" +tests/single-threaded-proxy.py - single-threaded MITM proxy, useful for +debugging, does not write warcs -"""Useful for debugging. Does not write warcs.""" +Copyright (C) 2015-2016 Internet Archive + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +USA. +""" from __future__ import absolute_import diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 85960ec..6f48d30 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -48,6 +48,12 @@ import datetime import socks import tempfile import hashlib +try: + import socketserver +except ImportError: + import SocketServer as socketserver +import resource +import concurrent.futures class ProxyingRecorder(object): """ @@ -397,3 +403,67 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): def log_error(self, fmt, *args): self.logger.warn(fmt, *args) +class PooledMixIn(socketserver.ThreadingMixIn): + logger = logging.getLogger("warcprox.mitmproxy.PooledMixIn") + def __init__(self, max_threads=None): + ''' + If max_threads is not supplied, calculates a reasonable value based + on system resource limits. + ''' + if not max_threads: + # man getrlimit: "RLIMIT_NPROC The maximum number of processes (or, + # more precisely on Linux, threads) that can be created for the + # real user ID of the calling process." 
+ rlimit_nproc = resource.getrlimit(resource.RLIMIT_NPROC)[0] + rlimit_nofile = resource.getrlimit(resource.RLIMIT_NOFILE)[0] + max_threads = min(rlimit_nofile // 10, rlimit_nproc // 2) + self.logger.info( + "max_threads=%s (rlimit_nproc=%s, rlimit_nofile=%s)", + max_threads, rlimit_nproc, rlimit_nofile) + self.pool = concurrent.futures.ThreadPoolExecutor(max_threads) + + def process_request(self, request, client_address): + self.pool.submit(self.process_request_thread, request, client_address) + +class MitmProxy(http_server.HTTPServer): + def finish_request(self, request, client_address): + ''' + We override socketserver.BaseServer.finish_request to get at + MitmProxyHandler's self.request. A normal socket server's self.request + is set to `request` and never changes, but in our case, it may be + replaced with an SSL socket. The caller of this method (e.g. + self.process_request or PooledMitmProxy.process_request_thread) needs + to get a hold of that socket so it can close it. + ''' + req_handler = self.RequestHandlerClass(request, client_address, self) + return req_handler.request + + def process_request(self, request, client_address): + ''' + This is an almost verbatim copy/paste of + socketserver.BaseServer.process_request. + The only difference is that it expects self.finish_request to return + the request (i.e. the socket). This new value of request is passed on + to self.shutdown_request. See the comment on self.finish_request for + the rationale. + ''' + request = self.finish_request(request, client_address) + self.shutdown_request(request) + +class PooledMitmProxy(PooledMixIn, MitmProxy): + def process_request_thread(self, request, client_address): + ''' + This is an almost verbatim copy/paste of + socketserver.ThreadingMixIn.process_request_thread. + The only difference is that it expects self.finish_request to return + the request (i.e. the socket). This new value of request is passed on + to self.shutdown_request. 
See the comment on MitmProxy.finish_request + for the rationale. + ''' + try: + request = self.finish_request(request, client_address) + self.shutdown_request(request) + except: + self.handle_error(request, client_address) + self.shutdown_request(request) + diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 61cccf7..f9e07c3 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -43,8 +43,6 @@ from hanzo import warctools from certauth.certauth import CertificateAuthority import warcprox import datetime -import concurrent.futures -import resource import ipaddress import surt @@ -387,64 +385,31 @@ class SingleThreadedWarcProxy(http_server.HTTPServer): self.options = options +class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy): + logger = logging.getLogger("warcprox.warcproxy.WarcProxy") + + def __init__( + self, ca=None, recorded_url_q=None, stats_db=None, + options=warcprox.Options()): + if options.max_threads: + self.logger.info( + "max_threads=%s set by command line option", + options.max_threads) + warcprox.mitmproxy.PooledMitmProxy.__init__(self, options.max_threads) + SingleThreadedWarcProxy.__init__( + self, ca, recorded_url_q, stats_db, options) + def server_activate(self): http_server.HTTPServer.server_activate(self) - self.logger.info('WarcProxy listening on {0}:{1}'.format(self.server_address[0], self.server_address[1])) + self.logger.info( + 'listening on %s:%s', self.server_address[0], + self.server_address[1]) def server_close(self): - self.logger.info('WarcProxy shutting down') + self.logger.info('shutting down') http_server.HTTPServer.server_close(self) def handle_error(self, request, client_address): - self.logger.warn("exception processing request %s from %s", request, client_address, exc_info=True) - - def finish_request(self, request, client_address): - ''' - We override socketserver.BaseServer.finish_request to get at - WarcProxyHandler's self.request. 
A normal socket server's self.request - is set to `request` and never changes, but in our case, it may be - replaced with an SSL socket. The caller of this method, e.g. - PooledMixIn.process_request_thread, needs to get a hold of that socket - so it can close it. - ''' - req_handler = WarcProxyHandler(request, client_address, self) - return req_handler.request - -class PooledMixIn(socketserver.ThreadingMixIn): - def process_request(self, request, client_address): - self.pool.submit(self.process_request_thread, request, client_address) - def process_request_thread(self, request, client_address): - ''' - This an almost verbatim copy/paste of - socketserver.ThreadingMixIn.process_request_thread. - The only difference is that it expects self.finish_request to return - a request. See the comment on SingleThreadedWarcProxy.finish_request - above. - ''' - try: - request = self.finish_request(request, client_address) - self.shutdown_request(request) - except: - self.handle_error(request, client_address) - self.shutdown_request(request) - -class WarcProxy(PooledMixIn, SingleThreadedWarcProxy): - logger = logging.getLogger("warcprox.warcproxy.WarcProxy") - - def __init__(self, *args, **kwargs): - SingleThreadedWarcProxy.__init__(self, *args, **kwargs) - if self.options.max_threads: - max_threads = self.options.max_threads - self.logger.info("max_threads=%s set by command line option", - max_threads) - else: - # man getrlimit: "RLIMIT_NPROC The maximum number of processes (or, - # more precisely on Linux, threads) that can be created for the - # real user ID of the calling process." 
- rlimit_nproc = resource.getrlimit(resource.RLIMIT_NPROC)[0] - rlimit_nofile = resource.getrlimit(resource.RLIMIT_NOFILE)[0] - max_threads = min(rlimit_nofile // 10, rlimit_nproc // 2) - self.logger.info("max_threads=%s (rlimit_nproc=%s, rlimit_nofile=%s)", - max_threads, rlimit_nproc, rlimit_nofile) - - self.pool = concurrent.futures.ThreadPoolExecutor(max_threads) + self.logger.warn( + "exception processing request %s from %s", request, + client_address, exc_info=True)