lru_cache skip_revisit

Barbara Miller 2023-07-12 17:04:07 -07:00
parent 64a152ee8c
commit af4c8b071a


@@ -29,7 +29,7 @@ import warcprox
 import sqlite3
 import doublethink
 import datetime
-import psycopg2
+import psycopg
 import urllib3
 from urllib3.exceptions import HTTPError
 import collections
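
The only functional change in this hunk is the swap from psycopg2 to psycopg (psycopg 3). The top-level connect call has the same shape in both libraries, which is what makes the swap mechanical; a minimal sketch, using the placeholder DSN from the comment in the next hunk rather than a real database:

import psycopg

# psycopg 3 accepts the same URL-style DSN string as psycopg2.connect()
conn = psycopg.connect("postgresql://user@db_host/db_name")
with conn.cursor() as cur:
    cur.execute("SELECT 1;")
    print(cur.fetchone())  # (1,)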
@@ -62,18 +62,49 @@ class DedupableMixin(object):
         else:
             return recorded_url.response_recorder.payload_size() > self.min_binary_size
 
+@lru_cache(maxsize=256)
+def skip_revisit(hash_plus_url, revisit_key, conn):
+    """
+    check if hash_plus_url & revisit_key are present in conn table revisits,
+    returning True if they exist, else False
+    """
+    query = "SELECT exists(SELECT 1 FROM revisits WHERE hash_plus_url = %s and crawl_id = %s LIMIT 1);"
+    cur = conn.cursor()
+    try:
+        cur.execute(query, (hash_plus_url, revisit_key))
+    except Exception as e:
+        logging.warning("exception querying for %s in %s: %s", hash_plus_url, revisit_key, e)
+        return False
+    result = cur.fetchone()
+    if result and result == (True,):
+        logging.info("skipping revisit for hash %s", hash_plus_url)
+        return True
+    query = "INSERT INTO revisits (crawl_id, hash_plus_url) VALUES (%s, %s);"
+    try:
+        cur.execute(query, (revisit_key, hash_plus_url))
+        conn.commit()
+    except Exception as e:
+        logging.warning("exception inserting %s and %s: %s", hash_plus_url, revisit_key, e)
+    return False
+
 class LimitRevisitsPGMixin():
     """
     Limit revisits recorded to one per revisit_key
     """
-    def __init__(self):
-        self.datasource = "postgresql://archiveit@db.qa-archive-it.org/archiveit" # "postgresql://user@db_host/db_name"
-        self.datatable = "crawl_revisits" # postgres table in db_name
+    def __init__(self, datasource):
+        # datasource like "postgresql://user@db_host/db_name"
+        # table name revisits
+        try:
+            self._conn = psycopg.connect(datasource)
+        except Exception as e:
+            self.logger.warning("db connection failure: %s", e)
 
+    @lru_cache(maxsize=256)
     def limit_revisits(self, recorded_url, hash_plus_url=None, revisit_key=None):
-        # tracks revisits, returns True when we've seen revisit before, else False
+        """
+        tracks revisits, returns True when we've seen revisit before, else False
+        """
         if not hash_plus_url:
             digest = warcprox.digest_str(recorded_url.payload_digest,
                     self.options.base32)
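
Because skip_revisit is a module-level function wrapped in functools.lru_cache, results are memoized on the argument tuple (hash_plus_url, revisit_key, conn): a hash/crawl pair that has already been checked is answered from memory with no further round trip to Postgres. A minimal sketch of that behavior, with a hypothetical expensive_lookup standing in for the SELECT/INSERT round trip:

from functools import lru_cache

@lru_cache(maxsize=256)
def expensive_lookup(hash_plus_url, revisit_key):
    # stands in for skip_revisit's SELECT/INSERT against postgres
    print("querying db for", hash_plus_url, revisit_key)
    return False

expensive_lookup("sha1:abc|http://example.com/", "1234")  # miss: hits the "db"
expensive_lookup("sha1:abc|http://example.com/", "1234")  # hit: served from cache
print(expensive_lookup.cache_info())
# CacheInfo(hits=1, misses=1, maxsize=256, currsize=1)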
@@ -89,34 +120,7 @@ class LimitRevisitsPGMixin():
             revisit_key = str(recorded_url.warcprox_meta["metadata"]["ait-job-id"])
         else:
             revisit_key = '__unspecified__'
 
-        query = "SELECT exists(SELECT 1 FROM crawl_revisits WHERE hash_plus_url = %s and crawl_id = %s LIMIT 1);"
-        try:
-            conn = psycopg2.connect(self.datasource)
-        except Exception as e:
-            self.logger.warning("db connection failure: %s", e)
-            return False
-        cur = conn.cursor()
-        try:
-            cur.execute(query, (hash_plus_url, revisit_key))
-        except Exception as e:
-            self.logger.warning("exception querying for %s in %s: %s", digest, revisit_key, e)
-            return False
-        result = cur.fetchone()
-        if result and result == (True,):
-            logging.info("skipping revisit for url %s and hash %s", recorded_url.url, digest)
-            return True
-        query = "INSERT INTO crawl_revisits (crawl_id, hash_plus_url) VALUES (%s, %s);"
-        try:
-            cur.execute(query, (revisit_key, hash_plus_url))
-            conn.commit()
-        except Exception as e:
-            self.logger.warning("exception inserting %s in %s for %s: %s", digest, revisit_key, recorded_url.url, e)
-        return False
+        return skip_revisit(hash_plus_url, revisit_key, self._conn)
 
 class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
@@ -461,7 +465,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMixin):
     def __init__(self, trough_dedup_db, options=warcprox.Options()):
         warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
-        LimitRevisitsPGMixin.__init__(self)
+        LimitRevisitsPGMixin.__init__(self, "postgresql://brozzler@db.qa-archive-it.org/brozzler")
         self.trough_dedup_db = trough_dedup_db
 
     def _startup(self):
@@ -549,10 +553,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMixin):
                     for recorded_url in key_index[entry['digest_key']]:
                         recorded_url.dedup_info = entry
                         if recorded_url.dedup_info:
-                            recorded_url.do_not_archive = self.limit_revisits(
-                                    recorded_url,
-                                    revisit_key=str(recorded_url.warcprox_meta["metadata"]["ait-job-id"])
-                            )
+                            recorded_url.do_not_archive = self.limit_revisits(recorded_url)
                             logging.info('%s', self.limit_revisits.cache_info())
         except Exception as e:
             # batch_lookup raised exception or something
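
The cache_info() log line above reports on the method-level cache. Worth noting: lru_cache on a method keys on (self, recorded_url, ...), and since each recorded_url is a fresh object per response, that cache will rarely hit; the stable string keys of the module-level skip_revisit cache are where repeats actually land. A sketch of the method-level behavior, with a hypothetical Loader class:

from functools import lru_cache

class Loader:
    @lru_cache(maxsize=256)
    def check(self, item):
        # cache key is (self, item); a fresh object each call never repeats
        return False

loader = Loader()
loader.check(object())
loader.check(object())
print(loader.check.cache_info())
# CacheInfo(hits=0, misses=2, maxsize=256, currsize=2) -- every call is a miss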