batch storing for trough dedup

This commit is contained in:
Noah Levitt 2018-01-17 16:49:28 -08:00
parent a974ec86fa
commit c933cb3119
3 changed files with 66 additions and 9 deletions

View File

@ -106,6 +106,7 @@ class BasePostfetchProcessor(threading.Thread):
# these should be set before thread is started
self.inq = None
self.outq = None
self.profiler = None
def run(self):
if self.options.profile:
@ -129,6 +130,7 @@ class BasePostfetchProcessor(threading.Thread):
raise Exception('not implemented')
def _run(self):
logging.info('%s starting up', self)
self._startup()
while not self.stop.is_set():
try:

View File

@ -167,20 +167,22 @@ class WarcproxController(object):
if self.playback_proxy:
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(
self.playback_proxy.playback_index_db))
self.playback_proxy.playback_index_db, self.options))
crawl_logger = Factory.crawl_logger(self.options)
if crawl_logger:
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(crawl_logger))
warcprox.ListenerPostfetchProcessor(
crawl_logger, self.options))
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(self.proxy.running_stats))
warcprox.ListenerPostfetchProcessor(
self.proxy.running_stats, self.options))
for qualname in self.options.plugins or []:
plugin = Factory.plugin(qualname)
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(plugin))
warcprox.ListenerPostfetchProcessor(plugin, self.options))
# chain them all up
self._postfetch_chain[0].inq = inq
@ -285,7 +287,6 @@ class WarcproxController(object):
for processor in self._postfetch_chain:
processor.start()
logging.info('started postfetch processor %r', processor)
def shutdown(self):
with self._start_stop_lock:
@ -390,6 +391,9 @@ class WarcproxController(object):
# postfetch processors
for processor in self._postfetch_chain:
if not processor.profiler:
self.logger.notice('%s has no profiling data', processor)
continue
file = os.path.join(tmpdir, '%s.dat' % processor.ident)
processor.profiler.dump_stats(file)
buf = io.StringIO()

View File

@ -260,11 +260,42 @@ class CdxServerDedup(DedupDb):
"""
pass
class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
    '''
    Postfetch processor that stores dedup info for batches of recorded
    urls in trough, one write request per dedup bucket.
    '''
    def __init__(self, trough_dedup_db, options=warcprox.Options()):
        warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
        self.trough_dedup_db = trough_dedup_db

    def _filter_and_bucketize(self, batch):
        '''
        Returns `{bucket: [recorded_url, ...]}`, excluding urls that should
        not have dedup info stored.
        '''
        buckets = collections.defaultdict(list)
        for recorded_url in batch:
            # only store dedup info for archival responses with a nonempty
            # payload; revisits, non-response records etc are skipped
            if (recorded_url.warc_records
                    and recorded_url.warc_records[0].type == b'response'
                    and recorded_url.response_recorder.payload_size() > 0):
                if (recorded_url.warcprox_meta
                        and 'captures-bucket' in recorded_url.warcprox_meta):
                    bucket = recorded_url.warcprox_meta['captures-bucket']
                else:
                    bucket = '__unspecified__'
                buckets[bucket].append(recorded_url)
        return buckets

    def _process_batch(self, batch):
        '''
        Stores dedup info for the eligible urls in `batch`, issuing one
        batched save per bucket.
        '''
        buckets = self._filter_and_bucketize(batch)
        for bucket, recorded_urls in buckets.items():
            self.trough_dedup_db.batch_save(recorded_urls, bucket)
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
def __init__(self, trough_dedup_db, options=warcprox.Options()):
    '''
    Args:
        trough_dedup_db: the TroughDedupDb this loader queries for dedup info
        options (warcprox.Options): global warcprox options
    '''
    warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
    self.trough_dedup_db = trough_dedup_db
def _startup(self):
    # one-time setup before the processing loop: starts the underlying
    # trough dedup db (which registers its schema with trough)
    self.trough_dedup_db.start()
def _filter_and_bucketize(self, batch):
'''
Returns `{bucket: [recorded_url, ...]}`, excluding urls that should not
@ -275,7 +306,8 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
if (recorded_url.response_recorder
and recorded_url.payload_digest
and recorded_url.response_recorder.payload_size() > 0):
if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta:
if (recorded_url.warcprox_meta
and 'captures-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['captures-bucket']
else:
bucket = '__unspecified__'
@ -315,7 +347,8 @@ class TroughDedupDb(DedupDb):
' url varchar(2100) not null,\n'
' date datetime not null,\n'
' id varchar(100));\n') # warc record id
WRITE_SQL_TMPL = ('insert into dedup (digest_key, url, date, id) '
WRITE_SQL_TMPL = ('insert or ignore into dedup\n'
'(digest_key, url, date, id)\n'
'values (%s, %s, %s, %s);')
def __init__(self, options=warcprox.Options()):
@ -323,8 +356,11 @@ class TroughDedupDb(DedupDb):
self._trough_cli = warcprox.trough.TroughClient(
options.rethinkdb_trough_db_url, promotion_interval=60*60)
def loader(self, options=warcprox.Options()):
return BatchTroughLoader(self, options)
def loader(self, *args, **kwargs):
    '''Returns a BatchTroughLoader postfetch processor backed by this db.'''
    return BatchTroughLoader(self, self.options)
def storer(self, *args, **kwargs):
    '''Returns a BatchTroughStorer postfetch processor backed by this db.'''
    return BatchTroughStorer(self, self.options)
def start(self):
self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL)
@ -337,6 +373,21 @@ class TroughDedupDb(DedupDb):
bucket, self.WRITE_SQL_TMPL,
(digest_key, url, warc_date, record_id), self.SCHEMA_ID)
def batch_save(self, batch, bucket='__unspecified__'):
    '''
    Saves dedup info for all the urls in `batch` to trough with a single
    multi-row "insert or ignore" statement.

    Args:
        batch (list): recorded urls; each must have `payload_digest`, `url`
            and a first warc record with `date` and `id`
        bucket (str): trough dedup bucket (segment id)
    '''
    if not batch:
        # guard: the '%s' join below would otherwise render "values ;",
        # which is not valid sql
        return
    sql_tmpl = ('insert or ignore into dedup\n'
                '(digest_key, url, date, id)\n'
                'values %s;' % ','.join(
                    '(%s,%s,%s,%s)' for i in range(len(batch))))
    # flatten the per-url tuples into one positional-parameter list matching
    # the placeholders in sql_tmpl
    values = []
    for recorded_url in batch:
        values.extend([
            warcprox.digest_str(
                recorded_url.payload_digest, self.options.base32),
            recorded_url.url,
            recorded_url.warc_records[0].date,
            recorded_url.warc_records[0].id,])
    self._trough_cli.write(bucket, sql_tmpl, values, self.SCHEMA_ID)
def lookup(self, digest_key, bucket='__unspecified__', url=None):
results = self._trough_cli.read(
bucket, 'select * from dedup where digest_key=%s;',