Merge pull request #81 from vbanos/cdxdedup-improvements2

CDX dedup improvements
This commit is contained in:
Noah Levitt 2018-04-06 13:26:28 -07:00 committed by GitHub
commit 797e33b91d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -205,17 +205,18 @@ class CdxServerDedup(DedupDb):
cookies = None cookies = None
def __init__(self, cdx_url="https://web.archive.org/cdx/search", def __init__(self, cdx_url="https://web.archive.org/cdx/search",
maxsize=200, options=warcprox.Options()): maxsize=400, options=warcprox.Options()):
"""Initialize cdx server connection pool and related parameters. """Initialize cdx server connection pool and related parameters.
Use low timeout value and no retries to avoid blocking warcprox Use low timeout value and no retries to avoid blocking warcprox
operation by a slow CDX server. operation by a slow CDX server.
""" """
self.cdx_url = cdx_url self.cdx_url = cdx_url
self.options = options self.options = options
self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0, headers = {'User-Agent': 'warcprox', 'Accept-Encoding': 'gzip, deflate'}
timeout=2.0)
if options.cdxserver_dedup_cookies: if options.cdxserver_dedup_cookies:
self.cookies = options.cdxserver_dedup_cookies headers['Cookie'] = options.cdxserver_dedup_cookies
self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0,
timeout=2.0, headers=headers)
def loader(self, *args, **kwargs): def loader(self, *args, **kwargs):
return CdxServerDedupLoader(self, self.options) return CdxServerDedupLoader(self, self.options)
@ -245,10 +246,9 @@ class CdxServerDedup(DedupDb):
""" """
u = url.decode("utf-8") if isinstance(url, bytes) else url u = url.decode("utf-8") if isinstance(url, bytes) else url
try: try:
headers = {'Cookie': self.cookies} if self.cookies else {}
result = self.http_pool.request('GET', self.cdx_url, fields=dict( result = self.http_pool.request('GET', self.cdx_url, fields=dict(
url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit", url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit",
limit=-1), headers=headers) limit=-1))
assert result.status == 200 assert result.status == 200
if isinstance(digest_key, bytes): if isinstance(digest_key, bytes):
dkey = digest_key dkey = digest_key
@ -276,14 +276,20 @@ class CdxServerDedup(DedupDb):
class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor): class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor):
def __init__(self, cdx_dedup, options=warcprox.Options()): def __init__(self, cdx_dedup, options=warcprox.Options()):
warcprox.BaseBatchPostfetchProcessor.__init__(self, options) warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
self.pool = futures.ThreadPoolExecutor(max_workers=200) self.pool = futures.ThreadPoolExecutor(max_workers=400)
self.batch = set() self.batch = set()
self.cdx_dedup = cdx_dedup self.cdx_dedup = cdx_dedup
def _get_process_put(self): def _get_process_put(self):
recorded_url = self.inq.get(block=True, timeout=0.5) recorded_url = self.inq.get(block=True, timeout=0.5)
self.batch.add(recorded_url) if (recorded_url.response_recorder
self.pool.submit(self._process_url, recorded_url) and recorded_url.payload_digest
and recorded_url.response_recorder.payload_size() > 0):
self.batch.add(recorded_url)
self.pool.submit(self._process_url, recorded_url)
else:
if self.outq:
self.outq.put(recorded_url)
def _process_url(self, recorded_url): def _process_url(self, recorded_url):
try: try: