From 57abab100cd20da215e7105127d2bb5794f92faf Mon Sep 17 00:00:00 2001
From: Noah Levitt <nlevitt@archive.org>
Date: Fri, 19 Jan 2018 14:38:54 -0800
Subject: [PATCH 1/2] handle case where warc record id is missing

... from trough dedup. Not sure why this error happened, but we shouldn't
need that field anyway.
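
For illustration, a minimal sketch of how the guard behaves when the field
is absent (hypothetical record dict, not actual data from trough):

    record = {'url': 'http://example.com/', 'date': '2018-01-19T00:00:00Z'}
    # record.get('id') is None, so the `and` short-circuits and None is
    # stored instead of raising a KeyError
    record['id'] = record.get('id') and record['id'].encode('ascii')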
---
 warcprox/dedup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index cd3b397..c9b0079 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -419,7 +419,7 @@ class TroughDedupDb(DedupDb):
             len(digest_keys), len(results))
         assert len(results) >= 0 and len(results) <= len(digest_keys)
         for result in results:
-            result['id'] = result['id'].encode('ascii')
+            result['id'] = result.get('id') and result['id'].encode('ascii')
             result['url'] = result['url'].encode('ascii')
             result['date'] = result['date'].encode('ascii')
             result['digest_key'] = result['digest_key'].encode('ascii')

From 7fb78ef1df062ab63655125b93ce91ab516e3b38 Mon Sep 17 00:00:00 2001
From: Noah Levitt <nlevitt@archive.org>
Date: Fri, 19 Jan 2018 16:33:15 -0800
Subject: [PATCH 2/2] parallelize trough dedup queries

Each dedup bucket (in Archive-It, generally one per seed) requires a
separate http request. The batches of urls processed by the trough dedup
loader and storer may include multiple dedup buckets. This commit makes
all the trough queries in a given batch run in parallel, using a thread
pool.
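
A rough sketch of the fan-out pattern (illustrative only; `run_batch` and
`do_one_bucket` are placeholder names, not the warcprox API):

    from concurrent import futures

    def run_batch(buckets, do_one_bucket, timeout=20):
        # buckets: collection of bucket names; do_one_bucket: callable that
        # performs the per-bucket trough request (stand-in for
        # batch_lookup/batch_save)
        results = {}
        if not buckets:
            return results
        with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
            fs = {pool.submit(do_one_bucket, bucket): bucket
                  for bucket in buckets}
            try:
                for future in futures.as_completed(fs, timeout=timeout):
                    results[fs[future]] = future.result()
            except futures.TimeoutError:
                # leftover threads keep running; treated as harmless here
                pass
        return results

Note that the executor's context manager still waits for any outstanding
threads on exit, so the timeout bounds how long results are collected, not
total runtime.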
---
 warcprox/dedup.py | 70 ++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 60 insertions(+), 10 deletions(-)

diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index c9b0079..cb65408 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -33,6 +33,7 @@ import datetime
 import urllib3
 from urllib3.exceptions import HTTPError
 import collections
+from concurrent import futures
 
 urllib3.disable_warnings()
 
@@ -289,8 +290,26 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
 
     def _process_batch(self, batch):
         buckets = self._filter_and_bucketize(batch)
-        for bucket in buckets:
-            self.trough_dedup_db.batch_save(buckets[bucket], bucket)
+        if not buckets:
+            return
+        fs = {}
+        with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
+            # send off requests in parallel
+            for bucket in buckets:
+                future = pool.submit(
+                        self.trough_dedup_db.batch_save,
+                        buckets[bucket], bucket)
+                fs[future] = bucket
+
+            # wait for results
+            try:
+                for future in futures.as_completed(fs, timeout=20):
+                    pass
+            except futures.TimeoutError as e:
+                # the remaining threads actually keep running in this case,
+                # there's no way to stop them, but that should be harmless
+                logging.warn(
+                    'timed out saving dedup info to trough', exc_info=True)
 
 class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
     def __init__(self, trough_dedup_db, options=warcprox.Options()):
@@ -320,7 +339,13 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
 
     def _build_key_index(self, batch):
         '''
-        Returns `{digest_key: [recorded_url, ...]}`.
+        Builds index of RecordedUrl by digest key.
+
+        Args:
+            batch(list): list of RecordedUrl
+
+        Returns:
+            dict `{digest_key: [recorded_url, ...]}`
         '''
         key_index = collections.defaultdict(list)
         for recorded_url in batch:
@@ -331,13 +356,37 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
 
     def _process_batch(self, batch):
         buckets = self._filter_and_bucketize(batch)
-        for bucket in buckets:
-            key_index = self._build_key_index(buckets[bucket])
-            results = self.trough_dedup_db.batch_lookup(
-                    key_index.keys(), bucket)
-            for result in results:
-                for recorded_url in key_index[result['digest_key']]:
-                    recorded_url.dedup_info = result
+        if not buckets:
+            return
+        fs = {}
+        with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
+            # send off the trough requests in parallel
+            for bucket in buckets:
+                key_index = self._build_key_index(buckets[bucket])
+                future = pool.submit(
+                        self.trough_dedup_db.batch_lookup,
+                        key_index.keys(), bucket)
+                fs[future] = (bucket, key_index)
+
+            # process results as they come back
+            try:
+                for future in futures.as_completed(fs, timeout=20):
+                    bucket, key_index = fs[future]
+                    try:
+                        for entry in future.result():
+                            for recorded_url in key_index[entry['digest_key']]:
+                                recorded_url.dedup_info = entry
+                    except Exception:
+                        # batch_lookup or future.result() raised an exception
+                        logging.warn(
+                                'problem looking up dedup info for %s urls '
+                                'in bucket %s', len(buckets[bucket]), bucket,
+                                exc_info=True)
+            except futures.TimeoutError as e:
+                # the remaining threads actually keep running in this case,
+                # there's no way to stop them, but that should be harmless
+                logging.warn(
+                    'timed out loading dedup info from trough', exc_info=True)
 
 class TroughDedupDb(DedupDb):
     '''
@@ -409,6 +458,7 @@ class TroughDedupDb(DedupDb):
             return None
 
     def batch_lookup(self, digest_keys, bucket='__unspecified__'):
+        '''Returns [{'digest_key': ..., 'url': ..., 'date': ...}, ...]'''
         sql_tmpl = 'select * from dedup where digest_key in (%s)' % (
                 ','.join('%s' for i in range(len(digest_keys))))
         results = self._trough_cli.read(bucket, sql_tmpl, digest_keys)