mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
parallelize trough dedup queries
Each dedup bucket (in archive-it, generally one per seed) requires a separate http request. The batches of urls processed by the trough dedup loader and storer may include multiple dedup buckets. This commit makes all the trough queries in a given batch run in parallel, using a thread pool.
This commit is contained in:
parent
57abab100c
commit
7fb78ef1df
@ -33,6 +33,7 @@ import datetime
|
|||||||
import urllib3
|
import urllib3
|
||||||
from urllib3.exceptions import HTTPError
|
from urllib3.exceptions import HTTPError
|
||||||
import collections
|
import collections
|
||||||
|
from concurrent import futures
|
||||||
|
|
||||||
urllib3.disable_warnings()
|
urllib3.disable_warnings()
|
||||||
|
|
||||||
@ -289,8 +290,26 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
|
|||||||
|
|
||||||
def _process_batch(self, batch):
|
def _process_batch(self, batch):
|
||||||
buckets = self._filter_and_bucketize(batch)
|
buckets = self._filter_and_bucketize(batch)
|
||||||
for bucket in buckets:
|
if not buckets:
|
||||||
self.trough_dedup_db.batch_save(buckets[bucket], bucket)
|
return
|
||||||
|
fs = {}
|
||||||
|
with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
|
||||||
|
# send off requests in parallel
|
||||||
|
for bucket in buckets:
|
||||||
|
future = pool.submit(
|
||||||
|
self.trough_dedup_db.batch_save,
|
||||||
|
buckets[bucket], bucket)
|
||||||
|
fs[future] = bucket
|
||||||
|
|
||||||
|
# wait for results
|
||||||
|
try:
|
||||||
|
for future in futures.as_completed(fs, timeout=20):
|
||||||
|
pass
|
||||||
|
except futures.TimeoutError as e:
|
||||||
|
# the remaining threads actually keep running in this case,
|
||||||
|
# there's no way to stop them, but that should be harmless
|
||||||
|
logging.warn(
|
||||||
|
'timed out saving dedup info to trough', exc_info=True)
|
||||||
|
|
||||||
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
|
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
|
||||||
def __init__(self, trough_dedup_db, options=warcprox.Options()):
|
def __init__(self, trough_dedup_db, options=warcprox.Options()):
|
||||||
@ -320,7 +339,13 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
|
|||||||
|
|
||||||
def _build_key_index(self, batch):
|
def _build_key_index(self, batch):
|
||||||
'''
|
'''
|
||||||
Returns `{digest_key: [recorded_url, ...]}`.
|
Builds index of RecordedUrl by digest key.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
batch(list): list of RecordedUrl
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict `{digest_key: [recorded_url, ...]}`
|
||||||
'''
|
'''
|
||||||
key_index = collections.defaultdict(list)
|
key_index = collections.defaultdict(list)
|
||||||
for recorded_url in batch:
|
for recorded_url in batch:
|
||||||
@ -331,13 +356,37 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
|
|||||||
|
|
||||||
def _process_batch(self, batch):
|
def _process_batch(self, batch):
|
||||||
buckets = self._filter_and_bucketize(batch)
|
buckets = self._filter_and_bucketize(batch)
|
||||||
for bucket in buckets:
|
if not buckets:
|
||||||
key_index = self._build_key_index(buckets[bucket])
|
return
|
||||||
results = self.trough_dedup_db.batch_lookup(
|
fs = {}
|
||||||
key_index.keys(), bucket)
|
with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
|
||||||
for result in results:
|
# send off the trough requests in parallel
|
||||||
for recorded_url in key_index[result['digest_key']]:
|
for bucket in buckets:
|
||||||
recorded_url.dedup_info = result
|
key_index = self._build_key_index(buckets[bucket])
|
||||||
|
future = pool.submit(
|
||||||
|
self.trough_dedup_db.batch_lookup,
|
||||||
|
key_index.keys(), bucket)
|
||||||
|
fs[future] = bucket
|
||||||
|
|
||||||
|
# process results as they come back
|
||||||
|
try:
|
||||||
|
for future in futures.as_completed(fs, timeout=20):
|
||||||
|
bucket = fs[future]
|
||||||
|
try:
|
||||||
|
for entry in future.result():
|
||||||
|
for recorded_url in key_index[entry['digest_key']]:
|
||||||
|
recorded_url.dedup_info = entry
|
||||||
|
except Exception as e:
|
||||||
|
# batch_lookup raised exception or something
|
||||||
|
logging.warn(
|
||||||
|
'problem looking up dedup info for %s urls '
|
||||||
|
'in bucket %s', len(buckets[bucket]), bucket,
|
||||||
|
exc_info=True)
|
||||||
|
except futures.TimeoutError as e:
|
||||||
|
# the remaining threads actually keep running in this case,
|
||||||
|
# there's no way to stop them, but that should be harmless
|
||||||
|
logging.warn(
|
||||||
|
'timed out loading dedup info from trough', exc_info=True)
|
||||||
|
|
||||||
class TroughDedupDb(DedupDb):
|
class TroughDedupDb(DedupDb):
|
||||||
'''
|
'''
|
||||||
@ -409,6 +458,7 @@ class TroughDedupDb(DedupDb):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def batch_lookup(self, digest_keys, bucket='__unspecified__'):
|
def batch_lookup(self, digest_keys, bucket='__unspecified__'):
|
||||||
|
'''Returns [{'digest_key': ..., 'url': ..., 'date': ...}, ...]'''
|
||||||
sql_tmpl = 'select * from dedup where digest_key in (%s)' % (
|
sql_tmpl = 'select * from dedup where digest_key in (%s)' % (
|
||||||
','.join('%s' for i in range(len(digest_keys))))
|
','.join('%s' for i in range(len(digest_keys))))
|
||||||
results = self._trough_cli.read(bucket, sql_tmpl, digest_keys)
|
results = self._trough_cli.read(bucket, sql_tmpl, digest_keys)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user