From ef75164f8bab043894643f6d7f5c8e22e4ea432a Mon Sep 17 00:00:00 2001
From: Barbara Miller
Date: Tue, 27 Jun 2023 11:47:55 -0700
Subject: [PATCH 1/2] fixes for qa prototyping

---
 warcprox/dedup.py | 34 +++++++++++++++-------------------
 1 file changed, 15 insertions(+), 19 deletions(-)

diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index d9f2934..93a335b 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -29,6 +29,7 @@ import warcprox
 import sqlite3
 import doublethink
 import datetime
+import psycopg2
 import urllib3
 from urllib3.exceptions import HTTPError
 import collections
@@ -65,20 +66,17 @@ class LimitRevisitsPGMixin(object):
     """
     Limit revisits recorded to one per revisit_key
     """
-    def __init__(self, datasource, options=warcprox.Options()):
-        import psycopg2
-        from psycopg2 import extras  # needed
-        self.datasource = datasource  # "postgresql://user@db_host/db_name"
-        self.datatable = 'crawled_urls'  # postgres table in db_name
+    def __init__(self, options=warcprox.Options()):
+        self.datasource = "postgresql://archiveit@db.qa-archive-it.org/archiveit"  # "postgresql://user@db_host/db_name"
+        self.datatable = "crawl_revisits"  # postgres table in db_name
 
     def limit_revisits(self, recorded_url, hash_plus_url=None, revisit_key=None):
         if not hash_plus_url:
-            hash_plus_url = b''.join(
+            hash_plus_url = b"".join(
                 (warcprox.digest_str(recorded_url.payload_digest,
                     self.options.base32),
                  recorded_url.url)
-            )
-
+            ).decode()
         if not revisit_key:
             # use ait-job-id if available
             if (
@@ -90,7 +88,7 @@ class LimitRevisitsPGMixin(object):
         else:
             revisit_key = 'all'
 
-        query = f"SELECT exists(SELECT 1 FROM {self.datatable} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;"
+        query = "SELECT exists(SELECT 1 FROM crawl_revisits WHERE hash_plus_url = %s LIMIT 1);"
 
         try:
             conn = psycopg2.connect(self.datasource)
@@ -99,19 +97,21 @@ class LimitRevisitsPGMixin(object):
             return False
         cur = conn.cursor()
         try:
-            cur.execute(query)
+            cur.execute(query, (hash_plus_url,))
         except Exception as e:
             self.logger.warning("exception querying for %s in %s: %s", hash_plus_url, revisit_key, e)
             return False
         result = cur.fetchone()
 
-        if result[0]:
+        logging.info("result[0]: %s", result[0])
+
+        if result[0] and result[0] == True:
             logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
             return True
         else:
-            query = "INSERT INTO {self.datable} VALUES(revisit_key, hash_plus_url);"
+            query = "INSERT INTO crawl_revisits (crawl_id, hash_plus_url) VALUES (%s, %s);"
             try:
-                cur.execute(query)
+                cur.execute(query, (revisit_key, hash_plus_url))
             except Exception as e:
                 self.logger.warning("exception inserting %s in %s: %s", hash_plus_url, revisit_key, e)
 
@@ -122,8 +122,6 @@ class LimitRecords(object):
     Limit records to one per revisit_key, e.g., crawl id
     """
     def __init__(self, datasource, options=warcprox.Options()):
-        import psycopg2
-        from psycopg2 import extras  # needed
         self.datasource = datasource  # "postgresql://user@db_host/db_name"
         # self.datatable = revisit_key  # set in limit_revisits method
         # verify partition table exists
@@ -518,8 +516,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMix
 
     def __init__(self, trough_dedup_db, options=warcprox.Options()):
         warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
-        LimitRevisitsPGMixin.__init__(self, datasource="", options)
-        # TODO: get datasource
+        LimitRevisitsPGMixin.__init__(self, options)
         self.trough_dedup_db = trough_dedup_db
 
     def _startup(self):
@@ -607,8 +604,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMix
                 for recorded_url in key_index[entry['digest_key']]:
                     recorded_url.dedup_info = entry
                     if recorded_url.dedup_info:
-                        recorded_url.do_not_archive = \
-                            self.limit_revisits(recorded_url)
+                        recorded_url.do_not_archive = self.limit_revisits(recorded_url)
         except Exception as e:
             # batch_lookup raised exception or something
             logging.warning(
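
Note (illustration, not part of either patch): patch 1 keys the postgres
table on the payload digest string concatenated with the URL. Below is a
standalone sketch of that key construction, with hashlib/base64 standing in
for warcprox.digest_str(); the payload and URL are made-up values, and the
b'sha1:' label plus base32 digest shape is an assumption inferred from the
prefix stripping patch 2 adds below:

    import base64
    import hashlib

    payload = b'<html>hello</html>'   # hypothetical response body
    url = b'http://example.com/'      # hypothetical crawled URL

    # digest-string stand-in: algorithm label plus base32 of the raw sha1
    digest = b'sha1:' + base64.b32encode(hashlib.sha1(payload).digest())

    # patch 1 joins digest and url, decoded to str, as the value stored
    # in the hash_plus_url column
    hash_plus_url = b''.join((digest, url)).decode()
    print(hash_plus_url)
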
From dfc34e75612a38cb2772a8f486d0698d6311cc43 Mon Sep 17 00:00:00 2001
From: Barbara Miller
Date: Wed, 28 Jun 2023 11:43:53 -0700
Subject: [PATCH 2/2] more updates qa prototyping

---
 warcprox/dedup.py | 33 ++++++++++++++++-----------------
 1 file changed, 16 insertions(+), 17 deletions(-)

diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 93a335b..c0b9ec1 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -62,21 +62,21 @@ class DedupableMixin(object):
         else:
             return recorded_url.response_recorder.payload_size() > self.min_binary_size
 
-class LimitRevisitsPGMixin(object):
+class LimitRevisitsPGMixin():
     """
     Limit revisits recorded to one per revisit_key
     """
-    def __init__(self, options=warcprox.Options()):
+    def __init__(self):
         self.datasource = "postgresql://archiveit@db.qa-archive-it.org/archiveit"  # "postgresql://user@db_host/db_name"
         self.datatable = "crawl_revisits"  # postgres table in db_name
 
     def limit_revisits(self, recorded_url, hash_plus_url=None, revisit_key=None):
+        # tracks revisits, returns True when we've seen revisit before, else False
         if not hash_plus_url:
-            hash_plus_url = b"".join(
-                (warcprox.digest_str(recorded_url.payload_digest,
-                    self.options.base32),
-                 recorded_url.url)
-            ).decode()
+            digest = warcprox.digest_str(recorded_url.payload_digest,
+                self.options.base32)
+            digest = digest[5:] if digest.startswith(b'sha1:') else digest
+            hash_plus_url = b"".join([digest, recorded_url.url]).decode()
         if not revisit_key:
             # use ait-job-id if available
             if (
@@ -86,7 +86,7 @@ class LimitRevisitsPGMixin(object):
         ):
             revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
         else:
-            revisit_key = 'all'
+            revisit_key = '__unspecified__'
 
         query = "SELECT exists(SELECT 1 FROM crawl_revisits WHERE hash_plus_url = %s LIMIT 1);"
 
@@ -102,18 +102,17 @@ class LimitRevisitsPGMixin(object):
         except Exception as e:
             self.logger.warning("exception querying for %s in %s: %s", hash_plus_url, revisit_key, e)
             return False
         result = cur.fetchone()
 
-        logging.info("result[0]: %s", result[0])
-
-        if result[0] and result[0] == True:
+        if result and result == (True, ):
             logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
             return True
-        else:
-            query = "INSERT INTO crawl_revisits (crawl_id, hash_plus_url) VALUES (%s, %s);"
-            try:
-                cur.execute(query, (revisit_key, hash_plus_url))
-            except Exception as e:
-                self.logger.warning("exception inserting %s in %s: %s", hash_plus_url, revisit_key, e)
+
+        query = "INSERT INTO crawl_revisits (crawl_id, hash_plus_url) VALUES (%s, %s);"
+        try:
+            cur.execute(query, (revisit_key, hash_plus_url))
+            conn.commit()
+        except Exception as e:
+            self.logger.warning("exception inserting %s in %s: %s", hash_plus_url, revisit_key, e)
 
         return False
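
Note (illustration, not part of either patch): after patch 2,
limit_revisits() does a parameterized EXISTS probe and, on first sighting,
an INSERT followed by a commit -- the conn.commit() patch 2 adds is what
makes the INSERT durable. A minimal standalone sketch of that round trip;
the DSN and the CREATE TABLE are assumptions for local testing, not the QA
database used above:

    import psycopg2

    DSN = "postgresql://user@localhost/warcprox_test"  # hypothetical local db

    # assumed shape of the table the patches write to:
    #   CREATE TABLE crawl_revisits (crawl_id text, hash_plus_url text);

    def seen_before(crawl_id, hash_plus_url):
        # True if hash_plus_url was recorded earlier; otherwise record it
        conn = psycopg2.connect(DSN)
        try:
            cur = conn.cursor()
            cur.execute(
                "SELECT exists(SELECT 1 FROM crawl_revisits"
                " WHERE hash_plus_url = %s LIMIT 1);",
                (hash_plus_url,))
            if cur.fetchone() == (True,):
                return True
            cur.execute(
                "INSERT INTO crawl_revisits (crawl_id, hash_plus_url)"
                " VALUES (%s, %s);",
                (crawl_id, hash_plus_url))
            conn.commit()  # without the commit the row would be rolled back
            return False
        finally:
            conn.close()

    print(seen_before('12345', 'DIGESThttp://example.com/'))  # False on first run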