diff --git a/setup.py b/setup.py
index 242d5b6..a93da08 100755
--- a/setup.py
+++ b/setup.py
@@ -52,7 +52,7 @@ except:
 
 setuptools.setup(
         name='warcprox',
-        version='2.3.1b4.dev138',
+        version='2.4b1.dev139',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',
diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index 6ab90f6..d091542 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -1396,8 +1396,7 @@ def test_controller_with_defaults():
     assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1'
 
 def test_load_plugin():
-    options = warcprox.Options()
-    options.plugins = ['warcprox.stats.RunningStats']
+    options = warcprox.Options(port=0, plugins=['warcprox.stats.RunningStats'])
     controller = warcprox.controller.WarcproxController(options)
     assert isinstance(
             controller._postfetch_chain[-1],
diff --git a/warcprox/controller.py b/warcprox/controller.py
index dfd930b..fe9960a 100644
--- a/warcprox/controller.py
+++ b/warcprox/controller.py
@@ -47,7 +47,7 @@ class Factory:
             dedup_db = warcprox.dedup.TroughDedupDb(options)
         elif options.cdxserver_dedup:
             dedup_db = warcprox.dedup.CdxServerDedup(
-                    cdx_url=options.cdxserver_dedup)
+                    cdx_url=options.cdxserver_dedup, options=options)
         elif options.dedup_db_file in (None, '', '/dev/null'):
             logging.info('deduplication disabled')
             dedup_db = None
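The controller.py change above just threads the full options object through to CdxServerDedup so the cookie setting introduced below (in dedup.py and main.py) can reach it. A minimal sketch of the resulting construction, assuming a standalone caller rather than warcprox's Factory; the option values here are illustrative:

    import warcprox
    import warcprox.dedup

    # hypothetical values; --cdxserver-dedup holds the CDX server url
    options = warcprox.Options(
            cdxserver_dedup='https://web.archive.org/cdx/search',
            cdxserver_dedup_cookies='cookie1=val1;cookie2=val2')
    dedup_db = warcprox.dedup.CdxServerDedup(
            cdx_url=options.cdxserver_dedup, options=options)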
""" logger = logging.getLogger("warcprox.dedup.CdxServerDedup") + cookies = None def __init__(self, cdx_url="https://web.archive.org/cdx/search", maxsize=200, options=warcprox.Options()): self.cdx_url = cdx_url self.options = options self.http_pool = urllib3.PoolManager(maxsize=maxsize) + if options.cdxserver_dedup_cookies: + self.cookies = options.cdxserver_dedup_cookies def start(self): pass @@ -233,9 +237,10 @@ class CdxServerDedup(DedupDb): """ u = url.decode("utf-8") if isinstance(url, bytes) else url try: + headers = {'Cookie': self.cookies} if self.cookies else {} result = self.http_pool.request('GET', self.cdx_url, fields=dict( url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit", - limit=-1)) + limit=-1), headers=headers) assert result.status == 200 if isinstance(digest_key, bytes): dkey = digest_key @@ -285,8 +290,26 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): def _process_batch(self, batch): buckets = self._filter_and_bucketize(batch) - for bucket in buckets: - self.trough_dedup_db.batch_save(buckets[bucket], bucket) + if not buckets: + return + fs = {} + with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool: + # send off requests in parallel + for bucket in buckets: + future = pool.submit( + self.trough_dedup_db.batch_save, + buckets[bucket], bucket) + fs[future] = bucket + + # wait for results + try: + for future in futures.as_completed(fs, timeout=20): + pass + except futures.TimeoutError as e: + # the remaining threads actually keep running in this case, + # there's no way to stop them, but that should be harmless + logging.warn( + 'timed out saving dedup info to trough', exc_info=True) class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): def __init__(self, trough_dedup_db, options=warcprox.Options()): @@ -316,7 +339,13 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): def _build_key_index(self, batch): ''' - Returns `{digest_key: [recorded_url, ...]}`. + Builds index of RecordedUrl by digest key. 
@@ -316,7 +339,13 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
 
     def _build_key_index(self, batch):
         '''
-        Returns `{digest_key: [recorded_url, ...]}`.
+        Builds index of RecordedUrl by digest key.
+
+        Args:
+            batch(list): list of RecordedUrl
+
+        Returns:
+            dict `{digest_key: [recorded_url, ...]}`
         '''
         key_index = collections.defaultdict(list)
         for recorded_url in batch:
@@ -327,13 +356,37 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
 
     def _process_batch(self, batch):
         buckets = self._filter_and_bucketize(batch)
-        for bucket in buckets:
-            key_index = self._build_key_index(buckets[bucket])
-            results = self.trough_dedup_db.batch_lookup(
-                    key_index.keys(), bucket)
-            for result in results:
-                for recorded_url in key_index[result['digest_key']]:
-                    recorded_url.dedup_info = result
+        if not buckets:
+            return
+        fs = {}
+        with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
+            # send off the trough requests in parallel
+            for bucket in buckets:
+                key_index = self._build_key_index(buckets[bucket])
+                future = pool.submit(
+                        self.trough_dedup_db.batch_lookup,
+                        key_index.keys(), bucket)
+                fs[future] = bucket
+
+            # process results as they come back
+            try:
+                for future in futures.as_completed(fs, timeout=20):
+                    bucket = fs[future]
+                    try:
+                        for entry in future.result():
+                            for recorded_url in key_index[entry['digest_key']]:
+                                recorded_url.dedup_info = entry
+                    except Exception as e:
+                        # batch_lookup raised exception or something
+                        logging.warn(
+                                'problem looking up dedup info for %s urls '
+                                'in bucket %s', len(buckets[bucket]), bucket,
+                                exc_info=True)
+            except futures.TimeoutError as e:
+                # the remaining threads actually keep running in this case,
+                # there's no way to stop them, but that should be harmless
+                logging.warn(
+                        'timed out loading dedup info from trough', exc_info=True)
 
 class TroughDedupDb(DedupDb):
     '''
@@ -405,6 +458,7 @@ class TroughDedupDb(DedupDb):
         return None
 
     def batch_lookup(self, digest_keys, bucket='__unspecified__'):
+        '''Returns [{'digest_key': ..., 'url': ..., 'date': ...}, ...]'''
         sql_tmpl = 'select * from dedup where digest_key in (%s)' % (
                 ','.join('%s' for i in range(len(digest_keys))))
         results = self._trough_cli.read(bucket, sql_tmpl, digest_keys)
@@ -415,7 +469,7 @@ class TroughDedupDb(DedupDb):
                 len(digest_keys), len(results))
         assert len(results) >= 0 and len(results) <= len(digest_keys)
         for result in results:
-            result['id'] = result['id'].encode('ascii')
+            result['id'] = result.get('id') and result['id'].encode('ascii')
             result['url'] = result['url'].encode('ascii')
             result['date'] = result['date'].encode('ascii')
             result['digest_key'] = result['digest_key'].encode('ascii')
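Two notes on the dedup.py hunks above. The result.get('id') change makes batch_lookup() tolerate rows with no 'id' column (storing None rather than raising KeyError). And in BatchTroughLoader._process_batch(), key_index is rebound on each pass through the submit loop, so when results arrive it always refers to the last bucket's index; that is harmless with a single bucket per batch, but with several buckets a per-bucket map is safer. A hedged sketch of that variant, where the key_indexes dict and the batch_lookup stub are inventions for illustration:

    from concurrent import futures

    def batch_lookup(digest_keys, bucket):
        # stub standing in for TroughDedupDb.batch_lookup()
        return [{'digest_key': key, 'bucket': bucket} for key in digest_keys]

    # one {digest_key: [recorded-url-like dicts]} index per bucket
    key_indexes = {
            'bucket-a': {'sha1:aaa': [{}], 'sha1:bbb': [{}]},
            'bucket-b': {'sha1:ccc': [{}]},
    }
    fs = {}
    with futures.ThreadPoolExecutor(max_workers=len(key_indexes)) as pool:
        for bucket, key_index in key_indexes.items():
            fs[pool.submit(batch_lookup, list(key_index), bucket)] = bucket
        for future in futures.as_completed(fs, timeout=20):
            # fetch the index for this future's bucket rather than relying
            # on whatever key_index the submit loop left bound
            key_index = key_indexes[fs[future]]
            for entry in future.result():
                for recorded_url in key_index[entry['digest_key']]:
                    recorded_url['dedup_info'] = entry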
"cookie1=val1;cookie2=val2" + arg_parser.add_argument('--cdxserver-dedup-cookies', dest='cdxserver_dedup_cookies', + help=argparse.SUPPRESS) arg_parser.add_argument('--queue-size', dest='queue_size', type=int, default=500, help=argparse.SUPPRESS) arg_parser.add_argument('--max-threads', dest='max_threads', type=int, diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 608cc75..7792c5c 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -245,6 +245,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): port=self.onion_tor_socks_proxy_port, rdns=True) else: self._remote_server_sock = socket.socket() + self._remote_server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # XXX what value should this timeout have? self._remote_server_sock.settimeout(60)