LimitRevisitsPGMixin

This commit is contained in:
Barbara Miller 2023-06-22 19:29:53 -07:00
parent 5075920415
commit 08f2903f14

View File

@ -67,7 +67,6 @@ class LimitRevisitsMixin(object):
"""
def __init__(self, options=warcprox.Options()):
self.revisits = collections.defaultdict(set)
# TODO: define logger?
def limit_revisits(self, recorded_url, hash=None, revisit_key=None):
if not hash:
@ -84,12 +83,65 @@ class LimitRevisitsMixin(object):
hash_plus_url = b''.join((hash, recorded_url.url))
if hash_plus_url in self.revisits[revisit_key]:
logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
logger.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
return True
else:
self.revisits[revisit_key].add(hash_plus_url)
return False
class LimitRevisitsPGMixin(object):
"""
Limit revisits recorded to one per revisit_key
"""
def __init__(self, datasource, options=warcprox.Options()):
import psycopg2
from psycopg2 import extras # needed
self.datasource = datasource # "postgresql://user@db_host/db_name"
self.datatable = datatable # postgres table in db_name
# TODO here or elsewhere? verify partition table exists
def limit_revisits(self, recorded_url, hash=None, revisit_key=None):
if not hash:
hash = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
if not revisit_key:
# use ait-job-id if available
if (
recorded_url.warcprox_meta
and "metadata" in recorded_url.warcprox_meta
and "ait-job-id" in recorded_url.warcprox_meta["metadata"]
):
revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
else:
revisit_key = 'all'
hash_plus_url = b''.join((hash, recorded_url.url))
query = f"SELECT exists(SELECT 1 FROM {revisit_key} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;"
try:
conn = psycopg2.connect(self.datasource)
except Exception as e:
self.logger.warning("db connection failure: %s", e)
return False
cur = conn.cursor()
try:
cur.execute(query)
except Exception as e:
self.logger.warning("exception querying for %s in %s: %s", hash_plus_url, revisit_key, e)
return False
result = cur.fetchone()
if result[0]:
logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
return True
else:
query = "INSERT INTO {revisit_key} VALUES(revisit_key, hash_plus_url);"
try:
cur.execute(query)
except Exception as e:
self.logger.warning("exception inserting %s in %s: %s", hash_plus_url, revisit_key, e)
return False
class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
def __init__(self, dedup_db, options=warcprox.Options()):
warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
@ -427,11 +479,13 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
logging.warning(
'timed out saving dedup info to trough', exc_info=True)
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsMixin):
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMixin):
logger = logging.getLogger("warcprox.dedup.BatchTroughLoader")
def __init__(self, trough_dedup_db, options=warcprox.Options()):
warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
LimitRevisitsPGMixin.__init__(self, datasource="", options)
# TODO: get datasource
self.trough_dedup_db = trough_dedup_db
def _startup(self):