From e370ec6fe2896026bdb895bce28fa6277e37a1f6 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 30 Oct 2013 13:36:32 -0700 Subject: [PATCH] refactor so that warc records are constructed in the warc writer thread; this way the disk-based dedup lookup, to decide whether to write a revisit record, happens out-of-band; and maybe more importantly, now all dedup db reading and writing happens in a single thread, so we don't have to worry about dbm thread safety; also, dedup info is not saved or looked up for urls with empty payload --- warcprox.py | 196 +++++++++++++++++++++++++++------------------------- 1 file changed, 100 insertions(+), 96 deletions(-) diff --git a/warcprox.py b/warcprox.py index 5f7f71f..93ef4e1 100755 --- a/warcprox.py +++ b/warcprox.py @@ -183,6 +183,12 @@ class ProxyingRecorder: def __len__(self): return self.len + def payload_size(self): + if self.payload_offset is not None: + return self.len - self.payload_offset + else: + return 0 + class ProxyingRecordingHTTPResponse(httplib.HTTPResponse): @@ -325,7 +331,9 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): h.close() self._proxy_sock.close() - self.server.recordset_q.create_and_queue(self.url, req, h.recorder, remote_ip) + recorded_url = RecordedUrl(url=self.url, request_data=req, + response_recorder=h.recorder, remote_ip=remote_ip) + self.server.recorded_url_q.put(recorded_url) def __getattr__(self, item): @@ -341,14 +349,22 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): self.log_date_time_string(), fmt % args)) +class RecordedUrl: + def __init__(self, url, request_data, response_recorder, remote_ip): + self.url = url + self.request_data = request_data + self.response_recorder = response_recorder + self.remote_ip = remote_ip + + class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): def __init__(self, server_address, req_handler_class=WarcProxyHandler, bind_and_activate=True, ca_file='./warcprox-ca.pem', - certs_dir='./warcprox-ca', recordset_q=None): + certs_dir='./warcprox-ca', recorded_url_q=None): BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) self.ca = CertificateAuthority(ca_file, certs_dir) - self.recordset_q = recordset_q + self.recorded_url_q = recorded_url_q def server_activate(self): BaseHTTPServer.HTTPServer.server_activate(self) @@ -374,21 +390,16 @@ class DedupDb: self.db.close() - def warc_record_written(self, record, warcfile, offset): - warc_type = record.get_header(warctools.WarcRecord.TYPE) - if warc_type != warctools.WarcRecord.RESPONSE: - return + def save(self, key, response_record, offset): + record_id = response_record.get_header(warctools.WarcRecord.ID) + url = response_record.get_header(warctools.WarcRecord.URL) + date = response_record.get_header(warctools.WarcRecord.DATE) - key = record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST) - if key is None: - return + py_value = {'i':record_id, 'u':url, 'd':date} + json_value = json.dumps(py_value, separators=(',',':')) - record_id = record.get_header(warctools.WarcRecord.ID) - url = record.get_header(warctools.WarcRecord.URL) - date = record.get_header(warctools.WarcRecord.DATE) - - value = json.dumps({'i':record_id, 'u':url, 'd':date}) - self.db[key] = value + self.db[key] = json_value + logging.info('dedup db saved {}={}'.format(key, json_value)) def lookup(self, key): @@ -400,57 +411,85 @@ class DedupDb: return None -# Each item in the queue is a tuple of warc records, which should be written -# consecutively in the same warc. -class WarcRecordsetQueue(Queue.Queue): +class WarcWriterThread(threading.Thread): - def __init__(self, base32=False, dedup_db=None): - Queue.Queue.__init__(self) + # port is only used for warc filename + def __init__(self, recorded_url_q, directory, rollover_size=1000000000, + rollover_idle_time=None, gzip=False, prefix='WARCPROX', port=0, + base32=False, dedup_db=None): + threading.Thread.__init__(self, name='WarcWriterThread') + + self.recorded_url_q = recorded_url_q + + self.rollover_size = rollover_size + self.rollover_idle_time = rollover_idle_time + + self.gzip = gzip self.base32 = base32 + self.dedup_db = dedup_db + # warc path and filename stuff + self.directory = directory + self.prefix = prefix + self.port = port - def create_and_queue(self, url, request_data, response_recorder, remote_ip): + self._f = None + self._fpath = None + self._serial = 0 + + if not os.path.exists(directory): + logging.info("warc destination directory {} doesn't exist, creating it".format(directory)) + os.mkdir(directory) + + self.stop = threading.Event() + + self.listeners = [] + + # returns a tuple (principal_record, request_record) where principal_record is either a response or revisit record + def build_warc_records(self, recorded_url): warc_date = warctools.warc.warc_datetime_str(datetime.now()) dedup_info = None - if dedup_db is not None and response_recorder.payload_sha1 is not None: - key = 'sha1:{}'.format(self.digest_str(response_recorder.payload_sha1)) + if dedup_db is not None and recorded_url.response_recorder.payload_sha1 is not None: + key = 'sha1:{}'.format(self.digest_str(recorded_url.response_recorder.payload_sha1)) dedup_info = dedup_db.lookup(key) if dedup_info is not None: # revisit record - response_recorder.tempfile.seek(0) - if response_recorder.payload_offset is not None: - response_header_block = response_recorder.tempfile.read(response_recorder.payload_offset) + recorded_url.response_recorder.tempfile.seek(0) + if recorded_url.response_recorder.payload_offset is not None: + response_header_block = recorded_url.response_recorder.tempfile.read(recorded_url.response_recorder.payload_offset) else: - response_header_block = response_recorder.tempfile.read() + response_header_block = recorded_url.response_recorder.tempfile.read() - principal_record, principal_record_id = self.make_record(url=url, - warc_date=warc_date, data=response_header_block, + principal_record, principal_record_id = self.build_warc_record( + url=recorded_url.url, warc_date=warc_date, + data=response_header_block, warc_type=warctools.WarcRecord.REVISIT, refers_to=dedup_info['i'], refers_to_target_uri=dedup_info['u'], refers_to_date=dedup_info['d'], profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST, content_type=warctools.WarcRecord.HTTP_RESPONSE_MIMETYPE, - remote_ip=remote_ip) + remote_ip=recorded_url.remote_ip) else: # response record - principal_record, principal_record_id = self.make_record(url=url, - warc_date=warc_date, recorder=response_recorder, + principal_record, principal_record_id = self.build_warc_record( + url=recorded_url.url, warc_date=warc_date, + recorder=recorded_url.response_recorder, warc_type=warctools.WarcRecord.RESPONSE, content_type=warctools.WarcRecord.HTTP_RESPONSE_MIMETYPE, - remote_ip=remote_ip) + remote_ip=recorded_url.remote_ip) - request_record, request_record_id = self.make_record(url=url, - warc_date=warc_date, data=request_data, + request_record, request_record_id = self.build_warc_record( + url=recorded_url.url, warc_date=warc_date, + data=recorded_url.request_data, warc_type=warctools.WarcRecord.REQUEST, content_type=warctools.WarcRecord.HTTP_REQUEST_MIMETYPE, concurrent_to=principal_record_id) - record_group = (principal_record, request_record) - self.put(record_group) + return principal_record, request_record def digest_str(self, hash_obj): @@ -460,7 +499,7 @@ class WarcRecordsetQueue(Queue.Queue): return hash_obj.hexdigest() - def make_record(self, url, warc_date=None, recorder=None, data=None, + def build_warc_record(self, url, warc_date=None, recorder=None, data=None, concurrent_to=None, warc_type=None, content_type=None, remote_ip=None, profile=None, refers_to=None, refers_to_target_uri=None, refers_to_date=None): @@ -491,7 +530,6 @@ class WarcRecordsetQueue(Queue.Queue): if content_type is not None: headers.append((warctools.WarcRecord.CONTENT_TYPE, content_type)) - if recorder is not None: headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)))) headers.append((warctools.WarcRecord.BLOCK_DIGEST, @@ -513,38 +551,6 @@ class WarcRecordsetQueue(Queue.Queue): return record, record_id - -class WarcWriterThread(threading.Thread): - - # port is only used for warc filename - def __init__(self, recordset_q, directory, rollover_size=1000000000, rollover_idle_time=None, gzip=False, prefix='WARCPROX', port=0): - threading.Thread.__init__(self, name='WarcWriterThread') - - self.recordset_q = recordset_q - - self.rollover_size = rollover_size - self.rollover_idle_time = rollover_idle_time - - self.gzip = gzip - - # warc path and filename stuff - self.directory = directory - self.prefix = prefix - self.port = port - - self._f = None - self._fpath = None - self._serial = 0 - - if not os.path.exists(directory): - logging.info("warc destination directory {} doesn't exist, creating it".format(directory)) - os.mkdir(directory) - - self.stop = threading.Event() - - self.listeners = [] - - def timestamp17(self): now = datetime.now() return '{}{}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond//1000) @@ -559,7 +565,7 @@ class WarcWriterThread(threading.Thread): self._fpath = None self._f = None - def _make_warcinfo_record(self, filename): + def _build_warcinfo_record(self, filename): warc_record_date = warctools.warc.warc_datetime_str(datetime.now()) record_id = warctools.WarcRecord.random_warc_uuid() @@ -575,7 +581,7 @@ class WarcWriterThread(threading.Thread): warcinfo_fields.append('hostname: {0}'.format(hostname)) warcinfo_fields.append('ip: {0}'.format(socket.gethostbyname(hostname))) warcinfo_fields.append('format: WARC File Format 1.0') - warcinfo_fields.append('robots: ignore') # XXX implement robots support + # warcinfo_fields.append('robots: ignore') # warcinfo_fields.append('description: {0}'.format(self.description)) # warcinfo_fields.append('isPartOf: {0}'.format(self.is_part_of)) data = '\r\n'.join(warcinfo_fields) + '\r\n' @@ -598,7 +604,7 @@ class WarcWriterThread(threading.Thread): self._f = open(self._fpath, 'wb') - warcinfo_record = self._make_warcinfo_record(filename) + warcinfo_record = self._build_warcinfo_record(filename) warcinfo_record.write_to(self._f, gzip=self.gzip) self._serial += 1 @@ -606,10 +612,14 @@ class WarcWriterThread(threading.Thread): return self._f - def register_listener(self, listener): - """listener should be a function that takes 3 arguments (record, warcfile, offset)""" - self.listeners.append(listener) + def _final_tasks(self, recorded_url, recordset, recordset_offset): + if (self.dedup_db is not None + and recordset[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE + and recorded_url.response_recorder.payload_size() > 0): + key = 'sha1:{}'.format(self.digest_str(recorded_url.response_recorder.payload_sha1)) + self.dedup_db.save(key, recordset[0], recordset_offset) + recorded_url.response_recorder.tempfile.close() def run(self): logging.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( @@ -620,11 +630,14 @@ class WarcWriterThread(threading.Thread): while not self.stop.is_set(): try: - recordset = self.recordset_q.get(block=True, timeout=0.5) - + recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) + self._last_activity = time.time() + + recordset = self.build_warc_records(recorded_url) writer = self._writer() + recordset_offset = writer.tell() for record in recordset: offset = writer.tell() @@ -635,11 +648,10 @@ class WarcWriterThread(threading.Thread): record.get_header(warctools.WarcRecord.URL), self._fpath, offset)) - for listener in self.listeners: - listener(record, self._fpath, offset) - self._f.flush() + self._final_tasks(recorded_url, recordset, recordset_offset) + except Queue.Empty: if (self._fpath is not None and self.rollover_idle_time is not None @@ -707,25 +719,17 @@ if __name__ == '__main__': else: dedup_db = DedupDb(args.dedup_db_file) - recordset_q = WarcRecordsetQueue(base32=args.base32, dedup_db=dedup_db) + recorded_url_q = Queue.Queue() proxy = WarcProxy(server_address=(args.address, int(args.port)), ca_file=args.cacert, certs_dir=args.certs_dir, - recordset_q=recordset_q) + recorded_url_q=recorded_url_q) - warc_writer = WarcWriterThread(recordset_q=recordset_q, + warc_writer = WarcWriterThread(recorded_url_q=recorded_url_q, directory=args.directory, gzip=args.gzip, prefix=args.prefix, port=int(args.port), rollover_size=int(args.size), - rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None) - - def close_content_file(record, warcfile, offset): - if record.content_file: - logging.info('closing record.content_file={}'.format(record.content_file)) - record.content_file.close() - - warc_writer.register_listener(close_content_file) - if dedup_db is not None: - warc_writer.register_listener(dedup_db.warc_record_written) + rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None, + base32=args.base32, dedup_db=dedup_db) proxy_thread = threading.Thread(target=proxy.serve_forever, name='ProxyThread') proxy_thread.start()