Support multiple captures of the same URL in the same second (revisits and non-revisits)

This commit is contained in:
Noah Levitt 2013-11-22 11:19:27 -08:00
parent 28c8dd81f9
commit bdd218d338

View File

@ -261,7 +261,7 @@ class MitmProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def _transition_to_ssl(self):
self.request = self.connection = ssl.wrap_socket(self.connection,
self.request = self.connection = ssl.wrap_socket(self.connection,
server_side=True, certfile=self.server.ca[self.hostname])
@ -330,7 +330,7 @@ class MitmProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
return self.do_COMMAND
def log_error(self, fmt, *args):
self.logger.error("{0} - - [{1}] {2}".format(self.address_string(),
self.logger.error("{0} - - [{1}] {2}".format(self.address_string(),
self.log_date_time_string(), fmt % args))
def log_message(self, fmt, *args):
@ -345,14 +345,14 @@ class WarcProxyHandler(MitmProxyHandler):
def _proxy_request(self):
# Build request
req = '%s %s %s\r\n' % (self.command, self.path, self.request_version)
# Add headers to the request
req += '%s\r\n' % self.headers
# Append message body if present to the request
if 'Content-Length' in self.headers:
req += self.rfile.read(int(self.headers['Content-Length']))
# Send it down the pipe!
self._proxy_sock.sendall(req)
@ -363,14 +363,14 @@ class WarcProxyHandler(MitmProxyHandler):
# client. So we ignore the values returned by h.read() below. Instead
# the ProxyingRecordingHTTPResponse takes care of sending the raw bytes
# to the proxy client.
# Proxy and record the response
h = ProxyingRecordingHTTPResponse(self._proxy_sock,
proxy_dest=self.connection,
h = ProxyingRecordingHTTPResponse(self._proxy_sock,
proxy_dest=self.connection,
digest_algorithm=self.server.digest_algorithm)
h.begin()
buf = h.read(8192)
buf = h.read(8192)
while buf != '':
buf = h.read(8192)
@ -399,7 +399,7 @@ class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
logger = logging.getLogger('warcprox.WarcProxy')
def __init__(self, server_address=('localhost', 8000),
req_handler_class=WarcProxyHandler, bind_and_activate=True,
req_handler_class=WarcProxyHandler, bind_and_activate=True,
ca=None, recorded_url_q=None, digest_algorithm='sha1'):
BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate)
@ -493,7 +493,7 @@ class PlaybackProxyHandler(MitmProxyHandler):
sz = len(headers)
while True:
buf = payload_fh.read(8192)
buf = payload_fh.read(8192)
if buf == '': break
self.connection.sendall(buf)
sz += len(buf)
@ -501,8 +501,8 @@ class PlaybackProxyHandler(MitmProxyHandler):
return status, sz
def _send_headers_and_refd_payload(self, headers, refers_to_target_uri, refers_to_date):
location = self.server.playback_index_db.lookup_exact(refers_to_target_uri, refers_to_date)
def _send_headers_and_refd_payload(self, headers, refers_to, refers_to_target_uri, refers_to_date):
location = self.server.playback_index_db.lookup_exact(refers_to_target_uri, refers_to_date, record_id=refers_to)
self.logger.debug('loading http payload from {}'.format(location))
fh = self._open_warc_at_offset(location['f'], location['o'])
@ -537,7 +537,7 @@ class PlaybackProxyHandler(MitmProxyHandler):
if errors:
raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors))
warc_type = record.get_header(warctools.WarcRecord.TYPE)
if warc_type == warctools.WarcRecord.RESPONSE:
@ -557,11 +557,12 @@ class PlaybackProxyHandler(MitmProxyHandler):
if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST:
raise Exception('unknown revisit record profile {}'.format(warc_profile))
refers_to = record.get_header(warctools.WarcRecord.REFERS_TO)
refers_to_target_uri = record.get_header(warctools.WarcRecord.REFERS_TO_TARGET_URI)
refers_to_date = record.get_header(warctools.WarcRecord.REFERS_TO_DATE)
self.logger.debug('revisit record references {} capture of {}'.format(refers_to_date, refers_to_target_uri))
return self._send_headers_and_refd_payload(record.content[1], refers_to_target_uri, refers_to_date)
self.logger.debug('revisit record references {}:{} capture of {}'.format(refers_to_date, refers_to, refers_to_target_uri))
return self._send_headers_and_refd_payload(record.content[1], refers_to, refers_to_target_uri, refers_to_date)
else:
raise Exception('unknown warc record type {}'.format(warc_type))
@ -575,8 +576,8 @@ class PlaybackProxyHandler(MitmProxyHandler):
class PlaybackProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
logger = logging.getLogger('warcprox.PlaybackProxy')
def __init__(self, server_address, req_handler_class=PlaybackProxyHandler,
bind_and_activate=True, ca=None, playback_index_db=None,
def __init__(self, server_address, req_handler_class=PlaybackProxyHandler,
bind_and_activate=True, ca=None, playback_index_db=None,
warcs_dir=None):
BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate)
self.ca = ca
@ -633,8 +634,8 @@ class DedupDb(object):
class WarcWriterThread(threading.Thread):
logger = logging.getLogger('warcprox.WarcWriterThread')
# port is only used for warc filename
def __init__(self, recorded_url_q=None, directory='./warcs',
# port is only used for warc filename
def __init__(self, recorded_url_q=None, directory='./warcs',
rollover_size=1000000000, rollover_idle_time=None, gzip=False,
prefix='WARCPROX', port=0, digest_algorithm='sha1', base32=False,
dedup_db=None, playback_index_db=None):
@ -661,7 +662,7 @@ class WarcWriterThread(threading.Thread):
self._f = None
self._fpath = None
self._serial = 0
if not os.path.exists(directory):
self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory))
os.mkdir(directory)
@ -699,16 +700,16 @@ class WarcWriterThread(threading.Thread):
else:
# response record
principal_record = self.build_warc_record(
url=recorded_url.url, warc_date=warc_date,
recorder=recorded_url.response_recorder,
url=recorded_url.url, warc_date=warc_date,
recorder=recorded_url.response_recorder,
warc_type=warctools.WarcRecord.RESPONSE,
content_type=httptools.ResponseMessage.CONTENT_TYPE,
remote_ip=recorded_url.remote_ip)
request_record = self.build_warc_record(
url=recorded_url.url, warc_date=warc_date,
data=recorded_url.request_data,
warc_type=warctools.WarcRecord.REQUEST,
url=recorded_url.url, warc_date=warc_date,
data=recorded_url.request_data,
warc_type=warctools.WarcRecord.REQUEST,
content_type=httptools.RequestMessage.CONTENT_TYPE,
concurrent_to=principal_record.id)
@ -753,10 +754,10 @@ class WarcWriterThread(threading.Thread):
if recorder is not None:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder))))
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
self.digest_str(recorder.block_digest)))
if recorder.payload_digest is not None:
headers.append((warctools.WarcRecord.PAYLOAD_DIGEST,
headers.append((warctools.WarcRecord.PAYLOAD_DIGEST,
self.digest_str(recorder.payload_digest)))
recorder.tempfile.seek(0)
@ -765,7 +766,7 @@ class WarcWriterThread(threading.Thread):
else:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data))))
block_digest = hashlib.new(self.digest_algorithm, data)
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
self.digest_str(block_digest)))
content_tuple = content_type, data
@ -805,8 +806,8 @@ class WarcWriterThread(threading.Thread):
warcinfo_fields.append('ip: {0}'.format(socket.gethostbyname(hostname)))
warcinfo_fields.append('format: WARC File Format 1.0')
# warcinfo_fields.append('robots: ignore')
# warcinfo_fields.append('description: {0}'.format(self.description))
# warcinfo_fields.append('isPartOf: {0}'.format(self.is_part_of))
# warcinfo_fields.append('description: {0}'.format(self.description))
# warcinfo_fields.append('isPartOf: {0}'.format(self.is_part_of))
data = '\r\n'.join(warcinfo_fields) + '\r\n'
record = warctools.WarcRecord(headers=headers, content=('application/warc-fields', data))
@ -849,7 +850,7 @@ class WarcWriterThread(threading.Thread):
def run(self):
self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format(
os.path.abspath(self.directory), self.gzip, self.rollover_size,
os.path.abspath(self.directory), self.gzip, self.rollover_size,
self.rollover_idle_time, self.prefix, self.port))
self._last_sync = self._last_activity = time.time()
@ -859,9 +860,9 @@ class WarcWriterThread(threading.Thread):
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
self._last_activity = time.time()
recordset = self.build_warc_records(recorded_url)
writer = self._writer()
recordset_offset = writer.tell()
@ -879,17 +880,17 @@ class WarcWriterThread(threading.Thread):
self._final_tasks(recorded_url, recordset, recordset_offset)
except Queue.Empty:
if (self._fpath is not None
and self.rollover_idle_time is not None
if (self._fpath is not None
and self.rollover_idle_time is not None
and self.rollover_idle_time > 0
and time.time() - self._last_activity > self.rollover_idle_time):
self.logger.debug('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity))
self._close_writer()
if time.time() - self._last_sync > 60:
if self.dedup_db:
if self.dedup_db:
self.dedup_db.sync()
if self.playback_index_db:
if self.playback_index_db:
self.playback_index_db.sync()
self._last_sync = time.time()
@ -912,23 +913,32 @@ class PlaybackIndexDb(object):
def close(self):
self.db.close()
def sync(self):
self.db.sync()
def save(self, warcfile, recordset, offset):
response_record = recordset[0]
# XXX canonicalize url?
url = response_record.get_header(warctools.WarcRecord.URL)
date = response_record.get_header(warctools.WarcRecord.DATE)
record_id = response_record.get_header(warctools.WarcRecord.ID)
# url:{date1:{'f':warcfile,'o':response_offset,'q':request_offset,'t':response/revisit,'u':revisit_target_url,'d':revisit_target_date},date2:{...},...}
# there could be two visits of same url in the same second, and WARC-Date is
# prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\
# url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...}
if url in self.db:
existing_json_value = self.db[url]
py_value = json.loads(existing_json_value)
else:
py_value = {}
py_value[date] = {'f':warcfile, 'o':offset}
if date in py_value:
py_value[date].append({'f':warcfile, 'o':offset, 'i':record_id})
else:
py_value[date] = [{'f':warcfile, 'o':offset, 'i':record_id}]
json_value = json.dumps(py_value, separators=(',',':'))
@ -942,36 +952,43 @@ class PlaybackIndexDb(object):
return None, None
json_value = self.db[url]
self.logger.debug("'{}':{}".format(url, json_value))
py_value = json.loads(json_value)
latest_date = max(py_value)
return latest_date, py_value[latest_date]
return latest_date, py_value[latest_date][0]
def lookup_exact(self, url, warc_date, record_id):
    """Return the playback-index entry for one specific capture of ``url``.

    The index maps each url to a JSON object keyed by WARC-Date; each date
    holds a list of entries of the form
    ``{'f': warcfile, 'o': offset, 'i': record_id}``. Several captures of
    the same url can share a date, so ``record_id`` selects among them.

    Args:
        url: key to look up in ``self.db``.
        warc_date: date string of the wanted capture.
        record_id: record id of the wanted capture, compared against the
            ``'i'`` field of each entry stored under ``warc_date``.

    Returns:
        The matching entry dict, or ``None`` when the url is not indexed,
        the date is not indexed for that url, or no entry under that date
        carries ``record_id``.
    """
    if url not in self.db:
        return None

    json_value = self.db[url]
    self.logger.debug("'{}':{}".format(url, json_value))
    py_value = json.loads(json_value)

    # An unknown date yields an empty iteration, so both "date missing" and
    # "date present but id missing" fall through to the single miss path
    # below and are logged the same way.
    for record in py_value.get(warc_date, ()):
        if record['i'] == record_id:
            self.logger.debug("found exact match for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url)))
            return record

    self.logger.info("match not found for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url)))
    return None
class WarcproxController(object):
logger = logging.getLogger('warcprox.WarcproxController')
def __init__(self, proxy=None, warc_writer=None, playback_proxy=None):
"""
Create warcprox controller.
If supplied, proxy should be an instance of WarcProxy, and warc_writer
should be an instance of WarcWriterThread. If not supplied, they are
created with default values.
If supplied, playback_proxy should be an instance of PlaybackProxy. If not
supplied, no playback proxy will run.
"""
@ -987,7 +1004,7 @@ class WarcproxController(object):
self.playback_proxy = playback_proxy
def run_until_shutdown(self):
"""Start warcprox and run until shut down.
@ -997,11 +1014,11 @@ class WarcproxController(object):
proxy_thread = threading.Thread(target=self.proxy.serve_forever, name='ProxyThread')
proxy_thread.start()
self.warc_writer.start()
if self.playback_proxy is not None:
playback_proxy_thread = threading.Thread(target=self.playback_proxy.serve_forever, name='PlaybackProxyThread')
playback_proxy_thread.start()
self.stop = threading.Event()
try:
@ -1009,7 +1026,7 @@ class WarcproxController(object):
self.logger.info('SIGTERM will initiate graceful shutdown')
except ValueError:
pass
try:
while not self.stop.is_set():
time.sleep(0.5)
@ -1019,10 +1036,10 @@ class WarcproxController(object):
self.warc_writer.stop.set()
self.proxy.shutdown()
self.proxy.server_close()
if self.warc_writer.dedup_db is not None:
self.warc_writer.dedup_db.close()
if self.playback_proxy is not None:
self.playback_proxy.shutdown()
self.playback_proxy.server_close()
@ -1040,29 +1057,29 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
arg_parser = argparse.ArgumentParser(prog=prog,
description='warcprox - WARC writing MITM HTTP/S proxy',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('-p', '--port', dest='port', default='8000',
arg_parser.add_argument('-p', '--port', dest='port', default='8000',
help='port to listen on')
arg_parser.add_argument('-b', '--address', dest='address',
arg_parser.add_argument('-b', '--address', dest='address',
default='localhost', help='address to listen on')
arg_parser.add_argument('-c', '--cacert', dest='cacert',
default='./{0}-warcprox-ca.pem'.format(socket.gethostname()),
arg_parser.add_argument('-c', '--cacert', dest='cacert',
default='./{0}-warcprox-ca.pem'.format(socket.gethostname()),
help='CA certificate file; if file does not exist, it will be created')
arg_parser.add_argument('--certs-dir', dest='certs_dir',
default='./{0}-warcprox-ca'.format(socket.gethostname()),
arg_parser.add_argument('--certs-dir', dest='certs_dir',
default='./{0}-warcprox-ca'.format(socket.gethostname()),
help='where to store and load generated certificates')
arg_parser.add_argument('-d', '--dir', dest='directory',
arg_parser.add_argument('-d', '--dir', dest='directory',
default='./warcs', help='where to write warcs')
arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
help='write gzip-compressed warc records')
arg_parser.add_argument('-n', '--prefix', dest='prefix',
arg_parser.add_argument('-n', '--prefix', dest='prefix',
default='WARCPROX', help='WARC filename prefix')
arg_parser.add_argument('-s', '--size', dest='size',
default=1000*1000*1000,
arg_parser.add_argument('-s', '--size', dest='size',
default=1000*1000*1000,
help='WARC file rollover size threshold in bytes')
arg_parser.add_argument('--rollover-idle-time',
dest='rollover_idle_time', default=None,
arg_parser.add_argument('--rollover-idle-time',
dest='rollover_idle_time', default=None,
help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)")
arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm',
arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm',
default='sha1', help='digest algorithm, one of {}'.format(', '.join(hashlib.algorithms)))
arg_parser.add_argument('--base32', dest='base32', action='store_true',
default=False, help='write digests in Base32 instead of hex')
@ -1071,7 +1088,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
default=None, help='port to listen on for instant playback')
arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
default='./warcprox-playback-index.db',
default='./warcprox-playback-index.db',
help='playback index database file (only used if --playback-port is specified)')
arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
@ -1094,8 +1111,8 @@ def main(argv=sys.argv):
else:
loglevel = logging.INFO
logging.basicConfig(stream=sys.stdout, level=loglevel,
format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
logging.basicConfig(stream=sys.stdout, level=loglevel,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
try:
hashlib.new(args.digest_algorithm)
@ -1121,7 +1138,7 @@ def main(argv=sys.argv):
playback_index_db = PlaybackIndexDb(args.playback_index_db_file)
playback_server_address=(args.address, int(args.playback_port))
playback_proxy = PlaybackProxy(server_address=playback_server_address,
ca=ca, playback_index_db=playback_index_db,
ca=ca, playback_index_db=playback_index_db,
warcs_dir=args.directory)
else:
playback_index_db = None
@ -1129,7 +1146,7 @@ def main(argv=sys.argv):
warc_writer = WarcWriterThread(recorded_url_q=recorded_url_q,
directory=args.directory, gzip=args.gzip, prefix=args.prefix,
port=int(args.port), rollover_size=int(args.size),
port=int(args.port), rollover_size=int(args.size),
rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None,
base32=args.base32, dedup_db=dedup_db,
digest_algorithm=args.digest_algorithm,