mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
Remote server connection pool
Use urllib3 connection pooling to improve remote server connection speed. Our aim is to reuse socket connections to the same target hosts when possible. Initialize a `urllib3.PoolManager` in `SingleThreadedWarcProxy` and use it in `MitmProxyHandler` to connect to remote servers. Socket read / write and ssl / socks code is exactly the same, only the connection management changes. Use arbitratry settings: pool_size=2000 and maxsize=100 (number of connections per host) for now. Maybe we can come up with better values in the future.
This commit is contained in:
parent
1b4fbef26a
commit
3e165916f0
@ -65,6 +65,8 @@ import time
|
||||
import collections
|
||||
import cProfile
|
||||
|
||||
|
||||
|
||||
class ProxyingRecorder(object):
|
||||
"""
|
||||
Wraps a socket._fileobject, recording the bytes as they are read,
|
||||
@ -236,44 +238,54 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
|
||||
self.hostname = urlcanon.normalize_host(host).decode('ascii')
|
||||
|
||||
def _connect_to_remote_server(self):
|
||||
# Connect to destination
|
||||
if self.onion_tor_socks_proxy_host and self.hostname.endswith('.onion'):
|
||||
self.logger.info(
|
||||
"using tor socks proxy at %s:%s to connect to %s",
|
||||
self.onion_tor_socks_proxy_host,
|
||||
self.onion_tor_socks_proxy_port or 1080, self.hostname)
|
||||
self._remote_server_sock = socks.socksocket()
|
||||
self._remote_server_sock.set_proxy(
|
||||
socks.SOCKS5, addr=self.onion_tor_socks_proxy_host,
|
||||
port=self.onion_tor_socks_proxy_port, rdns=True)
|
||||
else:
|
||||
self._remote_server_sock = socket.socket()
|
||||
self._remote_server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
'''
|
||||
Connect to destination.
|
||||
Note that connection_from_host has hard-coded `scheme='http'`
|
||||
to avoid internal urllib3 logic when scheme is https. We handle ssl and
|
||||
socks inside the current method.
|
||||
self._conn_pool._get_conn() will either return an existing connection
|
||||
or a new one. If its new, it needs initialization.
|
||||
'''
|
||||
self._conn_pool = self.server.remote_connection_pool.connection_from_host(
|
||||
host=self.hostname, port=int(self.port), scheme='http',
|
||||
pool_kwargs={'maxsize': 100})
|
||||
|
||||
self._remote_server_sock.settimeout(self._socket_timeout)
|
||||
self._remote_server_sock.connect((self.hostname, int(self.port)))
|
||||
self._remote_server_conn = self._conn_pool._get_conn()
|
||||
if self._remote_server_conn.sock is None:
|
||||
if self.onion_tor_socks_proxy_host and self.hostname.endswith('.onion'):
|
||||
self.logger.info(
|
||||
"using tor socks proxy at %s:%s to connect to %s",
|
||||
self.onion_tor_socks_proxy_host,
|
||||
self.onion_tor_socks_proxy_port or 1080, self.hostname)
|
||||
self._remote_server_conn.sock = socks.socksocket()
|
||||
self._remote_server_sock.set_proxy(
|
||||
socks.SOCKS5, addr=self.onion_tor_socks_proxy_host,
|
||||
port=self.onion_tor_socks_proxy_port, rdns=True)
|
||||
else:
|
||||
self._remote_server_conn.timeout = self._socket_timeout
|
||||
self._remote_server_conn.connect()
|
||||
|
||||
# Wrap socket if SSL is required
|
||||
if self.is_connect:
|
||||
try:
|
||||
context = ssl.create_default_context()
|
||||
context.check_hostname = False
|
||||
context.verify_mode = ssl.CERT_NONE
|
||||
self._remote_server_sock = context.wrap_socket(
|
||||
self._remote_server_sock, server_hostname=self.hostname)
|
||||
except AttributeError:
|
||||
# Wrap socket if SSL is required
|
||||
if self.is_connect:
|
||||
try:
|
||||
self._remote_server_sock = ssl.wrap_socket(
|
||||
self._remote_server_sock)
|
||||
except ssl.SSLError:
|
||||
self.logger.warn(
|
||||
"failed to establish ssl connection to %s; python "
|
||||
"ssl library does not support SNI, considering "
|
||||
"upgrading to python >= 2.7.9 or python 3.4",
|
||||
self.hostname)
|
||||
context = ssl.create_default_context()
|
||||
context.check_hostname = False
|
||||
context.verify_mode = ssl.CERT_NONE
|
||||
self._remote_server_conn.sock = context.wrap_socket(
|
||||
self._remote_server_conn.sock, server_hostname=self.hostname)
|
||||
except AttributeError:
|
||||
try:
|
||||
self._remote_server_conn.sock = ssl.wrap_socket(
|
||||
self._remote_server_conn.sock)
|
||||
except ssl.SSLError:
|
||||
self.logger.warn(
|
||||
"failed to establish ssl connection to %s; python "
|
||||
"ssl library does not support SNI, considering "
|
||||
"upgrading to python >= 2.7.9 or python 3.4",
|
||||
self.hostname)
|
||||
raise
|
||||
|
||||
return self._remote_server_sock
|
||||
return self._remote_server_conn.sock
|
||||
|
||||
def _transition_to_ssl(self):
|
||||
certfile = self.server.ca.get_wildcard_cert(self.hostname)
|
||||
@ -416,14 +428,15 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
|
||||
req += self.rfile.read(int(self.headers['Content-Length']))
|
||||
|
||||
prox_rec_res = None
|
||||
connection_is_fine = False
|
||||
try:
|
||||
self.logger.debug('sending to remote server req=%r', req)
|
||||
|
||||
# Send it down the pipe!
|
||||
self._remote_server_sock.sendall(req)
|
||||
self._remote_server_conn.sock.sendall(req)
|
||||
|
||||
prox_rec_res = ProxyingRecordingHTTPResponse(
|
||||
self._remote_server_sock, proxy_client=self.connection,
|
||||
self._remote_server_conn.sock, proxy_client=self.connection,
|
||||
digest_algorithm=self.server.digest_algorithm,
|
||||
url=self.url, method=self.command)
|
||||
prox_rec_res.begin(extra_response_headers=extra_response_headers)
|
||||
@ -439,12 +452,17 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
|
||||
self._max_resource_size, self.url)
|
||||
break
|
||||
|
||||
connection_is_fine = True
|
||||
self.log_request(prox_rec_res.status, prox_rec_res.recorder.len)
|
||||
finally:
|
||||
# Let's close off the remote end
|
||||
# Let's close off the remote end. If remote connection is fine,
|
||||
# put it back in the pool to reuse it later.
|
||||
if prox_rec_res:
|
||||
prox_rec_res.close()
|
||||
self._remote_server_sock.close()
|
||||
if connection_is_fine:
|
||||
self._conn_pool._put_conn(self._remote_conn)
|
||||
else:
|
||||
self._remote_server_conn.sock.close()
|
||||
|
||||
return req, prox_rec_res
|
||||
|
||||
|
@ -43,6 +43,7 @@ import warcprox
|
||||
import datetime
|
||||
import urlcanon
|
||||
import os
|
||||
from urllib3 import PoolManager
|
||||
|
||||
class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
||||
'''
|
||||
@ -173,7 +174,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
||||
limits and block rules in the Warcprox-Meta request header, if any.
|
||||
Raises `warcprox.RequestBlockedByRule` if a rule has been enforced.
|
||||
Otherwise calls `MitmProxyHandler._connect_to_remote_server`, which
|
||||
initializes `self._remote_server_sock`.
|
||||
initializes `self._remote_server_conn`.
|
||||
'''
|
||||
if 'Warcprox-Meta' in self.headers:
|
||||
warcprox_meta = json.loads(self.headers['Warcprox-Meta'])
|
||||
@ -192,7 +193,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
||||
warcprox_meta = json.loads(raw_warcprox_meta)
|
||||
del self.headers['Warcprox-Meta']
|
||||
|
||||
remote_ip = self._remote_server_sock.getpeername()[0]
|
||||
remote_ip = self._remote_server_conn.sock.getpeername()[0]
|
||||
timestamp = datetime.datetime.utcnow()
|
||||
extra_response_headers = {}
|
||||
if warcprox_meta and 'accept' in warcprox_meta and \
|
||||
@ -387,7 +388,7 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
|
||||
self.status_callback = status_callback
|
||||
self.stats_db = stats_db
|
||||
self.options = options
|
||||
|
||||
self.remote_connection_pool = PoolManager(num_pools=2000)
|
||||
server_address = (
|
||||
options.address or 'localhost',
|
||||
options.port if options.port is not None else 8000)
|
||||
|
Loading…
x
Reference in New Issue
Block a user