limit revisits mixin

This commit is contained in:
Barbara Miller 2023-06-21 17:25:41 -07:00
parent ca02c22ff7
commit 5075920415

View File

@ -1,7 +1,7 @@
'''
warcprox/dedup.py - identical payload digest deduplication using sqlite db
Copyright (C) 2013-2021 Internet Archive
Copyright (C) 2013-2023 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -61,6 +61,35 @@ class DedupableMixin(object):
else:
return recorded_url.response_recorder.payload_size() > self.min_binary_size
class LimitRevisitsMixin(object):
"""
Limit revisits recorded to one per revisit_key
"""
def __init__(self, options=warcprox.Options()):
self.revisits = collections.defaultdict(set)
# TODO: define logger?
def limit_revisits(self, recorded_url, hash=None, revisit_key=None):
if not hash:
hash = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
if not revisit_key:
if (
recorded_url.warcprox_meta
and "metadata" in recorded_url.warcprox_meta
and "ait-job-id" in recorded_url.warcprox_meta["metadata"]
):
revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
else:
revisit_key = 'all'
hash_plus_url = b''.join((hash, recorded_url.url))
if hash_plus_url in self.revisits[revisit_key]:
logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
return True
else:
self.revisits[revisit_key].add(hash_plus_url)
return False
class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
def __init__(self, dedup_db, options=warcprox.Options()):
warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
@ -398,7 +427,7 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
logging.warning(
'timed out saving dedup info to trough', exc_info=True)
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsMixin):
logger = logging.getLogger("warcprox.dedup.BatchTroughLoader")
def __init__(self, trough_dedup_db, options=warcprox.Options()):
@ -488,6 +517,9 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
for entry in future.result():
for recorded_url in key_index[entry['digest_key']]:
recorded_url.dedup_info = entry
if recorded_url.dedup_info:
recorded_url.do_not_archive = \
self.limit_revisits(recorded_url)
except Exception as e:
# batch_lookup raised exception or something
logging.warning(