Merge branch 'master' into qa

* master:
  bump version number after PR #72
  Fix SOCKS connection error
  Improve Connection Pool
  Reduce the PoolManager num_pools size and fix bugs
  bump dev version after PR #75
  bump dev version number
  Fix ListenerPostfetchProcessor typo
  Configurable tmp file max memory size
  Address unit test failure in Python 3.4
  a minimal example
  Extra connection evaluation before putting it back to the pool
  Fix typo
  Remove whitespace
  Remote server connection pool
This commit is contained in:
Noah Levitt 2018-03-20 10:53:21 -07:00
commit 1ee901c6fa
6 changed files with 77 additions and 49 deletions

View File

@ -46,7 +46,7 @@ have a method `notify(self, recorded_url, records)` or should subclass
`warcprox.BasePostfetchProcessor`. More than one plugin can be configured by `warcprox.BasePostfetchProcessor`. More than one plugin can be configured by
specifying `--plugin` multiples times. specifying `--plugin` multiples times.
XXX example? `A minimal example <https://github.com/internetarchive/warcprox/blob/318405e795ac0ab8760988a1a482cf0a17697148/warcprox/__init__.py#L165>`__
Usage Usage
~~~~~ ~~~~~

View File

@ -40,7 +40,7 @@ except:
setuptools.setup( setuptools.setup(
name='warcprox', name='warcprox',
version='2.4b2.dev154', version='2.4b2.dev157',
description='WARC writing MITM HTTP/S proxy', description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox', url='https://github.com/internetarchive/warcprox',
author='Noah Levitt', author='Noah Levitt',

View File

@ -235,7 +235,7 @@ class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor):
self.listener.stop() self.listener.stop()
except: except:
self.logger.error( self.logger.error(
'%s raised exception', listener.stop, exc_info=True) '%s raised exception', self.listener.stop, exc_info=True)
def timestamp17(): def timestamp17():
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()

View File

@ -166,6 +166,10 @@ def _build_arg_parser(prog='warcprox'):
arg_parser.add_argument( arg_parser.add_argument(
'--socket-timeout', dest='socket_timeout', type=float, '--socket-timeout', dest='socket_timeout', type=float,
default=None, help=argparse.SUPPRESS) default=None, help=argparse.SUPPRESS)
# Increasing this value increases memory usage but reduces /tmp disk I/O.
arg_parser.add_argument(
'--tmp-file-max-memory-size', dest='tmp_file_max_memory_size',
type=int, default=512*1024, help=argparse.SUPPRESS)
arg_parser.add_argument( arg_parser.add_argument(
'--max-resource-size', dest='max_resource_size', type=int, '--max-resource-size', dest='max_resource_size', type=int,
default=None, help='maximum resource size limit in bytes') default=None, help='maximum resource size limit in bytes')

View File

@ -64,6 +64,7 @@ import urlcanon
import time import time
import collections import collections
import cProfile import cProfile
from urllib3.util import is_connection_dropped
class ProxyingRecorder(object): class ProxyingRecorder(object):
""" """
@ -73,10 +74,11 @@ class ProxyingRecorder(object):
logger = logging.getLogger("warcprox.mitmproxy.ProxyingRecorder") logger = logging.getLogger("warcprox.mitmproxy.ProxyingRecorder")
def __init__(self, fp, proxy_client, digest_algorithm='sha1', url=None): def __init__(self, fp, proxy_client, digest_algorithm='sha1', url=None,
tmp_file_max_memory_size=524288):
self.fp = fp self.fp = fp
# "The file has no name, and will cease to exist when it is closed." # "The file has no name, and will cease to exist when it is closed."
self.tempfile = tempfile.SpooledTemporaryFile(max_size=512*1024) self.tempfile = tempfile.SpooledTemporaryFile(max_size=tmp_file_max_memory_size)
self.digest_algorithm = digest_algorithm self.digest_algorithm = digest_algorithm
self.block_digest = hashlib.new(digest_algorithm) self.block_digest = hashlib.new(digest_algorithm)
self.payload_offset = None self.payload_offset = None
@ -146,7 +148,7 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
''' '''
def __init__( def __init__(
self, sock, debuglevel=0, method=None, proxy_client=None, self, sock, debuglevel=0, method=None, proxy_client=None,
digest_algorithm='sha1', url=None): digest_algorithm='sha1', url=None, tmp_file_max_memory_size=None):
http_client.HTTPResponse.__init__( http_client.HTTPResponse.__init__(
self, sock, debuglevel=debuglevel, method=method) self, sock, debuglevel=debuglevel, method=method)
self.proxy_client = proxy_client self.proxy_client = proxy_client
@ -156,7 +158,8 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
# Keep around extra reference to self.fp because HTTPResponse sets # Keep around extra reference to self.fp because HTTPResponse sets
# self.fp=None after it finishes reading, but we still need it # self.fp=None after it finishes reading, but we still need it
self.recorder = ProxyingRecorder( self.recorder = ProxyingRecorder(
self.fp, proxy_client, digest_algorithm, url=url) self.fp, proxy_client, digest_algorithm, url=url,
tmp_file_max_memory_size=tmp_file_max_memory_size)
self.fp = self.recorder self.fp = self.recorder
self.payload_digest = None self.payload_digest = None
@ -208,6 +211,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
logger = logging.getLogger("warcprox.mitmproxy.MitmProxyHandler") logger = logging.getLogger("warcprox.mitmproxy.MitmProxyHandler")
_socket_timeout = 60 _socket_timeout = 60
_max_resource_size = None _max_resource_size = None
_tmp_file_max_memory_size = 512 * 1024
def __init__(self, request, client_address, server): def __init__(self, request, client_address, server):
threading.current_thread().name = 'MitmProxyHandler(tid={},started={},client={}:{})'.format(warcprox.gettid(), datetime.datetime.utcnow().isoformat(), client_address[0], client_address[1]) threading.current_thread().name = 'MitmProxyHandler(tid={},started={},client={}:{})'.format(warcprox.gettid(), datetime.datetime.utcnow().isoformat(), client_address[0], client_address[1])
@ -236,44 +240,55 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.hostname = urlcanon.normalize_host(host).decode('ascii') self.hostname = urlcanon.normalize_host(host).decode('ascii')
def _connect_to_remote_server(self): def _connect_to_remote_server(self):
# Connect to destination '''
if self.onion_tor_socks_proxy_host and self.hostname.endswith('.onion'): Connect to destination.
self.logger.info( Note that connection_from_host has hard-coded `scheme='http'`
"using tor socks proxy at %s:%s to connect to %s", to avoid internal urllib3 logic when scheme is https. We handle ssl and
self.onion_tor_socks_proxy_host, socks inside the current method.
self.onion_tor_socks_proxy_port or 1080, self.hostname) self._conn_pool._get_conn() will either return an existing connection
self._remote_server_sock = socks.socksocket() or a new one. If it's new, it needs initialization.
self._remote_server_sock.set_proxy( '''
socks.SOCKS5, addr=self.onion_tor_socks_proxy_host, self._conn_pool = self.server.remote_connection_pool.connection_from_host(
port=self.onion_tor_socks_proxy_port, rdns=True) host=self.hostname, port=int(self.port), scheme='http',
else: pool_kwargs={'maxsize': 6})
self._remote_server_sock = socket.socket()
self._remote_server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self._remote_server_sock.settimeout(self._socket_timeout) self._remote_server_conn = self._conn_pool._get_conn()
self._remote_server_sock.connect((self.hostname, int(self.port))) if is_connection_dropped(self._remote_server_conn):
if self.onion_tor_socks_proxy_host and self.hostname.endswith('.onion'):
self.logger.info(
"using tor socks proxy at %s:%s to connect to %s",
self.onion_tor_socks_proxy_host,
self.onion_tor_socks_proxy_port or 1080, self.hostname)
self._remote_server_conn.sock = socks.socksocket()
self._remote_server_conn.sock.set_proxy(
socks.SOCKS5, addr=self.onion_tor_socks_proxy_host,
port=self.onion_tor_socks_proxy_port, rdns=True)
self._remote_server_conn.timeout = self._socket_timeout
self._remote_server_conn.sock.connect((self.hostname, int(self.port)))
else:
self._remote_server_conn.timeout = self._socket_timeout
self._remote_server_conn.connect()
# Wrap socket if SSL is required # Wrap socket if SSL is required
if self.is_connect: if self.is_connect:
try:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
self._remote_server_sock = context.wrap_socket(
self._remote_server_sock, server_hostname=self.hostname)
except AttributeError:
try: try:
self._remote_server_sock = ssl.wrap_socket( context = ssl.create_default_context()
self._remote_server_sock) context.check_hostname = False
except ssl.SSLError: context.verify_mode = ssl.CERT_NONE
self.logger.warn( self._remote_server_conn.sock = context.wrap_socket(
"failed to establish ssl connection to %s; python " self._remote_server_conn.sock, server_hostname=self.hostname)
"ssl library does not support SNI, considering " except AttributeError:
"upgrading to python >= 2.7.9 or python 3.4", try:
self.hostname) self._remote_server_conn.sock = ssl.wrap_socket(
self._remote_server_conn.sock)
except ssl.SSLError:
self.logger.warn(
"failed to establish ssl connection to %s; python "
"ssl library does not support SNI, considering "
"upgrading to python >= 2.7.9 or python 3.4",
self.hostname)
raise raise
return self._remote_server_conn.sock
return self._remote_server_sock
def _transition_to_ssl(self): def _transition_to_ssl(self):
certfile = self.server.ca.get_wildcard_cert(self.hostname) certfile = self.server.ca.get_wildcard_cert(self.hostname)
@ -420,12 +435,13 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.logger.debug('sending to remote server req=%r', req) self.logger.debug('sending to remote server req=%r', req)
# Send it down the pipe! # Send it down the pipe!
self._remote_server_sock.sendall(req) self._remote_server_conn.sock.sendall(req)
prox_rec_res = ProxyingRecordingHTTPResponse( prox_rec_res = ProxyingRecordingHTTPResponse(
self._remote_server_sock, proxy_client=self.connection, self._remote_server_conn.sock, proxy_client=self.connection,
digest_algorithm=self.server.digest_algorithm, digest_algorithm=self.server.digest_algorithm,
url=self.url, method=self.command) url=self.url, method=self.command,
tmp_file_max_memory_size=self._tmp_file_max_memory_size)
prox_rec_res.begin(extra_response_headers=extra_response_headers) prox_rec_res.begin(extra_response_headers=extra_response_headers)
buf = prox_rec_res.read(65536) buf = prox_rec_res.read(65536)
@ -440,11 +456,15 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
break break
self.log_request(prox_rec_res.status, prox_rec_res.recorder.len) self.log_request(prox_rec_res.status, prox_rec_res.recorder.len)
# Let's close off the remote end. If remote connection is fine,
# put it back in the pool to reuse it later.
if not is_connection_dropped(self._remote_server_conn):
self._conn_pool._put_conn(self._remote_server_conn)
except:
self._remote_server_conn.sock.close()
finally: finally:
# Let's close off the remote end
if prox_rec_res: if prox_rec_res:
prox_rec_res.close() prox_rec_res.close()
self._remote_server_sock.close()
return req, prox_rec_res return req, prox_rec_res

View File

@ -43,6 +43,7 @@ import warcprox
import datetime import datetime
import urlcanon import urlcanon
import os import os
from urllib3 import PoolManager
class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
''' '''
@ -173,7 +174,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
limits and block rules in the Warcprox-Meta request header, if any. limits and block rules in the Warcprox-Meta request header, if any.
Raises `warcprox.RequestBlockedByRule` if a rule has been enforced. Raises `warcprox.RequestBlockedByRule` if a rule has been enforced.
Otherwise calls `MitmProxyHandler._connect_to_remote_server`, which Otherwise calls `MitmProxyHandler._connect_to_remote_server`, which
initializes `self._remote_server_sock`. initializes `self._remote_server_conn`.
''' '''
if 'Warcprox-Meta' in self.headers: if 'Warcprox-Meta' in self.headers:
warcprox_meta = json.loads(self.headers['Warcprox-Meta']) warcprox_meta = json.loads(self.headers['Warcprox-Meta'])
@ -192,7 +193,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
warcprox_meta = json.loads(raw_warcprox_meta) warcprox_meta = json.loads(raw_warcprox_meta)
del self.headers['Warcprox-Meta'] del self.headers['Warcprox-Meta']
remote_ip = self._remote_server_sock.getpeername()[0] remote_ip = self._remote_server_conn.sock.getpeername()[0]
timestamp = datetime.datetime.utcnow() timestamp = datetime.datetime.utcnow()
extra_response_headers = {} extra_response_headers = {}
if warcprox_meta and 'accept' in warcprox_meta and \ if warcprox_meta and 'accept' in warcprox_meta and \
@ -387,7 +388,8 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
self.status_callback = status_callback self.status_callback = status_callback
self.stats_db = stats_db self.stats_db = stats_db
self.options = options self.options = options
self.remote_connection_pool = PoolManager(
num_pools=max(round(options.max_threads / 6), 200) if options.max_threads else 200)
server_address = ( server_address = (
options.address or 'localhost', options.address or 'localhost',
options.port if options.port is not None else 8000) options.port if options.port is not None else 8000)
@ -405,6 +407,8 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
WarcProxyHandler._socket_timeout = options.socket_timeout WarcProxyHandler._socket_timeout = options.socket_timeout
if options.max_resource_size: if options.max_resource_size:
WarcProxyHandler._max_resource_size = options.max_resource_size WarcProxyHandler._max_resource_size = options.max_resource_size
if options.tmp_file_max_memory_size:
WarcProxyHandler._tmp_file_max_memory_size = options.tmp_file_max_memory_size
http_server.HTTPServer.__init__( http_server.HTTPServer.__init__(
self, server_address, WarcProxyHandler, bind_and_activate=True) self, server_address, WarcProxyHandler, bind_and_activate=True)