diff --git a/warcprox/__init__.py b/warcprox/__init__.py
index 9cd09a8..9fe3a74 100644
--- a/warcprox/__init__.py
+++ b/warcprox/__init__.py
@@ -1,7 +1,7 @@
 """
 warcprox/__init__.py - warcprox package main file, contains some utility code
 
-Copyright (C) 2013-2019 Internet Archive
+Copyright (C) 2013-2021 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -175,8 +175,8 @@ class BaseStandardPostfetchProcessor(BasePostfetchProcessor):
 
 class BaseBatchPostfetchProcessor(BasePostfetchProcessor):
     MAX_BATCH_SIZE = 500
-    MAX_BATCH_SEC = 10
-    MIN_BATCH_SEC = 2.0
+    MAX_BATCH_SEC = 30
+    MIN_BATCH_SEC = 10
 
     def _get_process_put(self):
         batch = []
diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 77bc7c6..43286d7 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -1,7 +1,7 @@
 '''
 warcprox/dedup.py - identical payload digest deduplication using sqlite db
 
-Copyright (C) 2013-2018 Internet Archive
+Copyright (C) 2013-2021 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -384,6 +384,9 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
                         self.trough_dedup_db.batch_save,
                         buckets[bucket], bucket)
                 fs[future] = bucket
+                logging.debug(
+                        'storing dedup info for %s urls '
+                        'in bucket %s', len(buckets[bucket]), bucket)
 
             # wait for results
             try:
@@ -412,10 +415,19 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
         '''
         buckets = collections.defaultdict(list)
         discards = []
+        # for duplicate checks, see https://webarchive.jira.com/browse/WT-31
+        hash_plus_urls = set()
         for recorded_url in batch:
+            if not recorded_url.payload_digest:
+                discards.append('n/a')
+                continue
+            payload_hash = warcprox.digest_str(
+                recorded_url.payload_digest, self.options.base32)
+            hash_plus_url = b''.join((payload_hash, recorded_url.url))
             if (recorded_url.response_recorder
-                    and recorded_url.payload_digest
+                    and hash_plus_url not in hash_plus_urls
                     and self.trough_dedup_db.should_dedup(recorded_url)):
+                hash_plus_urls.add(hash_plus_url)
                 if (recorded_url.warcprox_meta
                         and 'dedup-buckets' in recorded_url.warcprox_meta):
                     for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
@@ -423,10 +435,12 @@
                 else:
                     buckets['__unspecified__'].append(recorded_url)
             else:
-                discards.append(
-                    warcprox.digest_str(
-                        recorded_url.payload_digest, self.options.base32)
-                    if recorded_url.payload_digest else 'n/a')
+                if hash_plus_url in hash_plus_urls:
+                    self.logger.debug(
+                        'discarding duplicate and setting do_not_archive '
+                        'for %s, hash %s', recorded_url.url, payload_hash)
+                    recorded_url.do_not_archive = True
+                discards.append(payload_hash)
         self.logger.debug(
             'len(batch)=%s len(discards)=%s buckets=%s',
             len(batch), len(discards),
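
A minimal sketch of the within-batch duplicate check introduced above: keying each record by payload digest concatenated with URL, and diverting repeats of that key to discards. This is not warcprox code; `Record`, `filter_batch`, and the SHA-1 digesting are hypothetical stand-ins for `RecordedUrl`, `_filter_and_bucketize`, and `warcprox.digest_str`.

```python
# Sketch only: illustrates the (payload digest + url) duplicate key idea,
# not warcprox's actual classes or API.
import hashlib
from collections import namedtuple

# hypothetical stand-in for warcprox's RecordedUrl
Record = namedtuple('Record', ['url', 'payload'])

def filter_batch(batch):
    """Split a batch into records to keep and within-batch duplicates."""
    seen = set()              # analogous to hash_plus_urls in the diff
    keep, discards = [], []
    for rec in batch:
        digest = hashlib.sha1(rec.payload).hexdigest().encode()
        key = digest + rec.url        # analogous to b''.join((payload_hash, url))
        if key in seen:
            discards.append(digest)   # same payload for the same url, already in this batch
        else:
            seen.add(key)
            keep.append(rec)
    return keep, discards

batch = [
    Record(b'http://example.com/a', b'hello'),
    Record(b'http://example.com/a', b'hello'),   # duplicate within the batch
    Record(b'http://example.com/b', b'hello'),   # same payload, different url: kept
]
keep, discards = filter_batch(batch)
assert len(keep) == 2 and len(discards) == 1
```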