diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 705b74a..27705c9 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -1,7 +1,7 @@ ''' warcprox/dedup.py - identical payload digest deduplication using sqlite db -Copyright (C) 2013-2021 Internet Archive +Copyright (C) 2013-2023 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -61,6 +61,122 @@ class DedupableMixin(object): else: return recorded_url.response_recorder.payload_size() > self.min_binary_size +class LimitRevisitsPGMixin(object): + """ + Limit revisits recorded to one per revisit_key + """ + def __init__(self, options=warcprox.Options()): + import psycopg2 + from psycopg2 import extras # TODO: needed? + self.datasource = "postgresql://archiveit@db.qa-archive-it.org/archiveit" # "postgresql://user@db_host/db_name" + self.datatable = "crawl_revisits" # postgres table in db_name + + def limit_revisits(self, recorded_url, hash_plus_url=None, revisit_key=None): + if not hash_plus_url: + hash_plus_url = b''.join( + (warcprox.digest_str(recorded_url.payload_digest, + self.options.base32), + recorded_url.url) + ) + if not revisit_key: + # use ait-job-id if available + if ( + recorded_url.warcprox_meta + and "metadata" in recorded_url.warcprox_meta + and "ait-job-id" in recorded_url.warcprox_meta["metadata"] + ): + revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"] + else: + revisit_key = 'all' + + query = f"SELECT exists(SELECT 1 FROM {self.datatable} WHERE hash_plus_url = {hash_plus_url} LIMIT 1);" + + try: + conn = psycopg2.connect(self.datasource) + except Exception as e: + self.logger.warning("db connection failure: %s", e) + return False + cur = conn.cursor() + try: + cur.execute(query) + except Exception as e: + self.logger.warning("exception querying for %s in %s: %s", hash_plus_url, revisit_key, e) + return False + result = cur.fetchone() + if result[0]: + logging.info("result[0]: %s", result[0]) + + if result[0] and result[0] == True: + logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash) + return True + else: + query = f"INSERT INTO {self.datatable} VALUES({revisit_key}, {hash_plus_url});" + try: + cur.execute(query) + except Exception as e: + self.logger.warning("exception inserting %s in %s: %s", hash_plus_url, revisit_key, e) + + return False + +class LimitRecords(object): + """ + Limit records to one per revisit_key, e.g., crawl id + """ + def __init__(self, datasource, options=warcprox.Options()): + import psycopg2 + from psycopg2 import extras # needed + self.datasource = datasource # "postgresql://user@db_host/db_name" + # self.datatable = revisit_key # set in limit_revisits method + # verify partition table exists + self.datatable = 'crawled_urls' # postgres table in db_name + + def limit_records(self, buckets): + for bucket in buckets: + for recorded_url in bucket: + hash_plus_url = b''.join( + (warcprox.digest_str(recorded_url.payload_digest, + self.options.base32), + recorded_url.url) + ) + + # use ait-job-id if available, or seed? + if ( + recorded_url.warcprox_meta + and "metadata" in recorded_url.warcprox_meta + and "ait-job-id" in recorded_url.warcprox_meta["metadata"] + ): + revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"] + else: + revisit_key = 'all' + + # query = f"SELECT exists(SELECT 1 FROM {revisit_key} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;" + query = f"SELECT exists(SELECT 1 FROM {self.datatable} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;" + + try: + conn = psycopg2.connect(self.datasource) + except Exception as e: + self.logger.warning("db connection failure: %s", e) + return False + cur = conn.cursor() + try: + cur.execute(query) + except Exception as e: + self.logger.warning("exception querying for %s in %s: %s", hash_plus_url, revisit_key, e) + return False + result = cur.fetchone() + + if result[0]: + logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash) + return True + else: + query = "INSERT INTO {revisit_key} VALUES(revisit_key, hash_plus_url);" + try: + cur.execute(query) + except Exception as e: + self.logger.warning("exception inserting %s in %s: %s", hash_plus_url, revisit_key, e) + + return False + class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin): def __init__(self, dedup_db, options=warcprox.Options()): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) @@ -398,11 +514,12 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): logging.warning( 'timed out saving dedup info to trough', exc_info=True) -class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): +class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMixin): logger = logging.getLogger("warcprox.dedup.BatchTroughLoader") def __init__(self, trough_dedup_db, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + LimitRevisitsPGMixin.__init__(self, datasource, options) self.trough_dedup_db = trough_dedup_db def _startup(self): @@ -415,7 +532,8 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): ''' buckets = collections.defaultdict(list) discards = [] - # for duplicate checks, see https://webarchive.jira.com/browse/WT-31 + # current batch often contains new duplicates, so check for them here + # see https://webarchive.jira.com/browse/WT-31 hash_plus_urls = set() for recorded_url in batch: if not recorded_url.payload_digest: @@ -488,6 +606,9 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): for entry in future.result(): for recorded_url in key_index[entry['digest_key']]: recorded_url.dedup_info = entry + if recorded_url.dedup_info: + recorded_url.do_not_archive = \ + self.limit_revisits(recorded_url) except Exception as e: # batch_lookup raised exception or something logging.warning(