diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index 0375ca1..fad7130 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -51,6 +51,7 @@ import gzip
 import mock
 import email.message
 import socketserver
+from concurrent import futures
 
 try:
     import http.server as http_server
@@ -886,6 +887,57 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     finally:
         fh.close()
 
+def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archiving_proxies):
+    urls_before = warcprox_.proxy.running_stats.urls
+    revisits_before = warcprox_.proxy.stats_db.value(
+            '__all__', 'revisit', 'urls') or 0
+
+    # fire off 20 initial requests simultaneously-ish
+    with futures.ThreadPoolExecutor(max_workers=20) as pool:
+        for i in range(20):
+            url = 'http://localhost:%s/test_dedup_bucket_concurrency/%s' % (
+                    http_daemon.server_port, i)
+            headers = {"Warcprox-Meta": json.dumps({
+                "warc-prefix":"test_dedup_buckets",
+                "dedup-bucket":"bucket_%s" % i})}
+            pool.submit(
+                    requests.get, url, proxies=archiving_proxies, verify=False,
+                    headers=headers)
+
+    wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 20)
+    assert warcprox_.proxy.stats_db.value('__all__', 'revisit', 'urls') == revisits_before
+
+    # fire off 20 requests to different urls in the same buckets;
+    # none should be deduped
+    with futures.ThreadPoolExecutor(max_workers=20) as pool:
+        for i in range(20):
+            url = 'http://localhost:%s/test_dedup_bucket_concurrency/%s' % (
+                    http_daemon.server_port, -i - 1)
+            headers = {"Warcprox-Meta": json.dumps({
+                "warc-prefix":"test_dedup_buckets",
+                "dedup-bucket":"bucket_%s" % i})}
+            pool.submit(
+                    requests.get, url, proxies=archiving_proxies, verify=False,
+                    headers=headers)
+
+    wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 40)
+    assert warcprox_.proxy.stats_db.value('__all__', 'revisit', 'urls') == revisits_before
+
+    # fire off 20 requests same as the initial requests, all should be deduped
+    with futures.ThreadPoolExecutor(max_workers=20) as pool:
+        for i in range(20):
+            url = 'http://localhost:%s/test_dedup_bucket_concurrency/%s' % (
+                    http_daemon.server_port, i)
+            headers = {"Warcprox-Meta": json.dumps({
+                "warc-prefix":"test_dedup_buckets",
+                "dedup-bucket":"bucket_%s" % i})}
+            pool.submit(
+                    requests.get, url, proxies=archiving_proxies, verify=False,
+                    headers=headers)
+
+    wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 60)
+    assert warcprox_.proxy.stats_db.value('__all__', 'revisit', 'urls') == revisits_before + 20
+
 def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies):
     urls_before = warcprox_.proxy.running_stats.urls
 
diff --git a/warcprox/__init__.py b/warcprox/__init__.py
index 4825e29..2dcc838 100644
--- a/warcprox/__init__.py
+++ b/warcprox/__init__.py
@@ -148,6 +148,8 @@ class BasePostfetchProcessor(threading.Thread):
         raise Exception('not implemented')
 
     def _run(self):
+        threading.current_thread().name = '%s(tid=%s)' % (
+                threading.current_thread().name, gettid())
         self.logger.info('%s starting up', self)
         self._startup()
         while not self.stop.is_set():
diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 81be2ea..5e26062 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -405,7 +405,9 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
                         recorded_url.payload_digest, self.options.base32)
                     if recorded_url.payload_digest else 'n/a')
         self.logger.debug(
-                'filtered out digests (not loading dedup): %r', discards)
+                'len(batch)=%s len(discards)=%s buckets=%s',
+                len(batch), len(discards),
+                {bucket: len(buckets[bucket]) for bucket in buckets})
         return buckets
 
     def _build_key_index(self, batch):
@@ -432,11 +434,12 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
         fs = {}
         with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
             # send off the trough requests in parallel
+            key_indexes = {}
             for bucket in buckets:
-                key_index = self._build_key_index(buckets[bucket])
+                key_indexes[bucket] = self._build_key_index(buckets[bucket])
                 future = pool.submit(
                         self.trough_dedup_db.batch_lookup,
-                        key_index.keys(), bucket)
+                        key_indexes[bucket].keys(), bucket)
                 fs[future] = bucket
 
             # process results as they come back
@@ -444,6 +447,7 @@
             for future in futures.as_completed(fs, timeout=20):
                 bucket = fs[future]
                 try:
+                    key_index = key_indexes[bucket]
                     for entry in future.result():
                         for recorded_url in key_index[entry['digest_key']]:
                             recorded_url.dedup_info = entry
@@ -459,8 +463,8 @@
                     novel = sorted([
                             k for k in key_index.keys() if k not in dups])
                     self.logger.debug(
-                            'bucket %s: dups=%r novel=%r',
-                            bucket, dups, novel)
+                            'bucket %s: dups(%s)=%r novel(%s)=%r',
+                            bucket, len(dups), dups, len(novel), novel)
 
                 except futures.TimeoutError as e:
                     # the remaining threads actually keep running in this case,
diff --git a/warcprox/writer.py b/warcprox/writer.py
index 9ab249c..ffdca8f 100644
--- a/warcprox/writer.py
+++ b/warcprox/writer.py
@@ -204,13 +204,14 @@ class WarcWriter:
             record.offset = offset
             record.length = warc.f.tell() - offset
             record.warc_filename = warc.finalname
-            self.logger.debug(
+            self.logger.trace(
                     'wrote warc record: warc_type=%s content_length=%s '
-                    'url=%s warc=%s offset=%d',
-                    record.get_header(warctools.WarcRecord.TYPE),
+                    'digest=%s offset=%d warc=%s url=%s',
+                    record.type,
                     record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
-                    record.get_header(warctools.WarcRecord.URL),
-                    warc.path, record.offset)
+                    record.get_header(b'WARC-Payload-Digest'),
+                    record.offset, warc.path,
+                    record.get_header(warctools.WarcRecord.URL))
 
         return records
 
diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py
index ef0bd2d..9d85b6f 100644
--- a/warcprox/writerthread.py
+++ b/warcprox/writerthread.py
@@ -117,19 +117,18 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
                 and self._filter_accepts(recorded_url))
 
     def _log(self, recorded_url, records):
-        try:
-            payload_digest = records[0].get_header('WARC-Payload-Digest').decode("utf-8")
-        except:
-            payload_digest = "-"
-
         # 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"}
-        type_ = records[0].type.decode("utf-8") if records else '-'
+        try:
+            payload_digest = records[0].get_header(b'WARC-Payload-Digest').decode('utf-8')
+        except:
+            payload_digest = '-'
+        type_ = records[0].type.decode('utf-8') if records else '-'
         filename = records[0].warc_filename if records else '-'
         offset = records[0].offset if records else '-'
         self.logger.info(
-                "%s %s %s %s %s size=%s %s %s %s offset=%s",
+                '%s %s %s %s %s size=%s %s %s %s offset=%s',
                 recorded_url.client_ip, recorded_url.status,
-                recorded_url.method, recorded_url.url.decode("utf-8"),
+                recorded_url.method, recorded_url.url.decode('utf-8'),
                 recorded_url.mimetype, recorded_url.size, payload_digest,
                 type_, filename, offset)
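
Note on the warcprox/dedup.py change: futures.as_completed() yields futures in
completion order, not submission order, so per-bucket state built inside the
submit loop must be looked up by the completed future's bucket rather than read
from a loop variable, which by then holds only the last iteration's value. The
patch does this by keeping key_indexes keyed by bucket, mirroring how fs maps
each future back to its bucket. Below is a minimal, self-contained sketch of
that pattern, not warcprox code: lookup() and the sample buckets dict are made
up for illustration.

    from concurrent import futures

    def lookup(keys, bucket):
        # stand-in for trough_dedup_db.batch_lookup(): echo back fake entries
        return [{'digest_key': k, 'bucket': bucket} for k in keys]

    buckets = {'bucket_0': ['a', 'b'], 'bucket_1': ['c']}

    fs = {}           # future -> bucket, as in _process_batch()
    key_indexes = {}  # bucket -> per-bucket index, the essence of the fix
    with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
        for bucket in buckets:
            key_indexes[bucket] = {k: k.upper() for k in buckets[bucket]}
            future = pool.submit(lookup, key_indexes[bucket].keys(), bucket)
            fs[future] = bucket

        for future in futures.as_completed(fs, timeout=20):
            bucket = fs[future]
            # fetch the index that matches this future's bucket; a plain
            # `key_index` loop variable would still point at the last bucket
            # submitted, silently mismatching results across buckets
            key_index = key_indexes[bucket]
            for entry in future.result():
                assert entry['digest_key'] in key_index

Keying both fs and key_indexes by bucket is what lets each completed future be
matched back to the index it was built from, which is what the concurrency test
added above exercises with 20 buckets at once.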