diff --git a/warcprox.py b/warcprox.py index 9bb9cd1..d404888 100755 --- a/warcprox.py +++ b/warcprox.py @@ -30,6 +30,7 @@ import tempfile import base64 import anydbm import json +import contextlib class CertificateAuthority(object): @@ -218,13 +219,13 @@ class ProxyingRecordingHTTPResponse(httplib.HTTPResponse): self.fp = self.recorder -class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): +class MitmProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): def __init__(self, request, client_address, server): self.is_connect = False BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, request, client_address, server) - def _connect_to_host(self): + def _determine_host_port(self): # Get hostname and port to connect to if self.is_connect: self.hostname, self.port = self.path.split(':') @@ -246,6 +247,7 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): ) ) + def _connect_to_host(self): # Connect to destination self._proxy_sock = socket.socket() self._proxy_sock.settimeout(10) @@ -265,6 +267,7 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): self.is_connect = True try: # Connect to destination first + self._determine_host_port() self._connect_to_host() # If successful, let's do this! @@ -301,18 +304,42 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): def do_COMMAND(self): - if not self.is_connect: try: # Connect to destination + self._determine_host_port() self._connect_to_host() assert self.url except Exception as e: self.send_error(500, str(e)) return else: + # if self.is_connect we already connected in do_CONNECT self.url = self._construct_tunneled_url() + self._proxy_request() + + + def _proxy_request(self): + raise Exception('_proxy_request() not implemented in MitmProxyHandler, must be implemented in subclass!') + + + def __getattr__(self, item): + if item.startswith('do_'): + return self.do_COMMAND + + def log_error(self, fmt, *args): + logging.error("{0} - - [{1}] {2}".format(self.address_string(), + self.log_date_time_string(), fmt % args)) + + def log_message(self, fmt, *args): + logging.info("{0} - - [{1}] {2}".format(self.address_string(), + self.log_date_time_string(), fmt % args)) + + +class WarcProxyHandler(MitmProxyHandler): + + def _proxy_request(self): # Build request req = '%s %s %s\r\n' % (self.command, self.path, self.request_version) @@ -357,19 +384,6 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): self.server.recorded_url_q.put(recorded_url) - def __getattr__(self, item): - if item.startswith('do_'): - return self.do_COMMAND - - def log_error(self, fmt, *args): - logging.error("{0} - - [{1}] {2}".format(self.address_string(), - self.log_date_time_string(), fmt % args)) - - def log_message(self, fmt, *args): - logging.info("{0} - - [{1}] {2}".format(self.address_string(), - self.log_date_time_string(), fmt % args)) - - class RecordedUrl: def __init__(self, url, request_data, response_recorder, remote_ip): self.url = url @@ -381,20 +395,94 @@ class RecordedUrl: class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): def __init__(self, server_address, req_handler_class=WarcProxyHandler, - bind_and_activate=True, ca_file='./warcprox-ca.pem', - certs_dir='./warcprox-ca', recorded_url_q=None, + bind_and_activate=True, ca=None, recorded_url_q=None, digest_algorithm='sha1'): BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) - self.ca = CertificateAuthority(ca_file, certs_dir) + self.ca = ca self.recorded_url_q = recorded_url_q self.digest_algorithm = digest_algorithm def server_activate(self): BaseHTTPServer.HTTPServer.server_activate(self) - logging.info('listening on {0}:{1}'.format(self.server_address[0], self.server_address[1])) + logging.info('WarcProxy listening on {0}:{1}'.format(self.server_address[0], self.server_address[1])) def server_close(self): - logging.info('shutting down') + logging.info('WarcProxy shutting down') + BaseHTTPServer.HTTPServer.server_close(self) + + +class PlaybackProxyHandler(MitmProxyHandler): + + def _connect_to_host(self): + # don't connect to host! + pass + + def _proxy_request(self): + logging.info('PlaybackProxyHandler handling request for {}'.format(self.url)) + + date, location = self.server.playback_index_db.lookup_latest(self.url) + logging.info('lookup_latest returned {}:{}'.format(date, location)) + + response = None + if location is not None: + response = self.gather_response(location['f'], location['o']) + + if response is None: + response = ('HTTP/1.1 404 Not Found\r\n' + + 'Content-Type: text/plain\r\n' + + 'Content-Length: 15\r\n' + + '\r\n' + + 'not in archive\n') + + self.connection.sendall(response) + + def gather_response(self, warcfilename, offset): + warcpath = None + for p in (os.path.sep.join([self.server.warcs_dir, warcfilename]), + os.path.sep.join([self.server.warcs_dir, '{}.open'.format(warcfilename)])): + if os.path.exists(p): + warcpath = p + + if warcpath is None: + logging.error('{} not found'.format(warcfilename)) + return None + + fh = warctools.warc.WarcRecord.open_archive(filename=warcpath, mode='rb', offset=offset) + with contextlib.closing(fh): + for (offset, record, errors) in fh.read_records(limit=1, offsets=True): + pass + + logging.info('record_stream.read_records() returned {}'.format((offset,record,errors))) + + if record: + content_type, content = record.content + return content + # if record.type == WarcRecord.RESPONSE and content_type.startswith('application/http'): + # content = parse_http_response(record) + elif errors: + logging.error('warc errors at {}:{} -- {}'.format(warcpath, offset, errors)) + return None + + logging.error('warctools reader returned no warc record and no errors??') + return None + + +class PlaybackProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): + + def __init__(self, server_address, req_handler_class=PlaybackProxyHandler, + bind_and_activate=True, ca=None, playback_index_db=None, + warcs_dir=None): + BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) + self.ca = ca + self.playback_index_db = playback_index_db + self.warcs_dir = warcs_dir + + def server_activate(self): + BaseHTTPServer.HTTPServer.server_activate(self) + logging.info('PlaybackProxy listening on {0}:{1}'.format(self.server_address[0], self.server_address[1])) + + def server_close(self): + logging.info('PlaybackProxy shutting down') BaseHTTPServer.HTTPServer.server_close(self) @@ -412,6 +500,10 @@ class DedupDb: def close(self): self.db.close() + def sync(self): + # XXX depends on db impl? + self.db.sync() + def save(self, key, response_record, offset): record_id = response_record.get_header(warctools.WarcRecord.ID) @@ -439,7 +531,8 @@ class WarcWriterThread(threading.Thread): # port is only used for warc filename def __init__(self, recorded_url_q, directory, rollover_size=1000000000, rollover_idle_time=None, gzip=False, prefix='WARCPROX', port=0, - digest_algorithm='sha1', base32=False, dedup_db=None): + digest_algorithm='sha1', base32=False, dedup_db=None, + playback_index_db=None): threading.Thread.__init__(self, name='WarcWriterThread') self.recorded_url_q = recorded_url_q @@ -452,6 +545,8 @@ class WarcWriterThread(threading.Thread): self.base32 = base32 self.dedup_db = dedup_db + self.playback_index_db = playback_index_db + # warc path and filename stuff self.directory = directory self.prefix = prefix @@ -581,10 +676,10 @@ class WarcWriterThread(threading.Thread): def _close_writer(self): if self._fpath: - final_name = self._fpath[:-5] - logging.info('closing {0}'.format(final_name)) + logging.info('closing {0}'.format(self._f_finalname)) self._f.close() - os.rename(self._fpath, final_name) + finalpath = os.path.sep.join([self.directory, self._f_finalname]) + os.rename(self._fpath, finalpath) self._fpath = None self._f = None @@ -621,14 +716,14 @@ class WarcWriterThread(threading.Thread): self._close_writer() if self._f == None: - filename = '{}-{}-{:05d}-{}-{}-{}.warc{}'.format( + self._f_finalname = '{}-{}-{:05d}-{}-{}-{}.warc{}'.format( self.prefix, self.timestamp17(), self._serial, os.getpid(), socket.gethostname(), self.port, '.gz' if self.gzip else '') - self._fpath = os.path.sep.join([self.directory, filename + '.open']) + self._fpath = os.path.sep.join([self.directory, self._f_finalname + '.open']) self._f = open(self._fpath, 'wb') - warcinfo_record = self._build_warcinfo_record(filename) + warcinfo_record = self._build_warcinfo_record(self._f_finalname) warcinfo_record.write_to(self._f, gzip=self.gzip) self._serial += 1 @@ -643,6 +738,9 @@ class WarcWriterThread(threading.Thread): key = self.digest_str(recorded_url.response_recorder.payload_digest) self.dedup_db.save(key, recordset[0], recordset_offset) + if self.playback_index_db is not None: + playback_index_db.save(self._f_finalname, recordset, recordset_offset) + recorded_url.response_recorder.tempfile.close() def run(self): @@ -650,7 +748,7 @@ class WarcWriterThread(threading.Thread): os.path.abspath(self.directory), self.gzip, self.rollover_size, self.rollover_idle_time, self.prefix, self.port)) - self._last_activity = time.time() + self._last_sync = self._last_activity = time.time() while not self.stop.is_set(): try: @@ -684,10 +782,68 @@ class WarcWriterThread(threading.Thread): logging.info('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity)) self._close_writer() + if time.time() - self._last_sync > 60: + if self.dedup_db: + self.dedup_db.sync() + if self.playback_index_db: + self.playback_index_db.sync() + self._last_sync = time.time() + logging.info('WarcWriterThread shutting down') self._close_writer(); +class PlaybackIndexDb: + + def __init__(self, dbm_file='./warcprox-playback-index.db'): + if os.path.exists(dbm_file): + logging.info('opening existing playback index database {}'.format(dbm_file)) + else: + logging.info('creating new playback index database {}'.format(dbm_file)) + + self.db = anydbm.open(dbm_file, 'c') + + + def close(self): + self.db.close() + + def sync(self): + # XXX depends on db impl? + self.db.sync() + + + def save(self, warcfile, recordset, offset): + response_record = recordset[0] + # XXX canonicalize url + url = response_record.get_header(warctools.WarcRecord.URL) + date = response_record.get_header(warctools.WarcRecord.DATE) + + # url:{date1:{'f':warcfile,'o':response_offset,'q':request_offset,'t':response/revisit,'u':revisit_target_url,'d':revisit_target_date},date2:{...},...} + if url in self.db: + existing_json_value = self.db[url] + py_value = json.loads(existing_json_value) + else: + py_value = {} + + py_value[date] = {'f':warcfile, 'o':offset} + + json_value = json.dumps(py_value, separators=(',',':')) + + self.db[url] = json_value + + logging.info('playback index saved: {}:{}'.format(url, json_value)) + + + def lookup_latest(self, url): + if url not in self.db: + return None, None + + json_value = self.db[url] + py_value = json.loads(json_value) + + latest_date = max(py_value) + return latest_date, py_value[latest_date] + if __name__ == '__main__': arg_parser = argparse.ArgumentParser( @@ -718,9 +874,14 @@ if __name__ == '__main__': arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm', default='sha1', help='digest algorithm, one of {}'.format(', '.join(hashlib.algorithms))) arg_parser.add_argument('--base32', dest='base32', action='store_true', - default=False, help='write SHA1 digests in Base32 instead of hex') + default=False, help='write digests in Base32 instead of hex') arg_parser.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication') + arg_parser.add_argument('-P', '--playback-port', dest='playback_port', + default=None, help='port to listen on for instant playback') + arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', + default='./warcprox-playback-index.db', + help='playback index database file (only used if --playback-port is specified)') arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true') # [--ispartof=warcinfo ispartof] @@ -739,31 +900,45 @@ if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=loglevel, format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(funcName)s(%(filename)s:%(lineno)d) %(message)s') - if args.dedup_db_file in (None, '', '/dev/null'): - logging.info('deduplication disabled') - dedup_db = None - else: - dedup_db = DedupDb(args.dedup_db_file) - try: hashlib.new(args.digest_algorithm) except Exception as e: logging.fatal(e) exit(1) + if args.dedup_db_file in (None, '', '/dev/null'): + logging.info('deduplication disabled') + dedup_db = None + else: + dedup_db = DedupDb(args.dedup_db_file) + recorded_url_q = Queue.Queue() + ca = CertificateAuthority(args.cacert, args.certs_dir) + proxy = WarcProxy(server_address=(args.address, int(args.port)), - ca_file=args.cacert, certs_dir=args.certs_dir, - recorded_url_q=recorded_url_q, + ca=ca, recorded_url_q=recorded_url_q, digest_algorithm=args.digest_algorithm) + if args.playback_port is not None: + playback_index_db = PlaybackIndexDb(args.playback_index_db_file) + playback_server_address=(args.address, int(args.playback_port)) + playback_proxy = PlaybackProxy(server_address=playback_server_address, + ca=ca, playback_index_db=playback_index_db, + warcs_dir=args.directory) + playback_proxy_thread = threading.Thread(target=playback_proxy.serve_forever, name='PlaybackProxyThread') + playback_proxy_thread.start() + else: + playback_index_db = None + playback_proxy = None + warc_writer = WarcWriterThread(recorded_url_q=recorded_url_q, directory=args.directory, gzip=args.gzip, prefix=args.prefix, port=int(args.port), rollover_size=int(args.size), rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None, base32=args.base32, dedup_db=dedup_db, - digest_algorithm=args.digest_algorithm) + digest_algorithm=args.digest_algorithm, + playback_index_db=playback_index_db) proxy_thread = threading.Thread(target=proxy.serve_forever, name='ProxyThread') proxy_thread.start() @@ -781,6 +956,14 @@ if __name__ == '__main__': warc_writer.stop.set() proxy.shutdown() proxy.server_close() + + if playback_proxy is not None: + playback_proxy.shutdown() + playback_proxy.server_close() + if dedup_db is not None: dedup_db.close() + if playback_index_db is not None: + playback_index_db.close() +