Parallelize CDX Server dedup queries

This commit is contained in:
Vangelis Banos 2018-01-23 23:16:35 +00:00
parent 1cfb4d46c6
commit 5631eaced1

View File

@ -212,6 +212,9 @@ class CdxServerDedup(DedupDb):
if options.cdxserver_dedup_cookies:
self.cookies = options.cdxserver_dedup_cookies
def loader(self, *args, **kwargs):
    """Build the postfetch processor that performs CDX dedup lookups.

    Positional and keyword arguments are accepted but ignored; the new
    ``CdxServerDedupLoader`` is always constructed from this dedup db
    instance and its ``options``.
    """
    dedup_loader = CdxServerDedupLoader(self, self.options)
    return dedup_loader
def start(self):
    """Do nothing; this dedup db has no start-up work to perform.

    NOTE(review): presumably present to satisfy a lifecycle interface
    shared with other dedup db classes -- confirm against the base class.
    """
@ -265,6 +268,33 @@ class CdxServerDedup(DedupDb):
"""
pass
class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor):
    """Postfetch processor that runs CDX Server dedup lookups in parallel.

    Each recorded url taken from the input queue is handed to a thread pool;
    the worker looks the payload digest up through ``cdx_dedup`` and, when a
    match is found, stores it on ``recorded_url.dedup_info`` before passing
    the url on to the output queue.
    """

    def __init__(self, cdx_dedup, options=warcprox.Options()):
        """Set up the worker pool and the in-flight bookkeeping set.

        Args:
            cdx_dedup: object providing ``lookup(digest_key, url)``; used by
                the workers to query for dedup information.
            options: passed through to the base class constructor.
        """
        warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
        # Up to 50 lookups may be in flight at the same time.
        self.pool = futures.ThreadPoolExecutor(max_workers=50)
        # Urls submitted to the pool whose lookup has not finished yet.
        self.batch = set()
        self.cdx_dedup = cdx_dedup

    def _get_process_put(self):
        """Take one recorded url from ``inq`` and schedule its dedup lookup.

        Waits up to 0.5 seconds for an item; whatever ``inq.get`` raises on
        timeout propagates to the caller. Forwarding to ``outq`` is done by
        the worker (see ``_process_url``), not here.
        """
        recorded_url = self.inq.get(block=True, timeout=0.5)
        self.batch.add(recorded_url)
        self.pool.submit(self._process_url, recorded_url)

    def _process_url(self, recorded_url):
        """Look up dedup info for ``recorded_url`` and forward it to ``outq``.

        Runs on a pool thread. The url is always removed from ``batch`` and
        forwarded, whether or not the lookup succeeded.
        """
        try:
            digest_key = warcprox.digest_str(recorded_url.payload_digest,
                                             self.options.base32)
            dedup_info = self.cdx_dedup.lookup(digest_key, recorded_url.url)
            if dedup_info:
                recorded_url.dedup_info = dedup_info
        except ValueError as exc:
            self.logger.error('CdxServerDedupLoader _process_url failed for url=%s %s',
                              recorded_url.url, exc)
        except Exception:
            # The Future returned by pool.submit() is never inspected, so an
            # exception escaping here would vanish silently. Log it with the
            # traceback instead; the url is still forwarded below.
            self.logger.error(
                'CdxServerDedupLoader _process_url unexpected error for url=%s',
                recorded_url.url, exc_info=True)
        finally:
            # discard() rather than remove(): a missing entry must not raise
            # KeyError here, or the put() below would be skipped and the url
            # would be dropped from the pipeline.
            self.batch.discard(recorded_url)
            if self.outq:
                self.outq.put(recorded_url)
class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
def __init__(self, trough_dedup_db, options=warcprox.Options()):
warcprox.BaseBatchPostfetchProcessor.__init__(self, options)