CDX dedup improvements

Check for not empty captured content (`payload_size() > 0`) before
creating a new thread and running a CDX dedup request.
Most dedup modules perform the same check to avoid unnecessary dedup
requests.

Increase CDX dedup max workers from 200 to 400 in order to handle more
load.

Set `user-agent: warcprox` for HTTP requests we send to CDX server. Its
useful to identify and monitor `warcprox` requests.

Pass HTTP headers to connection pool on init and not on each request.
This commit is contained in:
Vangelis Banos 2018-04-03 20:51:51 +00:00
parent cff8423bef
commit 7c5c5da9b7

View File

@ -205,17 +205,18 @@ class CdxServerDedup(DedupDb):
cookies = None cookies = None
def __init__(self, cdx_url="https://web.archive.org/cdx/search", def __init__(self, cdx_url="https://web.archive.org/cdx/search",
maxsize=200, options=warcprox.Options()): maxsize=400, options=warcprox.Options()):
"""Initialize cdx server connection pool and related parameters. """Initialize cdx server connection pool and related parameters.
Use low timeout value and no retries to avoid blocking warcprox Use low timeout value and no retries to avoid blocking warcprox
operation by a slow CDX server. operation by a slow CDX server.
""" """
self.cdx_url = cdx_url self.cdx_url = cdx_url
self.options = options self.options = options
self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0, headers = {'user-agent': 'warcprox', 'accept': 'gzip/deflate'}
timeout=2.0)
if options.cdxserver_dedup_cookies: if options.cdxserver_dedup_cookies:
self.cookies = options.cdxserver_dedup_cookies headers['Cookie'] = options.cdxserver_dedup_cookies
self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0,
timeout=2.0, headers=headers)
def loader(self, *args, **kwargs): def loader(self, *args, **kwargs):
return CdxServerDedupLoader(self, self.options) return CdxServerDedupLoader(self, self.options)
@ -245,10 +246,9 @@ class CdxServerDedup(DedupDb):
""" """
u = url.decode("utf-8") if isinstance(url, bytes) else url u = url.decode("utf-8") if isinstance(url, bytes) else url
try: try:
headers = {'Cookie': self.cookies} if self.cookies else {}
result = self.http_pool.request('GET', self.cdx_url, fields=dict( result = self.http_pool.request('GET', self.cdx_url, fields=dict(
url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit", url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit",
limit=-1), headers=headers) limit=-1))
assert result.status == 200 assert result.status == 200
if isinstance(digest_key, bytes): if isinstance(digest_key, bytes):
dkey = digest_key dkey = digest_key
@ -276,14 +276,20 @@ class CdxServerDedup(DedupDb):
class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor): class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor):
def __init__(self, cdx_dedup, options=warcprox.Options()): def __init__(self, cdx_dedup, options=warcprox.Options()):
warcprox.BaseBatchPostfetchProcessor.__init__(self, options) warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
self.pool = futures.ThreadPoolExecutor(max_workers=200) self.pool = futures.ThreadPoolExecutor(max_workers=400)
self.batch = set() self.batch = set()
self.cdx_dedup = cdx_dedup self.cdx_dedup = cdx_dedup
def _get_process_put(self): def _get_process_put(self):
recorded_url = self.inq.get(block=True, timeout=0.5) recorded_url = self.inq.get(block=True, timeout=0.5)
self.batch.add(recorded_url) if (recorded_url.response_recorder
self.pool.submit(self._process_url, recorded_url) and recorded_url.payload_digest
and recorded_url.response_recorder.payload_size() > 0):
self.batch.add(recorded_url)
self.pool.submit(self._process_url, recorded_url)
else:
if self.outq:
self.outq.put(recorded_url)
def _process_url(self, recorded_url): def _process_url(self, recorded_url):
try: try: