Merge branch 'parallelize-trough' into qa

* parallelize-trough:
  parallelize trough dedup queries
  handle case where warc record id is missing
  bump minor version after these big changes
  Add --cdxserver-dedup-cookies option
  fix port conflict test failure on travis-ci
  Use socket.TCP_NODELAY to improve performance
commit e4294509dd
Noah Levitt 2018-01-19 16:35:56 -08:00
6 changed files with 73 additions and 16 deletions

setup.py

@@ -52,7 +52,7 @@ except:
 setuptools.setup(
         name='warcprox',
-        version='2.3.1b4.dev138',
+        version='2.4b1.dev139',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',

tests/test_warcprox.py

@@ -1396,8 +1396,7 @@ def test_controller_with_defaults():
     assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1'

 def test_load_plugin():
-    options = warcprox.Options()
-    options.plugins = ['warcprox.stats.RunningStats']
+    options = warcprox.Options(port=0, plugins=['warcprox.stats.RunningStats'])
     controller = warcprox.controller.WarcproxController(options)
     assert isinstance(
         controller._postfetch_chain[-1],

warcprox/controller.py

@@ -47,7 +47,7 @@ class Factory:
             dedup_db = warcprox.dedup.TroughDedupDb(options)
         elif options.cdxserver_dedup:
             dedup_db = warcprox.dedup.CdxServerDedup(
-                    cdx_url=options.cdxserver_dedup)
+                    cdx_url=options.cdxserver_dedup, options=options)
         elif options.dedup_db_file in (None, '', '/dev/null'):
             logging.info('deduplication disabled')
             dedup_db = None
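
Factory picks the dedup backend from mutually exclusive options, trough first, then the CDX server, then the local db file. A hedged sketch of which setting selects what (the option values here are illustrative, not from this commit):

    import warcprox

    opts = warcprox.Options(cdxserver_dedup='https://web.archive.org/cdx/search')
    # -> Factory builds CdxServerDedup(cdx_url=opts.cdxserver_dedup, options=opts)
    opts = warcprox.Options(dedup_db_file='/dev/null')
    # -> deduplication disabled, dedup_db = None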

warcprox/dedup.py

@@ -33,6 +33,7 @@ import datetime
 import urllib3
 from urllib3.exceptions import HTTPError
 import collections
+from concurrent import futures

 urllib3.disable_warnings()
@@ -201,12 +202,15 @@ class CdxServerDedup(DedupDb):
     """Query a CDX server to perform deduplication.
     """
     logger = logging.getLogger("warcprox.dedup.CdxServerDedup")
+    cookies = None

     def __init__(self, cdx_url="https://web.archive.org/cdx/search",
-                 maxsize=200):
+                 maxsize=200, options=warcprox.Options()):
         self.cdx_url = cdx_url
+        self.options = options
         self.http_pool = urllib3.PoolManager(maxsize=maxsize)
+        if options.cdxserver_dedup_cookies:
+            self.cookies = options.cdxserver_dedup_cookies

     def start(self):
         pass
@@ -233,9 +237,10 @@ class CdxServerDedup(DedupDb):
         """
         u = url.decode("utf-8") if isinstance(url, bytes) else url
         try:
+            headers = {'Cookie': self.cookies} if self.cookies else {}
             result = self.http_pool.request('GET', self.cdx_url, fields=dict(
                 url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit",
-                limit=-1))
+                limit=-1), headers=headers)
             assert result.status == 200
             if isinstance(digest_key, bytes):
                 dkey = digest_key
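
The cookie support amounts to passing a Cookie header on the CDX query. A standalone sketch of the same urllib3 call, with an example URL and cookie string (illustrative values, not from the commit):

    import urllib3

    http_pool = urllib3.PoolManager(maxsize=200)
    cookies = 'cookie1=val1;cookie2=val2'  # e.g. from --cdxserver-dedup-cookies
    headers = {'Cookie': cookies} if cookies else {}
    result = http_pool.request(
            'GET', 'https://web.archive.org/cdx/search',
            fields=dict(url='http://example.com/', fl='timestamp,digest',
                        filter='!mimetype:warc/revisit', limit=-1),
            headers=headers)
    assert result.status == 200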
@@ -285,8 +290,26 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
     def _process_batch(self, batch):
         buckets = self._filter_and_bucketize(batch)
-        for bucket in buckets:
-            self.trough_dedup_db.batch_save(buckets[bucket], bucket)
+        if not buckets:
+            return
+
+        fs = {}
+        with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
+            # send off requests in parallel
+            for bucket in buckets:
+                future = pool.submit(
+                        self.trough_dedup_db.batch_save,
+                        buckets[bucket], bucket)
+                fs[future] = bucket
+
+            # wait for results
+            try:
+                for future in futures.as_completed(fs, timeout=20):
+                    pass
+            except futures.TimeoutError as e:
+                # the remaining threads actually keep running in this case,
+                # there's no way to stop them, but that should be harmless
+                logging.warn(
+                        'timed out saving dedup info to trough', exc_info=True)

 class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):

     def __init__(self, trough_dedup_db, options=warcprox.Options()):
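
The store-side change is a plain scatter/gather: one thread per bucket, then a bounded wait. A minimal self-contained sketch of the same pattern, with a dummy function standing in for trough_dedup_db.batch_save:

    import logging
    import time
    from concurrent import futures

    def batch_save(urls, bucket):  # hypothetical stand-in for the trough write
        time.sleep(0.1)

    buckets = {'bucket-a': ['http://example.com/1'],
               'bucket-b': ['http://example.com/2']}
    fs = {}
    with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
        for bucket in buckets:
            fs[pool.submit(batch_save, buckets[bucket], bucket)] = bucket
        try:
            # bound only the wait; threads that miss the deadline keep running
            for future in futures.as_completed(fs, timeout=20):
                pass
        except futures.TimeoutError:
            logging.warning('timed out saving dedup info', exc_info=True)

Note that leaving the with block still joins any stragglers on executor shutdown, so the 20-second timeout bounds the as_completed wait rather than the threads themselves, which matches the comment in the commit.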
@@ -316,7 +339,13 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
     def _build_key_index(self, batch):
         '''
-        Returns `{digest_key: [recorded_url, ...]}`.
+        Builds index of RecordedUrl by digest key.
+
+        Args:
+            batch(list): list of RecordedUrl
+
+        Returns:
+            dict `{digest_key: [recorded_url, ...]}`
         '''
         key_index = collections.defaultdict(list)
         for recorded_url in batch:
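
_build_key_index just groups a batch by digest key with a defaultdict. The same grouping in isolation, with plain dicts standing in for RecordedUrl objects:

    import collections

    batch = [
        {'digest_key': b'sha1:AAAA', 'url': 'http://example.com/a'},
        {'digest_key': b'sha1:AAAA', 'url': 'http://example.com/a-dup'},
        {'digest_key': b'sha1:BBBB', 'url': 'http://example.com/b'},
    ]
    key_index = collections.defaultdict(list)
    for recorded_url in batch:
        key_index[recorded_url['digest_key']].append(recorded_url)
    # key_index == {b'sha1:AAAA': [two urls], b'sha1:BBBB': [one url]}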
@@ -327,13 +356,37 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
     def _process_batch(self, batch):
         buckets = self._filter_and_bucketize(batch)
-        for bucket in buckets:
-            key_index = self._build_key_index(buckets[bucket])
-            results = self.trough_dedup_db.batch_lookup(
-                    key_index.keys(), bucket)
-            for result in results:
-                for recorded_url in key_index[result['digest_key']]:
-                    recorded_url.dedup_info = result
+        if not buckets:
+            return
+
+        fs = {}
+        with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
+            # send off the trough requests in parallel
+            for bucket in buckets:
+                key_index = self._build_key_index(buckets[bucket])
+                future = pool.submit(
+                        self.trough_dedup_db.batch_lookup,
+                        key_index.keys(), bucket)
+                fs[future] = bucket
+
+            # process results as they come back
+            try:
+                for future in futures.as_completed(fs, timeout=20):
+                    bucket = fs[future]
+                    try:
+                        for entry in future.result():
+                            for recorded_url in key_index[entry['digest_key']]:
+                                recorded_url.dedup_info = entry
+                    except Exception as e:
+                        # batch_lookup raised exception or something
+                        logging.warn(
+                                'problem looking up dedup info for %s urls '
+                                'in bucket %s', len(buckets[bucket]), bucket,
+                                exc_info=True)
+            except futures.TimeoutError as e:
+                # the remaining threads actually keep running in this case,
+                # there's no way to stop them, but that should be harmless
+                logging.warn(
+                        'timed out loading dedup info from trough', exc_info=True)

 class TroughDedupDb(DedupDb):
     '''
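
The load side adds a second layer of error handling: each future's result() is consumed inside its own try/except, so a failed batch_lookup for one bucket doesn't abort the others. The gather pattern in isolation (lookup is a hypothetical stand-in):

    from concurrent import futures

    def lookup(bucket):  # hypothetical stand-in for batch_lookup
        if bucket == 'bad':
            raise ValueError('trough unreachable')
        return [{'digest_key': b'sha1:AAAA', 'bucket': bucket}]

    fs = {}
    with futures.ThreadPoolExecutor(max_workers=2) as pool:
        for bucket in ('good', 'bad'):
            fs[pool.submit(lookup, bucket)] = bucket
        for future in futures.as_completed(fs, timeout=20):
            try:
                print(fs[future], future.result())  # 'good' yields results
            except Exception:
                print(fs[future], 'lookup failed')  # 'bad' is logged, not fatal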
@@ -405,6 +458,7 @@ class TroughDedupDb(DedupDb):
         return None

     def batch_lookup(self, digest_keys, bucket='__unspecified__'):
+        '''Returns [{'digest_key': ..., 'url': ..., 'date': ...}, ...]'''
         sql_tmpl = 'select * from dedup where digest_key in (%s)' % (
                 ','.join('%s' for i in range(len(digest_keys))))
         results = self._trough_cli.read(bucket, sql_tmpl, digest_keys)
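
The IN-clause template gets one %s placeholder per key; the trough client's read() then binds digest_keys to those placeholders. Expanded with illustrative values:

    digest_keys = ['sha1:AAAA', 'sha1:BBBB', 'sha1:CCCC']
    sql_tmpl = 'select * from dedup where digest_key in (%s)' % (
            ','.join('%s' for i in range(len(digest_keys))))
    print(sql_tmpl)
    # select * from dedup where digest_key in (%s,%s,%s)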
@@ -415,7 +469,7 @@ class TroughDedupDb(DedupDb):
                 len(digest_keys), len(results))
         assert len(results) >= 0 and len(results) <= len(digest_keys)
         for result in results:
-            result['id'] = result['id'].encode('ascii')
+            result['id'] = result.get('id') and result['id'].encode('ascii')
             result['url'] = result['url'].encode('ascii')
             result['date'] = result['date'].encode('ascii')
             result['digest_key'] = result['digest_key'].encode('ascii')
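
The result.get('id') change is the "handle case where warc record id is missing" fix from the commit message: the `x and f(x)` idiom keeps a missing or None id as None instead of raising. For example:

    result = {'id': None, 'url': 'http://example.com/'}
    result['id'] = result.get('id') and result['id'].encode('ascii')
    print(result['id'])  # None, no AttributeError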

warcprox/main.py

@@ -145,6 +145,9 @@ def _build_arg_parser(prog='warcprox'):
         '--rethinkdb-services-url', dest='rethinkdb_services_url', help=(
             'rethinkdb service registry table url; if provided, warcprox '
             'will create and heartbeat entry for itself'))
+    # optional cookie values to pass to CDX Server; e.g. "cookie1=val1;cookie2=val2"
+    arg_parser.add_argument('--cdxserver-dedup-cookies', dest='cdxserver_dedup_cookies',
+            help=argparse.SUPPRESS)
     arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
             default=500, help=argparse.SUPPRESS)
     arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
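
argparse.SUPPRESS only hides the option from --help; it still parses normally. A quick sketch:

    import argparse

    parser = argparse.ArgumentParser(prog='warcprox')
    parser.add_argument(
            '--cdxserver-dedup-cookies', dest='cdxserver_dedup_cookies',
            help=argparse.SUPPRESS)
    args = parser.parse_args(
            ['--cdxserver-dedup-cookies', 'cookie1=val1;cookie2=val2'])
    print(args.cdxserver_dedup_cookies)  # cookie1=val1;cookie2=val2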

warcprox/mitmproxy.py

@@ -245,6 +245,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
                     port=self.onion_tor_socks_proxy_port, rdns=True)
             else:
                 self._remote_server_sock = socket.socket()
+                self._remote_server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

                 # XXX what value should this timeout have?
                 self._remote_server_sock.settimeout(60)
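
TCP_NODELAY disables Nagle's algorithm, so small writes such as proxied request headers go out immediately instead of being buffered while waiting for ACKs; that's the whole performance change. In isolation:

    import socket

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    sock.settimeout(60)
    # sock.connect(('example.com', 80))  # then read/write as usual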