Merge pull request #103 from nlevitt/love

Love
Noah Levitt 2018-08-20 14:26:02 -07:00 committed by GitHub
commit ea7257a2b6
11 changed files with 122 additions and 64 deletions

View File

@@ -50,10 +50,10 @@ before_script:
   - docker ps
 script:
-  - py.test -v tests
-  - py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests
-  - py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests
-  - py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests
+  - py.test -v --tb=native tests
+  - py.test -v --tb=native --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests
+  - py.test -v --tb=native --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests
+  - py.test -v --tb=native --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests
 after_script:
   - ps ww -fHe

View File

@@ -149,6 +149,28 @@ specifying ``--plugin`` multiple times.
 See a minimal example `here
 <https://github.com/internetarchive/warcprox/blob/318405e795ac0ab8760988a1a482cf0a17697148/warcprox/__init__.py#L165>`__.
 
+Architecture
+============
+
+.. image:: arch.jpg
+
+Warcprox is multithreaded. It has a pool of HTTP proxy threads (100 by
+default). When handling a request, a proxy thread records data from the remote
+server to an in-memory buffer that spills over to disk if necessary (after
+512k by default), while streaming the data to the proxy client. Once the HTTP
+transaction is complete, it puts the recorded URL in a thread-safe queue, to
+be picked up by the first processor in the postfetch chain.
+
+The postfetch chain normally includes processors for loading deduplication
+information, writing records to the WARC, saving deduplication information,
+and updating statistics. The exact set of processors in the chain depends on
+command line arguments; for example, plugins specified with ``--plugin`` are
+processors in the postfetch chain. Each postfetch processor has its own thread
+or threads, so processors run in parallel, independent of one another. This
+design also lets them process URLs in batches; for example, the statistics
+processor gathers statistics for up to 10 seconds or 500 URLs, whichever comes
+first, then updates the statistics database with just a few queries.
+
 License
 =======
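
The plugin hook mentioned above can be made concrete with a small sketch. This is a hypothetical listener-style plugin; the class name and ``notify()`` signature are assumptions modeled on the minimal example linked in the README, not a verbatim copy of the warcprox API::

    import logging

    class UrlCounter:
        '''
        Hypothetical plugin, registered with --plugin mypkg.mymod.UrlCounter.
        Assumed: warcprox instantiates the class and calls notify() for each
        url that reaches this processor in the postfetch chain.
        '''
        def __init__(self):
            self.count = 0

        def notify(self, recorded_url, records):
            # recorded_url describes the fetched url; records are the warc
            # records written for it, if any
            self.count += 1
            if self.count % 500 == 0:
                logging.info('postfetch chain has seen %s urls', self.count)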

BIN  arch.jpg (new file, 51 KiB)

View File

@@ -194,7 +194,7 @@ if __name__ == '__main__':
     args = arg_parser.parse_args(args=sys.argv[1:])
 
     if args.trace:
-        loglevel = warcprox.TRACE
+        loglevel = logging.TRACE
     elif args.verbose:
         loglevel = logging.DEBUG
     else:

View File

@@ -30,7 +30,7 @@ import logging
 import sys
 
 logging.basicConfig(
-        stream=sys.stdout, level=warcprox.TRACE,
+        stream=sys.stdout, level=logging.TRACE,
         format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
         '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')

View File

@@ -90,8 +90,7 @@ def _send(self, data):
 # http_client.HTTPConnection.send = _send
 
 logging.basicConfig(
-        # stream=sys.stdout, level=logging.DEBUG, # level=warcprox.TRACE,
-        stream=sys.stdout, level=warcprox.TRACE,
+        stream=sys.stdout, level=logging.TRACE,
         format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
         '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
 logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN)

View File

@@ -266,21 +266,21 @@ def timestamp14():
     return '{:%Y%m%d%H%M%S}'.format(now)
 
 # monkey-patch log levels TRACE and NOTICE
-TRACE = 5
+logging.TRACE = (logging.NOTSET + logging.DEBUG) // 2
 def _logger_trace(self, msg, *args, **kwargs):
-    if self.isEnabledFor(TRACE):
-        self._log(TRACE, msg, args, **kwargs)
+    if self.isEnabledFor(logging.TRACE):
+        self._log(logging.TRACE, msg, args, **kwargs)
 logging.Logger.trace = _logger_trace
 logging.trace = logging.root.trace
-logging.addLevelName(TRACE, 'TRACE')
+logging.addLevelName(logging.TRACE, 'TRACE')
 
-NOTICE = (logging.INFO + logging.WARN) // 2
+logging.NOTICE = (logging.INFO + logging.WARN) // 2
 def _logger_notice(self, msg, *args, **kwargs):
-    if self.isEnabledFor(NOTICE):
-        self._log(NOTICE, msg, args, **kwargs)
+    if self.isEnabledFor(logging.NOTICE):
+        self._log(logging.NOTICE, msg, args, **kwargs)
 logging.Logger.notice = _logger_notice
 logging.notice = logging.root.notice
-logging.addLevelName(NOTICE, 'NOTICE')
+logging.addLevelName(logging.NOTICE, 'NOTICE')
 
 import warcprox.controller as controller
 import warcprox.playback as playback
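
With the stdlib constants (NOTSET=0, DEBUG=10, INFO=20, WARN=30), the patched levels work out to ``logging.TRACE == 5`` and ``logging.NOTICE == 25``. A minimal sketch of what the patch enables (importing warcprox applies it)::

    import logging
    import warcprox  # importing warcprox applies the monkey-patch above

    logging.basicConfig(level=logging.TRACE)
    logger = logging.getLogger('example')
    logger.trace('finer-grained than DEBUG')    # level 5
    logger.notice('between INFO and WARNING')   # level 25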

View File

@@ -299,9 +299,7 @@ class WarcproxController(object):
             status_info.update(self.proxy.status())
 
         self.status_info = self.service_registry.heartbeat(status_info)
-        self.logger.log(
-                warcprox.TRACE, "status in service registry: %s",
-                self.status_info)
+        self.logger.trace('status in service registry: %s', self.status_info)
 
     def start(self):
         with self._start_stop_lock:

View File

@@ -60,10 +60,23 @@ class BetterArgumentDefaultsHelpFormatter(
     else:
         return argparse.ArgumentDefaultsHelpFormatter._get_help_string(self, action)
 
-def _build_arg_parser(prog='warcprox'):
+def _build_arg_parser(prog='warcprox', show_hidden=False):
+    if show_hidden:
+        def suppress(msg):
+            return msg
+    else:
+        def suppress(msg):
+            return argparse.SUPPRESS
+
     arg_parser = argparse.ArgumentParser(prog=prog,
             description='warcprox - WARC writing MITM HTTP/S proxy',
             formatter_class=BetterArgumentDefaultsHelpFormatter)
+    hidden = arg_parser.add_argument_group('hidden options')
+    arg_parser.add_argument(
+            '--help-hidden', action='help', default=argparse.SUPPRESS,
+            help='show help message, including help on hidden options, and exit')
     arg_parser.add_argument('-p', '--port', dest='port', default='8000',
             type=int, help='port to listen on')
     arg_parser.add_argument('-b', '--address', dest='address',
@@ -81,8 +94,12 @@ def _build_arg_parser(prog='warcprox'):
             help='define custom WARC filename with variables {prefix}, {timestamp14}, {timestamp17}, {serialno}, {randomtoken}, {hostname}, {shorthostname}')
     arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
             help='write gzip-compressed warc records')
-    arg_parser.add_argument('--no-warc-open-suffix', dest='no_warc_open_suffix',
-            default=False, action='store_true', help=argparse.SUPPRESS)
+    hidden.add_argument(
+            '--no-warc-open-suffix', dest='no_warc_open_suffix',
+            default=False, action='store_true',
+            help=suppress(
+                'do not name warc files with suffix ".open" while writing to '
+                'them, but lock them with lockf(3) instead'))
     # not mentioned in --help: special value for '-' for --prefix means don't
     # archive the capture, unless prefix set in warcprox-meta header
     arg_parser.add_argument(
@@ -146,43 +163,60 @@ def _build_arg_parser(prog='warcprox'):
             'rethinkdb service registry table url; if provided, warcprox '
             'will create and heartbeat entry for itself'))
-    # optional cookie values to pass to CDX Server; e.g. "cookie1=val1;cookie2=val2"
-    arg_parser.add_argument('--cdxserver-dedup-cookies', dest='cdxserver_dedup_cookies',
-            help=argparse.SUPPRESS)
+    hidden.add_argument(
+            '--cdxserver-dedup-cookies', dest='cdxserver_dedup_cookies',
+            help=suppress(
+                'value of Cookie header to include in requests to the cdx '
+                'server, when using --cdxserver-dedup'))
     arg_parser.add_argument('--dedup-min-text-size', dest='dedup_min_text_size',
             type=int, default=0,
             help=('try to dedup text resources with payload size over this limit in bytes'))
     arg_parser.add_argument('--dedup-min-binary-size', dest='dedup_min_binary_size',
             type=int, default=0, help=(
                 'try to dedup binary resources with payload size over this limit in bytes'))
-    # optionally, dedup request only when `dedup-bucket` is available in
-    # Warcprox-Meta HTTP header. By default, we dedup all requests.
-    arg_parser.add_argument('--dedup-only-with-bucket', dest='dedup_only_with_bucket',
-            action='store_true', default=False, help=argparse.SUPPRESS)
+    hidden.add_argument(
+            '--dedup-only-with-bucket', dest='dedup_only_with_bucket',
+            action='store_true', default=False, help=suppress(
+                'only deduplicate captures if "dedup-bucket" is set in '
+                'the Warcprox-Meta request header'))
     arg_parser.add_argument('--blackout-period', dest='blackout_period',
             type=int, default=0,
             help='skip writing a revisit record if its too close to the original capture')
-    arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
-            default=500, help=argparse.SUPPRESS)
-    arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
-            help=argparse.SUPPRESS)
-    arg_parser.add_argument('--profile', action='store_true', default=False,
-            help=argparse.SUPPRESS)
-    arg_parser.add_argument(
-            '--writer-threads', dest='writer_threads', type=int, default=None,
-            help=argparse.SUPPRESS)
+    hidden.add_argument(
+            '--queue-size', dest='queue_size', type=int, default=500,
+            help=suppress(
+                'maximum number of urls that can be queued at each '
+                'step of the processing chain (see the section on warcprox '
+                'architecture in README.rst)'))
+    hidden.add_argument(
+            '--max-threads', dest='max_threads', type=int, default=100,
+            help=suppress('maximum number of http worker threads'))
+    hidden.add_argument(
+            '--profile', action='store_true', default=False,
+            help=suppress(
+                'turn on performance profiling; summary statistics are dumped '
+                'every 10 minutes and at shutdown'))
+    hidden.add_argument(
+            '--writer-threads', dest='writer_threads', type=int, default=1,
+            help=suppress(
+                'number of warc writer threads; caution, see '
+                'https://github.com/internetarchive/warcprox/issues/101'))
     arg_parser.add_argument(
             '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy',
             default=None, help=(
                 'host:port of tor socks proxy, used only to connect to '
                 '.onion sites'))
-    # Configurable connection socket timeout, default is 60 sec.
-    arg_parser.add_argument(
-            '--socket-timeout', dest='socket_timeout', type=float,
-            default=None, help=argparse.SUPPRESS)
+    hidden.add_argument(
+            '--socket-timeout', dest='socket_timeout', type=float, default=60,
+            help=suppress(
+                'socket timeout, used for proxy client connection and for '
+                'connection to remote server'))
     # Increasing this value increases memory usage but reduces /tmp disk I/O.
-    arg_parser.add_argument(
+    hidden.add_argument(
             '--tmp-file-max-memory-size', dest='tmp_file_max_memory_size',
-            type=int, default=512*1024, help=argparse.SUPPRESS)
+            type=int, default=512*1024, help=suppress(
+                'size of in-memory buffer for each url being processed '
+                '(spills over to temp space on disk if exceeded)'))
     arg_parser.add_argument(
             '--max-resource-size', dest='max_resource_size', type=int,
             default=None, help='maximum resource size limit in bytes')
@@ -197,11 +231,18 @@ def _build_arg_parser(prog='warcprox'):
                 'Qualified name of plugin class, e.g. "mypkg.mymod.MyClass". '
                 'May be used multiple times to register multiple plugins. '
                 'See README.rst for more information.'))
-    arg_parser.add_argument('--version', action='version',
+    arg_parser.add_argument(
+            '-q', '--quiet', dest='quiet', action='store_true',
+            help='less verbose logging')
+    arg_parser.add_argument(
+            '-v', '--verbose', dest='verbose', action='store_true',
+            help='verbose logging')
+    arg_parser.add_argument(
+            '--trace', dest='trace', action='store_true',
+            help='very verbose logging')
+    arg_parser.add_argument(
+            '--version', action='version',
             version="warcprox {}".format(warcprox.__version__))
-    arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
-    arg_parser.add_argument('--trace', dest='trace', action='store_true')
-    arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
 
     return arg_parser
@@ -227,7 +268,11 @@ def parse_args(argv):
     '''
    Parses command line arguments with argparse.
     '''
-    arg_parser = _build_arg_parser(prog=os.path.basename(argv[0]))
+    show_hidden = False
+    if '--help-hidden' in argv:
+        show_hidden = True
+        argv = [argv[0], '--help-hidden']
+    arg_parser = _build_arg_parser(os.path.basename(argv[0]), show_hidden)
     args = arg_parser.parse_args(args=argv[1:])
     try:
@@ -245,11 +290,11 @@ def main(argv=None):
     args = parse_args(argv or sys.argv)
 
     if args.trace:
-        loglevel = warcprox.TRACE
+        loglevel = logging.TRACE
    elif args.verbose:
         loglevel = logging.DEBUG
     elif args.quiet:
-        loglevel = logging.WARNING
+        loglevel = logging.NOTICE
     else:
         loglevel = logging.INFO
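
The hidden-option machinery above boils down to one pattern: help text is written once and wrapped in ``suppress()``, which returns it unchanged when ``--help-hidden`` was requested and ``argparse.SUPPRESS`` otherwise. A standalone sketch of the same idea (the option name here is illustrative)::

    import argparse

    def build_parser(show_hidden=False):
        # pass help text through when hidden help is requested,
        # otherwise replace it with argparse.SUPPRESS to hide the option
        if show_hidden:
            suppress = lambda msg: msg
        else:
            suppress = lambda msg: argparse.SUPPRESS
        parser = argparse.ArgumentParser(prog='demo')
        parser.add_argument(
                '--secret-knob', type=int, default=500,
                help=suppress('a tuning knob hidden from normal --help'))
        return parser

    print(build_parser(show_hidden=False).format_help())  # omits --secret-knob
    print(build_parser(show_hidden=True).format_help())   # shows it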

View File

@@ -250,7 +250,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
         '''
         self._conn_pool = self.server.remote_connection_pool.connection_from_host(
             host=self.hostname, port=int(self.port), scheme='http',
-            pool_kwargs={'maxsize': 6})
+            pool_kwargs={'maxsize': 6, 'timeout': self._socket_timeout})
 
         self._remote_server_conn = self._conn_pool._get_conn()
         if is_connection_dropped(self._remote_server_conn):
@@ -263,10 +263,9 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
                 self._remote_server_conn.sock.set_proxy(
                         socks.SOCKS5, addr=self.onion_tor_socks_proxy_host,
                         port=self.onion_tor_socks_proxy_port, rdns=True)
-                self._remote_server_conn.timeout = self._socket_timeout
+                self._remote_server_conn.sock.settimeout(self._socket_timeout)
                 self._remote_server_conn.sock.connect((self.hostname, int(self.port)))
             else:
-                self._remote_server_conn.timeout = self._socket_timeout
                 self._remote_server_conn.connect()
 
             # Wrap socket if SSL is required
@@ -276,7 +275,8 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
                 context.check_hostname = False
                 context.verify_mode = ssl.CERT_NONE
                 self._remote_server_conn.sock = context.wrap_socket(
-                        self._remote_server_conn.sock, server_hostname=self.hostname)
+                        self._remote_server_conn.sock,
+                        server_hostname=self.hostname)
             except AttributeError:
                 try:
                     self._remote_server_conn.sock = ssl.wrap_socket(
@@ -502,10 +502,7 @@ class PooledMixIn(socketserver.ThreadingMixIn):
     def __init__(self, max_threads=None):
         self.active_requests = set()
         self.unaccepted_requests = 0
-        if max_threads:
-            self.max_threads = max_threads
-        else:
-            self.max_threads = 100
+        self.max_threads = max_threads or 100
         self.pool = concurrent.futures.ThreadPoolExecutor(self.max_threads)
         self.logger.info("%s proxy threads", self.max_threads)
@@ -595,11 +592,6 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
     request_queue_size = 4096
 
     def __init__(self, options=warcprox.Options()):
-        if options.max_threads:
-            self.logger.info(
-                    'max_threads=%s set by command line option',
-                    options.max_threads)
         PooledMixIn.__init__(self, options.max_threads)
         self.profilers = collections.defaultdict(cProfile.Profile)
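
With the timeout moved into ``pool_kwargs``, every connection checked out of the urllib3 pool already carries the socket timeout, rather than having it set by hand on each code path. A minimal standalone sketch of that pattern (the host and timeout values are illustrative)::

    import urllib3

    manager = urllib3.PoolManager()
    # connections drawn from this pool inherit both maxsize and timeout
    pool = manager.connection_from_host(
            host='example.com', port=80, scheme='http',
            pool_kwargs={'maxsize': 6, 'timeout': 60.0})
    conn = pool._get_conn()  # same private helper the handler above uses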

View File

@@ -287,9 +287,11 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
                 and (warc_type or 'WARC-Type' in self.headers)):
             timestamp = datetime.datetime.utcnow()
-            request_data = tempfile.SpooledTemporaryFile(max_size=524288)
+            request_data = tempfile.SpooledTemporaryFile(
+                    max_size=self._tmp_file_max_memory_size)
             payload_digest = hashlib.new(self.server.digest_algorithm)
             # XXX we don't support chunked uploads for now
             length = int(self.headers['Content-Length'])
             buf = self.rfile.read(min(65536, length - request_data.tell()))
             while buf != b'':
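
For context on the change above: ``SpooledTemporaryFile`` buffers writes in memory until ``max_size`` is exceeded, then rolls over to a real file on disk, which is exactly the knob ``--tmp-file-max-memory-size`` now controls. A quick sketch (``_rolled`` is a CPython implementation detail, used here only to show the rollover)::

    import tempfile

    buf = tempfile.SpooledTemporaryFile(max_size=512 * 1024)
    buf.write(b'x' * (512 * 1024))  # at the threshold: still in memory
    assert not buf._rolled
    buf.write(b'x')                 # one byte over: spills to a disk file
    assert buf._rolled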