Merge pull request #90 from vbanos/dedup-bucket

Require dedup-bucket in Warcprox-Meta to perform dedup
This commit is contained in:
Noah Levitt 2018-05-08 11:06:32 -07:00 committed by GitHub
commit 5fa1f8f61c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 31 additions and 21 deletions

View File

@@ -745,7 +745,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port) url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port)
# archive url1 bucket_a # archive url1 bucket_a
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_a"})} headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_a"})}
response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers) response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200 assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!' assert response.headers['warcprox-test-header'] == 'k!'
@@ -771,7 +771,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
assert dedup_lookup is None assert dedup_lookup is None
# archive url2 bucket_b # archive url2 bucket_b
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_b"})} headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})}
response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers) response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200 assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!' assert response.headers['warcprox-test-header'] == 'k!'
@@ -790,7 +790,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
dedup_date = dedup_lookup['date'] dedup_date = dedup_lookup['date']
# archive url2 bucket_a # archive url2 bucket_a
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_a"})} headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_a"})}
response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers) response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200 assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!' assert response.headers['warcprox-test-header'] == 'k!'
@@ -800,7 +800,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3) wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3)
# archive url1 bucket_b # archive url1 bucket_b
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_b"})} headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})}
response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers) response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200 assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!' assert response.headers['warcprox-test-header'] == 'k!'
@@ -1371,7 +1371,7 @@ def test_dedup_ok_flag(
assert dedup_lookup is None assert dedup_lookup is None
# archive with dedup_ok:False # archive with dedup_ok:False
request_meta = {'captures-bucket':'test_dedup_ok_flag','dedup-ok':False} request_meta = {'dedup-bucket':'test_dedup_ok_flag','dedup-ok':False}
headers = {'Warcprox-Meta': json.dumps(request_meta)} headers = {'Warcprox-Meta': json.dumps(request_meta)}
response = requests.get( response = requests.get(
url, proxies=archiving_proxies, headers=headers, verify=False) url, proxies=archiving_proxies, headers=headers, verify=False)
@@ -1389,7 +1389,7 @@ def test_dedup_ok_flag(
assert dedup_lookup is None assert dedup_lookup is None
# archive without dedup_ok:False # archive without dedup_ok:False
request_meta = {'captures-bucket':'test_dedup_ok_flag'} request_meta = {'dedup-bucket':'test_dedup_ok_flag'}
headers = {'Warcprox-Meta': json.dumps(request_meta)} headers = {'Warcprox-Meta': json.dumps(request_meta)}
response = requests.get( response = requests.get(
url, proxies=archiving_proxies, headers=headers, verify=False) url, proxies=archiving_proxies, headers=headers, verify=False)

View File

@@ -157,8 +157,8 @@ class RethinkCaptures:
sha1base32 = base64.b32encode(digest.digest()).decode("utf-8") sha1base32 = base64.b32encode(digest.digest()).decode("utf-8")
if (recorded_url.warcprox_meta if (recorded_url.warcprox_meta
and "captures-bucket" in recorded_url.warcprox_meta): and "dedup-bucket" in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta["captures-bucket"] bucket = recorded_url.warcprox_meta["dedup-bucket"]
else: else:
bucket = "__unspecified__" bucket = "__unspecified__"

View File

@@ -41,11 +41,17 @@ class DedupableMixin(object):
def __init__(self, options=warcprox.Options()): def __init__(self, options=warcprox.Options()):
self.min_text_size = options.dedup_min_text_size self.min_text_size = options.dedup_min_text_size
self.min_binary_size = options.dedup_min_binary_size self.min_binary_size = options.dedup_min_binary_size
self.dedup_only_with_bucket = options.dedup_only_with_bucket
def should_dedup(self, recorded_url): def should_dedup(self, recorded_url):
"""Check if we should try to run dedup on resource based on payload """Check if we should try to run dedup on resource based on payload
size compared with min text/binary dedup size options. Return Boolean. size compared with min text/binary dedup size options.
When we use option --dedup-only-with-bucket, `dedup-bucket` is required
in Warcprox-Meta to perform dedup.
Return Boolean.
""" """
if self.dedup_only_with_bucket and "dedup-bucket" not in recorded_url.warcprox_meta:
return False
if recorded_url.is_text(): if recorded_url.is_text():
return recorded_url.response_recorder.payload_size() > self.min_text_size return recorded_url.response_recorder.payload_size() > self.min_text_size
else: else:
@@ -62,9 +68,9 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
and recorded_url.payload_digest and recorded_url.payload_digest
and self.should_dedup(recorded_url)): and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
recorded_url.dedup_info = self.dedup_db.lookup( recorded_url.dedup_info = self.dedup_db.lookup(
digest_key, recorded_url.warcprox_meta["captures-bucket"], digest_key, recorded_url.warcprox_meta["dedup-bucket"],
recorded_url.url) recorded_url.url)
else: else:
recorded_url.dedup_info = self.dedup_db.lookup( recorded_url.dedup_info = self.dedup_db.lookup(
@@ -141,10 +147,10 @@ class DedupDb(DedupableMixin):
and self.should_dedup(recorded_url)): and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str( digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32) recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
self.save( self.save(
digest_key, records[0], digest_key, records[0],
bucket=recorded_url.warcprox_meta["captures-bucket"]) bucket=recorded_url.warcprox_meta["dedup-bucket"])
else: else:
self.save(digest_key, records[0]) self.save(digest_key, records[0])
@@ -206,8 +212,8 @@ class RethinkDedupDb(DedupDb, DedupableMixin):
and self.should_dedup(recorded_url)): and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str( digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32) recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"]) self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["dedup-bucket"])
else: else:
self.save(digest_key, records[0]) self.save(digest_key, records[0])
@@ -337,8 +343,8 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor, DedupableMixin):
and recorded_url.warc_records[0].type == b'response' and recorded_url.warc_records[0].type == b'response'
and self.should_dedup(recorded_url)): and self.should_dedup(recorded_url)):
if (recorded_url.warcprox_meta if (recorded_url.warcprox_meta
and 'captures-bucket' in recorded_url.warcprox_meta): and 'dedup-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['captures-bucket'] bucket = recorded_url.warcprox_meta['dedup-bucket']
else: else:
bucket = '__unspecified__' bucket = '__unspecified__'
buckets[bucket].append(recorded_url) buckets[bucket].append(recorded_url)
@@ -387,8 +393,8 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin):
and recorded_url.payload_digest and recorded_url.payload_digest
and self.should_dedup(recorded_url)): and self.should_dedup(recorded_url)):
if (recorded_url.warcprox_meta if (recorded_url.warcprox_meta
and 'captures-bucket' in recorded_url.warcprox_meta): and 'dedup-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['captures-bucket'] bucket = recorded_url.warcprox_meta['dedup-bucket']
else: else:
bucket = '__unspecified__' bucket = '__unspecified__'
buckets[bucket].append(recorded_url) buckets[bucket].append(recorded_url)
@@ -538,9 +544,9 @@ class TroughDedupDb(DedupDb, DedupableMixin):
and self.should_dedup(recorded_url)): and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str( digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32) recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and 'dedup-bucket' in recorded_url.warcprox_meta:
self.save( self.save(
digest_key, records[0], digest_key, records[0],
bucket=recorded_url.warcprox_meta['captures-bucket']) bucket=recorded_url.warcprox_meta['dedup-bucket'])
else: else:
self.save(digest_key, records[0]) self.save(digest_key, records[0])

View File

@@ -154,6 +154,10 @@ def _build_arg_parser(prog='warcprox'):
arg_parser.add_argument('--dedup-min-binary-size', dest='dedup_min_binary_size', arg_parser.add_argument('--dedup-min-binary-size', dest='dedup_min_binary_size',
type=int, default=0, help=( type=int, default=0, help=(
'try to dedup binary resources with payload size over this limit in bytes')) 'try to dedup binary resources with payload size over this limit in bytes'))
# optionally, dedup request only when `dedup-bucket` is available in
# Warcprox-Meta HTTP header. By default, we dedup all requests.
arg_parser.add_argument('--dedup-only-with-bucket', dest='dedup_only_with_bucket',
action='store_true', default=False, help=argparse.SUPPRESS)
arg_parser.add_argument('--queue-size', dest='queue_size', type=int, arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
default=500, help=argparse.SUPPRESS) default=500, help=argparse.SUPPRESS)
arg_parser.add_argument('--max-threads', dest='max_threads', type=int, arg_parser.add_argument('--max-threads', dest='max_threads', type=int,