diff --git a/warcprox.py b/warcprox.py index df166e2..1d8623b 100755 --- a/warcprox.py +++ b/warcprox.py @@ -69,9 +69,9 @@ class CertificateAuthority(object): logging.info('generated CA key+cert and wrote to {}'.format(self.ca_file)) - def _read_ca(self, file): - self.cert = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM, open(file).read()) - self.key = OpenSSL.crypto.load_privatekey(OpenSSL.SSL.FILETYPE_PEM, open(file).read()) + def _read_ca(self, filename): + self.cert = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM, open(filename).read()) + self.key = OpenSSL.crypto.load_privatekey(OpenSSL.SSL.FILETYPE_PEM, open(filename).read()) logging.info('read CA key+cert from {}'.format(self.ca_file)) def __getitem__(self, cn): @@ -116,7 +116,8 @@ class ProxyingRecorder: def __init__(self, fp, proxy_dest): self.fp = fp - self.buf = tempfile.SpooledTemporaryFile(max_size=1024) + # "The file has no name, and will cease to exist when it is closed." + self.tempfile = tempfile.SpooledTemporaryFile(max_size=512*1024) self.block_sha1 = hashlib.sha1() self.payload_sha1 = None self.proxy_dest = proxy_dest @@ -152,7 +153,7 @@ class ProxyingRecorder: self.block_sha1.update(hunk) - self.buf.write(hunk) + self.tempfile.write(hunk) self.proxy_dest.sendall(hunk) self.len += len(hunk) @@ -185,7 +186,7 @@ class ProxyingRecordingHTTPResponse(httplib.HTTPResponse): self.fp = self.recorder -class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): +class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): def __init__(self, request, client_address, server): self.is_connect = False @@ -224,7 +225,8 @@ class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): def _transition_to_ssl(self): - self.request = ssl.wrap_socket(self.request, server_side=True, certfile=self.server.ca[self.hostname]) + self.connection = ssl.wrap_socket(self.connection, server_side=True, + certfile=self.server.ca[self.hostname]) def do_CONNECT(self): @@ -236,7 +238,6 @@ class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): # If successful, let's do this! self.send_response(200, 'Connection established') self.end_headers() - #self.request.sendall('%s 200 Connection established\r\n\r\n' % self.request_version) self._transition_to_ssl() except Exception as e: self.send_error(500, str(e)) @@ -244,26 +245,41 @@ class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): # Reload! self.setup() - try: - logging.debug("host={} port={} path={} calling self.handle_one_request()".format(self.hostname, self.port, self.path)) - self.handle_one_request() - except ssl.SSLError, e: - logging.error("host={} port={} path={} caught SSLError {}".format(self.hostname, self.port, self.path, e)) - pass + self.handle_one_request() + + + def _construct_tunneled_url(self): + if int(self.port) == 443: + netloc = self.hostname + else: + netloc = '{}:{}'.format(self.hostname, self.port) + + result = urlparse.urlunparse( + urlparse.ParseResult( + scheme='https', + netloc=netloc, + params='', + path=self.path, + query='', + fragment='' + ) + ) + + return result def do_COMMAND(self): - # Is this an SSL tunnel? if not self.is_connect: try: # Connect to destination self._connect_to_host() + assert self.url except Exception as e: self.send_error(500, str(e)) return - - warc_record_queuer = WarcRecordQueuer(self.server, self) + else: + self.url = _construct_tunneled_url() # Build request req = '%s %s %s\r\n' % (self.command, self.path, self.request_version) @@ -287,44 +303,41 @@ class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): # to the proxy client. # Proxy and record the response - h = ProxyingRecordingHTTPResponse(self._proxy_sock, proxy_dest=self.request) + h = ProxyingRecordingHTTPResponse(self._proxy_sock, proxy_dest=self.connection) h.begin() - buf = h.read(4096) + buf = h.read(8192) while buf != '': - buf = h.read(4096) + buf = h.read(8192) # Let's close off the remote end h.close() self._proxy_sock.close() - warc_record_queuer.do_request(req) - warc_record_queuer.do_response(h.recorder) + self.server.recordset_q.create_and_queue(self.url, req, h.recorder) + def __getattr__(self, item): if item.startswith('do_'): return self.do_COMMAND - - def log_error(self, format, *args): + def log_error(self, fmt, *args): logging.error("{0} - - [{1}] {2}".format(self.address_string(), - self.log_date_time_string(), format % args)) + self.log_date_time_string(), fmt % args)) - - def log_message(self, format, *args): + def log_message(self, fmt, *args): logging.info("{0} - - [{1}] {2}".format(self.address_string(), - self.log_date_time_string(), format % args)) + self.log_date_time_string(), fmt % args)) -class InvalidInterceptorPluginException(Exception): - pass +class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): - -class MitmProxy(BaseHTTPServer.HTTPServer): - - def __init__(self, server_address, req_handler_class=ProxyHandler, bind_and_activate=True, ca_file='./warcprox-ca.pem', certs_dir='./warcprox-ca'): + def __init__(self, server_address, req_handler_class=WarcProxyHandler, + bind_and_activate=True, ca_file='./warcprox-ca.pem', + certs_dir='./warcprox-ca', recordset_q=None): BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) self.ca = CertificateAuthority(ca_file, certs_dir) + self.recordset_q = recordset_q def server_activate(self): BaseHTTPServer.HTTPServer.server_activate(self) @@ -335,109 +348,74 @@ class MitmProxy(BaseHTTPServer.HTTPServer): BaseHTTPServer.HTTPServer.server_close(self) -class AsyncMitmProxy(SocketServer.ThreadingMixIn, MitmProxy): - pass +# Each item in the queue is a tuple of warc records, which should be written +# consecutively in the same warc. +class WarcRecordsetQueue(Queue.Queue): -# assumes do_request happens before do_response -class WarcRecordQueuer: + def create_and_queue(self, url, request_data, response_recorder): + warc_date = warctools.warc.warc_datetime_str(datetime.now()) + + response_record, response_record_id = self.make_record(url=url, + warc_date=warc_date, recorder=response_recorder, + warc_type=warctools.WarcRecord.RESPONSE, + content_type="application/http;msgtype=response") + + request_record, request_record_id = self.make_record(url=url, + warc_date=warc_date, data=request_data, + warc_type=warctools.WarcRecord.REQUEST, + content_type="application/http;msgtype=request", + concurrent_to=response_record_id) + + record_group = (response_record, request_record) + self.put(record_group) - # Each item in the queue is a tuple of warc records which should be written - # together, e.g. (reponse, request) where request has WARC-Concurrent-To - # pointing to response. - warc_record_group_queue = Queue.Queue() @staticmethod - def make_warc_uuid(text): - return "".format(uuid.UUID(hashlib.sha1(text).hexdigest()[0:32])) + def make_record(url, warc_date=None, recorder=None, data=None, + concurrent_to=None, warc_type=None, content_type=None): + if warc_date is None: + warc_date = warctools.warc.warc_datetime_str(datetime.now()) - def __init__(self, server, msg): - self.server = server - self.msg = msg + record_id = warctools.WarcRecord.random_warc_uuid() - if msg.is_connect: - # have to construct the url if proxy request is a CONNECT - assert not msg.url + headers = [] + headers.append((warctools.WarcRecord.ID, record_id)) + headers.append((warctools.WarcRecord.DATE, warc_date)) + headers.append((warctools.WarcRecord.URL, url)) + # headers.append((warctools.WarcRecord.IP_ADDRESS, ip)) + if warc_type is not None: + headers.append((warctools.WarcRecord.TYPE, warc_type)) + if concurrent_to is not None: + headers.append((warctools.WarcRecord.CONCURRENT_TO, concurrent_to)) + if content_type is not None: + headers.append((warctools.WarcRecord.CONTENT_TYPE, content_type)) - if int(msg.port) == 443: - netloc = msg.hostname - else: - netloc = '{0}:{1}'.format(msg.hostname, msg.port) + if recorder is not None: + headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)))) + headers.append((warctools.WarcRecord.BLOCK_DIGEST, 'sha1:{}'.format(recorder.block_sha1.hexdigest()))) + if recorder.payload_sha1 is not None: + headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, 'sha1:{}'.format(recorder.payload_sha1.hexdigest()))) + + recorder.tempfile.seek(0) + record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile) - self.url = urlparse.urlunparse( - urlparse.ParseResult( - scheme='https', - netloc=netloc, - params='', - path=msg.path, - query='', - fragment='' - ) - ) else: - assert msg.url - self.url = msg.url + headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data)))) + headers.append((warctools.WarcRecord.BLOCK_DIGEST, 'sha1:{}'.format(hashlib.sha1(data).hexdigest()))) + content_tuple = content_type, data + record = warctools.WarcRecord(headers=headers, content=content_tuple) - def _warc_date(self): - try: - return self._d - except AttributeError: - self._d = warctools.warc.warc_datetime_str(datetime.now()) - return self._d + return record, record_id - def do_request(self, data): - logging.info('{0} >> {1}'.format(self.url, repr(data[:40]))) - - record_id = WarcRecordQueuer.make_warc_uuid("{0} {1}".format(self.url, self._warc_date())) - - headers = [] - headers.append((warctools.WarcRecord.ID, record_id)) - headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.REQUEST)) - headers.append((warctools.WarcRecord.URL, self.url)) - headers.append((warctools.WarcRecord.DATE, self._warc_date())) - # headers.append((warctools.WarcRecord.IP_ADDRESS, ip)) - - content_tuple = "application/http;msgtype=request", data - self._request_record = warctools.WarcRecord(headers=headers, content=content_tuple) - - - def do_response(self, recorder): - logging.info('{} << {} bytes'.format(self.url, len(recorder))) - - record_id = WarcRecordQueuer.make_warc_uuid("{0} {1}".format(self.url, self._warc_date())) - - headers = [] - headers.append((warctools.WarcRecord.ID, record_id)) - headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.RESPONSE)) - headers.append((warctools.WarcRecord.URL, self.url)) - headers.append((warctools.WarcRecord.DATE, self._warc_date())) - headers.append((warctools.WarcRecord.BLOCK_DIGEST, 'sha1:{}'.format(recorder.block_sha1.hexdigest()))) - headers.append((warctools.WarcRecord.CONTENT_TYPE, "application/http;msgtype=response")) - headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)))) - if recorder.payload_sha1 is not None: - headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, 'sha1:{}'.format(recorder.payload_sha1.hexdigest()))) - # headers.append((warctools.WarcRecord.IP_ADDRESS, ip)) - - recorder.buf.seek(0) - response_record = warctools.WarcRecord(headers=headers, content_file=recorder.buf) - - try: - self._request_record.set_header(warctools.WarcRecord.CONCURRENT_TO, record_id) - record_group = response_record, self._request_record - except AttributeError: - record_group = response_record, # tuple with one item - - WarcRecordQueuer.warc_record_group_queue.put(record_group) - - class WarcWriterThread(threading.Thread): - def __init__(self, warc_record_group_queue, directory, gzip, prefix, size, port): + def __init__(self, recordset_q, directory, gzip, prefix, size, port): threading.Thread.__init__(self, name='WarcWriterThread') - self.warc_record_group_queue = warc_record_group_queue + self.recordset_q = recordset_q self.directory = directory self.gzip = gzip @@ -460,7 +438,6 @@ class WarcWriterThread(threading.Thread): now = datetime.now() return '{}{}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond//1000) - def _close_writer(self): if self._fpath: final_name = self._fpath[:-5] @@ -471,26 +448,9 @@ class WarcWriterThread(threading.Thread): self._fpath = None self._f = None - # WARC/1.0 - # WARC-Type: warcinfo - # WARC-Date: 2013-10-15T22:11:29Z - # WARC-Filename: ARCHIVEIT-3714-WEEKLY-14487-20131015221129606-00000-wbgrp-crawl105.us.archive.org-6442.warc.gz - # WARC-Record-ID: - # Content-Type: application/warc-fields - # Content-Length: 713 - # - # software: Heritrix/3.1.2-SNAPSHOT-20131011-0101 http://crawler.archive.org - # ip: 207.241.226.68 - # hostname: wbgrp-crawl105.us.archive.org - # format: WARC File Format 1.0 - # conformsTo: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf - # isPartOf: 3714-20131015221121926 - # description: recurrence=WEEKLY, maxDuration=259200, maxDocumentCount=null, isTestCrawl=false, isPatchCrawl=false, oneTimeSubtype=null, seedCount=1, accountId - # robots: obey - # http-header-user-agent: Mozilla/5.0 (compatible; archive.org_bot; Archive-It; +http://archive-it.org/files/site-owners.html) def _make_warcinfo_record(self, filename): warc_record_date = warctools.warc.warc_datetime_str(datetime.now()) - record_id = WarcRecordQueuer.make_warc_uuid("{0} {1}".format(filename, warc_record_date)) + record_id = warctools.WarcRecord.random_warc_uuid() headers = [] headers.append((warctools.WarcRecord.ID, record_id)) @@ -542,11 +502,18 @@ class WarcWriterThread(threading.Thread): while not self.stop.is_set(): try: - warc_record_group = self.warc_record_group_queue.get(block=True, timeout=0.5) - logging.debug('got warc record group to write from the queue: {0}'.format(warc_record_group)) + recordset = self.recordset_q.get(block=True, timeout=0.5) writer = self._writer() - for record in warc_record_group: + + for record in recordset: record.write_to(writer, gzip=self.gzip) + logging.info('wrote warc record {}'.format(record)) + + if record.content_file: + # XXX now we know we're done with this... messy to + # handle this here, but where else can it happen? + record.content_file.close() + self._f.flush() except Queue.Empty: pass @@ -556,6 +523,7 @@ class WarcWriterThread(threading.Thread): if __name__ == '__main__': + arg_parser = argparse.ArgumentParser(description='warcprox - WARC writing MITM HTTP/S proxy', formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('-p', '--port', dest='port', default='8080', help='port to listen on') @@ -581,12 +549,16 @@ if __name__ == '__main__': else: loglevel = logging.INFO - logging.basicConfig(stream=sys.stdout, level=loglevel, format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(funcName)s(%(filename)s:%(lineno)d) %(message)s') + logging.basicConfig(stream=sys.stdout, level=loglevel, + format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(funcName)s(%(filename)s:%(lineno)d) %(message)s') - proxy = AsyncMitmProxy(server_address=(args.address, int(args.port)), - ca_file=args.cacert, certs_dir=args.certs_dir) + recordset_q = WarcRecordsetQueue() - warc_writer = WarcWriterThread(WarcRecordQueuer.warc_record_group_queue, + proxy = WarcProxy(server_address=(args.address, int(args.port)), + ca_file=args.cacert, certs_dir=args.certs_dir, + recordset_q=recordset_q) + + warc_writer = WarcWriterThread(recordset_q=recordset_q, directory=args.directory, gzip=args.gzip, prefix=args.prefix, size=int(args.size), port=int(args.port))