diff --git a/README.rst b/README.rst index 8104632..ff6a6b0 100644 --- a/README.rst +++ b/README.rst @@ -13,7 +13,7 @@ pymiproxy, warcprox is also GPL. Install ~~~~~~~ -Warcprox runs on python 2.7 or python 3.2+. +Warcprox runs on python 3.4. To install latest release run: diff --git a/setup.py b/setup.py index e64d368..355d78b 100755 --- a/setup.py +++ b/setup.py @@ -58,10 +58,6 @@ setuptools.setup(name='warcprox', 'Development Status :: 5 - Production/Stable', 'Environment :: Console', 'License :: OSI Approved :: GNU General Public License (GPL)', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.2', - 'Programming Language :: Python :: 3.3', 'Programming Language :: Python :: 3.4', 'Topic :: Internet :: Proxy Servers', 'Topic :: Internet :: WWW/HTTP', diff --git a/tox.ini b/tox.ini index b01a85f..4fce66b 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ # and then run "tox" from this directory. [tox] -envlist = py27, py32, py33, py34 +envlist = py34 [testenv] commands = py.test diff --git a/warcprox/__init__.py b/warcprox/__init__.py index e69de29..e061a70 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -0,0 +1,8 @@ +def _read_version_bytes(): + import os + version_txt = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['version.txt']) + with open(version_txt, 'rb') as fin: + return fin.read().strip() + +version_bytes = _read_version_bytes().strip() +version_str = version_bytes.decode('utf-8') diff --git a/warcprox/certauth.py b/warcprox/certauth.py new file mode 100644 index 0000000..f0c6951 --- /dev/null +++ b/warcprox/certauth.py @@ -0,0 +1,91 @@ +# vim:set sw=4 et: + +import logging +import os +import OpenSSL +import socket +import random + +class CertificateAuthority(object): + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, ca_file='warcprox-ca.pem', certs_dir='./warcprox-ca'): + self.ca_file = ca_file + self.certs_dir = certs_dir + + if not os.path.exists(ca_file): + self._generate_ca() + else: + self._read_ca(ca_file) + + if not os.path.exists(certs_dir): + self.logger.info("directory for generated certs {} doesn't exist, creating it".format(certs_dir)) + os.mkdir(certs_dir) + + + def _generate_ca(self): + # Generate key + self.key = OpenSSL.crypto.PKey() + self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048) + + # Generate certificate + self.cert = OpenSSL.crypto.X509() + self.cert.set_version(2) + # avoid sec_error_reused_issuer_and_serial + self.cert.set_serial_number(random.randint(0,2**64-1)) + self.cert.get_subject().CN = 'Warcprox CA on {}'.format(socket.gethostname())[:64] + self.cert.gmtime_adj_notBefore(0) # now + self.cert.gmtime_adj_notAfter(100*365*24*60*60) # 100 yrs in future + self.cert.set_issuer(self.cert.get_subject()) + self.cert.set_pubkey(self.key) + self.cert.add_extensions([ + OpenSSL.crypto.X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:0"), + OpenSSL.crypto.X509Extension(b"keyUsage", True, b"keyCertSign, cRLSign"), + OpenSSL.crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash", subject=self.cert), + ]) + self.cert.sign(self.key, "sha1") + + with open(self.ca_file, 'wb+') as f: + f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.SSL.FILETYPE_PEM, self.key)) + f.write(OpenSSL.crypto.dump_certificate(OpenSSL.SSL.FILETYPE_PEM, self.cert)) + + self.logger.info('generated CA key+cert and wrote to {}'.format(self.ca_file)) + + + def _read_ca(self, filename): + self.cert = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM, open(filename).read()) + self.key = OpenSSL.crypto.load_privatekey(OpenSSL.SSL.FILETYPE_PEM, open(filename).read()) + self.logger.info('read CA key+cert from {}'.format(self.ca_file)) + + def __getitem__(self, cn): + cnp = os.path.sep.join([self.certs_dir, '%s.pem' % cn]) + if not os.path.exists(cnp): + # create certificate + key = OpenSSL.crypto.PKey() + key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048) + + # Generate CSR + req = OpenSSL.crypto.X509Req() + req.get_subject().CN = cn + req.set_pubkey(key) + req.sign(key, 'sha1') + + # Sign CSR + cert = OpenSSL.crypto.X509() + cert.set_subject(req.get_subject()) + cert.set_serial_number(random.randint(0,2**64-1)) + cert.gmtime_adj_notBefore(0) + cert.gmtime_adj_notAfter(10*365*24*60*60) + cert.set_issuer(self.cert.get_subject()) + cert.set_pubkey(req.get_pubkey()) + cert.sign(self.key, 'sha1') + + with open(cnp, 'wb+') as f: + f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.SSL.FILETYPE_PEM, key)) + f.write(OpenSSL.crypto.dump_certificate(OpenSSL.SSL.FILETYPE_PEM, cert)) + + self.logger.info('wrote generated key+cert to {}'.format(cnp)) + + return cnp + + diff --git a/warcprox/dedup.py b/warcprox/dedup.py new file mode 100644 index 0000000..dea2fbb --- /dev/null +++ b/warcprox/dedup.py @@ -0,0 +1,56 @@ +# vim:set sw=4 et: + +try: + import dbm.gnu as dbm_gnu +except ImportError: + try: + import gdbm as dbm_gnu + except ImportError: + import anydbm as dbm_gnu + +import logging +import os +import json +from hanzo import warctools + +class DedupDb(object): + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, dbm_file='./warcprox-dedup.db'): + if os.path.exists(dbm_file): + self.logger.info('opening existing deduplication database {}'.format(dbm_file)) + else: + self.logger.info('creating new deduplication database {}'.format(dbm_file)) + + self.db = dbm_gnu.open(dbm_file, 'c') + + def close(self): + self.db.close() + + def sync(self): + if hasattr(self.db, 'sync'): + self.db.sync() + + def save(self, key, response_record, offset): + record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') + url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') + date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') + + py_value = {'i':record_id, 'u':url, 'd':date} + json_value = json.dumps(py_value, separators=(',',':')) + + self.db[key] = json_value.encode('utf-8') + self.logger.debug('dedup db saved {}:{}'.format(key, json_value)) + + def lookup(self, key): + if key in self.db: + json_result = self.db[key] + result = json.loads(json_result.decode('utf-8')) + result['i'] = result['i'].encode('latin1') + result['u'] = result['u'].encode('latin1') + result['d'] = result['d'].encode('latin1') + return result + else: + return None + + diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py new file mode 100644 index 0000000..63cd7a4 --- /dev/null +++ b/warcprox/mitmproxy.py @@ -0,0 +1,145 @@ +# vim:set sw=4 et: + +try: + import http.server as http_server +except ImportError: + import BaseHTTPServer as http_server + +try: + import urllib.parse as urllib_parse +except ImportError: + import urlparse as urllib_parse + +import socket +import logging +import ssl + +class MitmProxyHandler(http_server.BaseHTTPRequestHandler): + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, request, client_address, server): + self.is_connect = False + + ## XXX hack around bizarre bug on my mac python 3.2 in http.server + ## where hasattr returns true in the code snippet below, but + ## self._headers_buffer is None + # + # if not hasattr(self, '_headers_buffer'): + # self._headers_buffer = [] + # self._headers_buffer.append( + self._headers_buffer = [] + + http_server.BaseHTTPRequestHandler.__init__(self, request, client_address, server) + + def _determine_host_port(self): + # Get hostname and port to connect to + if self.is_connect: + self.hostname, self.port = self.path.split(':') + else: + self.url = self.path + u = urllib_parse.urlparse(self.url) + if u.scheme != 'http': + raise Exception('Unknown scheme %s' % repr(u.scheme)) + self.hostname = u.hostname + self.port = u.port or 80 + self.path = urllib_parse.urlunparse( + urllib_parse.ParseResult( + scheme='', + netloc='', + params=u.params, + path=u.path or '/', + query=u.query, + fragment=u.fragment + ) + ) + + + def _connect_to_host(self): + # Connect to destination + self._proxy_sock = socket.socket() + self._proxy_sock.settimeout(10) + self._proxy_sock.connect((self.hostname, int(self.port))) + + # Wrap socket if SSL is required + if self.is_connect: + self._proxy_sock = ssl.wrap_socket(self._proxy_sock) + + + def _transition_to_ssl(self): + self.request = self.connection = ssl.wrap_socket(self.connection, + server_side=True, certfile=self.server.ca[self.hostname]) + + + def do_CONNECT(self): + self.is_connect = True + try: + # Connect to destination first + self._determine_host_port() + self._connect_to_host() + + # If successful, let's do this! + self.send_response(200, 'Connection established') + self.end_headers() + self._transition_to_ssl() + except Exception as e: + self.send_error(500, str(e)) + return + + # Reload! + self.setup() + self.handle_one_request() + + + def _construct_tunneled_url(self): + if int(self.port) == 443: + netloc = self.hostname + else: + netloc = '{}:{}'.format(self.hostname, self.port) + + result = urllib_parse.urlunparse( + urllib_parse.ParseResult( + scheme='https', + netloc=netloc, + params='', + path=self.path, + query='', + fragment='' + ) + ) + + return result + + + def do_COMMAND(self): + if not self.is_connect: + try: + # Connect to destination + self._determine_host_port() + self._connect_to_host() + assert self.url + except Exception as e: + self.send_error(500, str(e)) + return + else: + # if self.is_connect we already connected in do_CONNECT + self.url = self._construct_tunneled_url() + + self._proxy_request() + + + def _proxy_request(self): + raise Exception('_proxy_request() not implemented in MitmProxyHandler, must be implemented in subclass!') + + def __getattr__(self, item): + if item.startswith('do_'): + return self.do_COMMAND + + def log_error(self, fmt, *args): + self.logger.error("{0} - - [{1}] {2}".format(self.address_string(), + self.log_date_time_string(), fmt % args)) + + def log_message(self, fmt, *args): + self.logger.info("{} {} - - [{}] {}".format(self.__class__.__name__, + self.address_string(), self.log_date_time_string(), fmt % args)) + + diff --git a/warcprox/playback.py b/warcprox/playback.py new file mode 100644 index 0000000..fc7e479 --- /dev/null +++ b/warcprox/playback.py @@ -0,0 +1,280 @@ +# vim:set sw=4 et: + +try: + import http.server as http_server +except ImportError: + import BaseHTTPServer as http_server + +try: + import socketserver +except ImportError: + import SocketServer as socketserver + +try: + import dbm.gnu as dbm_gnu +except ImportError: + try: + import gdbm as dbm_gnu + except ImportError: + import anydbm as dbm_gnu + +import logging +import os +from hanzo import warctools +import json +import traceback +import re +from warcprox.mitmproxy import MitmProxyHandler + +class PlaybackProxyHandler(MitmProxyHandler): + logger = logging.getLogger(__module__ + "." + __qualname__) + + # @Override + def _connect_to_host(self): + # don't connect to host! + pass + + + # @Override + def _proxy_request(self): + date, location = self.server.playback_index_db.lookup_latest(self.url.encode('utf-8')) + self.logger.debug('lookup_latest returned {}:{}'.format(date, location)) + + status = None + if location is not None: + try: + status, sz = self._send_response_from_warc(location['f'], location['o']) + except: + status = 500 + self.logger.error('PlaybackProxyHandler problem playing back {}'.format(self.url), exc_info=1) + payload = '500 Warcprox Error\n\n{}\n'.format(traceback.format_exc()).encode('utf-8') + headers = (b'HTTP/1.1 500 Internal Server Error\r\n' + + b'Content-Type: text/plain;charset=utf-8\r\n' + + b'Content-Length: ' + str(len(payload)).encode('utf-8') + b'\r\n' + + b'\r\n') + self.connection.sendall(headers) + self.connection.sendall(payload) + sz = len(headers) + len(payload) + else: + status = 404 + payload = b'404 Not in Archive\n' + headers = (b'HTTP/1.1 404 Not Found\r\n' + + b'Content-Type: text/plain;charset=utf-8\r\n' + + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n' + + b'\r\n') + self.connection.sendall(headers) + self.connection.sendall(payload) + sz = len(headers) + len(payload) + + self.log_message('"%s" %s %s %s', + self.requestline, str(status), str(sz), repr(location) if location else '-') + + + def _open_warc_at_offset(self, warcfilename, offset): + self.logger.debug('opening {} at offset {}'.format(warcfilename, offset)) + + warcpath = None + for p in (os.path.sep.join([self.server.warcs_dir, warcfilename]), + os.path.sep.join([self.server.warcs_dir, '{}.open'.format(warcfilename)])): + if os.path.exists(p): + warcpath = p + + if warcpath is None: + raise Exception('{} not found'.format(warcfilename)) + + return warctools.warc.WarcRecord.open_archive(filename=warcpath, mode='rb', offset=offset) + + + def _send_response(self, headers, payload_fh): + status = '-' + m = re.match(br'^HTTP/\d\.\d (\d{3})', headers) + if m is not None: + status = m.group(1) + + self.connection.sendall(headers) + sz = len(headers) + + while True: + buf = payload_fh.read(8192) + if buf == b'': break + self.connection.sendall(buf) + sz += len(buf) + + return status, sz + + + def _send_headers_and_refd_payload(self, headers, refers_to, refers_to_target_uri, refers_to_date): + location = self.server.playback_index_db.lookup_exact(refers_to_target_uri, refers_to_date, record_id=refers_to) + self.logger.debug('loading http payload from {}'.format(location)) + + fh = self._open_warc_at_offset(location['f'], location['o']) + try: + for (offset, record, errors) in fh.read_records(limit=1, offsets=True): + pass + + if errors: + raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors)) + + warc_type = record.get_header(warctools.WarcRecord.TYPE) + if warc_type != warctools.WarcRecord.RESPONSE: + raise Exception('invalid attempt to retrieve http payload of "{}" record'.format(warc_type)) + + # find end of headers + while True: + line = record.content_file.readline() + if line == b'' or re.match(br'^\r?\n$', line): + break + + return self._send_response(headers, record.content_file) + + finally: + fh.close() + + + def _send_response_from_warc(self, warcfilename, offset): + fh = self._open_warc_at_offset(warcfilename, offset) + try: + for (offset, record, errors) in fh.read_records(limit=1, offsets=True): + pass + + if errors: + raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors)) + + warc_type = record.get_header(warctools.WarcRecord.TYPE) + + if warc_type == warctools.WarcRecord.RESPONSE: + headers_buf = bytearray() + while True: + line = record.content_file.readline() + headers_buf.extend(line) + if line == b'' or re.match(b'^\r?\n$', line): + break + + return self._send_response(headers_buf, record.content_file) + + elif warc_type == warctools.WarcRecord.REVISIT: + # response consists of http headers from revisit record and + # payload from the referenced record + warc_profile = record.get_header(warctools.WarcRecord.PROFILE) + if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST: + raise Exception('unknown revisit record profile {}'.format(warc_profile)) + + refers_to = record.get_header(warctools.WarcRecord.REFERS_TO) + refers_to_target_uri = record.get_header(warctools.WarcRecord.REFERS_TO_TARGET_URI) + refers_to_date = record.get_header(warctools.WarcRecord.REFERS_TO_DATE) + + self.logger.debug('revisit record references {}:{} capture of {}'.format(refers_to_date, refers_to, refers_to_target_uri)) + return self._send_headers_and_refd_payload(record.content[1], refers_to, refers_to_target_uri, refers_to_date) + + else: + raise Exception('unknown warc record type {}'.format(warc_type)) + + finally: + fh.close() + + raise Exception('should not reach this point') + + +class PlaybackProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, server_address, req_handler_class=PlaybackProxyHandler, + bind_and_activate=True, ca=None, playback_index_db=None, + warcs_dir=None): + http_server.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) + self.ca = ca + self.playback_index_db = playback_index_db + self.warcs_dir = warcs_dir + + def server_activate(self): + http_server.HTTPServer.server_activate(self) + self.logger.info('PlaybackProxy listening on {0}:{1}'.format(self.server_address[0], self.server_address[1])) + + def server_close(self): + self.logger.info('PlaybackProxy shutting down') + http_server.HTTPServer.server_close(self) + + +class PlaybackIndexDb(object): + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, dbm_file='./warcprox-playback-index.db'): + if os.path.exists(dbm_file): + self.logger.info('opening existing playback index database {}'.format(dbm_file)) + else: + self.logger.info('creating new playback index database {}'.format(dbm_file)) + + self.db = dbm_gnu.open(dbm_file, 'c') + + def close(self): + self.db.close() + + def sync(self): + if hasattr(self.db, 'sync'): + self.db.sync() + + def save(self, warcfile, recordset, offset): + response_record = recordset[0] + # XXX canonicalize url? + url = response_record.get_header(warctools.WarcRecord.URL) + date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') + record_id_str = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') + + # there could be two visits of same url in the same second, and WARC-Date is + # prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\ + + # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...} + if url in self.db: + existing_json_value = self.db[url].decode('utf-8') + py_value = json.loads(existing_json_value) + else: + py_value = {} + + if date_str in py_value: + py_value[date_str].append({'f':warcfile, 'o':offset, 'i':record_id_str}) + else: + py_value[date_str] = [{'f':warcfile, 'o':offset, 'i':record_id_str}] + + json_value = json.dumps(py_value, separators=(',',':')) + + self.db[url] = json_value.encode('utf-8') + + self.logger.debug('playback index saved: {}:{}'.format(url, json_value)) + + + def lookup_latest(self, url): + if url not in self.db: + return None, None + + json_value = self.db[url].decode('utf-8') + self.logger.debug("{}:{}".format(repr(url), repr(json_value))) + py_value = json.loads(json_value) + + latest_date = max(py_value) + result = py_value[latest_date][0] + result['i'] = result['i'].encode('ascii') + return latest_date, result + + + # in python3 params are bytes + def lookup_exact(self, url, warc_date, record_id): + if url not in self.db: + return None + + json_value = self.db[url].decode('utf-8') + self.logger.debug("{}:{}".format(repr(url), repr(json_value))) + py_value = json.loads(json_value) + + warc_date_str = warc_date.decode('ascii') + + if warc_date_str in py_value: + for record in py_value[warc_date_str]: + if record['i'].encode('ascii') == record_id: + self.logger.debug("found exact match for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url))) + record['i'] = record['i'].encode('ascii') + return record + else: + self.logger.info("match not found for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url))) + return None + + diff --git a/warcprox/tests/test_warcprox.py b/warcprox/tests/test_warcprox.py index 8ecb137..e2364d0 100755 --- a/warcprox/tests/test_warcprox.py +++ b/warcprox/tests/test_warcprox.py @@ -1,7 +1,8 @@ #!/usr/bin/env python # vim: set sw=4 et: -from warcprox import warcprox +import warcprox.warcprox +import warcprox.certauth import unittest import threading import time @@ -117,11 +118,11 @@ class WarcproxTest(unittest.TestCase): f.close() # delete it, or CertificateAuthority will try to read it self._ca_file = f.name self._ca_dir = tempfile.mkdtemp(prefix='warcprox-test-', suffix='-ca') - ca = warcprox.CertificateAuthority(self._ca_file, self._ca_dir) + ca = warcprox.certauth.CertificateAuthority(self._ca_file, self._ca_dir) recorded_url_q = queue.Queue() - proxy = warcprox.WarcProxy(server_address=('localhost', 0), ca=ca, + proxy = warcprox.warcprox.WarcProxy(server_address=('localhost', 0), ca=ca, recorded_url_q=recorded_url_q) self._warcs_dir = tempfile.mkdtemp(prefix='warcprox-test-warcs-') @@ -129,20 +130,20 @@ class WarcproxTest(unittest.TestCase): f = tempfile.NamedTemporaryFile(prefix='warcprox-test-playback-index-', suffix='.db', delete=False) f.close() self._playback_index_db_file = f.name - playback_index_db = warcprox.PlaybackIndexDb(self._playback_index_db_file) - playback_proxy = warcprox.PlaybackProxy(server_address=('localhost', 0), ca=ca, + playback_index_db = warcprox.playback.PlaybackIndexDb(self._playback_index_db_file) + playback_proxy = warcprox.playback.PlaybackProxy(server_address=('localhost', 0), ca=ca, playback_index_db=playback_index_db, warcs_dir=self._warcs_dir) f = tempfile.NamedTemporaryFile(prefix='warcprox-test-dedup-', suffix='.db', delete=False) f.close() self._dedup_db_file = f.name - dedup_db = warcprox.DedupDb(self._dedup_db_file) + dedup_db = warcprox.dedup.DedupDb(self._dedup_db_file) - warc_writer = warcprox.WarcWriterThread(recorded_url_q=recorded_url_q, + warc_writer = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q, directory=self._warcs_dir, port=proxy.server_port, dedup_db=dedup_db, playback_index_db=playback_index_db) - self.warcprox = warcprox.WarcproxController(proxy, warc_writer, playback_proxy) + self.warcprox = warcprox.warcprox.WarcproxController(proxy, warc_writer, playback_proxy) self.logger.info('starting warcprox') self.warcprox_thread = threading.Thread(name='WarcproxThread', target=self.warcprox.run_until_shutdown) diff --git a/warcprox/warcprox.py b/warcprox/warcprox.py index 25bf1eb..35dee56 100644 --- a/warcprox/warcprox.py +++ b/warcprox/warcprox.py @@ -7,164 +7,48 @@ WARC writing MITM HTTP/S proxy See README.rst or https://github.com/internetarchive/warcprox """ +from __future__ import absolute_import + try: - import http.server - http_server = http.server + import http.server as http_server except ImportError: - import BaseHTTPServer - http_server = BaseHTTPServer + import BaseHTTPServer as http_server try: import socketserver except ImportError: - import SocketServer - socketserver = SocketServer - -try: - import urllib.parse - urllib_parse = urllib.parse -except ImportError: - import urlparse - urllib_parse = urlparse + import SocketServer as socketserver try: import queue except ImportError: - import Queue - queue = Queue + import Queue as queue try: - import http.client - http_client = http.client + import http.client as http_client except ImportError: - import httplib - http_client = httplib - -try: - import dbm.gnu - dbm_gnu = dbm.gnu -except ImportError: - try: - import gdbm - dbm_gnu = gdbm - except ImportError: - import anydbm - dbm_gnu = anydbm - -try: - from io import StringIO -except ImportError: - from StringIO import StringIO + import httplib as http_client import socket -import OpenSSL -import ssl import logging import sys -from hanzo import warctools, httptools import hashlib -from datetime import datetime import threading import os import argparse -import random import re import signal import time import tempfile -import base64 import json import traceback -def _read_version_bytes(): - version_txt = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['version.txt']) - with open(version_txt, 'rb') as fin: - return fin.read().strip() - -version_bytes = _read_version_bytes() -version_str = version_bytes.strip().decode('utf-8') - -class CertificateAuthority(object): - logger = logging.getLogger('warcprox.CertificateAuthority') - - def __init__(self, ca_file='warcprox-ca.pem', certs_dir='./warcprox-ca'): - self.ca_file = ca_file - self.certs_dir = certs_dir - - if not os.path.exists(ca_file): - self._generate_ca() - else: - self._read_ca(ca_file) - - if not os.path.exists(certs_dir): - self.logger.info("directory for generated certs {} doesn't exist, creating it".format(certs_dir)) - os.mkdir(certs_dir) - - - def _generate_ca(self): - # Generate key - self.key = OpenSSL.crypto.PKey() - self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048) - - # Generate certificate - self.cert = OpenSSL.crypto.X509() - self.cert.set_version(2) - # avoid sec_error_reused_issuer_and_serial - self.cert.set_serial_number(random.randint(0,2**64-1)) - self.cert.get_subject().CN = 'Warcprox CA on {}'.format(socket.gethostname())[:64] - self.cert.gmtime_adj_notBefore(0) # now - self.cert.gmtime_adj_notAfter(100*365*24*60*60) # 100 yrs in future - self.cert.set_issuer(self.cert.get_subject()) - self.cert.set_pubkey(self.key) - self.cert.add_extensions([ - OpenSSL.crypto.X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:0"), - OpenSSL.crypto.X509Extension(b"keyUsage", True, b"keyCertSign, cRLSign"), - OpenSSL.crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash", subject=self.cert), - ]) - self.cert.sign(self.key, "sha1") - - with open(self.ca_file, 'wb+') as f: - f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.SSL.FILETYPE_PEM, self.key)) - f.write(OpenSSL.crypto.dump_certificate(OpenSSL.SSL.FILETYPE_PEM, self.cert)) - - self.logger.info('generated CA key+cert and wrote to {}'.format(self.ca_file)) - - - def _read_ca(self, filename): - self.cert = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM, open(filename).read()) - self.key = OpenSSL.crypto.load_privatekey(OpenSSL.SSL.FILETYPE_PEM, open(filename).read()) - self.logger.info('read CA key+cert from {}'.format(self.ca_file)) - - def __getitem__(self, cn): - cnp = os.path.sep.join([self.certs_dir, '%s.pem' % cn]) - if not os.path.exists(cnp): - # create certificate - key = OpenSSL.crypto.PKey() - key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048) - - # Generate CSR - req = OpenSSL.crypto.X509Req() - req.get_subject().CN = cn - req.set_pubkey(key) - req.sign(key, 'sha1') - - # Sign CSR - cert = OpenSSL.crypto.X509() - cert.set_subject(req.get_subject()) - cert.set_serial_number(random.randint(0,2**64-1)) - cert.gmtime_adj_notBefore(0) - cert.gmtime_adj_notAfter(10*365*24*60*60) - cert.set_issuer(self.cert.get_subject()) - cert.set_pubkey(req.get_pubkey()) - cert.sign(self.key, 'sha1') - - with open(cnp, 'wb+') as f: - f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.SSL.FILETYPE_PEM, key)) - f.write(OpenSSL.crypto.dump_certificate(OpenSSL.SSL.FILETYPE_PEM, cert)) - - self.logger.info('wrote generated key+cert to {}'.format(cnp)) - - return cnp +import warcprox +from warcprox.mitmproxy import MitmProxyHandler +from warcprox.dedup import DedupDb +from warcprox.certauth import CertificateAuthority +from warcprox.warcwriter import WarcWriterThread +from warcprox import playback class ProxyingRecorder(object): @@ -173,7 +57,7 @@ class ProxyingRecorder(object): calculating digests, and sending them on to the proxy client. """ - logger = logging.getLogger('warcprox.ProxyingRecordingHTTPResponse') + logger = logging.getLogger(__module__ + "." + __qualname__) def __init__(self, fp, proxy_dest, digest_algorithm='sha1'): self.fp = fp @@ -275,138 +159,8 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): self.fp = self.recorder -class MitmProxyHandler(http_server.BaseHTTPRequestHandler): - logger = logging.getLogger('warcprox.MitmProxyHandler') - - def __init__(self, request, client_address, server): - self.is_connect = False - - ## XXX hack around bizarre bug on my mac python 3.2 in http.server - ## where hasattr returns true in the code snippet below, but - ## self._headers_buffer is None - # - # if not hasattr(self, '_headers_buffer'): - # self._headers_buffer = [] - # self._headers_buffer.append( - self._headers_buffer = [] - - http_server.BaseHTTPRequestHandler.__init__(self, request, client_address, server) - - def _determine_host_port(self): - # Get hostname and port to connect to - if self.is_connect: - self.hostname, self.port = self.path.split(':') - else: - self.url = self.path - u = urllib_parse.urlparse(self.url) - if u.scheme != 'http': - raise Exception('Unknown scheme %s' % repr(u.scheme)) - self.hostname = u.hostname - self.port = u.port or 80 - self.path = urllib_parse.urlunparse( - urllib_parse.ParseResult( - scheme='', - netloc='', - params=u.params, - path=u.path or '/', - query=u.query, - fragment=u.fragment - ) - ) - - - def _connect_to_host(self): - # Connect to destination - self._proxy_sock = socket.socket() - self._proxy_sock.settimeout(10) - self._proxy_sock.connect((self.hostname, int(self.port))) - - # Wrap socket if SSL is required - if self.is_connect: - self._proxy_sock = ssl.wrap_socket(self._proxy_sock) - - - def _transition_to_ssl(self): - self.request = self.connection = ssl.wrap_socket(self.connection, - server_side=True, certfile=self.server.ca[self.hostname]) - - - def do_CONNECT(self): - self.is_connect = True - try: - # Connect to destination first - self._determine_host_port() - self._connect_to_host() - - # If successful, let's do this! - self.send_response(200, 'Connection established') - self.end_headers() - self._transition_to_ssl() - except Exception as e: - self.send_error(500, str(e)) - return - - # Reload! - self.setup() - self.handle_one_request() - - - def _construct_tunneled_url(self): - if int(self.port) == 443: - netloc = self.hostname - else: - netloc = '{}:{}'.format(self.hostname, self.port) - - result = urllib_parse.urlunparse( - urllib_parse.ParseResult( - scheme='https', - netloc=netloc, - params='', - path=self.path, - query='', - fragment='' - ) - ) - - return result - - - def do_COMMAND(self): - if not self.is_connect: - try: - # Connect to destination - self._determine_host_port() - self._connect_to_host() - assert self.url - except Exception as e: - self.send_error(500, str(e)) - return - else: - # if self.is_connect we already connected in do_CONNECT - self.url = self._construct_tunneled_url() - - self._proxy_request() - - - def _proxy_request(self): - raise Exception('_proxy_request() not implemented in MitmProxyHandler, must be implemented in subclass!') - - def __getattr__(self, item): - if item.startswith('do_'): - return self.do_COMMAND - - def log_error(self, fmt, *args): - self.logger.error("{0} - - [{1}] {2}".format(self.address_string(), - self.log_date_time_string(), fmt % args)) - - def log_message(self, fmt, *args): - self.logger.info("{} {} - - [{}] {}".format(self.__class__.__name__, - self.address_string(), self.log_date_time_string(), fmt % args)) - - class WarcProxyHandler(MitmProxyHandler): - - logger = logging.getLogger('warcprox.WarcProxyHandler') + logger = logging.getLogger(__module__ + "." + __qualname__) def _proxy_request(self): # Build request @@ -477,7 +231,7 @@ class RecordedUrl(object): class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): - logger = logging.getLogger('warcprox.WarcProxy') + logger = logging.getLogger(__module__ + "." + __qualname__) def __init__(self, server_address=('localhost', 8000), req_handler_class=WarcProxyHandler, bind_and_activate=True, @@ -505,572 +259,8 @@ class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): http_server.HTTPServer.server_close(self) -class PlaybackProxyHandler(MitmProxyHandler): - logger = logging.getLogger('warcprox.PlaybackProxyHandler') - - # @Override - def _connect_to_host(self): - # don't connect to host! - pass - - - # @Override - def _proxy_request(self): - date, location = self.server.playback_index_db.lookup_latest(self.url.encode('utf-8')) - self.logger.debug('lookup_latest returned {}:{}'.format(date, location)) - - status = None - if location is not None: - try: - status, sz = self._send_response_from_warc(location['f'], location['o']) - except: - status = 500 - self.logger.error('PlaybackProxyHandler problem playing back {}'.format(self.url), exc_info=1) - payload = b'500 Warcprox Error\n\n{}\n'.format(traceback.format_exc()).encode('utf-8') - headers = (b'HTTP/1.1 500 Internal Server Error\r\n' - + b'Content-Type: text/plain;charset=utf-8\r\n' - + b'Content-Length: ' + len(payload) + b'\r\n' - + b'\r\n') - self.connection.sendall(headers) - self.connection.sendall(payload) - sz = len(headers) + len(payload) - else: - status = 404 - payload = b'404 Not in Archive\n' - headers = (b'HTTP/1.1 404 Not Found\r\n' - + b'Content-Type: text/plain;charset=utf-8\r\n' - + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n' - + b'\r\n') - self.connection.sendall(headers) - self.connection.sendall(payload) - sz = len(headers) + len(payload) - - self.log_message('"%s" %s %s %s', - self.requestline, str(status), str(sz), repr(location) if location else '-') - - - def _open_warc_at_offset(self, warcfilename, offset): - self.logger.debug('opening {} at offset {}'.format(warcfilename, offset)) - - warcpath = None - for p in (os.path.sep.join([self.server.warcs_dir, warcfilename]), - os.path.sep.join([self.server.warcs_dir, '{}.open'.format(warcfilename)])): - if os.path.exists(p): - warcpath = p - - if warcpath is None: - raise Exception('{} not found'.format(warcfilename)) - - return warctools.warc.WarcRecord.open_archive(filename=warcpath, mode='rb', offset=offset) - - - def _send_response(self, headers, payload_fh): - status = '-' - m = re.match(br'^HTTP/\d\.\d (\d{3})', headers) - if m is not None: - status = m.group(1) - - self.connection.sendall(headers) - sz = len(headers) - - while True: - buf = payload_fh.read(8192) - if buf == b'': break - self.connection.sendall(buf) - sz += len(buf) - - return status, sz - - - def _send_headers_and_refd_payload(self, headers, refers_to, refers_to_target_uri, refers_to_date): - location = self.server.playback_index_db.lookup_exact(refers_to_target_uri, refers_to_date, record_id=refers_to) - self.logger.debug('loading http payload from {}'.format(location)) - - fh = self._open_warc_at_offset(location['f'], location['o']) - try: - for (offset, record, errors) in fh.read_records(limit=1, offsets=True): - pass - - if errors: - raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors)) - - warc_type = record.get_header(warctools.WarcRecord.TYPE) - if warc_type != warctools.WarcRecord.RESPONSE: - raise Exception('invalid attempt to retrieve http payload of "{}" record'.format(warc_type)) - - # find end of headers - while True: - line = record.content_file.readline() - if line == b'' or re.match(br'^\r?\n$', line): - break - - return self._send_response(headers, record.content_file) - - finally: - fh.close() - - - def _send_response_from_warc(self, warcfilename, offset): - fh = self._open_warc_at_offset(warcfilename, offset) - try: - for (offset, record, errors) in fh.read_records(limit=1, offsets=True): - pass - - if errors: - raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors)) - - warc_type = record.get_header(warctools.WarcRecord.TYPE) - - if warc_type == warctools.WarcRecord.RESPONSE: - headers_buf = bytearray() - while True: - line = record.content_file.readline() - headers_buf.extend(line) - if line == b'' or re.match(b'^\r?\n$', line): - break - - return self._send_response(headers_buf, record.content_file) - - elif warc_type == warctools.WarcRecord.REVISIT: - # response consists of http headers from revisit record and - # payload from the referenced record - warc_profile = record.get_header(warctools.WarcRecord.PROFILE) - if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST: - raise Exception('unknown revisit record profile {}'.format(warc_profile)) - - refers_to = record.get_header(warctools.WarcRecord.REFERS_TO) - refers_to_target_uri = record.get_header(warctools.WarcRecord.REFERS_TO_TARGET_URI) - refers_to_date = record.get_header(warctools.WarcRecord.REFERS_TO_DATE) - - self.logger.debug('revisit record references {}:{} capture of {}'.format(refers_to_date, refers_to, refers_to_target_uri)) - return self._send_headers_and_refd_payload(record.content[1], refers_to, refers_to_target_uri, refers_to_date) - - else: - raise Exception('unknown warc record type {}'.format(warc_type)) - - finally: - fh.close() - - raise Exception('should not reach this point') - - -class PlaybackProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): - logger = logging.getLogger('warcprox.PlaybackProxy') - - def __init__(self, server_address, req_handler_class=PlaybackProxyHandler, - bind_and_activate=True, ca=None, playback_index_db=None, - warcs_dir=None): - http_server.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) - self.ca = ca - self.playback_index_db = playback_index_db - self.warcs_dir = warcs_dir - - def server_activate(self): - http_server.HTTPServer.server_activate(self) - self.logger.info('PlaybackProxy listening on {0}:{1}'.format(self.server_address[0], self.server_address[1])) - - def server_close(self): - self.logger.info('PlaybackProxy shutting down') - http_server.HTTPServer.server_close(self) - - -class DedupDb(object): - logger = logging.getLogger('warcprox.DedupDb') - - def __init__(self, dbm_file='./warcprox-dedup.db'): - if os.path.exists(dbm_file): - self.logger.info('opening existing deduplication database {}'.format(dbm_file)) - else: - self.logger.info('creating new deduplication database {}'.format(dbm_file)) - - self.db = dbm_gnu.open(dbm_file, 'c') - - def close(self): - self.db.close() - - def sync(self): - if hasattr(self.db, 'sync'): - self.db.sync() - - def save(self, key, response_record, offset): - record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') - url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') - date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') - - py_value = {'i':record_id, 'u':url, 'd':date} - json_value = json.dumps(py_value, separators=(',',':')) - - self.db[key] = json_value.encode('utf-8') - self.logger.debug('dedup db saved {}:{}'.format(key, json_value)) - - - def lookup(self, key): - if key in self.db: - json_result = self.db[key] - result = json.loads(json_result.decode('utf-8')) - result['i'] = result['i'].encode('latin1') - result['u'] = result['u'].encode('latin1') - result['d'] = result['d'].encode('latin1') - return result - else: - return None - - -class WarcWriterThread(threading.Thread): - logger = logging.getLogger('warcprox.WarcWriterThread') - - # port is only used for warc filename - def __init__(self, recorded_url_q=None, directory='./warcs', - rollover_size=1000000000, rollover_idle_time=None, gzip=False, - prefix='WARCPROX', port=0, digest_algorithm='sha1', base32=False, - dedup_db=None, playback_index_db=None): - - threading.Thread.__init__(self, name='WarcWriterThread') - - self.recorded_url_q = recorded_url_q - - self.rollover_size = rollover_size - self.rollover_idle_time = rollover_idle_time - - self.gzip = gzip - self.digest_algorithm = digest_algorithm - self.base32 = base32 - self.dedup_db = dedup_db - - self.playback_index_db = playback_index_db - - # warc path and filename stuff - self.directory = directory - self.prefix = prefix - self.port = port - - self._f = None - self._fpath = None - self._serial = 0 - - if not os.path.exists(directory): - self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory)) - os.mkdir(directory) - - self.stop = threading.Event() - - - # returns a tuple (principal_record, request_record) where principal_record is either a response or revisit record - def build_warc_records(self, recorded_url): - warc_date = warctools.warc.warc_datetime_str(datetime.utcnow()) - - dedup_info = None - if self.dedup_db is not None and recorded_url.response_recorder.payload_digest is not None: - key = self.digest_str(recorded_url.response_recorder.payload_digest) - dedup_info = self.dedup_db.lookup(key) - - if dedup_info is not None: - # revisit record - recorded_url.response_recorder.tempfile.seek(0) - if recorded_url.response_recorder.payload_offset is not None: - response_header_block = recorded_url.response_recorder.tempfile.read(recorded_url.response_recorder.payload_offset) - else: - response_header_block = recorded_url.response_recorder.tempfile.read() - - principal_record = self.build_warc_record( - url=recorded_url.url, warc_date=warc_date, - data=response_header_block, - warc_type=warctools.WarcRecord.REVISIT, - refers_to=dedup_info['i'], - refers_to_target_uri=dedup_info['u'], - refers_to_date=dedup_info['d'], - profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST, - content_type=httptools.ResponseMessage.CONTENT_TYPE, - remote_ip=recorded_url.remote_ip) - else: - # response record - principal_record = self.build_warc_record( - url=recorded_url.url, warc_date=warc_date, - recorder=recorded_url.response_recorder, - warc_type=warctools.WarcRecord.RESPONSE, - content_type=httptools.ResponseMessage.CONTENT_TYPE, - remote_ip=recorded_url.remote_ip) - - request_record = self.build_warc_record( - url=recorded_url.url, warc_date=warc_date, - data=recorded_url.request_data, - warc_type=warctools.WarcRecord.REQUEST, - content_type=httptools.RequestMessage.CONTENT_TYPE, - concurrent_to=principal_record.id) - - return principal_record, request_record - - - def digest_str(self, hash_obj): - return hash_obj.name.encode('utf-8') + b':' + (base64.b32encode(hash_obj.digest()) if self.base32 else hash_obj.hexdigest().encode('ascii')) - - - def build_warc_record(self, url, warc_date=None, recorder=None, data=None, - concurrent_to=None, warc_type=None, content_type=None, remote_ip=None, - profile=None, refers_to=None, refers_to_target_uri=None, - refers_to_date=None): - - if warc_date is None: - warc_date = warctools.warc.warc_datetime_str(datetime.utcnow()) - - record_id = warctools.WarcRecord.random_warc_uuid() - - headers = [] - if warc_type is not None: - headers.append((warctools.WarcRecord.TYPE, warc_type)) - headers.append((warctools.WarcRecord.ID, record_id)) - headers.append((warctools.WarcRecord.DATE, warc_date)) - headers.append((warctools.WarcRecord.URL, url)) - if remote_ip is not None: - headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip)) - if profile is not None: - headers.append((warctools.WarcRecord.PROFILE, profile)) - if refers_to is not None: - headers.append((warctools.WarcRecord.REFERS_TO, refers_to)) - if refers_to_target_uri is not None: - headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri)) - if refers_to_date is not None: - headers.append((warctools.WarcRecord.REFERS_TO_DATE, refers_to_date)) - if concurrent_to is not None: - headers.append((warctools.WarcRecord.CONCURRENT_TO, concurrent_to)) - if content_type is not None: - headers.append((warctools.WarcRecord.CONTENT_TYPE, content_type)) - - if recorder is not None: - headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)).encode('latin1'))) - headers.append((warctools.WarcRecord.BLOCK_DIGEST, - self.digest_str(recorder.block_digest))) - if recorder.payload_digest is not None: - headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, - self.digest_str(recorder.payload_digest))) - - recorder.tempfile.seek(0) - record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile) - - else: - headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data)).encode('latin1'))) - block_digest = hashlib.new(self.digest_algorithm, data) - headers.append((warctools.WarcRecord.BLOCK_DIGEST, - self.digest_str(block_digest))) - - content_tuple = content_type, data - record = warctools.WarcRecord(headers=headers, content=content_tuple) - - return record - - - def timestamp17(self): - now = datetime.utcnow() - return '{}{}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond//1000) - - def _close_writer(self): - if self._fpath: - self.logger.info('closing {0}'.format(self._f_finalname)) - self._f.close() - finalpath = os.path.sep.join([self.directory, self._f_finalname]) - os.rename(self._fpath, finalpath) - - self._fpath = None - self._f = None - - def _build_warcinfo_record(self, filename): - warc_record_date = warctools.warc.warc_datetime_str(datetime.utcnow()) - record_id = warctools.WarcRecord.random_warc_uuid() - - headers = [] - headers.append((warctools.WarcRecord.ID, record_id)) - headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.WARCINFO)) - headers.append((warctools.WarcRecord.FILENAME, filename.encode('latin1'))) - headers.append((warctools.WarcRecord.DATE, warc_record_date)) - - warcinfo_fields = [] - warcinfo_fields.append(b'software: warcprox ' + version_bytes) - hostname = socket.gethostname() - warcinfo_fields.append('hostname: {}'.format(hostname).encode('latin1')) - warcinfo_fields.append('ip: {0}'.format(socket.gethostbyname(hostname)).encode('latin1')) - warcinfo_fields.append(b'format: WARC File Format 1.0') - # warcinfo_fields.append('robots: ignore') - # warcinfo_fields.append('description: {0}'.format(self.description)) - # warcinfo_fields.append('isPartOf: {0}'.format(self.is_part_of)) - data = b'\r\n'.join(warcinfo_fields) + b'\r\n' - - record = warctools.WarcRecord(headers=headers, content=(b'application/warc-fields', data)) - - return record - - - # - def _writer(self): - if self._fpath and os.path.getsize(self._fpath) > self.rollover_size: - self._close_writer() - - if self._f == None: - self._f_finalname = '{}-{}-{:05d}-{}-{}-{}.warc{}'.format( - self.prefix, self.timestamp17(), self._serial, os.getpid(), - socket.gethostname(), self.port, '.gz' if self.gzip else '') - self._fpath = os.path.sep.join([self.directory, self._f_finalname + '.open']) - - self._f = open(self._fpath, 'wb') - - warcinfo_record = self._build_warcinfo_record(self._f_finalname) - self.logger.debug('warcinfo_record.headers={}'.format(warcinfo_record.headers)) - warcinfo_record.write_to(self._f, gzip=self.gzip) - - self._serial += 1 - - return self._f - - - def _final_tasks(self, recorded_url, recordset, recordset_offset): - if (self.dedup_db is not None - and recordset[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE - and recorded_url.response_recorder.payload_size() > 0): - key = self.digest_str(recorded_url.response_recorder.payload_digest) - self.dedup_db.save(key, recordset[0], recordset_offset) - - if self.playback_index_db is not None: - self.playback_index_db.save(self._f_finalname, recordset, recordset_offset) - - recorded_url.response_recorder.tempfile.close() - - def run(self): - self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( - os.path.abspath(self.directory), self.gzip, self.rollover_size, - self.rollover_idle_time, self.prefix, self.port)) - - self._last_sync = self._last_activity = time.time() - - while not self.stop.is_set(): - try: - recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) - - self._last_activity = time.time() - - recordset = self.build_warc_records(recorded_url) - - writer = self._writer() - recordset_offset = writer.tell() - - for record in recordset: - offset = writer.tell() - record.write_to(writer, gzip=self.gzip) - self.logger.debug('wrote warc record: warc_type={} content_length={} url={} warc={} offset={}'.format( - record.get_header(warctools.WarcRecord.TYPE), - record.get_header(warctools.WarcRecord.CONTENT_LENGTH), - record.get_header(warctools.WarcRecord.URL), - self._fpath, offset)) - - self._f.flush() - - self._final_tasks(recorded_url, recordset, recordset_offset) - - except queue.Empty: - if (self._fpath is not None - and self.rollover_idle_time is not None - and self.rollover_idle_time > 0 - and time.time() - self._last_activity > self.rollover_idle_time): - self.logger.debug('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity)) - self._close_writer() - - if time.time() - self._last_sync > 60: - if self.dedup_db: - self.dedup_db.sync() - if self.playback_index_db: - self.playback_index_db.sync() - self._last_sync = time.time() - - self.logger.info('WarcWriterThread shutting down') - self._close_writer(); - - -class PlaybackIndexDb(object): - logger = logging.getLogger('warcprox.PlaybackIndexDb') - - def __init__(self, dbm_file='./warcprox-playback-index.db'): - if os.path.exists(dbm_file): - self.logger.info('opening existing playback index database {}'.format(dbm_file)) - else: - self.logger.info('creating new playback index database {}'.format(dbm_file)) - - self.db = dbm_gnu.open(dbm_file, 'c') - - - def close(self): - self.db.close() - - - def sync(self): - if hasattr(self.db, 'sync'): - self.db.sync() - - - def save(self, warcfile, recordset, offset): - response_record = recordset[0] - # XXX canonicalize url? - url = response_record.get_header(warctools.WarcRecord.URL) - date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') - record_id_str = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') - - # there could be two visits of same url in the same second, and WARC-Date is - # prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\ - - # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...} - if url in self.db: - existing_json_value = self.db[url].decode('utf-8') - py_value = json.loads(existing_json_value) - else: - py_value = {} - - if date_str in py_value: - py_value[date_str].append({'f':warcfile, 'o':offset, 'i':record_id_str}) - else: - py_value[date_str] = [{'f':warcfile, 'o':offset, 'i':record_id_str}] - - json_value = json.dumps(py_value, separators=(',',':')) - - self.db[url] = json_value.encode('utf-8') - - self.logger.debug('playback index saved: {}:{}'.format(url, json_value)) - - - def lookup_latest(self, url): - if url not in self.db: - return None, None - - json_value = self.db[url].decode('utf-8') - self.logger.debug("{}:{}".format(repr(url), repr(json_value))) - py_value = json.loads(json_value) - - latest_date = max(py_value) - result = py_value[latest_date][0] - result['i'] = result['i'].encode('ascii') - return latest_date, result - - - # in python3 params are bytes - def lookup_exact(self, url, warc_date, record_id): - if url not in self.db: - return None - - json_value = self.db[url].decode('utf-8') - self.logger.debug("{}:{}".format(repr(url), repr(json_value))) - py_value = json.loads(json_value) - - warc_date_str = warc_date.decode('ascii') - - if warc_date_str in py_value: - for record in py_value[warc_date_str]: - if record['i'].encode('ascii') == record_id: - self.logger.debug("found exact match for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url))) - record['i'] = record['i'].encode('ascii') - return record - else: - self.logger.info("match not found for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url))) - return None - - class WarcproxController(object): - logger = logging.getLogger('warcprox.WarcproxController') + logger = logging.getLogger(__module__ + "." + __qualname__) def __init__(self, proxy=None, warc_writer=None, playback_proxy=None): """ @@ -1186,7 +376,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): default='./warcprox-playback-index.db', help='playback index database file (only used if --playback-port is specified)') arg_parser.add_argument('--version', action='version', - version="warcprox {}".format(version_str)) + version="warcprox {}".format(warcprox.version_str)) arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true') # [--ispartof=warcinfo ispartof] @@ -1232,9 +422,9 @@ def main(argv=sys.argv): digest_algorithm=args.digest_algorithm) if args.playback_port is not None: - playback_index_db = PlaybackIndexDb(args.playback_index_db_file) + playback_index_db = playback.PlaybackIndexDb(args.playback_index_db_file) playback_server_address=(args.address, int(args.playback_port)) - playback_proxy = PlaybackProxy(server_address=playback_server_address, + playback_proxy = playback.PlaybackProxy(server_address=playback_server_address, ca=ca, playback_index_db=playback_index_db, warcs_dir=args.directory) else: diff --git a/warcprox/warcwriter.py b/warcprox/warcwriter.py new file mode 100644 index 0000000..d7b4da6 --- /dev/null +++ b/warcprox/warcwriter.py @@ -0,0 +1,287 @@ +# vim:set sw=4 et: + +try: + import queue +except ImportError: + import Queue as queue + +import logging +import threading +import os +import hashlib +import time +import socket +import base64 +from datetime import datetime +import hanzo.httptools +from hanzo import warctools +import warcprox + + +class WarcWriterThread(threading.Thread): + logger = logging.getLogger(__module__ + "." + __qualname__) + + # port is only used for warc filename + def __init__(self, recorded_url_q=None, directory='./warcs', + rollover_size=1000000000, rollover_idle_time=None, gzip=False, + prefix='WARCPROX', port=0, digest_algorithm='sha1', base32=False, + dedup_db=None, playback_index_db=None): + + threading.Thread.__init__(self, name='WarcWriterThread') + + self.recorded_url_q = recorded_url_q + + self.rollover_size = rollover_size + self.rollover_idle_time = rollover_idle_time + + self.gzip = gzip + self.digest_algorithm = digest_algorithm + self.base32 = base32 + self.dedup_db = dedup_db + + self.playback_index_db = playback_index_db + + # warc path and filename stuff + self.directory = directory + self.prefix = prefix + self.port = port + + self._f = None + self._fpath = None + self._serial = 0 + + if not os.path.exists(directory): + self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory)) + os.mkdir(directory) + + self.stop = threading.Event() + + + # returns a tuple (principal_record, request_record) where principal_record is either a response or revisit record + def build_warc_records(self, recorded_url): + warc_date = warctools.warc.warc_datetime_str(datetime.utcnow()) + + dedup_info = None + if self.dedup_db is not None and recorded_url.response_recorder.payload_digest is not None: + key = self.digest_str(recorded_url.response_recorder.payload_digest) + dedup_info = self.dedup_db.lookup(key) + + if dedup_info is not None: + # revisit record + recorded_url.response_recorder.tempfile.seek(0) + if recorded_url.response_recorder.payload_offset is not None: + response_header_block = recorded_url.response_recorder.tempfile.read(recorded_url.response_recorder.payload_offset) + else: + response_header_block = recorded_url.response_recorder.tempfile.read() + + principal_record = self.build_warc_record( + url=recorded_url.url, warc_date=warc_date, + data=response_header_block, + warc_type=warctools.WarcRecord.REVISIT, + refers_to=dedup_info['i'], + refers_to_target_uri=dedup_info['u'], + refers_to_date=dedup_info['d'], + profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST, + content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE, + remote_ip=recorded_url.remote_ip) + else: + # response record + principal_record = self.build_warc_record( + url=recorded_url.url, warc_date=warc_date, + recorder=recorded_url.response_recorder, + warc_type=warctools.WarcRecord.RESPONSE, + content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE, + remote_ip=recorded_url.remote_ip) + + request_record = self.build_warc_record( + url=recorded_url.url, warc_date=warc_date, + data=recorded_url.request_data, + warc_type=warctools.WarcRecord.REQUEST, + content_type=hanzo.httptools.RequestMessage.CONTENT_TYPE, + concurrent_to=principal_record.id) + + return principal_record, request_record + + + def digest_str(self, hash_obj): + return hash_obj.name.encode('utf-8') + b':' + (base64.b32encode(hash_obj.digest()) if self.base32 else hash_obj.hexdigest().encode('ascii')) + + + def build_warc_record(self, url, warc_date=None, recorder=None, data=None, + concurrent_to=None, warc_type=None, content_type=None, remote_ip=None, + profile=None, refers_to=None, refers_to_target_uri=None, + refers_to_date=None): + + if warc_date is None: + warc_date = warctools.warc.warc_datetime_str(datetime.utcnow()) + + record_id = warctools.WarcRecord.random_warc_uuid() + + headers = [] + if warc_type is not None: + headers.append((warctools.WarcRecord.TYPE, warc_type)) + headers.append((warctools.WarcRecord.ID, record_id)) + headers.append((warctools.WarcRecord.DATE, warc_date)) + headers.append((warctools.WarcRecord.URL, url)) + if remote_ip is not None: + headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip)) + if profile is not None: + headers.append((warctools.WarcRecord.PROFILE, profile)) + if refers_to is not None: + headers.append((warctools.WarcRecord.REFERS_TO, refers_to)) + if refers_to_target_uri is not None: + headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri)) + if refers_to_date is not None: + headers.append((warctools.WarcRecord.REFERS_TO_DATE, refers_to_date)) + if concurrent_to is not None: + headers.append((warctools.WarcRecord.CONCURRENT_TO, concurrent_to)) + if content_type is not None: + headers.append((warctools.WarcRecord.CONTENT_TYPE, content_type)) + + if recorder is not None: + headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)).encode('latin1'))) + headers.append((warctools.WarcRecord.BLOCK_DIGEST, + self.digest_str(recorder.block_digest))) + if recorder.payload_digest is not None: + headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, + self.digest_str(recorder.payload_digest))) + + recorder.tempfile.seek(0) + record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile) + + else: + headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data)).encode('latin1'))) + block_digest = hashlib.new(self.digest_algorithm, data) + headers.append((warctools.WarcRecord.BLOCK_DIGEST, + self.digest_str(block_digest))) + + content_tuple = content_type, data + record = warctools.WarcRecord(headers=headers, content=content_tuple) + + return record + + + def timestamp17(self): + now = datetime.utcnow() + return '{}{}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond//1000) + + def _close_writer(self): + if self._fpath: + self.logger.info('closing {0}'.format(self._f_finalname)) + self._f.close() + finalpath = os.path.sep.join([self.directory, self._f_finalname]) + os.rename(self._fpath, finalpath) + + self._fpath = None + self._f = None + + def _build_warcinfo_record(self, filename): + warc_record_date = warctools.warc.warc_datetime_str(datetime.utcnow()) + record_id = warctools.WarcRecord.random_warc_uuid() + + headers = [] + headers.append((warctools.WarcRecord.ID, record_id)) + headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.WARCINFO)) + headers.append((warctools.WarcRecord.FILENAME, filename.encode('latin1'))) + headers.append((warctools.WarcRecord.DATE, warc_record_date)) + + warcinfo_fields = [] + warcinfo_fields.append(b'software: warcprox ' + warcprox.version_bytes) + hostname = socket.gethostname() + warcinfo_fields.append('hostname: {}'.format(hostname).encode('latin1')) + warcinfo_fields.append('ip: {0}'.format(socket.gethostbyname(hostname)).encode('latin1')) + warcinfo_fields.append(b'format: WARC File Format 1.0') + # warcinfo_fields.append('robots: ignore') + # warcinfo_fields.append('description: {0}'.format(self.description)) + # warcinfo_fields.append('isPartOf: {0}'.format(self.is_part_of)) + data = b'\r\n'.join(warcinfo_fields) + b'\r\n' + + record = warctools.WarcRecord(headers=headers, content=(b'application/warc-fields', data)) + + return record + + + # + def _writer(self): + if self._fpath and os.path.getsize(self._fpath) > self.rollover_size: + self._close_writer() + + if self._f == None: + self._f_finalname = '{}-{}-{:05d}-{}-{}-{}.warc{}'.format( + self.prefix, self.timestamp17(), self._serial, os.getpid(), + socket.gethostname(), self.port, '.gz' if self.gzip else '') + self._fpath = os.path.sep.join([self.directory, self._f_finalname + '.open']) + + self._f = open(self._fpath, 'wb') + + warcinfo_record = self._build_warcinfo_record(self._f_finalname) + self.logger.debug('warcinfo_record.headers={}'.format(warcinfo_record.headers)) + warcinfo_record.write_to(self._f, gzip=self.gzip) + + self._serial += 1 + + return self._f + + + def _final_tasks(self, recorded_url, recordset, recordset_offset): + if (self.dedup_db is not None + and recordset[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE + and recorded_url.response_recorder.payload_size() > 0): + key = self.digest_str(recorded_url.response_recorder.payload_digest) + self.dedup_db.save(key, recordset[0], recordset_offset) + + if self.playback_index_db is not None: + self.playback_index_db.save(self._f_finalname, recordset, recordset_offset) + + recorded_url.response_recorder.tempfile.close() + + def run(self): + self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( + os.path.abspath(self.directory), self.gzip, self.rollover_size, + self.rollover_idle_time, self.prefix, self.port)) + + self._last_sync = self._last_activity = time.time() + + while not self.stop.is_set(): + try: + recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) + + self._last_activity = time.time() + + recordset = self.build_warc_records(recorded_url) + + writer = self._writer() + recordset_offset = writer.tell() + + for record in recordset: + offset = writer.tell() + record.write_to(writer, gzip=self.gzip) + self.logger.debug('wrote warc record: warc_type={} content_length={} url={} warc={} offset={}'.format( + record.get_header(warctools.WarcRecord.TYPE), + record.get_header(warctools.WarcRecord.CONTENT_LENGTH), + record.get_header(warctools.WarcRecord.URL), + self._fpath, offset)) + + self._f.flush() + + self._final_tasks(recorded_url, recordset, recordset_offset) + + except queue.Empty: + if (self._fpath is not None + and self.rollover_idle_time is not None + and self.rollover_idle_time > 0 + and time.time() - self._last_activity > self.rollover_idle_time): + self.logger.debug('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity)) + self._close_writer() + + if time.time() - self._last_sync > 60: + if self.dedup_db: + self.dedup_db.sync() + if self.playback_index_db: + self.playback_index_db.sync() + self._last_sync = time.time() + + self.logger.info('WarcWriterThread shutting down') + self._close_writer(); + +