diff --git a/README.rst b/README.rst index 397b930..113099b 100644 --- a/README.rst +++ b/README.rst @@ -46,7 +46,7 @@ have a method `notify(self, recorded_url, records)` or should subclass `warcprox.BasePostfetchProcessor`. More than one plugin can be configured by specifying `--plugin` multiples times. -XXX example? +`A minimal example `__ Usage ~~~~~ diff --git a/setup.py b/setup.py index 5169090..6a79e63 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev154', + version='2.4b2.dev157', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 33af61a..76abafa 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -235,7 +235,7 @@ class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor): self.listener.stop() except: self.logger.error( - '%s raised exception', listener.stop, exc_info=True) + '%s raised exception', self.listener.stop, exc_info=True) def timestamp17(): now = datetime.datetime.utcnow() diff --git a/warcprox/main.py b/warcprox/main.py index 64d01c7..8ff466b 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -166,6 +166,10 @@ def _build_arg_parser(prog='warcprox'): arg_parser.add_argument( '--socket-timeout', dest='socket_timeout', type=float, default=None, help=argparse.SUPPRESS) + # Increasing this value increases memory usage but reduces /tmp disk I/O. + arg_parser.add_argument( + '--tmp-file-max-memory-size', dest='tmp_file_max_memory_size', + type=int, default=512*1024, help=argparse.SUPPRESS) arg_parser.add_argument( '--max-resource-size', dest='max_resource_size', type=int, default=None, help='maximum resource size limit in bytes') diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 1bbd930..2776a97 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -64,6 +64,7 @@ import urlcanon import time import collections import cProfile +from urllib3.util import is_connection_dropped class ProxyingRecorder(object): """ @@ -73,10 +74,11 @@ class ProxyingRecorder(object): logger = logging.getLogger("warcprox.mitmproxy.ProxyingRecorder") - def __init__(self, fp, proxy_client, digest_algorithm='sha1', url=None): + def __init__(self, fp, proxy_client, digest_algorithm='sha1', url=None, + tmp_file_max_memory_size=524288): self.fp = fp # "The file has no name, and will cease to exist when it is closed." - self.tempfile = tempfile.SpooledTemporaryFile(max_size=512*1024) + self.tempfile = tempfile.SpooledTemporaryFile(max_size=tmp_file_max_memory_size) self.digest_algorithm = digest_algorithm self.block_digest = hashlib.new(digest_algorithm) self.payload_offset = None @@ -146,7 +148,7 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): ''' def __init__( self, sock, debuglevel=0, method=None, proxy_client=None, - digest_algorithm='sha1', url=None): + digest_algorithm='sha1', url=None, tmp_file_max_memory_size=None): http_client.HTTPResponse.__init__( self, sock, debuglevel=debuglevel, method=method) self.proxy_client = proxy_client @@ -156,7 +158,8 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): # Keep around extra reference to self.fp because HTTPResponse sets # self.fp=None after it finishes reading, but we still need it self.recorder = ProxyingRecorder( - self.fp, proxy_client, digest_algorithm, url=url) + self.fp, proxy_client, digest_algorithm, url=url, + tmp_file_max_memory_size=tmp_file_max_memory_size) self.fp = self.recorder self.payload_digest = None @@ -208,6 +211,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): logger = logging.getLogger("warcprox.mitmproxy.MitmProxyHandler") _socket_timeout = 60 _max_resource_size = None + _tmp_file_max_memory_size = 512 * 1024 def __init__(self, request, client_address, server): threading.current_thread().name = 'MitmProxyHandler(tid={},started={},client={}:{})'.format(warcprox.gettid(), datetime.datetime.utcnow().isoformat(), client_address[0], client_address[1]) @@ -236,44 +240,55 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.hostname = urlcanon.normalize_host(host).decode('ascii') def _connect_to_remote_server(self): - # Connect to destination - if self.onion_tor_socks_proxy_host and self.hostname.endswith('.onion'): - self.logger.info( - "using tor socks proxy at %s:%s to connect to %s", - self.onion_tor_socks_proxy_host, - self.onion_tor_socks_proxy_port or 1080, self.hostname) - self._remote_server_sock = socks.socksocket() - self._remote_server_sock.set_proxy( - socks.SOCKS5, addr=self.onion_tor_socks_proxy_host, - port=self.onion_tor_socks_proxy_port, rdns=True) - else: - self._remote_server_sock = socket.socket() - self._remote_server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + ''' + Connect to destination. + Note that connection_from_host has hard-coded `scheme='http'` + to avoid internal urllib3 logic when scheme is https. We handle ssl and + socks inside the current method. + self._conn_pool._get_conn() will either return an existing connection + or a new one. If its new, it needs initialization. + ''' + self._conn_pool = self.server.remote_connection_pool.connection_from_host( + host=self.hostname, port=int(self.port), scheme='http', + pool_kwargs={'maxsize': 6}) - self._remote_server_sock.settimeout(self._socket_timeout) - self._remote_server_sock.connect((self.hostname, int(self.port))) + self._remote_server_conn = self._conn_pool._get_conn() + if is_connection_dropped(self._remote_server_conn): + if self.onion_tor_socks_proxy_host and self.hostname.endswith('.onion'): + self.logger.info( + "using tor socks proxy at %s:%s to connect to %s", + self.onion_tor_socks_proxy_host, + self.onion_tor_socks_proxy_port or 1080, self.hostname) + self._remote_server_conn.sock = socks.socksocket() + self._remote_server_conn.sock.set_proxy( + socks.SOCKS5, addr=self.onion_tor_socks_proxy_host, + port=self.onion_tor_socks_proxy_port, rdns=True) + self._remote_server_conn.timeout = self._socket_timeout + self._remote_server_conn.sock.connect((self.hostname, int(self.port))) + else: + self._remote_server_conn.timeout = self._socket_timeout + self._remote_server_conn.connect() - # Wrap socket if SSL is required - if self.is_connect: - try: - context = ssl.create_default_context() - context.check_hostname = False - context.verify_mode = ssl.CERT_NONE - self._remote_server_sock = context.wrap_socket( - self._remote_server_sock, server_hostname=self.hostname) - except AttributeError: + # Wrap socket if SSL is required + if self.is_connect: try: - self._remote_server_sock = ssl.wrap_socket( - self._remote_server_sock) - except ssl.SSLError: - self.logger.warn( - "failed to establish ssl connection to %s; python " - "ssl library does not support SNI, considering " - "upgrading to python >= 2.7.9 or python 3.4", - self.hostname) + context = ssl.create_default_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + self._remote_server_conn.sock = context.wrap_socket( + self._remote_server_conn.sock, server_hostname=self.hostname) + except AttributeError: + try: + self._remote_server_conn.sock = ssl.wrap_socket( + self._remote_server_conn.sock) + except ssl.SSLError: + self.logger.warn( + "failed to establish ssl connection to %s; python " + "ssl library does not support SNI, considering " + "upgrading to python >= 2.7.9 or python 3.4", + self.hostname) raise - - return self._remote_server_sock + return self._remote_server_conn.sock def _transition_to_ssl(self): certfile = self.server.ca.get_wildcard_cert(self.hostname) @@ -420,12 +435,13 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.logger.debug('sending to remote server req=%r', req) # Send it down the pipe! - self._remote_server_sock.sendall(req) + self._remote_server_conn.sock.sendall(req) prox_rec_res = ProxyingRecordingHTTPResponse( - self._remote_server_sock, proxy_client=self.connection, + self._remote_server_conn.sock, proxy_client=self.connection, digest_algorithm=self.server.digest_algorithm, - url=self.url, method=self.command) + url=self.url, method=self.command, + tmp_file_max_memory_size=self._tmp_file_max_memory_size) prox_rec_res.begin(extra_response_headers=extra_response_headers) buf = prox_rec_res.read(65536) @@ -440,11 +456,15 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): break self.log_request(prox_rec_res.status, prox_rec_res.recorder.len) + # Let's close off the remote end. If remote connection is fine, + # put it back in the pool to reuse it later. + if not is_connection_dropped(self._remote_server_conn): + self._conn_pool._put_conn(self._remote_server_conn) + except: + self._remote_server_conn.sock.close() finally: - # Let's close off the remote end if prox_rec_res: prox_rec_res.close() - self._remote_server_sock.close() return req, prox_rec_res diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 5b42655..2f63a77 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -43,6 +43,7 @@ import warcprox import datetime import urlcanon import os +from urllib3 import PoolManager class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): ''' @@ -173,7 +174,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): limits and block rules in the Warcprox-Meta request header, if any. Raises `warcprox.RequestBlockedByRule` if a rule has been enforced. Otherwise calls `MitmProxyHandler._connect_to_remote_server`, which - initializes `self._remote_server_sock`. + initializes `self._remote_server_conn`. ''' if 'Warcprox-Meta' in self.headers: warcprox_meta = json.loads(self.headers['Warcprox-Meta']) @@ -192,7 +193,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): warcprox_meta = json.loads(raw_warcprox_meta) del self.headers['Warcprox-Meta'] - remote_ip = self._remote_server_sock.getpeername()[0] + remote_ip = self._remote_server_conn.sock.getpeername()[0] timestamp = datetime.datetime.utcnow() extra_response_headers = {} if warcprox_meta and 'accept' in warcprox_meta and \ @@ -387,7 +388,8 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): self.status_callback = status_callback self.stats_db = stats_db self.options = options - + self.remote_connection_pool = PoolManager( + num_pools=max(round(options.max_threads / 6), 200) if options.max_threads else 200) server_address = ( options.address or 'localhost', options.port if options.port is not None else 8000) @@ -405,6 +407,8 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): WarcProxyHandler._socket_timeout = options.socket_timeout if options.max_resource_size: WarcProxyHandler._max_resource_size = options.max_resource_size + if options.tmp_file_max_memory_size: + WarcProxyHandler._tmp_file_max_memory_size = options.tmp_file_max_memory_size http_server.HTTPServer.__init__( self, server_address, WarcProxyHandler, bind_and_activate=True)