warcprox/warcprox.py

482 lines
17 KiB
Python
Raw Normal View History

2013-10-15 10:54:18 -07:00
#!/usr/bin/python
# vim:set sw=4 et:
#
2013-10-15 15:52:26 -07:00
import BaseHTTPServer, SocketServer
import httplib
import socket
import urlparse
2013-10-15 17:51:09 -07:00
import OpenSSL
2013-10-15 15:52:26 -07:00
import ssl
2013-10-15 10:54:18 -07:00
import logging
import sys
2013-10-15 15:52:26 -07:00
from hanzo import warctools
2013-10-15 10:54:18 -07:00
import uuid
import hashlib
from datetime import datetime
import Queue
import threading
import os, os.path
2013-10-15 14:11:31 -07:00
import argparse
2012-07-19 11:08:14 -04:00
class UnsupportedSchemeException(Exception):
    """Raised when a proxied request URL uses a scheme other than 'http'."""
2013-10-15 15:52:26 -07:00
class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Request handler that relays each request to its origin server.

    Plain http requests are forwarded as-is.  CONNECT requests are answered
    by opening a TLS connection to the destination, wrapping the client
    socket in TLS with the server's certificate file, and then handling the
    tunnelled request like any other.  The raw request and response text are
    passed through one instance of every interceptor class registered on the
    server.
    """

    def __init__(self, request, client_address, server):
        # State must exist before the base constructor runs, because the
        # base constructor processes the request before it returns.
        self.is_connect = False
        # Absolute URL of a plain http request; stays None for CONNECT.
        self.url = None
        # Socket to the origin server, opened by _connect_to_host().
        self._proxy_sock = None
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, request, client_address, server)

    def _connect_to_host(self):
        """Determine the destination and open ``self._proxy_sock`` to it.

        For CONNECT, ``self.path`` is ``host:port``.  Otherwise ``self.path``
        is an absolute http URL; it is remembered in ``self.url`` and
        ``self.path`` is rewritten without scheme and host for relaying.

        Raises:
            UnsupportedSchemeException: the URL scheme is not 'http'.
        """
        if self.is_connect:
            # Split on the last colon only, so a host that itself contains
            # colons does not break the two-value unpacking.
            self.hostname, self.port = self.path.rsplit(':', 1)
        else:
            self.url = self.path
            u = urlparse.urlparse(self.url)
            if u.scheme != 'http':
                raise UnsupportedSchemeException('Unknown scheme %s' % repr(u.scheme))
            self.hostname = u.hostname
            self.port = u.port or 80
            self.path = urlparse.urlunparse(
                urlparse.ParseResult(
                    scheme='',
                    netloc='',
                    params=u.params,
                    path=u.path or '/',
                    query=u.query,
                    fragment=u.fragment
                )
            )

        # Connect to destination
        sock = socket.socket()
        sock.settimeout(10)
        try:
            sock.connect((self.hostname, int(self.port)))
            # Wrap socket if SSL is required
            if self.is_connect:
                sock = ssl.wrap_socket(sock)
        except Exception:
            # Do not leak the descriptor when connecting or the TLS
            # handshake fails; the caller reports the error.
            sock.close()
            raise
        self._proxy_sock = sock

    def _close_proxy_sock(self):
        """Close the origin-server socket if one is open; safe to call twice."""
        sock, self._proxy_sock = self._proxy_sock, None
        if sock is not None:
            sock.close()

    def _transition_to_ssl(self):
        """Wrap the client connection in server-side TLS."""
        self.request = ssl.wrap_socket(self.request, server_side=True, certfile=self.server.certfile)

    def do_CONNECT(self):
        """Set up a man-in-the-middle TLS tunnel and handle the request sent through it."""
        self.is_connect = True
        try:
            # Connect to destination first
            self._connect_to_host()
            # If successful, let's do this!
            self.send_response(200, 'Connection established')
            self.end_headers()
            self._transition_to_ssl()
        except Exception as e:
            self._close_proxy_sock()
            self.send_error(500, str(e))
            return

        # Rebuild rfile/wfile on top of the TLS-wrapped client socket, then
        # read the tunnelled request; it is dispatched to do_COMMAND.
        self.setup()
        try:
            self.handle_one_request()
        finally:
            # Covers the case where the tunnelled request never reached
            # do_COMMAND (e.g. it could not be parsed).
            self._close_proxy_sock()

    def do_COMMAND(self):
        """Relay the current request to the origin server and send back its response."""
        # For a CONNECT tunnel the upstream socket is already open.
        if not self.is_connect:
            try:
                # Connect to destination
                self._connect_to_host()
            except Exception as e:
                self.send_error(500, str(e))
                return

        try:
            # Build request
            req = '%s %s %s\r\n' % (self.command, self.path, self.request_version)

            # Add headers to the request
            req += '%s\r\n' % self.headers

            # Append message body if present to the request
            if 'Content-Length' in self.headers:
                req += self.rfile.read(int(self.headers['Content-Length']))

            # One interceptor instance per class sees both the request and
            # the response of this transaction.
            interceptors = [p(self.server, self) for p in self.server._interceptors]

            # Send it down the pipe!
            self._proxy_sock.sendall(self.mitm_request(req, interceptors))

            # Parse response
            h = httplib.HTTPResponse(self._proxy_sock)
            try:
                h.begin()

                # The body returned by read() is already decoded, so the
                # Transfer-Encoding header no longer describes it.
                del h.msg['Transfer-Encoding']

                # Time to relay the message across
                res = '%s %s %s\r\n' % (self.request_version, h.status, h.reason)
                res += '%s\r\n' % h.msg
                res += h.read()
            finally:
                h.close()
        finally:
            # Close off the remote end even if relaying failed part-way.
            self._close_proxy_sock()

        # Relay the message
        self.request.sendall(self.mitm_response(res, interceptors))

    def mitm_request(self, data, interceptors):
        """Pass request ``data`` through each interceptor in order; return the result."""
        for i in interceptors:
            data = i.do_request(data)
        return data

    def mitm_response(self, data, interceptors):
        """Pass response ``data`` through each interceptor in order; return the result."""
        for i in interceptors:
            data = i.do_response(data)
        return data

    def __getattr__(self, item):
        """Route every ``do_<METHOD>`` lookup without its own handler to do_COMMAND.

        Any other missing attribute raises AttributeError, so ``hasattr`` and
        ``getattr`` defaults behave normally.
        """
        if item.startswith('do_'):
            return self.do_COMMAND
        raise AttributeError(item)

    def log_error(self, format, *args):
        """Send handler error messages to the logging module at ERROR level."""
        logging.error("{0} - - [{1}] {2}".format(self.address_string(),
            self.log_date_time_string(), format % args))

    def log_message(self, format, *args):
        """Send handler log messages to the logging module at INFO level."""
        logging.info("{0} - - [{1}] {2}".format(self.address_string(),
            self.log_date_time_string(), format % args))
2013-10-15 18:37:26 -07:00
# InterceptorPlugin modified from pymiproxy to send the request and response
# from a single transaction through the same instance of the interceptor
2012-07-19 11:08:14 -04:00
class InterceptorPlugin(object):
    """Base class for hooks that see a proxied request and its response.

    Subclasses override do_request and/or do_response; each receives the
    raw message text and returns the text to pass on.  The defaults pass the
    data through untouched.
    """

    def __init__(self, server, msg):
        # Keep the proxy server and the request handler for subclasses.
        self.server, self.message = server, msg

    def do_request(self, data):
        """Return the request data to forward; unchanged by default."""
        return data

    def do_response(self, data):
        """Return the response data to forward; unchanged by default."""
        return data
class InvalidInterceptorPluginException(Exception):
    """Raised when something other than an InterceptorPlugin subclass is registered."""
2013-10-15 15:52:26 -07:00
class MitmProxy(BaseHTTPServer.HTTPServer):
    """HTTP server for the proxy: holds the TLS certificate file and the interceptor classes.

    If ``certfile`` does not exist, a self-signed certificate and its private
    key are generated and written to it.
    """

    def __init__(self, server_address, req_handler_class=ProxyHandler, bind_and_activate=True, certfile='warcprox.pem'):
        BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate)
        # Interceptor classes; the handler instantiates each per transaction.
        self._interceptors = []
        self.certfile = certfile
        if not os.path.exists(certfile):
            self._generate_cert(certfile)

    def _generate_cert(self, certfile):
        """Write a new 2048-bit RSA key and self-signed certificate to ``certfile`` (PEM)."""
        key = OpenSSL.crypto.PKey()
        key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)

        cert = OpenSSL.crypto.X509()
        # The version field is zero-based: 2 means X.509 v3.
        cert.set_version(2)
        cert.set_serial_number(1)
        cert.get_subject().CN = 'warcprox man-in-the-middle archiving http/s proxy'
        cert.gmtime_adj_notBefore(0)                # now
        cert.gmtime_adj_notAfter(100*365*24*60*60)  # 100 yrs in future
        cert.set_issuer(cert.get_subject())         # self-signed
        cert.set_pubkey(key)
        # SHA-1 signatures are rejected by current TLS clients.
        cert.sign(key, "sha256")

        # Key first, then certificate, both in the one PEM file.
        with open(certfile, 'wb+') as f:
            f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.SSL.FILETYPE_PEM, key))
            f.write(OpenSSL.crypto.dump_certificate(OpenSSL.SSL.FILETYPE_PEM, cert))

    def register_interceptor(self, interceptor_class):
        """Register an InterceptorPlugin subclass to be applied to every transaction.

        Raises:
            InvalidInterceptorPluginException: ``interceptor_class`` is not a
                class derived from InterceptorPlugin.
        """
        # issubclass() raises TypeError for non-classes, so check that first
        # and report the offending object itself rather than its metatype.
        is_plugin = isinstance(interceptor_class, type) and issubclass(interceptor_class, InterceptorPlugin)
        if not is_plugin:
            raise InvalidInterceptorPluginException('Expected type InterceptorPlugin got %s instead' % repr(interceptor_class))
        self._interceptors.append(interceptor_class)

    def server_activate(self):
        """Start listening and log the bound address."""
        BaseHTTPServer.HTTPServer.server_activate(self)
        logging.info('listening on {0}:{1}'.format(self.server_address[0], self.server_address[1]))

    def server_close(self):
        """Close the listening socket and log the shutdown."""
        BaseHTTPServer.HTTPServer.server_close(self)
        logging.info('shut down')
2013-10-15 15:52:26 -07:00
class AsyncMitmProxy(SocketServer.ThreadingMixIn, MitmProxy):
    """MitmProxy combined with ThreadingMixIn, so each request runs in its own thread."""
2013-10-15 18:37:26 -07:00
# assumes do_request happens before do_response
class WarcRecordQueuer(InterceptorPlugin):
    """Interceptor that turns one proxied transaction into queued WARC records.

    do_request builds the request record; do_response builds the response
    record and puts both on ``warc_record_group_queue`` as one group.  The
    data passed through is returned unmodified.  Assumes do_request happens
    before do_response.
    """

    # Each item in the queue is a tuple of warc records which should be written
    # together, e.g. (response, request) where request has WARC-Concurrent-To
    # pointing to response.
    warc_record_group_queue = Queue.Queue()

    @staticmethod
    def make_warc_uuid(text):
        """Return a ``<urn:uuid:...>`` id derived from the SHA-1 of ``text``."""
        return "<urn:uuid:{0}>".format(uuid.UUID(hashlib.sha1(text).hexdigest()[0:32]))

    def __init__(self, server, msg):
        InterceptorPlugin.__init__(self, server, msg)
        self._request_record = None

        if msg.is_connect:
            # have to construct the url if proxy request is a CONNECT
            assert not msg.url

            # Leave the default https port out of the url.
            if int(msg.port) == 443:
                netloc = msg.hostname
            else:
                netloc = '{0}:{1}'.format(msg.hostname, msg.port)

            self.url = urlparse.urlunparse(
                urlparse.ParseResult(
                    scheme='https',
                    netloc=netloc,
                    params='',
                    path=msg.path,
                    query='',
                    fragment=''
                )
            )
        else:
            assert msg.url
            self.url = msg.url

    def _warc_date(self):
        """Return the WARC-Date for this transaction, computed once and reused."""
        try:
            return self._d
        except AttributeError:
            # warc_datetime_str() labels the time as UTC ("Z"), so it must
            # be given UTC rather than local time.
            self._d = warctools.warc.warc_datetime_str(datetime.utcnow())
            return self._d

    def _record_id(self, record_type):
        """Return a record id that differs between this transaction's request and response."""
        seed = "{0} {1} {2}".format(record_type, self.url, self._warc_date())
        return WarcRecordQueuer.make_warc_uuid(seed)

    def do_request(self, data):
        """Build and remember the WARC request record; return ``data`` unchanged."""
        logging.info('{0} >> {1}'.format(self.url, repr(data[:100])))

        headers = []
        headers.append((warctools.WarcRecord.ID, self._record_id('request')))
        headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.REQUEST))
        headers.append((warctools.WarcRecord.URL, self.url))
        headers.append((warctools.WarcRecord.DATE, self._warc_date()))
        # headers.append((warctools.WarcRecord.IP_ADDRESS, ip))

        content_tuple = "application/http;msgtype=request", data
        self._request_record = warctools.WarcRecord(headers=headers, content=content_tuple)

        return data

    def do_response(self, data):
        """Build the WARC response record, queue the record group; return ``data`` unchanged."""
        logging.info('{0} << {1}'.format(self.url, repr(data[:100])))

        record_id = self._record_id('response')

        headers = []
        headers.append((warctools.WarcRecord.ID, record_id))
        headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.RESPONSE))
        headers.append((warctools.WarcRecord.URL, self.url))
        headers.append((warctools.WarcRecord.DATE, self._warc_date()))
        # headers.append((warctools.WarcRecord.IP_ADDRESS, ip))

        content_tuple = ("application/http;msgtype=response", data)
        response_record = warctools.WarcRecord(headers=headers, content=content_tuple)

        if self._request_record is None:
            # do_request was never called for this transaction.
            record_group = (response_record,)
        else:
            # Link the request to the response it was captured with.
            self._request_record.set_header(warctools.WarcRecord.CONCURRENT_TO, record_id)
            record_group = (response_record, self._request_record)

        WarcRecordQueuer.warc_record_group_queue.put(record_group)

        return data
2013-10-15 10:54:18 -07:00
class WarcWriterThread(threading.Thread):
    """Thread that takes WARC record groups off a queue and writes them to files.

    Files are written in ``directory`` under a name ending in ``.open``; when
    a file grows past ``size`` bytes, or the thread shuts down, it is closed
    and renamed to drop that suffix.  Set the ``stop`` event to shut the
    thread down; record groups still queued at that point are written first.
    """

    def __init__(self, warc_record_group_queue, directory, gzip, prefix, size, port):
        threading.Thread.__init__(self, name='WarcWriterThread')

        self.warc_record_group_queue = warc_record_group_queue

        self.directory = directory
        self.gzip = gzip
        self.prefix = prefix
        self.size = size
        self.port = port

        self._f = None        # currently open warc file object, if any
        self._fpath = None    # its path, including the '.open' suffix
        self._serial = 0      # serial number used in the next file name

        if not os.path.exists(directory):
            logging.info("warc destination directory {0} doesn't exist, creating it".format(directory))
            # makedirs so that a nested destination path works too.
            os.makedirs(directory)

        self.stop = threading.Event()

    def timestamp17(self):
        """Return the current local time as 17 digits: YYYYMMDDHHMMSS plus milliseconds."""
        now = datetime.now()
        # Zero-pad the milliseconds so the result is always 17 characters.
        return '{0}{1:03d}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond // 1000)

    def _close_writer(self):
        """Close the current warc file, if any, and rename it without '.open'."""
        if self._fpath:
            final_name = self._fpath[:-5]   # strip the '.open' suffix
            logging.info('closing {0}'.format(final_name))
            self._f.close()
            os.rename(self._fpath, final_name)

            self._fpath = None
            self._f = None

    # Example warcinfo record, for reference:
    #
    # WARC/1.0
    # WARC-Type: warcinfo
    # WARC-Date: 2013-10-15T22:11:29Z
    # WARC-Filename: ARCHIVEIT-3714-WEEKLY-14487-20131015221129606-00000-wbgrp-crawl105.us.archive.org-6442.warc.gz
    # WARC-Record-ID: <urn:uuid:8c5d5d7d-11df-4a83-9999-8d6c8244316b>
    # Content-Type: application/warc-fields
    # Content-Length: 713
    #
    # software: Heritrix/3.1.2-SNAPSHOT-20131011-0101 http://crawler.archive.org
    # ip: 207.241.226.68
    # hostname: wbgrp-crawl105.us.archive.org
    # format: WARC File Format 1.0
    # conformsTo: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf
    # isPartOf: 3714-20131015221121926
    # description: recurrence=WEEKLY, maxDuration=259200, maxDocumentCount=null, isTestCrawl=false, isPatchCrawl=false, oneTimeSubtype=null, seedCount=1, accountId
    # robots: obey
    # http-header-user-agent: Mozilla/5.0 (compatible; archive.org_bot; Archive-It; +http://archive-it.org/files/site-owners.html)
    def _make_warcinfo_record(self, filename):
        """Build the warcinfo record that starts the warc file named ``filename``."""
        # The formatted date carries a "Z" (UTC) suffix, so use UTC time.
        warc_record_date = warctools.warc.warc_datetime_str(datetime.utcnow())
        record_id = WarcRecordQueuer.make_warc_uuid("{0} {1}".format(filename, warc_record_date))

        headers = []
        headers.append((warctools.WarcRecord.ID, record_id))
        headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.WARCINFO))
        headers.append((warctools.WarcRecord.FILENAME, filename))
        headers.append((warctools.WarcRecord.DATE, warc_record_date))
        # headers.append((warctools.WarcRecord.IP_ADDRESS, ip))

        warcinfo_fields = []
        warcinfo_fields.append('software: warcprox.py https://github.com/nlevitt/warcprox')
        hostname = socket.gethostname()
        warcinfo_fields.append('hostname: {0}'.format(hostname))
        warcinfo_fields.append('ip: {0}'.format(socket.gethostbyname(hostname)))
        warcinfo_fields.append('format: WARC File Format 1.0')
        warcinfo_fields.append('robots: ignore')   # XXX implement robots support
        # warcinfo_fields.append('description: {0}'.format(self.description))
        # warcinfo_fields.append('isPartOf: {0}'.format(self.is_part_of))
        data = '\r\n'.join(warcinfo_fields) + '\r\n'

        record = warctools.WarcRecord(headers=headers, content=('application/warc-fields', data))

        return record

    # <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
    def _writer(self):
        """Return the file to write the next record to, rolling over when needed.

        Closes the current file once it exceeds ``self.size`` bytes, and
        opens a new one (starting with a warcinfo record) when none is open.
        """
        if self._fpath and os.path.getsize(self._fpath) > self.size:
            self._close_writer()

        if self._f is None:
            filename = '{}-{}-{:05d}-{}-{}-{}.warc{}'.format(
                    self.prefix, self.timestamp17(), self._serial, os.getpid(),
                    socket.gethostname(), self.port, '.gz' if self.gzip else '')
            self._fpath = os.path.join(self.directory, filename + '.open')

            self._f = open(self._fpath, 'wb')

            warcinfo_record = self._make_warcinfo_record(filename)
            warcinfo_record.write_to(self._f, gzip=self.gzip)

            self._serial += 1

        return self._f

    def run(self):
        """Write queued record groups until ``stop`` is set and the queue is empty."""
        logging.info('WarcWriterThread starting, directory={0} gzip={1} prefix={2} size={3} port={4}'.format(
                os.path.abspath(self.directory), self.gzip, self.prefix, self.size, self.port))

        while True:
            stopping = self.stop.is_set()
            try:
                # While running, wait briefly so the stop event is noticed.
                # Once stopping, do not wait: just drain what is left so no
                # queued records are lost.
                warc_record_group = self.warc_record_group_queue.get(block=not stopping, timeout=0.5)
            except Queue.Empty:
                if stopping:
                    break
                continue

            logging.info('got warc record group to write from the queue: {0}'.format(warc_record_group))
            for record in warc_record_group:
                record.write_to(self._writer(), gzip=self.gzip)
            # An empty group opens no file, so there may be nothing to flush.
            if self._f is not None:
                self._f.flush()

        logging.info('WarcWriterThread shutting down')
        self._close_writer()
2012-07-19 11:08:14 -04:00
if __name__ == '__main__':
    # Command-line entry point: start the warc writer thread and run the
    # proxy until interrupted.
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(funcName)s(%(filename)s:%(lineno)d) %(message)s')

    arg_parser = argparse.ArgumentParser(description='warcprox - WARC writing MITM HTTP/S proxy',
            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # type=int makes argparse reject non-numeric values with a usage error
    # instead of a traceback from a later int() call.
    arg_parser.add_argument('-p', '--port', dest='port', type=int, default=8080, help='port to listen on')
    arg_parser.add_argument('-b', '--address', dest='address', default='localhost', help='address to listen on')
    arg_parser.add_argument('-c', '--certfile', dest='certfile', default='warcprox.pem', help='SSL certificate file; if file does not exist, it will be created')
    arg_parser.add_argument('-d', '--dir', dest='directory', default='warcs', help='where to write warcs')
    arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records')
    arg_parser.add_argument('-n', '--prefix', dest='prefix', default='WARCPROX', help='WARC filename prefix')
    arg_parser.add_argument('-s', '--size', dest='size', type=int, default=1000*1000*1000, help='WARC file rollover size threshold in bytes')
    # [--ispartof=warcinfo ispartof]
    # [--description=warcinfo description]
    # [--operator=warcinfo operator]
    # [--httpheader=warcinfo httpheader]
    args = arg_parser.parse_args()

    proxy = AsyncMitmProxy(server_address=(args.address, args.port), certfile=args.certfile)
    proxy.register_interceptor(WarcRecordQueuer)

    warc_writer = WarcWriterThread(WarcRecordQueuer.warc_record_group_queue, directory=args.directory, gzip=args.gzip, prefix=args.prefix, size=args.size, port=args.port)
    warc_writer.start()

    try:
        proxy.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        warc_writer.stop.set()
        proxy.server_close()
        # Wait for the writer to close and rename its current warc file.
        warc_writer.join()