Merge pull request #72 from vbanos/remote-conn-pool

Remote server connection pooling
This commit is contained in:
Noah Levitt 2018-03-20 10:52:14 -07:00 committed by GitHub
commit 3ece9cbe6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 59 additions and 41 deletions

View File

@ -64,6 +64,7 @@ import urlcanon
import time
import collections
import cProfile
from urllib3.util import is_connection_dropped
class ProxyingRecorder(object):
"""
@ -239,44 +240,55 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.hostname = urlcanon.normalize_host(host).decode('ascii')
def _connect_to_remote_server(self):
# Connect to destination
if self.onion_tor_socks_proxy_host and self.hostname.endswith('.onion'):
self.logger.info(
"using tor socks proxy at %s:%s to connect to %s",
self.onion_tor_socks_proxy_host,
self.onion_tor_socks_proxy_port or 1080, self.hostname)
self._remote_server_sock = socks.socksocket()
self._remote_server_sock.set_proxy(
socks.SOCKS5, addr=self.onion_tor_socks_proxy_host,
port=self.onion_tor_socks_proxy_port, rdns=True)
else:
self._remote_server_sock = socket.socket()
self._remote_server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
'''
Connect to destination.
Note that connection_from_host has hard-coded `scheme='http'`
to avoid internal urllib3 logic when scheme is https. We handle ssl and
socks inside the current method.
self._conn_pool._get_conn() will either return an existing connection
or a new one. If it's new, it needs initialization.
'''
self._conn_pool = self.server.remote_connection_pool.connection_from_host(
host=self.hostname, port=int(self.port), scheme='http',
pool_kwargs={'maxsize': 6})
self._remote_server_sock.settimeout(self._socket_timeout)
self._remote_server_sock.connect((self.hostname, int(self.port)))
self._remote_server_conn = self._conn_pool._get_conn()
if is_connection_dropped(self._remote_server_conn):
if self.onion_tor_socks_proxy_host and self.hostname.endswith('.onion'):
self.logger.info(
"using tor socks proxy at %s:%s to connect to %s",
self.onion_tor_socks_proxy_host,
self.onion_tor_socks_proxy_port or 1080, self.hostname)
self._remote_server_conn.sock = socks.socksocket()
self._remote_server_conn.sock.set_proxy(
socks.SOCKS5, addr=self.onion_tor_socks_proxy_host,
port=self.onion_tor_socks_proxy_port, rdns=True)
self._remote_server_conn.timeout = self._socket_timeout
self._remote_server_conn.sock.connect((self.hostname, int(self.port)))
else:
self._remote_server_conn.timeout = self._socket_timeout
self._remote_server_conn.connect()
# Wrap socket if SSL is required
if self.is_connect:
try:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
self._remote_server_sock = context.wrap_socket(
self._remote_server_sock, server_hostname=self.hostname)
except AttributeError:
# Wrap socket if SSL is required
if self.is_connect:
try:
self._remote_server_sock = ssl.wrap_socket(
self._remote_server_sock)
except ssl.SSLError:
self.logger.warn(
"failed to establish ssl connection to %s; python "
"ssl library does not support SNI, considering "
"upgrading to python >= 2.7.9 or python 3.4",
self.hostname)
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
self._remote_server_conn.sock = context.wrap_socket(
self._remote_server_conn.sock, server_hostname=self.hostname)
except AttributeError:
try:
self._remote_server_conn.sock = ssl.wrap_socket(
self._remote_server_conn.sock)
except ssl.SSLError:
self.logger.warn(
"failed to establish ssl connection to %s; python "
"ssl library does not support SNI, considering "
"upgrading to python >= 2.7.9 or python 3.4",
self.hostname)
raise
return self._remote_server_sock
return self._remote_server_conn.sock
def _transition_to_ssl(self):
certfile = self.server.ca.get_wildcard_cert(self.hostname)
@ -423,10 +435,10 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.logger.debug('sending to remote server req=%r', req)
# Send it down the pipe!
self._remote_server_sock.sendall(req)
self._remote_server_conn.sock.sendall(req)
prox_rec_res = ProxyingRecordingHTTPResponse(
self._remote_server_sock, proxy_client=self.connection,
self._remote_server_conn.sock, proxy_client=self.connection,
digest_algorithm=self.server.digest_algorithm,
url=self.url, method=self.command,
tmp_file_max_memory_size=self._tmp_file_max_memory_size)
@ -444,11 +456,15 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
break
self.log_request(prox_rec_res.status, prox_rec_res.recorder.len)
# Let's close off the remote end. If the remote connection has not been
# dropped, put it back in the pool to reuse it later.
if not is_connection_dropped(self._remote_server_conn):
self._conn_pool._put_conn(self._remote_server_conn)
except:
self._remote_server_conn.sock.close()
finally:
# Let's close off the remote end
if prox_rec_res:
prox_rec_res.close()
self._remote_server_sock.close()
return req, prox_rec_res

View File

@ -43,6 +43,7 @@ import warcprox
import datetime
import urlcanon
import os
from urllib3 import PoolManager
class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
'''
@ -173,7 +174,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
limits and block rules in the Warcprox-Meta request header, if any.
Raises `warcprox.RequestBlockedByRule` if a rule has been enforced.
Otherwise calls `MitmProxyHandler._connect_to_remote_server`, which
initializes `self._remote_server_sock`.
initializes `self._remote_server_conn`.
'''
if 'Warcprox-Meta' in self.headers:
warcprox_meta = json.loads(self.headers['Warcprox-Meta'])
@ -192,7 +193,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
warcprox_meta = json.loads(raw_warcprox_meta)
del self.headers['Warcprox-Meta']
remote_ip = self._remote_server_sock.getpeername()[0]
remote_ip = self._remote_server_conn.sock.getpeername()[0]
timestamp = datetime.datetime.utcnow()
extra_response_headers = {}
if warcprox_meta and 'accept' in warcprox_meta and \
@ -387,7 +388,8 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
self.status_callback = status_callback
self.stats_db = stats_db
self.options = options
self.remote_connection_pool = PoolManager(
num_pools=max(round(options.max_threads / 6), 200) if options.max_threads else 200)
server_address = (
options.address or 'localhost',
options.port if options.port is not None else 8000)