diff --git a/warcprox.py b/warcprox.py index eb661b7..80c5d1b 100755 --- a/warcprox.py +++ b/warcprox.py @@ -112,8 +112,10 @@ class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): if 'Content-Length' in self.headers: req += self.rfile.read(int(self.headers['Content-Length'])) + interceptors = [p(self.server, self) for p in self.server._interceptors] + # Send it down the pipe! - self._proxy_sock.sendall(self.mitm_request(req)) + self._proxy_sock.sendall(self.mitm_request(req, interceptors)) # Parse response h = httplib.HTTPResponse(self._proxy_sock) @@ -132,18 +134,18 @@ class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): self._proxy_sock.close() # Relay the message - self.request.sendall(self.mitm_response(res)) + self.request.sendall(self.mitm_response(res, interceptors)) - def mitm_request(self, data): - for p in self.server._req_plugins: - data = p(self.server, self).do_request(data) + def mitm_request(self, data, interceptors): + for i in interceptors: + data = i.do_request(data) return data - def mitm_response(self, data): - for p in self.server._res_plugins: - data = p(self.server, self).do_response(data) + def mitm_response(self, data, interceptors): + for i in interceptors: + data = i.do_response(data) return data @@ -162,21 +164,17 @@ class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): self.log_date_time_string(), format % args)) +# InterceptorPlugin modified from pymiproxy to send the request and response +# from a single transaction through the same instance of the interceptor class InterceptorPlugin(object): def __init__(self, server, msg): self.server = server self.message = msg - -class RequestInterceptorPlugin(InterceptorPlugin): - def do_request(self, data): return data - -class ResponseInterceptorPlugin(InterceptorPlugin): - def do_response(self, data): return data @@ -189,8 +187,7 @@ class MitmProxy(BaseHTTPServer.HTTPServer): def __init__(self, server_address, req_handler_class=ProxyHandler, bind_and_activate=True, 
certfile='warcprox.pem'): BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) - self._res_plugins = [] - self._req_plugins = [] + self._interceptors = [] self.certfile = certfile if not os.path.exists(certfile): @@ -219,10 +216,7 @@ class MitmProxy(BaseHTTPServer.HTTPServer): def register_interceptor(self, interceptor_class): if not issubclass(interceptor_class, InterceptorPlugin): raise InvalidInterceptorPluginException('Expected type InterceptorPlugin got %s instead' % type(interceptor_class)) - if issubclass(interceptor_class, RequestInterceptorPlugin): - self._req_plugins.append(interceptor_class) - if issubclass(interceptor_class, ResponseInterceptorPlugin): - self._res_plugins.append(interceptor_class) + self._interceptors.append(interceptor_class) def server_activate(self): @@ -239,14 +233,24 @@ class AsyncMitmProxy(SocketServer.ThreadingMixIn, MitmProxy): pass -class WarcRecordQueuer(RequestInterceptorPlugin, ResponseInterceptorPlugin): +# assumes do_request happens before do_response +class WarcRecordQueuer(InterceptorPlugin): + + # Each item in the queue is a tuple of warc records which should be written + # together, e.g. (response, request) where request has WARC-Concurrent-To + # pointing to response. 
+ warc_record_group_queue = Queue.Queue() + + @staticmethod + def make_warc_uuid(text): + return "".format(uuid.UUID(hashlib.sha1(text).hexdigest()[0:32])) - warc_record_queue = Queue.Queue() def __init__(self, server, msg): InterceptorPlugin.__init__(self, server, msg) if msg.is_connect: + # have to construct the url if proxy request is a CONNECT assert not msg.url if int(msg.port) == 443: @@ -269,44 +273,65 @@ class WarcRecordQueuer(RequestInterceptorPlugin, ResponseInterceptorPlugin): self.url = msg.url + def _warc_date(self): + try: + return self._d + except AttributeError: + self._d = warctools.warc.warc_datetime_str(datetime.now()) + return self._d + + def do_request(self, data): logging.info('{0} >> {1}'.format(self.url, repr(data[:100]))) + + record_id = WarcRecordQueuer.make_warc_uuid("{0} {1}".format(self.url, self._warc_date())) + + headers = [] + headers.append((warctools.WarcRecord.ID, record_id)) + headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.REQUEST)) + headers.append((warctools.WarcRecord.URL, self.url)) + headers.append((warctools.WarcRecord.DATE, self._warc_date())) + # headers.append((warctools.WarcRecord.IP_ADDRESS, ip)) + + content_tuple = "application/http;msgtype=request", data + self._request_record = warctools.WarcRecord(headers=headers, content=content_tuple) + return data - @staticmethod - def make_warc_uuid(text): - return "".format(uuid.UUID(hashlib.sha1(text).hexdigest()[0:32])) - - def do_response(self, data): logging.info('{0} << {1}'.format(self.url, repr(data[:100]))) - warc_record_date = warctools.warc.warc_datetime_str(datetime.now()) - warc_record_id = WarcRecordQueuer.make_warc_uuid("{0} {1}".format(self.url, warc_record_date)) - logging.info('{0}: {1}'.format(warctools.WarcRecord.ID, warc_record_id)) + record_id = WarcRecordQueuer.make_warc_uuid("{0} {1}".format(self.url, self._warc_date())) headers = [] - headers.append((warctools.WarcRecord.ID, warc_record_id)) - headers.append((warctools.WarcRecord.URL, 
self.url)) - headers.append((warctools.WarcRecord.DATE, warc_record_date)) - # headers.append((warctools.WarcRecord.IP_ADDRESS, ip)) + headers.append((warctools.WarcRecord.ID, record_id)) headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.RESPONSE)) + headers.append((warctools.WarcRecord.URL, self.url)) + headers.append((warctools.WarcRecord.DATE, self._warc_date())) + # headers.append((warctools.WarcRecord.IP_ADDRESS, ip)) - warcrecord = warctools.WarcRecord(headers=headers, content=("application/http;msgtype=response", data)) + content_tuple = ("application/http;msgtype=response", data) - # warcrecord.write_to(sys.stdout, gzip=False) - WarcRecordQueuer.warc_record_queue.put(warcrecord) + response_record = warctools.WarcRecord(headers=headers, content=content_tuple) + + try: + self._request_record.set_header(warctools.WarcRecord.CONCURRENT_TO, record_id) + record_group = response_record, self._request_record + except AttributeError: + record_group = response_record, # tuple with one item + + WarcRecordQueuer.warc_record_group_queue.put(record_group) return data class WarcWriterThread(threading.Thread): - def __init__(self, warc_record_queue, directory, gzip, prefix, size, port): + def __init__(self, warc_record_group_queue, directory, gzip, prefix, size, port): threading.Thread.__init__(self, name='WarcWriterThread') - self.warc_record_queue = warc_record_queue + self.warc_record_group_queue = warc_record_group_queue self.directory = directory self.gzip = gzip @@ -359,10 +384,10 @@ class WarcWriterThread(threading.Thread): # http-header-user-agent: Mozilla/5.0 (compatible; archive.org_bot; Archive-It; +http://archive-it.org/files/site-owners.html) def _make_warcinfo_record(self, filename): warc_record_date = warctools.warc.warc_datetime_str(datetime.now()) - warc_record_id = WarcRecordQueuer.make_warc_uuid("{0} {1}".format(filename, warc_record_date)) + record_id = WarcRecordQueuer.make_warc_uuid("{0} {1}".format(filename, warc_record_date)) headers = 
[] - headers.append((warctools.WarcRecord.ID, warc_record_id)) + headers.append((warctools.WarcRecord.ID, record_id)) headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.WARCINFO)) headers.append((warctools.WarcRecord.FILENAME, filename)) headers.append((warctools.WarcRecord.DATE, warc_record_date)) @@ -379,9 +404,9 @@ class WarcWriterThread(threading.Thread): # warcinfo_fields.append('isPartOf: {0}'.format(self.is_part_of)) data = '\r\n'.join(warcinfo_fields) + '\r\n' - warcrecord = warctools.WarcRecord(headers=headers, content=('application/warc-fields', data)) + record = warctools.WarcRecord(headers=headers, content=('application/warc-fields', data)) - return warcrecord + return record # @@ -405,13 +430,15 @@ class WarcWriterThread(threading.Thread): def run(self): - logging.info('WarcWriterThread starting, directory={0} gzip={1} prefix={2} size={3} port={4}'.format(self.directory, self.gzip, self.prefix, self.size, self.port)) + logging.info('WarcWriterThread starting, directory={0} gzip={1} prefix={2} size={3} port={4}'.format( + os.path.abspath(self.directory), self.gzip, self.prefix, self.size, self.port)) while not self.stop.is_set(): try: - warc_record = self.warc_record_queue.get(block=True, timeout=0.5) - logging.info('got warc record to write from the queue: {0}'.format(warc_record)) - warc_record.write_to(self._writer(), gzip=self.gzip) + warc_record_group = self.warc_record_group_queue.get(block=True, timeout=0.5) + logging.info('got warc record group to write from the queue: {0}'.format(warc_record_group)) + for record in warc_record_group: + record.write_to(self._writer(), gzip=self.gzip) self._f.flush() except Queue.Empty: pass @@ -432,7 +459,6 @@ if __name__ == '__main__': arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records') arg_parser.add_argument('-n', '--prefix', dest='prefix', default='WARCPROX', help='WARC filename prefix') arg_parser.add_argument('-s', '--size', 
dest='size', default=1000*1000*1000, help='WARC file rollover size threshold in bytes') - # --max-file-size=maxArcFileSize] # [--ispartof=warcinfo ispartof] # [--description=warcinfo description] # [--operator=warcinfo operator] @@ -442,7 +468,7 @@ if __name__ == '__main__': proxy = AsyncMitmProxy(server_address=(args.address, int(args.port)), certfile=args.certfile) proxy.register_interceptor(WarcRecordQueuer) - warc_writer = WarcWriterThread(WarcRecordQueuer.warc_record_queue, directory=args.directory, gzip=args.gzip, prefix=args.prefix, size=int(args.size), port=int(args.port)) + warc_writer = WarcWriterThread(WarcRecordQueuer.warc_record_group_queue, directory=args.directory, gzip=args.gzip, prefix=args.prefix, size=int(args.size), port=int(args.port)) warc_writer.start() try: