write warcs!

This commit is contained in:
Noah Levitt 2013-10-15 15:52:26 -07:00
parent 6345845b48
commit 4367da7bbd

View File

@ -8,17 +8,15 @@
# from socketserver import ThreadingMixIn # from socketserver import ThreadingMixIn
# from http.client import HTTPResponse # from http.client import HTTPResponse
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler import BaseHTTPServer, SocketServer
from SocketServer import ThreadingMixIn import httplib
from httplib import HTTPResponse import socket
from socket import socket import urlparse
import OpenSSL.crypto import OpenSSL.crypto, OpenSSL.crypto
import OpenSSL.SSL import ssl
import logging import logging
import sys import sys
import ssl from hanzo import warctools
from hanzo.warctools import WarcRecord
from hanzo.warctools.warc import warc_datetime_str
import uuid import uuid
import hashlib import hashlib
from datetime import datetime from datetime import datetime
@ -27,6 +25,7 @@ import Queue
import threading import threading
import os.path import os.path
import argparse import argparse
import os
__author__ = 'Nadeem Douba' __author__ = 'Nadeem Douba'
__copyright__ = 'Copyright 2012, PyMiProxy Project' __copyright__ = 'Copyright 2012, PyMiProxy Project'
@ -52,11 +51,11 @@ class UnsupportedSchemeException(Exception):
pass pass
class ProxyHandler(BaseHTTPRequestHandler): class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def __init__(self, request, client_address, server): def __init__(self, request, client_address, server):
self.is_connect = False self.is_connect = False
BaseHTTPRequestHandler.__init__(self, request, client_address, server) BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, request, client_address, server)
def _connect_to_host(self): def _connect_to_host(self):
# Get hostname and port to connect to # Get hostname and port to connect to
@ -64,13 +63,13 @@ class ProxyHandler(BaseHTTPRequestHandler):
self.hostname, self.port = self.path.split(':') self.hostname, self.port = self.path.split(':')
else: else:
self.url = self.path self.url = self.path
u = urlparse(self.url) u = urlparse.urlparse(self.url)
if u.scheme != 'http': if u.scheme != 'http':
raise UnsupportedSchemeException('Unknown scheme %s' % repr(u.scheme)) raise UnsupportedSchemeException('Unknown scheme %s' % repr(u.scheme))
self.hostname = u.hostname self.hostname = u.hostname
self.port = u.port or 80 self.port = u.port or 80
self.path = urlunparse( self.path = urlparse.urlunparse(
ParseResult( urlparse.ParseResult(
scheme='', scheme='',
netloc='', netloc='',
params=u.params, params=u.params,
@ -81,7 +80,7 @@ class ProxyHandler(BaseHTTPRequestHandler):
) )
# Connect to destination # Connect to destination
self._proxy_sock = socket() self._proxy_sock = socket.socket()
self._proxy_sock.settimeout(10) self._proxy_sock.settimeout(10)
self._proxy_sock.connect((self.hostname, int(self.port))) self._proxy_sock.connect((self.hostname, int(self.port)))
@ -144,7 +143,7 @@ class ProxyHandler(BaseHTTPRequestHandler):
self._proxy_sock.sendall(self.mitm_request(req)) self._proxy_sock.sendall(self.mitm_request(req))
# Parse response # Parse response
h = HTTPResponse(self._proxy_sock) h = httplib.HTTPResponse(self._proxy_sock)
h.begin() h.begin()
# Get rid of the pesky header # Get rid of the pesky header
@ -213,10 +212,10 @@ class InvalidInterceptorPluginException(Exception):
pass pass
class MitmProxy(HTTPServer): class MitmProxy(BaseHTTPServer.HTTPServer):
def __init__(self, server_address, req_handler_class=ProxyHandler, bind_and_activate=True, certfile='warcprox.pem'): def __init__(self, server_address, req_handler_class=ProxyHandler, bind_and_activate=True, certfile='warcprox.pem'):
HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate)
self._res_plugins = [] self._res_plugins = []
self._req_plugins = [] self._req_plugins = []
self.certfile = certfile self.certfile = certfile
@ -254,22 +253,22 @@ class MitmProxy(HTTPServer):
def server_activate(self): def server_activate(self):
HTTPServer.server_activate(self) BaseHTTPServer.HTTPServer.server_activate(self)
logging.info('listening on {0}:{1}'.format(self.server_address[0], self.server_address[1])) logging.info('listening on {0}:{1}'.format(self.server_address[0], self.server_address[1]))
def server_close(self): def server_close(self):
HTTPServer.server_close(self) BaseHTTPServer.HTTPServer.server_close(self)
logging.info('shut down') logging.info('shut down')
class AsyncMitmProxy(ThreadingMixIn, MitmProxy): class AsyncMitmProxy(SocketServer.ThreadingMixIn, MitmProxy):
pass pass
class WarcRecordQueuer(RequestInterceptorPlugin, ResponseInterceptorPlugin): class WarcRecordQueuer(RequestInterceptorPlugin, ResponseInterceptorPlugin):
warc_record_out_queue = Queue.Queue() warc_record_queue = Queue.Queue()
def __init__(self, server, msg): def __init__(self, server, msg):
InterceptorPlugin.__init__(self, server, msg) InterceptorPlugin.__init__(self, server, msg)
@ -282,8 +281,8 @@ class WarcRecordQueuer(RequestInterceptorPlugin, ResponseInterceptorPlugin):
else: else:
netloc = '{0}:{1}'.format(msg.hostname, msg.port) netloc = '{0}:{1}'.format(msg.hostname, msg.port)
self.url = urlunparse( self.url = urlparse.urlunparse(
ParseResult( urlparse.ParseResult(
scheme='https', scheme='https',
netloc=netloc, netloc=netloc,
params='', params='',
@ -309,56 +308,105 @@ class WarcRecordQueuer(RequestInterceptorPlugin, ResponseInterceptorPlugin):
def do_response(self, data): def do_response(self, data):
logging.info('{0} << {1}'.format(self.url, repr(data[:100]))) logging.info('{0} << {1}'.format(self.url, repr(data[:100])))
warc_record_id = self.make_warc_uuid("{0} {1}".format(self.url, time.time())) warc_record_id = self.make_warc_uuid("{0} {1}".format(self.url, datetime.now().isoformat()))
logging.info('{0}: {1}'.format(WarcRecord.ID, warc_record_id)) logging.info('{0}: {1}'.format(warctools.WarcRecord.ID, warc_record_id))
headers = [] headers = []
headers.append((WarcRecord.ID, warc_record_id)) headers.append((warctools.WarcRecord.ID, warc_record_id))
headers.append((WarcRecord.URL, self.url)) headers.append((warctools.WarcRecord.URL, self.url))
headers.append((WarcRecord.DATE, warc_datetime_str(datetime.now()))) headers.append((warctools.WarcRecord.DATE, warctools.warc.warc_datetime_str(datetime.now())))
# headers.append((WarcRecord.IP_ADDRESS, ip)) # headers.append((warctools.WarcRecord.IP_ADDRESS, ip))
headers.append((WarcRecord.TYPE, WarcRecord.RESPONSE)) headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.RESPONSE))
warcrecord = WarcRecord(headers=headers, content=("application/http;msgtype=response", data)) warcrecord = warctools.WarcRecord(headers=headers, content=("application/http;msgtype=response", data))
# warcrecord.write_to(sys.stdout, gzip=False) # warcrecord.write_to(sys.stdout, gzip=False)
WarcRecordQueuer.warc_record_out_queue.put(warcrecord) WarcRecordQueuer.warc_record_queue.put(warcrecord)
return data return data
class WarcWriterThread(threading.Thread): class WarcWriterThread(threading.Thread):
def __init__(self, warc_record_in_queue): def __init__(self, warc_record_queue, directory, gzip, prefix, size, port):
threading.Thread.__init__(self, name='WarcWriterThread') threading.Thread.__init__(self, name='WarcWriterThread')
self.warc_record_in_queue = warc_record_in_queue
self.warc_record_queue = warc_record_queue
self.directory = directory
self.gzip = gzip
self.prefix = prefix
self.size = size
self.port = port
self._f = None
self._fpath = None
self._serial = 0
if not os.path.exists(directory):
logging.info("warc destination directory {0} doesn't exist, creating it".format(directory))
os.mkdir(directory)
self.stop = threading.Event() self.stop = threading.Event()
def timestamp17(self):
    """Return the current local time as a 17-digit timestamp string.

    The layout is YYYYMMDDHHMMSS followed by the millisecond part of the
    current time. The millisecond field is zero-padded to three digits so
    the result is always exactly 17 characters long.
    """
    now = datetime.now()
    # microsecond // 1000 ranges over 0..999; without padding, values
    # below 100 would yield a 15- or 16-character string.
    return '{0}{1:03d}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond // 1000)
def _close_writer(self):
final_name = self._fpath[:-5]
logging.info('closing {0}'.format(final_name))
self._f.close()
os.rename(self._fpath, final_name)
self._fpath = None
self._f = None
# <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
def _writer(self):
if self._fpath and os.path.getsize(self._fpath) > self.size:
self._close_writer()
if self._f == None:
self._fpath = '{0}/{1}-{2}-{3:05d}-{4}-{5}-{6}.warc{7}.open'.format(
self.directory, self.prefix, self.timestamp17(), self._serial, os.getpid(), socket.gethostname(), self.port, '.gz' if self.gzip else '')
self._f = open(self._fpath, 'wb')
self._serial += 1
return self._f
def run(self): def run(self):
logging.info('WarcWriterThread starting') logging.info('WarcWriterThread starting, directory={0} gzip={1} prefix={2} size={3} port={4}'.format(self.directory, self.gzip, self.prefix, self.size, self.port))
while not self.stop.is_set(): while not self.stop.is_set():
try: try:
warc_record = self.warc_record_in_queue.get(block=True, timeout=0.5) warc_record = self.warc_record_queue.get(block=True, timeout=0.5)
logging.info('got warc record to write from the queue: {0}'.format(warc_record)) logging.info('got warc record to write from the queue: {0}'.format(warc_record))
# warc_record.write_to(sys.stdout, gzip=False) warc_record.write_to(self._writer(), gzip=self.gzip)
self._f.flush()
except Queue.Empty: except Queue.Empty:
pass pass
logging.info('WarcWriterThread shutting down') logging.info('WarcWriterThread shutting down')
self._close_writer();
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(funcName)s(%(filename)s:%(lineno)d) %(message)s') logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(funcName)s(%(filename)s:%(lineno)d) %(message)s')
arg_parser = argparse.ArgumentParser(description='warcprox - WARC writing MITM HTTP/S proxy') arg_parser = argparse.ArgumentParser(description='warcprox - WARC writing MITM HTTP/S proxy',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('-p', '--port', dest='port', default='8080', help='port to listen on') arg_parser.add_argument('-p', '--port', dest='port', default='8080', help='port to listen on')
arg_parser.add_argument('-b', '--address', dest='address', default='localhost', help='address to listen on') arg_parser.add_argument('-b', '--address', dest='address', default='localhost', help='address to listen on')
arg_parser.add_argument('-d', '--dir', dest='dir', default='warcs', help='where to write warcs') arg_parser.add_argument('-c', '--certfile', dest='certfile', default='warcprox.pem', help='SSL certificate file; if file does not exist, it will be created')
arg_parser.add_argument('-d', '--dir', dest='directory', default='warcs', help='where to write warcs')
arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records') arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records')
arg_parser.add_argument('-n', '--prefix', dest='prefix', default='WARCPROX', help='WARC filename prefix') arg_parser.add_argument('-n', '--prefix', dest='prefix', default='WARCPROX', help='WARC filename prefix')
arg_parser.add_argument('-c', '--certfile', dest='certfile', default='warcprox.pem', help="SSL certificate file; if file does not exist, it will be created") arg_parser.add_argument('-s', '--size', dest='size', default=1000*1000*1000, help='WARC file rollover size threshold in bytes')
# --max-file-size=maxArcFileSize] # --max-file-size=maxArcFileSize]
# [--ispartof=warcinfo ispartof] # [--ispartof=warcinfo ispartof]
# [--description=warcinfo description] # [--description=warcinfo description]
@ -366,10 +414,10 @@ if __name__ == '__main__':
# [--httpheader=warcinfo httpheader] # [--httpheader=warcinfo httpheader]
args = arg_parser.parse_args() args = arg_parser.parse_args()
proxy = AsyncMitmProxy(server_address=(args.bind, int(args.port)), certfile=args.certfile) proxy = AsyncMitmProxy(server_address=(args.address, int(args.port)), certfile=args.certfile)
proxy.register_interceptor(WarcRecordQueuer) proxy.register_interceptor(WarcRecordQueuer)
warc_writer = WarcWriterThread(WarcRecordQueuer.warc_record_out_queue) warc_writer = WarcWriterThread(WarcRecordQueuer.warc_record_queue, directory=args.directory, gzip=args.gzip, prefix=args.prefix, size=int(args.size), port=int(args.port))
warc_writer.start() warc_writer.start()
try: try: