diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 289b252..d257c58 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -67,7 +67,6 @@ class LimitRevisitsMixin(object): """ def __init__(self, options=warcprox.Options()): self.revisits = collections.defaultdict(set) - # TODO: define logger? def limit_revisits(self, recorded_url, hash=None, revisit_key=None): if not hash: @@ -84,12 +83,65 @@ class LimitRevisitsMixin(object): hash_plus_url = b''.join((hash, recorded_url.url)) if hash_plus_url in self.revisits[revisit_key]: - logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash) + logger.info("skipping revisit for url %s and hash %s", recorded_url.url, hash) return True else: self.revisits[revisit_key].add(hash_plus_url) return False +class LimitRevisitsPGMixin(object): + """ + Limit revisits recorded to one per revisit_key + """ + def __init__(self, datasource, options=warcprox.Options()): + import psycopg2 + from psycopg2 import extras # needed + self.datasource = datasource # "postgresql://user@db_host/db_name" + self.datatable = datatable # postgres table in db_name + # TODO here or elsewhere? verify partition table exists + + def limit_revisits(self, recorded_url, hash=None, revisit_key=None): + if not hash: + hash = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) + if not revisit_key: + # use ait-job-id if available + if ( + recorded_url.warcprox_meta + and "metadata" in recorded_url.warcprox_meta + and "ait-job-id" in recorded_url.warcprox_meta["metadata"] + ): + revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"] + else: + revisit_key = 'all' + hash_plus_url = b''.join((hash, recorded_url.url)) + + query = f"SELECT exists(SELECT 1 FROM {revisit_key} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;" + + try: + conn = psycopg2.connect(self.datasource) + except Exception as e: + self.logger.warning("db connection failure: %s", e) + return False + cur = conn.cursor() + try: + cur.execute(query) + except Exception as e: + self.logger.warning("exception querying for %s in %s: %s", hash_plus_url, revisit_key, e) + return False + result = cur.fetchone() + + if result[0]: + logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash) + return True + else: + query = "INSERT INTO {revisit_key} VALUES(revisit_key, hash_plus_url);" + try: + cur.execute(query) + except Exception as e: + self.logger.warning("exception inserting %s in %s: %s", hash_plus_url, revisit_key, e) + + return False + class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin): def __init__(self, dedup_db, options=warcprox.Options()): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) @@ -427,11 +479,13 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): logging.warning( 'timed out saving dedup info to trough', exc_info=True) -class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsMixin): +class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMixin): logger = logging.getLogger("warcprox.dedup.BatchTroughLoader") def __init__(self, trough_dedup_db, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + LimitRevisitsPGMixin.__init__(self, datasource="", options) + # TODO: get datasource self.trough_dedup_db = trough_dedup_db def _startup(self):