Mirror of https://github.com/internetarchive/warcprox.git (synced 2025-01-18 13:22:09 +01:00)
batch trough dedup loader

commit d4bbaf10b7
parent 6ab73764ea
warcprox/controller.py

@@ -291,7 +291,6 @@ class WarcproxController(object):
             self.playback_proxy_thread.start()
 
         for processor in self._postfetch_chain:
-            # logging.info('starting postfetch processor %r', processor)
             processor.start()
             logging.info('started postfetch processor %r', processor)
 
warcprox/dedup.py

@@ -1,7 +1,7 @@
 '''
 warcprox/dedup.py - identical payload digest deduplication using sqlite db
 
-Copyright (C) 2013-2017 Internet Archive
+Copyright (C) 2013-2018 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -32,17 +32,18 @@ import doublethink
 import datetime
 import urllib3
 from urllib3.exceptions import HTTPError
+import collections
 
 urllib3.disable_warnings()
 
 class DedupLoader(warcprox.BaseStandardPostfetchProcessor):
-    def __init__(self, dedup_db, inq, outq, base32=False, profile=False):
+    def __init__(self, dedup_db, inq, outq, options=warcprox.Options()):
         warcprox.BaseStandardPostfetchProcessor.__init__(
                 self, inq, outq, profile)
         self.dedup_db = dedup_db
-        self.base32 = base32
     def _process_url(self, recorded_url):
-        decorate_with_dedup_info(self.dedup_db, recorded_url, self.base32)
+        decorate_with_dedup_info(
+                self.dedup_db, recorded_url, self.options.base32)
 
 class DedupDb(object):
     logger = logging.getLogger("warcprox.dedup.DedupDb")
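Note the constructor change above: `DedupLoader` now takes a single `options` object in place of separate `base32`/`profile` keyword arguments, and reads flags as `self.options.base32`. A minimal sketch of why that pattern is convenient, using an `argparse.Namespace` stand-in for `warcprox.Options` (the missing-attributes-read-as-None behavior here is an assumption for illustration, not a confirmed detail of warcprox's class):

import argparse

class Options(argparse.Namespace):
    # Stand-in for warcprox.Options; assumption: attributes that were
    # never set read as None instead of raising AttributeError.
    def __getattr__(self, name):
        return None

options = Options(base32=True)
print(options.base32)   # True -- explicitly set
print(options.profile)  # None -- unset flags are harmless to consult

With one bag of settings, new flags can be threaded through the postfetch chain without touching every constructor signature.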
@@ -259,6 +260,48 @@ class CdxServerDedup(DedupDb):
         """
         pass
 
+class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
+    def __init__(self, trough_dedup_db, inq, outq, options=warcprox.Options()):
+        warcprox.BaseBatchPostfetchProcessor.__init__(self, inq, outq, options)
+        self.trough_dedup_db = trough_dedup_db
+
+    def _filter_and_bucketize(self, batch):
+        '''
+        Returns `{bucket: [recorded_url, ...]}`, excluding urls that should not
+        be looked up.
+        '''
+        buckets = collections.defaultdict(list)
+        for recorded_url in batch:
+            if (recorded_url.response_recorder
+                    and recorded_url.payload_digest
+                    and recorded_url.response_recorder.payload_size() > 0):
+                if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta:
+                    bucket = recorded_url.warcprox_meta['captures-bucket']
+                else:
+                    bucket = '__unspecified__'
+                buckets[bucket].append(recorded_url)
+        return buckets
+
+    def _build_key_index(self, batch):
+        '''
+        Returns `{digest_key: [recorded_url, ...]}`.
+        '''
+        key_index = collections.defaultdict(list)
+        for recorded_url in batch:
+            digest_key = warcprox.digest_str(
+                    recorded_url.payload_digest, self.options.base32)
+            key_index[digest_key].append(recorded_url)
+        return key_index
+
+    def _process_batch(self, batch):
+        buckets = self._filter_and_bucketize(batch)
+        for bucket in buckets:
+            key_index = self._build_key_index(buckets[bucket])
+            results = self.trough_dedup_db.batch_lookup(key_index.keys(), bucket)
+            for result in results:
+                for recorded_url in key_index[result['digest_key']]:
+                    recorded_url.dedup_info = result
+
 class TroughDedupDb(DedupDb):
     '''
     https://github.com/internetarchive/trough
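The new `BatchTroughLoader` above works in three steps per batch: group captures by dedup bucket, index each bucket's captures by payload digest key, then fan each `batch_lookup()` result back out to every capture sharing that digest. A toy sketch of that flow, with `SimpleNamespace` stand-ins for warcprox's `RecordedUrl` and a hard-coded fake lookup result (the names and data here are illustrative, not warcprox's API):

import collections
from types import SimpleNamespace

def make_url(digest, size, meta=None):
    # Models only the attributes the batch loader consults.
    return SimpleNamespace(
            response_recorder=SimpleNamespace(payload_size=lambda: size),
            payload_digest=digest, warcprox_meta=meta, dedup_info=None)

batch = [
        make_url('sha1:aaaa', 100, {'captures-bucket': 'job1'}),
        make_url('sha1:aaaa', 100, {'captures-bucket': 'job1'}),  # same payload
        make_url('sha1:bbbb', 50),   # no bucket -> '__unspecified__'
        make_url('sha1:cccc', 0),    # empty payload -> filtered out
]

# step 1: filter and bucketize (cf. _filter_and_bucketize)
buckets = collections.defaultdict(list)
for u in batch:
    if u.payload_digest and u.response_recorder.payload_size() > 0:
        bucket = (u.warcprox_meta or {}).get('captures-bucket', '__unspecified__')
        buckets[bucket].append(u)

# steps 2 and 3: key index per bucket, then attach lookup results
# (cf. _build_key_index and _process_batch); the digest string itself
# stands in for warcprox.digest_str()
fake_results = {'job1': [{'digest_key': 'sha1:aaaa', 'url': 'http://example.com/'}]}
for bucket, urls in buckets.items():
    key_index = collections.defaultdict(list)
    for u in urls:
        key_index[u.payload_digest].append(u)
    for result in fake_results.get(bucket, []):
        for u in key_index[result['digest_key']]:
            u.dedup_info = result

print([u.dedup_info is not None for u in batch])  # [True, True, False, False]

The key index is what lets a single trough row mark several captures in the batch as duplicates, since many urls in one batch can share a payload digest.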
@@ -279,6 +322,9 @@ class TroughDedupDb(DedupDb):
         self._trough_cli = warcprox.trough.TroughClient(
                 options.rethinkdb_trough_db_url, promotion_interval=60*60)
 
+    def loader(self, inq, outq, options=warcprox.Options()):
+        return BatchTroughLoader(self, inq, outq, options)
+
     def start(self):
         self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL)
 
@@ -306,6 +352,23 @@ class TroughDedupDb(DedupDb):
         else:
             return None
 
+    def batch_lookup(self, digest_keys, bucket='__unspecified__'):
+        sql_tmpl = 'select * from dedup where digest_key in (%s)' % (
+                ','.join('%s' for i in range(len(digest_keys))))
+        results = self._trough_cli.read(bucket, sql_tmpl, digest_keys)
+        if results is None:
+            return []
+        self.logger.debug(
+                'trough batch lookup of %s keys returned %s results',
+                len(digest_keys), len(results))
+        assert len(results) >= 0 and len(results) <= len(digest_keys)
+        for result in results:
+            result['id'] = result['id'].encode('ascii')
+            result['url'] = result['url'].encode('ascii')
+            result['date'] = result['date'].encode('ascii')
+            result['digest_key'] = result['digest_key'].encode('ascii')
+        return results
+
     def notify(self, recorded_url, records):
         if (records and records[0].type == b'response'
                 and recorded_url.response_recorder.payload_size() > 0):
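`batch_lookup()` above builds one query per bucket, with a `%s` placeholder per digest key, and lets the trough client bind the values; the string fields in each result row are then encoded to bytes, presumably to match the bytes values warcprox uses for record fields elsewhere. A quick sketch of how the template expands:

# Placeholder expansion for three keys; binding the actual values is
# left to the trough client's read() call.
digest_keys = ['sha1:aaaa', 'sha1:bbbb', 'sha1:cccc']
sql_tmpl = 'select * from dedup where digest_key in (%s)' % (
        ','.join('%s' for i in range(len(digest_keys))))
print(sql_tmpl)
# select * from dedup where digest_key in (%s,%s,%s)

One round trip per bucket replaces the per-url lookup that `DedupLoader` does, which is the point of the batch loader.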