diff --git a/bin/warcprox b/bin/warcprox index 10e30fa..e1f8179 100755 --- a/bin/warcprox +++ b/bin/warcprox @@ -1,6 +1,6 @@ #!/usr/bin/env python # vim: set sw=4 et: -from warcprox import warcprox +import warcprox.main -warcprox.main() +warcprox.main.main() diff --git a/tox.ini b/tox.ini index 4fce66b..3e0cf7d 100644 --- a/tox.ini +++ b/tox.ini @@ -7,7 +7,7 @@ envlist = py34 [testenv] -commands = py.test +commands = py.test warcprox deps = pytest requests diff --git a/warcprox/controller.py b/warcprox/controller.py new file mode 100644 index 0000000..3e75de6 --- /dev/null +++ b/warcprox/controller.py @@ -0,0 +1,84 @@ +# vim: set sw=4 et: + +import logging +import threading +import signal +import time + +import warcprox.warcprox +import warcprox.warcwriter + +class WarcproxController(object): + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, proxy=None, warc_writer_thread=None, playback_proxy=None): + """ + Create warcprox controller. + + If supplied, proxy should be an instance of WarcProxy, and + warc_writer_thread should be an instance of WarcWriterThread. If not + supplied, they are created with default values. + + If supplied, playback_proxy should be an instance of PlaybackProxy. If + not supplied, no playback proxy will run. + """ + if proxy is not None: + self.proxy = proxy + else: + self.proxy = warcprox.warcprox.WarcProxy() + + if warc_writer_thread is not None: + self.warc_writer_thread = warc_writer_thread + else: + self.warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=self.proxy.recorded_url_q) + + self.playback_proxy = playback_proxy + + + def run_until_shutdown(self): + """Start warcprox and run until shut down. + + If running in the main thread, SIGTERM initiates a graceful shutdown. + Otherwise, call warcprox_controller.stop.set(). + """ + proxy_thread = threading.Thread(target=self.proxy.serve_forever, name='ProxyThread') + proxy_thread.start() + self.warc_writer_thread.start() + + if self.playback_proxy is not None: + playback_proxy_thread = threading.Thread(target=self.playback_proxy.serve_forever, name='PlaybackProxyThread') + playback_proxy_thread.start() + + self.stop = threading.Event() + + try: + signal.signal(signal.SIGTERM, self.stop.set) + self.logger.info('SIGTERM will initiate graceful shutdown') + except ValueError: + pass + + try: + while not self.stop.is_set(): + time.sleep(0.5) + except: + pass + finally: + self.warc_writer_thread.stop.set() + self.proxy.shutdown() + self.proxy.server_close() + + if self.warc_writer_thread.warc_writer.dedup_db is not None: + self.warc_writer_thread.warc_writer.dedup_db.close() + + if self.playback_proxy is not None: + self.playback_proxy.shutdown() + self.playback_proxy.server_close() + if self.playback_proxy.playback_index_db is not None: + self.playback_proxy.playback_index_db.close() + + # wait for threads to finish + self.warc_writer_thread.join() + proxy_thread.join() + if self.playback_proxy is not None: + playback_proxy_thread.join() + diff --git a/warcprox/main.py b/warcprox/main.py new file mode 100644 index 0000000..d131b4b --- /dev/null +++ b/warcprox/main.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python +# vim:set sw=4 et: + +from __future__ import absolute_import + +try: + import queue +except ImportError: + import Queue as queue + +import logging +import sys +import hashlib +import argparse +import os +import socket + +import warcprox.certauth +import warcprox.playback +import warcprox.dedup +import warcprox.warcwriter +import warcprox.warcprox +import warcprox.controller + +def _build_arg_parser(prog=os.path.basename(sys.argv[0])): + arg_parser = argparse.ArgumentParser(prog=prog, + description='warcprox - WARC writing MITM HTTP/S proxy', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + arg_parser.add_argument('-p', '--port', dest='port', default='8000', + help='port to listen on') + arg_parser.add_argument('-b', '--address', dest='address', + default='localhost', help='address to listen on') + arg_parser.add_argument('-c', '--cacert', dest='cacert', + default='./{0}-warcprox-ca.pem'.format(socket.gethostname()), + help='CA certificate file; if file does not exist, it will be created') + arg_parser.add_argument('--certs-dir', dest='certs_dir', + default='./{0}-warcprox-ca'.format(socket.gethostname()), + help='where to store and load generated certificates') + arg_parser.add_argument('-d', '--dir', dest='directory', + default='./warcs', help='where to write warcs') + arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', + help='write gzip-compressed warc records') + arg_parser.add_argument('-n', '--prefix', dest='prefix', + default='WARCPROX', help='WARC filename prefix') + arg_parser.add_argument('-s', '--size', dest='size', + default=1000*1000*1000, + help='WARC file rollover size threshold in bytes') + arg_parser.add_argument('--rollover-idle-time', + dest='rollover_idle_time', default=None, + help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)") + try: + hash_algos = hashlib.algorithms_guaranteed + except AttributeError: + hash_algos = hashlib.algorithms + arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm', + default='sha1', help='digest algorithm, one of {}'.format(', '.join(hash_algos))) + arg_parser.add_argument('--base32', dest='base32', action='store_true', + default=False, help='write digests in Base32 instead of hex') + arg_parser.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', + default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication') + arg_parser.add_argument('-P', '--playback-port', dest='playback_port', + default=None, help='port to listen on for instant playback') + arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', + default='./warcprox-playback-index.db', + help='playback index database file (only used if --playback-port is specified)') + arg_parser.add_argument('--version', action='version', + version="warcprox {}".format(warcprox.version_str)) + arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') + arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true') + # [--ispartof=warcinfo ispartof] + # [--description=warcinfo description] + # [--operator=warcinfo operator] + # [--httpheader=warcinfo httpheader] + + return arg_parser + + +def main(argv=sys.argv): + arg_parser = _build_arg_parser(prog=os.path.basename(argv[0])) + args = arg_parser.parse_args(args=argv[1:]) + + if args.verbose: + loglevel = logging.DEBUG + elif args.quiet: + loglevel = logging.WARNING + else: + loglevel = logging.INFO + + logging.basicConfig(stream=sys.stdout, level=loglevel, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + + try: + hashlib.new(args.digest_algorithm) + except Exception as e: + logging.fatal(e) + exit(1) + + if args.dedup_db_file in (None, '', '/dev/null'): + logging.info('deduplication disabled') + dedup_db = None + else: + dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file) + + recorded_url_q = queue.Queue() + + ca = warcprox.certauth.CertificateAuthority(args.cacert, args.certs_dir) + + proxy = warcprox.warcprox.WarcProxy( + server_address=(args.address, int(args.port)), ca=ca, + recorded_url_q=recorded_url_q, + digest_algorithm=args.digest_algorithm) + + if args.playback_port is not None: + playback_index_db = warcprox.playback.PlaybackIndexDb(args.playback_index_db_file) + playback_server_address=(args.address, int(args.playback_port)) + playback_proxy = warcprox.playback.PlaybackProxy(server_address=playback_server_address, + ca=ca, playback_index_db=playback_index_db, + warcs_dir=args.directory) + else: + playback_index_db = None + playback_proxy = None + + warc_writer = warcprox.warcwriter.WarcWriter(directory=args.directory, + gzip=args.gzip, prefix=args.prefix, port=int(args.port), + rollover_size=int(args.size), base32=args.base32, + dedup_db=dedup_db, digest_algorithm=args.digest_algorithm, + playback_index_db=playback_index_db) + warc_writer_thread = warcprox.warcwriter.WarcWriterThread( + recorded_url_q=recorded_url_q, warc_writer=warc_writer, + rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None) + + controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy) + controller.run_until_shutdown() + + +if __name__ == '__main__': + main() + diff --git a/warcprox/tests/test_warcprox.py b/warcprox/tests/test_warcprox.py index 35f48e4..c3ef3fb 100755 --- a/warcprox/tests/test_warcprox.py +++ b/warcprox/tests/test_warcprox.py @@ -1,8 +1,6 @@ #!/usr/bin/env python # vim: set sw=4 et: -import warcprox.warcprox -import warcprox.certauth import unittest import threading import time @@ -17,18 +15,21 @@ import shutil import requests try: - import http.server - http_server = http.server + import http.server as http_server except ImportError: - import BaseHTTPServer - http_server = BaseHTTPServer + import BaseHTTPServer as http_server try: import queue except ImportError: - import Queue - queue = Queue + import Queue as queue +import warcprox.controller +import warcprox.warcprox +import warcprox.certauth +import warcprox.playback +import warcprox.warcwriter +import warcprox.dedup class TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): logger = logging.getLogger('TestHttpRequestHandler') @@ -145,7 +146,7 @@ class WarcproxTest(unittest.TestCase): warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q, warc_writer=warc_writer) - self.warcprox = warcprox.warcprox.WarcproxController(proxy, warc_writer_thread, playback_proxy) + self.warcprox = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy) self.logger.info('starting warcprox') self.warcprox_thread = threading.Thread(name='WarcproxThread', target=self.warcprox.run_until_shutdown) diff --git a/warcprox/warcprox.py b/warcprox/warcprox.py index 76f7c78..9314669 100644 --- a/warcprox/warcprox.py +++ b/warcprox/warcprox.py @@ -29,26 +29,14 @@ try: except ImportError: import httplib as http_client -import socket import logging -import sys -import hashlib -import threading -import os -import argparse import re -import signal -import time import tempfile -import json import traceback +import hashlib import warcprox.certauth import warcprox.mitmproxy -import warcprox.playback -import warcprox.dedup -import warcprox.warcwriter - class ProxyingRecorder(object): """ @@ -71,7 +59,7 @@ class ProxyingRecorder(object): self._prev_hunk_last_two_bytes = b'' self.len = 0 - def _update(self, hunk): + def _update_payload_digest(self, hunk): if self.payload_digest is None: # convoluted handling of two newlines crossing hunks # XXX write tests for this @@ -102,6 +90,8 @@ class ProxyingRecorder(object): else: self.payload_digest.update(hunk) + def _update(self, hunk): + self._update_payload_digest(hunk) self.block_digest.update(hunk) self.tempfile.write(hunk) @@ -116,7 +106,6 @@ class ProxyingRecorder(object): self.len += len(hunk) - def read(self, size=-1): hunk = self.fp.read(size) self._update(hunk) @@ -257,192 +246,3 @@ class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): self.logger.info('WarcProxy shutting down') http_server.HTTPServer.server_close(self) - -class WarcproxController(object): - logger = logging.getLogger(__module__ + "." + __qualname__) - - def __init__(self, proxy=None, warc_writer_thread=None, playback_proxy=None): - """ - Create warcprox controller. - - If supplied, proxy should be an instance of WarcProxy, and - warc_writer_thread should be an instance of WarcWriterThread. If not - supplied, they are created with default values. - - If supplied, playback_proxy should be an instance of PlaybackProxy. If not - supplied, no playback proxy will run. - """ - if proxy is not None: - self.proxy = proxy - else: - self.proxy = WarcProxy() - - if warc_writer_thread is not None: - self.warc_writer_thread = warc_writer_thread - else: - self.warc_writer_thread = WarcWriterThread(recorded_url_q=self.proxy.recorded_url_q) - - self.playback_proxy = playback_proxy - - - def run_until_shutdown(self): - """Start warcprox and run until shut down. - - If running in the main thread, SIGTERM initiates a graceful shutdown. - Otherwise, call warcprox_controller.stop.set(). - """ - proxy_thread = threading.Thread(target=self.proxy.serve_forever, name='ProxyThread') - proxy_thread.start() - self.warc_writer_thread.start() - - if self.playback_proxy is not None: - playback_proxy_thread = threading.Thread(target=self.playback_proxy.serve_forever, name='PlaybackProxyThread') - playback_proxy_thread.start() - - self.stop = threading.Event() - - try: - signal.signal(signal.SIGTERM, self.stop.set) - self.logger.info('SIGTERM will initiate graceful shutdown') - except ValueError: - pass - - try: - while not self.stop.is_set(): - time.sleep(0.5) - except: - pass - finally: - self.warc_writer_thread.stop.set() - self.proxy.shutdown() - self.proxy.server_close() - - if self.warc_writer_thread.warc_writer.dedup_db is not None: - self.warc_writer_thread.warc_writer.dedup_db.close() - - if self.playback_proxy is not None: - self.playback_proxy.shutdown() - self.playback_proxy.server_close() - if self.playback_proxy.playback_index_db is not None: - self.playback_proxy.playback_index_db.close() - - # wait for threads to finish - self.warc_writer_thread.join() - proxy_thread.join() - if self.playback_proxy is not None: - playback_proxy_thread.join() - - -def _build_arg_parser(prog=os.path.basename(sys.argv[0])): - arg_parser = argparse.ArgumentParser(prog=prog, - description='warcprox - WARC writing MITM HTTP/S proxy', - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - arg_parser.add_argument('-p', '--port', dest='port', default='8000', - help='port to listen on') - arg_parser.add_argument('-b', '--address', dest='address', - default='localhost', help='address to listen on') - arg_parser.add_argument('-c', '--cacert', dest='cacert', - default='./{0}-warcprox-ca.pem'.format(socket.gethostname()), - help='CA certificate file; if file does not exist, it will be created') - arg_parser.add_argument('--certs-dir', dest='certs_dir', - default='./{0}-warcprox-ca'.format(socket.gethostname()), - help='where to store and load generated certificates') - arg_parser.add_argument('-d', '--dir', dest='directory', - default='./warcs', help='where to write warcs') - arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', - help='write gzip-compressed warc records') - arg_parser.add_argument('-n', '--prefix', dest='prefix', - default='WARCPROX', help='WARC filename prefix') - arg_parser.add_argument('-s', '--size', dest='size', - default=1000*1000*1000, - help='WARC file rollover size threshold in bytes') - arg_parser.add_argument('--rollover-idle-time', - dest='rollover_idle_time', default=None, - help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)") - try: - hash_algos = hashlib.algorithms_guaranteed - except AttributeError: - hash_algos = hashlib.algorithms - arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm', - default='sha1', help='digest algorithm, one of {}'.format(', '.join(hash_algos))) - arg_parser.add_argument('--base32', dest='base32', action='store_true', - default=False, help='write digests in Base32 instead of hex') - arg_parser.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', - default='./warcprox-dedup.db', help='persistent deduplication database file; empty string or /dev/null disables deduplication') - arg_parser.add_argument('-P', '--playback-port', dest='playback_port', - default=None, help='port to listen on for instant playback') - arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', - default='./warcprox-playback-index.db', - help='playback index database file (only used if --playback-port is specified)') - arg_parser.add_argument('--version', action='version', - version="warcprox {}".format(warcprox.version_str)) - arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') - arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true') - # [--ispartof=warcinfo ispartof] - # [--description=warcinfo description] - # [--operator=warcinfo operator] - # [--httpheader=warcinfo httpheader] - - return arg_parser - - -def main(argv=sys.argv): - arg_parser = _build_arg_parser(prog=os.path.basename(argv[0])) - args = arg_parser.parse_args(args=argv[1:]) - - if args.verbose: - loglevel = logging.DEBUG - elif args.quiet: - loglevel = logging.WARNING - else: - loglevel = logging.INFO - - logging.basicConfig(stream=sys.stdout, level=loglevel, - format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') - - try: - hashlib.new(args.digest_algorithm) - except Exception as e: - logging.fatal(e) - exit(1) - - if args.dedup_db_file in (None, '', '/dev/null'): - logging.info('deduplication disabled') - dedup_db = None - else: - dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file) - - recorded_url_q = queue.Queue() - - ca = warcprox.certauth.CertificateAuthority(args.cacert, args.certs_dir) - - proxy = WarcProxy(server_address=(args.address, int(args.port)), - ca=ca, recorded_url_q=recorded_url_q, - digest_algorithm=args.digest_algorithm) - - if args.playback_port is not None: - playback_index_db = warcprox.playback.PlaybackIndexDb(args.playback_index_db_file) - playback_server_address=(args.address, int(args.playback_port)) - playback_proxy = warcprox.playback.PlaybackProxy(server_address=playback_server_address, - ca=ca, playback_index_db=playback_index_db, - warcs_dir=args.directory) - else: - playback_index_db = None - playback_proxy = None - - warc_writer = warcprox.warcwriter.WarcWriter(directory=args.directory, - gzip=args.gzip, prefix=args.prefix, port=int(args.port), - rollover_size=int(args.size), base32=args.base32, - dedup_db=dedup_db, digest_algorithm=args.digest_algorithm, - playback_index_db=playback_index_db) - warc_writer_thread = warcprox.warcwriter.WarcWriterThread( - recorded_url_q=recorded_url_q, warc_writer=warc_writer, - rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None) - - controller = WarcproxController(proxy, warc_writer_thread, playback_proxy) - controller.run_until_shutdown() - - -if __name__ == '__main__': - main() -