diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 7e19150..c75f6c3 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -42,7 +42,7 @@ class DedupableMixin(object): self.min_text_size = options.dedup_min_text_size self.min_binary_size = options.dedup_min_binary_size - def is_dedupable(self, recorded_url): + def should_dedup(self, recorded_url): """Check if we should try to run dedup on resource based on payload size compared with min text/binary dedup size options. Return Boolean. """ @@ -60,11 +60,12 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor): decorate_with_dedup_info( self.dedup_db, recorded_url, self.options.base32) -class DedupDb(object): +class DedupDb(DedupableMixin): logger = logging.getLogger("warcprox.dedup.DedupDb") def __init__( self, file='./warcprox.sqlite', options=warcprox.Options()): + DedupableMixin.__init__(self, options) self.file = file self.options = options @@ -127,7 +128,7 @@ class DedupDb(object): def notify(self, recorded_url, records): if (records and records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: @@ -150,10 +151,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): recorded_url.dedup_info = dedup_db.lookup( digest_key, url=recorded_url.url) -class RethinkDedupDb(DedupDb): +class RethinkDedupDb(DedupDb, DedupableMixin): logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") def __init__(self, options=warcprox.Options()): + DedupableMixin.__init__(self, options) parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_dedup_url) self.rr = doublethink.Rethinker( servers=parsed.hosts, db=parsed.database) @@ -204,7 +206,7 @@ class RethinkDedupDb(DedupDb): def notify(self, recorded_url, records): if (records and records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: @@ -299,7 +301,7 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin) recorded_url = self.inq.get(block=True, timeout=0.5) if (recorded_url.response_recorder and recorded_url.payload_digest - and self.is_dedupable(recorded_url)): + and self.should_dedup(recorded_url)): self.batch.add(recorded_url) self.pool.submit(self._process_url, recorded_url) else: @@ -321,9 +323,10 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin) if self.outq: self.outq.put(recorded_url) -class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): +class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor, DedupableMixin): def __init__(self, trough_dedup_db, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + DedupableMixin.__init__(self, options) self.trough_dedup_db = trough_dedup_db def _filter_and_bucketize(self, batch): @@ -335,7 +338,7 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): for recorded_url in batch: if (recorded_url.warc_records and recorded_url.warc_records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): if (recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta): bucket = recorded_url.warcprox_meta['captures-bucket'] @@ -367,9 +370,10 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): logging.warn( 'timed out saving dedup info to trough', exc_info=True) -class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): +class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin): def __init__(self, trough_dedup_db, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + DedupableMixin.__init__(self, options) self.trough_dedup_db = trough_dedup_db def _startup(self): @@ -384,7 +388,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): for recorded_url in batch: if (recorded_url.response_recorder and recorded_url.payload_digest - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): if (recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta): bucket = recorded_url.warcprox_meta['captures-bucket'] @@ -444,7 +448,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): logging.warn( 'timed out loading dedup info from trough', exc_info=True) -class TroughDedupDb(DedupDb): +class TroughDedupDb(DedupDb, DedupableMixin): ''' https://github.com/internetarchive/trough ''' @@ -461,6 +465,7 @@ class TroughDedupDb(DedupDb): 'values (%s, %s, %s, %s);') def __init__(self, options=warcprox.Options()): + DedupableMixin.__init__(self, options) self.options = options self._trough_cli = warcprox.trough.TroughClient( options.rethinkdb_trough_db_url, promotion_interval=60*60) @@ -533,7 +538,7 @@ class TroughDedupDb(DedupDb): def notify(self, recorded_url, records): if (records and records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta: