From 957bd079e8e40a0f1412127fc1e44f9329bb6bb8 Mon Sep 17 00:00:00 2001 From: Barbara Miller Date: Thu, 30 May 2019 19:27:46 -0700 Subject: [PATCH 1/8] WIP (untested): handle multiple dedup-buckets, rw or ro --- warcprox/bigtable.py | 7 ++++-- warcprox/dedup.py | 58 +++++++++++++++++++++++++------------------ warcprox/warcproxy.py | 3 ++- 3 files changed, 41 insertions(+), 27 deletions(-) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 4df8ab3..ff2ad0a 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -157,8 +157,11 @@ class RethinkCaptures: sha1base32 = base64.b32encode(digest.digest()).decode("utf-8") if (recorded_url.warcprox_meta - and "dedup-bucket" in recorded_url.warcprox_meta): - bucket = recorded_url.warcprox_meta["dedup-bucket"] + and "dedup-buckets" in recorded_url.warcprox_meta): + for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items(): + if not bucket_mode == 'ro': + # maybe this is the right thing to do here? or should we return an entry for each? or ? + break else: bucket = "__unspecified__" diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 7889cd9..9562fa5 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -47,11 +47,11 @@ class DedupableMixin(object): def should_dedup(self, recorded_url): """Check if we should try to run dedup on resource based on payload size compared with min text/binary dedup size options. - When we use option --dedup-only-with-bucket, `dedup-bucket` is required + When we use option --dedup-only-with-bucket, `dedup-buckets` is required in Warcprox-Meta to perform dedup. Return Boolean. 
""" - if self.dedup_only_with_bucket and "dedup-bucket" not in recorded_url.warcprox_meta: + if self.dedup_only_with_bucket and "dedup-buckets" not in recorded_url.warcprox_meta: return False if recorded_url.is_text(): return recorded_url.response_recorder.payload_size() > self.min_text_size @@ -69,10 +69,13 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin): and recorded_url.payload_digest and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) - if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta: - recorded_url.dedup_info = self.dedup_db.lookup( - digest_key, recorded_url.warcprox_meta["dedup-bucket"], - recorded_url.url) + if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta: + for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items(): + recorded_url.dedup_info = self.dedup_db.lookup( + digest_key, bucket, recorded_url.url) + if recorded_url.dedup_info: + # we found an existing capture + break else: recorded_url.dedup_info = self.dedup_db.lookup( digest_key, url=recorded_url.url) @@ -148,10 +151,12 @@ class DedupDb(DedupableMixin): and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) - if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta: - self.save( - digest_key, records[0], - bucket=recorded_url.warcprox_meta["dedup-bucket"]) + if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta: + for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items(): + if not bucket_mode == "ro": + self.save( + digest_key, records[0], + bucket=bucket) else: self.save(digest_key, records[0]) @@ -213,8 +218,10 @@ class RethinkDedupDb(DedupDb, DedupableMixin): and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) - 
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta: - self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["dedup-bucket"]) + if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta: + for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items(): + if not bucket_mode == 'ro': + self.save(digest_key, records[0], bucket=bucket) else: self.save(digest_key, records[0]) @@ -347,11 +354,12 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): and recorded_url.warc_records[0].type == b'response' and self.trough_dedup_db.should_dedup(recorded_url)): if (recorded_url.warcprox_meta - and 'dedup-bucket' in recorded_url.warcprox_meta): - bucket = recorded_url.warcprox_meta['dedup-bucket'] + and 'dedup-buckets' in recorded_url.warcprox_meta): + for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items(): + if not bucket_mode == 'ro': + buckets[bucket].append(recorded_url) else: - bucket = '__unspecified__' - buckets[bucket].append(recorded_url) + buckets['__unspecified__'].append(recorded_url) return buckets def _process_batch(self, batch): @@ -399,11 +407,11 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): and recorded_url.payload_digest and self.trough_dedup_db.should_dedup(recorded_url)): if (recorded_url.warcprox_meta - and 'dedup-bucket' in recorded_url.warcprox_meta): - bucket = recorded_url.warcprox_meta['dedup-bucket'] + and 'dedup-buckets' in recorded_url.warcprox_meta): + for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items(): + buckets[bucket].append(recorded_url) else: - bucket = '__unspecified__' - buckets[bucket].append(recorded_url) + buckets['__unspecified__'].append(recorded_url) else: discards.append( warcprox.digest_str( @@ -576,9 +584,11 @@ class TroughDedupDb(DedupDb, DedupableMixin): and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, 
self.options.base32) - if recorded_url.warcprox_meta and 'dedup-bucket' in recorded_url.warcprox_meta: - self.save( - digest_key, records[0], - bucket=recorded_url.warcprox_meta['dedup-bucket']) + if recorded_url.warcprox_meta and 'dedup-buckets' in recorded_url.warcprox_meta: + for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items(): + if not bucket_mode == 'ro': + self.save( + digest_key, records[0], + bucket=bucket) else: self.save(digest_key, records[0]) diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index e5b35d2..625138b 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -377,7 +377,8 @@ class RecordedUrl: if warcprox_meta: if 'captures-bucket' in warcprox_meta: # backward compatibility - warcprox_meta['dedup-bucket'] = warcprox_meta['captures-bucket'] + warcprox_meta['dedup-buckets'] = {} + warcprox_meta['dedup-buckets'][warcprox_meta['captures-bucket']] = 'rw' del warcprox_meta['captures-bucket'] self.warcprox_meta = warcprox_meta else: From 6ee7ab36a20478340711efbe662a339436f46dde Mon Sep 17 00:00:00 2001 From: Barbara Miller Date: Fri, 31 May 2019 17:36:13 -0700 Subject: [PATCH 2/8] fix tests too --- tests/test_warcprox.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index d34bb43..3f803c2 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -790,7 +790,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port) # archive url1 bucket_a - headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_a"})} + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_a":"rw"}})} response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers) assert response.status_code == 200 assert 
response.headers['warcprox-test-header'] == 'k!' @@ -816,7 +816,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, assert dedup_lookup is None # archive url2 bucket_b - headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})} + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_b":""}})} response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers) assert response.status_code == 200 assert response.headers['warcprox-test-header'] == 'k!' @@ -845,7 +845,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3) # archive url1 bucket_b - headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})} + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_b":""}})} response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers) assert response.status_code == 200 assert response.headers['warcprox-test-header'] == 'k!' 
@@ -928,7 +928,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin http_daemon.server_port, i) headers = {"Warcprox-Meta": json.dumps({ "warc-prefix":"test_dedup_buckets", - "dedup-bucket":"bucket_%s" % i})} + "dedup-buckets":{"bucket_%s" % i:"rw"}})} pool.submit( requests.get, url, proxies=archiving_proxies, verify=False, headers=headers) @@ -944,7 +944,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin http_daemon.server_port, -i - 1) headers = {"Warcprox-Meta": json.dumps({ "warc-prefix":"test_dedup_buckets", - "dedup-bucket":"bucket_%s" % i})} + "dedup-buckets":{"bucket_%s" % i:"rw"}})} pool.submit( requests.get, url, proxies=archiving_proxies, verify=False, headers=headers) @@ -959,7 +959,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin http_daemon.server_port, i) headers = {"Warcprox-Meta": json.dumps({ "warc-prefix":"test_dedup_buckets", - "dedup-bucket":"bucket_%s" % i})} + "dedup-buckets":{"bucket_%s" % i:"rw"}})} pool.submit( requests.get, url, proxies=archiving_proxies, verify=False, headers=headers) @@ -1500,7 +1500,7 @@ def test_dedup_ok_flag( assert dedup_lookup is None # archive with dedup_ok:False - request_meta = {'dedup-bucket':'test_dedup_ok_flag','dedup-ok':False} + request_meta = {'dedup-buckets':{'test_dedup_ok_flag':''},'dedup-ok':False} headers = {'Warcprox-Meta': json.dumps(request_meta)} response = requests.get( url, proxies=archiving_proxies, headers=headers, verify=False) @@ -1518,7 +1518,7 @@ def test_dedup_ok_flag( assert dedup_lookup is None # archive without dedup_ok:False - request_meta = {'dedup-bucket':'test_dedup_ok_flag'} + request_meta = {'dedup-buckets':{'test_dedup_ok_flag':''}} headers = {'Warcprox-Meta': json.dumps(request_meta)} response = requests.get( url, proxies=archiving_proxies, headers=headers, verify=False) From d13356506176c1eccc2cc19a03b520b12d066456 Mon Sep 17 00:00:00 2001 From: Barbara Miller Date: Tue, 4 
Jun 2019 14:53:06 -0700 Subject: [PATCH 3/8] continue support for _singular_ dedup-bucket --- warcprox/warcproxy.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 625138b..9d23244 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -380,6 +380,11 @@ class RecordedUrl: warcprox_meta['dedup-buckets'] = {} warcprox_meta['dedup-buckets'][warcprox_meta['captures-bucket']] = 'rw' del warcprox_meta['captures-bucket'] + if 'dedup-bucket' in warcprox_meta: + # more backwards compatibility + warcprox_meta['dedup-buckets'] = {} + warcprox_meta['dedup-buckets'][warcprox_meta['dedup-bucket']] = 'rw' + del warcprox_meta['dedup-bucket'] self.warcprox_meta = warcprox_meta else: self.warcprox_meta = {} From 8c52bd8442d75e0a0da628610e77ab5979266980 Mon Sep 17 00:00:00 2001 From: Barbara Miller Date: Thu, 13 Jun 2019 17:18:51 -0700 Subject: [PATCH 4/8] docs updates --- README.rst | 7 ++++--- api.rst | 10 ++++++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/README.rst b/README.rst index b7b5c17..77e7e58 100644 --- a/README.rst +++ b/README.rst @@ -89,12 +89,13 @@ for deduplication works similarly to deduplication by `Heritrix 4. If not found, a. Write ``response`` record with full payload - b. Store new entry in deduplication database + b. Store new entry in deduplication database (can be disabled, see + `Warcprox-Meta HTTP request header ` The deduplication database is partitioned into different "buckets". URLs are deduplicated only against other captures in the same bucket. If specified, the -``dedup-bucket`` field of the `Warcprox-Meta HTTP request header -`_ determines the bucket. Otherwise, +``dedup-buckets`` field of the `Warcprox-Meta HTTP request header +`_ determines the bucket(s). Otherwise, the default bucket is used. 
Deduplication can be disabled entirely by starting warcprox with the argument diff --git a/api.rst b/api.rst index 1da1898..eee3219 100644 --- a/api.rst +++ b/api.rst @@ -137,14 +137,16 @@ Example:: Warcprox-Meta: {"warc-prefix": "special-warc"} -``dedup-bucket`` (string) +``dedup-buckets`` (dict) ~~~~~~~~~~~~~~~~~~~~~~~~~ -Specifies the deduplication bucket. For more information about deduplication +Specifies the deduplication bucket(s). For more information about deduplication see ``_. -Example:: +Examples:: - Warcprox-Meta: {"dedup-bucket":"my-dedup-bucket"} + Warcprox-Meta: {"dedup-buckets":{"my-dedup-bucket":"rw"}} + + Warcprox-Meta: {"dedup-buckets":{"my-dedup-bucket":"rw", "my-read-only-dedup-bucket": "ro"}} ``blocks`` (list) ~~~~~~~~~~~~~~~~~ From 51c4f6d6222a9aac1543bad94a06f6233d6b0b64 Mon Sep 17 00:00:00 2001 From: Barbara Miller Date: Thu, 13 Jun 2019 17:57:29 -0700 Subject: [PATCH 5/8] test_dedup_buckets_multiple --- tests/test_warcprox.py | 65 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 3f803c2..d051128 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -916,6 +916,71 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, finally: fh.close() +def test_dedup_buckets_multiple(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies): + urls_before = warcprox_.proxy.running_stats.urls + + url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port) + + # archive url1 + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets_multiple", + "dedup-buckets":{"bucket_1":"rw", "bucket_2":"ro"}}) + } + response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers) + assert response.status_code == 200 + assert response.headers['warcprox-test-header'] == 'k!' + assert response.content == b'I am the warcprox test payload! 
llllllllll!\n' + + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + + # check url1 in dedup db bucket_1 + # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_1') + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_1") + assert dedup_lookup + assert dedup_lookup['url'] == url1.encode('ascii') + assert re.match(br'^$', dedup_lookup['id']) + assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) + record_id = dedup_lookup['id'] + dedup_date = dedup_lookup['date'] + + # check url1 not in dedup db bucket_2 + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_2") + assert dedup_lookup is None + + # close the warc + assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_multiple"] + writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_multiple"] + warc_path = os.path.join(writer.directory, writer.finalname) + assert not os.path.exists(warc_path) + warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_multiple"].close() + assert os.path.exists(warc_path) + + # read the warc + fh = warctools.ArchiveRecord.open_archive(warc_path) + record_iter = fh.read_records(limit=None, offsets=True) + try: + (offset, record, errors) = next(record_iter) + assert record.type == b'warcinfo' + + # url1 bucket_1 + (offset, record, errors) = next(record_iter) + assert record.type == b'response' + assert record.url == url1.encode('ascii') + # check for duplicate warc record headers + assert Counter(h[0] for h in record.headers).most_common(1)[0][1] == 1 + assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\nI am the warcprox test payload! 
llllllllll!\n' + (offset, record, errors) = next(record_iter) + assert record.type == b'request' + + # that's all folks + assert next(record_iter)[1] == None + assert next(record_iter, None) == None + + finally: + fh.close() + def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archiving_proxies): urls_before = warcprox_.proxy.running_stats.urls revisits_before = warcprox_.proxy.stats_db.value( From 79aab697e2331ef0a564a273956f3d6ed5ea2035 Mon Sep 17 00:00:00 2001 From: Barbara Miller Date: Fri, 14 Jun 2019 12:42:25 -0700 Subject: [PATCH 6/8] more tests --- tests/test_warcprox.py | 94 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 88 insertions(+), 6 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index d051128..0c1ae3f 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -845,7 +845,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3) # archive url1 bucket_b - headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_b":""}})} + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})} response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers) assert response.status_code == 200 assert response.headers['warcprox-test-header'] == 'k!' 
@@ -916,13 +916,95 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, finally: fh.close() -def test_dedup_buckets_multiple(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies): +def test_multiple_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies): + urls_before = warcprox_.proxy.running_stats.urls + + url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port) + + # archive url1 bucket_a1, bucket_b2, bucket_c3 + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_multiple_dedup_buckets", + "dedup-buckets":{"bucket_a1":"rw", "bucket_b2":"", "bucket_c3":"rw"} + })} + response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers) + assert response.status_code == 200 + assert response.headers['warcprox-test-header'] == 'k!' + assert response.content == b'I am the warcprox test payload! llllllllll!\n' + + # wait for postfetch chain + wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) + + # check url1 in dedup db bucket_a1 + # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_a1') + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a1") + assert dedup_lookup + assert dedup_lookup['url'] == url1.encode('ascii') + assert re.match(br'^$', dedup_lookup['id']) + assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) + record_id = dedup_lookup['id'] + dedup_date = dedup_lookup['date'] + + # check url1 in dedup db bucket_b2 + # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_b2') + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b2") + assert dedup_lookup + assert dedup_lookup['url'] == url1.encode('ascii') + assert re.match(br'^$', dedup_lookup['id']) + assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', 
dedup_lookup['date']) + record_id = dedup_lookup['id'] + dedup_date = dedup_lookup['date'] + + # check url1 in dedup db bucket_c3 + # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_c3') + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_c3") + assert dedup_lookup + assert dedup_lookup['url'] == url1.encode('ascii') + assert re.match(br'^$', dedup_lookup['id']) + assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) + record_id = dedup_lookup['id'] + dedup_date = dedup_lookup['date'] + + # close the warc + assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_multiple_dedup_buckets"] + writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_multiple_dedup_buckets"] + warc_path = os.path.join(writer.directory, writer.finalname) + assert not os.path.exists(warc_path) + warcprox_.warc_writer_processor.writer_pool.warc_writers["test_multiple_dedup_buckets"].close() + assert os.path.exists(warc_path) + + # read the warc # should we bother with this? + fh = warctools.ArchiveRecord.open_archive(warc_path) + record_iter = fh.read_records(limit=None, offsets=True) + try: + (offset, record, errors) = next(record_iter) + assert record.type == b'warcinfo' + + # url1 bucket_a + (offset, record, errors) = next(record_iter) + assert record.type == b'response' + assert record.url == url1.encode('ascii') + # check for duplicate warc record headers + assert Counter(h[0] for h in record.headers).most_common(1)[0][1] == 1 + assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\nI am the warcprox test payload! 
llllllllll!\n' + (offset, record, errors) = next(record_iter) + assert record.type == b'request' + + # that's all folks + assert next(record_iter)[1] == None + assert next(record_iter, None) == None + + finally: + fh.close() + +def test_dedup_buckets_readonly(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies): urls_before = warcprox_.proxy.running_stats.urls url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port) # archive url1 - headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets_multiple", + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets_readonly", "dedup-buckets":{"bucket_1":"rw", "bucket_2":"ro"}}) } response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers) @@ -950,11 +1032,11 @@ def test_dedup_buckets_multiple(https_daemon, http_daemon, warcprox_, archiving_ assert dedup_lookup is None # close the warc - assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_multiple"] - writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_multiple"] + assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_readonly"] + writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_readonly"] warc_path = os.path.join(writer.directory, writer.finalname) assert not os.path.exists(warc_path) - warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_multiple"].close() + warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_readonly"].close() assert os.path.exists(warc_path) # read the warc From c0fcf59c86d68cbc7d49698918ea5b57bf82351e Mon Sep 17 00:00:00 2001 From: Barbara Miller Date: Fri, 14 Jun 2019 13:34:47 -0700 Subject: [PATCH 7/8] rm test not matching use case --- tests/test_warcprox.py | 86 +----------------------------------------- 1 file changed, 2 insertions(+), 84 deletions(-) diff --git 
a/tests/test_warcprox.py b/tests/test_warcprox.py index 0c1ae3f..884ddd4 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -916,88 +916,6 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, finally: fh.close() -def test_multiple_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies): - urls_before = warcprox_.proxy.running_stats.urls - - url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port) - - # archive url1 bucket_a1, bucket_b2, bucket_c3 - headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_multiple_dedup_buckets", - "dedup-buckets":{"bucket_a1":"rw", "bucket_b2":"", "bucket_c3":"rw"} - })} - response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers) - assert response.status_code == 200 - assert response.headers['warcprox-test-header'] == 'k!' - assert response.content == b'I am the warcprox test payload! llllllllll!\n' - - # wait for postfetch chain - wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) - - # check url1 in dedup db bucket_a1 - # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_a1') - dedup_lookup = warcprox_.dedup_db.lookup( - b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a1") - assert dedup_lookup - assert dedup_lookup['url'] == url1.encode('ascii') - assert re.match(br'^$', dedup_lookup['id']) - assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) - record_id = dedup_lookup['id'] - dedup_date = dedup_lookup['date'] - - # check url1 in dedup db bucket_b2 - # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_b2') - dedup_lookup = warcprox_.dedup_db.lookup( - b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b2") - assert dedup_lookup - assert dedup_lookup['url'] == url1.encode('ascii') - assert re.match(br'^$', dedup_lookup['id']) - assert 
re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) - record_id = dedup_lookup['id'] - dedup_date = dedup_lookup['date'] - - # check url1 in dedup db bucket_c3 - # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_c3') - dedup_lookup = warcprox_.dedup_db.lookup( - b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_c3") - assert dedup_lookup - assert dedup_lookup['url'] == url1.encode('ascii') - assert re.match(br'^$', dedup_lookup['id']) - assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) - record_id = dedup_lookup['id'] - dedup_date = dedup_lookup['date'] - - # close the warc - assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_multiple_dedup_buckets"] - writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_multiple_dedup_buckets"] - warc_path = os.path.join(writer.directory, writer.finalname) - assert not os.path.exists(warc_path) - warcprox_.warc_writer_processor.writer_pool.warc_writers["test_multiple_dedup_buckets"].close() - assert os.path.exists(warc_path) - - # read the warc # should we bother with this? - fh = warctools.ArchiveRecord.open_archive(warc_path) - record_iter = fh.read_records(limit=None, offsets=True) - try: - (offset, record, errors) = next(record_iter) - assert record.type == b'warcinfo' - - # url1 bucket_a - (offset, record, errors) = next(record_iter) - assert record.type == b'response' - assert record.url == url1.encode('ascii') - # check for duplicate warc record headers - assert Counter(h[0] for h in record.headers).most_common(1)[0][1] == 1 - assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\nI am the warcprox test payload! 
llllllllll!\n' - (offset, record, errors) = next(record_iter) - assert record.type == b'request' - - # that's all folks - assert next(record_iter)[1] == None - assert next(record_iter, None) == None - - finally: - fh.close() - def test_dedup_buckets_readonly(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies): urls_before = warcprox_.proxy.running_stats.urls @@ -1015,7 +933,7 @@ def test_dedup_buckets_readonly(https_daemon, http_daemon, warcprox_, archiving_ # wait for postfetch chain wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) - # check url1 in dedup db bucket_1 + # check url1 in dedup db bucket_1 (rw) # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_1') dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_1") @@ -1026,7 +944,7 @@ def test_dedup_buckets_readonly(https_daemon, http_daemon, warcprox_, archiving_ record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] - # check url1 not in dedup db bucket_2 + # check url1 not in dedup db bucket_2 (ro) dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_2") assert dedup_lookup is None From 48d96fbc790b63970788224cf0b31f5926d50876 Mon Sep 17 00:00:00 2001 From: Barbara Miller Date: Thu, 20 Jun 2019 14:52:28 -0700 Subject: [PATCH 8/8] fix link --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 77e7e58..59951ae 100644 --- a/README.rst +++ b/README.rst @@ -90,7 +90,7 @@ for deduplication works similarly to deduplication by `Heritrix a. Write ``response`` record with full payload b. Store new entry in deduplication database (can be disabled, see - `Warcprox-Meta HTTP request header ` + `Warcprox-Meta HTTP request header `_) The deduplication database is partitioned into different "buckets". URLs are deduplicated only against other captures in the same bucket. 
If specified, the