From af4c8b071a79eb19d6e1fbcc14c2a6bb66b3b60b Mon Sep 17 00:00:00 2001 From: Barbara Miller Date: Wed, 12 Jul 2023 17:04:07 -0700 Subject: [PATCH] lru_cache skip_revisit --- warcprox/dedup.py | 79 ++++++++++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 39 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index ad791f7..6094582 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -29,7 +29,7 @@ import warcprox import sqlite3 import doublethink import datetime -import psycopg2 +import psycopg import urllib3 from urllib3.exceptions import HTTPError import collections @@ -62,18 +62,49 @@ class DedupableMixin(object): else: return recorded_url.response_recorder.payload_size() > self.min_binary_size +@lru_cache(maxsize=256) +def skip_revisit(hash_plus_url, revisit_key, conn): + """ + check if hash_plus_url & revisit_key are present in conn table revisits, + returning true if they exist, else false + """ + query = "SELECT exists(SELECT 1 FROM revisits WHERE hash_plus_url = %s and crawl_id = %s LIMIT 1);" + cur = conn.cursor() + try: + cur.execute(query, (hash_plus_url, revisit_key)) + except Exception as e: + logging.warning("exception querying for %s in %s: %s", hash_plus_url, revisit_key, e) + return False + result = cur.fetchone() + if result and result == (True, ): + logging.info("skipping revisit for hash %s", hash_plus_url) + return True + + query = "INSERT INTO revisits (crawl_id, hash_plus_url) VALUES (%s, %s);" + try: + cur.execute(query, (revisit_key, hash_plus_url)) + conn.commit() + except Exception as e: + logging.warning("exception inserting %s and %s: %s", hash_plus_url, revisit_key, e) + return False class LimitRevisitsPGMixin(): """ Limit revisits recorded to one per revisit_key """ - def __init__(self): - self.datasource = "postgresql://archiveit@db.qa-archive-it.org/archiveit" # "postgresql://user@db_host/db_name" - self.datatable = "crawl_revisits" # postgres table in db_name + def __init__(self, datasource): + # datasource like "postgresql://user@db_host/db_name" + # table name revisits + try: + self._conn = psycopg.connect(datasource) + except Exception as e: + self.logger.warning("db connection failure: %s", e) + - @lru_cache(maxsize=256) def limit_revisits(self, recorded_url, hash_plus_url=None, revisit_key=None): - # tracks revisits, returns True when we've seen revisit before, else False + """ + tracks revisits, returns True when we've seen revisit before, else False + """ if not hash_plus_url: digest = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) @@ -89,34 +120,7 @@ class LimitRevisitsPGMixin(): revisit_key = str(recorded_url.warcprox_meta["metadata"]["ait-job-id"]) else: revisit_key = '__unspecified__' - - query = "SELECT exists(SELECT 1 FROM crawl_revisits WHERE hash_plus_url = %s and crawl_id = %s LIMIT 1);" - - try: - conn = psycopg2.connect(self.datasource) - except Exception as e: - self.logger.warning("db connection failure: %s", e) - return False - cur = conn.cursor() - try: - cur.execute(query, (hash_plus_url, revisit_key)) - except Exception as e: - self.logger.warning("exception querying for %s in %s: %s", digest, revisit_key, e) - return False - result = cur.fetchone() - - if result and result == (True, ): - logging.info("skipping revisit for url %s and hash %s", recorded_url.url, digest) - return True - - query = "INSERT INTO crawl_revisits (crawl_id, hash_plus_url) VALUES (%s, %s);" - try: - cur.execute(query, (revisit_key, hash_plus_url)) - conn.commit() - except Exception as e: - self.logger.warning("exception inserting %s in %s for %s: %s", digest, revisit_key, recorded_url.url, e) - - return False + return skip_revisit(hash_plus_url, revisit_key, self.conn) class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin): @@ -461,7 +465,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMix def __init__(self, trough_dedup_db, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) - LimitRevisitsPGMixin.__init__(self) + LimitRevisitsPGMixin.__init__(self, "postgresql://brozzler@db.qa-archive-it.org/brozzler") self.trough_dedup_db = trough_dedup_db def _startup(self): @@ -549,10 +553,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMix for recorded_url in key_index[entry['digest_key']]: recorded_url.dedup_info = entry if recorded_url.dedup_info: - recorded_url.do_not_archive = self.limit_revisits( - recorded_url, - revisit_key=str(recorded_url.warcprox_meta["metadata"]["ait-job-id"]) - ) + recorded_url.do_not_archive = self.limit_revisits(recorded_url) logging.info('%s', self.limit_revisits.cache_info()) except Exception as e: # batch_lookup raised exception or something