From d32bf743bd1de9ababeb465b45f08d86234d5bcd Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 9 Apr 2018 15:52:44 +0000 Subject: [PATCH 1/6] Configurable min dedupable size for text/binary resources New `--dedup-min-text-size` and `--dedup-min-binary-size` cli options with default value = `0`. New `DedupableMixin` which can be used in any dedup class. It is currently used only in CDX dedup. Instead of checking `payload_size() > 0`, we now use `.is_dedupable(recorded_url)` New utility method `RecordedUrl.is_text`. --- warcprox/dedup.py | 19 +++++++++++++++++-- warcprox/main.py | 6 ++++++ warcprox/warcproxy.py | 12 ++++++++++++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 5db8e34..7e19150 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -37,6 +37,20 @@ from concurrent import futures urllib3.disable_warnings() +class DedupableMixin(object): + def __init__(self, options=warcprox.Options()): + self.min_text_size = options.dedup_min_text_size + self.min_binary_size = options.dedup_min_binary_size + + def is_dedupable(self, recorded_url): + """Check if we should try to run dedup on resource based on payload + size compared with min text/binary dedup size options. Return Boolean. + """ + if recorded_url.is_text(): + return recorded_url.response_recorder.payload_size() > self.min_text_size + else: + return recorded_url.response_recorder.payload_size() > self.min_binary_size + class DedupLoader(warcprox.BaseStandardPostfetchProcessor): def __init__(self, dedup_db, options=warcprox.Options()): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) @@ -273,9 +287,10 @@ class CdxServerDedup(DedupDb): """ pass -class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor): +class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin): def __init__(self, cdx_dedup, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + DedupableMixin.__init__(self, options) self.pool = futures.ThreadPoolExecutor(max_workers=400) self.batch = set() self.cdx_dedup = cdx_dedup @@ -284,7 +299,7 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor): recorded_url = self.inq.get(block=True, timeout=0.5) if (recorded_url.response_recorder and recorded_url.payload_digest - and recorded_url.response_recorder.payload_size() > 0): + and self.is_dedupable(recorded_url)): self.batch.add(recorded_url) self.pool.submit(self._process_url, recorded_url) else: diff --git a/warcprox/main.py b/warcprox/main.py index 8ff466b..3723445 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -148,6 +148,12 @@ def _build_arg_parser(prog='warcprox'): # optional cookie values to pass to CDX Server; e.g. "cookie1=val1;cookie2=val2" arg_parser.add_argument('--cdxserver-dedup-cookies', dest='cdxserver_dedup_cookies', help=argparse.SUPPRESS) + arg_parser.add_argument('--dedup-min-text-size', dest='dedup_min_text_size', + type=int, default=0, + help=('try to dedup text resources with payload size over this limit in bytes')) + arg_parser.add_argument('--dedup-min-binary-size', dest='dedup_min_binary_size', + type=int, default=0, help=( + 'try to dedup binary resources with payload size over this limit in bytes')) arg_parser.add_argument('--queue-size', dest='queue_size', type=int, default=500, help=argparse.SUPPRESS) arg_parser.add_argument('--max-threads', dest='max_threads', type=int, diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index b6a0943..2477a06 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -376,6 +376,18 @@ class RecordedUrl: self.warc_records = warc_records self.do_not_archive = do_not_archive + def is_text(self): + """Ref: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Complete_list_of_MIME_types + Alternative method: try to decode('ascii') first N bytes to make sure + its text. + """ + if self.mimetype: + return self.mimetype[:5] == "text/" or self.mimetype in ( + "application/xml", "application/javascript", "application/json", + "application/xhtml+xml", "application/typescript", + "image/svg+xml") + return False + # inherit from object so that multiple inheritance from this class works # properly in python 2 # http://stackoverflow.com/questions/1713038/super-fails-with-error-typeerror-argument-1-must-be-type-not-classobj#18392639 From 9057fbdf36d425646860ea085336a3dbafd51837 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 24 Apr 2018 10:29:35 +0000 Subject: [PATCH 2/6] Use DedupableMixin in all dedup classes Rename `DedupableMixin.is_dedupable` to `should_dedup`. --- warcprox/dedup.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 7e19150..c75f6c3 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -42,7 +42,7 @@ class DedupableMixin(object): self.min_text_size = options.dedup_min_text_size self.min_binary_size = options.dedup_min_binary_size - def is_dedupable(self, recorded_url): + def should_dedup(self, recorded_url): """Check if we should try to run dedup on resource based on payload size compared with min text/binary dedup size options. Return Boolean. """ @@ -60,11 +60,12 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor): decorate_with_dedup_info( self.dedup_db, recorded_url, self.options.base32) -class DedupDb(object): +class DedupDb(DedupableMixin): logger = logging.getLogger("warcprox.dedup.DedupDb") def __init__( self, file='./warcprox.sqlite', options=warcprox.Options()): + DedupableMixin.__init__(self, options) self.file = file self.options = options @@ -127,7 +128,7 @@ class DedupDb(object): def notify(self, recorded_url, records): if (records and records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: @@ -150,10 +151,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): recorded_url.dedup_info = dedup_db.lookup( digest_key, url=recorded_url.url) -class RethinkDedupDb(DedupDb): +class RethinkDedupDb(DedupDb, DedupableMixin): logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") def __init__(self, options=warcprox.Options()): + DedupableMixin.__init__(self, options) parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_dedup_url) self.rr = doublethink.Rethinker( servers=parsed.hosts, db=parsed.database) @@ -204,7 +206,7 @@ class RethinkDedupDb(DedupDb): def notify(self, recorded_url, records): if (records and records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: @@ -299,7 +301,7 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin) recorded_url = self.inq.get(block=True, timeout=0.5) if (recorded_url.response_recorder and recorded_url.payload_digest - and self.is_dedupable(recorded_url)): + and self.should_dedup(recorded_url)): self.batch.add(recorded_url) self.pool.submit(self._process_url, recorded_url) else: @@ -321,9 +323,10 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin) if self.outq: self.outq.put(recorded_url) -class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): +class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor, DedupableMixin): def __init__(self, trough_dedup_db, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + DedupableMixin.__init__(self, options) self.trough_dedup_db = trough_dedup_db def _filter_and_bucketize(self, batch): @@ -335,7 +338,7 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): for recorded_url in batch: if (recorded_url.warc_records and recorded_url.warc_records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): if (recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta): bucket = recorded_url.warcprox_meta['captures-bucket'] @@ -367,9 +370,10 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): logging.warn( 'timed out saving dedup info to trough', exc_info=True) -class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): +class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin): def __init__(self, trough_dedup_db, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + DedupableMixin.__init__(self, options) self.trough_dedup_db = trough_dedup_db def _startup(self): @@ -384,7 +388,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): for recorded_url in batch: if (recorded_url.response_recorder and recorded_url.payload_digest - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): if (recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta): bucket = recorded_url.warcprox_meta['captures-bucket'] @@ -444,7 +448,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): logging.warn( 'timed out loading dedup info from trough', exc_info=True) -class TroughDedupDb(DedupDb): +class TroughDedupDb(DedupDb, DedupableMixin): ''' https://github.com/internetarchive/trough ''' @@ -461,6 +465,7 @@ class TroughDedupDb(DedupDb): 'values (%s, %s, %s, %s);') def __init__(self, options=warcprox.Options()): + DedupableMixin.__init__(self, options) self.options = options self._trough_cli = warcprox.trough.TroughClient( options.rethinkdb_trough_db_url, promotion_interval=60*60) @@ -533,7 +538,7 @@ class TroughDedupDb(DedupDb): def notify(self, recorded_url, records): if (records and records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta: From 6dce8cc644b3b083ac6d06959d72a24f89504a4a Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 24 Apr 2018 10:58:13 +0000 Subject: [PATCH 3/6] Remove method decorate_with_dedup_info Method `warcprox.dedup.decorate_with_dedup_info` is only used in `DedupLoader._process_url` and nowhere else. The problem is that `decorate_with_dedup_info` cannot get warcprox cli options. Thus we cannot pass the custom min size limits. --- warcprox/dedup.py | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index c75f6c3..17a4fd9 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -51,14 +51,24 @@ class DedupableMixin(object): else: return recorded_url.response_recorder.payload_size() > self.min_binary_size -class DedupLoader(warcprox.BaseStandardPostfetchProcessor): +class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin): def __init__(self, dedup_db, options=warcprox.Options()): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) + DedupableMixin.__init__(self, options) self.dedup_db = dedup_db def _process_url(self, recorded_url): - decorate_with_dedup_info( - self.dedup_db, recorded_url, self.options.base32) + if (recorded_url.response_recorder + and recorded_url.payload_digest + and self.should_dedup(recorded_url)): + digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) + if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: + recorded_url.dedup_info = self.dedup_db.lookup( + digest_key, recorded_url.warcprox_meta["captures-bucket"], + recorded_url.url) + else: + recorded_url.dedup_info = self.dedup_db.lookup( + digest_key, url=recorded_url.url) class DedupDb(DedupableMixin): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -138,19 +148,6 @@ class DedupDb(DedupableMixin): else: self.save(digest_key, records[0]) -def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): - if (recorded_url.response_recorder - and recorded_url.payload_digest - and recorded_url.response_recorder.payload_size() > 0): - digest_key = warcprox.digest_str(recorded_url.payload_digest, base32) - if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: - recorded_url.dedup_info = dedup_db.lookup( - digest_key, recorded_url.warcprox_meta["captures-bucket"], - recorded_url.url) - else: - recorded_url.dedup_info = dedup_db.lookup( - digest_key, url=recorded_url.url) - class RethinkDedupDb(DedupDb, DedupableMixin): logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") From 944c9a1e117e94537a4b66c89b41d47c37af684c Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 24 Apr 2018 12:18:20 +0000 Subject: [PATCH 4/6] Add unit tests Create two very small dummy responses (text, 2 bytes and binary, 4 bytes). Use options --dedup-min-text-size=3 and --dedup-min-binary-size=5. Ensure that due to the effects of these options, dedup is not happening. Existing dedup unit tests are not affected at all. --- tests/test_warcprox.py | 56 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 9f578a3..b3d00d6 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -191,6 +191,18 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): + b'Content-Type: text/plain\r\n' + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n' + b'\r\n') + elif self.path == '/text-2bytes': + payload = b'aa' + headers = (b'HTTP/1.1 200 OK\r\n' + + b'Content-Type: text/plain\r\n' + + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n' + + b'\r\n') + elif self.path == '/binary-4bytes': + payload = b'aaaa' + headers = (b'HTTP/1.1 200 OK\r\n' + + b'Content-Type: application/octet-stream\r\n' + + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n' + + b'\r\n') elif self.path.startswith('/test_payload_digest-'): content_body = ( b'Hello. How are you. I am the test_payload_digest ' @@ -394,7 +406,9 @@ def warcprox_(request, http_daemon, https_daemon): '--onion-tor-socks-proxy=localhost:9050', '--crawl-log-dir=crawl-logs', '--socket-timeout=4', - '--max-resource-size=200000'] + '--max-resource-size=200000', + '--dedup-min-text-size=3', + '--dedup-min-binary-size=5'] if request.config.getoption('--rethinkdb-dedup-url'): argv.append('--rethinkdb-dedup-url=%s' % request.config.getoption('--rethinkdb-dedup-url')) # test these here only @@ -601,6 +615,46 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) assert response.content == b'I am the warcprox test payload! ffffffffff!\n' # XXX how to check dedup was used? +def test_dedup_min_size(http_daemon, warcprox_, archiving_proxies, playback_proxies): + """We use options --dedup-min-text-size=3 --dedup-min-binary-size=5 and we + try to download content smaller than these limits to make sure that it is + not deduplicated. We create the digest_str with the following code: + ``` + payload_digest = hashlib.new('sha1') + payload_digest.update(b'aa') + warcprox.digest_str(payload_digest) + ``` + """ + url = 'http://localhost:%s/text-2bytes' % http_daemon.server_port + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + assert len(response.content) == 2 + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37') + assert dedup_lookup is None + time.sleep(3) + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37') + # This would return dedup data if payload_size > dedup-min-text-size + assert dedup_lookup is None + + url = 'http://localhost:%s/binary-4bytes' % http_daemon.server_port + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + assert len(response.content) == 4 + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79') + assert dedup_lookup is None + time.sleep(3) + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79') + # This would return dedup data if payload_size > dedup-min-binary-size + assert dedup_lookup is None + # test dedup of same https url with same payload def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxies): urls_before = warcprox_.proxy.running_stats.urls From 9dac806ca1416aefb98907e4ba0f322a73b88def Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 24 Apr 2018 16:31:37 +0000 Subject: [PATCH 5/6] Fix travis-ci unit test issue `test_dedup_https` fails on travis-ci. https://travis-ci.org/internetarchive/warcprox/jobs/370598950 We didn't touch that at all but worked on `test_dedup_min_size` which runs just before that. We move `test_dedup_min_size` to the end of the file hoping to resolve this. --- tests/test_warcprox.py | 81 +++++++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 40 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index b3d00d6..8bb58ab 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -615,46 +615,6 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) assert response.content == b'I am the warcprox test payload! ffffffffff!\n' # XXX how to check dedup was used? -def test_dedup_min_size(http_daemon, warcprox_, archiving_proxies, playback_proxies): - """We use options --dedup-min-text-size=3 --dedup-min-binary-size=5 and we - try to download content smaller than these limits to make sure that it is - not deduplicated. We create the digest_str with the following code: - ``` - payload_digest = hashlib.new('sha1') - payload_digest.update(b'aa') - warcprox.digest_str(payload_digest) - ``` - """ - url = 'http://localhost:%s/text-2bytes' % http_daemon.server_port - response = requests.get( - url, proxies=archiving_proxies, verify=False, timeout=10) - assert len(response.content) == 2 - dedup_lookup = warcprox_.dedup_db.lookup( - b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37') - assert dedup_lookup is None - time.sleep(3) - response = requests.get( - url, proxies=archiving_proxies, verify=False, timeout=10) - dedup_lookup = warcprox_.dedup_db.lookup( - b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37') - # This would return dedup data if payload_size > dedup-min-text-size - assert dedup_lookup is None - - url = 'http://localhost:%s/binary-4bytes' % http_daemon.server_port - response = requests.get( - url, proxies=archiving_proxies, verify=False, timeout=10) - assert len(response.content) == 4 - dedup_lookup = warcprox_.dedup_db.lookup( - b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79') - assert dedup_lookup is None - time.sleep(3) - response = requests.get( - url, proxies=archiving_proxies, verify=False, timeout=10) - dedup_lookup = warcprox_.dedup_db.lookup( - b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79') - # This would return dedup data if payload_size > dedup-min-binary-size - assert dedup_lookup is None - # test dedup of same https url with same payload def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxies): urls_before = warcprox_.proxy.running_stats.urls @@ -1980,6 +1940,47 @@ def test_trough_segment_promotion(warcprox_): time.sleep(3) assert promoted == [] +def test_dedup_min_size(http_daemon, warcprox_, archiving_proxies, playback_proxies): + """We use options --dedup-min-text-size=3 --dedup-min-binary-size=5 and we + try to download content smaller than these limits to make sure that it is + not deduplicated. We create the digest_str with the following code: + ``` + payload_digest = hashlib.new('sha1') + payload_digest.update(b'aa') + warcprox.digest_str(payload_digest) + ``` + """ + url = 'http://localhost:%s/text-2bytes' % http_daemon.server_port + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + assert len(response.content) == 2 + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37') + assert dedup_lookup is None + time.sleep(3) + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37') + # This would return dedup data if payload_size > dedup-min-text-size + assert dedup_lookup is None + + url = 'http://localhost:%s/binary-4bytes' % http_daemon.server_port + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + assert len(response.content) == 4 + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79') + assert dedup_lookup is None + time.sleep(3) + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79') + # This would return dedup data if payload_size > dedup-min-binary-size + assert dedup_lookup is None + + if __name__ == '__main__': pytest.main() From 255d359ad4090bd9d47f8fac02a24f4c8a465c00 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 24 Apr 2018 17:06:56 +0000 Subject: [PATCH 6/6] Use DedupableMixin in RethinkCapturesDedup I note that we didn't do any payload_size check at all here. --- warcprox/bigtable.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index e6674a6..cb4671e 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -34,6 +34,7 @@ import threading import datetime import doublethink import rethinkdb as r +from warcprox.dedup import DedupableMixin class RethinkCaptures: """Inserts in batches every 0.5 seconds""" @@ -215,10 +216,11 @@ class RethinkCaptures: if self._timer: self._timer.join() -class RethinkCapturesDedup(warcprox.dedup.DedupDb): +class RethinkCapturesDedup(warcprox.dedup.DedupDb, DedupableMixin): logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup") def __init__(self, options=warcprox.Options()): + DedupableMixin.__init__(self, options) self.captures_db = RethinkCaptures(options=options) self.options = options @@ -251,5 +253,6 @@ class RethinkCapturesDedup(warcprox.dedup.DedupDb): self.captures_db.close() def notify(self, recorded_url, records): - self.captures_db.notify(recorded_url, records) - + if (records and records[0].type == b'response' + and self.should_dedup(recorded_url)): + self.captures_db.notify(recorded_url, records)