Merge branch 'limit_revisits' into qa

Barbara Miller 2023-06-27 11:48:28 -07:00
commit 65d7776ec4

@@ -1,7 +1,7 @@
'''
warcprox/dedup.py - identical payload digest deduplication using sqlite db
Copyright (C) 2013-2021 Internet Archive
Copyright (C) 2013-2023 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@@ -61,6 +61,122 @@ class DedupableMixin(object):
        else:
            return recorded_url.response_recorder.payload_size() > self.min_binary_size
class LimitRevisitsPGMixin(object):
    """
    Limit revisits recorded to one per revisit_key
    """
    def __init__(self, options=warcprox.Options()):
        import psycopg2
        # keep a reference on self: a module imported inside __init__ is a
        # local name there and would not be visible in limit_revisits()
        self._psycopg2 = psycopg2
        self.datasource = "postgresql://archiveit@db.qa-archive-it.org/archiveit"  # "postgresql://user@db_host/db_name"
        self.datatable = "crawl_revisits"  # postgres table in db_name

    def limit_revisits(self, recorded_url, hash_plus_url=None, revisit_key=None):
        if not hash_plus_url:
            hash_plus_url = b''.join(
                (warcprox.digest_str(recorded_url.payload_digest,
                                     self.options.base32),
                 recorded_url.url)
            )
        if not revisit_key:
            # use ait-job-id if available
            if (
                recorded_url.warcprox_meta
                and "metadata" in recorded_url.warcprox_meta
                and "ait-job-id" in recorded_url.warcprox_meta["metadata"]
            ):
                revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
            else:
                revisit_key = 'all'
        # parameterize the value so the driver quotes the bytes correctly
        # (interpolating it into the SQL string is both broken and injectable)
        query = f"SELECT exists(SELECT 1 FROM {self.datatable} WHERE hash_plus_url = %s LIMIT 1);"
        try:
            conn = self._psycopg2.connect(self.datasource)
        except Exception as e:
            self.logger.warning("db connection failure: %s", e)
            return False
        cur = conn.cursor()
        try:
            cur.execute(query, (hash_plus_url,))
        except Exception as e:
            self.logger.warning("exception querying for %s in %s: %s",
                                hash_plus_url, revisit_key, e)
            return False
        result = cur.fetchone()
        if result and result[0]:
            logging.info("skipping revisit for url %s and digest %s",
                         recorded_url.url, hash_plus_url)
            return True
        else:
            query = f"INSERT INTO {self.datatable} VALUES(%s, %s);"
            try:
                cur.execute(query, (revisit_key, hash_plus_url))
                conn.commit()  # without a commit the INSERT is rolled back
            except Exception as e:
                self.logger.warning("exception inserting %s in %s: %s",
                                    hash_plus_url, revisit_key, e)
            return False
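
The queries above presuppose a small Postgres table keyed on hash_plus_url. The DDL is not part of this commit; what follows is a minimal sketch of a compatible schema, with column names inferred from the INSERT above (table layout and index choice are assumptions):

import psycopg2

conn = psycopg2.connect("postgresql://archiveit@db.qa-archive-it.org/archiveit")
with conn, conn.cursor() as cur:  # connection context manager commits on success
    # hypothetical schema, one row per (revisit_key, hash_plus_url)
    cur.execute("""
        CREATE TABLE IF NOT EXISTS crawl_revisits (
            revisit_key   TEXT NOT NULL,
            hash_plus_url BYTEA NOT NULL,
            UNIQUE (revisit_key, hash_plus_url)
        )
    """)
    # the EXISTS lookup filters on hash_plus_url alone, so index that column
    cur.execute(
        "CREATE INDEX IF NOT EXISTS crawl_revisits_hash_plus_url"
        " ON crawl_revisits (hash_plus_url)")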
class LimitRecords(object):
    """
    Limit records to one per revisit_key, e.g., crawl id
    """
    def __init__(self, datasource, options=warcprox.Options()):
        import psycopg2
        self._psycopg2 = psycopg2  # module reference kept on self, as above
        self.logger = logging.getLogger("warcprox.dedup.LimitRecords")
        self.options = options
        self.datasource = datasource  # "postgresql://user@db_host/db_name"
        # verify partition table exists
        self.datatable = 'crawled_urls'  # postgres table in db_name

    def limit_records(self, buckets):
        for bucket in buckets:
            for recorded_url in bucket:
                hash_plus_url = b''.join(
                    (warcprox.digest_str(recorded_url.payload_digest,
                                         self.options.base32),
                     recorded_url.url)
                )
                # use ait-job-id if available, or seed?
                if (
                    recorded_url.warcprox_meta
                    and "metadata" in recorded_url.warcprox_meta
                    and "ait-job-id" in recorded_url.warcprox_meta["metadata"]
                ):
                    revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
                else:
                    revisit_key = 'all'
                query = f"SELECT exists(SELECT 1 FROM {self.datatable} WHERE hash_plus_url = %s LIMIT 1);"
                try:
                    conn = self._psycopg2.connect(self.datasource)
                except Exception as e:
                    self.logger.warning("db connection failure: %s", e)
                    return False
                cur = conn.cursor()
                try:
                    cur.execute(query, (hash_plus_url,))
                except Exception as e:
                    self.logger.warning("exception querying for %s in %s: %s",
                                        hash_plus_url, revisit_key, e)
                    return False
                result = cur.fetchone()
                if result and result[0]:
                    logging.info("skipping revisit for url %s and digest %s",
                                 recorded_url.url, hash_plus_url)
                    # note: returns on the first hit, leaving the remaining
                    # buckets unprocessed
                    return True
                else:
                    query = f"INSERT INTO {self.datatable} VALUES(%s, %s);"
                    try:
                        cur.execute(query, (revisit_key, hash_plus_url))
                        conn.commit()
                    except Exception as e:
                        self.logger.warning("exception inserting %s in %s: %s",
                                            hash_plus_url, revisit_key, e)
                    return False
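
LimitRecords is not wired to anything in this diff. A hypothetical call site, grouping a postfetch batch by warc-prefix (the grouping key, dsn, and batch contents here are illustrative assumptions, not part of the commit):

import collections

import warcprox

options = warcprox.Options()
limiter = LimitRecords("postgresql://user@db_host/db_name", options=options)

batch = []  # stand-in; really an iterable of RecordedUrl from the postfetch chain
buckets = collections.defaultdict(list)
for recorded_url in batch:
    key = (recorded_url.warcprox_meta or {}).get("warc-prefix", "default")
    buckets[key].append(recorded_url)
limiter.limit_records(buckets.values())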
class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
    def __init__(self, dedup_db, options=warcprox.Options()):
        warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
@@ -398,11 +514,12 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
            logging.warning(
                'timed out saving dedup info to trough', exc_info=True)
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMixin):
    logger = logging.getLogger("warcprox.dedup.BatchTroughLoader")

    def __init__(self, trough_dedup_db, options=warcprox.Options()):
        warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
        # the mixin's __init__ takes only options; it supplies its own datasource
        LimitRevisitsPGMixin.__init__(self, options)
        self.trough_dedup_db = trough_dedup_db

    def _startup(self):
@@ -415,7 +532,8 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
        '''
        buckets = collections.defaultdict(list)
        discards = []
        # for duplicate checks, see https://webarchive.jira.com/browse/WT-31
        # current batch often contains new duplicates, so check for them here
        # see https://webarchive.jira.com/browse/WT-31
        hash_plus_urls = set()
        for recorded_url in batch:
            if not recorded_url.payload_digest:
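
The rest of this loop is cut off by the diff. A hypothetical reconstruction of the within-batch screen that hash_plus_urls enables, under assumptions (the commit's actual loop body may differ):

        for recorded_url in batch:
            if not recorded_url.payload_digest:
                discards.append(recorded_url)
                continue
            hash_plus_url = b''.join((
                warcprox.digest_str(recorded_url.payload_digest,
                                    self.options.base32),
                recorded_url.url))
            if hash_plus_url in hash_plus_urls:
                # a duplicate born inside this batch: trough has no entry for
                # it yet, so flag it here rather than relying on the lookup
                recorded_url.do_not_archive = True
                discards.append(recorded_url)
            else:
                hash_plus_urls.add(hash_plus_url)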
@@ -488,6 +606,9 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
                for entry in future.result():
                    for recorded_url in key_index[entry['digest_key']]:
                        recorded_url.dedup_info = entry
                        if recorded_url.dedup_info:
                            recorded_url.do_not_archive = \
                                self.limit_revisits(recorded_url)
            except Exception as e:
                # batch_lookup raised exception or something
                logging.warning(
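
Setting do_not_archive is only half the mechanism: downstream, the WARC writer is expected to drop flagged records instead of writing revisits. The writer side is not part of this diff; a minimal sketch of such a guard (the function name is illustrative):

def _should_write(recorded_url):
    # records flagged by limit_revisits() are skipped; everything else
    # proceeds to WARC writing as usual
    return not getattr(recorded_url, 'do_not_archive', False)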