Merge pull request #133 from galgeek/dedup-fixes

handle multiple dedup-buckets, rw or ro (and dedup brozzler test crawls against collection seed)
This commit is contained in:
Noah Levitt 2019-06-20 14:57:20 -07:00 committed by GitHub
commit a4253d5425
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 128 additions and 41 deletions

View File

@ -89,12 +89,13 @@ for deduplication works similarly to deduplication by `Heritrix
4. If not found, 4. If not found,
a. Write ``response`` record with full payload a. Write ``response`` record with full payload
b. Store new entry in deduplication database b. Store new entry in deduplication database (can be disabled, see
`Warcprox-Meta HTTP request header <api.rst#warcprox-meta-http-request-header>`_)
The deduplication database is partitioned into different "buckets". URLs are The deduplication database is partitioned into different "buckets". URLs are
deduplicated only against other captures in the same bucket. If specified, the deduplicated only against other captures in the same bucket. If specified, the
``dedup-bucket`` field of the `Warcprox-Meta HTTP request header ``dedup-buckets`` field of the `Warcprox-Meta HTTP request header
<api.rst#warcprox-meta-http-request-header>`_ determines the bucket. Otherwise, <api.rst#warcprox-meta-http-request-header>`_ determines the bucket(s). Otherwise,
the default bucket is used. the default bucket is used.
Deduplication can be disabled entirely by starting warcprox with the argument Deduplication can be disabled entirely by starting warcprox with the argument

10
api.rst
View File

@ -137,14 +137,16 @@ Example::
Warcprox-Meta: {"warc-prefix": "special-warc"} Warcprox-Meta: {"warc-prefix": "special-warc"}
``dedup-bucket`` (string) ``dedup-buckets`` (string)
~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~
Specifies the deduplication bucket. For more information about deduplication Specifies the deduplication bucket(s). For more information about deduplication
see `<README.rst#deduplication>`_. see `<README.rst#deduplication>`_.
Example:: Examples::
Warcprox-Meta: {"dedup-bucket":"my-dedup-bucket"} Warcprox-Meta: {"dedup-buckets":{"my-dedup-bucket":"rw"}}
Warcprox-Meta: {"dedup-buckets":{"my-dedup-bucket":"rw", "my-read-only-dedup-bucket": "ro"}}
``blocks`` (list) ``blocks`` (list)
~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~

View File

@ -790,7 +790,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port) url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port)
# archive url1 bucket_a # archive url1 bucket_a
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_a"})} headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_a":"rw"}})}
response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers) response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200 assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!' assert response.headers['warcprox-test-header'] == 'k!'
@ -816,7 +816,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
assert dedup_lookup is None assert dedup_lookup is None
# archive url2 bucket_b # archive url2 bucket_b
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})} headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_b":""}})}
response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers) response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200 assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!' assert response.headers['warcprox-test-header'] == 'k!'
@ -916,6 +916,71 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
finally: finally:
fh.close() fh.close()
def test_dedup_buckets_readonly(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
    """Capture a url against one read-write and one read-only dedup bucket
    and verify the capture is recorded only in the rw bucket, while the warc
    still gets a full response record."""
    url_count_start = warcprox_.proxy.running_stats.urls

    target_url = 'http://localhost:{}/k/l'.format(http_daemon.server_port)

    # fetch target_url through the archiving proxy, requesting dedup against
    # bucket_1 (read-write) and bucket_2 (read-only)
    meta = {
        "warc-prefix": "test_dedup_buckets_readonly",
        "dedup-buckets": {"bucket_1": "rw", "bucket_2": "ro"},
    }
    resp = requests.get(
        target_url, proxies=archiving_proxies, verify=False,
        headers={"Warcprox-Meta": json.dumps(meta)})
    assert resp.status_code == 200
    assert resp.headers['warcprox-test-header'] == 'k!'
    assert resp.content == b'I am the warcprox test payload! llllllllll!\n'

    # wait for postfetch chain
    wait(lambda: warcprox_.proxy.running_stats.urls - url_count_start == 1)

    # the rw bucket (bucket_1) should now hold an entry for the payload digest
    entry = warcprox_.dedup_db.lookup(
        b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_1")
    assert entry
    assert entry['url'] == target_url.encode('ascii')
    assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', entry['id'])
    assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', entry['date'])
    record_id = entry['id']
    dedup_date = entry['date']

    # the ro bucket (bucket_2) must not have been written to
    entry = warcprox_.dedup_db.lookup(
        b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_2")
    assert entry is None

    # close the warc so we can inspect what was written
    pool = warcprox_.warc_writer_processor.writer_pool
    assert pool.warc_writers["test_dedup_buckets_readonly"]
    warc_writer = pool.warc_writers["test_dedup_buckets_readonly"]
    path = os.path.join(warc_writer.directory, warc_writer.finalname)
    assert not os.path.exists(path)
    pool.warc_writers["test_dedup_buckets_readonly"].close()
    assert os.path.exists(path)

    # read the warc back
    archive = warctools.ArchiveRecord.open_archive(path)
    records = archive.read_records(limit=None, offsets=True)
    try:
        offset, record, errors = next(records)
        assert record.type == b'warcinfo'

        # full response capture of target_url
        offset, record, errors = next(records)
        assert record.type == b'response'
        assert record.url == target_url.encode('ascii')
        # no warc record header name appears more than once
        assert Counter(h[0] for h in record.headers).most_common(1)[0][1] == 1
        assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\nI am the warcprox test payload! llllllllll!\n'

        offset, record, errors = next(records)
        assert record.type == b'request'

        # that's all folks
        assert next(records)[1] == None
        assert next(records, None) == None
    finally:
        archive.close()
def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archiving_proxies): def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls urls_before = warcprox_.proxy.running_stats.urls
revisits_before = warcprox_.proxy.stats_db.value( revisits_before = warcprox_.proxy.stats_db.value(
@ -928,7 +993,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin
http_daemon.server_port, i) http_daemon.server_port, i)
headers = {"Warcprox-Meta": json.dumps({ headers = {"Warcprox-Meta": json.dumps({
"warc-prefix":"test_dedup_buckets", "warc-prefix":"test_dedup_buckets",
"dedup-bucket":"bucket_%s" % i})} "dedup-buckets":{"bucket_%s" % i:"rw"}})}
pool.submit( pool.submit(
requests.get, url, proxies=archiving_proxies, verify=False, requests.get, url, proxies=archiving_proxies, verify=False,
headers=headers) headers=headers)
@ -944,7 +1009,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin
http_daemon.server_port, -i - 1) http_daemon.server_port, -i - 1)
headers = {"Warcprox-Meta": json.dumps({ headers = {"Warcprox-Meta": json.dumps({
"warc-prefix":"test_dedup_buckets", "warc-prefix":"test_dedup_buckets",
"dedup-bucket":"bucket_%s" % i})} "dedup-buckets":{"bucket_%s" % i:"rw"}})}
pool.submit( pool.submit(
requests.get, url, proxies=archiving_proxies, verify=False, requests.get, url, proxies=archiving_proxies, verify=False,
headers=headers) headers=headers)
@ -959,7 +1024,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin
http_daemon.server_port, i) http_daemon.server_port, i)
headers = {"Warcprox-Meta": json.dumps({ headers = {"Warcprox-Meta": json.dumps({
"warc-prefix":"test_dedup_buckets", "warc-prefix":"test_dedup_buckets",
"dedup-bucket":"bucket_%s" % i})} "dedup-buckets":{"bucket_%s" % i:"rw"}})}
pool.submit( pool.submit(
requests.get, url, proxies=archiving_proxies, verify=False, requests.get, url, proxies=archiving_proxies, verify=False,
headers=headers) headers=headers)
@ -1500,7 +1565,7 @@ def test_dedup_ok_flag(
assert dedup_lookup is None assert dedup_lookup is None
# archive with dedup_ok:False # archive with dedup_ok:False
request_meta = {'dedup-bucket':'test_dedup_ok_flag','dedup-ok':False} request_meta = {'dedup-buckets':{'test_dedup_ok_flag':''},'dedup-ok':False}
headers = {'Warcprox-Meta': json.dumps(request_meta)} headers = {'Warcprox-Meta': json.dumps(request_meta)}
response = requests.get( response = requests.get(
url, proxies=archiving_proxies, headers=headers, verify=False) url, proxies=archiving_proxies, headers=headers, verify=False)
@ -1518,7 +1583,7 @@ def test_dedup_ok_flag(
assert dedup_lookup is None assert dedup_lookup is None
# archive without dedup_ok:False # archive without dedup_ok:False
request_meta = {'dedup-bucket':'test_dedup_ok_flag'} request_meta = {'dedup-buckets':{'test_dedup_ok_flag':''}}
headers = {'Warcprox-Meta': json.dumps(request_meta)} headers = {'Warcprox-Meta': json.dumps(request_meta)}
response = requests.get( response = requests.get(
url, proxies=archiving_proxies, headers=headers, verify=False) url, proxies=archiving_proxies, headers=headers, verify=False)

View File

@ -157,8 +157,11 @@ class RethinkCaptures:
sha1base32 = base64.b32encode(digest.digest()).decode("utf-8") sha1base32 = base64.b32encode(digest.digest()).decode("utf-8")
if (recorded_url.warcprox_meta if (recorded_url.warcprox_meta
and "dedup-bucket" in recorded_url.warcprox_meta): and "dedup-buckets" in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta["dedup-bucket"] for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == 'ro':
# maybe this is the right thing to do here? or should we return an entry for each? or ?
break
else: else:
bucket = "__unspecified__" bucket = "__unspecified__"

View File

@ -47,11 +47,11 @@ class DedupableMixin(object):
def should_dedup(self, recorded_url): def should_dedup(self, recorded_url):
"""Check if we should try to run dedup on resource based on payload """Check if we should try to run dedup on resource based on payload
size compared with min text/binary dedup size options. size compared with min text/binary dedup size options.
When we use option --dedup-only-with-bucket, `dedup-bucket` is required When we use option --dedup-only-with-bucket, `dedup-buckets` is required
in Warcprox-Meta to perform dedup. in Warcprox-Meta to perform dedup.
Return Boolean. Return Boolean.
""" """
if self.dedup_only_with_bucket and "dedup-bucket" not in recorded_url.warcprox_meta: if self.dedup_only_with_bucket and "dedup-buckets" not in recorded_url.warcprox_meta:
return False return False
if recorded_url.is_text(): if recorded_url.is_text():
return recorded_url.response_recorder.payload_size() > self.min_text_size return recorded_url.response_recorder.payload_size() > self.min_text_size
@ -69,10 +69,13 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
and recorded_url.payload_digest and recorded_url.payload_digest
and self.should_dedup(recorded_url)): and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta:
recorded_url.dedup_info = self.dedup_db.lookup( for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
digest_key, recorded_url.warcprox_meta["dedup-bucket"], recorded_url.dedup_info = self.dedup_db.lookup(
recorded_url.url) digest_key, bucket, recorded_url.url)
if recorded_url.dedup_info:
# we found an existing capture
break
else: else:
recorded_url.dedup_info = self.dedup_db.lookup( recorded_url.dedup_info = self.dedup_db.lookup(
digest_key, url=recorded_url.url) digest_key, url=recorded_url.url)
@ -148,10 +151,12 @@ class DedupDb(DedupableMixin):
and self.should_dedup(recorded_url)): and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str( digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32) recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta:
self.save( for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
digest_key, records[0], if not bucket_mode == "ro":
bucket=recorded_url.warcprox_meta["dedup-bucket"]) self.save(
digest_key, records[0],
bucket=bucket)
else: else:
self.save(digest_key, records[0]) self.save(digest_key, records[0])
@ -213,8 +218,10 @@ class RethinkDedupDb(DedupDb, DedupableMixin):
and self.should_dedup(recorded_url)): and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str( digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32) recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta:
self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["dedup-bucket"]) for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == 'ro':
self.save(digest_key, records[0], bucket=bucket)
else: else:
self.save(digest_key, records[0]) self.save(digest_key, records[0])
@ -347,11 +354,12 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
and recorded_url.warc_records[0].type == b'response' and recorded_url.warc_records[0].type == b'response'
and self.trough_dedup_db.should_dedup(recorded_url)): and self.trough_dedup_db.should_dedup(recorded_url)):
if (recorded_url.warcprox_meta if (recorded_url.warcprox_meta
and 'dedup-bucket' in recorded_url.warcprox_meta): and 'dedup-buckets' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['dedup-bucket'] for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == 'ro':
buckets[bucket].append(recorded_url)
else: else:
bucket = '__unspecified__' buckets['__unspecified__'].append(recorded_url)
buckets[bucket].append(recorded_url)
return buckets return buckets
def _process_batch(self, batch): def _process_batch(self, batch):
@ -399,11 +407,11 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
and recorded_url.payload_digest and recorded_url.payload_digest
and self.trough_dedup_db.should_dedup(recorded_url)): and self.trough_dedup_db.should_dedup(recorded_url)):
if (recorded_url.warcprox_meta if (recorded_url.warcprox_meta
and 'dedup-bucket' in recorded_url.warcprox_meta): and 'dedup-buckets' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['dedup-bucket'] for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
buckets[bucket].append(recorded_url)
else: else:
bucket = '__unspecified__' buckets['__unspecified__'].append(recorded_url)
buckets[bucket].append(recorded_url)
else: else:
discards.append( discards.append(
warcprox.digest_str( warcprox.digest_str(
@ -576,9 +584,11 @@ class TroughDedupDb(DedupDb, DedupableMixin):
and self.should_dedup(recorded_url)): and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str( digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32) recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and 'dedup-bucket' in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and 'dedup-buckets' in recorded_url.warcprox_meta:
self.save( for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
digest_key, records[0], if not bucket_mode == 'ro':
bucket=recorded_url.warcprox_meta['dedup-bucket']) self.save(
digest_key, records[0],
bucket=bucket)
else: else:
self.save(digest_key, records[0]) self.save(digest_key, records[0])

View File

@ -377,8 +377,14 @@ class RecordedUrl:
if warcprox_meta: if warcprox_meta:
if 'captures-bucket' in warcprox_meta: if 'captures-bucket' in warcprox_meta:
# backward compatibility # backward compatibility
warcprox_meta['dedup-bucket'] = warcprox_meta['captures-bucket'] warcprox_meta['dedup-buckets'] = {}
warcprox_meta['dedup-buckets'][warcprox_meta['captures-bucket']] = 'rw'
del warcprox_meta['captures-bucket'] del warcprox_meta['captures-bucket']
if 'dedup-bucket' in warcprox_meta:
# more backwards compatibility
warcprox_meta['dedup-buckets'] = {}
warcprox_meta['dedup-buckets'][warcprox_meta['dedup-bucket']] = 'rw'
del warcprox_meta['dedup-bucket']
self.warcprox_meta = warcprox_meta self.warcprox_meta = warcprox_meta
else: else:
self.warcprox_meta = {} self.warcprox_meta = {}