From 89d987a181d650b2438251c878793a42388aa2e0 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 9 May 2019 10:03:16 +0000 Subject: [PATCH 1/6] Cache bad target hostname:port to avoid reconnection attempts If connection to a hostname:port fails, add it to a `TTLCache` with 60 sec expiration time. Subsequent requests to the same hostname:port return really quickly as we check the cache and avoid trying a new network connection. The short expiration time guarantees that if a host becomes OK again, we'll be able to connect to it quickly. Adding `cachetools` dependency was necessary as there isn't any other way to have an expiring in-memory cache using stdlib. The library doesn't have any other dependencies, it has good test coverage and seems maintained. It also supports Python 3.7. --- setup.py | 1 + warcprox/mitmproxy.py | 31 +++++++++++++++++++++++++++++-- warcprox/warcproxy.py | 7 +++++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 2f3aeda..61f474e 100755 --- a/setup.py +++ b/setup.py @@ -34,6 +34,7 @@ deps = [ 'cryptography>=2.3', 'idna>=2.5', 'PyYAML>=5.1', + 'cachetools', ] try: import concurrent.futures diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index b29dcaf..c2c4183 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -77,6 +77,7 @@ import time import collections import cProfile from urllib3.util import is_connection_dropped +from urllib3.exceptions import NewConnectionError import doublethink class ProxyingRecorder(object): @@ -252,6 +253,9 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): query=u.query, fragment=u.fragment)) self.hostname = urlcanon.normalize_host(host).decode('ascii') + def _hostname_port_cache_key(self): + return '%s:%s' % (self.hostname, self.port) + def _connect_to_remote_server(self): ''' Connect to destination. @@ -380,7 +384,15 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): else: self._determine_host_port() assert self.url - + # Check if target hostname:port is in `bad_hostnames_ports` cache + # to avoid retrying to connect. + with self.server.bad_hostnames_ports_lock: + hostname_port = self._hostname_port_cache_key() + if hostname_port in self.server.bad_hostnames_ports: + self.logger.info('Cannot connect to %s (cache)', + hostname_port) + self.send_error(502, 'message timed out') + return # Connect to destination self._connect_to_remote_server() except warcprox.RequestBlockedByRule as e: @@ -388,6 +400,15 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.logger.info("%r: %r", self.requestline, e) return except Exception as e: + # If connection fails, add hostname:port to cache to avoid slow + # subsequent reconnection attempts. `NewConnectionError` can be + # caused by many types of errors which are handled by urllib3. + if type(e) in (socket.timeout, NewConnectionError): + with self.server.bad_hostnames_ports_lock: + host_port = self._hostname_port_cache_key() + self.server.bad_hostnames_ports[host_port] = 1 + self.logger.info('bad_hostnames_ports cache size: %d', + len(self.server.bad_hostnames_ports)) self.logger.error( "problem processing request %r: %r", self.requestline, e, exc_info=True) @@ -527,7 +548,13 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): # put it back in the pool to reuse it later. if not is_connection_dropped(self._remote_server_conn): self._conn_pool._put_conn(self._remote_server_conn) - except: + except Exception as e: + if type(e) in (socket.timeout, NewConnectionError): + with self.server.bad_hostnames_ports_lock: + hostname_port = self._hostname_port_cache_key() + self.server.bad_hostnames_ports[hostname_port] = 1 + self.logger.info('bad_hostnames_ports cache size: %d', + len(self.server.bad_hostnames_ports)) self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR) self._remote_server_conn.sock.close() raise diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 8898898..2d072b9 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -48,6 +48,8 @@ import tempfile import hashlib import doublethink import re +from threading import RLock +from cachetools import TTLCache class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): ''' @@ -431,6 +433,11 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): self.status_callback = status_callback self.stats_db = stats_db self.options = options + # TTLCache is not thread-safe. Access to the shared cache from multiple + # threads must be properly synchronized with an RLock according to ref: + # https://cachetools.readthedocs.io/en/latest/ + self.bad_hostnames_ports = TTLCache(maxsize=1024, ttl=60) + self.bad_hostnames_ports_lock = RLock() self.remote_connection_pool = PoolManager( num_pools=max(round(options.max_threads / 6), 200) if options.max_threads else 200) server_address = ( From bbe41bc900f2216639fe2aa8d3ce5a57abf5794a Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 9 May 2019 15:57:01 +0000 Subject: [PATCH 2/6] Add bad_hostnames_ports in PlaybackProxy These vars are required also there in addition to `SingleThreadedWarcProxy`. --- warcprox/playback.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/warcprox/playback.py b/warcprox/playback.py index 91f86aa..8bfa42f 100644 --- a/warcprox/playback.py +++ b/warcprox/playback.py @@ -42,6 +42,7 @@ from warcprox.mitmproxy import MitmProxyHandler import warcprox import sqlite3 import threading +from cachetools import TTLCache class PlaybackProxyHandler(MitmProxyHandler): logger = logging.getLogger("warcprox.playback.PlaybackProxyHandler") @@ -219,6 +220,8 @@ class PlaybackProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): self.playback_index_db = playback_index_db self.warcs_dir = options.directory self.options = options + self.bad_hostnames_ports = TTLCache(maxsize=1024, ttl=60) + self.bad_hostnames_ports_lock = threading.RLock() def server_activate(self): http_server.HTTPServer.server_activate(self) From 75e789c15fc99518857fecccbb8d0553f03456a3 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 9 May 2019 20:44:47 +0000 Subject: [PATCH 3/6] Add entries to bad_hostnames_ports only on connection init Do not add entries to bad_hostnames_ports during connection running if an exception occurs. Do it only on connection init because for some unclear reason unit tests fail in that case. --- warcprox/mitmproxy.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index c2c4183..39469d5 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -549,12 +549,6 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): if not is_connection_dropped(self._remote_server_conn): self._conn_pool._put_conn(self._remote_server_conn) except Exception as e: - if type(e) in (socket.timeout, NewConnectionError): - with self.server.bad_hostnames_ports_lock: - hostname_port = self._hostname_port_cache_key() - self.server.bad_hostnames_ports[hostname_port] = 1 - self.logger.info('bad_hostnames_ports cache size: %d', - len(self.server.bad_hostnames_ports)) self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR) self._remote_server_conn.sock.close() raise From 89041e83b4707a8e4229eef3e7384849c5f1c3fd Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 10 May 2019 07:32:42 +0000 Subject: [PATCH 4/6] Catch RemoteDisconnected case when starting downloading A common error is to connect to the remote server successfully but raise a `http_client.RemoteDisconnected` exception when trying to begin downloading. Its caused by call `prox_rec_res.begin(...)` which calls `http_client._read_status()`. In that case, we also add the target `hostname:port` to the `bad_hostnames_ports` cache. Modify 2 unit tests to clear the `bad_hostnames_ports` cache because localhost is added from previous tests and this breaks them. --- tests/test_warcprox.py | 8 ++++++++ warcprox/mitmproxy.py | 12 ++++++++++++ 2 files changed, 20 insertions(+) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 6c49f0a..4323a6c 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -1986,6 +1986,10 @@ def test_socket_timeout_response( def test_empty_response( warcprox_, http_daemon, https_daemon, archiving_proxies, playback_proxies): + # localhost:server_port was added to the `bad_hostnames_ports` cache by + # previous tests and this causes subsequent tests to fail. We clear it. + warcprox_.proxy.bad_hostnames_ports.clear() + url = 'http://localhost:%s/empty-response' % http_daemon.server_port response = requests.get(url, proxies=archiving_proxies, verify=False) assert response.status_code == 502 @@ -2001,6 +2005,10 @@ def test_payload_digest(warcprox_, http_daemon): Tests that digest is of RFC2616 "entity body" (transfer-decoded but not content-decoded) ''' + # localhost:server_port was added to the `bad_hostnames_ports` cache by + # previous tests and this causes subsequent tests to fail. We clear it. + warcprox_.proxy.bad_hostnames_ports.clear() + class HalfMockedMitm(warcprox.mitmproxy.MitmProxyHandler): def __init__(self, url): self.path = url diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 39469d5..cc16281 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -549,6 +549,18 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): if not is_connection_dropped(self._remote_server_conn): self._conn_pool._put_conn(self._remote_server_conn) except Exception as e: + # A common error is to connect to the remote server successfully + # but raise a `RemoteDisconnected` exception when trying to begin + # downloading. Its caused by prox_rec_res.begin(...) which calls + # http_client._read_status(). In that case, the host is also bad + # and we must add it to `bad_hostnames_ports` cache. + if type(e) == http_client.RemoteDisconnected: + with self.server.bad_hostnames_ports_lock: + host_port = self._hostname_port_cache_key() + self.server.bad_hostnames_ports[host_port] = 1 + self.logger.info('bad_hostnames_ports cache size: %d', + len(self.server.bad_hostnames_ports)) + self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR) self._remote_server_conn.sock.close() raise From f0d2898326810f03c00c26c4fc8b84727c32ad2a Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 14 May 2019 19:08:30 +0000 Subject: [PATCH 5/6] Tighten up the use of the lock for the TTLCache Move out of the lock instructions that are thread safe. --- warcprox/mitmproxy.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index cc16281..65d2992 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -404,11 +404,11 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): # subsequent reconnection attempts. `NewConnectionError` can be # caused by many types of errors which are handled by urllib3. if type(e) in (socket.timeout, NewConnectionError): + host_port = self._hostname_port_cache_key() with self.server.bad_hostnames_ports_lock: - host_port = self._hostname_port_cache_key() self.server.bad_hostnames_ports[host_port] = 1 - self.logger.info('bad_hostnames_ports cache size: %d', - len(self.server.bad_hostnames_ports)) + self.logger.info('bad_hostnames_ports cache size: %d', + len(self.server.bad_hostnames_ports)) self.logger.error( "problem processing request %r: %r", self.requestline, e, exc_info=True) @@ -555,11 +555,11 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): # http_client._read_status(). In that case, the host is also bad # and we must add it to `bad_hostnames_ports` cache. if type(e) == http_client.RemoteDisconnected: + host_port = self._hostname_port_cache_key() with self.server.bad_hostnames_ports_lock: - host_port = self._hostname_port_cache_key() self.server.bad_hostnames_ports[host_port] = 1 - self.logger.info('bad_hostnames_ports cache size: %d', - len(self.server.bad_hostnames_ports)) + self.logger.info('bad_hostnames_ports cache size: %d', + len(self.server.bad_hostnames_ports)) self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR) self._remote_server_conn.sock.close() From 5b30dd4576e5f9b1558c8cff5c4aa0102d2aa715 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 14 May 2019 19:35:46 +0000 Subject: [PATCH 6/6] Cache error status and message Instead of returning a generic error status and message when hitting the bad_hostnames_ports cache, we cache and return the original error. --- warcprox/mitmproxy.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 65d2992..b158162 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -385,14 +385,16 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self._determine_host_port() assert self.url # Check if target hostname:port is in `bad_hostnames_ports` cache - # to avoid retrying to connect. + # to avoid retrying to connect. cached is a tuple containing + # (status_code, error message) + cached = None + hostname_port = self._hostname_port_cache_key() with self.server.bad_hostnames_ports_lock: - hostname_port = self._hostname_port_cache_key() - if hostname_port in self.server.bad_hostnames_ports: - self.logger.info('Cannot connect to %s (cache)', - hostname_port) - self.send_error(502, 'message timed out') - return + cached = self.server.bad_hostnames_ports.get(hostname_port) + if cached: + self.logger.info('Cannot connect to %s (cache)', hostname_port) + self.send_error(cached[0], cached[1]) + return # Connect to destination self._connect_to_remote_server() except warcprox.RequestBlockedByRule as e: @@ -406,7 +408,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): if type(e) in (socket.timeout, NewConnectionError): host_port = self._hostname_port_cache_key() with self.server.bad_hostnames_ports_lock: - self.server.bad_hostnames_ports[host_port] = 1 + self.server.bad_hostnames_ports[host_port] = (500, str(e)) self.logger.info('bad_hostnames_ports cache size: %d', len(self.server.bad_hostnames_ports)) self.logger.error( @@ -557,7 +559,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): if type(e) == http_client.RemoteDisconnected: host_port = self._hostname_port_cache_key() with self.server.bad_hostnames_ports_lock: - self.server.bad_hostnames_ports[host_port] = 1 + self.server.bad_hostnames_ports[host_port] = (502, str(e)) self.logger.info('bad_hostnames_ports cache size: %d', len(self.server.bad_hostnames_ports))