From 5631eaced1d175d0f733e4a9441c0af5c0510609 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 23 Jan 2018 23:16:35 +0000 Subject: [PATCH] Parallelize CDX Server dedup queries --- warcprox/dedup.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index cb65408..d5ff7c8 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -212,6 +212,9 @@ class CdxServerDedup(DedupDb): if options.cdxserver_dedup_cookies: self.cookies = options.cdxserver_dedup_cookies + def loader(self, *args, **kwargs): + return CdxServerDedupLoader(self, self.options) + def start(self): pass @@ -265,6 +268,33 @@ class CdxServerDedup(DedupDb): """ pass +class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor): + def __init__(self, cdx_dedup, options=warcprox.Options()): + warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + self.pool = futures.ThreadPoolExecutor(max_workers=50) + self.batch = set() + self.cdx_dedup = cdx_dedup + + def _get_process_put(self): + recorded_url = self.inq.get(block=True, timeout=0.5) + self.batch.add(recorded_url) + self.pool.submit(self._process_url, recorded_url) + + def _process_url(self, recorded_url): + try: + digest_key = warcprox.digest_str(recorded_url.payload_digest, + self.options.base32) + dedup_info = self.cdx_dedup.lookup(digest_key, recorded_url.url) + if dedup_info: + recorded_url.dedup_info = dedup_info + except ValueError as exc: + self.logger.error('CdxServerDedupLoader _process_url failed for url=%s %s', + recorded_url.url, exc) + finally: + self.batch.remove(recorded_url) + if self.outq: + self.outq.put(recorded_url) + class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): def __init__(self, trough_dedup_db, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options)