mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
LimitRecords, more LimitRevisitsPGMixin
This commit is contained in:
parent
08f2903f14
commit
d9145eefb5
@ -61,34 +61,6 @@ class DedupableMixin(object):
|
|||||||
else:
|
else:
|
||||||
return recorded_url.response_recorder.payload_size() > self.min_binary_size
|
return recorded_url.response_recorder.payload_size() > self.min_binary_size
|
||||||
|
|
||||||
class LimitRevisitsMixin(object):
|
|
||||||
"""
|
|
||||||
Limit revisits recorded to one per revisit_key
|
|
||||||
"""
|
|
||||||
def __init__(self, options=warcprox.Options()):
|
|
||||||
self.revisits = collections.defaultdict(set)
|
|
||||||
|
|
||||||
def limit_revisits(self, recorded_url, hash=None, revisit_key=None):
|
|
||||||
if not hash:
|
|
||||||
hash = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
|
|
||||||
if not revisit_key:
|
|
||||||
if (
|
|
||||||
recorded_url.warcprox_meta
|
|
||||||
and "metadata" in recorded_url.warcprox_meta
|
|
||||||
and "ait-job-id" in recorded_url.warcprox_meta["metadata"]
|
|
||||||
):
|
|
||||||
revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
|
|
||||||
else:
|
|
||||||
revisit_key = 'all'
|
|
||||||
|
|
||||||
hash_plus_url = b''.join((hash, recorded_url.url))
|
|
||||||
if hash_plus_url in self.revisits[revisit_key]:
|
|
||||||
logger.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
self.revisits[revisit_key].add(hash_plus_url)
|
|
||||||
return False
|
|
||||||
|
|
||||||
class LimitRevisitsPGMixin(object):
|
class LimitRevisitsPGMixin(object):
|
||||||
"""
|
"""
|
||||||
Limit revisits recorded to one per revisit_key
|
Limit revisits recorded to one per revisit_key
|
||||||
@ -97,12 +69,16 @@ class LimitRevisitsPGMixin(object):
|
|||||||
import psycopg2
|
import psycopg2
|
||||||
from psycopg2 import extras # needed
|
from psycopg2 import extras # needed
|
||||||
self.datasource = datasource # "postgresql://user@db_host/db_name"
|
self.datasource = datasource # "postgresql://user@db_host/db_name"
|
||||||
self.datatable = datatable # postgres table in db_name
|
self.datatable = 'crawled_urls' # postgres table in db_name
|
||||||
# TODO here or elsewhere? verify partition table exists
|
|
||||||
|
def limit_revisits(self, recorded_url, hash_plus_url=None, revisit_key=None):
|
||||||
|
if not hash_plus_url:
|
||||||
|
hash_plus_url = b''.join(
|
||||||
|
(warcprox.digest_str(recorded_url.payload_digest,
|
||||||
|
self.options.base32),
|
||||||
|
recorded_url.url)
|
||||||
|
)
|
||||||
|
|
||||||
def limit_revisits(self, recorded_url, hash=None, revisit_key=None):
|
|
||||||
if not hash:
|
|
||||||
hash = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
|
|
||||||
if not revisit_key:
|
if not revisit_key:
|
||||||
# use ait-job-id if available
|
# use ait-job-id if available
|
||||||
if (
|
if (
|
||||||
@ -113,9 +89,67 @@ class LimitRevisitsPGMixin(object):
|
|||||||
revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
|
revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
|
||||||
else:
|
else:
|
||||||
revisit_key = 'all'
|
revisit_key = 'all'
|
||||||
hash_plus_url = b''.join((hash, recorded_url.url))
|
|
||||||
|
|
||||||
query = f"SELECT exists(SELECT 1 FROM {revisit_key} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;"
|
query = f"SELECT exists(SELECT 1 FROM {self.datatable} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;"
|
||||||
|
|
||||||
|
try:
|
||||||
|
conn = psycopg2.connect(self.datasource)
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.warning("db connection failure: %s", e)
|
||||||
|
return False
|
||||||
|
cur = conn.cursor()
|
||||||
|
try:
|
||||||
|
cur.execute(query)
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.warning("exception querying for %s in %s: %s", hash_plus_url, revisit_key, e)
|
||||||
|
return False
|
||||||
|
result = cur.fetchone()
|
||||||
|
|
||||||
|
if result[0]:
|
||||||
|
logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
query = "INSERT INTO {self.datable} VALUES(revisit_key, hash_plus_url);"
|
||||||
|
try:
|
||||||
|
cur.execute(query)
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.warning("exception inserting %s in %s: %s", hash_plus_url, revisit_key, e)
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
class LimitRecords(object):
|
||||||
|
"""
|
||||||
|
Limit records to one per revisit_key, e.g., crawl id
|
||||||
|
"""
|
||||||
|
def __init__(self, datasource, options=warcprox.Options()):
|
||||||
|
import psycopg2
|
||||||
|
from psycopg2 import extras # needed
|
||||||
|
self.datasource = datasource # "postgresql://user@db_host/db_name"
|
||||||
|
# self.datatable = revisit_key # set in limit_revisits method
|
||||||
|
# verify partition table exists
|
||||||
|
self.datatable = 'crawled_urls' # postgres table in db_name
|
||||||
|
|
||||||
|
def limit_records(self, buckets):
|
||||||
|
for bucket in buckets:
|
||||||
|
for recorded_url in bucket:
|
||||||
|
hash_plus_url = b''.join(
|
||||||
|
(warcprox.digest_str(recorded_url.payload_digest,
|
||||||
|
self.options.base32),
|
||||||
|
recorded_url.url)
|
||||||
|
)
|
||||||
|
|
||||||
|
# use ait-job-id if available, or seed?
|
||||||
|
if (
|
||||||
|
recorded_url.warcprox_meta
|
||||||
|
and "metadata" in recorded_url.warcprox_meta
|
||||||
|
and "ait-job-id" in recorded_url.warcprox_meta["metadata"]
|
||||||
|
):
|
||||||
|
revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
|
||||||
|
else:
|
||||||
|
revisit_key = 'all'
|
||||||
|
|
||||||
|
# query = f"SELECT exists(SELECT 1 FROM {revisit_key} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;"
|
||||||
|
query = f"SELECT exists(SELECT 1 FROM {self.datatable} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
conn = psycopg2.connect(self.datasource)
|
conn = psycopg2.connect(self.datasource)
|
||||||
@ -498,7 +532,8 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMix
|
|||||||
'''
|
'''
|
||||||
buckets = collections.defaultdict(list)
|
buckets = collections.defaultdict(list)
|
||||||
discards = []
|
discards = []
|
||||||
# for duplicate checks, see https://webarchive.jira.com/browse/WT-31
|
# current batch often contains new duplicates, so check for them here
|
||||||
|
# see https://webarchive.jira.com/browse/WT-31
|
||||||
hash_plus_urls = set()
|
hash_plus_urls = set()
|
||||||
for recorded_url in batch:
|
for recorded_url in batch:
|
||||||
if not recorded_url.payload_digest:
|
if not recorded_url.payload_digest:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user