From e989b2f667e69b553807a668ab4c78afa70e16bb Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 3 Apr 2018 11:12:25 -0700 Subject: [PATCH 01/31] work around odd problem (see comment in code) --- setup.py | 2 +- warcprox/mitmproxy.py | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 629ed64..17e03c4 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev159', + version='2.4b2.dev160', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 8778822..fdcd8a1 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -390,6 +390,28 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.send_error(502, str(e)) return + def send_error(self, code, message=None, explain=None): + # BaseHTTPRequestHandler.send_response_only() in http/server.py + # does this: + # if not hasattr(self, '_headers_buffer'): + # self._headers_buffer = [] + # but we sometimes see self._headers_buffer == None + # (This happened before! see commit dc9fdc34125dd2357) + # Workaround: + if hasattr(self, '_headers_buffer') and not self._headers_buffer: + self._headers_buffer = [] + try: + return http_server.BaseHTTPRequestHandler.send_error( + self, code, message, explain) + except: + self.logger.error( + '''WTF: self=%r hasattr(self,'_headers_buffer')=%r''', + self, hasattr(self,'_headers_buffer')) + if hasattr(self,'_headers_buffer'): + self.logger.error( + 'WTF: self._headers_buffer=%r', self._headers_buffer) + return None + def _proxy_request(self, extra_response_headers={}): ''' Sends the request to the remote server, then uses a ProxyingRecorder to From 08aada3ca92423d7dcf4529ebcded71185ab1970 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 3 Apr 2018 11:15:48 -0700 Subject: [PATCH 02/31] this is some logging meant to debug the mysterious MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test failure we've been seeing which so far has made the problem go away(!?!?) 
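The patch 01 workaround above amounts to: re-initialize `self._headers_buffer` when it exists but is falsy, then defer to the base class. A minimal standalone sketch of that defensive-override pattern (Python 3 stdlib only; the handler name and the bare-bones `do_GET` are illustrative, not part of the patch):

```python
# Sketch of the send_error() guard, outside warcprox.
# send_response_only() in http/server.py only checks
# hasattr(self, '_headers_buffer'), so a falsy buffer (None) slips
# through and blows up later; resetting it to [] restores the invariant.
import logging
from http.server import BaseHTTPRequestHandler

class GuardedHandler(BaseHTTPRequestHandler):
    def send_error(self, code, message=None, explain=None):
        if hasattr(self, '_headers_buffer') and not self._headers_buffer:
            self._headers_buffer = []
        try:
            return BaseHTTPRequestHandler.send_error(
                    self, code, message, explain)
        except Exception:
            logging.error('send_error(%r, %r, %r) raised exception',
                          code, message, explain, exc_info=True)
            return None

    def do_GET(self):
        self.send_error(404)  # every request exercises the guarded path
```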
😀😞 ¯\_(ツ)_/¯ 😞😀 ¯\_(ツ)_/¯ 😀😞 ¯\_(ツ)_/¯ 😞😀 here is the last time the failure happened: https://travis-ci.org/internetarchive/warcprox/jobs/361409280 --- tests/test_warcprox.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 9d6db7d..26e1017 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -315,9 +315,17 @@ def cert(request): finally: f.close() +class UhhhServer(http_server.HTTPServer): + def get_request(self): + try: + return self.socket.accept() + except: + logging.error('socket.accept() raised exception', exc_info=True) + raise + @pytest.fixture(scope="module") def http_daemon(request): - http_daemon = http_server.HTTPServer( + http_daemon = UhhhServer( ('localhost', 0), RequestHandlerClass=_TestHttpRequestHandler) logging.info('starting http://{}:{}'.format(http_daemon.server_address[0], http_daemon.server_address[1])) http_daemon_thread = threading.Thread(name='HttpDaemonThread', From c2b2a844d9f472409adb029d27b7cd3edfe6db0f Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 4 Apr 2018 10:22:02 -0700 Subject: [PATCH 03/31] remove some debug logging --- warcprox/mitmproxy.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index fdcd8a1..1b7dcbd 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -405,11 +405,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self, code, message, explain) except: self.logger.error( - '''WTF: self=%r hasattr(self,'_headers_buffer')=%r''', - self, hasattr(self,'_headers_buffer')) - if hasattr(self,'_headers_buffer'): - self.logger.error( - 'WTF: self._headers_buffer=%r', self._headers_buffer) + 'send_error(%r, %r, %r) raised exception', exc_info=True) return None def _proxy_request(self, extra_response_headers={}): From 2fa0f232b710879e9ed8363e786e25119921b54a Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 4 Apr 2018 15:35:28 -0700 Subject: [PATCH 04/31] more logging --- warcprox/writer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/warcprox/writer.py b/warcprox/writer.py index e8f4b79..9ab249c 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -88,6 +88,7 @@ class _OneWritableWarc: os.mkdir(self.directory) self.finalname = self.next_filename(serial) + self.logger.trace('opening %s', self.finalname) self.path = os.path.sep.join( [self.directory, self.finalname + self.open_suffix]) From 8ac0420cb2dc1c5e475a811919d26a94bbdd569b Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 4 Apr 2018 15:28:36 -0700 Subject: [PATCH 05/31] enable keepalive on test http server As of fairly recently, warcprox does keepalive with the remote server using the urllib3 connection pool. The test http server in test_warcprox.py was acting as if it supported keepalive (sending HTTP/1.1 and not sending "Connection: close"). But in fact it did not support keepalive. It was closing the connection after each request. Depending on the timing of what was happening in different threads, sometimes the client thread would try to send another request on a connection it still believed to be open for keepalive. Then the server side would complete its request processing and close the connection. This resulted in test failures with error messages like this (depending on python version): 2018-04-03 21:20:06,555 12586 ERROR MainThread warcprox.mitmproxy.MitmProxyHandler.do_COMMAND(mitmproxy.py:389) error from remote server(?) 
None: BadStatusLine("''",) 2018-04-04 19:06:29,599 11632 ERROR MainThread warcprox.mitmproxy.MitmProxyHandler.do_COMMAND(mitmproxy.py:389) error from remote server(?) None: RemoteDisconnected('Remote end closed connection without response',) For instance https://travis-ci.org/internetarchive/warcprox/jobs/362288603 --- tests/test_warcprox.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 26e1017..d175309 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -166,6 +166,9 @@ def chunkify(buf, chunk_size=13): # return outbuf.getvalue() class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): + # enable keepalive + protocol_version = 'HTTP/1.1' + def build_response(self): m = re.match(r'^/([^/]+)/([^/]+)$', self.path) if m is not None: @@ -346,7 +349,6 @@ def https_daemon(request, cert): # http://www.piware.de/2011/01/creating-an-https-server-in-python/ https_daemon = http_server.HTTPServer(('localhost', 0), RequestHandlerClass=_TestHttpRequestHandler) - # https_daemon.socket = ssl.wrap_socket(httpd.socket, certfile='path/to/localhost.pem', server_side=True) https_daemon.socket = ssl.wrap_socket(https_daemon.socket, certfile=cert, server_side=True) logging.info('starting https://{}:{}'.format(https_daemon.server_address[0], https_daemon.server_address[1])) https_daemon_thread = threading.Thread(name='HttpsDaemonThread', From 3f9ecbacaca861f42a6d05d1c514e1c01c4e1227 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 4 Apr 2018 15:29:16 -0700 Subject: [PATCH 06/31] tweak tests to make them pass now that keepalive is enabled on the test server --- tests/test_warcprox.py | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index d175309..9b789a4 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -279,6 +279,11 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): headers, payload = self.build_response() self.connection.sendall(headers) self.connection.sendall(payload) + if self.path == '/missing-content-length': + # response without content-length (and not chunked) must close the + # connection, else client has no idea if there is more data coming + self.connection.shutdown(socket.SHUT_RDWR) + self.connection.close() def do_HEAD(self): logging.info('HEAD {}'.format(self.path)) @@ -364,8 +369,11 @@ def https_daemon(request, cert): return https_daemon +# specify http_daemon and https_daemon as dependencies so that their finalizers +# run after warcprox is shut down, otherwise warcprox holds connections open +# and prevents the servers from shutting down @pytest.fixture(scope="module") -def warcprox_(request): +def warcprox_(request, http_daemon, https_daemon): orig_dir = os.getcwd() work_dir = tempfile.mkdtemp() logging.info('changing to working directory %r', work_dir) @@ -971,6 +979,14 @@ def test_domain_doc_soft_limit( http_daemon, https_daemon, warcprox_, archiving_proxies): urls_before = warcprox_.proxy.running_stats.urls + # we need to clear the connection pool here because + # - connection pool already may already have an open connection localhost + # - we're about to make a connection to foo.localhost + # - but our test server, which handles all the hosts, is single threaded + # - so it will fail to connect (socket timeout) + # must close connections before each connection to a different hostname + warcprox_.proxy.remote_connection_pool.clear() + 
request_meta = { "stats": {"buckets": [{"bucket":"test_domain_doc_limit_bucket","tally-domains":["foo.localhost"]}]}, "soft-limits": {"test_domain_doc_limit_bucket:foo.localhost/total/urls":10}, @@ -988,6 +1004,8 @@ def test_domain_doc_soft_limit( # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + warcprox_.proxy.remote_connection_pool.clear() + # make sure stats from different domain don't count url = 'http://bar.localhost:{}/o/p'.format(http_daemon.server_port) for i in range(10): @@ -1000,6 +1018,8 @@ def test_domain_doc_soft_limit( # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 11) + warcprox_.proxy.remote_connection_pool.clear() + # (2) same host but different scheme and port: domain limit applies url = 'https://foo.localhost:{}/o/p'.format(https_daemon.server_port) response = requests.get( @@ -1009,6 +1029,8 @@ def test_domain_doc_soft_limit( assert response.headers['warcprox-test-header'] == 'o!' assert response.content == b'I am the warcprox test payload! pppppppppp!\n' + warcprox_.proxy.remote_connection_pool.clear() + # (3-9) different subdomain: host limit applies url = 'https://baz.foo.localhost:{}/o/p'.format(https_daemon.server_port) for i in range(7): @@ -1037,6 +1059,8 @@ def test_domain_doc_soft_limit( # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 20) + warcprox_.proxy.remote_connection_pool.clear() + # (11) back to http, and this is the 11th request url = 'http://zuh.foo.localhost:{}/o/p'.format(http_daemon.server_port) response = requests.get( @@ -1048,6 +1072,8 @@ def test_domain_doc_soft_limit( assert response.headers["content-type"] == "text/plain;charset=utf-8" assert response.raw.data == b"request rejected by warcprox: reached soft limit test_domain_doc_limit_bucket:foo.localhost/total/urls=10\n" + warcprox_.proxy.remote_connection_pool.clear() + # make sure limit doesn't get applied to a different domain url = 'https://localhost:{}/o/p'.format(https_daemon.server_port) response = requests.get( @@ -1060,6 +1086,8 @@ def test_domain_doc_soft_limit( # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 21) + warcprox_.proxy.remote_connection_pool.clear() + # https also blocked url = 'https://zuh.foo.localhost:{}/o/p'.format(https_daemon.server_port) response = requests.get( @@ -1072,6 +1100,8 @@ def test_domain_doc_soft_limit( assert response.headers["content-type"] == "text/plain;charset=utf-8" assert response.raw.data == b"request rejected by warcprox: reached soft limit test_domain_doc_limit_bucket:foo.localhost/total/urls=10\n" + warcprox_.proxy.remote_connection_pool.clear() + # same host, different capitalization still blocked url = 'https://HEHEHE.fOO.lOcALhoST:{}/o/p'.format(https_daemon.server_port) response = requests.get( @@ -1096,6 +1126,8 @@ def test_domain_data_soft_limit( } headers = {"Warcprox-Meta": json.dumps(request_meta)} + warcprox_.proxy.remote_connection_pool.clear() + url = 'http://ÞZz.localhost:{}/y/z'.format(http_daemon.server_port) response = requests.get( url, proxies=archiving_proxies, headers=headers, stream=True) @@ -1106,6 +1138,8 @@ def test_domain_data_soft_limit( # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + warcprox_.proxy.remote_connection_pool.clear() + # duplicate, does not count toward limit url = 'https://baz.Þzz.localhost:{}/y/z'.format(https_daemon.server_port) response = requests.get( @@ -1118,6 +1152,8 
@@ def test_domain_data_soft_limit( # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2) + warcprox_.proxy.remote_connection_pool.clear() + # novel, pushes stats over the limit url = 'https://muh.XN--Zz-2Ka.locALHOst:{}/z/~'.format(https_daemon.server_port) response = requests.get( @@ -1130,6 +1166,8 @@ def test_domain_data_soft_limit( # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3) + warcprox_.proxy.remote_connection_pool.clear() + # make sure limit doesn't get applied to a different host url = 'http://baz.localhost:{}/z/~'.format(http_daemon.server_port) response = requests.get( @@ -1141,6 +1179,8 @@ def test_domain_data_soft_limit( # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4) + warcprox_.proxy.remote_connection_pool.clear() + # blocked because we're over the limit now url = 'http://lOl.wHut.ÞZZ.lOcALHOst:{}/y/z'.format(http_daemon.server_port) response = requests.get( From 7c814d71ba3da27dd431848a82c8ae19408cc472 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 4 Apr 2018 13:26:47 -0700 Subject: [PATCH 07/31] close all remote connections at shutdown to avoid hang --- warcprox/warcproxy.py | 1 + 1 file changed, 1 insertion(+) diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 2f63a77..b6a0943 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -483,6 +483,7 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy): def server_close(self): self.logger.info('shutting down') http_server.HTTPServer.server_close(self) + self.remote_connection_pool.clear() def handle_error(self, request, client_address): self.logger.warn( From 595e8199614df3fb53626d3c7c1c10bfa311ff03 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 4 Apr 2018 15:30:13 -0700 Subject: [PATCH 08/31] test another request after truncated response to check for hangs or timeouts --- tests/test_warcprox.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 9b789a4..3936b73 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -1276,14 +1276,23 @@ def test_limit_large_resource(archiving_proxies, http_daemon, warcprox_): """ urls_before = warcprox_.proxy.running_stats.urls + # this should be truncated url = 'http://localhost:%s/300k-content' % http_daemon.server_port response = requests.get( url, proxies=archiving_proxies, verify=False, timeout=10) assert len(response.content) == 262144 + # test that the connection is cleaned up properly after truncating a + # response (no hang or timeout) + url = 'http://localhost:%s/' % http_daemon.server_port + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + assert response.status_code == 404 + assert response.content == b'404 Not Found\n' + # wait for processing of this url to finish so that it doesn't interfere # with subsequent tests - wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2) def test_method_filter( warcprox_, https_daemon, http_daemon, archiving_proxies, From 7ef0612fa61785619da805ea689a5f3b84192daf Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 4 Apr 2018 15:34:53 -0700 Subject: [PATCH 09/31] close connection when truncating response --- warcprox/mitmproxy.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/warcprox/mitmproxy.py 
b/warcprox/mitmproxy.py index 1b7dcbd..6d87470 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -465,13 +465,15 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): buf = prox_rec_res.read(65536) while buf != b'': buf = prox_rec_res.read(65536) - if self._max_resource_size: - if prox_rec_res.recorder.len > self._max_resource_size: - prox_rec_res.truncated = b'length' - self.logger.error( - 'Max resource size %d bytes exceeded for URL %s', + if (self._max_resource_size and + prox_rec_res.recorder.len > self._max_resource_size): + prox_rec_res.truncated = b'length' + self._remote_server_conn.sock.close() + self.logger.info( + 'truncating response because max resource size %d ' + 'bytes exceeded for URL %s', self._max_resource_size, self.url) - break + break self.log_request(prox_rec_res.status, prox_rec_res.recorder.len) # Let's close off the remote end. If remote connection is fine, From ab52e81019c5bd243fb00a2cec5f308f132fb8db Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 4 Apr 2018 15:45:50 -0700 Subject: [PATCH 10/31] bump dev version number --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 17e03c4..8207d7c 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev160', + version='2.4b2.dev161', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 385014c32298b254d154881062b8fe8d706e2174 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 4 Apr 2018 17:49:08 -0700 Subject: [PATCH 11/31] always call socket.shutdown() to close connections --- setup.py | 2 +- tests/test_warcprox.py | 14 +++++++++++--- warcprox/mitmproxy.py | 2 ++ 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index 8207d7c..94d0061 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev161', + version='2.4b2.dev162', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 3936b73..32db17f 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -279,9 +279,9 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): headers, payload = self.build_response() self.connection.sendall(headers) self.connection.sendall(payload) - if self.path == '/missing-content-length': - # response without content-length (and not chunked) must close the - # connection, else client has no idea if there is more data coming + if self.path in ('/missing-content-length', '/empty-response'): + # server must close the connection, else client has no idea if + # there is more data coming self.connection.shutdown(socket.SHUT_RDWR) self.connection.close() @@ -1823,6 +1823,14 @@ def test_socket_timeout_response( response = requests.get(url, proxies=archiving_proxies, verify=False) assert response.status_code == 502 + # test that the connection is cleaned up properly after truncating a + # response (no hang or timeout) + url = 'http://localhost:%s/' % http_daemon.server_port + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + assert response.status_code == 404 + assert response.content == b'404 Not Found\n' + def test_empty_response( warcprox_, http_daemon, https_daemon, archiving_proxies, playback_proxies): diff --git a/warcprox/mitmproxy.py 
b/warcprox/mitmproxy.py index 6d87470..8bd1861 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -468,6 +468,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): if (self._max_resource_size and prox_rec_res.recorder.len > self._max_resource_size): prox_rec_res.truncated = b'length' + self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR) self._remote_server_conn.sock.close() self.logger.info( 'truncating response because max resource size %d ' @@ -481,6 +482,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): if not is_connection_dropped(self._remote_server_conn): self._conn_pool._put_conn(self._remote_server_conn) except: + self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR) self._remote_server_conn.sock.close() raise finally: From 38e2a87f3169552dc1acb692d71b0fc79e9d69a1 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 5 Apr 2018 17:59:10 -0700 Subject: [PATCH 12/31] make test server multithreaded so tests will pass --- tests/test_warcprox.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 32db17f..9f578a3 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -50,6 +50,7 @@ import io import gzip import mock import email.message +import socketserver try: import http.server as http_server @@ -323,17 +324,20 @@ def cert(request): finally: f.close() -class UhhhServer(http_server.HTTPServer): - def get_request(self): - try: - return self.socket.accept() - except: - logging.error('socket.accept() raised exception', exc_info=True) - raise +# We need this test server to accept multiple simultaneous connections in order +# to avoid mysterious looking test failures like these: +# https://travis-ci.org/internetarchive/warcprox/builds/362892231 +# This is because we can't guarantee (without jumping through hoops) that +# MitmProxyHandler._proxy_request() returns the connection to the pool before +# the next request tries to get a connection from the pool in +# MitmProxyHandler._connect_to_remote_server(). (Unless we run warcprox +# single-threaded for these tests, which maybe we should consider?) 
+class ThreadedHTTPServer(socketserver.ThreadingMixIn, http_server.HTTPServer): + pass @pytest.fixture(scope="module") def http_daemon(request): - http_daemon = UhhhServer( + http_daemon = ThreadedHTTPServer( ('localhost', 0), RequestHandlerClass=_TestHttpRequestHandler) logging.info('starting http://{}:{}'.format(http_daemon.server_address[0], http_daemon.server_address[1])) http_daemon_thread = threading.Thread(name='HttpDaemonThread', @@ -352,7 +356,7 @@ def http_daemon(request): @pytest.fixture(scope="module") def https_daemon(request, cert): # http://www.piware.de/2011/01/creating-an-https-server-in-python/ - https_daemon = http_server.HTTPServer(('localhost', 0), + https_daemon = ThreadedHTTPServer(('localhost', 0), RequestHandlerClass=_TestHttpRequestHandler) https_daemon.socket = ssl.wrap_socket(https_daemon.socket, certfile=cert, server_side=True) logging.info('starting https://{}:{}'.format(https_daemon.server_address[0], https_daemon.server_address[1])) From cff8423befc0d6d8efc67827dbe3d230d75ae052 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 6 Apr 2018 12:09:33 -0700 Subject: [PATCH 13/31] bump dev version number after PR --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 94d0061..35bc578 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev162', + version='2.4b2.dev163', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 7c5c5da9b7bd83433a6727ed20795aaa36271f81 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 3 Apr 2018 20:51:51 +0000 Subject: [PATCH 14/31] CDX dedup improvements Check for not empty captured content (`payload_size() > 0`) before creating a new thread and running a CDX dedup request. Most dedup modules perform the same check to avoid unnecessary dedup requests. Increase CDX dedup max workers from 200 to 400 in order to handle more load. Set `user-agent: warcprox` for HTTP requests we send to CDX server. Its useful to identify and monitor `warcprox` requests. Pass HTTP headers to connection pool on init and not on each request. --- warcprox/dedup.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 8d63f96..6280f77 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -205,17 +205,18 @@ class CdxServerDedup(DedupDb): cookies = None def __init__(self, cdx_url="https://web.archive.org/cdx/search", - maxsize=200, options=warcprox.Options()): + maxsize=400, options=warcprox.Options()): """Initialize cdx server connection pool and related parameters. Use low timeout value and no retries to avoid blocking warcprox operation by a slow CDX server. 
""" self.cdx_url = cdx_url self.options = options - self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0, - timeout=2.0) + headers = {'user-agent': 'warcprox', 'accept': 'gzip/deflate'} if options.cdxserver_dedup_cookies: - self.cookies = options.cdxserver_dedup_cookies + headers['Cookie'] = options.cdxserver_dedup_cookies + self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0, + timeout=2.0, headers=headers) def loader(self, *args, **kwargs): return CdxServerDedupLoader(self, self.options) @@ -245,10 +246,9 @@ class CdxServerDedup(DedupDb): """ u = url.decode("utf-8") if isinstance(url, bytes) else url try: - headers = {'Cookie': self.cookies} if self.cookies else {} result = self.http_pool.request('GET', self.cdx_url, fields=dict( url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit", - limit=-1), headers=headers) + limit=-1)) assert result.status == 200 if isinstance(digest_key, bytes): dkey = digest_key @@ -276,14 +276,20 @@ class CdxServerDedup(DedupDb): class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor): def __init__(self, cdx_dedup, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) - self.pool = futures.ThreadPoolExecutor(max_workers=200) + self.pool = futures.ThreadPoolExecutor(max_workers=400) self.batch = set() self.cdx_dedup = cdx_dedup def _get_process_put(self): recorded_url = self.inq.get(block=True, timeout=0.5) - self.batch.add(recorded_url) - self.pool.submit(self._process_url, recorded_url) + if (recorded_url.response_recorder + and recorded_url.payload_digest + and recorded_url.response_recorder.payload_size() > 0): + self.batch.add(recorded_url) + self.pool.submit(self._process_url, recorded_url) + else: + if self.outq: + self.outq.put(recorded_url) def _process_url(self, recorded_url): try: From cce0c705fba5d057bce4c8d90fc00e0f6d300c7c Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Wed, 4 Apr 2018 06:15:52 +0000 Subject: [PATCH 15/31] Fix Accept-Encoding request header --- warcprox/dedup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 6280f77..5db8e34 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -212,7 +212,7 @@ class CdxServerDedup(DedupDb): """ self.cdx_url = cdx_url self.options = options - headers = {'user-agent': 'warcprox', 'accept': 'gzip/deflate'} + headers = {'User-Agent': 'warcprox', 'Accept-Encoding': 'gzip, deflate'} if options.cdxserver_dedup_cookies: headers['Cookie'] = options.cdxserver_dedup_cookies self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0, From ebf5453c2fcf0908dc304ab310acf180a5ea5749 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 6 Apr 2018 13:26:56 -0700 Subject: [PATCH 16/31] bump dev version number after PR --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 35bc578..614e531 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev163', + version='2.4b2.dev164', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From d32bf743bd1de9ababeb465b45f08d86234d5bcd Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 9 Apr 2018 15:52:44 +0000 Subject: [PATCH 17/31] Configurable min dedupable size for text/binary resources New `--dedup-min-text-size` and `--dedup-min-binary-size` cli options with default value = `0`. New `DedupableMixin` which can be used in any dedup class. 
It is currently used only in CDX dedup. Instead of checking `payload_size() > 0`, we now use `.is_dedupable(recorded_url)` New utility method `RecordedUrl.is_text`. --- warcprox/dedup.py | 19 +++++++++++++++++-- warcprox/main.py | 6 ++++++ warcprox/warcproxy.py | 12 ++++++++++++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 5db8e34..7e19150 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -37,6 +37,20 @@ from concurrent import futures urllib3.disable_warnings() +class DedupableMixin(object): + def __init__(self, options=warcprox.Options()): + self.min_text_size = options.dedup_min_text_size + self.min_binary_size = options.dedup_min_binary_size + + def is_dedupable(self, recorded_url): + """Check if we should try to run dedup on resource based on payload + size compared with min text/binary dedup size options. Return Boolean. + """ + if recorded_url.is_text(): + return recorded_url.response_recorder.payload_size() > self.min_text_size + else: + return recorded_url.response_recorder.payload_size() > self.min_binary_size + class DedupLoader(warcprox.BaseStandardPostfetchProcessor): def __init__(self, dedup_db, options=warcprox.Options()): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) @@ -273,9 +287,10 @@ class CdxServerDedup(DedupDb): """ pass -class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor): +class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin): def __init__(self, cdx_dedup, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + DedupableMixin.__init__(self, options) self.pool = futures.ThreadPoolExecutor(max_workers=400) self.batch = set() self.cdx_dedup = cdx_dedup @@ -284,7 +299,7 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor): recorded_url = self.inq.get(block=True, timeout=0.5) if (recorded_url.response_recorder and recorded_url.payload_digest - and recorded_url.response_recorder.payload_size() > 0): + and self.is_dedupable(recorded_url)): self.batch.add(recorded_url) self.pool.submit(self._process_url, recorded_url) else: diff --git a/warcprox/main.py b/warcprox/main.py index 8ff466b..3723445 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -148,6 +148,12 @@ def _build_arg_parser(prog='warcprox'): # optional cookie values to pass to CDX Server; e.g. 
"cookie1=val1;cookie2=val2" arg_parser.add_argument('--cdxserver-dedup-cookies', dest='cdxserver_dedup_cookies', help=argparse.SUPPRESS) + arg_parser.add_argument('--dedup-min-text-size', dest='dedup_min_text_size', + type=int, default=0, + help=('try to dedup text resources with payload size over this limit in bytes')) + arg_parser.add_argument('--dedup-min-binary-size', dest='dedup_min_binary_size', + type=int, default=0, help=( + 'try to dedup binary resources with payload size over this limit in bytes')) arg_parser.add_argument('--queue-size', dest='queue_size', type=int, default=500, help=argparse.SUPPRESS) arg_parser.add_argument('--max-threads', dest='max_threads', type=int, diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index b6a0943..2477a06 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -376,6 +376,18 @@ class RecordedUrl: self.warc_records = warc_records self.do_not_archive = do_not_archive + def is_text(self): + """Ref: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Complete_list_of_MIME_types + Alternative method: try to decode('ascii') first N bytes to make sure + its text. + """ + if self.mimetype: + return self.mimetype[:5] == "text/" or self.mimetype in ( + "application/xml", "application/javascript", "application/json", + "application/xhtml+xml", "application/typescript", + "image/svg+xml") + return False + # inherit from object so that multiple inheritance from this class works # properly in python 2 # http://stackoverflow.com/questions/1713038/super-fails-with-error-typeerror-argument-1-must-be-type-not-classobj#18392639 From cb0dea37396c6f76282d439482fa559d36e9f744 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 11 Apr 2018 22:05:31 +0000 Subject: [PATCH 18/31] oops! 
/status has been lying about queued urls --- setup.py | 2 +- warcprox/controller.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 614e531..824d2ba 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev164', + version='2.4b2.dev165', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/controller.py b/warcprox/controller.py index ff63657..7735df9 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -164,8 +164,7 @@ class WarcproxController(object): queued += len(processor.batch) result['postfetch_chain'].append({ - 'processor': name, - 'queued_urls': len(processor.inq.queue)}) + 'processor': name, 'queued_urls': queued}) return result def chain(self, processor0, processor1): From cc8fb4c608fcd9aab929a85598405be274632959 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 11 Apr 2018 22:29:50 +0000 Subject: [PATCH 19/31] cap the number of urls queued for warc writing --- setup.py | 2 +- warcprox/__init__.py | 15 ++++++++++++++- warcprox/writerthread.py | 8 +++++++- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 824d2ba..a00566f 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev165', + version='2.4b2.dev166', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 76d733d..20f0de4 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -26,12 +26,14 @@ import time import logging from argparse import Namespace as _Namespace from pkg_resources import get_distribution as _get_distribution -__version__ = _get_distribution('warcprox').version +import concurrent.futures try: import queue except ImportError: import Queue as queue +__version__ = _get_distribution('warcprox').version + def digest_str(hash_obj, base32=False): import base64 return hash_obj.name.encode('utf-8') + b':' + ( @@ -45,6 +47,17 @@ class Options(_Namespace): except AttributeError: return None +class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor): + ''' + `concurrent.futures.ThreadPoolExecutor` supporting a queue of limited size. + + If `max_queued` is set, calls to `submit()` will block if necessary until a + free slot is available. 
+ ''' + def __init__(self, max_queued=None, *args, **kwargs): + super().__init__(*args, **kwargs) + self._work_queue = queue.Queue(maxsize=max_queued or 0) + class TimestampedQueue(queue.Queue): """ A queue.Queue that exposes the time enqueued of the oldest item in the diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 927c628..03aee9e 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -41,7 +41,13 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) self.writer_pool = warcprox.writer.WarcWriterPool(options) self.method_filter = set(method.upper() for method in self.options.method_filter or []) - self.pool = futures.ThreadPoolExecutor(max_workers=options.writer_threads or 1) + + # set max_queued small, because self.inq is already handling queueing + # for us; but give it a little breathing room to make sure it can keep + # worker threads busy + self.pool = warcprox.ThreadPoolExecutor( + max_workers=options.writer_threads or 1, + max_queued=10 * (options.writer_threads or 1)) self.batch = set() def _startup(self): From ea4fc0f10a9ed2c382e50971f5ef33cce7498108 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 11 Apr 2018 22:35:37 +0000 Subject: [PATCH 20/31] include warc writer worker threads in profiling --- setup.py | 2 +- warcprox/controller.py | 15 +++++++++++++++ warcprox/writerthread.py | 21 ++++++++++++++++++++- 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index a00566f..5522b2a 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev166', + version='2.4b2.dev167', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/controller.py b/warcprox/controller.py index 7735df9..cfffd06 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -439,3 +439,18 @@ class WarcproxController(object): self.logger.notice( 'performance profile of %s:\n%s', processor, buf.getvalue()) + + if hasattr(processor, 'thread_profilers'): + files = [] + for th_id, profiler in processor.thread_profilers.items(): + file = os.path.join(tmpdir, '%s.dat' % th_id) + profiler.dump_stats(file) + files.append(file) + buf = io.StringIO() + stats = pstats.Stats(*files, stream=buf) + stats.sort_stats('cumulative') + stats.print_stats(0.1) + self.logger.notice( + 'aggregate performance profile of %s worker ' + 'threads of %s:\n%s', + len(files), processor, buf.getvalue()) diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 03aee9e..f4de35d 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -31,6 +31,7 @@ import logging import time import warcprox from concurrent import futures +import threading class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): logger = logging.getLogger("warcprox.writerthread.WarcWriterProcessor") @@ -43,6 +44,8 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): self.method_filter = set(method.upper() for method in self.options.method_filter or []) # set max_queued small, because self.inq is already handling queueing + self.thread_local = threading.local() + self.thread_profilers = {} # for us; but give it a little breathing room to make sure it can keep # worker threads busy self.pool = warcprox.ThreadPoolExecutor( @@ -58,10 +61,26 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): 
try: recorded_url = self.inq.get(block=True, timeout=0.5) self.batch.add(recorded_url) - self.pool.submit(self._process_url, recorded_url) + self.pool.submit(self._wrap_process_url, recorded_url) finally: self.writer_pool.maybe_idle_rollover() + def _wrap_process_url(self, recorded_url): + if not getattr(self.thread_local, 'name_set', False): + threading.current_thread().name = 'WarcWriterThread(tid=%s)' % warcprox.gettid() + self.thread_local.name_set = True + if self.options.profile: + import cProfile + if not hasattr(self.thread_local, 'profiler'): + self.thread_local.profiler = cProfile.Profile() + tid = threading.current_thread().ident + self.thread_profilers[tid] = self.thread_local.profiler + self.thread_local.profiler.enable() + self._process_url(recorded_url) + self.thread_local.profiler.disable() + else: + self._process_url(recorded_url) + def _process_url(self, recorded_url): try: records = [] From a1930495af3322b7c3d6fc37cdf18f279dbbb080 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 12 Apr 2018 12:31:04 -0700 Subject: [PATCH 21/31] default to 100 proxy threads, 1 warc writer thread see https://github.com/internetarchive/warcprox/wiki/benchmarking-number-of-threads --- setup.py | 2 +- warcprox/controller.py | 6 ++---- warcprox/mitmproxy.py | 33 ++++++--------------------------- warcprox/writerthread.py | 2 +- 4 files changed, 10 insertions(+), 33 deletions(-) diff --git a/setup.py b/setup.py index 5522b2a..437b46e 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev167', + version='2.4b2.dev168', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/controller.py b/warcprox/controller.py index cfffd06..e89ecbb 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -141,11 +141,9 @@ class WarcproxController(object): self.playback_proxy = Factory.playback_proxy( self.proxy.ca, self.options) - # default number of warc writer threads = sqrt(proxy.max_threads) - # pulled out of thin air because it strikes me as reasonable - # 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45 + # https://github.com/internetarchive/warcprox/wiki/benchmarking-number-of-threads if not self.options.writer_threads: - self.options.writer_threads = int(self.proxy.max_threads ** 0.5) + self.options.writer_threads = 1 self.build_postfetch_chain(self.proxy.recorded_url_q) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 8bd1861..e01f15e 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -501,35 +501,14 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): class PooledMixIn(socketserver.ThreadingMixIn): logger = logging.getLogger("warcprox.mitmproxy.PooledMixIn") def __init__(self, max_threads=None): - ''' - If max_threads is not supplied, calculates a reasonable value based - on system resource limits. - ''' self.active_requests = set() self.unaccepted_requests = 0 - if not max_threads: - # man getrlimit: "RLIMIT_NPROC The maximum number of processes (or, - # more precisely on Linux, threads) that can be created for the - # real user ID of the calling process." 
- try: - import resource - rlimit_nproc = resource.getrlimit(resource.RLIMIT_NPROC)[0] - rlimit_nofile = resource.getrlimit(resource.RLIMIT_NOFILE)[0] - max_threads = min(rlimit_nofile // 10, rlimit_nproc // 2) - # resource.RLIM_INFINITY == -1 which can result in max_threads == 0 - if max_threads <= 0 or max_threads > 5000: - max_threads = 5000 - self.logger.info( - "max_threads=%s (rlimit_nproc=%s, rlimit_nofile=%s)", - max_threads, rlimit_nproc, rlimit_nofile) - except Exception as e: - self.logger.warn( - "unable to calculate optimal number of threads based " - "on resource limits due to %s", e) - max_threads = 100 - self.logger.info("max_threads=%s", max_threads) - self.max_threads = max_threads - self.pool = concurrent.futures.ThreadPoolExecutor(max_threads) + if max_threads: + self.max_threads = max_threads + else: + self.max_threads = 100 + self.pool = concurrent.futures.ThreadPoolExecutor(self.max_threads) + self.logger.info("%s proxy threads", self.max_threads) def status(self): if hasattr(super(), 'status'): diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index f4de35d..ef0bd2d 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -54,7 +54,7 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): self.batch = set() def _startup(self): - self.logger.info('%s threads', self.pool._max_workers) + self.logger.info('%s warc writer threads', self.pool._max_workers) warcprox.BaseStandardPostfetchProcessor._startup(self) def _get_process_put(self): From 9057fbdf36d425646860ea085336a3dbafd51837 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 24 Apr 2018 10:29:35 +0000 Subject: [PATCH 22/31] Use DedupableMixin in all dedup classes Rename `DedupableMixin.is_dedupable` to `should_dedup`. --- warcprox/dedup.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 7e19150..c75f6c3 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -42,7 +42,7 @@ class DedupableMixin(object): self.min_text_size = options.dedup_min_text_size self.min_binary_size = options.dedup_min_binary_size - def is_dedupable(self, recorded_url): + def should_dedup(self, recorded_url): """Check if we should try to run dedup on resource based on payload size compared with min text/binary dedup size options. Return Boolean. 
""" @@ -60,11 +60,12 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor): decorate_with_dedup_info( self.dedup_db, recorded_url, self.options.base32) -class DedupDb(object): +class DedupDb(DedupableMixin): logger = logging.getLogger("warcprox.dedup.DedupDb") def __init__( self, file='./warcprox.sqlite', options=warcprox.Options()): + DedupableMixin.__init__(self, options) self.file = file self.options = options @@ -127,7 +128,7 @@ class DedupDb(object): def notify(self, recorded_url, records): if (records and records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: @@ -150,10 +151,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): recorded_url.dedup_info = dedup_db.lookup( digest_key, url=recorded_url.url) -class RethinkDedupDb(DedupDb): +class RethinkDedupDb(DedupDb, DedupableMixin): logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") def __init__(self, options=warcprox.Options()): + DedupableMixin.__init__(self, options) parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_dedup_url) self.rr = doublethink.Rethinker( servers=parsed.hosts, db=parsed.database) @@ -204,7 +206,7 @@ class RethinkDedupDb(DedupDb): def notify(self, recorded_url, records): if (records and records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: @@ -299,7 +301,7 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin) recorded_url = self.inq.get(block=True, timeout=0.5) if (recorded_url.response_recorder and recorded_url.payload_digest - and self.is_dedupable(recorded_url)): + and self.should_dedup(recorded_url)): self.batch.add(recorded_url) self.pool.submit(self._process_url, recorded_url) else: @@ -321,9 +323,10 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin) if self.outq: self.outq.put(recorded_url) -class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): +class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor, DedupableMixin): def __init__(self, trough_dedup_db, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + DedupableMixin.__init__(self, options) self.trough_dedup_db = trough_dedup_db def _filter_and_bucketize(self, batch): @@ -335,7 +338,7 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): for recorded_url in batch: if (recorded_url.warc_records and recorded_url.warc_records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): if (recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta): bucket = recorded_url.warcprox_meta['captures-bucket'] @@ -367,9 +370,10 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): logging.warn( 'timed out saving dedup info to trough', exc_info=True) -class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): +class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin): def __init__(self, trough_dedup_db, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + 
DedupableMixin.__init__(self, options) self.trough_dedup_db = trough_dedup_db def _startup(self): @@ -384,7 +388,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): for recorded_url in batch: if (recorded_url.response_recorder and recorded_url.payload_digest - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): if (recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta): bucket = recorded_url.warcprox_meta['captures-bucket'] @@ -444,7 +448,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): logging.warn( 'timed out loading dedup info from trough', exc_info=True) -class TroughDedupDb(DedupDb): +class TroughDedupDb(DedupDb, DedupableMixin): ''' https://github.com/internetarchive/trough ''' @@ -461,6 +465,7 @@ class TroughDedupDb(DedupDb): 'values (%s, %s, %s, %s);') def __init__(self, options=warcprox.Options()): + DedupableMixin.__init__(self, options) self.options = options self._trough_cli = warcprox.trough.TroughClient( options.rethinkdb_trough_db_url, promotion_interval=60*60) @@ -533,7 +538,7 @@ class TroughDedupDb(DedupDb): def notify(self, recorded_url, records): if (records and records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta: From 6dce8cc644b3b083ac6d06959d72a24f89504a4a Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 24 Apr 2018 10:58:13 +0000 Subject: [PATCH 23/31] Remove method decorate_with_dedup_info Method `warcprox.dedup.decorate_with_dedup_info` is only used in `DedupLoader._process_url` and nowhere else. The problem is that `decorate_with_dedup_info` cannot get warcprox cli options. Thus we cannot pass the custom min size limits. 
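The shape of the fix, in miniature (names below are hypothetical, not warcprox's): a module-level helper only sees its arguments, while a method on an object constructed with the cli options can consult the configurable thresholds.

```python
# Before: free function, threshold hard-coded (like payload_size() > 0).
def decorate_with_info(db, rec):
    if rec.size > 0:
        rec.info = db.lookup(rec.key)

# After: the postfetch processor owns the lookup and is built with the
# cli options, so --dedup-min-*-size can participate in the decision
# (cf. DedupLoader + DedupableMixin in the diff below).
class Loader:
    def __init__(self, db, min_size=0):
        self.db = db
        self.min_size = min_size  # e.g. from options.dedup_min_text_size
    def process(self, rec):
        if rec.size > self.min_size:
            rec.info = self.db.lookup(rec.key)
```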
--- warcprox/dedup.py | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index c75f6c3..17a4fd9 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -51,14 +51,24 @@ class DedupableMixin(object): else: return recorded_url.response_recorder.payload_size() > self.min_binary_size -class DedupLoader(warcprox.BaseStandardPostfetchProcessor): +class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin): def __init__(self, dedup_db, options=warcprox.Options()): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) + DedupableMixin.__init__(self, options) self.dedup_db = dedup_db def _process_url(self, recorded_url): - decorate_with_dedup_info( - self.dedup_db, recorded_url, self.options.base32) + if (recorded_url.response_recorder + and recorded_url.payload_digest + and self.should_dedup(recorded_url)): + digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) + if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: + recorded_url.dedup_info = self.dedup_db.lookup( + digest_key, recorded_url.warcprox_meta["captures-bucket"], + recorded_url.url) + else: + recorded_url.dedup_info = self.dedup_db.lookup( + digest_key, url=recorded_url.url) class DedupDb(DedupableMixin): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -138,19 +148,6 @@ class DedupDb(DedupableMixin): else: self.save(digest_key, records[0]) -def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): - if (recorded_url.response_recorder - and recorded_url.payload_digest - and recorded_url.response_recorder.payload_size() > 0): - digest_key = warcprox.digest_str(recorded_url.payload_digest, base32) - if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: - recorded_url.dedup_info = dedup_db.lookup( - digest_key, recorded_url.warcprox_meta["captures-bucket"], - recorded_url.url) - else: - recorded_url.dedup_info = dedup_db.lookup( - digest_key, url=recorded_url.url) - class RethinkDedupDb(DedupDb, DedupableMixin): logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") From 944c9a1e117e94537a4b66c89b41d47c37af684c Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 24 Apr 2018 12:18:20 +0000 Subject: [PATCH 24/31] Add unit tests Create two very small dummy responses (text, 2 bytes and binary, 4 bytes). Use options --dedup-min-text-size=3 and --dedup-min-binary-size=5. Ensure that due to the effects of these options, dedup is not happening. Existing dedup unit tests are not affected at all. 
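The digest keys asserted in the new tests can be recomputed by hand. Assuming base32 is off (as in these tests), `warcprox.digest_str()` renders `b'<algorithm>:<hexdigest>'`:

```python
# Recompute the dedup lookup keys used in test_dedup_min_size.
import hashlib

def digest_str(hash_obj):
    # sketch of warcprox.digest_str()'s non-base32 branch
    return hash_obj.name.encode('utf-8') + b':' + \
            hash_obj.hexdigest().encode('ascii')

assert digest_str(hashlib.sha1(b'aa')) == \
        b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37'   # /text-2bytes
assert digest_str(hashlib.sha1(b'aaaa')) == \
        b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79'   # /binary-4bytes
```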
--- tests/test_warcprox.py | 56 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 9f578a3..b3d00d6 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -191,6 +191,18 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): + b'Content-Type: text/plain\r\n' + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n' + b'\r\n') + elif self.path == '/text-2bytes': + payload = b'aa' + headers = (b'HTTP/1.1 200 OK\r\n' + + b'Content-Type: text/plain\r\n' + + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n' + + b'\r\n') + elif self.path == '/binary-4bytes': + payload = b'aaaa' + headers = (b'HTTP/1.1 200 OK\r\n' + + b'Content-Type: application/octet-stream\r\n' + + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n' + + b'\r\n') elif self.path.startswith('/test_payload_digest-'): content_body = ( b'Hello. How are you. I am the test_payload_digest ' @@ -394,7 +406,9 @@ def warcprox_(request, http_daemon, https_daemon): '--onion-tor-socks-proxy=localhost:9050', '--crawl-log-dir=crawl-logs', '--socket-timeout=4', - '--max-resource-size=200000'] + '--max-resource-size=200000', + '--dedup-min-text-size=3', + '--dedup-min-binary-size=5'] if request.config.getoption('--rethinkdb-dedup-url'): argv.append('--rethinkdb-dedup-url=%s' % request.config.getoption('--rethinkdb-dedup-url')) # test these here only @@ -601,6 +615,46 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) assert response.content == b'I am the warcprox test payload! ffffffffff!\n' # XXX how to check dedup was used? +def test_dedup_min_size(http_daemon, warcprox_, archiving_proxies, playback_proxies): + """We use options --dedup-min-text-size=3 --dedup-min-binary-size=5 and we + try to download content smaller than these limits to make sure that it is + not deduplicated. 
We create the digest_str with the following code: + ``` + payload_digest = hashlib.new('sha1') + payload_digest.update(b'aa') + warcprox.digest_str(payload_digest) + ``` + """ + url = 'http://localhost:%s/text-2bytes' % http_daemon.server_port + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + assert len(response.content) == 2 + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37') + assert dedup_lookup is None + time.sleep(3) + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37') + # This would return dedup data if payload_size > dedup-min-text-size + assert dedup_lookup is None + + url = 'http://localhost:%s/binary-4bytes' % http_daemon.server_port + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + assert len(response.content) == 4 + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79') + assert dedup_lookup is None + time.sleep(3) + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79') + # This would return dedup data if payload_size > dedup-min-binary-size + assert dedup_lookup is None + # test dedup of same https url with same payload def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxies): urls_before = warcprox_.proxy.running_stats.urls From 9dac806ca1416aefb98907e4ba0f322a73b88def Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 24 Apr 2018 16:31:37 +0000 Subject: [PATCH 25/31] Fix travis-ci unit test issue `test_dedup_https` fails on travis-ci. https://travis-ci.org/internetarchive/warcprox/jobs/370598950 We didn't touch that at all but worked on `test_dedup_min_size` which runs just before that. We move `test_dedup_min_size` to the end of the file hoping to resolve this. --- tests/test_warcprox.py | 81 +++++++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 40 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index b3d00d6..8bb58ab 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -615,46 +615,6 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) assert response.content == b'I am the warcprox test payload! ffffffffff!\n' # XXX how to check dedup was used? -def test_dedup_min_size(http_daemon, warcprox_, archiving_proxies, playback_proxies): - """We use options --dedup-min-text-size=3 --dedup-min-binary-size=5 and we - try to download content smaller than these limits to make sure that it is - not deduplicated. 
From 255d359ad4090bd9d47f8fac02a24f4c8a465c00 Mon Sep 17 00:00:00 2001
From: Vangelis Banos
Date: Tue, 24 Apr 2018 17:06:56 +0000
Subject: [PATCH 26/31] Use DedupableMixin in RethinkCapturesDedup

Note that before this change we didn't do any payload_size check here
at all.
---
 warcprox/bigtable.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py
index e6674a6..cb4671e 100644
--- a/warcprox/bigtable.py
+++ b/warcprox/bigtable.py
@@ -34,6 +34,7 @@ import threading
 import datetime
 import doublethink
 import rethinkdb as r
+from warcprox.dedup import DedupableMixin
 
 class RethinkCaptures:
     """Inserts in batches every 0.5 seconds"""
@@ -215,10 +216,11 @@ class RethinkCaptures:
         if self._timer:
             self._timer.join()
 
-class RethinkCapturesDedup(warcprox.dedup.DedupDb):
+class RethinkCapturesDedup(warcprox.dedup.DedupDb, DedupableMixin):
     logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")
 
     def __init__(self, options=warcprox.Options()):
+        DedupableMixin.__init__(self, options)
         self.captures_db = RethinkCaptures(options=options)
         self.options = options
 
@@ -251,5 +253,6 @@ class RethinkCapturesDedup(warcprox.dedup.DedupDb):
         self.captures_db.close()
 
     def notify(self, recorded_url, records):
-        self.captures_db.notify(recorded_url, records)
-
+        if (records and records[0].type == b'response'
+                and self.should_dedup(recorded_url)):
+            self.captures_db.notify(recorded_url, records)
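The gating pattern this patch applies to `RethinkCapturesDedup.notify()` — only pass a capture along when the first record is a response and the payload clears the size thresholds — can be sketched in isolation. This is a simplified stand-in, not warcprox's actual classes:

```python
from types import SimpleNamespace

class DedupableMixin:
    """Simplified sketch of the size-threshold check being reused."""
    def __init__(self, options):
        self.min_text_size = options.dedup_min_text_size
        self.min_binary_size = options.dedup_min_binary_size

    def should_dedup(self, recorded_url):
        size = recorded_url.response_recorder.payload_size()
        if recorded_url.is_text():
            return size > self.min_text_size
        return size > self.min_binary_size

options = SimpleNamespace(dedup_min_text_size=3, dedup_min_binary_size=5)
mixin = DedupableMixin(options)
text_2bytes = SimpleNamespace(
        is_text=lambda: True,
        response_recorder=SimpleNamespace(payload_size=lambda: 2))
assert not mixin.should_dedup(text_2bytes)  # 2 <= 3: too small to dedup
```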
From 6f6a88fc0bb9f0bc50b9e51457fdb48dec6f8765 Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Thu, 3 May 2018 12:36:16 -0700
Subject: [PATCH 27/31] bump dev version number after #86
---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index 437b46e..7b71cbe 100755
--- a/setup.py
+++ b/setup.py
@@ -40,7 +40,7 @@ except:
 
 setuptools.setup(
         name='warcprox',
-        version='2.4b2.dev168',
+        version='2.4b2.dev169',
        description='WARC writing MITM HTTP/S proxy',
        url='https://github.com/internetarchive/warcprox',
        author='Noah Levitt',
From 9baa2e22d547fd72967e2a9df65980d5daf1be20 Mon Sep 17 00:00:00 2001
From: Vangelis Banos
Date: Fri, 4 May 2018 13:26:38 +0000
Subject: [PATCH 28/31] Rename captures-bucket to dedup-bucket in Warcprox-Meta
---
 tests/test_warcprox.py | 12 ++++++------
 warcprox/bigtable.py   |  4 ++--
 warcprox/dedup.py      | 24 ++++++++++++------------
 3 files changed, 20 insertions(+), 20 deletions(-)

diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index 8bb58ab..d9d7341 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -745,7 +745,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port)
 
     # archive url1 bucket_a
-    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_a"})}
+    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_a"})}
     response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
     assert response.status_code == 200
     assert response.headers['warcprox-test-header'] == 'k!'
@@ -771,7 +771,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     assert dedup_lookup is None
 
     # archive url2 bucket_b
-    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_b"})}
+    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})}
     response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers)
     assert response.status_code == 200
     assert response.headers['warcprox-test-header'] == 'k!'
@@ -790,7 +790,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     dedup_date = dedup_lookup['date']
 
     # archive url2 bucket_a
-    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_a"})}
+    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_a"})}
     response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers)
     assert response.status_code == 200
     assert response.headers['warcprox-test-header'] == 'k!'
@@ -800,7 +800,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3)
 
     # archive url1 bucket_b
-    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_b"})}
+    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})}
     response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
     assert response.status_code == 200
     assert response.headers['warcprox-test-header'] == 'k!'
@@ -1371,7 +1371,7 @@ def test_dedup_ok_flag(
     assert dedup_lookup is None
 
     # archive with dedup_ok:False
-    request_meta = {'captures-bucket':'test_dedup_ok_flag','dedup-ok':False}
+    request_meta = {'dedup-bucket':'test_dedup_ok_flag','dedup-ok':False}
     headers = {'Warcprox-Meta': json.dumps(request_meta)}
     response = requests.get(
         url, proxies=archiving_proxies, headers=headers, verify=False)
@@ -1389,7 +1389,7 @@ def test_dedup_ok_flag(
     assert dedup_lookup is None
 
     # archive without dedup_ok:False
-    request_meta = {'captures-bucket':'test_dedup_ok_flag'}
+    request_meta = {'dedup-bucket':'test_dedup_ok_flag'}
     headers = {'Warcprox-Meta': json.dumps(request_meta)}
     response = requests.get(
         url, proxies=archiving_proxies, headers=headers, verify=False)
diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py
index cb4671e..d8cd218 100644
--- a/warcprox/bigtable.py
+++ b/warcprox/bigtable.py
@@ -157,8 +157,8 @@ class RethinkCaptures:
             sha1base32 = base64.b32encode(digest.digest()).decode("utf-8")
 
         if (recorded_url.warcprox_meta
-                and "captures-bucket" in recorded_url.warcprox_meta):
-            bucket = recorded_url.warcprox_meta["captures-bucket"]
+                and "dedup-bucket" in recorded_url.warcprox_meta):
+            bucket = recorded_url.warcprox_meta["dedup-bucket"]
         else:
             bucket = "__unspecified__"
 
diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 17a4fd9..33eab16 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -62,9 +62,9 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
                 and recorded_url.payload_digest
                 and self.should_dedup(recorded_url)):
             digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
-            if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
+            if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
                 recorded_url.dedup_info = self.dedup_db.lookup(
-                    digest_key, recorded_url.warcprox_meta["captures-bucket"],
+                    digest_key, recorded_url.warcprox_meta["dedup-bucket"],
                     recorded_url.url)
             else:
                 recorded_url.dedup_info = self.dedup_db.lookup(
@@ -141,10 +141,10 @@ class DedupDb(DedupableMixin):
                 and self.should_dedup(recorded_url)):
             digest_key = warcprox.digest_str(
                     recorded_url.payload_digest, self.options.base32)
-            if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
+            if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
                 self.save(
                         digest_key, records[0],
-                        bucket=recorded_url.warcprox_meta["captures-bucket"])
+                        bucket=recorded_url.warcprox_meta["dedup-bucket"])
             else:
                 self.save(digest_key, records[0])
 
@@ -206,8 +206,8 @@ class RethinkDedupDb(DedupDb, DedupableMixin):
                 and self.should_dedup(recorded_url)):
             digest_key = warcprox.digest_str(
                     recorded_url.payload_digest, self.options.base32)
-            if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
-                self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"])
+            if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
+                self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["dedup-bucket"])
             else:
                 self.save(digest_key, records[0])
 
@@ -337,8 +337,8 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor, DedupableMixin):
                     and recorded_url.warc_records[0].type == b'response'
                     and self.should_dedup(recorded_url)):
                 if (recorded_url.warcprox_meta
-                        and 'captures-bucket' in recorded_url.warcprox_meta):
-                    bucket = recorded_url.warcprox_meta['captures-bucket']
+                        and 'dedup-bucket' in recorded_url.warcprox_meta):
+                    bucket = recorded_url.warcprox_meta['dedup-bucket']
                 else:
                     bucket = '__unspecified__'
                 buckets[bucket].append(recorded_url)
@@ -387,8 +387,8 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin):
                     and recorded_url.payload_digest
                     and self.should_dedup(recorded_url)):
                 if (recorded_url.warcprox_meta
-                        and 'captures-bucket' in recorded_url.warcprox_meta):
-                    bucket = recorded_url.warcprox_meta['captures-bucket']
+                        and 'dedup-bucket' in recorded_url.warcprox_meta):
+                    bucket = recorded_url.warcprox_meta['dedup-bucket']
                 else:
                     bucket = '__unspecified__'
                 buckets[bucket].append(recorded_url)
@@ -538,9 +538,9 @@ class TroughDedupDb(DedupDb, DedupableMixin):
                 and self.should_dedup(recorded_url)):
             digest_key = warcprox.digest_str(
                     recorded_url.payload_digest, self.options.base32)
-            if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta:
+            if recorded_url.warcprox_meta and 'dedup-bucket' in recorded_url.warcprox_meta:
                 self.save(
                         digest_key, records[0],
-                        bucket=recorded_url.warcprox_meta['captures-bucket'])
+                        bucket=recorded_url.warcprox_meta['dedup-bucket'])
             else:
                 self.save(digest_key, records[0])
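After the rename, a client opts into bucket-scoped dedup by putting `dedup-bucket` into the Warcprox-Meta request header, exactly as the updated tests do. A minimal client-side sketch (the proxy address and warc-prefix value are hypothetical):

```python
import json
import requests

# hypothetical warcprox instance listening on localhost:8000
proxies = {'http': 'http://localhost:8000', 'https': 'http://localhost:8000'}
headers = {'Warcprox-Meta': json.dumps(
        {'warc-prefix': 'my-crawl', 'dedup-bucket': 'bucket_a'})}
response = requests.get(
        'http://example.com/', proxies=proxies, headers=headers, verify=False)
```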
From 432e42803cba7e197c86aff4b5b2091986abb10a Mon Sep 17 00:00:00 2001
From: Vangelis Banos
Date: Fri, 4 May 2018 14:27:42 +0000
Subject: [PATCH 29/31] dedup-bucket is required in Warcprox-Meta to do dedup

Modify `DedupableMixin.should_dedup` to check Warcprox-Meta for
`dedup-bucket` in order to perform dedup.
---
 warcprox/dedup.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 33eab16..ec03c7c 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -44,8 +44,12 @@ class DedupableMixin(object):
 
     def should_dedup(self, recorded_url):
         """Check if we should try to run dedup on resource based on payload
-        size compared with min text/binary dedup size options. Return Boolean.
+        size compared with min text/binary dedup size options.
+        `dedup-bucket` is required in Warcprox-Meta to perform dedup.
+        Return Boolean.
         """
+        if "dedup-bucket" not in recorded_url.warcprox_meta:
+            return False
         if recorded_url.is_text():
             return recorded_url.response_recorder.payload_size() > self.min_text_size
         else:
From abb54e42d1b80b2f405f8314c6dbf4c6e9a6511c Mon Sep 17 00:00:00 2001
From: Vangelis Banos
Date: Fri, 4 May 2018 20:50:54 +0000
Subject: [PATCH 30/31] Add hidden CLI option --dedup-only-with-bucket

When we use `--dedup-only-with-bucket`, dedup will be done only when a
request has the key `dedup-bucket` in `Warcprox-Meta`.
---
 warcprox/dedup.py | 6 ++++--
 warcprox/main.py  | 4 ++++
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index ec03c7c..f979d97 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -41,14 +41,16 @@ class DedupableMixin(object):
     def __init__(self, options=warcprox.Options()):
         self.min_text_size = options.dedup_min_text_size
         self.min_binary_size = options.dedup_min_binary_size
+        self.dedup_only_with_bucket = options.dedup_only_with_bucket
 
     def should_dedup(self, recorded_url):
         """Check if we should try to run dedup on resource based on payload
         size compared with min text/binary dedup size options.
-        `dedup-bucket` is required in Warcprox-Meta to perform dedup.
+        When we use option --dedup-only-with-bucket, `dedup-bucket` is required
+        in Warcprox-Meta to perform dedup.
         Return Boolean.
         """
-        if "dedup-bucket" not in recorded_url.warcprox_meta:
+        if self.dedup_only_with_bucket and "dedup-bucket" not in recorded_url.warcprox_meta:
             return False
         if recorded_url.is_text():
             return recorded_url.response_recorder.payload_size() > self.min_text_size
diff --git a/warcprox/main.py b/warcprox/main.py
index 3723445..6fb46ef 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -154,6 +154,10 @@ def _build_arg_parser(prog='warcprox'):
     arg_parser.add_argument('--dedup-min-binary-size',
             dest='dedup_min_binary_size', type=int, default=0, help=(
                 'try to dedup binary resources with payload size over this limit in bytes'))
+    # optionally, dedup request only when `dedup-bucket` is available in
+    # Warcprox-Meta HTTP header. By default, we dedup all requests.
+    arg_parser.add_argument('--dedup-only-with-bucket', dest='dedup_only_with_bucket',
+            action='store_true', default=False, help=argparse.SUPPRESS)
     arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
             default=500, help=argparse.SUPPRESS)
     arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
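Together with the previous patch, this makes the bucket requirement opt-in rather than unconditional. A sketch of the resulting `should_dedup` behavior, using stand-in objects rather than warcprox's own (text branch only):

```python
from types import SimpleNamespace

def should_dedup(recorded_url, dedup_only_with_bucket, min_text_size=0):
    # mirrors DedupableMixin.should_dedup after this patch, text case only
    if dedup_only_with_bucket and 'dedup-bucket' not in recorded_url.warcprox_meta:
        return False
    return recorded_url.response_recorder.payload_size() > min_text_size

url = SimpleNamespace(
        warcprox_meta={},
        response_recorder=SimpleNamespace(payload_size=lambda: 100))
assert should_dedup(url, dedup_only_with_bucket=False)     # default: dedup as before
assert not should_dedup(url, dedup_only_with_bucket=True)  # no bucket: skip dedup
```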
""" - if "dedup-bucket" not in recorded_url.warcprox_meta: + if self.dedup_only_with_bucket and "dedup-bucket" not in recorded_url.warcprox_meta: return False if recorded_url.is_text(): return recorded_url.response_recorder.payload_size() > self.min_text_size diff --git a/warcprox/main.py b/warcprox/main.py index 3723445..6fb46ef 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -154,6 +154,10 @@ def _build_arg_parser(prog='warcprox'): arg_parser.add_argument('--dedup-min-binary-size', dest='dedup_min_binary_size', type=int, default=0, help=( 'try to dedup binary resources with payload size over this limit in bytes')) + # optionally, dedup request only when `dedup-bucket` is available in + # Warcprox-Meta HTTP header. By default, we dedup all requests. + arg_parser.add_argument('--dedup-only-with-bucket', dest='dedup_only_with_bucket', + action='store_true', default=False, help=argparse.SUPPRESS) arg_parser.add_argument('--queue-size', dest='queue_size', type=int, default=500, help=argparse.SUPPRESS) arg_parser.add_argument('--max-threads', dest='max_threads', type=int, From 15830fc5a27939d4432c748a2928beb5a33239e2 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 9 May 2018 15:43:39 -0700 Subject: [PATCH 31/31] support "captures-bucket" for backward compatibility --- setup.py | 2 +- tests/test_warcprox.py | 2 +- warcprox/warcproxy.py | 4 ++++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 7b71cbe..4579b12 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev169', + version='2.4b2.dev170', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index d9d7341..269deee 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -790,7 +790,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, dedup_date = dedup_lookup['date'] # archive url2 bucket_a - headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_a"})} + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_a"})} response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers) assert response.status_code == 200 assert response.headers['warcprox-test-header'] == 'k!' diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 2477a06..0d93e5c 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -350,6 +350,10 @@ class RecordedUrl: self.response_recorder = response_recorder if warcprox_meta: + if 'captures-bucket' in warcprox_meta: + # backward compatibility + warcprox_meta['dedup-bucket'] = warcprox_meta['captures-bucket'] + del warcprox_meta['captures-bucket'] self.warcprox_meta = warcprox_meta else: self.warcprox_meta = {}