CdxServerDedup: send default request headers from the connection pool.

Summary of the change carried by this patch:
  * The urllib3 pool is now created with default headers (user agent,
    accepted content encodings and, when configured, the CDX cookie), so
    the per-request header construction in the lookup call is removed.
  * The default pool size and the loader's thread pool grow from 200 to 400.
  * The loader only submits a CDX lookup for a recorded URL that has a
    response recorder, a payload digest and a non-empty payload; any other
    URL is passed straight to the output queue when one is set.

Review note: compression is advertised with the Accept-Encoding header
("gzip, deflate"). The Accept header negotiates media types and must not be
used for this purpose.

diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 8d63f96..6280f77 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -205,17 +205,18 @@ class CdxServerDedup(DedupDb):
     cookies = None
 
     def __init__(self, cdx_url="https://web.archive.org/cdx/search",
-                 maxsize=200, options=warcprox.Options()):
+                 maxsize=400, options=warcprox.Options()):
         """Initialize cdx server connection pool and related parameters.
         Use low timeout value and no retries to avoid blocking warcprox
         operation by a slow CDX server.
         """
         self.cdx_url = cdx_url
         self.options = options
-        self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0,
-                                             timeout=2.0)
+        headers = {'user-agent': 'warcprox', 'accept-encoding': 'gzip, deflate'}
         if options.cdxserver_dedup_cookies:
-            self.cookies = options.cdxserver_dedup_cookies
+            headers['Cookie'] = options.cdxserver_dedup_cookies
+        self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0,
+                                             timeout=2.0, headers=headers)
 
     def loader(self, *args, **kwargs):
         return CdxServerDedupLoader(self, self.options)
@@ -245,10 +246,9 @@ class CdxServerDedup(DedupDb):
         """
         u = url.decode("utf-8") if isinstance(url, bytes) else url
         try:
-            headers = {'Cookie': self.cookies} if self.cookies else {}
             result = self.http_pool.request('GET', self.cdx_url, fields=dict(
                 url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit",
-                limit=-1), headers=headers)
+                limit=-1))
             assert result.status == 200
             if isinstance(digest_key, bytes):
                 dkey = digest_key
@@ -276,14 +276,20 @@ class CdxServerDedup(DedupDb):
 class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor):
     def __init__(self, cdx_dedup, options=warcprox.Options()):
         warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
-        self.pool = futures.ThreadPoolExecutor(max_workers=200)
+        self.pool = futures.ThreadPoolExecutor(max_workers=400)
         self.batch = set()
         self.cdx_dedup = cdx_dedup
 
     def _get_process_put(self):
         recorded_url = self.inq.get(block=True, timeout=0.5)
-        self.batch.add(recorded_url)
-        self.pool.submit(self._process_url, recorded_url)
+        if (recorded_url.response_recorder
+                and recorded_url.payload_digest
+                and recorded_url.response_recorder.payload_size() > 0):
+            self.batch.add(recorded_url)
+            self.pool.submit(self._process_url, recorded_url)
+        else:
+            if self.outq:
+                self.outq.put(recorded_url)
 
     def _process_url(self, recorded_url):
         try: