From 5af0fcff6cebbd3b30ce6a3734e1d84cc09d6793 Mon Sep 17 00:00:00 2001
From: Vangelis Banos
Date: Wed, 17 Jan 2018 13:34:35 +0000
Subject: [PATCH 1/6] Use socket.TCP_NODELAY to improve performance

Experiment details supporting this change are in Jira issue WWM-935.
---
 warcprox/mitmproxy.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py
index 130196a..dd08bf7 100644
--- a/warcprox/mitmproxy.py
+++ b/warcprox/mitmproxy.py
@@ -243,6 +243,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
                         port=self.onion_tor_socks_proxy_port, rdns=True)
             else:
                 self._remote_server_sock = socket.socket()
+                self._remote_server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                 # XXX what value should this timeout have?
                 self._remote_server_sock.settimeout(60)

From d590dee59a227394b7f092dfdd24c7ad94e19625 Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Thu, 18 Jan 2018 12:00:27 -0800
Subject: [PATCH 2/6] fix port conflict test failure on travis-ci

---
 tests/test_warcprox.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index 6ab90f6..d091542 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -1396,8 +1396,7 @@ def test_controller_with_defaults():
     assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1'
 
 def test_load_plugin():
-    options = warcprox.Options()
-    options.plugins = ['warcprox.stats.RunningStats']
+    options = warcprox.Options(port=0, plugins=['warcprox.stats.RunningStats'])
     controller = warcprox.controller.WarcproxController(options)
     assert isinstance(
             controller._postfetch_chain[-1],

From 1c502353057a84cd77b9050ac84a3669a240ffcb Mon Sep 17 00:00:00 2001
From: Vangelis Banos
Date: Fri, 19 Jan 2018 15:16:26 +0000
Subject: [PATCH 3/6] Add --cdxserver-dedup-cookies option

It is necessary to pass cookies to the CDX server we use for
deduplication. To do this, we add the optional CLI argument
``--cdxserver-dedup-cookies="cookie1=val1;cookie2=val2"``; when it is
set, its value is sent in the `Cookie` HTTP header of CDX server
requests.
---
 warcprox/controller.py | 2 +-
 warcprox/dedup.py      | 6 +++++-
 warcprox/main.py       | 3 +++
 3 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/warcprox/controller.py b/warcprox/controller.py
index dfd930b..fe9960a 100644
--- a/warcprox/controller.py
+++ b/warcprox/controller.py
@@ -47,7 +47,7 @@ class Factory:
             dedup_db = warcprox.dedup.TroughDedupDb(options)
         elif options.cdxserver_dedup:
             dedup_db = warcprox.dedup.CdxServerDedup(
-                cdx_url=options.cdxserver_dedup)
+                cdx_url=options.cdxserver_dedup, options=options)
         elif options.dedup_db_file in (None, '', '/dev/null'):
             logging.info('deduplication disabled')
             dedup_db = None
diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 950c110..cd3b397 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -201,12 +201,15 @@ class CdxServerDedup(DedupDb):
     """Query a CDX server to perform deduplication.
""" logger = logging.getLogger("warcprox.dedup.CdxServerDedup") + cookies = None def __init__(self, cdx_url="https://web.archive.org/cdx/search", maxsize=200, options=warcprox.Options()): self.cdx_url = cdx_url self.options = options self.http_pool = urllib3.PoolManager(maxsize=maxsize) + if options.cdxserver_dedup_cookies: + self.cookies = options.cdxserver_dedup_cookies def start(self): pass @@ -233,9 +236,10 @@ class CdxServerDedup(DedupDb): """ u = url.decode("utf-8") if isinstance(url, bytes) else url try: + headers = {'Cookie': self.cookies} if self.cookies else {} result = self.http_pool.request('GET', self.cdx_url, fields=dict( url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit", - limit=-1)) + limit=-1), headers=headers) assert result.status == 200 if isinstance(digest_key, bytes): dkey = digest_key diff --git a/warcprox/main.py b/warcprox/main.py index 59e4b4a..1f270a1 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -145,6 +145,9 @@ def _build_arg_parser(prog='warcprox'): '--rethinkdb-services-url', dest='rethinkdb_services_url', help=( 'rethinkdb service registry table url; if provided, warcprox ' 'will create and heartbeat entry for itself')) + # optional cookie values to pass to CDX Server; e.g. "cookie1=val1;cookie2=val2" + arg_parser.add_argument('--cdxserver-dedup-cookies', dest='cdxserver_dedup_cookies', + help=argparse.SUPPRESS) arg_parser.add_argument('--queue-size', dest='queue_size', type=int, default=500, help=argparse.SUPPRESS) arg_parser.add_argument('--max-threads', dest='max_threads', type=int, From 4b53c10132f5ecdc4521e9483deace607df38b79 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 19 Jan 2018 14:37:53 -0800 Subject: [PATCH 4/6] bump minor version after these big changes --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 242d5b6..a93da08 100755 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ except: setuptools.setup( name='warcprox', - version='2.3.1b4.dev138', + version='2.4b1.dev139', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 57abab100cd20da215e7105127d2bb5794f92faf Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 19 Jan 2018 14:38:54 -0800 Subject: [PATCH 5/6] handle case where warc record id is missing ... from trough dedup. Not sure why this error happened but we shouldn't need that field anyway. --- warcprox/dedup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index cd3b397..c9b0079 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -419,7 +419,7 @@ class TroughDedupDb(DedupDb): len(digest_keys), len(results)) assert len(results) >= 0 and len(results) <= len(digest_keys) for result in results: - result['id'] = result['id'].encode('ascii') + result['id'] = result.get('id') and result['id'].encode('ascii') result['url'] = result['url'].encode('ascii') result['date'] = result['date'].encode('ascii') result['digest_key'] = result['digest_key'].encode('ascii') From 7fb78ef1df062ab63655125b93ce91ab516e3b38 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 19 Jan 2018 16:33:15 -0800 Subject: [PATCH 6/6] parallelize trough dedup queries Each dedup bucket (in archive-it, generally one per seed) requires a separate http request. The batches of urls processed by the trough dedup loader and storer may include multiple dedup buckets. This commit makes those all the trough queries in a given batch run in parallel, using a thread pool. 
---
 warcprox/dedup.py | 70 ++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 60 insertions(+), 10 deletions(-)

diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index c9b0079..cb65408 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -33,6 +33,7 @@ import datetime
 import urllib3
 from urllib3.exceptions import HTTPError
 import collections
+from concurrent import futures
 
 urllib3.disable_warnings()
 
@@ -289,8 +290,26 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
 
     def _process_batch(self, batch):
         buckets = self._filter_and_bucketize(batch)
-        for bucket in buckets:
-            self.trough_dedup_db.batch_save(buckets[bucket], bucket)
+        if not buckets:
+            return
+        fs = {}
+        with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
+            # send off requests in parallel
+            for bucket in buckets:
+                future = pool.submit(
+                        self.trough_dedup_db.batch_save,
+                        buckets[bucket], bucket)
+                fs[future] = bucket
+
+            # wait for results
+            try:
+                for future in futures.as_completed(fs, timeout=20):
+                    pass
+            except futures.TimeoutError as e:
+                # the remaining threads actually keep running in this case,
+                # there's no way to stop them, but that should be harmless
+                logging.warn(
+                        'timed out saving dedup info to trough', exc_info=True)
 
 class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
     def __init__(self, trough_dedup_db, options=warcprox.Options()):
@@ -320,7 +339,13 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
 
     def _build_key_index(self, batch):
         '''
-        Returns `{digest_key: [recorded_url, ...]}`.
+        Builds index of RecordedUrl by digest key.
+
+        Args:
+            batch(list): list of RecordedUrl
+
+        Returns:
+            dict `{digest_key: [recorded_url, ...]}`
         '''
         key_index = collections.defaultdict(list)
         for recorded_url in batch:
@@ -331,13 +356,37 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
 
     def _process_batch(self, batch):
         buckets = self._filter_and_bucketize(batch)
-        for bucket in buckets:
-            key_index = self._build_key_index(buckets[bucket])
-            results = self.trough_dedup_db.batch_lookup(
-                    key_index.keys(), bucket)
-            for result in results:
-                for recorded_url in key_index[result['digest_key']]:
-                    recorded_url.dedup_info = result
+        if not buckets:
+            return
+        fs = {}
+        with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
+            # send off the trough requests in parallel
+            for bucket in buckets:
+                key_index = self._build_key_index(buckets[bucket])
+                future = pool.submit(
+                        self.trough_dedup_db.batch_lookup,
+                        key_index.keys(), bucket)
+                fs[future] = bucket
+
+            # process results as they come back
+            try:
+                for future in futures.as_completed(fs, timeout=20):
+                    bucket = fs[future]
+                    try:
+                        for entry in future.result():
+                            for recorded_url in key_index[entry['digest_key']]:
+                                recorded_url.dedup_info = entry
+                    except Exception as e:
+                        # batch_lookup raised exception or something
+                        logging.warn(
+                                'problem looking up dedup info for %s urls '
+                                'in bucket %s', len(buckets[bucket]), bucket,
+                                exc_info=True)
+            except futures.TimeoutError as e:
+                # the remaining threads actually keep running in this case,
+                # there's no way to stop them, but that should be harmless
+                logging.warn(
+                        'timed out loading dedup info from trough', exc_info=True)
 
 class TroughDedupDb(DedupDb):
     '''
@@ -409,6 +458,7 @@ class TroughDedupDb(DedupDb):
         return None
 
     def batch_lookup(self, digest_keys, bucket='__unspecified__'):
+        '''Returns [{'digest_key': ..., 'url': ..., 'date': ...}, ...]'''
         sql_tmpl = 'select * from dedup where digest_key in (%s)' % (
                 ','.join('%s' for i in range(len(digest_keys))))
         results = self._trough_cli.read(bucket, sql_tmpl, digest_keys)
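
For reference, patch 1's one-liner is the standard idiom for disabling
Nagle's algorithm on an outbound socket. A standalone sketch; the host
and port are placeholders:

    import socket

    sock = socket.socket()
    # disable Nagle's algorithm: send small writes immediately instead
    # of buffering them into larger packets
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    sock.settimeout(60)
    sock.connect(('example.com', 80))

Likewise, the cookie handling from patch 3 amounts to attaching a
`Cookie` header to the CDX query. A sketch against urllib3, assuming the
cookie string comes from --cdxserver-dedup-cookies; the url and cookie
values shown are placeholders:

    import urllib3

    http_pool = urllib3.PoolManager(maxsize=200)
    cookies = 'cookie1=val1;cookie2=val2'
    headers = {'Cookie': cookies} if cookies else {}
    result = http_pool.request(
            'GET', 'https://web.archive.org/cdx/search',
            fields=dict(
                url='http://example.com/', fl='timestamp,digest',
                filter='!mimetype:warc/revisit', limit=-1),
            headers=headers)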