diff --git a/setup.py b/setup.py index 629ed64..4579b12 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev159', + version='2.4b2.dev170', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index ac0b986..c38c721 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -50,6 +50,7 @@ import io import gzip import mock import email.message +import socketserver try: import http.server as http_server @@ -166,6 +167,9 @@ def chunkify(buf, chunk_size=13): # return outbuf.getvalue() class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): + # enable keepalive + protocol_version = 'HTTP/1.1' + def build_response(self): m = re.match(r'^/([^/]+)/([^/]+)$', self.path) if m is not None: @@ -187,6 +191,18 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): + b'Content-Type: text/plain\r\n' + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n' + b'\r\n') + elif self.path == '/text-2bytes': + payload = b'aa' + headers = (b'HTTP/1.1 200 OK\r\n' + + b'Content-Type: text/plain\r\n' + + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n' + + b'\r\n') + elif self.path == '/binary-4bytes': + payload = b'aaaa' + headers = (b'HTTP/1.1 200 OK\r\n' + + b'Content-Type: application/octet-stream\r\n' + + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n' + + b'\r\n') elif self.path.startswith('/test_payload_digest-'): content_body = ( b'Hello. How are you. I am the test_payload_digest ' @@ -276,6 +292,11 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): headers, payload = self.build_response() self.connection.sendall(headers) self.connection.sendall(payload) + if self.path in ('/missing-content-length', '/empty-response'): + # server must close the connection, else client has no idea if + # there is more data coming + self.connection.shutdown(socket.SHUT_RDWR) + self.connection.close() def do_HEAD(self): logging.info('HEAD {}'.format(self.path)) @@ -315,9 +336,20 @@ def cert(request): finally: f.close() +# We need this test server to accept multiple simultaneous connections in order +# to avoid mysterious looking test failures like these: +# https://travis-ci.org/internetarchive/warcprox/builds/362892231 +# This is because we can't guarantee (without jumping through hoops) that +# MitmProxyHandler._proxy_request() returns the connection to the pool before +# the next request tries to get a connection from the pool in +# MitmProxyHandler._connect_to_remote_server(). (Unless we run warcprox +# single-threaded for these tests, which maybe we should consider?) 
+class ThreadedHTTPServer(socketserver.ThreadingMixIn, http_server.HTTPServer): + pass + @pytest.fixture(scope="module") def http_daemon(request): - http_daemon = http_server.HTTPServer( + http_daemon = ThreadedHTTPServer( ('localhost', 0), RequestHandlerClass=_TestHttpRequestHandler) logging.info('starting http://{}:{}'.format(http_daemon.server_address[0], http_daemon.server_address[1])) http_daemon_thread = threading.Thread(name='HttpDaemonThread', @@ -336,9 +368,8 @@ def http_daemon(request): @pytest.fixture(scope="module") def https_daemon(request, cert): # http://www.piware.de/2011/01/creating-an-https-server-in-python/ - https_daemon = http_server.HTTPServer(('localhost', 0), + https_daemon = ThreadedHTTPServer(('localhost', 0), RequestHandlerClass=_TestHttpRequestHandler) - # https_daemon.socket = ssl.wrap_socket(httpd.socket, certfile='path/to/localhost.pem', server_side=True) https_daemon.socket = ssl.wrap_socket(https_daemon.socket, certfile=cert, server_side=True) logging.info('starting https://{}:{}'.format(https_daemon.server_address[0], https_daemon.server_address[1])) https_daemon_thread = threading.Thread(name='HttpsDaemonThread', @@ -354,8 +385,11 @@ def https_daemon(request, cert): return https_daemon +# specify http_daemon and https_daemon as dependencies so that their finalizers +# run after warcprox is shut down, otherwise warcprox holds connections open +# and prevents the servers from shutting down @pytest.fixture(scope="module") -def warcprox_(request): +def warcprox_(request, http_daemon, https_daemon): orig_dir = os.getcwd() work_dir = tempfile.mkdtemp() logging.info('changing to working directory %r', work_dir) @@ -372,7 +406,9 @@ def warcprox_(request): '--onion-tor-socks-proxy=localhost:9050', '--crawl-log-dir=crawl-logs', '--socket-timeout=4', - '--max-resource-size=200000'] + '--max-resource-size=200000', + '--dedup-min-text-size=3', + '--dedup-min-binary-size=5'] if request.config.getoption('--rethinkdb-dedup-url'): argv.append('--rethinkdb-dedup-url=%s' % request.config.getoption('--rethinkdb-dedup-url')) # test these here only @@ -709,7 +745,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port) # archive url1 bucket_a - headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_a"})} + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_a"})} response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers) assert response.status_code == 200 assert response.headers['warcprox-test-header'] == 'k!' @@ -735,7 +771,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, assert dedup_lookup is None # archive url2 bucket_b - headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_b"})} + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})} response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers) assert response.status_code == 200 assert response.headers['warcprox-test-header'] == 'k!' 
@@ -764,7 +800,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3)
 
     # archive url1 bucket_b
-    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_b"})}
+    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})}
     response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
     assert response.status_code == 200
     assert response.headers['warcprox-test-header'] == 'k!'
@@ -961,6 +997,14 @@ def test_domain_doc_soft_limit(
         http_daemon, https_daemon, warcprox_, archiving_proxies):
     urls_before = warcprox_.proxy.running_stats.urls
 
+    # we need to clear the connection pool here because
+    # - the connection pool may already have an open connection to localhost
+    # - we're about to make a connection to foo.localhost
+    # - but our test server, which handles all the hosts, is single threaded
+    # - so it will fail to connect (socket timeout)
+    # must close connections before each connection to a different hostname
+    warcprox_.proxy.remote_connection_pool.clear()
+
     request_meta = {
         "stats": {"buckets": [{"bucket":"test_domain_doc_limit_bucket","tally-domains":["foo.localhost"]}]},
         "soft-limits": {"test_domain_doc_limit_bucket:foo.localhost/total/urls":10},
@@ -978,6 +1022,8 @@ def test_domain_doc_soft_limit(
     # wait for postfetch chain
     wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
 
+    warcprox_.proxy.remote_connection_pool.clear()
+
     # make sure stats from different domain don't count
     url = 'http://bar.localhost:{}/o/p'.format(http_daemon.server_port)
     for i in range(10):
@@ -990,6 +1036,8 @@ def test_domain_doc_soft_limit(
     # wait for postfetch chain
     wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 11)
 
+    warcprox_.proxy.remote_connection_pool.clear()
+
     # (2) same host but different scheme and port: domain limit applies
     url = 'https://foo.localhost:{}/o/p'.format(https_daemon.server_port)
     response = requests.get(
@@ -999,6 +1047,8 @@ def test_domain_doc_soft_limit(
     assert response.headers['warcprox-test-header'] == 'o!'
     assert response.content == b'I am the warcprox test payload! 
pppppppppp!\n' + warcprox_.proxy.remote_connection_pool.clear() + # (3-9) different subdomain: host limit applies url = 'https://baz.foo.localhost:{}/o/p'.format(https_daemon.server_port) for i in range(7): @@ -1027,6 +1077,8 @@ def test_domain_doc_soft_limit( # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 20) + warcprox_.proxy.remote_connection_pool.clear() + # (11) back to http, and this is the 11th request url = 'http://zuh.foo.localhost:{}/o/p'.format(http_daemon.server_port) response = requests.get( @@ -1038,6 +1090,8 @@ def test_domain_doc_soft_limit( assert response.headers["content-type"] == "text/plain;charset=utf-8" assert response.raw.data == b"request rejected by warcprox: reached soft limit test_domain_doc_limit_bucket:foo.localhost/total/urls=10\n" + warcprox_.proxy.remote_connection_pool.clear() + # make sure limit doesn't get applied to a different domain url = 'https://localhost:{}/o/p'.format(https_daemon.server_port) response = requests.get( @@ -1050,6 +1104,8 @@ def test_domain_doc_soft_limit( # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 21) + warcprox_.proxy.remote_connection_pool.clear() + # https also blocked url = 'https://zuh.foo.localhost:{}/o/p'.format(https_daemon.server_port) response = requests.get( @@ -1062,6 +1118,8 @@ def test_domain_doc_soft_limit( assert response.headers["content-type"] == "text/plain;charset=utf-8" assert response.raw.data == b"request rejected by warcprox: reached soft limit test_domain_doc_limit_bucket:foo.localhost/total/urls=10\n" + warcprox_.proxy.remote_connection_pool.clear() + # same host, different capitalization still blocked url = 'https://HEHEHE.fOO.lOcALhoST:{}/o/p'.format(https_daemon.server_port) response = requests.get( @@ -1086,6 +1144,8 @@ def test_domain_data_soft_limit( } headers = {"Warcprox-Meta": json.dumps(request_meta)} + warcprox_.proxy.remote_connection_pool.clear() + url = 'http://ÞZz.localhost:{}/y/z'.format(http_daemon.server_port) response = requests.get( url, proxies=archiving_proxies, headers=headers, stream=True) @@ -1096,6 +1156,8 @@ def test_domain_data_soft_limit( # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + warcprox_.proxy.remote_connection_pool.clear() + # duplicate, does not count toward limit url = 'https://baz.Þzz.localhost:{}/y/z'.format(https_daemon.server_port) response = requests.get( @@ -1108,6 +1170,8 @@ def test_domain_data_soft_limit( # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2) + warcprox_.proxy.remote_connection_pool.clear() + # novel, pushes stats over the limit url = 'https://muh.XN--Zz-2Ka.locALHOst:{}/z/~'.format(https_daemon.server_port) response = requests.get( @@ -1120,6 +1184,8 @@ def test_domain_data_soft_limit( # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3) + warcprox_.proxy.remote_connection_pool.clear() + # make sure limit doesn't get applied to a different host url = 'http://baz.localhost:{}/z/~'.format(http_daemon.server_port) response = requests.get( @@ -1131,6 +1197,8 @@ def test_domain_data_soft_limit( # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4) + warcprox_.proxy.remote_connection_pool.clear() + # blocked because we're over the limit now url = 'http://lOl.wHut.ÞZZ.lOcALHOst:{}/y/z'.format(http_daemon.server_port) response = requests.get( @@ -1226,14 +1294,23 @@ def 
test_limit_large_resource(archiving_proxies, http_daemon, warcprox_): """ urls_before = warcprox_.proxy.running_stats.urls + # this should be truncated url = 'http://localhost:%s/300k-content' % http_daemon.server_port response = requests.get( url, proxies=archiving_proxies, verify=False, timeout=10) assert len(response.content) == 262144 + # test that the connection is cleaned up properly after truncating a + # response (no hang or timeout) + url = 'http://localhost:%s/' % http_daemon.server_port + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + assert response.status_code == 404 + assert response.content == b'404 Not Found\n' + # wait for processing of this url to finish so that it doesn't interfere # with subsequent tests - wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2) def test_method_filter( warcprox_, https_daemon, http_daemon, archiving_proxies, @@ -1294,7 +1371,7 @@ def test_dedup_ok_flag( assert dedup_lookup is None # archive with dedup_ok:False - request_meta = {'captures-bucket':'test_dedup_ok_flag','dedup-ok':False} + request_meta = {'dedup-bucket':'test_dedup_ok_flag','dedup-ok':False} headers = {'Warcprox-Meta': json.dumps(request_meta)} response = requests.get( url, proxies=archiving_proxies, headers=headers, verify=False) @@ -1312,7 +1389,7 @@ def test_dedup_ok_flag( assert dedup_lookup is None # archive without dedup_ok:False - request_meta = {'captures-bucket':'test_dedup_ok_flag'} + request_meta = {'dedup-bucket':'test_dedup_ok_flag'} headers = {'Warcprox-Meta': json.dumps(request_meta)} response = requests.get( url, proxies=archiving_proxies, headers=headers, verify=False) @@ -1765,6 +1842,14 @@ def test_socket_timeout_response( response = requests.get(url, proxies=archiving_proxies, verify=False) assert response.status_code == 502 + # test that the connection is cleaned up properly after truncating a + # response (no hang or timeout) + url = 'http://localhost:%s/' % http_daemon.server_port + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + assert response.status_code == 404 + assert response.content == b'404 Not Found\n' + def test_empty_response( warcprox_, http_daemon, https_daemon, archiving_proxies, playback_proxies): @@ -1856,6 +1941,47 @@ def test_trough_segment_promotion(warcprox_): time.sleep(3) assert promoted == [] +def test_dedup_min_size(http_daemon, warcprox_, archiving_proxies, playback_proxies): + """We use options --dedup-min-text-size=3 --dedup-min-binary-size=5 and we + try to download content smaller than these limits to make sure that it is + not deduplicated. 
We create the digest_str with the following code: + ``` + payload_digest = hashlib.new('sha1') + payload_digest.update(b'aa') + warcprox.digest_str(payload_digest) + ``` + """ + url = 'http://localhost:%s/text-2bytes' % http_daemon.server_port + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + assert len(response.content) == 2 + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37') + assert dedup_lookup is None + time.sleep(3) + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37') + # This would return dedup data if payload_size > dedup-min-text-size + assert dedup_lookup is None + + url = 'http://localhost:%s/binary-4bytes' % http_daemon.server_port + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + assert len(response.content) == 4 + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79') + assert dedup_lookup is None + time.sleep(3) + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79') + # This would return dedup data if payload_size > dedup-min-binary-size + assert dedup_lookup is None + + if __name__ == '__main__': pytest.main() diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 76d733d..20f0de4 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -26,12 +26,14 @@ import time import logging from argparse import Namespace as _Namespace from pkg_resources import get_distribution as _get_distribution -__version__ = _get_distribution('warcprox').version +import concurrent.futures try: import queue except ImportError: import Queue as queue +__version__ = _get_distribution('warcprox').version + def digest_str(hash_obj, base32=False): import base64 return hash_obj.name.encode('utf-8') + b':' + ( @@ -45,6 +47,17 @@ class Options(_Namespace): except AttributeError: return None +class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor): + ''' + `concurrent.futures.ThreadPoolExecutor` supporting a queue of limited size. + + If `max_queued` is set, calls to `submit()` will block if necessary until a + free slot is available. 
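+
+    Illustrative usage (a sketch only; `do_work` and `work_items` are
+    hypothetical placeholders, not part of warcprox):
+
+        pool = warcprox.ThreadPoolExecutor(max_queued=10, max_workers=4)
+        for item in work_items:
+            pool.submit(do_work, item)  # blocks once 10 submissions are pending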
+ ''' + def __init__(self, max_queued=None, *args, **kwargs): + super().__init__(*args, **kwargs) + self._work_queue = queue.Queue(maxsize=max_queued or 0) + class TimestampedQueue(queue.Queue): """ A queue.Queue that exposes the time enqueued of the oldest item in the diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index e6674a6..d8cd218 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -34,6 +34,7 @@ import threading import datetime import doublethink import rethinkdb as r +from warcprox.dedup import DedupableMixin class RethinkCaptures: """Inserts in batches every 0.5 seconds""" @@ -156,8 +157,8 @@ class RethinkCaptures: sha1base32 = base64.b32encode(digest.digest()).decode("utf-8") if (recorded_url.warcprox_meta - and "captures-bucket" in recorded_url.warcprox_meta): - bucket = recorded_url.warcprox_meta["captures-bucket"] + and "dedup-bucket" in recorded_url.warcprox_meta): + bucket = recorded_url.warcprox_meta["dedup-bucket"] else: bucket = "__unspecified__" @@ -215,10 +216,11 @@ class RethinkCaptures: if self._timer: self._timer.join() -class RethinkCapturesDedup(warcprox.dedup.DedupDb): +class RethinkCapturesDedup(warcprox.dedup.DedupDb, DedupableMixin): logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup") def __init__(self, options=warcprox.Options()): + DedupableMixin.__init__(self, options) self.captures_db = RethinkCaptures(options=options) self.options = options @@ -251,5 +253,6 @@ class RethinkCapturesDedup(warcprox.dedup.DedupDb): self.captures_db.close() def notify(self, recorded_url, records): - self.captures_db.notify(recorded_url, records) - + if (records and records[0].type == b'response' + and self.should_dedup(recorded_url)): + self.captures_db.notify(recorded_url, records) diff --git a/warcprox/controller.py b/warcprox/controller.py index ff63657..e89ecbb 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -141,11 +141,9 @@ class WarcproxController(object): self.playback_proxy = Factory.playback_proxy( self.proxy.ca, self.options) - # default number of warc writer threads = sqrt(proxy.max_threads) - # pulled out of thin air because it strikes me as reasonable - # 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45 + # https://github.com/internetarchive/warcprox/wiki/benchmarking-number-of-threads if not self.options.writer_threads: - self.options.writer_threads = int(self.proxy.max_threads ** 0.5) + self.options.writer_threads = 1 self.build_postfetch_chain(self.proxy.recorded_url_q) @@ -164,8 +162,7 @@ class WarcproxController(object): queued += len(processor.batch) result['postfetch_chain'].append({ - 'processor': name, - 'queued_urls': len(processor.inq.queue)}) + 'processor': name, 'queued_urls': queued}) return result def chain(self, processor0, processor1): @@ -440,3 +437,18 @@ class WarcproxController(object): self.logger.notice( 'performance profile of %s:\n%s', processor, buf.getvalue()) + + if hasattr(processor, 'thread_profilers'): + files = [] + for th_id, profiler in processor.thread_profilers.items(): + file = os.path.join(tmpdir, '%s.dat' % th_id) + profiler.dump_stats(file) + files.append(file) + buf = io.StringIO() + stats = pstats.Stats(*files, stream=buf) + stats.sort_stats('cumulative') + stats.print_stats(0.1) + self.logger.notice( + 'aggregate performance profile of %s worker ' + 'threads of %s:\n%s', + len(files), processor, buf.getvalue()) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 8d63f96..f979d97 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -37,20 
+37,51 @@ from concurrent import futures urllib3.disable_warnings() -class DedupLoader(warcprox.BaseStandardPostfetchProcessor): +class DedupableMixin(object): + def __init__(self, options=warcprox.Options()): + self.min_text_size = options.dedup_min_text_size + self.min_binary_size = options.dedup_min_binary_size + self.dedup_only_with_bucket = options.dedup_only_with_bucket + + def should_dedup(self, recorded_url): + """Check if we should try to run dedup on resource based on payload + size compared with min text/binary dedup size options. + When we use option --dedup-only-with-bucket, `dedup-bucket` is required + in Warcprox-Meta to perform dedup. + Return Boolean. + """ + if self.dedup_only_with_bucket and "dedup-bucket" not in recorded_url.warcprox_meta: + return False + if recorded_url.is_text(): + return recorded_url.response_recorder.payload_size() > self.min_text_size + else: + return recorded_url.response_recorder.payload_size() > self.min_binary_size + +class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin): def __init__(self, dedup_db, options=warcprox.Options()): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) + DedupableMixin.__init__(self, options) self.dedup_db = dedup_db def _process_url(self, recorded_url): - decorate_with_dedup_info( - self.dedup_db, recorded_url, self.options.base32) + if (recorded_url.response_recorder + and recorded_url.payload_digest + and self.should_dedup(recorded_url)): + digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) + if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta: + recorded_url.dedup_info = self.dedup_db.lookup( + digest_key, recorded_url.warcprox_meta["dedup-bucket"], + recorded_url.url) + else: + recorded_url.dedup_info = self.dedup_db.lookup( + digest_key, url=recorded_url.url) -class DedupDb(object): +class DedupDb(DedupableMixin): logger = logging.getLogger("warcprox.dedup.DedupDb") def __init__( self, file='./warcprox.sqlite', options=warcprox.Options()): + DedupableMixin.__init__(self, options) self.file = file self.options = options @@ -113,33 +144,21 @@ class DedupDb(object): def notify(self, recorded_url, records): if (records and records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) - if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: + if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta: self.save( digest_key, records[0], - bucket=recorded_url.warcprox_meta["captures-bucket"]) + bucket=recorded_url.warcprox_meta["dedup-bucket"]) else: self.save(digest_key, records[0]) -def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): - if (recorded_url.response_recorder - and recorded_url.payload_digest - and recorded_url.response_recorder.payload_size() > 0): - digest_key = warcprox.digest_str(recorded_url.payload_digest, base32) - if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: - recorded_url.dedup_info = dedup_db.lookup( - digest_key, recorded_url.warcprox_meta["captures-bucket"], - recorded_url.url) - else: - recorded_url.dedup_info = dedup_db.lookup( - digest_key, url=recorded_url.url) - -class RethinkDedupDb(DedupDb): +class RethinkDedupDb(DedupDb, DedupableMixin): logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") def __init__(self, 
options=warcprox.Options()): + DedupableMixin.__init__(self, options) parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_dedup_url) self.rr = doublethink.Rethinker( servers=parsed.hosts, db=parsed.database) @@ -190,11 +209,11 @@ class RethinkDedupDb(DedupDb): def notify(self, recorded_url, records): if (records and records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) - if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: - self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"]) + if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta: + self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["dedup-bucket"]) else: self.save(digest_key, records[0]) @@ -205,17 +224,18 @@ class CdxServerDedup(DedupDb): cookies = None def __init__(self, cdx_url="https://web.archive.org/cdx/search", - maxsize=200, options=warcprox.Options()): + maxsize=400, options=warcprox.Options()): """Initialize cdx server connection pool and related parameters. Use low timeout value and no retries to avoid blocking warcprox operation by a slow CDX server. """ self.cdx_url = cdx_url self.options = options - self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0, - timeout=2.0) + headers = {'User-Agent': 'warcprox', 'Accept-Encoding': 'gzip, deflate'} if options.cdxserver_dedup_cookies: - self.cookies = options.cdxserver_dedup_cookies + headers['Cookie'] = options.cdxserver_dedup_cookies + self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0, + timeout=2.0, headers=headers) def loader(self, *args, **kwargs): return CdxServerDedupLoader(self, self.options) @@ -245,10 +265,9 @@ class CdxServerDedup(DedupDb): """ u = url.decode("utf-8") if isinstance(url, bytes) else url try: - headers = {'Cookie': self.cookies} if self.cookies else {} result = self.http_pool.request('GET', self.cdx_url, fields=dict( url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit", - limit=-1), headers=headers) + limit=-1)) assert result.status == 200 if isinstance(digest_key, bytes): dkey = digest_key @@ -273,17 +292,24 @@ class CdxServerDedup(DedupDb): """ pass -class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor): +class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin): def __init__(self, cdx_dedup, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) - self.pool = futures.ThreadPoolExecutor(max_workers=200) + DedupableMixin.__init__(self, options) + self.pool = futures.ThreadPoolExecutor(max_workers=400) self.batch = set() self.cdx_dedup = cdx_dedup def _get_process_put(self): recorded_url = self.inq.get(block=True, timeout=0.5) - self.batch.add(recorded_url) - self.pool.submit(self._process_url, recorded_url) + if (recorded_url.response_recorder + and recorded_url.payload_digest + and self.should_dedup(recorded_url)): + self.batch.add(recorded_url) + self.pool.submit(self._process_url, recorded_url) + else: + if self.outq: + self.outq.put(recorded_url) def _process_url(self, recorded_url): try: @@ -300,9 +326,10 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor): if self.outq: self.outq.put(recorded_url) -class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): +class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor, DedupableMixin): def __init__(self, 
trough_dedup_db, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + DedupableMixin.__init__(self, options) self.trough_dedup_db = trough_dedup_db def _filter_and_bucketize(self, batch): @@ -314,10 +341,10 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): for recorded_url in batch: if (recorded_url.warc_records and recorded_url.warc_records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): if (recorded_url.warcprox_meta - and 'captures-bucket' in recorded_url.warcprox_meta): - bucket = recorded_url.warcprox_meta['captures-bucket'] + and 'dedup-bucket' in recorded_url.warcprox_meta): + bucket = recorded_url.warcprox_meta['dedup-bucket'] else: bucket = '__unspecified__' buckets[bucket].append(recorded_url) @@ -346,9 +373,10 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): logging.warn( 'timed out saving dedup info to trough', exc_info=True) -class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): +class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin): def __init__(self, trough_dedup_db, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + DedupableMixin.__init__(self, options) self.trough_dedup_db = trough_dedup_db def _startup(self): @@ -363,10 +391,10 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): for recorded_url in batch: if (recorded_url.response_recorder and recorded_url.payload_digest - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): if (recorded_url.warcprox_meta - and 'captures-bucket' in recorded_url.warcprox_meta): - bucket = recorded_url.warcprox_meta['captures-bucket'] + and 'dedup-bucket' in recorded_url.warcprox_meta): + bucket = recorded_url.warcprox_meta['dedup-bucket'] else: bucket = '__unspecified__' buckets[bucket].append(recorded_url) @@ -423,7 +451,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): logging.warn( 'timed out loading dedup info from trough', exc_info=True) -class TroughDedupDb(DedupDb): +class TroughDedupDb(DedupDb, DedupableMixin): ''' https://github.com/internetarchive/trough ''' @@ -440,6 +468,7 @@ class TroughDedupDb(DedupDb): 'values (%s, %s, %s, %s);') def __init__(self, options=warcprox.Options()): + DedupableMixin.__init__(self, options) self.options = options self._trough_cli = warcprox.trough.TroughClient( options.rethinkdb_trough_db_url, promotion_interval=60*60) @@ -512,12 +541,12 @@ class TroughDedupDb(DedupDb): def notify(self, recorded_url, records): if (records and records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) - if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta: + if recorded_url.warcprox_meta and 'dedup-bucket' in recorded_url.warcprox_meta: self.save( digest_key, records[0], - bucket=recorded_url.warcprox_meta['captures-bucket']) + bucket=recorded_url.warcprox_meta['dedup-bucket']) else: self.save(digest_key, records[0]) diff --git a/warcprox/main.py b/warcprox/main.py index 8ff466b..6fb46ef 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -148,6 +148,16 @@ def _build_arg_parser(prog='warcprox'): # optional cookie values to pass to CDX Server; e.g. 
"cookie1=val1;cookie2=val2" arg_parser.add_argument('--cdxserver-dedup-cookies', dest='cdxserver_dedup_cookies', help=argparse.SUPPRESS) + arg_parser.add_argument('--dedup-min-text-size', dest='dedup_min_text_size', + type=int, default=0, + help=('try to dedup text resources with payload size over this limit in bytes')) + arg_parser.add_argument('--dedup-min-binary-size', dest='dedup_min_binary_size', + type=int, default=0, help=( + 'try to dedup binary resources with payload size over this limit in bytes')) + # optionally, dedup request only when `dedup-bucket` is available in + # Warcprox-Meta HTTP header. By default, we dedup all requests. + arg_parser.add_argument('--dedup-only-with-bucket', dest='dedup_only_with_bucket', + action='store_true', default=False, help=argparse.SUPPRESS) arg_parser.add_argument('--queue-size', dest='queue_size', type=int, default=500, help=argparse.SUPPRESS) arg_parser.add_argument('--max-threads', dest='max_threads', type=int, diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 8778822..e01f15e 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -390,6 +390,24 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.send_error(502, str(e)) return + def send_error(self, code, message=None, explain=None): + # BaseHTTPRequestHandler.send_response_only() in http/server.py + # does this: + # if not hasattr(self, '_headers_buffer'): + # self._headers_buffer = [] + # but we sometimes see self._headers_buffer == None + # (This happened before! see commit dc9fdc34125dd2357) + # Workaround: + if hasattr(self, '_headers_buffer') and not self._headers_buffer: + self._headers_buffer = [] + try: + return http_server.BaseHTTPRequestHandler.send_error( + self, code, message, explain) + except: + self.logger.error( + 'send_error(%r, %r, %r) raised exception', exc_info=True) + return None + def _proxy_request(self, extra_response_headers={}): ''' Sends the request to the remote server, then uses a ProxyingRecorder to @@ -447,13 +465,16 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): buf = prox_rec_res.read(65536) while buf != b'': buf = prox_rec_res.read(65536) - if self._max_resource_size: - if prox_rec_res.recorder.len > self._max_resource_size: - prox_rec_res.truncated = b'length' - self.logger.error( - 'Max resource size %d bytes exceeded for URL %s', + if (self._max_resource_size and + prox_rec_res.recorder.len > self._max_resource_size): + prox_rec_res.truncated = b'length' + self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR) + self._remote_server_conn.sock.close() + self.logger.info( + 'truncating response because max resource size %d ' + 'bytes exceeded for URL %s', self._max_resource_size, self.url) - break + break self.log_request(prox_rec_res.status, prox_rec_res.recorder.len) # Let's close off the remote end. If remote connection is fine, @@ -461,6 +482,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): if not is_connection_dropped(self._remote_server_conn): self._conn_pool._put_conn(self._remote_server_conn) except: + self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR) self._remote_server_conn.sock.close() raise finally: @@ -479,35 +501,14 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): class PooledMixIn(socketserver.ThreadingMixIn): logger = logging.getLogger("warcprox.mitmproxy.PooledMixIn") def __init__(self, max_threads=None): - ''' - If max_threads is not supplied, calculates a reasonable value based - on system resource limits. 
- ''' self.active_requests = set() self.unaccepted_requests = 0 - if not max_threads: - # man getrlimit: "RLIMIT_NPROC The maximum number of processes (or, - # more precisely on Linux, threads) that can be created for the - # real user ID of the calling process." - try: - import resource - rlimit_nproc = resource.getrlimit(resource.RLIMIT_NPROC)[0] - rlimit_nofile = resource.getrlimit(resource.RLIMIT_NOFILE)[0] - max_threads = min(rlimit_nofile // 10, rlimit_nproc // 2) - # resource.RLIM_INFINITY == -1 which can result in max_threads == 0 - if max_threads <= 0 or max_threads > 5000: - max_threads = 5000 - self.logger.info( - "max_threads=%s (rlimit_nproc=%s, rlimit_nofile=%s)", - max_threads, rlimit_nproc, rlimit_nofile) - except Exception as e: - self.logger.warn( - "unable to calculate optimal number of threads based " - "on resource limits due to %s", e) - max_threads = 100 - self.logger.info("max_threads=%s", max_threads) - self.max_threads = max_threads - self.pool = concurrent.futures.ThreadPoolExecutor(max_threads) + if max_threads: + self.max_threads = max_threads + else: + self.max_threads = 100 + self.pool = concurrent.futures.ThreadPoolExecutor(self.max_threads) + self.logger.info("%s proxy threads", self.max_threads) def status(self): if hasattr(super(), 'status'): diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 2f63a77..0d93e5c 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -350,6 +350,10 @@ class RecordedUrl: self.response_recorder = response_recorder if warcprox_meta: + if 'captures-bucket' in warcprox_meta: + # backward compatibility + warcprox_meta['dedup-bucket'] = warcprox_meta['captures-bucket'] + del warcprox_meta['captures-bucket'] self.warcprox_meta = warcprox_meta else: self.warcprox_meta = {} @@ -376,6 +380,18 @@ class RecordedUrl: self.warc_records = warc_records self.do_not_archive = do_not_archive + def is_text(self): + """Ref: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Complete_list_of_MIME_types + Alternative method: try to decode('ascii') first N bytes to make sure + its text. 
+        """
+        if self.mimetype:
+            return self.mimetype[:5] == "text/" or self.mimetype in (
+                    "application/xml", "application/javascript", "application/json",
+                    "application/xhtml+xml", "application/typescript",
+                    "image/svg+xml")
+        return False
+
 # inherit from object so that multiple inheritance from this class works
 # properly in python 2
 # http://stackoverflow.com/questions/1713038/super-fails-with-error-typeerror-argument-1-must-be-type-not-classobj#18392639
@@ -483,6 +499,7 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy):
     def server_close(self):
         self.logger.info('shutting down')
         http_server.HTTPServer.server_close(self)
+        self.remote_connection_pool.clear()
 
     def handle_error(self, request, client_address):
         self.logger.warn(
diff --git a/warcprox/writer.py b/warcprox/writer.py
index e8f4b79..9ab249c 100644
--- a/warcprox/writer.py
+++ b/warcprox/writer.py
@@ -88,6 +88,7 @@ class _OneWritableWarc:
             os.mkdir(self.directory)
 
         self.finalname = self.next_filename(serial)
+        self.logger.trace('opening %s', self.finalname)
         self.path = os.path.sep.join(
                 [self.directory, self.finalname + self.open_suffix])
 
diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py
index 927c628..ef0bd2d 100644
--- a/warcprox/writerthread.py
+++ b/warcprox/writerthread.py
@@ -31,6 +31,7 @@ import logging
 import time
 import warcprox
 from concurrent import futures
+import threading
 
 class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
     logger = logging.getLogger("warcprox.writerthread.WarcWriterProcessor")
@@ -41,21 +42,45 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
         warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
         self.writer_pool = warcprox.writer.WarcWriterPool(options)
         self.method_filter = set(method.upper() for method in self.options.method_filter or [])
-        self.pool = futures.ThreadPoolExecutor(max_workers=options.writer_threads or 1)
+
+        self.thread_local = threading.local()
+        self.thread_profilers = {}
+        # set max_queued small, because self.inq is already handling queueing
+        # for us; but give it a little breathing room to make sure it can keep
+        # worker threads busy
+        self.pool = warcprox.ThreadPoolExecutor(
+                max_workers=options.writer_threads or 1,
+                max_queued=10 * (options.writer_threads or 1))
         self.batch = set()
 
     def _startup(self):
-        self.logger.info('%s threads', self.pool._max_workers)
+        self.logger.info('%s warc writer threads', self.pool._max_workers)
         warcprox.BaseStandardPostfetchProcessor._startup(self)
 
     def _get_process_put(self):
         try:
             recorded_url = self.inq.get(block=True, timeout=0.5)
             self.batch.add(recorded_url)
-            self.pool.submit(self._process_url, recorded_url)
+            self.pool.submit(self._wrap_process_url, recorded_url)
         finally:
             self.writer_pool.maybe_idle_rollover()
 
+    def _wrap_process_url(self, recorded_url):
+        if not getattr(self.thread_local, 'name_set', False):
+            threading.current_thread().name = 'WarcWriterThread(tid=%s)' % warcprox.gettid()
+            self.thread_local.name_set = True
+        if self.options.profile:
+            import cProfile
+            if not hasattr(self.thread_local, 'profiler'):
+                self.thread_local.profiler = cProfile.Profile()
+                tid = threading.current_thread().ident
+                self.thread_profilers[tid] = self.thread_local.profiler
+            self.thread_local.profiler.enable()
+            self._process_url(recorded_url)
+            self.thread_local.profiler.disable()
+        else:
+            self._process_url(recorded_url)
+
     def _process_url(self, recorded_url):
         try:
             records = []
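The size-gated dedup behavior introduced in this change can be restated as a minimal standalone sketch (illustrative only, not part of the patch; the function and constant names below are made up). With the test options --dedup-min-text-size=3 and --dedup-min-binary-size=5, the 2-byte text body and 4-byte binary body fetched in test_dedup_min_size fall below their thresholds, so neither is considered for deduplication:

```
# standalone restatement of DedupableMixin.should_dedup() + RecordedUrl.is_text()
# (sketch only, not warcprox API)
TEXT_MIMETYPES = (
    "application/xml", "application/javascript", "application/json",
    "application/xhtml+xml", "application/typescript", "image/svg+xml")

def is_text(mimetype):
    # mirror RecordedUrl.is_text(): text/* plus a few text-like application types
    return bool(mimetype) and (
            mimetype.startswith("text/") or mimetype in TEXT_MIMETYPES)

def should_dedup(mimetype, payload_size, min_text_size=3, min_binary_size=5,
                 dedup_bucket=None, dedup_only_with_bucket=False):
    # --dedup-only-with-bucket: only dedup when Warcprox-Meta supplies a bucket
    if dedup_only_with_bucket and dedup_bucket is None:
        return False
    limit = min_text_size if is_text(mimetype) else min_binary_size
    # payload must be strictly larger than the applicable minimum
    return payload_size > limit

assert not should_dedup("text/plain", 2)                # GET /text-2bytes
assert not should_dedup("application/octet-stream", 4)  # GET /binary-4bytes
assert should_dedup("text/plain", 4)                    # large enough to dedup
```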