diff --git a/warcprox/warcprox.py b/warcprox/warcprox.py index 85391c7..4adeba9 100644 --- a/warcprox/warcprox.py +++ b/warcprox/warcprox.py @@ -261,7 +261,7 @@ class MitmProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): def _transition_to_ssl(self): - self.request = self.connection = ssl.wrap_socket(self.connection, + self.request = self.connection = ssl.wrap_socket(self.connection, server_side=True, certfile=self.server.ca[self.hostname]) @@ -330,7 +330,7 @@ class MitmProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): return self.do_COMMAND def log_error(self, fmt, *args): - self.logger.error("{0} - - [{1}] {2}".format(self.address_string(), + self.logger.error("{0} - - [{1}] {2}".format(self.address_string(), self.log_date_time_string(), fmt % args)) def log_message(self, fmt, *args): @@ -345,14 +345,14 @@ class WarcProxyHandler(MitmProxyHandler): def _proxy_request(self): # Build request req = '%s %s %s\r\n' % (self.command, self.path, self.request_version) - + # Add headers to the request req += '%s\r\n' % self.headers # Append message body if present to the request if 'Content-Length' in self.headers: req += self.rfile.read(int(self.headers['Content-Length'])) - + # Send it down the pipe! self._proxy_sock.sendall(req) @@ -363,14 +363,14 @@ class WarcProxyHandler(MitmProxyHandler): # client. So we ignore the values returned by h.read() below. Instead # the ProxyingRecordingHTTPResponse takes care of sending the raw bytes # to the proxy client. - + # Proxy and record the response - h = ProxyingRecordingHTTPResponse(self._proxy_sock, - proxy_dest=self.connection, + h = ProxyingRecordingHTTPResponse(self._proxy_sock, + proxy_dest=self.connection, digest_algorithm=self.server.digest_algorithm) h.begin() - - buf = h.read(8192) + + buf = h.read(8192) while buf != '': buf = h.read(8192) @@ -399,7 +399,7 @@ class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): logger = logging.getLogger('warcprox.WarcProxy') def __init__(self, server_address=('localhost', 8000), - req_handler_class=WarcProxyHandler, bind_and_activate=True, + req_handler_class=WarcProxyHandler, bind_and_activate=True, ca=None, recorded_url_q=None, digest_algorithm='sha1'): BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) @@ -493,7 +493,7 @@ class PlaybackProxyHandler(MitmProxyHandler): sz = len(headers) while True: - buf = payload_fh.read(8192) + buf = payload_fh.read(8192) if buf == '': break self.connection.sendall(buf) sz += len(buf) @@ -501,8 +501,8 @@ class PlaybackProxyHandler(MitmProxyHandler): return status, sz - def _send_headers_and_refd_payload(self, headers, refers_to_target_uri, refers_to_date): - location = self.server.playback_index_db.lookup_exact(refers_to_target_uri, refers_to_date) + def _send_headers_and_refd_payload(self, headers, refers_to, refers_to_target_uri, refers_to_date): + location = self.server.playback_index_db.lookup_exact(refers_to_target_uri, refers_to_date, record_id=refers_to) self.logger.debug('loading http payload from {}'.format(location)) fh = self._open_warc_at_offset(location['f'], location['o']) @@ -537,7 +537,7 @@ class PlaybackProxyHandler(MitmProxyHandler): if errors: raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors)) - + warc_type = record.get_header(warctools.WarcRecord.TYPE) if warc_type == warctools.WarcRecord.RESPONSE: @@ -557,11 +557,12 @@ class PlaybackProxyHandler(MitmProxyHandler): if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST: raise Exception('unknown revisit record profile {}'.format(warc_profile)) + refers_to = record.get_header(warctools.WarcRecord.REFERS_TO) refers_to_target_uri = record.get_header(warctools.WarcRecord.REFERS_TO_TARGET_URI) refers_to_date = record.get_header(warctools.WarcRecord.REFERS_TO_DATE) - self.logger.debug('revisit record references {} capture of {}'.format(refers_to_date, refers_to_target_uri)) - return self._send_headers_and_refd_payload(record.content[1], refers_to_target_uri, refers_to_date) + self.logger.debug('revisit record references {}:{} capture of {}'.format(refers_to_date, refers_to, refers_to_target_uri)) + return self._send_headers_and_refd_payload(record.content[1], refers_to, refers_to_target_uri, refers_to_date) else: raise Exception('unknown warc record type {}'.format(warc_type)) @@ -575,8 +576,8 @@ class PlaybackProxyHandler(MitmProxyHandler): class PlaybackProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): logger = logging.getLogger('warcprox.PlaybackProxy') - def __init__(self, server_address, req_handler_class=PlaybackProxyHandler, - bind_and_activate=True, ca=None, playback_index_db=None, + def __init__(self, server_address, req_handler_class=PlaybackProxyHandler, + bind_and_activate=True, ca=None, playback_index_db=None, warcs_dir=None): BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) self.ca = ca @@ -633,8 +634,8 @@ class DedupDb(object): class WarcWriterThread(threading.Thread): logger = logging.getLogger('warcprox.WarcWriterThread') - # port is only used for warc filename - def __init__(self, recorded_url_q=None, directory='./warcs', + # port is only used for warc filename + def __init__(self, recorded_url_q=None, directory='./warcs', rollover_size=1000000000, rollover_idle_time=None, gzip=False, prefix='WARCPROX', port=0, digest_algorithm='sha1', base32=False, dedup_db=None, playback_index_db=None): @@ -661,7 +662,7 @@ class WarcWriterThread(threading.Thread): self._f = None self._fpath = None self._serial = 0 - + if not os.path.exists(directory): self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory)) os.mkdir(directory) @@ -699,16 +700,16 @@ class WarcWriterThread(threading.Thread): else: # response record principal_record = self.build_warc_record( - url=recorded_url.url, warc_date=warc_date, - recorder=recorded_url.response_recorder, + url=recorded_url.url, warc_date=warc_date, + recorder=recorded_url.response_recorder, warc_type=warctools.WarcRecord.RESPONSE, content_type=httptools.ResponseMessage.CONTENT_TYPE, remote_ip=recorded_url.remote_ip) request_record = self.build_warc_record( - url=recorded_url.url, warc_date=warc_date, - data=recorded_url.request_data, - warc_type=warctools.WarcRecord.REQUEST, + url=recorded_url.url, warc_date=warc_date, + data=recorded_url.request_data, + warc_type=warctools.WarcRecord.REQUEST, content_type=httptools.RequestMessage.CONTENT_TYPE, concurrent_to=principal_record.id) @@ -753,10 +754,10 @@ class WarcWriterThread(threading.Thread): if recorder is not None: headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)))) - headers.append((warctools.WarcRecord.BLOCK_DIGEST, + headers.append((warctools.WarcRecord.BLOCK_DIGEST, self.digest_str(recorder.block_digest))) if recorder.payload_digest is not None: - headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, + headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, self.digest_str(recorder.payload_digest))) recorder.tempfile.seek(0) @@ -765,7 +766,7 @@ class WarcWriterThread(threading.Thread): else: headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data)))) block_digest = hashlib.new(self.digest_algorithm, data) - headers.append((warctools.WarcRecord.BLOCK_DIGEST, + headers.append((warctools.WarcRecord.BLOCK_DIGEST, self.digest_str(block_digest))) content_tuple = content_type, data @@ -805,8 +806,8 @@ class WarcWriterThread(threading.Thread): warcinfo_fields.append('ip: {0}'.format(socket.gethostbyname(hostname))) warcinfo_fields.append('format: WARC File Format 1.0') # warcinfo_fields.append('robots: ignore') - # warcinfo_fields.append('description: {0}'.format(self.description)) - # warcinfo_fields.append('isPartOf: {0}'.format(self.is_part_of)) + # warcinfo_fields.append('description: {0}'.format(self.description)) + # warcinfo_fields.append('isPartOf: {0}'.format(self.is_part_of)) data = '\r\n'.join(warcinfo_fields) + '\r\n' record = warctools.WarcRecord(headers=headers, content=('application/warc-fields', data)) @@ -849,7 +850,7 @@ class WarcWriterThread(threading.Thread): def run(self): self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( - os.path.abspath(self.directory), self.gzip, self.rollover_size, + os.path.abspath(self.directory), self.gzip, self.rollover_size, self.rollover_idle_time, self.prefix, self.port)) self._last_sync = self._last_activity = time.time() @@ -859,9 +860,9 @@ class WarcWriterThread(threading.Thread): recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) self._last_activity = time.time() - + recordset = self.build_warc_records(recorded_url) - + writer = self._writer() recordset_offset = writer.tell() @@ -879,17 +880,17 @@ class WarcWriterThread(threading.Thread): self._final_tasks(recorded_url, recordset, recordset_offset) except Queue.Empty: - if (self._fpath is not None - and self.rollover_idle_time is not None + if (self._fpath is not None + and self.rollover_idle_time is not None and self.rollover_idle_time > 0 and time.time() - self._last_activity > self.rollover_idle_time): self.logger.debug('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity)) self._close_writer() if time.time() - self._last_sync > 60: - if self.dedup_db: + if self.dedup_db: self.dedup_db.sync() - if self.playback_index_db: + if self.playback_index_db: self.playback_index_db.sync() self._last_sync = time.time() @@ -912,23 +913,32 @@ class PlaybackIndexDb(object): def close(self): self.db.close() + def sync(self): self.db.sync() + def save(self, warcfile, recordset, offset): response_record = recordset[0] # XXX canonicalize url? url = response_record.get_header(warctools.WarcRecord.URL) date = response_record.get_header(warctools.WarcRecord.DATE) + record_id = response_record.get_header(warctools.WarcRecord.ID) - # url:{date1:{'f':warcfile,'o':response_offset,'q':request_offset,'t':response/revisit,'u':revisit_target_url,'d':revisit_target_date},date2:{...},...} + # there could be two visits of same url in the same second, and WARC-Date is + # prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\ + + # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...} if url in self.db: existing_json_value = self.db[url] py_value = json.loads(existing_json_value) else: py_value = {} - py_value[date] = {'f':warcfile, 'o':offset} + if date in py_value: + py_value[date].append({'f':warcfile, 'o':offset, 'i':record_id}) + else: + py_value[date] = [{'f':warcfile, 'o':offset, 'i':record_id}] json_value = json.dumps(py_value, separators=(',',':')) @@ -942,36 +952,43 @@ class PlaybackIndexDb(object): return None, None json_value = self.db[url] + self.logger.debug("'{}':{}".format(url, json_value)) py_value = json.loads(json_value) latest_date = max(py_value) - return latest_date, py_value[latest_date] + return latest_date, py_value[latest_date][0] - def lookup_exact(self, url, warc_date): + def lookup_exact(self, url, warc_date, record_id): if url not in self.db: return None json_value = self.db[url] + self.logger.debug("'{}':{}".format(url, json_value)) py_value = json.loads(json_value) if warc_date in py_value: - return py_value[warc_date] + for record in py_value[warc_date]: + if record['i'] == record_id: + self.logger.debug("found exact match for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url))) + return record else: + self.logger.info("match not found for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url))) return None + class WarcproxController(object): logger = logging.getLogger('warcprox.WarcproxController') def __init__(self, proxy=None, warc_writer=None, playback_proxy=None): """ Create warcprox controller. - + If supplied, proxy should be an instance of WarcProxy, and warc_writer should be an instance of WarcWriterThread. If not supplied, they are created with default values. - + If supplied, playback_proxy should be an instance of PlaybackProxy. If not supplied, no playback proxy will run. """ @@ -987,7 +1004,7 @@ class WarcproxController(object): self.playback_proxy = playback_proxy - + def run_until_shutdown(self): """Start warcprox and run until shut down. @@ -997,11 +1014,11 @@ class WarcproxController(object): proxy_thread = threading.Thread(target=self.proxy.serve_forever, name='ProxyThread') proxy_thread.start() self.warc_writer.start() - + if self.playback_proxy is not None: playback_proxy_thread = threading.Thread(target=self.playback_proxy.serve_forever, name='PlaybackProxyThread') playback_proxy_thread.start() - + self.stop = threading.Event() try: @@ -1009,7 +1026,7 @@ class WarcproxController(object): self.logger.info('SIGTERM will initiate graceful shutdown') except ValueError: pass - + try: while not self.stop.is_set(): time.sleep(0.5) @@ -1019,10 +1036,10 @@ class WarcproxController(object): self.warc_writer.stop.set() self.proxy.shutdown() self.proxy.server_close() - + if self.warc_writer.dedup_db is not None: self.warc_writer.dedup_db.close() - + if self.playback_proxy is not None: self.playback_proxy.shutdown() self.playback_proxy.server_close() @@ -1040,29 +1057,29 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): arg_parser = argparse.ArgumentParser(prog=prog, description='warcprox - WARC writing MITM HTTP/S proxy', formatter_class=argparse.ArgumentDefaultsHelpFormatter) - arg_parser.add_argument('-p', '--port', dest='port', default='8000', + arg_parser.add_argument('-p', '--port', dest='port', default='8000', help='port to listen on') - arg_parser.add_argument('-b', '--address', dest='address', + arg_parser.add_argument('-b', '--address', dest='address', default='localhost', help='address to listen on') - arg_parser.add_argument('-c', '--cacert', dest='cacert', - default='./{0}-warcprox-ca.pem'.format(socket.gethostname()), + arg_parser.add_argument('-c', '--cacert', dest='cacert', + default='./{0}-warcprox-ca.pem'.format(socket.gethostname()), help='CA certificate file; if file does not exist, it will be created') - arg_parser.add_argument('--certs-dir', dest='certs_dir', - default='./{0}-warcprox-ca'.format(socket.gethostname()), + arg_parser.add_argument('--certs-dir', dest='certs_dir', + default='./{0}-warcprox-ca'.format(socket.gethostname()), help='where to store and load generated certificates') - arg_parser.add_argument('-d', '--dir', dest='directory', + arg_parser.add_argument('-d', '--dir', dest='directory', default='./warcs', help='where to write warcs') - arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', + arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records') - arg_parser.add_argument('-n', '--prefix', dest='prefix', + arg_parser.add_argument('-n', '--prefix', dest='prefix', default='WARCPROX', help='WARC filename prefix') - arg_parser.add_argument('-s', '--size', dest='size', - default=1000*1000*1000, + arg_parser.add_argument('-s', '--size', dest='size', + default=1000*1000*1000, help='WARC file rollover size threshold in bytes') - arg_parser.add_argument('--rollover-idle-time', - dest='rollover_idle_time', default=None, + arg_parser.add_argument('--rollover-idle-time', + dest='rollover_idle_time', default=None, help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)") - arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm', + arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm', default='sha1', help='digest algorithm, one of {}'.format(', '.join(hashlib.algorithms))) arg_parser.add_argument('--base32', dest='base32', action='store_true', default=False, help='write digests in Base32 instead of hex') @@ -1071,7 +1088,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): arg_parser.add_argument('-P', '--playback-port', dest='playback_port', default=None, help='port to listen on for instant playback') arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', - default='./warcprox-playback-index.db', + default='./warcprox-playback-index.db', help='playback index database file (only used if --playback-port is specified)') arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true') @@ -1094,8 +1111,8 @@ def main(argv=sys.argv): else: loglevel = logging.INFO - logging.basicConfig(stream=sys.stdout, level=loglevel, - format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + logging.basicConfig(stream=sys.stdout, level=loglevel, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') try: hashlib.new(args.digest_algorithm) @@ -1121,7 +1138,7 @@ def main(argv=sys.argv): playback_index_db = PlaybackIndexDb(args.playback_index_db_file) playback_server_address=(args.address, int(args.playback_port)) playback_proxy = PlaybackProxy(server_address=playback_server_address, - ca=ca, playback_index_db=playback_index_db, + ca=ca, playback_index_db=playback_index_db, warcs_dir=args.directory) else: playback_index_db = None @@ -1129,7 +1146,7 @@ def main(argv=sys.argv): warc_writer = WarcWriterThread(recorded_url_q=recorded_url_q, directory=args.directory, gzip=args.gzip, prefix=args.prefix, - port=int(args.port), rollover_size=int(args.size), + port=int(args.port), rollover_size=int(args.size), rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None, base32=args.base32, dedup_db=dedup_db, digest_algorithm=args.digest_algorithm,