diff --git a/warcprox.py b/warcprox.py index f0057c4..25bfd3e 100755 --- a/warcprox.py +++ b/warcprox.py @@ -19,6 +19,7 @@ import os import argparse import random import httplib +import re class CertificateAuthority(object): @@ -110,15 +111,49 @@ class Recorder: def __init__(self, fp): self.fp = fp - self.recorded = bytearray('') + self.data = bytearray('') + self.block_sha1 = hashlib.sha1() + self.payload_sha1 = None + + + def _update(self, chunk): + if self.payload_sha1 is None: + # convoluted handling of two newlines crossing chunks + # XXX write tests for this + if self.data.endswith('\n'): + if chunk.startswith('\n'): + self.payload_sha1 = hashlib.sha1() + self.payload_sha1.update(chunk[1:]) + elif chunk.startswith('\r\n'): + self.payload_sha1 = hashlib.sha1() + self.payload_sha1.update(chunk[2:]) + elif self.data.endswith('\n\r'): + if chunk.startswith('\n'): + self.payload_sha1 = hashlib.sha1() + self.payload_sha1.update(chunk[1:]) + else: + m = re.search(r'\n\r?\n', chunk) + if m is not None: + self.payload_sha1 = hashlib.sha1() + self.payload_sha1.update(chunk[m.end():]) + else: + self.payload_sha1.update(chunk) + + self.block_sha1.update(chunk) + self.data.extend(chunk) def read(self, size=-1): - result = self.fp.read(size=size) - self.recorded.extend(result) - return result + chunk = self.fp.read(size=size) + self._update(chunk) + return chunk + def readline(self, size=-1): - return self.fp.readline(size=size) + # XXX does not call self.read(); if it ever did this would break + chunk = self.fp.readline(size=size) + self._update(chunk) + return chunk + def close(self): return self.fp.close() @@ -217,6 +252,8 @@ class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): return # Extract path + warc_record_queuer = WarcRecordQueuer(self.server, self) + # Build request req = '%s %s %s\r\n' % (self.command, self.path, self.request_version) @@ -227,10 +264,10 @@ class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): if 'Content-Length' in self.headers: req += self.rfile.read(int(self.headers['Content-Length'])) - interceptors = [p(self.server, self) for p in self.server._interceptors] + warc_record_queuer.do_request(req) # Send it down the pipe! - self._proxy_sock.sendall(self.mitm_request(req, interceptors)) + self._proxy_sock.sendall(req) # Parse response h = RecordingHTTPResponse(self._proxy_sock) @@ -249,25 +286,13 @@ class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): self.request.sendall(buf) buf = h.read(4096) - self.mitm_response(h.recorded(), interceptors) + warc_record_queuer.do_response(h.recorder) # Let's close off the remote end h.close() self._proxy_sock.close() - def mitm_request(self, data, interceptors): - for i in interceptors: - data = i.do_request(data) - return data - - - def mitm_response(self, data, interceptors): - for i in interceptors: - data = i.do_response(data) - return data - - def __getattr__(self, item): if item.startswith('do_'): return self.do_COMMAND @@ -283,21 +308,6 @@ class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): self.log_date_time_string(), format % args)) -# InterceptorPlugin modified from pymiproxy to send the request and response -# from a single transaction through the same instance of the interceptor -class InterceptorPlugin(object): - - def __init__(self, server, msg): - self.server = server - self.message = msg - - def do_request(self, data): - return data - - def do_response(self, data): - return data - - class InvalidInterceptorPluginException(Exception): pass @@ -306,24 +316,15 @@ class MitmProxy(BaseHTTPServer.HTTPServer): def __init__(self, server_address, req_handler_class=ProxyHandler, bind_and_activate=True, ca_file='./warcprox-ca.pem', certs_dir='./warcprox-ca'): BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) - self._interceptors = [] self.ca = CertificateAuthority(ca_file, certs_dir) - - def register_interceptor(self, interceptor_class): - if not issubclass(interceptor_class, InterceptorPlugin): - raise InvalidInterceptorPluginException('Expected type InterceptorPlugin got %s instead' % type(interceptor_class)) - self._interceptors.append(interceptor_class) - - def server_activate(self): BaseHTTPServer.HTTPServer.server_activate(self) logging.info('listening on {0}:{1}'.format(self.server_address[0], self.server_address[1])) - def server_close(self): + logging.info('shutting down') BaseHTTPServer.HTTPServer.server_close(self) - logging.info('shut down') class AsyncMitmProxy(SocketServer.ThreadingMixIn, MitmProxy): @@ -331,7 +332,7 @@ class AsyncMitmProxy(SocketServer.ThreadingMixIn, MitmProxy): # assumes do_request happens before do_response -class WarcRecordQueuer(InterceptorPlugin): +class WarcRecordQueuer: # Each item in the queue is a tuple of warc records which should be written # together, e.g. (reponse, request) where request has WARC-Concurrent-To @@ -344,7 +345,8 @@ class WarcRecordQueuer(InterceptorPlugin): def __init__(self, server, msg): - InterceptorPlugin.__init__(self, server, msg) + self.server = server + self.msg = msg if msg.is_connect: # have to construct the url if proxy request is a CONNECT @@ -379,7 +381,7 @@ class WarcRecordQueuer(InterceptorPlugin): def do_request(self, data): - logging.info('{0} >> {1}'.format(self.url, repr(data[:100]))) + logging.info('{0} >> {1}'.format(self.url, repr(data[:40]))) record_id = WarcRecordQueuer.make_warc_uuid("{0} {1}".format(self.url, self._warc_date())) @@ -393,11 +395,9 @@ class WarcRecordQueuer(InterceptorPlugin): content_tuple = "application/http;msgtype=request", data self._request_record = warctools.WarcRecord(headers=headers, content=content_tuple) - return data - - def do_response(self, data): - logging.info('{0} << {1}'.format(self.url, repr(data[:100]))) + def do_response(self, recorder): + logging.info('{0} << {1}'.format(self.url, repr(recorder.data[:40]))) record_id = WarcRecordQueuer.make_warc_uuid("{0} {1}".format(self.url, self._warc_date())) @@ -406,9 +406,12 @@ class WarcRecordQueuer(InterceptorPlugin): headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.RESPONSE)) headers.append((warctools.WarcRecord.URL, self.url)) headers.append((warctools.WarcRecord.DATE, self._warc_date())) + headers.append((warctools.WarcRecord.BLOCK_DIGEST, 'sha1:{}'.format(recorder.block_sha1.hexdigest()))) + if recorder.payload_sha1 is not None: + headers.append(('WARC-Payload-Digest', 'sha1:{}'.format(recorder.block_sha1.hexdigest()))) # headers.append((warctools.WarcRecord.IP_ADDRESS, ip)) - content_tuple = ("application/http;msgtype=response", data) + content_tuple = ("application/http;msgtype=response", recorder.data) response_record = warctools.WarcRecord(headers=headers, content=content_tuple) @@ -420,8 +423,6 @@ class WarcRecordQueuer(InterceptorPlugin): WarcRecordQueuer.warc_record_group_queue.put(record_group) - return data - class WarcWriterThread(threading.Thread): @@ -564,10 +565,12 @@ if __name__ == '__main__': # [--httpheader=warcinfo httpheader] args = arg_parser.parse_args() - proxy = AsyncMitmProxy(server_address=(args.address, int(args.port)), ca_file=args.cacert, certs_dir=args.certs_dir) - proxy.register_interceptor(WarcRecordQueuer) + proxy = AsyncMitmProxy(server_address=(args.address, int(args.port)), + ca_file=args.cacert, certs_dir=args.certs_dir) - warc_writer = WarcWriterThread(WarcRecordQueuer.warc_record_group_queue, directory=args.directory, gzip=args.gzip, prefix=args.prefix, size=int(args.size), port=int(args.port)) + warc_writer = WarcWriterThread(WarcRecordQueuer.warc_record_group_queue, + directory=args.directory, gzip=args.gzip, prefix=args.prefix, + size=int(args.size), port=int(args.port)) warc_writer.start() try: