From bd23e37dc027ceeee9b09038623170a8020d67df Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 13 Oct 2017 19:27:15 +0000 Subject: [PATCH 1/5] Stop using WarcRecord.REFERS_TO header and use payload_digest instead Stop adding WarcRecord.REFERS_TO when building WARC records. This affects the methods ``warc.WarcRecordBuilder._build_response_principal_record`` and ``warc.WarcRecordBuilder.build_warc_record``. Replace ``record_id`` (WarcRecord.REFERS_TO) with ``payload_digest`` in ``playback``. The playback database now stores ``{'f': warcfile, 'o': offset, 'd': payload_digest}`` instead of ``'i': record_id``. Make all ``dedup`` classes return only ``url`` and ``date``; drop ``id``. --- warcprox/bigtable.py | 2 -- warcprox/dedup.py | 7 ++----- warcprox/playback.py | 39 ++++++++++++++++++++------------------- warcprox/warc.py | 7 ++----- 4 files changed, 24 insertions(+), 31 deletions(-) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 387d05c..c3a9bd8 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -233,8 +233,6 @@ class RethinkCapturesDedup: "url": entry["url"].encode("utf-8"), "date": entry["timestamp"].strftime("%Y-%m-%dT%H:%M:%SZ").encode("utf-8"), } - if "warc_id" in entry: - dedup_info["id"] = entry["warc_id"].encode("utf-8") return dedup_info else: return None diff --git a/warcprox/dedup.py b/warcprox/dedup.py index fd1ada4..79be80f 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -55,13 +55,12 @@ class DedupDb(object): conn.close() def save(self, digest_key, response_record, bucket=""): - record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') key = digest_key.decode('utf-8') + "|" + bucket - py_value = {'id':record_id, 'url':url, 'date':date} + py_value = {'url':url, 'date':date} json_value = json.dumps(py_value, separators=(',',':')) conn = sqlite3.connect(self.file) @@ -81,7 
+80,6 @@ class DedupDb(object): conn.close() if result_tuple: result = json.loads(result_tuple[0]) - result['id'] = result['id'].encode('latin1') result['url'] = result['url'].encode('latin1') result['date'] = result['date'].encode('latin1') self.logger.debug('dedup db lookup of key=%s returning %s', key, result) @@ -144,10 +142,9 @@ class RethinkDedupDb: def save(self, digest_key, response_record, bucket=""): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key k = "{}|{}".format(k, bucket) - record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') - record = {'key':k,'url':url,'date':date,'id':record_id} + record = {'key': k, 'url': url, 'date': date} result = self.rr.table(self.table).insert( record, conflict="replace").run() if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: diff --git a/warcprox/playback.py b/warcprox/playback.py index 663e10a..af4639f 100644 --- a/warcprox/playback.py +++ b/warcprox/playback.py @@ -120,9 +120,12 @@ class PlaybackProxyHandler(MitmProxyHandler): def _send_headers_and_refd_payload( - self, headers, refers_to, refers_to_target_uri, refers_to_date): + self, headers, refers_to_target_uri, refers_to_date, payload_digest): + """Parameters: + + """ location = self.server.playback_index_db.lookup_exact( - refers_to_target_uri, refers_to_date, record_id=refers_to) + refers_to_target_uri, refers_to_date, payload_digest) self.logger.debug('loading http payload from {}'.format(location)) fh = self._open_warc_at_offset(location['f'], location['o']) @@ -177,20 +180,19 @@ class PlaybackProxyHandler(MitmProxyHandler): if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST: raise Exception('unknown revisit record profile {}'.format(warc_profile)) - refers_to = 
record.get_header( - warctools.WarcRecord.REFERS_TO).decode('latin1') refers_to_target_uri = record.get_header( warctools.WarcRecord.REFERS_TO_TARGET_URI).decode( 'latin1') refers_to_date = record.get_header( warctools.WarcRecord.REFERS_TO_DATE).decode('latin1') - + payload_digest = record.get_header( + warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1') self.logger.debug( 'revisit record references %s:%s capture of %s', - refers_to_date, refers_to, refers_to_target_uri) + refers_to_date, payload_digest, refers_to_target_uri) return self._send_headers_and_refd_payload( - record.content[1], refers_to, refers_to_target_uri, - refers_to_date) + record.content[1], refers_to_target_uri, refers_to_date, + payload_digest) else: # send it back raw, whatever it is @@ -264,12 +266,12 @@ class PlaybackIndexDb(object): # XXX canonicalize url? url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') - record_id_str = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') + payload_digest_str = response_record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1') # there could be two visits of same url in the same second, and WARC-Date is # prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\ - # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...} + # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'d':payload_digest},record2,...],date2:[{...}],...} with self._lock: conn = sqlite3.connect(self.file) @@ -283,10 +285,10 @@ class PlaybackIndexDb(object): if date_str in py_value: py_value[date_str].append( - {'f':warcfile, 'o':offset, 'i':record_id_str}) + {'f': warcfile, 'o': offset, 'd': payload_digest_str}) else: py_value[date_str] = [ - {'f':warcfile, 'o':offset, 'i':record_id_str}] + {'f': warcfile, 'o': offset, 'd': payload_digest_str}] json_value = 
json.dumps(py_value, separators=(',',':')) @@ -314,11 +316,11 @@ class PlaybackIndexDb(object): latest_date = max(py_value) result = py_value[latest_date][0] - result['i'] = result['i'].encode('ascii') + result['d'] = result['d'].encode('ascii') return latest_date, result # in python3 params are bytes - def lookup_exact(self, url, warc_date, record_id): + def lookup_exact(self, url, warc_date, payload_digest): conn = sqlite3.connect(self.file) cursor = conn.execute( 'select value from playback where url = ?', (url,)) @@ -334,14 +336,13 @@ class PlaybackIndexDb(object): if warc_date in py_value: for record in py_value[warc_date]: - if record['i'] == record_id: + if record['d'] == payload_digest: self.logger.debug( "found exact match for (%r,%r,%r)", - warc_date, record_id, url) - record['i'] = record['i'] + warc_date, payload_digest, url) + record['d'] = record['d'] return record else: self.logger.info( - "match not found for (%r,%r,%r)", warc_date, record_id, url) + "match not found for (%r,%r,%r)", warc_date, payload_digest, url) return None - diff --git a/warcprox/warc.py b/warcprox/warc.py index 51b1c35..53e049f 100644 --- a/warcprox/warc.py +++ b/warcprox/warc.py @@ -50,7 +50,6 @@ class WarcRecordBuilder: url=recorded_url.url, warc_date=warc_date, data=response_header_block, warc_type=warctools.WarcRecord.REVISIT, - refers_to=recorded_url.dedup_info['id'], refers_to_target_uri=recorded_url.dedup_info['url'], refers_to_date=recorded_url.dedup_info['date'], payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32), @@ -87,8 +86,8 @@ class WarcRecordBuilder: def build_warc_record(self, url, warc_date=None, recorder=None, data=None, concurrent_to=None, warc_type=None, content_type=None, remote_ip=None, - profile=None, refers_to=None, refers_to_target_uri=None, - refers_to_date=None, payload_digest=None): + profile=None, refers_to_target_uri=None, refers_to_date=None, + payload_digest=None): if warc_date is None: warc_date = 
warctools.warc.warc_datetime_str(datetime.datetime.utcnow()) @@ -105,8 +104,6 @@ class WarcRecordBuilder: headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip)) if profile is not None: headers.append((warctools.WarcRecord.PROFILE, profile)) - if refers_to is not None: - headers.append((warctools.WarcRecord.REFERS_TO, refers_to)) if refers_to_target_uri is not None: headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri)) if refers_to_date is not None: From ad8ba43c3d49fd2450dd4fd22d56d1962b8d0c57 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 13 Oct 2017 20:38:04 +0000 Subject: [PATCH 2/5] Update unit test --- tests/test_warcprox.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index b24a5c8..26e3d3f 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -406,13 +406,11 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) time.sleep(0.5) # check in dedup db - # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} + # {u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup['url'] == url.encode('ascii') - assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) - record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] # need revisit to have a later timestamp than original, else playing @@ -435,7 +433,6 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup['url'] == url.encode('ascii') - assert dedup_lookup['id'] == record_id assert dedup_lookup['date'] == dedup_date # test 
playback @@ -479,13 +476,11 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie time.sleep(0.5) # check in dedup db - # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} + # {u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup['url'] == url.encode('ascii') - assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) - record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] # need revisit to have a later timestamp than original, else playing @@ -508,7 +503,6 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup['url'] == url.encode('ascii') - assert dedup_lookup['id'] == record_id assert dedup_lookup['date'] == dedup_date # test playback @@ -576,9 +570,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a") assert dedup_lookup['url'] == url1.encode('ascii') - assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) - record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] # check url1 not in dedup db bucket_b @@ -603,9 +595,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b") assert dedup_lookup['url'] == url2.encode('ascii') - assert re.match(br'^$', dedup_lookup['id']) assert 
re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) - record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] # archive url2 bucket_a From 424f236126e3078f8b92569fe19d9ebfc99a31f2 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 13 Oct 2017 22:04:56 +0000 Subject: [PATCH 3/5] Revert warc to previous behavior If record_id is available, write it to REFERS_TO header. --- warcprox/warc.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/warcprox/warc.py b/warcprox/warc.py index 53e049f..de0ec06 100644 --- a/warcprox/warc.py +++ b/warcprox/warc.py @@ -50,6 +50,7 @@ class WarcRecordBuilder: url=recorded_url.url, warc_date=warc_date, data=response_header_block, warc_type=warctools.WarcRecord.REVISIT, + refers_to=recorded_url.dedup_info.get('id'), refers_to_target_uri=recorded_url.dedup_info['url'], refers_to_date=recorded_url.dedup_info['date'], payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32), @@ -86,8 +87,8 @@ class WarcRecordBuilder: def build_warc_record(self, url, warc_date=None, recorder=None, data=None, concurrent_to=None, warc_type=None, content_type=None, remote_ip=None, - profile=None, refers_to_target_uri=None, refers_to_date=None, - payload_digest=None): + profile=None, refers_to=None, refers_to_target_uri=None, + refers_to_date=None, payload_digest=None): if warc_date is None: warc_date = warctools.warc.warc_datetime_str(datetime.datetime.utcnow()) @@ -104,6 +105,8 @@ class WarcRecordBuilder: headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip)) if profile is not None: headers.append((warctools.WarcRecord.PROFILE, profile)) + if refers_to is not None: + headers.append((warctools.WarcRecord.REFERS_TO, refers_to)) if refers_to_target_uri is not None: headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri)) if refers_to_date is not None: From 97e52b8f7b4c64b32371788219f30d59265c0647 Mon Sep 17 00:00:00 2001 From: Vangelis 
Banos Date: Mon, 16 Oct 2017 02:28:09 +0000 Subject: [PATCH 4/5] Revert changes to bigtable and dedup --- warcprox/bigtable.py | 2 ++ warcprox/dedup.py | 7 +++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index c3a9bd8..387d05c 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -233,6 +233,8 @@ class RethinkCapturesDedup: "url": entry["url"].encode("utf-8"), "date": entry["timestamp"].strftime("%Y-%m-%dT%H:%M:%SZ").encode("utf-8"), } + if "warc_id" in entry: + dedup_info["id"] = entry["warc_id"].encode("utf-8") return dedup_info else: return None diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 79be80f..fd1ada4 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -55,12 +55,13 @@ class DedupDb(object): conn.close() def save(self, digest_key, response_record, bucket=""): + record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') key = digest_key.decode('utf-8') + "|" + bucket - py_value = {'url':url, 'date':date} + py_value = {'id':record_id, 'url':url, 'date':date} json_value = json.dumps(py_value, separators=(',',':')) conn = sqlite3.connect(self.file) @@ -80,6 +81,7 @@ class DedupDb(object): conn.close() if result_tuple: result = json.loads(result_tuple[0]) + result['id'] = result['id'].encode('latin1') result['url'] = result['url'].encode('latin1') result['date'] = result['date'].encode('latin1') self.logger.debug('dedup db lookup of key=%s returning %s', key, result) @@ -142,9 +144,10 @@ class RethinkDedupDb: def save(self, digest_key, response_record, bucket=""): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key k = "{}|{}".format(k, bucket) + record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') url = 
response_record.get_header(warctools.WarcRecord.URL).decode('latin1') date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') - record = {'key': k, 'url': url, 'date': date} + record = {'key':k,'url':url,'date':date,'id':record_id} result = self.rr.table(self.table).insert( record, conflict="replace").run() if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: From 9ce3132510361c0790dc070a0f8eaa402da24db1 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 16 Oct 2017 02:41:43 +0000 Subject: [PATCH 5/5] Revert changes to test_warcprox.py --- tests/test_warcprox.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 26e3d3f..b24a5c8 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -406,11 +406,13 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) time.sleep(0.5) # check in dedup db - # {u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} + # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup['url'] == url.encode('ascii') + assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) + record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] # need revisit to have a later timestamp than original, else playing @@ -433,6 +435,7 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup['url'] == url.encode('ascii') + assert dedup_lookup['id'] == record_id assert dedup_lookup['date'] == dedup_date # test playback @@ -476,11 +479,13 
@@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie time.sleep(0.5) # check in dedup db - # {u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} + # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup['url'] == url.encode('ascii') + assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) + record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] # need revisit to have a later timestamp than original, else playing @@ -503,6 +508,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup['url'] == url.encode('ascii') + assert dedup_lookup['id'] == record_id assert dedup_lookup['date'] == dedup_date # test playback @@ -570,7 +576,9 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a") assert dedup_lookup['url'] == url1.encode('ascii') + assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) + record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] # check url1 not in dedup db bucket_b @@ -595,7 +603,9 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b") assert dedup_lookup['url'] == url2.encode('ascii') + assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', 
dedup_lookup['date']) + record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] # archive url2 bucket_a