diff --git a/README.md b/README.md index e298b4f..f8d5094 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,8 @@ incorporated into warctools mainline. - dns cache?? the system already does a fine job I'm thinking - keepalive with remote servers? - python3 +- special handling for 304 not-modified (either write revisit record, or modify + request so server never responds with 304) #### To not do diff --git a/warcprox.py b/warcprox.py index 66dcf7e..f985ef9 100755 --- a/warcprox.py +++ b/warcprox.py @@ -10,7 +10,6 @@ import ssl import logging import sys from hanzo import warctools -import uuid import hashlib from datetime import datetime import Queue @@ -120,6 +119,7 @@ class ProxyingRecorder: # "The file has no name, and will cease to exist when it is closed." self.tempfile = tempfile.SpooledTemporaryFile(max_size=512*1024) self.block_sha1 = hashlib.sha1() + self.payload_offset = None self.payload_sha1 = None self.proxy_dest = proxy_dest self._prev_hunk_last_two_bytes = '' @@ -133,18 +133,22 @@ class ProxyingRecorder: if hunk.startswith('\n'): self.payload_sha1 = hashlib.sha1() self.payload_sha1.update(hunk[1:]) + self.payload_offset = self.len + 1 elif hunk.startswith('\r\n'): self.payload_sha1 = hashlib.sha1() self.payload_sha1.update(hunk[2:]) + self.payload_offset = self.len + 2 elif self._prev_hunk_last_two_bytes == '\n\r': if hunk.startswith('\n'): self.payload_sha1 = hashlib.sha1() self.payload_sha1.update(hunk[1:]) + self.payload_offset = self.len + 1 else: m = re.search(r'\n\r?\n', hunk) if m is not None: self.payload_sha1 = hashlib.sha1() self.payload_sha1.update(hunk[m.end():]) + self.payload_offset = self.len + m.end() # if we still haven't found start of payload hold on to these bytes if self.payload_sha1 is None: @@ -158,13 +162,15 @@ class ProxyingRecorder: self.proxy_dest.sendall(hunk) self.len += len(hunk) + def read(self, size=-1): hunk = self.fp.read(size=size) self._update(hunk) return hunk def readline(self, size=-1): - # XXX does not call self.read(); if it ever did this would break + # XXX depends on implementation details of self.fp.readline(), in + # particular that it doesn't call self.fp.read() hunk = self.fp.readline(size=size) self._update(hunk) return hunk @@ -350,12 +356,41 @@ class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): logging.info('shutting down') BaseHTTPServer.HTTPServer.server_close(self) +class DedupDb: + + def __init__(self): + # XXX in memory for the moment + self.db = {} + + + def warc_record_written(self, record, warcfile, offset): + warc_type = record.get_header(warctools.WarcRecord.TYPE) + if warc_type != warctools.WarcRecord.RESPONSE: + return + + payload_digest = record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST) + if payload_digest is None: + return + + record_id = record.get_header(warctools.WarcRecord.ID) + url = record.get_header(warctools.WarcRecord.URL) + date = record.get_header(warctools.WarcRecord.DATE) + + self.db[payload_digest] = {'i':record_id, 'u':url, 'd':date} + + + def lookup(self, key): + if key in self.db: + return self.db[key] + else: + return None + # Each item in the queue is a tuple of warc records, which should be written # consecutively in the same warc. class WarcRecordsetQueue(Queue.Queue): - def __init__(self, base32=False): + def __init__(self, base32=False, dedup_db=None): Queue.Queue.__init__(self) self.base32 = base32 @@ -363,19 +398,43 @@ class WarcRecordsetQueue(Queue.Queue): def create_and_queue(self, url, request_data, response_recorder, remote_ip): warc_date = warctools.warc.warc_datetime_str(datetime.now()) - response_record, response_record_id = self.make_record(url=url, - warc_date=warc_date, recorder=response_recorder, - warc_type=warctools.WarcRecord.RESPONSE, - content_type="application/http;msgtype=response", - remote_ip=remote_ip) + if dedup_db is not None and response_recorder.payload_sha1 is not None: + key = 'sha1:{}'.format(self.digest_str(response_recorder.payload_sha1)) + dedup_info = dedup_db.lookup(key) + + if dedup_info is not None: + # revisit record + response_recorder.tempfile.seek(0) + if response_recorder.payload_offset is not None: + response_header_block = response_recorder.tempfile.read(response_recorder.payload_offset) + else: + response_header_block = response_recorder.tempfile.read() + + principal_record, principal_record_id = self.make_record(url=url, + warc_date=warc_date, data=response_header_block, + warc_type=warctools.WarcRecord.REVISIT, + refers_to=dedup_info['i'], + refers_to_target_uri=dedup_info['u'], + refers_to_date=dedup_info['d'], + profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST, + content_type=warctools.WarcRecord.HTTP_RESPONSE_MIMETYPE, + remote_ip=remote_ip) + + else: + # response record + principal_record, principal_record_id = self.make_record(url=url, + warc_date=warc_date, recorder=response_recorder, + warc_type=warctools.WarcRecord.RESPONSE, + content_type=warctools.WarcRecord.HTTP_RESPONSE_MIMETYPE, + remote_ip=remote_ip) request_record, request_record_id = self.make_record(url=url, warc_date=warc_date, data=request_data, warc_type=warctools.WarcRecord.REQUEST, - content_type="application/http;msgtype=request", - concurrent_to=response_record_id) + content_type=warctools.WarcRecord.HTTP_REQUEST_MIMETYPE, + concurrent_to=principal_record_id) - record_group = (response_record, request_record) + record_group = (principal_record, request_record) self.put(record_group) @@ -387,7 +446,9 @@ class WarcRecordsetQueue(Queue.Queue): def make_record(self, url, warc_date=None, recorder=None, data=None, - concurrent_to=None, warc_type=None, content_type=None, remote_ip=None): + concurrent_to=None, warc_type=None, content_type=None, remote_ip=None, + profile=None, refers_to=None, refers_to_target_uri=None, + refers_to_date=None): if warc_date is None: warc_date = warctools.warc.warc_datetime_str(datetime.now()) @@ -398,8 +459,16 @@ class WarcRecordsetQueue(Queue.Queue): if warc_type is not None: headers.append((warctools.WarcRecord.TYPE, warc_type)) headers.append((warctools.WarcRecord.ID, record_id)) + if profile is not None: + headers.append((warctools.WarcRecord.TYPE, profile)) headers.append((warctools.WarcRecord.DATE, warc_date)) headers.append((warctools.WarcRecord.URL, url)) + if refers_to is not None: + headers.append((warctools.WarcRecord.REFERS_TO, refers_to)) + if refers_to_target_uri is not None: + headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri)) + if refers_to_date is not None: + headers.append((warctools.WarcRecord.REFERS_TO_DATE, refers_to_date)) if remote_ip is not None: headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip)) if concurrent_to is not None: @@ -407,6 +476,7 @@ class WarcRecordsetQueue(Queue.Queue): if content_type is not None: headers.append((warctools.WarcRecord.CONTENT_TYPE, content_type)) + if recorder is not None: headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)))) headers.append((warctools.WarcRecord.BLOCK_DIGEST, @@ -457,6 +527,8 @@ class WarcWriterThread(threading.Thread): self.stop = threading.Event() + self.listeners = [] + def timestamp17(self): now = datetime.now() @@ -483,7 +555,7 @@ class WarcWriterThread(threading.Thread): headers.append((warctools.WarcRecord.DATE, warc_record_date)) warcinfo_fields = [] - warcinfo_fields.append('software: warcprox.py https://github.com/nlevitt/warcprox') + warcinfo_fields.append('software: warcprox.py https://github.com/internetarchive/warcprox') hostname = socket.gethostname() warcinfo_fields.append('hostname: {0}'.format(hostname)) warcinfo_fields.append('ip: {0}'.format(socket.gethostbyname(hostname))) @@ -519,6 +591,11 @@ class WarcWriterThread(threading.Thread): return self._f + def register_listener(self, listener): + """listener should be a function that takes 3 arguments (record, warcfile, offset)""" + self.listeners.append(listener) + + def run(self): logging.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( os.path.abspath(self.directory), self.gzip, self.rollover_size, @@ -543,12 +620,11 @@ class WarcWriterThread(threading.Thread): record.get_header(warctools.WarcRecord.URL), self._fpath, offset)) - if record.content_file: - # XXX now we know we're done with this... messy to - # handle this here, but where else can it happen? - record.content_file.close() + for listener in self.listeners: + listener(record, self._fpath, offset) self._f.flush() + except Queue.Empty: if (self._fpath is not None and self.rollover_idle_time is not None @@ -590,6 +666,8 @@ if __name__ == '__main__': help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)") arg_parser.add_argument('--base32', dest='base32', action='store_true', default=False, help='write SHA1 digests in Base32 instead of hex') + # arg_parser.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', + # default='./dedup.db', help='persistent deduplication database file') arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true') # [--ispartof=warcinfo ispartof] @@ -608,7 +686,9 @@ if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=loglevel, format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(funcName)s(%(filename)s:%(lineno)d) %(message)s') - recordset_q = WarcRecordsetQueue(base32=args.base32) + dedup_db = DedupDb() + + recordset_q = WarcRecordsetQueue(base32=args.base32, dedup_db=dedup_db) proxy = WarcProxy(server_address=(args.address, int(args.port)), ca_file=args.cacert, certs_dir=args.certs_dir, @@ -619,6 +699,14 @@ if __name__ == '__main__': port=int(args.port), rollover_size=int(args.size), rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None) + def close_content_file(record, warcfile, offset): + if record.content_file: + logging.info('closing record.content_file={}'.format(record.content_file)) + record.content_file.close() + + warc_writer.register_listener(close_content_file) + warc_writer.register_listener(dedup_db.warc_record_written) + proxy_thread = threading.Thread(target=proxy.serve_forever, name='ProxyThread') proxy_thread.start() warc_writer.start()