diff --git a/.travis.yml b/.travis.yml
index c427b37..0ad15d4 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -50,10 +50,10 @@ before_script:
 - docker ps
 
 script:
-- py.test -v tests
-- py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests
-- py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests
-- py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests
+- py.test -v --tb=native tests
+- py.test -v --tb=native --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests
+- py.test -v --tb=native --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests
+- py.test -v --tb=native --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests
 
 after_script:
 - ps ww -fHe
diff --git a/README.rst b/README.rst
index d76e2191..a026937 100644
--- a/README.rst
+++ b/README.rst
@@ -149,6 +149,28 @@ specifying ``--plugin`` multiples times. See a minimal example `here
 `__.
 
+Architecture
+============
+.. image:: arch.jpg
+
+Warcprox is multithreaded. It has a pool of http proxy threads (100 by default).
+When handling a request, a proxy thread records data from the remote server to
+an in-memory buffer that spills over to disk if necessary (after 512k by
+default), while it streams the data to the proxy client. Once the HTTP
+transaction is complete, it puts the recorded URL in a thread-safe queue, to be
+picked up by the first processor in the postfetch chain.
+
+The postfetch chain normally includes processors for loading deduplication
+information, writing records to the WARC, saving deduplication information, and
+updating statistics. The exact set of processors in the chain depends on
+command line arguments; for example, plugins specified with ``--plugin`` are
+processors in the postfetch chain. Each postfetch processor has its own thread
+or threads. Thus the processors are able to run in parallel, independent of one
+another. This design also enables them to process URLs in batch. For example,
+the statistics processor gathers statistics for up to 10 seconds or 500 URLs,
+whichever comes first, then updates the statistics database with just a few
+queries.
+
 License
 =======
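The batching behavior described in the new Architecture section can be pictured with a short sketch. This is not warcprox's actual postfetch code; the class name and limits below are illustrative, though the 500-URL / 10-second figures mirror the README text.

    # Minimal sketch of the postfetch-chain batching pattern described above.
    # BatchingProcessor, MAX_BATCH_SIZE and MAX_BATCH_SEC are illustrative names,
    # not warcprox's actual classes or settings.
    import queue
    import threading
    import time

    MAX_BATCH_SIZE = 500   # flush after this many urls...
    MAX_BATCH_SEC = 10.0   # ...or after this many seconds, whichever comes first

    class BatchingProcessor(threading.Thread):
        def __init__(self, inq, outq=None):
            super().__init__(daemon=True)
            self.inq = inq        # urls recorded by the proxy threads land here
            self.outq = outq      # next processor in the chain, if any
            self.stop = threading.Event()

        def _process_batch(self, batch):
            # e.g. the stats processor updates its database with a few queries here
            print('processing batch of %d urls' % len(batch))

        def run(self):
            while not self.stop.is_set():
                batch, deadline = [], time.time() + MAX_BATCH_SEC
                while len(batch) < MAX_BATCH_SIZE and time.time() < deadline:
                    try:
                        batch.append(self.inq.get(timeout=0.5))
                    except queue.Empty:
                        continue
                if batch:
                    self._process_batch(batch)
                    if self.outq:          # pass urls on to the next processor
                        for url in batch:
                            self.outq.put(url)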
diff --git a/arch.jpg b/arch.jpg
new file mode 100644
index 0000000..f3c855b
Binary files /dev/null and b/arch.jpg differ
diff --git a/benchmarks/run-benchmarks.py b/benchmarks/run-benchmarks.py
index 4491a8b..f273e96 100755
--- a/benchmarks/run-benchmarks.py
+++ b/benchmarks/run-benchmarks.py
@@ -194,7 +194,7 @@ if __name__ == '__main__':
     args = arg_parser.parse_args(args=sys.argv[1:])
 
     if args.trace:
-        loglevel = warcprox.TRACE
+        loglevel = logging.TRACE
     elif args.verbose:
         loglevel = logging.DEBUG
     else:
diff --git a/tests/test_ensure_rethinkdb_tables.py b/tests/test_ensure_rethinkdb_tables.py
index 030cddb..f0649f4 100644
--- a/tests/test_ensure_rethinkdb_tables.py
+++ b/tests/test_ensure_rethinkdb_tables.py
@@ -30,7 +30,7 @@ import logging
 import sys
 
 logging.basicConfig(
-        stream=sys.stdout, level=warcprox.TRACE,
+        stream=sys.stdout, level=logging.TRACE,
         format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
         '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index fad7130..c41f457 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -90,8 +90,7 @@ def _send(self, data):
 # http_client.HTTPConnection.send = _send
 
 logging.basicConfig(
-        # stream=sys.stdout, level=logging.DEBUG, # level=warcprox.TRACE,
-        stream=sys.stdout, level=warcprox.TRACE,
+        stream=sys.stdout, level=logging.TRACE,
         format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
         '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
 logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN)
diff --git a/warcprox/__init__.py b/warcprox/__init__.py
index 2dcc838..67cf654 100644
--- a/warcprox/__init__.py
+++ b/warcprox/__init__.py
@@ -266,21 +266,21 @@ def timestamp14():
     return '{:%Y%m%d%H%M%S}'.format(now)
 
 # monkey-patch log levels TRACE and NOTICE
-TRACE = 5
+logging.TRACE = (logging.NOTSET + logging.DEBUG) // 2
 def _logger_trace(self, msg, *args, **kwargs):
-    if self.isEnabledFor(TRACE):
-        self._log(TRACE, msg, args, **kwargs)
+    if self.isEnabledFor(logging.TRACE):
+        self._log(logging.TRACE, msg, args, **kwargs)
 logging.Logger.trace = _logger_trace
 logging.trace = logging.root.trace
-logging.addLevelName(TRACE, 'TRACE')
+logging.addLevelName(logging.TRACE, 'TRACE')
 
-NOTICE = (logging.INFO + logging.WARN) // 2
+logging.NOTICE = (logging.INFO + logging.WARN) // 2
 def _logger_notice(self, msg, *args, **kwargs):
-    if self.isEnabledFor(NOTICE):
-        self._log(NOTICE, msg, args, **kwargs)
+    if self.isEnabledFor(logging.NOTICE):
+        self._log(logging.NOTICE, msg, args, **kwargs)
 logging.Logger.notice = _logger_notice
 logging.notice = logging.root.notice
-logging.addLevelName(NOTICE, 'NOTICE')
+logging.addLevelName(logging.NOTICE, 'NOTICE')
 
 import warcprox.controller as controller
 import warcprox.playback as playback
diff --git a/warcprox/controller.py b/warcprox/controller.py
index e89ecbb..9d20e71 100644
--- a/warcprox/controller.py
+++ b/warcprox/controller.py
@@ -299,9 +299,7 @@ class WarcproxController(object):
         status_info.update(self.proxy.status())
 
         self.status_info = self.service_registry.heartbeat(status_info)
-        self.logger.log(
-                warcprox.TRACE, "status in service registry: %s",
-                self.status_info)
+        self.logger.trace('status in service registry: %s', self.status_info)
 
     def start(self):
         with self._start_stop_lock:
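For reference, the levels monkey-patched in warcprox/__init__.py land at numeric values 5 (TRACE, halfway between NOTSET and DEBUG) and 25 (NOTICE, halfway between INFO and WARNING), and hang new trace()/notice() methods on logging.Logger. A minimal usage sketch, assuming warcprox has been imported so the patch is in effect:

    # Illustration only; importing warcprox installs logging.TRACE, logging.NOTICE
    # and the Logger.trace()/.notice() methods as shown in the hunk above.
    import logging
    import sys

    import warcprox  # noqa: F401 -- side effect: monkey-patches the logging module

    logging.basicConfig(stream=sys.stdout, level=logging.TRACE)

    logger = logging.getLogger('example')
    logger.trace('finest-grained detail (level %s)', logging.TRACE)       # level 5
    logger.notice('between INFO and WARNING (level %s)', logging.NOTICE)  # level 25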
diff --git a/warcprox/main.py b/warcprox/main.py
index 84f5ab3..bf8d11e 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -60,10 +60,23 @@ class BetterArgumentDefaultsHelpFormatter(
         else:
             return argparse.ArgumentDefaultsHelpFormatter._get_help_string(self, action)
 
-def _build_arg_parser(prog='warcprox'):
+def _build_arg_parser(prog='warcprox', show_hidden=False):
+    if show_hidden:
+        def suppress(msg):
+            return msg
+    else:
+        def suppress(msg):
+            return argparse.SUPPRESS
+
     arg_parser = argparse.ArgumentParser(prog=prog,
             description='warcprox - WARC writing MITM HTTP/S proxy',
             formatter_class=BetterArgumentDefaultsHelpFormatter)
+
+    hidden = arg_parser.add_argument_group('hidden options')
+    arg_parser.add_argument(
+            '--help-hidden', action='help', default=argparse.SUPPRESS,
+            help='show help message, including help on hidden options, and exit')
+
     arg_parser.add_argument('-p', '--port', dest='port', default='8000',
             type=int, help='port to listen on')
     arg_parser.add_argument('-b', '--address', dest='address',
@@ -81,8 +94,12 @@
             help='define custom WARC filename with variables {prefix}, {timestamp14}, {timestamp17}, {serialno}, {randomtoken}, {hostname}, {shorthostname}')
     arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
             help='write gzip-compressed warc records')
-    arg_parser.add_argument('--no-warc-open-suffix', dest='no_warc_open_suffix',
-            default=False, action='store_true', help=argparse.SUPPRESS)
+    hidden.add_argument(
+            '--no-warc-open-suffix', dest='no_warc_open_suffix',
+            default=False, action='store_true',
+            help=suppress(
+                'do not name warc files with suffix ".open" while writing to '
+                'them, but lock them with lockf(3) instead'))
     # not mentioned in --help: special value for '-' for --prefix means don't
     # archive the capture, unless prefix set in warcprox-meta header
     arg_parser.add_argument(
@@ -146,43 +163,60 @@ def _build_arg_parser(prog='warcprox'):
             'rethinkdb service registry table url; if provided, warcprox '
             'will create and heartbeat entry for itself'))
     # optional cookie values to pass to CDX Server; e.g. "cookie1=val1;cookie2=val2"
-    arg_parser.add_argument('--cdxserver-dedup-cookies', dest='cdxserver_dedup_cookies',
-            help=argparse.SUPPRESS)
+    hidden.add_argument(
+            '--cdxserver-dedup-cookies', dest='cdxserver_dedup_cookies',
+            help=suppress(
+                'value of Cookie header to include in requests to the cdx '
+                'server, when using --cdxserver-dedup'))
     arg_parser.add_argument('--dedup-min-text-size', dest='dedup_min_text_size',
             type=int, default=0,
             help=('try to dedup text resources with payload size over this limit in bytes'))
     arg_parser.add_argument('--dedup-min-binary-size', dest='dedup_min_binary_size',
             type=int, default=0, help=(
             'try to dedup binary resources with payload size over this limit in bytes'))
-    # optionally, dedup request only when `dedup-bucket` is available in
-    # Warcprox-Meta HTTP header. By default, we dedup all requests.
-    arg_parser.add_argument('--dedup-only-with-bucket', dest='dedup_only_with_bucket',
-            action='store_true', default=False, help=argparse.SUPPRESS)
+    hidden.add_argument(
+            '--dedup-only-with-bucket', dest='dedup_only_with_bucket',
+            action='store_true', default=False, help=suppress(
+                'only deduplicate captures if "dedup-bucket" is set in '
+                'the Warcprox-Meta request header'))
     arg_parser.add_argument('--blackout-period', dest='blackout_period',
             type=int, default=0,
             help='skip writing a revisit record if its too close to the original capture')
-    arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
-            default=500, help=argparse.SUPPRESS)
-    arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
-            help=argparse.SUPPRESS)
-    arg_parser.add_argument('--profile', action='store_true', default=False,
-            help=argparse.SUPPRESS)
-    arg_parser.add_argument(
-            '--writer-threads', dest='writer_threads', type=int, default=None,
-            help=argparse.SUPPRESS)
+    hidden.add_argument(
+            '--queue-size', dest='queue_size', type=int, default=500,
+            help=suppress(
+                'maximum number of urls that can be queued at each '
+                'step of the processing chain (see the section on warcprox '
+                'architecture in README.rst)'))
+    hidden.add_argument(
+            '--max-threads', dest='max_threads', type=int, default=100,
+            help=suppress('maximum number of http worker threads'))
+    hidden.add_argument(
+            '--profile', action='store_true', default=False,
+            help=suppress(
+                'turn on performance profiling; summary statistics are dumped '
+                'every 10 minutes and at shutdown'))
+    hidden.add_argument(
+            '--writer-threads', dest='writer_threads', type=int, default=1,
+            help=suppress(
+                'number of warc writer threads; caution, see '
+                'https://github.com/internetarchive/warcprox/issues/101'))
     arg_parser.add_argument(
             '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy',
             default=None, help=(
                 'host:port of tor socks proxy, used only to connect to '
                 '.onion sites'))
-    # Configurable connection socket timeout, default is 60 sec.
-    arg_parser.add_argument(
-            '--socket-timeout', dest='socket_timeout', type=float,
-            default=None, help=argparse.SUPPRESS)
+    hidden.add_argument(
+            '--socket-timeout', dest='socket_timeout', type=float, default=60,
+            help=suppress(
+                'socket timeout, used for proxy client connection and for '
+                'connection to remote server'))
     # Increasing this value increases memory usage but reduces /tmp disk I/O.
-    arg_parser.add_argument(
+    hidden.add_argument(
             '--tmp-file-max-memory-size', dest='tmp_file_max_memory_size',
-            type=int, default=512*1024, help=argparse.SUPPRESS)
+            type=int, default=512*1024, help=suppress(
+                'size of in-memory buffer for each url being processed '
+                '(spills over to temp space on disk if exceeded)'))
     arg_parser.add_argument(
             '--max-resource-size', dest='max_resource_size', type=int,
             default=None, help='maximum resource size limit in bytes')
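The hidden-options machinery added in this hunk boils down to routing help text through suppress(), which returns argparse.SUPPRESS unless the parser is rebuilt with show_hidden=True. A condensed, standalone sketch of that pattern follows; the prog name and the two options shown are illustrative, not warcprox's full option set.

    # Toy demo of the hidden-options pattern: help text goes through suppress(),
    # so hidden options only appear in the help output when --help-hidden is used.
    import argparse
    import sys

    def build_parser(show_hidden=False):
        suppress = (lambda msg: msg) if show_hidden else (lambda msg: argparse.SUPPRESS)
        parser = argparse.ArgumentParser(prog='demo')
        parser.add_argument(
                '--help-hidden', action='help', default=argparse.SUPPRESS,
                help='show help message, including help on hidden options, and exit')
        parser.add_argument('--port', type=int, default=8000, help='port to listen on')
        parser.add_argument(
                '--queue-size', type=int, default=500,
                help=suppress('hidden: max urls queued between processors'))
        return parser

    if __name__ == '__main__':
        argv = sys.argv[1:]
        # rebuild the parser with hidden help revealed when --help-hidden is given,
        # mirroring the parse_args() change later in this file
        parser = build_parser(show_hidden='--help-hidden' in argv)
        print(parser.parse_args(argv))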
@@ -197,11 +231,18 @@ def _build_arg_parser(prog='warcprox'):
             'Qualified name of plugin class, e.g. "mypkg.mymod.MyClass". '
             'May be used multiple times to register multiple plugins. '
             'See README.rst for more information.'))
-    arg_parser.add_argument('--version', action='version',
+    arg_parser.add_argument(
+            '-q', '--quiet', dest='quiet', action='store_true',
+            help='less verbose logging')
+    arg_parser.add_argument(
+            '-v', '--verbose', dest='verbose', action='store_true',
+            help='verbose logging')
+    arg_parser.add_argument(
+            '--trace', dest='trace', action='store_true',
+            help='very verbose logging')
+    arg_parser.add_argument(
+            '--version', action='version',
             version="warcprox {}".format(warcprox.__version__))
-    arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
-    arg_parser.add_argument('--trace', dest='trace', action='store_true')
-    arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
 
     return arg_parser
 
@@ -227,7 +268,11 @@ def parse_args(argv):
     '''
     Parses command line arguments with argparse.
     '''
-    arg_parser = _build_arg_parser(prog=os.path.basename(argv[0]))
+    show_hidden = False
+    if '--help-hidden' in argv:
+        show_hidden = True
+        argv = [argv[0], '--help-hidden']
+    arg_parser = _build_arg_parser(os.path.basename(argv[0]), show_hidden)
     args = arg_parser.parse_args(args=argv[1:])
 
     try:
@@ -245,11 +290,11 @@ def main(argv=None):
     args = parse_args(argv or sys.argv)
 
     if args.trace:
-        loglevel = warcprox.TRACE
+        loglevel = logging.TRACE
     elif args.verbose:
         loglevel = logging.DEBUG
     elif args.quiet:
-        loglevel = logging.WARNING
+        loglevel = logging.NOTICE
     else:
         loglevel = logging.INFO
diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py
index 21d5c3f..4153e54 100644
--- a/warcprox/mitmproxy.py
+++ b/warcprox/mitmproxy.py
@@ -250,7 +250,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
         '''
         self._conn_pool = self.server.remote_connection_pool.connection_from_host(
                 host=self.hostname, port=int(self.port), scheme='http',
-                pool_kwargs={'maxsize': 6})
+                pool_kwargs={'maxsize': 6, 'timeout': self._socket_timeout})
 
         self._remote_server_conn = self._conn_pool._get_conn()
         if is_connection_dropped(self._remote_server_conn):
@@ -263,10 +263,9 @@
                 self._remote_server_conn.sock.set_proxy(
                         socks.SOCKS5, addr=self.onion_tor_socks_proxy_host,
                         port=self.onion_tor_socks_proxy_port, rdns=True)
-                self._remote_server_conn.timeout = self._socket_timeout
+                self._remote_server_conn.sock.settimeout(self._socket_timeout)
                 self._remote_server_conn.sock.connect((self.hostname, int(self.port)))
             else:
-                self._remote_server_conn.timeout = self._socket_timeout
                 self._remote_server_conn.connect()
 
         # Wrap socket if SSL is required
@@ -276,7 +275,8 @@
                 context.check_hostname = False
                 context.verify_mode = ssl.CERT_NONE
                 self._remote_server_conn.sock = context.wrap_socket(
-                        self._remote_server_conn.sock, server_hostname=self.hostname)
+                        self._remote_server_conn.sock,
+                        server_hostname=self.hostname)
             except AttributeError:
                 try:
                     self._remote_server_conn.sock = ssl.wrap_socket(
@@ -502,10 +502,7 @@ class PooledMixIn(socketserver.ThreadingMixIn):
     def __init__(self, max_threads=None):
         self.active_requests = set()
         self.unaccepted_requests = 0
-        if max_threads:
-            self.max_threads = max_threads
-        else:
-            self.max_threads = 100
+        self.max_threads = max_threads or 100
         self.pool = concurrent.futures.ThreadPoolExecutor(self.max_threads)
         self.logger.info("%s proxy threads", self.max_threads)
 
@@ -595,11 +592,6 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
     request_queue_size = 4096
 
     def __init__(self, options=warcprox.Options()):
-        if options.max_threads:
-            self.logger.info(
-                    'max_threads=%s set by command line option',
-                    options.max_threads)
-
         PooledMixIn.__init__(self, options.max_threads)
         self.profilers = collections.defaultdict(cProfile.Profile)
diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py
index 2ccfa13..f50691a 100644
--- a/warcprox/warcproxy.py
+++ b/warcprox/warcproxy.py
@@ -287,9 +287,11 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
                 and (warc_type or 'WARC-Type' in self.headers)):
             timestamp = datetime.datetime.utcnow()
-            request_data = tempfile.SpooledTemporaryFile(max_size=524288)
+            request_data = tempfile.SpooledTemporaryFile(
+                    max_size=self._tmp_file_max_memory_size)
             payload_digest = hashlib.new(self.server.digest_algorithm)
+            # XXX we don't support chunked uploads for now
             length = int(self.headers['Content-Length'])
             buf = self.rfile.read(min(65536, length - request_data.tell()))
             while buf != b'':
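The warcproxy.py hunk above swaps the hard-coded 524288 for the configurable --tmp-file-max-memory-size when spooling request bodies. A small sketch of how tempfile.SpooledTemporaryFile behaves at that threshold; the 512k constant mirrors the option's default, and the _rolled attribute is a CPython implementation detail used here only to make the rollover visible:

    # Spill-to-disk buffering: data stays in memory up to max_size bytes,
    # then rolls over to a real temp file on disk.
    import tempfile

    MAX_MEMORY_SIZE = 512 * 1024  # same default as --tmp-file-max-memory-size

    buf = tempfile.SpooledTemporaryFile(max_size=MAX_MEMORY_SIZE)
    buf.write(b'x' * 1024)             # small payload: still an in-memory buffer
    print(buf._rolled)                 # False -- nothing written to disk yet
    buf.write(b'x' * MAX_MEMORY_SIZE)  # exceeds max_size: spills to a temp file
    print(buf._rolled)                 # True
    buf.seek(0)                        # rewind before re-reading the recorded data
    buf.close()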