diff --git a/warcprox/dedup.py b/warcprox/dedup.py index c49edd4..ad791f7 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -62,6 +62,7 @@ class DedupableMixin(object): else: return recorded_url.response_recorder.payload_size() > self.min_binary_size + class LimitRevisitsPGMixin(): """ Limit revisits recorded to one per revisit_key @@ -70,6 +71,7 @@ class LimitRevisitsPGMixin(): self.datasource = "postgresql://archiveit@db.qa-archive-it.org/archiveit" # "postgresql://user@db_host/db_name" self.datatable = "crawl_revisits" # postgres table in db_name + @lru_cache(maxsize=256) def limit_revisits(self, recorded_url, hash_plus_url=None, revisit_key=None): # tracks revisits, returns True when we've seen revisit before, else False if not hash_plus_url: @@ -116,62 +118,6 @@ class LimitRevisitsPGMixin(): return False -class LimitRecords(object): - """ - Limit records to one per revisit_key, e.g., crawl id - """ - def __init__(self, datasource, options=warcprox.Options()): - self.datasource = datasource # "postgresql://user@db_host/db_name" - # self.datatable = revisit_key # set in limit_revisits method - # verify partition table exists - self.datatable = 'crawled_urls' # postgres table in db_name - - def limit_records(self, buckets): - for bucket in buckets: - for recorded_url in bucket: - hash_plus_url = b''.join( - (warcprox.digest_str(recorded_url.payload_digest, - self.options.base32), - recorded_url.url) - ) - - # use ait-job-id if available, or seed? - if ( - recorded_url.warcprox_meta - and "metadata" in recorded_url.warcprox_meta - and "ait-job-id" in recorded_url.warcprox_meta["metadata"] - ): - revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"] - else: - revisit_key = 'all' - - # query = f"SELECT exists(SELECT 1 FROM {revisit_key} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;" - query = f"SELECT exists(SELECT 1 FROM {self.datatable} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;" - - try: - conn = psycopg2.connect(self.datasource) - except Exception as e: - self.logger.warning("db connection failure: %s", e) - return False - cur = conn.cursor() - try: - cur.execute(query) - except Exception as e: - self.logger.warning("exception querying for %s in %s: %s", hash_plus_url, revisit_key, e) - return False - result = cur.fetchone() - - if result[0]: - logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash) - return True - else: - query = "INSERT INTO {revisit_key} VALUES(revisit_key, hash_plus_url);" - try: - cur.execute(query) - except Exception as e: - self.logger.warning("exception inserting %s in %s: %s", hash_plus_url, revisit_key, e) - - return False class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin): def __init__(self, dedup_db, options=warcprox.Options()): @@ -603,7 +549,11 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMix for recorded_url in key_index[entry['digest_key']]: recorded_url.dedup_info = entry if recorded_url.dedup_info: - recorded_url.do_not_archive = self.limit_revisits(recorded_url) + recorded_url.do_not_archive = self.limit_revisits( + recorded_url, + revisit_key=str(recorded_url.warcprox_meta["metadata"]["ait-job-id"]) + ) + logging.info('%s', self.limit_revisits.cache_info()) except Exception as e: # batch_lookup raised exception or something logging.warning(