Merge branch 'limit_revisits' into qa

Barbara Miller 2023-06-28 11:48:48 -07:00
commit 876a113470


@@ -29,6 +29,7 @@ import warcprox
 import sqlite3
 import doublethink
 import datetime
+import psycopg2
 import urllib3
 from urllib3.exceptions import HTTPError
 import collections
@@ -61,23 +62,23 @@ class DedupableMixin(object):
         else:
             return recorded_url.response_recorder.payload_size() > self.min_binary_size
 
-class LimitRevisitsPGMixin(object):
+class LimitRevisitsPGMixin():
     """
     Limit revisits recorded to one per revisit_key
     """
-    def __init__(self, options=warcprox.Options()):
-        import psycopg2
-        from psycopg2 import extras # TODO: needed?
+    def __init__(self):
         self.datasource = "postgresql://archiveit@db.qa-archive-it.org/archiveit" # "postgresql://user@db_host/db_name"
         self.datatable = "crawl_revisits" # postgres table in db_name
 
     def limit_revisits(self, recorded_url, hash_plus_url=None, revisit_key=None):
+        # tracks revisits, returns True when we've seen revisit before, else False
         if not hash_plus_url:
-            hash_plus_url = b''.join(
-                (warcprox.digest_str(recorded_url.payload_digest,
-                self.options.base32),
-                recorded_url.url)
-            )
+            digest = warcprox.digest_str(recorded_url.payload_digest,
+                self.options.base32)
+            digest = digest[5:] if digest.startswith(b'sha1:') else digest
+            hash_plus_url = b"".join([digest, recorded_url.url]).decode()
         if not revisit_key:
             # use ait-job-id if available
             if (
@@ -87,9 +88,9 @@ class LimitRevisitsPGMixin(object):
             ):
                 revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
             else:
-                revisit_key = 'all'
+                revisit_key = '__unspecified__'
 
-        query = f"SELECT exists(SELECT 1 FROM {self.datatable} WHERE hash_plus_url = {hash_plus_url} LIMIT 1);"
+        query = "SELECT exists(SELECT 1 FROM crawl_revisits WHERE hash_plus_url = %s LIMIT 1);"
 
         try:
             conn = psycopg2.connect(self.datasource)
@@ -98,23 +99,22 @@ class LimitRevisitsPGMixin(object):
             return False
         cur = conn.cursor()
         try:
-            cur.execute(query)
+            cur.execute(query, (hash_plus_url,))
         except Exception as e:
             self.logger.warning("exception querying for %s in %s: %s", hash_plus_url, revisit_key, e)
             return False
         result = cur.fetchone()
-        if result[0]:
-            logging.info("result[0]: %s", result[0])
-        if result[0] and result[0] == True:
+        if result and result == (True, ):
             logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
             return True
-        else:
-            query = f"INSERT INTO {self.datatable} VALUES({revisit_key}, {hash_plus_url});"
-            try:
-                cur.execute(query)
-            except Exception as e:
-                self.logger.warning("exception inserting %s in %s: %s", hash_plus_url, revisit_key, e)
+
+        query = "INSERT INTO crawl_revisits (crawl_id, hash_plus_url) VALUES (%s, %s);"
+        try:
+            cur.execute(query, (revisit_key, hash_plus_url))
+            conn.commit()
+        except Exception as e:
+            self.logger.warning("exception inserting %s in %s: %s", hash_plus_url, revisit_key, e)
 
         return False
@@ -123,8 +123,6 @@ class LimitRecords(object):
     Limit records to one per revisit_key, e.g., crawl id
     """
     def __init__(self, datasource, options=warcprox.Options()):
-        import psycopg2
-        from psycopg2 import extras # needed
         self.datasource = datasource # "postgresql://user@db_host/db_name"
         # self.datatable = revisit_key # set in limit_revisits method
         # verify partition table exists
@@ -519,7 +517,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMixin):
     def __init__(self, trough_dedup_db, options=warcprox.Options()):
         warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
-        LimitRevisitsPGMixin.__init__(self, datasource, options)
+        LimitRevisitsPGMixin.__init__(self)
         self.trough_dedup_db = trough_dedup_db
 
     def _startup(self):
@@ -607,8 +605,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMixin):
                 for recorded_url in key_index[entry['digest_key']]:
                     recorded_url.dedup_info = entry
                     if recorded_url.dedup_info:
-                        recorded_url.do_not_archive = \
-                                self.limit_revisits(recorded_url)
+                        recorded_url.do_not_archive = self.limit_revisits(recorded_url)
         except Exception as e:
             # batch_lookup raised exception or something
             logging.warning(
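
For context, the revisit limiting this branch merges boils down to a check-then-insert against a single Postgres table. Below is a minimal standalone sketch of that flow, outside warcprox; the (crawl_id, hash_plus_url) column layout is inferred from the INSERT statement in the diff, and the DSN and CREATE TABLE are illustrative assumptions, not the production schema.

# Standalone sketch of the check-then-insert revisit tracking above.
# Assumptions: column layout inferred from the INSERT in the diff;
# DSN and CREATE TABLE are illustrative, not the production setup.
import psycopg2

DSN = "postgresql://user@localhost/warcprox_test"  # placeholder DSN

SETUP = """
    CREATE TABLE IF NOT EXISTS crawl_revisits (
        crawl_id text,
        hash_plus_url text
    );
"""

def seen_before(conn, revisit_key, hash_plus_url):
    # returns True when we've seen this revisit before, else records it
    cur = conn.cursor()
    cur.execute(
        "SELECT exists(SELECT 1 FROM crawl_revisits WHERE hash_plus_url = %s LIMIT 1);",
        (hash_plus_url,))
    if cur.fetchone() == (True,):
        return True
    cur.execute(
        "INSERT INTO crawl_revisits (crawl_id, hash_plus_url) VALUES (%s, %s);",
        (revisit_key, hash_plus_url))
    conn.commit()
    return False

if __name__ == "__main__":
    conn = psycopg2.connect(DSN)
    with conn.cursor() as cur:
        cur.execute(SETUP)
    conn.commit()
    key = "1234"  # stands in for warcprox_meta["metadata"]["ait-job-id"]
    record = "EXAMPLEDIGESThttp://example.com/"  # digest + url, as in limit_revisits
    print(seen_before(conn, key, record))  # False: first sighting, row inserted
    print(seen_before(conn, key, record))  # True: revisit, would set do_not_archive

Both queries are parameterized (%s placeholders with a values tuple), which is the main hardening this commit applies over the f-string SQL it removes.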