From 4367da7bbd3854d5fae7a095814da636dbb2ace4 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 15 Oct 2013 15:52:26 -0700 Subject: [PATCH] write warcs! --- src/warcprox/proxy.py | 134 ++++++++++++++++++++++++++++-------------- 1 file changed, 91 insertions(+), 43 deletions(-) diff --git a/src/warcprox/proxy.py b/src/warcprox/proxy.py index b9883d7..117c4d7 100755 --- a/src/warcprox/proxy.py +++ b/src/warcprox/proxy.py @@ -8,17 +8,15 @@ # from socketserver import ThreadingMixIn # from http.client import HTTPResponse -from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler -from SocketServer import ThreadingMixIn -from httplib import HTTPResponse -from socket import socket -import OpenSSL.crypto -import OpenSSL.SSL +import BaseHTTPServer, SocketServer +import httplib +import socket +import urlparse +import OpenSSL.crypto, OpenSSL.crypto +import ssl import logging import sys -import ssl -from hanzo.warctools import WarcRecord -from hanzo.warctools.warc import warc_datetime_str +from hanzo import warctools import uuid import hashlib from datetime import datetime @@ -27,6 +25,7 @@ import Queue import threading import os.path import argparse +import os __author__ = 'Nadeem Douba' __copyright__ = 'Copyright 2012, PyMiProxy Project' @@ -52,11 +51,11 @@ class UnsupportedSchemeException(Exception): pass -class ProxyHandler(BaseHTTPRequestHandler): +class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): def __init__(self, request, client_address, server): self.is_connect = False - BaseHTTPRequestHandler.__init__(self, request, client_address, server) + BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, request, client_address, server) def _connect_to_host(self): # Get hostname and port to connect to @@ -64,13 +63,13 @@ class ProxyHandler(BaseHTTPRequestHandler): self.hostname, self.port = self.path.split(':') else: self.url = self.path - u = urlparse(self.url) + u = urlparse.urlparse(self.url) if u.scheme != 'http': raise UnsupportedSchemeException('Unknown scheme %s' % repr(u.scheme)) self.hostname = u.hostname self.port = u.port or 80 - self.path = urlunparse( - ParseResult( + self.path = urlparse.urlunparse( + urlparse.ParseResult( scheme='', netloc='', params=u.params, @@ -81,7 +80,7 @@ class ProxyHandler(BaseHTTPRequestHandler): ) # Connect to destination - self._proxy_sock = socket() + self._proxy_sock = socket.socket() self._proxy_sock.settimeout(10) self._proxy_sock.connect((self.hostname, int(self.port))) @@ -144,7 +143,7 @@ class ProxyHandler(BaseHTTPRequestHandler): self._proxy_sock.sendall(self.mitm_request(req)) # Parse response - h = HTTPResponse(self._proxy_sock) + h = httplib.HTTPResponse(self._proxy_sock) h.begin() # Get rid of the pesky header @@ -213,10 +212,10 @@ class InvalidInterceptorPluginException(Exception): pass -class MitmProxy(HTTPServer): +class MitmProxy(BaseHTTPServer.HTTPServer): def __init__(self, server_address, req_handler_class=ProxyHandler, bind_and_activate=True, certfile='warcprox.pem'): - HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) + BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) self._res_plugins = [] self._req_plugins = [] self.certfile = certfile @@ -254,22 +253,22 @@ class MitmProxy(HTTPServer): def server_activate(self): - HTTPServer.server_activate(self) + BaseHTTPServer.HTTPServer.server_activate(self) logging.info('listening on {0}:{1}'.format(self.server_address[0], self.server_address[1])) def server_close(self): - HTTPServer.server_close(self) + BaseHTTPServer.HTTPServer.server_close(self) logging.info('shut down') -class AsyncMitmProxy(ThreadingMixIn, MitmProxy): +class AsyncMitmProxy(SocketServer.ThreadingMixIn, MitmProxy): pass class WarcRecordQueuer(RequestInterceptorPlugin, ResponseInterceptorPlugin): - warc_record_out_queue = Queue.Queue() + warc_record_queue = Queue.Queue() def __init__(self, server, msg): InterceptorPlugin.__init__(self, server, msg) @@ -282,8 +281,8 @@ class WarcRecordQueuer(RequestInterceptorPlugin, ResponseInterceptorPlugin): else: netloc = '{0}:{1}'.format(msg.hostname, msg.port) - self.url = urlunparse( - ParseResult( + self.url = urlparse.urlunparse( + urlparse.ParseResult( scheme='https', netloc=netloc, params='', @@ -309,56 +308,105 @@ class WarcRecordQueuer(RequestInterceptorPlugin, ResponseInterceptorPlugin): def do_response(self, data): logging.info('{0} << {1}'.format(self.url, repr(data[:100]))) - warc_record_id = self.make_warc_uuid("{0} {1}".format(self.url, time.time())) - logging.info('{0}: {1}'.format(WarcRecord.ID, warc_record_id)) + warc_record_id = self.make_warc_uuid("{0} {1}".format(self.url, datetime.now().isoformat())) + logging.info('{0}: {1}'.format(warctools.WarcRecord.ID, warc_record_id)) headers = [] - headers.append((WarcRecord.ID, warc_record_id)) - headers.append((WarcRecord.URL, self.url)) - headers.append((WarcRecord.DATE, warc_datetime_str(datetime.now()))) - # headers.append((WarcRecord.IP_ADDRESS, ip)) - headers.append((WarcRecord.TYPE, WarcRecord.RESPONSE)) + headers.append((warctools.WarcRecord.ID, warc_record_id)) + headers.append((warctools.WarcRecord.URL, self.url)) + headers.append((warctools.WarcRecord.DATE, warctools.warc.warc_datetime_str(datetime.now()))) + # headers.append((warctools.WarcRecord.IP_ADDRESS, ip)) + headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.RESPONSE)) - warcrecord = WarcRecord(headers=headers, content=("application/http;msgtype=response", data)) + warcrecord = warctools.WarcRecord(headers=headers, content=("application/http;msgtype=response", data)) # warcrecord.write_to(sys.stdout, gzip=False) - WarcRecordQueuer.warc_record_out_queue.put(warcrecord) + WarcRecordQueuer.warc_record_queue.put(warcrecord) return data class WarcWriterThread(threading.Thread): - def __init__(self, warc_record_in_queue): + def __init__(self, warc_record_queue, directory, gzip, prefix, size, port): threading.Thread.__init__(self, name='WarcWriterThread') - self.warc_record_in_queue = warc_record_in_queue + + self.warc_record_queue = warc_record_queue + + self.directory = directory + self.gzip = gzip + self.prefix = prefix + self.size = size + self.port = port + + self._f = None + self._fpath = None + self._serial = 0 + + if not os.path.exists(directory): + logging.info("warc destination directory {0} doesn't exist, creating it".format(directory)) + os.mkdir(directory) + self.stop = threading.Event() + def timestamp17(self): + now = datetime.now() + return '{0}{1}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond//1000) + + + def _close_writer(self): + final_name = self._fpath[:-5] + logging.info('closing {0}'.format(final_name)) + self._f.close() + os.rename(self._fpath, final_name) + + self._fpath = None + self._f = None + + + # + def _writer(self): + if self._fpath and os.path.getsize(self._fpath) > self.size: + self._close_writer() + + if self._f == None: + self._fpath = '{0}/{1}-{2}-{3:05d}-{4}-{5}-{6}.warc{7}.open'.format( + self.directory, self.prefix, self.timestamp17(), self._serial, os.getpid(), socket.gethostname(), self.port, '.gz' if self.gzip else '') + self._f = open(self._fpath, 'wb') + self._serial += 1 + + return self._f + + def run(self): - logging.info('WarcWriterThread starting') + logging.info('WarcWriterThread starting, directory={0} gzip={1} prefix={2} size={3} port={4}'.format(self.directory, self.gzip, self.prefix, self.size, self.port)) while not self.stop.is_set(): try: - warc_record = self.warc_record_in_queue.get(block=True, timeout=0.5) + warc_record = self.warc_record_queue.get(block=True, timeout=0.5) logging.info('got warc record to write from the queue: {0}'.format(warc_record)) - # warc_record.write_to(sys.stdout, gzip=False) + warc_record.write_to(self._writer(), gzip=self.gzip) + self._f.flush() except Queue.Empty: pass logging.info('WarcWriterThread shutting down') + self._close_writer(); if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(funcName)s(%(filename)s:%(lineno)d) %(message)s') - arg_parser = argparse.ArgumentParser(description='warcprox - WARC writing MITM HTTP/S proxy') + arg_parser = argparse.ArgumentParser(description='warcprox - WARC writing MITM HTTP/S proxy', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('-p', '--port', dest='port', default='8080', help='port to listen on') arg_parser.add_argument('-b', '--address', dest='address', default='localhost', help='address to listen on') - arg_parser.add_argument('-d', '--dir', dest='dir', default='warcs', help='where to write warcs') + arg_parser.add_argument('-c', '--certfile', dest='certfile', default='warcprox.pem', help='SSL certificate file; if file does not exist, it will be created') + arg_parser.add_argument('-d', '--dir', dest='directory', default='warcs', help='where to write warcs') arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records') arg_parser.add_argument('-n', '--prefix', dest='prefix', default='WARCPROX', help='WARC filename prefix') - arg_parser.add_argument('-c', '--certfile', dest='certfile', default='warcprox.pem', help="SSL certificate file; if file does not exist, it will be created") + arg_parser.add_argument('-s', '--size', dest='size', default=1000*1000*1000, help='WARC file rollover size threshold in bytes') # --max-file-size=maxArcFileSize] # [--ispartof=warcinfo ispartof] # [--description=warcinfo description] @@ -366,10 +414,10 @@ if __name__ == '__main__': # [--httpheader=warcinfo httpheader] args = arg_parser.parse_args() - proxy = AsyncMitmProxy(server_address=(args.bind, int(args.port)), certfile=args.certfile) + proxy = AsyncMitmProxy(server_address=(args.address, int(args.port)), certfile=args.certfile) proxy.register_interceptor(WarcRecordQueuer) - warc_writer = WarcWriterThread(WarcRecordQueuer.warc_record_out_queue) + warc_writer = WarcWriterThread(WarcRecordQueuer.warc_record_queue, directory=args.directory, gzip=args.gzip, prefix=args.prefix, size=int(args.size), port=int(args.port)) warc_writer.start() try: