instant playback partially working

This commit is contained in:
Noah Levitt 2013-11-01 12:42:40 -07:00
parent dab8a956c2
commit 77d33f21a8

View File

@ -30,6 +30,7 @@ import tempfile
import base64
import anydbm
import json
import contextlib
class CertificateAuthority(object):
@ -218,13 +219,13 @@ class ProxyingRecordingHTTPResponse(httplib.HTTPResponse):
self.fp = self.recorder
class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
class MitmProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def __init__(self, request, client_address, server):
self.is_connect = False
BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, request, client_address, server)
def _connect_to_host(self):
def _determine_host_port(self):
# Get hostname and port to connect to
if self.is_connect:
self.hostname, self.port = self.path.split(':')
@ -246,6 +247,7 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
)
)
def _connect_to_host(self):
# Connect to destination
self._proxy_sock = socket.socket()
self._proxy_sock.settimeout(10)
@ -265,6 +267,7 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
self.is_connect = True
try:
# Connect to destination first
self._determine_host_port()
self._connect_to_host()
# If successful, let's do this!
@ -301,18 +304,42 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_COMMAND(self):
if not self.is_connect:
try:
# Connect to destination
self._determine_host_port()
self._connect_to_host()
assert self.url
except Exception as e:
self.send_error(500, str(e))
return
else:
# if self.is_connect we already connected in do_CONNECT
self.url = self._construct_tunneled_url()
self._proxy_request()
def _proxy_request(self):
raise Exception('_proxy_request() not implemented in MitmProxyHandler, must be implemented in subclass!')
def __getattr__(self, item):
if item.startswith('do_'):
return self.do_COMMAND
def log_error(self, fmt, *args):
logging.error("{0} - - [{1}] {2}".format(self.address_string(),
self.log_date_time_string(), fmt % args))
def log_message(self, fmt, *args):
logging.info("{0} - - [{1}] {2}".format(self.address_string(),
self.log_date_time_string(), fmt % args))
class WarcProxyHandler(MitmProxyHandler):
def _proxy_request(self):
# Build request
req = '%s %s %s\r\n' % (self.command, self.path, self.request_version)
@ -357,19 +384,6 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
self.server.recorded_url_q.put(recorded_url)
def __getattr__(self, item):
if item.startswith('do_'):
return self.do_COMMAND
def log_error(self, fmt, *args):
logging.error("{0} - - [{1}] {2}".format(self.address_string(),
self.log_date_time_string(), fmt % args))
def log_message(self, fmt, *args):
logging.info("{0} - - [{1}] {2}".format(self.address_string(),
self.log_date_time_string(), fmt % args))
class RecordedUrl:
def __init__(self, url, request_data, response_recorder, remote_ip):
self.url = url
@ -381,20 +395,94 @@ class RecordedUrl:
class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
def __init__(self, server_address, req_handler_class=WarcProxyHandler,
bind_and_activate=True, ca_file='./warcprox-ca.pem',
certs_dir='./warcprox-ca', recorded_url_q=None,
bind_and_activate=True, ca=None, recorded_url_q=None,
digest_algorithm='sha1'):
BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate)
self.ca = CertificateAuthority(ca_file, certs_dir)
self.ca = ca
self.recorded_url_q = recorded_url_q
self.digest_algorithm = digest_algorithm
def server_activate(self):
BaseHTTPServer.HTTPServer.server_activate(self)
logging.info('listening on {0}:{1}'.format(self.server_address[0], self.server_address[1]))
logging.info('WarcProxy listening on {0}:{1}'.format(self.server_address[0], self.server_address[1]))
def server_close(self):
logging.info('shutting down')
logging.info('WarcProxy shutting down')
BaseHTTPServer.HTTPServer.server_close(self)
class PlaybackProxyHandler(MitmProxyHandler):
def _connect_to_host(self):
# don't connect to host!
pass
def _proxy_request(self):
logging.info('PlaybackProxyHandler handling request for {}'.format(self.url))
date, location = self.server.playback_index_db.lookup_latest(self.url)
logging.info('lookup_latest returned {}:{}'.format(date, location))
response = None
if location is not None:
response = self.gather_response(location['f'], location['o'])
if response is None:
response = ('HTTP/1.1 404 Not Found\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: 15\r\n'
+ '\r\n'
+ 'not in archive\n')
self.connection.sendall(response)
def gather_response(self, warcfilename, offset):
warcpath = None
for p in (os.path.sep.join([self.server.warcs_dir, warcfilename]),
os.path.sep.join([self.server.warcs_dir, '{}.open'.format(warcfilename)])):
if os.path.exists(p):
warcpath = p
if warcpath is None:
logging.error('{} not found'.format(warcfilename))
return None
fh = warctools.warc.WarcRecord.open_archive(filename=warcpath, mode='rb', offset=offset)
with contextlib.closing(fh):
for (offset, record, errors) in fh.read_records(limit=1, offsets=True):
pass
logging.info('record_stream.read_records() returned {}'.format((offset,record,errors)))
if record:
content_type, content = record.content
return content
# if record.type == WarcRecord.RESPONSE and content_type.startswith('application/http'):
# content = parse_http_response(record)
elif errors:
logging.error('warc errors at {}:{} -- {}'.format(warcpath, offset, errors))
return None
logging.error('warctools reader returned no warc record and no errors??')
return None
class PlaybackProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
def __init__(self, server_address, req_handler_class=PlaybackProxyHandler,
bind_and_activate=True, ca=None, playback_index_db=None,
warcs_dir=None):
BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate)
self.ca = ca
self.playback_index_db = playback_index_db
self.warcs_dir = warcs_dir
def server_activate(self):
BaseHTTPServer.HTTPServer.server_activate(self)
logging.info('PlaybackProxy listening on {0}:{1}'.format(self.server_address[0], self.server_address[1]))
def server_close(self):
logging.info('PlaybackProxy shutting down')
BaseHTTPServer.HTTPServer.server_close(self)
@ -412,6 +500,10 @@ class DedupDb:
def close(self):
self.db.close()
def sync(self):
# XXX depends on db impl?
self.db.sync()
def save(self, key, response_record, offset):
record_id = response_record.get_header(warctools.WarcRecord.ID)
@ -439,7 +531,8 @@ class WarcWriterThread(threading.Thread):
# port is only used for warc filename
def __init__(self, recorded_url_q, directory, rollover_size=1000000000,
rollover_idle_time=None, gzip=False, prefix='WARCPROX', port=0,
digest_algorithm='sha1', base32=False, dedup_db=None):
digest_algorithm='sha1', base32=False, dedup_db=None,
playback_index_db=None):
threading.Thread.__init__(self, name='WarcWriterThread')
self.recorded_url_q = recorded_url_q
@ -452,6 +545,8 @@ class WarcWriterThread(threading.Thread):
self.base32 = base32
self.dedup_db = dedup_db
self.playback_index_db = playback_index_db
# warc path and filename stuff
self.directory = directory
self.prefix = prefix
@ -581,10 +676,10 @@ class WarcWriterThread(threading.Thread):
def _close_writer(self):
if self._fpath:
final_name = self._fpath[:-5]
logging.info('closing {0}'.format(final_name))
logging.info('closing {0}'.format(self._f_finalname))
self._f.close()
os.rename(self._fpath, final_name)
finalpath = os.path.sep.join([self.directory, self._f_finalname])
os.rename(self._fpath, finalpath)
self._fpath = None
self._f = None
@ -621,14 +716,14 @@ class WarcWriterThread(threading.Thread):
self._close_writer()
if self._f == None:
filename = '{}-{}-{:05d}-{}-{}-{}.warc{}'.format(
self._f_finalname = '{}-{}-{:05d}-{}-{}-{}.warc{}'.format(
self.prefix, self.timestamp17(), self._serial, os.getpid(),
socket.gethostname(), self.port, '.gz' if self.gzip else '')
self._fpath = os.path.sep.join([self.directory, filename + '.open'])
self._fpath = os.path.sep.join([self.directory, self._f_finalname + '.open'])
self._f = open(self._fpath, 'wb')
warcinfo_record = self._build_warcinfo_record(filename)
warcinfo_record = self._build_warcinfo_record(self._f_finalname)
warcinfo_record.write_to(self._f, gzip=self.gzip)
self._serial += 1
@ -643,6 +738,9 @@ class WarcWriterThread(threading.Thread):
key = self.digest_str(recorded_url.response_recorder.payload_digest)
self.dedup_db.save(key, recordset[0], recordset_offset)
if self.playback_index_db is not None:
playback_index_db.save(self._f_finalname, recordset, recordset_offset)
recorded_url.response_recorder.tempfile.close()
def run(self):
@ -650,7 +748,7 @@ class WarcWriterThread(threading.Thread):
os.path.abspath(self.directory), self.gzip, self.rollover_size,
self.rollover_idle_time, self.prefix, self.port))
self._last_activity = time.time()
self._last_sync = self._last_activity = time.time()
while not self.stop.is_set():
try:
@ -684,10 +782,68 @@ class WarcWriterThread(threading.Thread):
logging.info('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity))
self._close_writer()
if time.time() - self._last_sync > 60:
if self.dedup_db:
self.dedup_db.sync()
if self.playback_index_db:
self.playback_index_db.sync()
self._last_sync = time.time()
logging.info('WarcWriterThread shutting down')
self._close_writer();
class PlaybackIndexDb:
def __init__(self, dbm_file='./warcprox-playback-index.db'):
if os.path.exists(dbm_file):
logging.info('opening existing playback index database {}'.format(dbm_file))
else:
logging.info('creating new playback index database {}'.format(dbm_file))
self.db = anydbm.open(dbm_file, 'c')
def close(self):
self.db.close()
def sync(self):
# XXX depends on db impl?
self.db.sync()
def save(self, warcfile, recordset, offset):
response_record = recordset[0]
# XXX canonicalize url
url = response_record.get_header(warctools.WarcRecord.URL)
date = response_record.get_header(warctools.WarcRecord.DATE)
# url:{date1:{'f':warcfile,'o':response_offset,'q':request_offset,'t':response/revisit,'u':revisit_target_url,'d':revisit_target_date},date2:{...},...}
if url in self.db:
existing_json_value = self.db[url]
py_value = json.loads(existing_json_value)
else:
py_value = {}
py_value[date] = {'f':warcfile, 'o':offset}
json_value = json.dumps(py_value, separators=(',',':'))
self.db[url] = json_value
logging.info('playback index saved: {}:{}'.format(url, json_value))
def lookup_latest(self, url):
if url not in self.db:
return None, None
json_value = self.db[url]
py_value = json.loads(json_value)
latest_date = max(py_value)
return latest_date, py_value[latest_date]
if __name__ == '__main__':
arg_parser = argparse.ArgumentParser(
@ -718,9 +874,14 @@ if __name__ == '__main__':
arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm',
default='sha1', help='digest algorithm, one of {}'.format(', '.join(hashlib.algorithms)))
arg_parser.add_argument('--base32', dest='base32', action='store_true',
default=False, help='write SHA1 digests in Base32 instead of hex')
default=False, help='write digests in Base32 instead of hex')
arg_parser.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
default=None, help='port to listen on for instant playback')
arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
default='./warcprox-playback-index.db',
help='playback index database file (only used if --playback-port is specified)')
arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
# [--ispartof=warcinfo ispartof]
@ -739,31 +900,45 @@ if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=loglevel,
format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(funcName)s(%(filename)s:%(lineno)d) %(message)s')
if args.dedup_db_file in (None, '', '/dev/null'):
logging.info('deduplication disabled')
dedup_db = None
else:
dedup_db = DedupDb(args.dedup_db_file)
try:
hashlib.new(args.digest_algorithm)
except Exception as e:
logging.fatal(e)
exit(1)
if args.dedup_db_file in (None, '', '/dev/null'):
logging.info('deduplication disabled')
dedup_db = None
else:
dedup_db = DedupDb(args.dedup_db_file)
recorded_url_q = Queue.Queue()
ca = CertificateAuthority(args.cacert, args.certs_dir)
proxy = WarcProxy(server_address=(args.address, int(args.port)),
ca_file=args.cacert, certs_dir=args.certs_dir,
recorded_url_q=recorded_url_q,
ca=ca, recorded_url_q=recorded_url_q,
digest_algorithm=args.digest_algorithm)
if args.playback_port is not None:
playback_index_db = PlaybackIndexDb(args.playback_index_db_file)
playback_server_address=(args.address, int(args.playback_port))
playback_proxy = PlaybackProxy(server_address=playback_server_address,
ca=ca, playback_index_db=playback_index_db,
warcs_dir=args.directory)
playback_proxy_thread = threading.Thread(target=playback_proxy.serve_forever, name='PlaybackProxyThread')
playback_proxy_thread.start()
else:
playback_index_db = None
playback_proxy = None
warc_writer = WarcWriterThread(recorded_url_q=recorded_url_q,
directory=args.directory, gzip=args.gzip, prefix=args.prefix,
port=int(args.port), rollover_size=int(args.size),
rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None,
base32=args.base32, dedup_db=dedup_db,
digest_algorithm=args.digest_algorithm)
digest_algorithm=args.digest_algorithm,
playback_index_db=playback_index_db)
proxy_thread = threading.Thread(target=proxy.serve_forever, name='ProxyThread')
proxy_thread.start()
@ -781,6 +956,14 @@ if __name__ == '__main__':
warc_writer.stop.set()
proxy.shutdown()
proxy.server_close()
if playback_proxy is not None:
playback_proxy.shutdown()
playback_proxy.server_close()
if dedup_db is not None:
dedup_db.close()
if playback_index_db is not None:
playback_index_db.close()