WIP (untested): handle multiple dedup-buckets, rw or ro

This commit is contained in:
Barbara Miller 2019-05-30 19:27:46 -07:00
parent 8c31ec2916
commit 957bd079e8
3 changed files with 41 additions and 27 deletions

View File

@ -157,8 +157,11 @@ class RethinkCaptures:
sha1base32 = base64.b32encode(digest.digest()).decode("utf-8") sha1base32 = base64.b32encode(digest.digest()).decode("utf-8")
if (recorded_url.warcprox_meta if (recorded_url.warcprox_meta
and "dedup-bucket" in recorded_url.warcprox_meta): and "dedup-buckets" in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta["dedup-bucket"] for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == 'ro':
# maybe this is the right thing to do here? or should we return an entry for each? or ?
break
else: else:
bucket = "__unspecified__" bucket = "__unspecified__"

View File

@ -47,11 +47,11 @@ class DedupableMixin(object):
def should_dedup(self, recorded_url): def should_dedup(self, recorded_url):
"""Check if we should try to run dedup on resource based on payload """Check if we should try to run dedup on resource based on payload
size compared with min text/binary dedup size options. size compared with min text/binary dedup size options.
When we use option --dedup-only-with-bucket, `dedup-bucket` is required When we use option --dedup-only-with-bucket, `dedup-buckets` is required
in Warcprox-Meta to perform dedup. in Warcprox-Meta to perform dedup.
Return Boolean. Return Boolean.
""" """
if self.dedup_only_with_bucket and "dedup-bucket" not in recorded_url.warcprox_meta: if self.dedup_only_with_bucket and "dedup-buckets" not in recorded_url.warcprox_meta:
return False return False
if recorded_url.is_text(): if recorded_url.is_text():
return recorded_url.response_recorder.payload_size() > self.min_text_size return recorded_url.response_recorder.payload_size() > self.min_text_size
@ -69,10 +69,13 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
and recorded_url.payload_digest and recorded_url.payload_digest
and self.should_dedup(recorded_url)): and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta:
recorded_url.dedup_info = self.dedup_db.lookup( for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
digest_key, recorded_url.warcprox_meta["dedup-bucket"], recorded_url.dedup_info = self.dedup_db.lookup(
recorded_url.url) digest_key, bucket, recorded_url.url)
if recorded_url.dedup_info:
# we found an existing capture
break
else: else:
recorded_url.dedup_info = self.dedup_db.lookup( recorded_url.dedup_info = self.dedup_db.lookup(
digest_key, url=recorded_url.url) digest_key, url=recorded_url.url)
@ -148,10 +151,12 @@ class DedupDb(DedupableMixin):
and self.should_dedup(recorded_url)): and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str( digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32) recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta:
self.save( for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
digest_key, records[0], if not bucket_mode == "ro":
bucket=recorded_url.warcprox_meta["dedup-bucket"]) self.save(
digest_key, records[0],
bucket=bucket)
else: else:
self.save(digest_key, records[0]) self.save(digest_key, records[0])
@ -213,8 +218,10 @@ class RethinkDedupDb(DedupDb, DedupableMixin):
and self.should_dedup(recorded_url)): and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str( digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32) recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta:
self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["dedup-bucket"]) for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == 'ro':
self.save(digest_key, records[0], bucket=bucket)
else: else:
self.save(digest_key, records[0]) self.save(digest_key, records[0])
@ -347,11 +354,12 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
and recorded_url.warc_records[0].type == b'response' and recorded_url.warc_records[0].type == b'response'
and self.trough_dedup_db.should_dedup(recorded_url)): and self.trough_dedup_db.should_dedup(recorded_url)):
if (recorded_url.warcprox_meta if (recorded_url.warcprox_meta
and 'dedup-bucket' in recorded_url.warcprox_meta): and 'dedup-buckets' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['dedup-bucket'] for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == 'ro':
buckets[bucket].append(recorded_url)
else: else:
bucket = '__unspecified__' buckets['__unspecified__'].append(recorded_url)
buckets[bucket].append(recorded_url)
return buckets return buckets
def _process_batch(self, batch): def _process_batch(self, batch):
@ -399,11 +407,11 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
and recorded_url.payload_digest and recorded_url.payload_digest
and self.trough_dedup_db.should_dedup(recorded_url)): and self.trough_dedup_db.should_dedup(recorded_url)):
if (recorded_url.warcprox_meta if (recorded_url.warcprox_meta
and 'dedup-bucket' in recorded_url.warcprox_meta): and 'dedup-buckets' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['dedup-bucket'] for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
buckets[bucket].append(recorded_url)
else: else:
bucket = '__unspecified__' buckets['__unspecified__'].append(recorded_url)
buckets[bucket].append(recorded_url)
else: else:
discards.append( discards.append(
warcprox.digest_str( warcprox.digest_str(
@ -576,9 +584,11 @@ class TroughDedupDb(DedupDb, DedupableMixin):
and self.should_dedup(recorded_url)): and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str( digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32) recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and 'dedup-bucket' in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and 'dedup-buckets' in recorded_url.warcprox_meta:
self.save( for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
digest_key, records[0], if not bucket_mode == 'ro':
bucket=recorded_url.warcprox_meta['dedup-bucket']) self.save(
digest_key, records[0],
bucket=bucket)
else: else:
self.save(digest_key, records[0]) self.save(digest_key, records[0])

View File

@ -377,7 +377,8 @@ class RecordedUrl:
if warcprox_meta: if warcprox_meta:
if 'captures-bucket' in warcprox_meta: if 'captures-bucket' in warcprox_meta:
# backward compatibility # backward compatibility
warcprox_meta['dedup-bucket'] = warcprox_meta['captures-bucket'] warcprox_meta['dedup-buckets'] = {}
warcprox_meta['dedup-buckets'][warcprox_meta['captures-bucket']] = 'rw'
del warcprox_meta['captures-bucket'] del warcprox_meta['captures-bucket']
self.warcprox_meta = warcprox_meta self.warcprox_meta = warcprox_meta
else: else: