diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 9f578a3..8bb58ab 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -191,6 +191,18 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): + b'Content-Type: text/plain\r\n' + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n' + b'\r\n') + elif self.path == '/text-2bytes': + payload = b'aa' + headers = (b'HTTP/1.1 200 OK\r\n' + + b'Content-Type: text/plain\r\n' + + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n' + + b'\r\n') + elif self.path == '/binary-4bytes': + payload = b'aaaa' + headers = (b'HTTP/1.1 200 OK\r\n' + + b'Content-Type: application/octet-stream\r\n' + + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n' + + b'\r\n') elif self.path.startswith('/test_payload_digest-'): content_body = ( b'Hello. How are you. I am the test_payload_digest ' @@ -394,7 +406,9 @@ def warcprox_(request, http_daemon, https_daemon): '--onion-tor-socks-proxy=localhost:9050', '--crawl-log-dir=crawl-logs', '--socket-timeout=4', - '--max-resource-size=200000'] + '--max-resource-size=200000', + '--dedup-min-text-size=3', + '--dedup-min-binary-size=5'] if request.config.getoption('--rethinkdb-dedup-url'): argv.append('--rethinkdb-dedup-url=%s' % request.config.getoption('--rethinkdb-dedup-url')) # test these here only @@ -1926,6 +1940,47 @@ def test_trough_segment_promotion(warcprox_): time.sleep(3) assert promoted == [] +def test_dedup_min_size(http_daemon, warcprox_, archiving_proxies, playback_proxies): + """We use options --dedup-min-text-size=3 --dedup-min-binary-size=5 and we + try to download content smaller than these limits to make sure that it is + not deduplicated. We create the digest_str with the following code: + ``` + payload_digest = hashlib.new('sha1') + payload_digest.update(b'aa') + warcprox.digest_str(payload_digest) + ``` + """ + url = 'http://localhost:%s/text-2bytes' % http_daemon.server_port + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + assert len(response.content) == 2 + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37') + assert dedup_lookup is None + time.sleep(3) + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37') + # This would return dedup data if payload_size > dedup-min-text-size + assert dedup_lookup is None + + url = 'http://localhost:%s/binary-4bytes' % http_daemon.server_port + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + assert len(response.content) == 4 + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79') + assert dedup_lookup is None + time.sleep(3) + response = requests.get( + url, proxies=archiving_proxies, verify=False, timeout=10) + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79') + # This would return dedup data if payload_size > dedup-min-binary-size + assert dedup_lookup is None + + if __name__ == '__main__': pytest.main() diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index e6674a6..cb4671e 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -34,6 +34,7 @@ import threading import datetime import doublethink import rethinkdb as r +from warcprox.dedup import DedupableMixin class RethinkCaptures: """Inserts in batches every 0.5 seconds""" @@ -215,10 +216,11 @@ class RethinkCaptures: if self._timer: self._timer.join() -class RethinkCapturesDedup(warcprox.dedup.DedupDb): +class RethinkCapturesDedup(warcprox.dedup.DedupDb, DedupableMixin): logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup") def __init__(self, options=warcprox.Options()): + DedupableMixin.__init__(self, options) self.captures_db = RethinkCaptures(options=options) self.options = options @@ -251,5 +253,6 @@ class RethinkCapturesDedup(warcprox.dedup.DedupDb): self.captures_db.close() def notify(self, recorded_url, records): - self.captures_db.notify(recorded_url, records) - + if (records and records[0].type == b'response' + and self.should_dedup(recorded_url)): + self.captures_db.notify(recorded_url, records) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 5db8e34..17a4fd9 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -37,20 +37,45 @@ from concurrent import futures urllib3.disable_warnings() -class DedupLoader(warcprox.BaseStandardPostfetchProcessor): +class DedupableMixin(object): + def __init__(self, options=warcprox.Options()): + self.min_text_size = options.dedup_min_text_size + self.min_binary_size = options.dedup_min_binary_size + + def should_dedup(self, recorded_url): + """Check if we should try to run dedup on resource based on payload + size compared with min text/binary dedup size options. Return Boolean. + """ + if recorded_url.is_text(): + return recorded_url.response_recorder.payload_size() > self.min_text_size + else: + return recorded_url.response_recorder.payload_size() > self.min_binary_size + +class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin): def __init__(self, dedup_db, options=warcprox.Options()): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) + DedupableMixin.__init__(self, options) self.dedup_db = dedup_db def _process_url(self, recorded_url): - decorate_with_dedup_info( - self.dedup_db, recorded_url, self.options.base32) + if (recorded_url.response_recorder + and recorded_url.payload_digest + and self.should_dedup(recorded_url)): + digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32) + if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: + recorded_url.dedup_info = self.dedup_db.lookup( + digest_key, recorded_url.warcprox_meta["captures-bucket"], + recorded_url.url) + else: + recorded_url.dedup_info = self.dedup_db.lookup( + digest_key, url=recorded_url.url) -class DedupDb(object): +class DedupDb(DedupableMixin): logger = logging.getLogger("warcprox.dedup.DedupDb") def __init__( self, file='./warcprox.sqlite', options=warcprox.Options()): + DedupableMixin.__init__(self, options) self.file = file self.options = options @@ -113,7 +138,7 @@ class DedupDb(object): def notify(self, recorded_url, records): if (records and records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: @@ -123,23 +148,11 @@ class DedupDb(object): else: self.save(digest_key, records[0]) -def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): - if (recorded_url.response_recorder - and recorded_url.payload_digest - and recorded_url.response_recorder.payload_size() > 0): - digest_key = warcprox.digest_str(recorded_url.payload_digest, base32) - if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: - recorded_url.dedup_info = dedup_db.lookup( - digest_key, recorded_url.warcprox_meta["captures-bucket"], - recorded_url.url) - else: - recorded_url.dedup_info = dedup_db.lookup( - digest_key, url=recorded_url.url) - -class RethinkDedupDb(DedupDb): +class RethinkDedupDb(DedupDb, DedupableMixin): logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") def __init__(self, options=warcprox.Options()): + DedupableMixin.__init__(self, options) parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_dedup_url) self.rr = doublethink.Rethinker( servers=parsed.hosts, db=parsed.database) @@ -190,7 +203,7 @@ class RethinkDedupDb(DedupDb): def notify(self, recorded_url, records): if (records and records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: @@ -273,9 +286,10 @@ class CdxServerDedup(DedupDb): """ pass -class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor): +class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin): def __init__(self, cdx_dedup, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + DedupableMixin.__init__(self, options) self.pool = futures.ThreadPoolExecutor(max_workers=400) self.batch = set() self.cdx_dedup = cdx_dedup @@ -284,7 +298,7 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor): recorded_url = self.inq.get(block=True, timeout=0.5) if (recorded_url.response_recorder and recorded_url.payload_digest - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): self.batch.add(recorded_url) self.pool.submit(self._process_url, recorded_url) else: @@ -306,9 +320,10 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor): if self.outq: self.outq.put(recorded_url) -class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): +class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor, DedupableMixin): def __init__(self, trough_dedup_db, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + DedupableMixin.__init__(self, options) self.trough_dedup_db = trough_dedup_db def _filter_and_bucketize(self, batch): @@ -320,7 +335,7 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): for recorded_url in batch: if (recorded_url.warc_records and recorded_url.warc_records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): if (recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta): bucket = recorded_url.warcprox_meta['captures-bucket'] @@ -352,9 +367,10 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor): logging.warn( 'timed out saving dedup info to trough', exc_info=True) -class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): +class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin): def __init__(self, trough_dedup_db, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) + DedupableMixin.__init__(self, options) self.trough_dedup_db = trough_dedup_db def _startup(self): @@ -369,7 +385,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): for recorded_url in batch: if (recorded_url.response_recorder and recorded_url.payload_digest - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): if (recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta): bucket = recorded_url.warcprox_meta['captures-bucket'] @@ -429,7 +445,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor): logging.warn( 'timed out loading dedup info from trough', exc_info=True) -class TroughDedupDb(DedupDb): +class TroughDedupDb(DedupDb, DedupableMixin): ''' https://github.com/internetarchive/trough ''' @@ -446,6 +462,7 @@ class TroughDedupDb(DedupDb): 'values (%s, %s, %s, %s);') def __init__(self, options=warcprox.Options()): + DedupableMixin.__init__(self, options) self.options = options self._trough_cli = warcprox.trough.TroughClient( options.rethinkdb_trough_db_url, promotion_interval=60*60) @@ -518,7 +535,7 @@ class TroughDedupDb(DedupDb): def notify(self, recorded_url, records): if (records and records[0].type == b'response' - and recorded_url.response_recorder.payload_size() > 0): + and self.should_dedup(recorded_url)): digest_key = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta: diff --git a/warcprox/main.py b/warcprox/main.py index 8ff466b..3723445 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -148,6 +148,12 @@ def _build_arg_parser(prog='warcprox'): # optional cookie values to pass to CDX Server; e.g. "cookie1=val1;cookie2=val2" arg_parser.add_argument('--cdxserver-dedup-cookies', dest='cdxserver_dedup_cookies', help=argparse.SUPPRESS) + arg_parser.add_argument('--dedup-min-text-size', dest='dedup_min_text_size', + type=int, default=0, + help=('try to dedup text resources with payload size over this limit in bytes')) + arg_parser.add_argument('--dedup-min-binary-size', dest='dedup_min_binary_size', + type=int, default=0, help=( + 'try to dedup binary resources with payload size over this limit in bytes')) arg_parser.add_argument('--queue-size', dest='queue_size', type=int, default=500, help=argparse.SUPPRESS) arg_parser.add_argument('--max-threads', dest='max_threads', type=int, diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index b6a0943..2477a06 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -376,6 +376,18 @@ class RecordedUrl: self.warc_records = warc_records self.do_not_archive = do_not_archive + def is_text(self): + """Ref: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Complete_list_of_MIME_types + Alternative method: try to decode('ascii') first N bytes to make sure + its text. + """ + if self.mimetype: + return self.mimetype[:5] == "text/" or self.mimetype in ( + "application/xml", "application/javascript", "application/json", + "application/xhtml+xml", "application/typescript", + "image/svg+xml") + return False + # inherit from object so that multiple inheritance from this class works # properly in python 2 # http://stackoverflow.com/questions/1713038/super-fails-with-error-typeerror-argument-1-must-be-type-not-classobj#18392639