Use DedupableMixin in all dedup classes

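Rename `DedupableMixin.is_dedupable` to `should_dedup`, and call it in place of the `payload_size() > 0` checks in `DedupDb`, `RethinkDedupDb`, `TroughDedupDb`, `BatchTroughStorer` and `BatchTroughLoader`.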
This commit is contained in:
Vangelis Banos 2018-04-24 10:29:35 +00:00
parent d32bf743bd
commit 9057fbdf36

View File

@ -42,7 +42,7 @@ class DedupableMixin(object):
self.min_text_size = options.dedup_min_text_size self.min_text_size = options.dedup_min_text_size
self.min_binary_size = options.dedup_min_binary_size self.min_binary_size = options.dedup_min_binary_size
def is_dedupable(self, recorded_url): def should_dedup(self, recorded_url):
"""Check if we should try to run dedup on resource based on payload """Check if we should try to run dedup on resource based on payload
size compared with min text/binary dedup size options. Return Boolean. size compared with min text/binary dedup size options. Return Boolean.
""" """
@ -60,11 +60,12 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor):
decorate_with_dedup_info( decorate_with_dedup_info(
self.dedup_db, recorded_url, self.options.base32) self.dedup_db, recorded_url, self.options.base32)
class DedupDb(object): class DedupDb(DedupableMixin):
logger = logging.getLogger("warcprox.dedup.DedupDb") logger = logging.getLogger("warcprox.dedup.DedupDb")
def __init__( def __init__(
self, file='./warcprox.sqlite', options=warcprox.Options()): self, file='./warcprox.sqlite', options=warcprox.Options()):
DedupableMixin.__init__(self, options)
self.file = file self.file = file
self.options = options self.options = options
@ -127,7 +128,7 @@ class DedupDb(object):
def notify(self, recorded_url, records): def notify(self, recorded_url, records):
if (records and records[0].type == b'response' if (records and records[0].type == b'response'
and recorded_url.response_recorder.payload_size() > 0): and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str( digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32) recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
@ -150,10 +151,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
recorded_url.dedup_info = dedup_db.lookup( recorded_url.dedup_info = dedup_db.lookup(
digest_key, url=recorded_url.url) digest_key, url=recorded_url.url)
class RethinkDedupDb(DedupDb): class RethinkDedupDb(DedupDb, DedupableMixin):
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
def __init__(self, options=warcprox.Options()): def __init__(self, options=warcprox.Options()):
DedupableMixin.__init__(self, options)
parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_dedup_url) parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_dedup_url)
self.rr = doublethink.Rethinker( self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database) servers=parsed.hosts, db=parsed.database)
@ -204,7 +206,7 @@ class RethinkDedupDb(DedupDb):
def notify(self, recorded_url, records): def notify(self, recorded_url, records):
if (records and records[0].type == b'response' if (records and records[0].type == b'response'
and recorded_url.response_recorder.payload_size() > 0): and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str( digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32) recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
@ -299,7 +301,7 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin)
recorded_url = self.inq.get(block=True, timeout=0.5) recorded_url = self.inq.get(block=True, timeout=0.5)
if (recorded_url.response_recorder if (recorded_url.response_recorder
and recorded_url.payload_digest and recorded_url.payload_digest
and self.is_dedupable(recorded_url)): and self.should_dedup(recorded_url)):
self.batch.add(recorded_url) self.batch.add(recorded_url)
self.pool.submit(self._process_url, recorded_url) self.pool.submit(self._process_url, recorded_url)
else: else:
@ -321,9 +323,10 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin)
if self.outq: if self.outq:
self.outq.put(recorded_url) self.outq.put(recorded_url)
class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor, DedupableMixin):
def __init__(self, trough_dedup_db, options=warcprox.Options()): def __init__(self, trough_dedup_db, options=warcprox.Options()):
warcprox.BaseBatchPostfetchProcessor.__init__(self, options) warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
DedupableMixin.__init__(self, options)
self.trough_dedup_db = trough_dedup_db self.trough_dedup_db = trough_dedup_db
def _filter_and_bucketize(self, batch): def _filter_and_bucketize(self, batch):
@ -335,7 +338,7 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
for recorded_url in batch: for recorded_url in batch:
if (recorded_url.warc_records if (recorded_url.warc_records
and recorded_url.warc_records[0].type == b'response' and recorded_url.warc_records[0].type == b'response'
and recorded_url.response_recorder.payload_size() > 0): and self.should_dedup(recorded_url)):
if (recorded_url.warcprox_meta if (recorded_url.warcprox_meta
and 'captures-bucket' in recorded_url.warcprox_meta): and 'captures-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['captures-bucket'] bucket = recorded_url.warcprox_meta['captures-bucket']
@ -367,9 +370,10 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
logging.warn( logging.warn(
'timed out saving dedup info to trough', exc_info=True) 'timed out saving dedup info to trough', exc_info=True)
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin):
def __init__(self, trough_dedup_db, options=warcprox.Options()): def __init__(self, trough_dedup_db, options=warcprox.Options()):
warcprox.BaseBatchPostfetchProcessor.__init__(self, options) warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
DedupableMixin.__init__(self, options)
self.trough_dedup_db = trough_dedup_db self.trough_dedup_db = trough_dedup_db
def _startup(self): def _startup(self):
@ -384,7 +388,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
for recorded_url in batch: for recorded_url in batch:
if (recorded_url.response_recorder if (recorded_url.response_recorder
and recorded_url.payload_digest and recorded_url.payload_digest
and recorded_url.response_recorder.payload_size() > 0): and self.should_dedup(recorded_url)):
if (recorded_url.warcprox_meta if (recorded_url.warcprox_meta
and 'captures-bucket' in recorded_url.warcprox_meta): and 'captures-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['captures-bucket'] bucket = recorded_url.warcprox_meta['captures-bucket']
@ -444,7 +448,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
logging.warn( logging.warn(
'timed out loading dedup info from trough', exc_info=True) 'timed out loading dedup info from trough', exc_info=True)
class TroughDedupDb(DedupDb): class TroughDedupDb(DedupDb, DedupableMixin):
''' '''
https://github.com/internetarchive/trough https://github.com/internetarchive/trough
''' '''
@ -461,6 +465,7 @@ class TroughDedupDb(DedupDb):
'values (%s, %s, %s, %s);') 'values (%s, %s, %s, %s);')
def __init__(self, options=warcprox.Options()): def __init__(self, options=warcprox.Options()):
DedupableMixin.__init__(self, options)
self.options = options self.options = options
self._trough_cli = warcprox.trough.TroughClient( self._trough_cli = warcprox.trough.TroughClient(
options.rethinkdb_trough_db_url, promotion_interval=60*60) options.rethinkdb_trough_db_url, promotion_interval=60*60)
@ -533,7 +538,7 @@ class TroughDedupDb(DedupDb):
def notify(self, recorded_url, records): def notify(self, recorded_url, records):
if (records and records[0].type == b'response' if (records and records[0].type == b'response'
and recorded_url.response_recorder.payload_size() > 0): and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str( digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32) recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta: