basic deduplication on payload digest using in-memory store

This commit is contained in:
Noah Levitt 2013-10-29 18:59:21 -07:00
parent 57c21920bd
commit 975657c74b
2 changed files with 108 additions and 18 deletions

View File

@ -74,6 +74,8 @@ incorporated into warctools mainline.
- dns cache?? the system already does a fine job I'm thinking
- keepalive with remote servers?
- python3
- special handling for 304 not-modified (either write revisit record, or modify
request so server never responds with 304)
#### To not do

View File

@ -10,7 +10,6 @@ import ssl
import logging
import sys
from hanzo import warctools
import uuid
import hashlib
from datetime import datetime
import Queue
@ -120,6 +119,7 @@ class ProxyingRecorder:
# "The file has no name, and will cease to exist when it is closed."
self.tempfile = tempfile.SpooledTemporaryFile(max_size=512*1024)
self.block_sha1 = hashlib.sha1()
self.payload_offset = None
self.payload_sha1 = None
self.proxy_dest = proxy_dest
self._prev_hunk_last_two_bytes = ''
@ -133,18 +133,22 @@ class ProxyingRecorder:
if hunk.startswith('\n'):
self.payload_sha1 = hashlib.sha1()
self.payload_sha1.update(hunk[1:])
self.payload_offset = self.len + 1
elif hunk.startswith('\r\n'):
self.payload_sha1 = hashlib.sha1()
self.payload_sha1.update(hunk[2:])
self.payload_offset = self.len + 2
elif self._prev_hunk_last_two_bytes == '\n\r':
if hunk.startswith('\n'):
self.payload_sha1 = hashlib.sha1()
self.payload_sha1.update(hunk[1:])
self.payload_offset = self.len + 1
else:
m = re.search(r'\n\r?\n', hunk)
if m is not None:
self.payload_sha1 = hashlib.sha1()
self.payload_sha1.update(hunk[m.end():])
self.payload_offset = self.len + m.end()
# if we still haven't found start of payload hold on to these bytes
if self.payload_sha1 is None:
@ -158,13 +162,15 @@ class ProxyingRecorder:
self.proxy_dest.sendall(hunk)
self.len += len(hunk)
def read(self, size=-1):
    """Read up to *size* bytes from the wrapped fp, passing everything
    read through the recorder (digests, spooled copy, proxy client)."""
    data = self.fp.read(size=size)
    self._update(data)
    return data
def readline(self, size=-1):
    """Read one line from the wrapped fp, passing it through the recorder.

    NOTE: intentionally does not delegate to self.read(); also relies on
    self.fp.readline() not calling self.fp.read() internally, otherwise
    the same bytes would be recorded twice.
    """
    line = self.fp.readline(size=size)
    self._update(line)
    return line
@ -350,12 +356,41 @@ class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
logging.info('shutting down')
BaseHTTPServer.HTTPServer.server_close(self)
class DedupDb:
    """In-memory deduplication store keyed on WARC payload digest.

    Maps a payload digest string (e.g. 'sha1:...') to a small dict
    describing the first response record written with that digest:
    {'i': record id, 'u': url, 'd': warc date}.  Used by the warc
    writer to decide whether to write a revisit record instead of a
    full response record.
    """

    def __init__(self):
        # XXX in memory for the moment; contents are lost on restart
        self.db = {}

    def warc_record_written(self, record, warcfile, offset):
        """Listener called after each record is written to a warc.

        Indexes only response records that carry a payload digest;
        everything else is ignored.  *warcfile* and *offset* are part of
        the listener signature but unused here.
        """
        warc_type = record.get_header(warctools.WarcRecord.TYPE)
        if warc_type != warctools.WarcRecord.RESPONSE:
            return

        payload_digest = record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST)
        if payload_digest is None:
            return

        record_id = record.get_header(warctools.WarcRecord.ID)
        url = record.get_header(warctools.WarcRecord.URL)
        date = record.get_header(warctools.WarcRecord.DATE)
        # short keys keep the entry compact: id, url, date
        self.db[payload_digest] = {'i': record_id, 'u': url, 'd': date}

    def lookup(self, key):
        """Return the stored info dict for *key*, or None if unseen."""
        # dict.get does one lookup and already returns None on a miss,
        # replacing the previous "if key in self.db" double lookup
        return self.db.get(key)
# Each item in the queue is a tuple of warc records, which should be written
# consecutively in the same warc.
class WarcRecordsetQueue(Queue.Queue):
def __init__(self, base32=False):
def __init__(self, base32=False, dedup_db=None):
Queue.Queue.__init__(self)
self.base32 = base32
@ -363,19 +398,43 @@ class WarcRecordsetQueue(Queue.Queue):
def create_and_queue(self, url, request_data, response_recorder, remote_ip):
warc_date = warctools.warc.warc_datetime_str(datetime.now())
response_record, response_record_id = self.make_record(url=url,
warc_date=warc_date, recorder=response_recorder,
warc_type=warctools.WarcRecord.RESPONSE,
content_type="application/http;msgtype=response",
remote_ip=remote_ip)
if dedup_db is not None and response_recorder.payload_sha1 is not None:
key = 'sha1:{}'.format(self.digest_str(response_recorder.payload_sha1))
dedup_info = dedup_db.lookup(key)
if dedup_info is not None:
# revisit record
response_recorder.tempfile.seek(0)
if response_recorder.payload_offset is not None:
response_header_block = response_recorder.tempfile.read(response_recorder.payload_offset)
else:
response_header_block = response_recorder.tempfile.read()
principal_record, principal_record_id = self.make_record(url=url,
warc_date=warc_date, data=response_header_block,
warc_type=warctools.WarcRecord.REVISIT,
refers_to=dedup_info['i'],
refers_to_target_uri=dedup_info['u'],
refers_to_date=dedup_info['d'],
profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST,
content_type=warctools.WarcRecord.HTTP_RESPONSE_MIMETYPE,
remote_ip=remote_ip)
else:
# response record
principal_record, principal_record_id = self.make_record(url=url,
warc_date=warc_date, recorder=response_recorder,
warc_type=warctools.WarcRecord.RESPONSE,
content_type=warctools.WarcRecord.HTTP_RESPONSE_MIMETYPE,
remote_ip=remote_ip)
request_record, request_record_id = self.make_record(url=url,
warc_date=warc_date, data=request_data,
warc_type=warctools.WarcRecord.REQUEST,
content_type="application/http;msgtype=request",
concurrent_to=response_record_id)
content_type=warctools.WarcRecord.HTTP_REQUEST_MIMETYPE,
concurrent_to=principal_record_id)
record_group = (response_record, request_record)
record_group = (principal_record, request_record)
self.put(record_group)
@ -387,7 +446,9 @@ class WarcRecordsetQueue(Queue.Queue):
def make_record(self, url, warc_date=None, recorder=None, data=None,
concurrent_to=None, warc_type=None, content_type=None, remote_ip=None):
concurrent_to=None, warc_type=None, content_type=None, remote_ip=None,
profile=None, refers_to=None, refers_to_target_uri=None,
refers_to_date=None):
if warc_date is None:
warc_date = warctools.warc.warc_datetime_str(datetime.now())
@ -398,8 +459,16 @@ class WarcRecordsetQueue(Queue.Queue):
if warc_type is not None:
headers.append((warctools.WarcRecord.TYPE, warc_type))
headers.append((warctools.WarcRecord.ID, record_id))
if profile is not None:
headers.append((warctools.WarcRecord.TYPE, profile))
headers.append((warctools.WarcRecord.DATE, warc_date))
headers.append((warctools.WarcRecord.URL, url))
if refers_to is not None:
headers.append((warctools.WarcRecord.REFERS_TO, refers_to))
if refers_to_target_uri is not None:
headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri))
if refers_to_date is not None:
headers.append((warctools.WarcRecord.REFERS_TO_DATE, refers_to_date))
if remote_ip is not None:
headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip))
if concurrent_to is not None:
@ -407,6 +476,7 @@ class WarcRecordsetQueue(Queue.Queue):
if content_type is not None:
headers.append((warctools.WarcRecord.CONTENT_TYPE, content_type))
if recorder is not None:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder))))
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
@ -457,6 +527,8 @@ class WarcWriterThread(threading.Thread):
self.stop = threading.Event()
self.listeners = []
def timestamp17(self):
now = datetime.now()
@ -483,7 +555,7 @@ class WarcWriterThread(threading.Thread):
headers.append((warctools.WarcRecord.DATE, warc_record_date))
warcinfo_fields = []
warcinfo_fields.append('software: warcprox.py https://github.com/nlevitt/warcprox')
warcinfo_fields.append('software: warcprox.py https://github.com/internetarchive/warcprox')
hostname = socket.gethostname()
warcinfo_fields.append('hostname: {0}'.format(hostname))
warcinfo_fields.append('ip: {0}'.format(socket.gethostbyname(hostname)))
@ -519,6 +591,11 @@ class WarcWriterThread(threading.Thread):
return self._f
def register_listener(self, listener):
"""listener should be a function that takes 3 arguments (record, warcfile, offset)"""
self.listeners.append(listener)
def run(self):
logging.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format(
os.path.abspath(self.directory), self.gzip, self.rollover_size,
@ -543,12 +620,11 @@ class WarcWriterThread(threading.Thread):
record.get_header(warctools.WarcRecord.URL),
self._fpath, offset))
if record.content_file:
# XXX now we know we're done with this... messy to
# handle this here, but where else can it happen?
record.content_file.close()
for listener in self.listeners:
listener(record, self._fpath, offset)
self._f.flush()
except Queue.Empty:
if (self._fpath is not None
and self.rollover_idle_time is not None
@ -590,6 +666,8 @@ if __name__ == '__main__':
help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)")
arg_parser.add_argument('--base32', dest='base32', action='store_true',
default=False, help='write SHA1 digests in Base32 instead of hex')
# arg_parser.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
# default='./dedup.db', help='persistent deduplication database file')
arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
# [--ispartof=warcinfo ispartof]
@ -608,7 +686,9 @@ if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=loglevel,
format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(funcName)s(%(filename)s:%(lineno)d) %(message)s')
recordset_q = WarcRecordsetQueue(base32=args.base32)
dedup_db = DedupDb()
recordset_q = WarcRecordsetQueue(base32=args.base32, dedup_db=dedup_db)
proxy = WarcProxy(server_address=(args.address, int(args.port)),
ca_file=args.cacert, certs_dir=args.certs_dir,
@ -619,6 +699,14 @@ if __name__ == '__main__':
port=int(args.port), rollover_size=int(args.size),
rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None)
def close_content_file(record, warcfile, offset):
    """Warc-writer listener: close the record's content file, if any.

    *warcfile* and *offset* are part of the listener signature and are
    not used here.
    """
    if not record.content_file:
        return
    logging.info('closing record.content_file={}'.format(record.content_file))
    record.content_file.close()
warc_writer.register_listener(close_content_file)
warc_writer.register_listener(dedup_db.warc_record_written)
proxy_thread = threading.Thread(target=proxy.serve_forever, name='ProxyThread')
proxy_thread.start()
warc_writer.start()