split into even more source files

This commit is contained in:
Noah Levitt 2014-11-20 00:04:43 -08:00
parent 9b8ffbbb51
commit a2c25d4242
6 changed files with 239 additions and 216 deletions

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python
# vim: set sw=4 et:
from warcprox import warcprox
import warcprox.main
warcprox.main()
warcprox.main.main()

View File

@ -7,7 +7,7 @@
envlist = py34
[testenv]
commands = py.test
commands = py.test warcprox
deps =
pytest
requests

84
warcprox/controller.py Normal file
View File

@ -0,0 +1,84 @@
# vim: set sw=4 et:
import logging
import threading
import signal
import time
import warcprox.warcprox
import warcprox.warcwriter
class WarcproxController(object):
    """
    Ties together the recording proxy, the WARC writer thread and the
    optional playback proxy: starts them, waits for a stop request, then
    shuts everything down in an orderly fashion.
    """

    logger = logging.getLogger(__module__ + "." + __qualname__)

    def __init__(self, proxy=None, warc_writer_thread=None, playback_proxy=None):
        """
        Create warcprox controller.

        If supplied, proxy should be an instance of WarcProxy, and
        warc_writer_thread should be an instance of WarcWriterThread. If not
        supplied, they are created with default values.

        If supplied, playback_proxy should be an instance of PlaybackProxy. If
        not supplied, no playback proxy will run.
        """
        if proxy is not None:
            self.proxy = proxy
        else:
            self.proxy = warcprox.warcprox.WarcProxy()

        if warc_writer_thread is not None:
            self.warc_writer_thread = warc_writer_thread
        else:
            self.warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=self.proxy.recorded_url_q)

        self.playback_proxy = playback_proxy

        # Created here rather than in run_until_shutdown() so that another
        # thread can call stop.set() at any time without racing against the
        # start of run_until_shutdown() and hitting an AttributeError.
        self.stop = threading.Event()

    def run_until_shutdown(self):
        """Start warcprox and run until shut down.

        If running in the main thread, SIGTERM initiates a graceful shutdown.
        Otherwise, call warcprox_controller.stop.set().
        """
        proxy_thread = threading.Thread(target=self.proxy.serve_forever, name='ProxyThread')
        proxy_thread.start()
        self.warc_writer_thread.start()

        playback_proxy_thread = None
        if self.playback_proxy is not None:
            playback_proxy_thread = threading.Thread(target=self.playback_proxy.serve_forever, name='PlaybackProxyThread')
            playback_proxy_thread.start()

        try:
            # Signal handlers are invoked as handler(signum, frame), while
            # Event.set() takes no arguments, so it has to be wrapped.
            signal.signal(signal.SIGTERM, lambda signum, frame: self.stop.set())
            self.logger.info('SIGTERM will initiate graceful shutdown')
        except ValueError:
            # signal.signal() raises ValueError when not called from the main
            # thread; in that case the caller has to use self.stop instead.
            pass

        try:
            while not self.stop.is_set():
                time.sleep(0.5)
        except KeyboardInterrupt:
            self.logger.info('interrupted, shutting down')
        except Exception:
            # Still shut down cleanly (see finally), but leave a trace of
            # what went wrong instead of swallowing it silently.
            self.logger.exception('unexpected error, shutting down')
        finally:
            self.warc_writer_thread.stop.set()
            self.proxy.shutdown()
            self.proxy.server_close()
            if self.playback_proxy is not None:
                self.playback_proxy.shutdown()
                self.playback_proxy.server_close()

            # wait for threads to finish
            self.warc_writer_thread.join()
            proxy_thread.join()
            if playback_proxy_thread is not None:
                playback_proxy_thread.join()

            # Close the databases only after the threads that may still be
            # using them have finished.
            if self.warc_writer_thread.warc_writer.dedup_db is not None:
                self.warc_writer_thread.warc_writer.dedup_db.close()
            if (self.playback_proxy is not None
                    and self.playback_proxy.playback_index_db is not None):
                self.playback_proxy.playback_index_db.close()

138
warcprox/main.py Normal file
View File

@ -0,0 +1,138 @@
#!/usr/bin/env python
# vim:set sw=4 et:
from __future__ import absolute_import
try:
import queue
except ImportError:
import Queue as queue
import logging
import sys
import hashlib
import argparse
import os
import socket
import warcprox.certauth
import warcprox.playback
import warcprox.dedup
import warcprox.warcwriter
import warcprox.warcprox
import warcprox.controller
def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
    """
    Construct the command line parser for warcprox.

    prog is the program name shown in usage/help output. All option values
    are left as argparse delivers them (strings unless a default of another
    type applies); converting them is up to the caller.
    """
    parser = argparse.ArgumentParser(
            prog=prog,
            description='warcprox - WARC writing MITM HTTP/S proxy',
            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    add = parser.add_argument
    hostname = socket.gethostname()

    # where to listen
    add('-p', '--port', dest='port', default='8000',
            help='port to listen on')
    add('-b', '--address', dest='address', default='localhost',
            help='address to listen on')

    # certificate authority material, named after this host by default
    add('-c', '--cacert', dest='cacert',
            default='./{0}-warcprox-ca.pem'.format(hostname),
            help='CA certificate file; if file does not exist, it will be created')
    add('--certs-dir', dest='certs_dir',
            default='./{0}-warcprox-ca'.format(hostname),
            help='where to store and load generated certificates')

    # warc output
    add('-d', '--dir', dest='directory', default='./warcs',
            help='where to write warcs')
    add('-z', '--gzip', dest='gzip', action='store_true',
            help='write gzip-compressed warc records')
    add('-n', '--prefix', dest='prefix', default='WARCPROX',
            help='WARC filename prefix')
    add('-s', '--size', dest='size', default=1000*1000*1000,
            help='WARC file rollover size threshold in bytes')
    add('--rollover-idle-time', dest='rollover_idle_time', default=None,
            help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)")

    # digests; older hashlib only offers hashlib.algorithms
    if hasattr(hashlib, 'algorithms_guaranteed'):
        digest_names = hashlib.algorithms_guaranteed
    else:
        digest_names = hashlib.algorithms
    add('-g', '--digest-algorithm', dest='digest_algorithm', default='sha1',
            help='digest algorithm, one of {}'.format(', '.join(digest_names)))
    add('--base32', dest='base32', action='store_true', default=False,
            help='write digests in Base32 instead of hex')

    # deduplication and playback
    add('-j', '--dedup-db-file', dest='dedup_db_file',
            default='./warcprox-dedup.db',
            help='persistent deduplication database file; empty string or /dev/null disables deduplication')
    add('-P', '--playback-port', dest='playback_port', default=None,
            help='port to listen on for instant playback')
    add('--playback-index-db-file', dest='playback_index_db_file',
            default='./warcprox-playback-index.db',
            help='playback index database file (only used if --playback-port is specified)')

    # miscellaneous
    add('--version', action='version',
            version="warcprox {}".format(warcprox.version_str))
    add('-v', '--verbose', dest='verbose', action='store_true')
    add('-q', '--quiet', dest='quiet', action='store_true')
    # [--ispartof=warcinfo ispartof]
    # [--description=warcinfo description]
    # [--operator=warcinfo operator]
    # [--httpheader=warcinfo httpheader]

    return parser
def main(argv=sys.argv):
    """
    Command line entry point: parse argv, configure logging, build the
    proxy, warc writer and (optionally) playback proxy, then run until shut
    down.

    argv is the full argument vector including the program name at index 0.
    Exits with status 1 if the requested digest algorithm is not supported
    by hashlib.
    """
    arg_parser = _build_arg_parser(prog=os.path.basename(argv[0]))
    args = arg_parser.parse_args(args=argv[1:])

    # --verbose takes precedence over --quiet if both are given
    if args.verbose:
        loglevel = logging.DEBUG
    elif args.quiet:
        loglevel = logging.WARNING
    else:
        loglevel = logging.INFO

    logging.basicConfig(stream=sys.stdout, level=loglevel,
            format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')

    # fail early on an unusable digest algorithm
    try:
        hashlib.new(args.digest_algorithm)
    except Exception as e:
        logging.fatal(e)
        # sys.exit rather than the bare exit(): the latter is injected by the
        # site module and is not available in every interpreter setup
        sys.exit(1)

    if args.dedup_db_file in (None, '', '/dev/null'):
        logging.info('deduplication disabled')
        dedup_db = None
    else:
        dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file)

    # the proxy puts recorded urls on this queue, the writer thread takes
    # them off
    recorded_url_q = queue.Queue()

    ca = warcprox.certauth.CertificateAuthority(args.cacert, args.certs_dir)

    proxy = warcprox.warcprox.WarcProxy(
            server_address=(args.address, int(args.port)), ca=ca,
            recorded_url_q=recorded_url_q,
            digest_algorithm=args.digest_algorithm)

    if args.playback_port is not None:
        playback_index_db = warcprox.playback.PlaybackIndexDb(args.playback_index_db_file)
        playback_server_address = (args.address, int(args.playback_port))
        playback_proxy = warcprox.playback.PlaybackProxy(server_address=playback_server_address,
                ca=ca, playback_index_db=playback_index_db,
                warcs_dir=args.directory)
    else:
        playback_index_db = None
        playback_proxy = None

    warc_writer = warcprox.warcwriter.WarcWriter(directory=args.directory,
            gzip=args.gzip, prefix=args.prefix, port=int(args.port),
            rollover_size=int(args.size), base32=args.base32,
            dedup_db=dedup_db, digest_algorithm=args.digest_algorithm,
            playback_index_db=playback_index_db)
    warc_writer_thread = warcprox.warcwriter.WarcWriterThread(
            recorded_url_q=recorded_url_q, warc_writer=warc_writer,
            rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None)

    controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy)
    controller.run_until_shutdown()


# allow running this module directly as a script
if __name__ == '__main__':
    main()

View File

@ -1,8 +1,6 @@
#!/usr/bin/env python
# vim: set sw=4 et:
import warcprox.warcprox
import warcprox.certauth
import unittest
import threading
import time
@ -17,18 +15,21 @@ import shutil
import requests
try:
import http.server
http_server = http.server
import http.server as http_server
except ImportError:
import BaseHTTPServer
http_server = BaseHTTPServer
import BaseHTTPServer as http_server
try:
import queue
except ImportError:
import Queue
queue = Queue
import Queue as queue
import warcprox.controller
import warcprox.warcprox
import warcprox.certauth
import warcprox.playback
import warcprox.warcwriter
import warcprox.dedup
class TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
logger = logging.getLogger('TestHttpRequestHandler')
@ -145,7 +146,7 @@ class WarcproxTest(unittest.TestCase):
warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q,
warc_writer=warc_writer)
self.warcprox = warcprox.warcprox.WarcproxController(proxy, warc_writer_thread, playback_proxy)
self.warcprox = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy)
self.logger.info('starting warcprox')
self.warcprox_thread = threading.Thread(name='WarcproxThread',
target=self.warcprox.run_until_shutdown)

View File

@ -29,26 +29,14 @@ try:
except ImportError:
import httplib as http_client
import socket
import logging
import sys
import hashlib
import threading
import os
import argparse
import re
import signal
import time
import tempfile
import json
import traceback
import hashlib
import warcprox.certauth
import warcprox.mitmproxy
import warcprox.playback
import warcprox.dedup
import warcprox.warcwriter
class ProxyingRecorder(object):
"""
@ -71,7 +59,7 @@ class ProxyingRecorder(object):
self._prev_hunk_last_two_bytes = b''
self.len = 0
def _update(self, hunk):
def _update_payload_digest(self, hunk):
if self.payload_digest is None:
# convoluted handling of two newlines crossing hunks
# XXX write tests for this
@ -102,6 +90,8 @@ class ProxyingRecorder(object):
else:
self.payload_digest.update(hunk)
def _update(self, hunk):
self._update_payload_digest(hunk)
self.block_digest.update(hunk)
self.tempfile.write(hunk)
@ -116,7 +106,6 @@ class ProxyingRecorder(object):
self.len += len(hunk)
def read(self, size=-1):
hunk = self.fp.read(size)
self._update(hunk)
@ -257,192 +246,3 @@ class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
self.logger.info('WarcProxy shutting down')
http_server.HTTPServer.server_close(self)
class WarcproxController(object):
    """
    Ties together the recording proxy, the WARC writer thread and the
    optional playback proxy: starts them, waits for a stop request, then
    shuts everything down in an orderly fashion.
    """

    logger = logging.getLogger(__module__ + "." + __qualname__)

    def __init__(self, proxy=None, warc_writer_thread=None, playback_proxy=None):
        """
        Create warcprox controller.

        If supplied, proxy should be an instance of WarcProxy, and
        warc_writer_thread should be an instance of WarcWriterThread. If not
        supplied, they are created with default values.

        If supplied, playback_proxy should be an instance of PlaybackProxy. If not
        supplied, no playback proxy will run.
        """
        if proxy is not None:
            self.proxy = proxy
        else:
            self.proxy = WarcProxy()

        if warc_writer_thread is not None:
            self.warc_writer_thread = warc_writer_thread
        else:
            # WarcWriterThread lives in warcprox.warcwriter, which this file
            # imports as a module; the bare name is not defined here.
            self.warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=self.proxy.recorded_url_q)

        self.playback_proxy = playback_proxy

        # Created here rather than in run_until_shutdown() so that another
        # thread can call stop.set() at any time without racing against the
        # start of run_until_shutdown() and hitting an AttributeError.
        self.stop = threading.Event()

    def run_until_shutdown(self):
        """Start warcprox and run until shut down.

        If running in the main thread, SIGTERM initiates a graceful shutdown.
        Otherwise, call warcprox_controller.stop.set().
        """
        proxy_thread = threading.Thread(target=self.proxy.serve_forever, name='ProxyThread')
        proxy_thread.start()
        self.warc_writer_thread.start()

        playback_proxy_thread = None
        if self.playback_proxy is not None:
            playback_proxy_thread = threading.Thread(target=self.playback_proxy.serve_forever, name='PlaybackProxyThread')
            playback_proxy_thread.start()

        try:
            # Signal handlers are invoked as handler(signum, frame), while
            # Event.set() takes no arguments, so it has to be wrapped.
            signal.signal(signal.SIGTERM, lambda signum, frame: self.stop.set())
            self.logger.info('SIGTERM will initiate graceful shutdown')
        except ValueError:
            # signal.signal() raises ValueError when not called from the main
            # thread; in that case the caller has to use self.stop instead.
            pass

        try:
            while not self.stop.is_set():
                time.sleep(0.5)
        except KeyboardInterrupt:
            self.logger.info('interrupted, shutting down')
        except Exception:
            # Still shut down cleanly (see finally), but leave a trace of
            # what went wrong instead of swallowing it silently.
            self.logger.exception('unexpected error, shutting down')
        finally:
            self.warc_writer_thread.stop.set()
            self.proxy.shutdown()
            self.proxy.server_close()
            if self.playback_proxy is not None:
                self.playback_proxy.shutdown()
                self.playback_proxy.server_close()

            # wait for threads to finish
            self.warc_writer_thread.join()
            proxy_thread.join()
            if playback_proxy_thread is not None:
                playback_proxy_thread.join()

            # Close the databases only after the threads that may still be
            # using them have finished.
            if self.warc_writer_thread.warc_writer.dedup_db is not None:
                self.warc_writer_thread.warc_writer.dedup_db.close()
            if (self.playback_proxy is not None
                    and self.playback_proxy.playback_index_db is not None):
                self.playback_proxy.playback_index_db.close()
def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
    """
    Construct the command line parser for warcprox.

    prog is the program name shown in usage/help output. All option values
    are left as argparse delivers them (strings unless a default of another
    type applies); converting them is up to the caller.
    """
    parser = argparse.ArgumentParser(
            prog=prog,
            description='warcprox - WARC writing MITM HTTP/S proxy',
            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    add = parser.add_argument
    hostname = socket.gethostname()

    # where to listen
    add('-p', '--port', dest='port', default='8000',
            help='port to listen on')
    add('-b', '--address', dest='address', default='localhost',
            help='address to listen on')

    # certificate authority material, named after this host by default
    add('-c', '--cacert', dest='cacert',
            default='./{0}-warcprox-ca.pem'.format(hostname),
            help='CA certificate file; if file does not exist, it will be created')
    add('--certs-dir', dest='certs_dir',
            default='./{0}-warcprox-ca'.format(hostname),
            help='where to store and load generated certificates')

    # warc output
    add('-d', '--dir', dest='directory', default='./warcs',
            help='where to write warcs')
    add('-z', '--gzip', dest='gzip', action='store_true',
            help='write gzip-compressed warc records')
    add('-n', '--prefix', dest='prefix', default='WARCPROX',
            help='WARC filename prefix')
    add('-s', '--size', dest='size', default=1000*1000*1000,
            help='WARC file rollover size threshold in bytes')
    add('--rollover-idle-time', dest='rollover_idle_time', default=None,
            help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)")

    # digests; older hashlib only offers hashlib.algorithms
    if hasattr(hashlib, 'algorithms_guaranteed'):
        digest_names = hashlib.algorithms_guaranteed
    else:
        digest_names = hashlib.algorithms
    add('-g', '--digest-algorithm', dest='digest_algorithm', default='sha1',
            help='digest algorithm, one of {}'.format(', '.join(digest_names)))
    add('--base32', dest='base32', action='store_true', default=False,
            help='write digests in Base32 instead of hex')

    # deduplication and playback
    add('-j', '--dedup-db-file', dest='dedup_db_file',
            default='./warcprox-dedup.db',
            help='persistent deduplication database file; empty string or /dev/null disables deduplication')
    add('-P', '--playback-port', dest='playback_port', default=None,
            help='port to listen on for instant playback')
    add('--playback-index-db-file', dest='playback_index_db_file',
            default='./warcprox-playback-index.db',
            help='playback index database file (only used if --playback-port is specified)')

    # miscellaneous
    add('--version', action='version',
            version="warcprox {}".format(warcprox.version_str))
    add('-v', '--verbose', dest='verbose', action='store_true')
    add('-q', '--quiet', dest='quiet', action='store_true')
    # [--ispartof=warcinfo ispartof]
    # [--description=warcinfo description]
    # [--operator=warcinfo operator]
    # [--httpheader=warcinfo httpheader]

    return parser
def main(argv=sys.argv):
    """
    Command line entry point: parse argv, configure logging, build the
    proxy, warc writer and (optionally) playback proxy, then run until shut
    down.

    argv is the full argument vector including the program name at index 0.
    Exits with status 1 if the requested digest algorithm is not supported
    by hashlib.
    """
    arg_parser = _build_arg_parser(prog=os.path.basename(argv[0]))
    args = arg_parser.parse_args(args=argv[1:])

    # --verbose takes precedence over --quiet if both are given
    if args.verbose:
        loglevel = logging.DEBUG
    elif args.quiet:
        loglevel = logging.WARNING
    else:
        loglevel = logging.INFO

    logging.basicConfig(stream=sys.stdout, level=loglevel,
            format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')

    # fail early on an unusable digest algorithm
    try:
        hashlib.new(args.digest_algorithm)
    except Exception as e:
        logging.fatal(e)
        # sys.exit rather than the bare exit(): the latter is injected by the
        # site module and is not available in every interpreter setup
        sys.exit(1)

    if args.dedup_db_file in (None, '', '/dev/null'):
        logging.info('deduplication disabled')
        dedup_db = None
    else:
        dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file)

    # the proxy puts recorded urls on this queue, the writer thread takes
    # them off
    recorded_url_q = queue.Queue()

    ca = warcprox.certauth.CertificateAuthority(args.cacert, args.certs_dir)

    proxy = WarcProxy(server_address=(args.address, int(args.port)),
            ca=ca, recorded_url_q=recorded_url_q,
            digest_algorithm=args.digest_algorithm)

    if args.playback_port is not None:
        playback_index_db = warcprox.playback.PlaybackIndexDb(args.playback_index_db_file)
        playback_server_address = (args.address, int(args.playback_port))
        playback_proxy = warcprox.playback.PlaybackProxy(server_address=playback_server_address,
                ca=ca, playback_index_db=playback_index_db,
                warcs_dir=args.directory)
    else:
        playback_index_db = None
        playback_proxy = None

    warc_writer = warcprox.warcwriter.WarcWriter(directory=args.directory,
            gzip=args.gzip, prefix=args.prefix, port=int(args.port),
            rollover_size=int(args.size), base32=args.base32,
            dedup_db=dedup_db, digest_algorithm=args.digest_algorithm,
            playback_index_db=playback_index_db)
    warc_writer_thread = warcprox.warcwriter.WarcWriterThread(
            recorded_url_q=recorded_url_q, warc_writer=warc_writer,
            rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None)

    controller = WarcproxController(proxy, warc_writer_thread, playback_proxy)
    controller.run_until_shutdown()


# allow running this module directly as a script
if __name__ == '__main__':
    main()