From d9145eefb5de17c1e85d8de1ff85e5908198ff9f Mon Sep 17 00:00:00 2001 From: Barbara Miller Date: Mon, 26 Jun 2023 22:49:33 -0700 Subject: [PATCH] LimitRecords, more LimitRevisitsPGMixin --- warcprox/dedup.py | 107 ++++++++++++++++++++++++++++++---------------- 1 file changed, 71 insertions(+), 36 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index d257c58..d9f2934 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -61,34 +61,6 @@ class DedupableMixin(object): else: return recorded_url.response_recorder.payload_size() > self.min_binary_size -class LimitRevisitsMixin(object): - """ - Limit revisits recorded to one per revisit_key - """ - def __init__(self, options=warcprox.Options()): - self.revisits = collections.defaultdict(set) - - def limit_revisits(self, recorded_url, hash=None, revisit_key=None): - if not hash: - hash = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) - if not revisit_key: - if ( - recorded_url.warcprox_meta - and "metadata" in recorded_url.warcprox_meta - and "ait-job-id" in recorded_url.warcprox_meta["metadata"] - ): - revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"] - else: - revisit_key = 'all' - - hash_plus_url = b''.join((hash, recorded_url.url)) - if hash_plus_url in self.revisits[revisit_key]: - logger.info("skipping revisit for url %s and hash %s", recorded_url.url, hash) - return True - else: - self.revisits[revisit_key].add(hash_plus_url) - return False - class LimitRevisitsPGMixin(object): """ Limit revisits recorded to one per revisit_key @@ -97,12 +69,16 @@ class LimitRevisitsPGMixin(object): import psycopg2 from psycopg2 import extras # needed self.datasource = datasource # "postgresql://user@db_host/db_name" - self.datatable = datatable # postgres table in db_name - # TODO here or elsewhere? verify partition table exists + self.datatable = 'crawled_urls' # postgres table in db_name + + def limit_revisits(self, recorded_url, hash_plus_url=None, revisit_key=None): + if not hash_plus_url: + hash_plus_url = b''.join( + (warcprox.digest_str(recorded_url.payload_digest, + self.options.base32), + recorded_url.url) + ) - def limit_revisits(self, recorded_url, hash=None, revisit_key=None): - if not hash: - hash = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) if not revisit_key: # use ait-job-id if available if ( @@ -113,9 +89,67 @@ class LimitRevisitsPGMixin(object): revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"] else: revisit_key = 'all' - hash_plus_url = b''.join((hash, recorded_url.url)) - query = f"SELECT exists(SELECT 1 FROM {revisit_key} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;" + query = f"SELECT exists(SELECT 1 FROM {self.datatable} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;" + + try: + conn = psycopg2.connect(self.datasource) + except Exception as e: + self.logger.warning("db connection failure: %s", e) + return False + cur = conn.cursor() + try: + cur.execute(query) + except Exception as e: + self.logger.warning("exception querying for %s in %s: %s", hash_plus_url, revisit_key, e) + return False + result = cur.fetchone() + + if result[0]: + logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash) + return True + else: + query = "INSERT INTO {self.datable} VALUES(revisit_key, hash_plus_url);" + try: + cur.execute(query) + except Exception as e: + self.logger.warning("exception inserting %s in %s: %s", hash_plus_url, revisit_key, e) + + return False + +class LimitRecords(object): + """ + Limit records to one per revisit_key, e.g., crawl id + """ + def __init__(self, datasource, options=warcprox.Options()): + import psycopg2 + from psycopg2 import extras # needed + self.datasource = datasource # "postgresql://user@db_host/db_name" + # self.datatable = revisit_key # set in limit_revisits method + # verify partition table exists + self.datatable = 'crawled_urls' # postgres table in db_name + + def limit_records(self, buckets): + for bucket in buckets: + for recorded_url in bucket: + hash_plus_url = b''.join( + (warcprox.digest_str(recorded_url.payload_digest, + self.options.base32), + recorded_url.url) + ) + + # use ait-job-id if available, or seed? + if ( + recorded_url.warcprox_meta + and "metadata" in recorded_url.warcprox_meta + and "ait-job-id" in recorded_url.warcprox_meta["metadata"] + ): + revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"] + else: + revisit_key = 'all' + + # query = f"SELECT exists(SELECT 1 FROM {revisit_key} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;" + query = f"SELECT exists(SELECT 1 FROM {self.datatable} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;" try: conn = psycopg2.connect(self.datasource) @@ -498,7 +532,8 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMix ''' buckets = collections.defaultdict(list) discards = [] - # for duplicate checks, see https://webarchive.jira.com/browse/WT-31 + # current batch often contains new duplicates, so check for them here + # see https://webarchive.jira.com/browse/WT-31 hash_plus_urls = set() for recorded_url in batch: if not recorded_url.payload_digest: