From 507592041574f286a55c3b84d5ae5669606d1391 Mon Sep 17 00:00:00 2001
From: Barbara Miller
Date: Wed, 21 Jun 2023 17:25:41 -0700
Subject: [PATCH] limit revisits mixin

---
Review notes (ignored by git am, not part of the commit message):
  * limit_revisits() creates self.revisits on first use, since no hunk here
    calls LimitRevisitsMixin.__init__ from BatchTroughLoader.__init__.
  * LimitRevisitsMixin.__init__ keeps `options` unless the host already
    stored one.
  * seen-set key is a (hash, url) tuple rather than a bytes join.
  * BatchTroughLoader only ever sets do_not_archive to True; it no longer
    overwrites an existing True with False.
  * hunk headers and diffstat are recomputed; the post-image hash on the
    index line still refers to the earlier revision of this patch.

 warcprox/dedup.py | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 51 insertions(+), 2 deletions(-)

diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 705b74a..289b252 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -1,7 +1,7 @@
 '''
 warcprox/dedup.py - identical payload digest deduplication using sqlite db
 
-Copyright (C) 2013-2021 Internet Archive
+Copyright (C) 2013-2023 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -61,6 +61,52 @@ class DedupableMixin(object):
         else:
             return recorded_url.response_recorder.payload_size() > self.min_binary_size
 
+class LimitRevisitsMixin(object):
+    """
+    Limit revisits recorded to one per (payload digest, url) pair within each
+    revisit_key.
+
+    Seen pairs are kept in memory in self.revisits and are never evicted.
+    limit_revisits() reads self.options.base32 when it has to compute the
+    digest string itself.
+    """
+    def __init__(self, options=warcprox.Options()):
+        # do not clobber options the host class has already stored
+        if getattr(self, 'options', None) is None:
+            self.options = options
+        self.revisits = collections.defaultdict(set)
+        # TODO: define logger?
+
+    def limit_revisits(self, recorded_url, hash=None, revisit_key=None):
+        """
+        Returns True if this url and payload digest were already seen for
+        revisit_key (the caller should skip the revisit), otherwise
+        remembers the pair and returns False.
+        """
+        # hosts may set up their bases without ever calling
+        # LimitRevisitsMixin.__init__, so create the store on first use
+        if getattr(self, 'revisits', None) is None:
+            self.revisits = collections.defaultdict(set)
+        if not hash:
+            hash = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
+        if not revisit_key:
+            if (
+                recorded_url.warcprox_meta
+                and "metadata" in recorded_url.warcprox_meta
+                and "ait-job-id" in recorded_url.warcprox_meta["metadata"]
+            ):
+                revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
+            else:
+                revisit_key = 'all'
+
+        # tuple key: nothing to join, so bytes and str values both work
+        hash_and_url = (hash, recorded_url.url)
+        if hash_and_url in self.revisits[revisit_key]:
+            logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
+            return True
+        self.revisits[revisit_key].add(hash_and_url)
+        return False
+
 class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
     def __init__(self, dedup_db, options=warcprox.Options()):
         warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
@@ -398,7 +444,7 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
                     logging.warning(
                         'timed out saving dedup info to trough', exc_info=True)
 
-class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
+class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsMixin):
     logger = logging.getLogger("warcprox.dedup.BatchTroughLoader")
 
     def __init__(self, trough_dedup_db, options=warcprox.Options()):
@@ -488,6 +534,9 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
                             for entry in future.result():
                                 for recorded_url in key_index[entry['digest_key']]:
                                     recorded_url.dedup_info = entry
+                                    if recorded_url.dedup_info:
+                                        if self.limit_revisits(recorded_url):
+                                            recorded_url.do_not_archive = True
                         except Exception as e:
                             # batch_lookup raised exception or something
                             logging.warning(