diff --git a/warcprox/controller.py b/warcprox/controller.py
index d27c0cd..efa7885 100644
--- a/warcprox/controller.py
+++ b/warcprox/controller.py
@@ -291,7 +291,6 @@ class WarcproxController(object):
             self.playback_proxy_thread.start()
 
         for processor in self._postfetch_chain:
-            # logging.info('starting postfetch processor %r', processor)
             processor.start()
             logging.info('started postfetch processor %r', processor)
 
diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 2aab0aa..962ec3a 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -1,7 +1,7 @@
 '''
 warcprox/dedup.py - identical payload digest deduplication using sqlite db
 
-Copyright (C) 2013-2017 Internet Archive
+Copyright (C) 2013-2018 Internet Archive
 
 This program is free software; you can redistribute it and/or modify
 it under the terms of the GNU General Public License
@@ -32,17 +32,18 @@ import doublethink
 import datetime
 import urllib3
 from urllib3.exceptions import HTTPError
+import collections
 
 urllib3.disable_warnings()
 
 class DedupLoader(warcprox.BaseStandardPostfetchProcessor):
-    def __init__(self, dedup_db, inq, outq, base32=False, profile=False):
+    def __init__(self, dedup_db, inq, outq, options=warcprox.Options()):
         warcprox.BaseStandardPostfetchProcessor.__init__(
-                self, inq, outq, profile)
+                self, inq, outq, options)
         self.dedup_db = dedup_db
-        self.base32 = base32
     def _process_url(self, recorded_url):
-        decorate_with_dedup_info(self.dedup_db, recorded_url, self.base32)
+        decorate_with_dedup_info(
+            self.dedup_db, recorded_url, self.options.base32)
 
 class DedupDb(object):
     logger = logging.getLogger("warcprox.dedup.DedupDb")
@@ -259,6 +260,48 @@
         """
         pass
 
+class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
+    def __init__(self, trough_dedup_db, inq, outq, options=warcprox.Options()):
+        warcprox.BaseBatchPostfetchProcessor.__init__(self, inq, outq, options)
+        self.trough_dedup_db = trough_dedup_db
+
+    def _filter_and_bucketize(self, batch):
+        '''
+        Returns `{bucket: [recorded_url, ...]}`, excluding urls that should not
+        be looked up.
+        '''
+        buckets = collections.defaultdict(list)
+        for recorded_url in batch:
+            if (recorded_url.response_recorder
+                    and recorded_url.payload_digest
+                    and recorded_url.response_recorder.payload_size() > 0):
+                if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta:
+                    bucket = recorded_url.warcprox_meta['captures-bucket']
+                else:
+                    bucket = '__unspecified__'
+                buckets[bucket].append(recorded_url)
+        return buckets
+
+    def _build_key_index(self, batch):
+        '''
+        Returns `{digest_key: [recorded_url, ...]}`.
+        '''
+        key_index = collections.defaultdict(list)
+        for recorded_url in batch:
+            digest_key = warcprox.digest_str(
+                recorded_url.payload_digest, self.options.base32)
+            key_index[digest_key].append(recorded_url)
+        return key_index
+
+    def _process_batch(self, batch):
+        buckets = self._filter_and_bucketize(batch)
+        for bucket in buckets:
+            key_index = self._build_key_index(buckets[bucket])
+            results = self.trough_dedup_db.batch_lookup(key_index.keys(), bucket)
+            for result in results:
+                for recorded_url in key_index[result['digest_key']]:
+                    recorded_url.dedup_info = result
+
 class TroughDedupDb(DedupDb):
     '''
     https://github.com/internetarchive/trough
@@ -279,6 +322,9 @@ class TroughDedupDb(DedupDb):
         self._trough_cli = warcprox.trough.TroughClient(
                 options.rethinkdb_trough_db_url, promotion_interval=60*60)
 
+    def loader(self, inq, outq, options=warcprox.Options()):
+        return BatchTroughLoader(self, inq, outq, options)
+
     def start(self):
         self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL)
 
@@ -306,6 +352,23 @@
         else:
             return None
 
+    def batch_lookup(self, digest_keys, bucket='__unspecified__'):
+        sql_tmpl = 'select * from dedup where digest_key in (%s)' % (
+            ','.join('%s' for i in range(len(digest_keys))))
+        results = self._trough_cli.read(bucket, sql_tmpl, digest_keys)
+        if results is None:
+            return []
+        self.logger.debug(
+            'trough batch lookup of %s keys returned %s results',
+            len(digest_keys), len(results))
+        assert len(results) >= 0 and len(results) <= len(digest_keys)
+        for result in results:
+            result['id'] = result['id'].encode('ascii')
+            result['url'] = result['url'].encode('ascii')
+            result['date'] = result['date'].encode('ascii')
+            result['digest_key'] = result['digest_key'].encode('ascii')
+        return results
+
     def notify(self, recorded_url, records):
         if (records and records[0].type == b'response'
                 and recorded_url.response_recorder.payload_size() > 0):