mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
Stop using WarcRecord.REFERS_TO header and use payload_digest instead
Stop adding the WarcRecord.REFERS_TO header when building WARC records; this affects the methods ``warc.WarcRecordBuilder._build_response_principal_record`` and ``warc.WarcRecordBuilder.build_warc_record``. In ``playback``, replace ``record_id`` (WarcRecord.REFERS_TO) with ``payload_digest``. Playback database entries now have the form ``{'f': warcfile, 'o': offset, 'd': payload_digest}``, i.e. they store ``'d': payload_digest`` instead of ``'i': record_id``. Make all ``dedup`` classes return only ``url`` and ``date``; drop ``id``.
This commit is contained in:
parent
9b8043d3a2
commit
bd23e37dc0
@ -233,8 +233,6 @@ class RethinkCapturesDedup:
|
|||||||
"url": entry["url"].encode("utf-8"),
|
"url": entry["url"].encode("utf-8"),
|
||||||
"date": entry["timestamp"].strftime("%Y-%m-%dT%H:%M:%SZ").encode("utf-8"),
|
"date": entry["timestamp"].strftime("%Y-%m-%dT%H:%M:%SZ").encode("utf-8"),
|
||||||
}
|
}
|
||||||
if "warc_id" in entry:
|
|
||||||
dedup_info["id"] = entry["warc_id"].encode("utf-8")
|
|
||||||
return dedup_info
|
return dedup_info
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
@ -55,13 +55,12 @@ class DedupDb(object):
|
|||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
def save(self, digest_key, response_record, bucket=""):
|
def save(self, digest_key, response_record, bucket=""):
|
||||||
record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
|
|
||||||
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
|
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
|
||||||
date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
|
date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
|
||||||
|
|
||||||
key = digest_key.decode('utf-8') + "|" + bucket
|
key = digest_key.decode('utf-8') + "|" + bucket
|
||||||
|
|
||||||
py_value = {'id':record_id, 'url':url, 'date':date}
|
py_value = {'url':url, 'date':date}
|
||||||
json_value = json.dumps(py_value, separators=(',',':'))
|
json_value = json.dumps(py_value, separators=(',',':'))
|
||||||
|
|
||||||
conn = sqlite3.connect(self.file)
|
conn = sqlite3.connect(self.file)
|
||||||
@ -81,7 +80,6 @@ class DedupDb(object):
|
|||||||
conn.close()
|
conn.close()
|
||||||
if result_tuple:
|
if result_tuple:
|
||||||
result = json.loads(result_tuple[0])
|
result = json.loads(result_tuple[0])
|
||||||
result['id'] = result['id'].encode('latin1')
|
|
||||||
result['url'] = result['url'].encode('latin1')
|
result['url'] = result['url'].encode('latin1')
|
||||||
result['date'] = result['date'].encode('latin1')
|
result['date'] = result['date'].encode('latin1')
|
||||||
self.logger.debug('dedup db lookup of key=%s returning %s', key, result)
|
self.logger.debug('dedup db lookup of key=%s returning %s', key, result)
|
||||||
@ -144,10 +142,9 @@ class RethinkDedupDb:
|
|||||||
def save(self, digest_key, response_record, bucket=""):
|
def save(self, digest_key, response_record, bucket=""):
|
||||||
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
|
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
|
||||||
k = "{}|{}".format(k, bucket)
|
k = "{}|{}".format(k, bucket)
|
||||||
record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
|
|
||||||
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
|
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
|
||||||
date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
|
date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
|
||||||
record = {'key':k,'url':url,'date':date,'id':record_id}
|
record = {'key': k, 'url': url, 'date': date}
|
||||||
result = self.rr.table(self.table).insert(
|
result = self.rr.table(self.table).insert(
|
||||||
record, conflict="replace").run()
|
record, conflict="replace").run()
|
||||||
if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
|
if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
|
||||||
|
@ -120,9 +120,12 @@ class PlaybackProxyHandler(MitmProxyHandler):
|
|||||||
|
|
||||||
|
|
||||||
def _send_headers_and_refd_payload(
|
def _send_headers_and_refd_payload(
|
||||||
self, headers, refers_to, refers_to_target_uri, refers_to_date):
|
self, headers, refers_to_target_uri, refers_to_date, payload_digest):
|
||||||
|
"""Parameters:
|
||||||
|
|
||||||
|
"""
|
||||||
location = self.server.playback_index_db.lookup_exact(
|
location = self.server.playback_index_db.lookup_exact(
|
||||||
refers_to_target_uri, refers_to_date, record_id=refers_to)
|
refers_to_target_uri, refers_to_date, payload_digest)
|
||||||
self.logger.debug('loading http payload from {}'.format(location))
|
self.logger.debug('loading http payload from {}'.format(location))
|
||||||
|
|
||||||
fh = self._open_warc_at_offset(location['f'], location['o'])
|
fh = self._open_warc_at_offset(location['f'], location['o'])
|
||||||
@ -177,20 +180,19 @@ class PlaybackProxyHandler(MitmProxyHandler):
|
|||||||
if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST:
|
if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST:
|
||||||
raise Exception('unknown revisit record profile {}'.format(warc_profile))
|
raise Exception('unknown revisit record profile {}'.format(warc_profile))
|
||||||
|
|
||||||
refers_to = record.get_header(
|
|
||||||
warctools.WarcRecord.REFERS_TO).decode('latin1')
|
|
||||||
refers_to_target_uri = record.get_header(
|
refers_to_target_uri = record.get_header(
|
||||||
warctools.WarcRecord.REFERS_TO_TARGET_URI).decode(
|
warctools.WarcRecord.REFERS_TO_TARGET_URI).decode(
|
||||||
'latin1')
|
'latin1')
|
||||||
refers_to_date = record.get_header(
|
refers_to_date = record.get_header(
|
||||||
warctools.WarcRecord.REFERS_TO_DATE).decode('latin1')
|
warctools.WarcRecord.REFERS_TO_DATE).decode('latin1')
|
||||||
|
payload_digest = record.get_header(
|
||||||
|
warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1')
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
'revisit record references %s:%s capture of %s',
|
'revisit record references %s:%s capture of %s',
|
||||||
refers_to_date, refers_to, refers_to_target_uri)
|
refers_to_date, payload_digest, refers_to_target_uri)
|
||||||
return self._send_headers_and_refd_payload(
|
return self._send_headers_and_refd_payload(
|
||||||
record.content[1], refers_to, refers_to_target_uri,
|
record.content[1], refers_to_target_uri, refers_to_date,
|
||||||
refers_to_date)
|
payload_digest)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# send it back raw, whatever it is
|
# send it back raw, whatever it is
|
||||||
@ -264,12 +266,12 @@ class PlaybackIndexDb(object):
|
|||||||
# XXX canonicalize url?
|
# XXX canonicalize url?
|
||||||
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
|
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
|
||||||
date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
|
date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
|
||||||
record_id_str = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
|
payload_digest_str = response_record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1')
|
||||||
|
|
||||||
# there could be two visits of same url in the same second, and WARC-Date is
|
# there could be two visits of same url in the same second, and WARC-Date is
|
||||||
# prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\
|
# prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\
|
||||||
|
|
||||||
# url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...}
|
# url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'d':payload_digest},record2,...],date2:[{...}],...}
|
||||||
|
|
||||||
with self._lock:
|
with self._lock:
|
||||||
conn = sqlite3.connect(self.file)
|
conn = sqlite3.connect(self.file)
|
||||||
@ -283,10 +285,10 @@ class PlaybackIndexDb(object):
|
|||||||
|
|
||||||
if date_str in py_value:
|
if date_str in py_value:
|
||||||
py_value[date_str].append(
|
py_value[date_str].append(
|
||||||
{'f':warcfile, 'o':offset, 'i':record_id_str})
|
{'f': warcfile, 'o': offset, 'd': payload_digest_str})
|
||||||
else:
|
else:
|
||||||
py_value[date_str] = [
|
py_value[date_str] = [
|
||||||
{'f':warcfile, 'o':offset, 'i':record_id_str}]
|
{'f': warcfile, 'o': offset, 'd': payload_digest_str}]
|
||||||
|
|
||||||
json_value = json.dumps(py_value, separators=(',',':'))
|
json_value = json.dumps(py_value, separators=(',',':'))
|
||||||
|
|
||||||
@ -314,11 +316,11 @@ class PlaybackIndexDb(object):
|
|||||||
|
|
||||||
latest_date = max(py_value)
|
latest_date = max(py_value)
|
||||||
result = py_value[latest_date][0]
|
result = py_value[latest_date][0]
|
||||||
result['i'] = result['i'].encode('ascii')
|
result['d'] = result['d'].encode('ascii')
|
||||||
return latest_date, result
|
return latest_date, result
|
||||||
|
|
||||||
# in python3 params are bytes
|
# in python3 params are bytes
|
||||||
def lookup_exact(self, url, warc_date, record_id):
|
def lookup_exact(self, url, warc_date, payload_digest):
|
||||||
conn = sqlite3.connect(self.file)
|
conn = sqlite3.connect(self.file)
|
||||||
cursor = conn.execute(
|
cursor = conn.execute(
|
||||||
'select value from playback where url = ?', (url,))
|
'select value from playback where url = ?', (url,))
|
||||||
@ -334,14 +336,13 @@ class PlaybackIndexDb(object):
|
|||||||
|
|
||||||
if warc_date in py_value:
|
if warc_date in py_value:
|
||||||
for record in py_value[warc_date]:
|
for record in py_value[warc_date]:
|
||||||
if record['i'] == record_id:
|
if record['d'] == payload_digest:
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
"found exact match for (%r,%r,%r)",
|
"found exact match for (%r,%r,%r)",
|
||||||
warc_date, record_id, url)
|
warc_date, payload_digest, url)
|
||||||
record['i'] = record['i']
|
record['d'] = record['d']
|
||||||
return record
|
return record
|
||||||
else:
|
else:
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
"match not found for (%r,%r,%r)", warc_date, record_id, url)
|
"match not found for (%r,%r,%r)", warc_date, payload_digest, url)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@ -50,7 +50,6 @@ class WarcRecordBuilder:
|
|||||||
url=recorded_url.url, warc_date=warc_date,
|
url=recorded_url.url, warc_date=warc_date,
|
||||||
data=response_header_block,
|
data=response_header_block,
|
||||||
warc_type=warctools.WarcRecord.REVISIT,
|
warc_type=warctools.WarcRecord.REVISIT,
|
||||||
refers_to=recorded_url.dedup_info['id'],
|
|
||||||
refers_to_target_uri=recorded_url.dedup_info['url'],
|
refers_to_target_uri=recorded_url.dedup_info['url'],
|
||||||
refers_to_date=recorded_url.dedup_info['date'],
|
refers_to_date=recorded_url.dedup_info['date'],
|
||||||
payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32),
|
payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32),
|
||||||
@ -87,8 +86,8 @@ class WarcRecordBuilder:
|
|||||||
|
|
||||||
def build_warc_record(self, url, warc_date=None, recorder=None, data=None,
|
def build_warc_record(self, url, warc_date=None, recorder=None, data=None,
|
||||||
concurrent_to=None, warc_type=None, content_type=None, remote_ip=None,
|
concurrent_to=None, warc_type=None, content_type=None, remote_ip=None,
|
||||||
profile=None, refers_to=None, refers_to_target_uri=None,
|
profile=None, refers_to_target_uri=None, refers_to_date=None,
|
||||||
refers_to_date=None, payload_digest=None):
|
payload_digest=None):
|
||||||
|
|
||||||
if warc_date is None:
|
if warc_date is None:
|
||||||
warc_date = warctools.warc.warc_datetime_str(datetime.datetime.utcnow())
|
warc_date = warctools.warc.warc_datetime_str(datetime.datetime.utcnow())
|
||||||
@ -105,8 +104,6 @@ class WarcRecordBuilder:
|
|||||||
headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip))
|
headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip))
|
||||||
if profile is not None:
|
if profile is not None:
|
||||||
headers.append((warctools.WarcRecord.PROFILE, profile))
|
headers.append((warctools.WarcRecord.PROFILE, profile))
|
||||||
if refers_to is not None:
|
|
||||||
headers.append((warctools.WarcRecord.REFERS_TO, refers_to))
|
|
||||||
if refers_to_target_uri is not None:
|
if refers_to_target_uri is not None:
|
||||||
headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri))
|
headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri))
|
||||||
if refers_to_date is not None:
|
if refers_to_date is not None:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user