Merge branch 'master' into qa

* master:
  fix test in py<=3.4
  fix failing test, and change response code from 500 to more appropriate 502
  failing test for correct handling of "http.client.RemoteDisconnected: Remote end closed connection without response" from remote server
  fix oops
  better error message for bad WARCPROX_WRITE_RECORD request
  fix mistakes in warc write thread profile aggregation
  aggregate warc writer thread profiles much like we do for proxy threads
  have --profile profile proxy threads as well as warc writer threads
  hacky way to fix problem of benchmarks arguments getting stale
Noah Levitt 2017-11-22 14:59:40 -08:00
commit 57b54885f5
9 changed files with 122 additions and 101 deletions

View File

@@ -160,90 +160,9 @@ with different options.
 Benchmarking code uses asyncio/aiohttp and requires python 3.5 or later.
 '''
-    arg_parser = argparse.ArgumentParser(
-            prog=prog, description=desc,
-            formatter_class=warcprox.main.BetterArgumentDefaultsHelpFormatter)
-    ### these warcprox options are not configurable for the benchmarks
-    # arg_parser.add_argument('-p', '--port', dest='port', default='8000',
-    #         type=int, help='port to listen on')
-    # arg_parser.add_argument('-b', '--address', dest='address',
-    #         default='localhost', help='address to listen on')
-    # arg_parser.add_argument('-c', '--cacert', dest='cacert',
-    #         default='./{0}-warcprox-ca.pem'.format(socket.gethostname()),
-    #         help='CA certificate file; if file does not exist, it will be created')
-    # arg_parser.add_argument('--certs-dir', dest='certs_dir',
-    #         default='./{0}-warcprox-ca'.format(socket.gethostname()),
-    #         help='where to store and load generated certificates')
-    # arg_parser.add_argument('-d', '--dir', dest='directory',
-    #         default='./warcs', help='where to write warcs')
-    arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
-            help='write gzip-compressed warc records')
-    arg_parser.add_argument('-n', '--prefix', dest='prefix',
-            default='WARCPROX', help='WARC filename prefix')
-    arg_parser.add_argument(
-            '-s', '--size', dest='rollover_size', default=1000*1000*1000,
-            type=int, help='WARC file rollover size threshold in bytes')
-    arg_parser.add_argument('--rollover-idle-time',
-            dest='rollover_idle_time', default=None, type=int,
-            help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)")
-    try:
-        hash_algos = hashlib.algorithms_guaranteed
-    except AttributeError:
-        hash_algos = hashlib.algorithms
-    arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm',
-            default='sha1', help='digest algorithm, one of {}'.format(', '.join(hash_algos)))
-    arg_parser.add_argument('--base32', dest='base32', action='store_true',
-            default=False, help='write digests in Base32 instead of hex')
-    arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD',
-            action='append', help='only record requests with the given http method(s) (can be used more than once)')
-    arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
-            default='./warcprox.sqlite', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
-    arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
-            type=int, default=None, help='port to listen on for instant playback')
-    arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
-            default='./warcprox-playback-index.db',
-            help='playback index database file (only used if --playback-port is specified)')
-    group = arg_parser.add_mutually_exclusive_group()
-    group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
-            default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
-    group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
-            help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
-    arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
-            help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)')
-    arg_parser.add_argument('--rethinkdb-big-table',
-            dest='rethinkdb_big_table', action='store_true', default=False,
-            help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
-    arg_parser.add_argument(
-            '--rethinkdb-big-table-name', dest='rethinkdb_big_table_name',
-            default='captures', help=argparse.SUPPRESS)
-    arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
-            default=500, help=argparse.SUPPRESS)
-    arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
-            help=argparse.SUPPRESS)
-    arg_parser.add_argument('--profile', action='store_true', default=False,
-            help=argparse.SUPPRESS)
-    arg_parser.add_argument(
-            '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy',
-            default=None, help=(
-                'host:port of tor socks proxy, used only to connect to '
-                '.onion sites'))
-    arg_parser.add_argument(
-            '--plugin', metavar='PLUGIN_CLASS', dest='plugins',
-            action='append', help=(
-                'Qualified name of plugin class, e.g. "mypkg.mymod.MyClass". '
-                'May be used multiple times to register multiple plugins. '
-                'Plugin classes are loaded from the regular python module '
-                'search path. They will be instantiated with no arguments and '
-                'must have a method `notify(self, recorded_url, records)` '
-                'which will be called for each url, after warc records have '
-                'been written.'))
-    arg_parser.add_argument('--version', action='version',
-            version="warcprox {}".format(warcprox.__version__))
-    arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
-    arg_parser.add_argument('--trace', dest='trace', action='store_true')
-    arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
+    arg_parser = warcprox.main._build_arg_parser()
+    arg_parser.description = desc
     arg_parser.add_argument(
             '--requests', dest='requests', type=int, default=200,
             help='number of urls to fetch')
@@ -253,6 +172,15 @@ Benchmarking code uses asyncio/aiohttp and requires python 3.5 or later.
     arg_parser.add_argument(
             '--skip-baseline', dest='skip_baseline', action='store_true',
             help='skip the baseline bechmarks')
+    # filter out options that are not configurable for the benchmarks
+    filtered = []
+    for action in arg_parser._action_groups[1]._group_actions:
+        if action.dest not in (
+                'port', 'address', 'cacert', 'certs_dir', 'directory'):
+            filtered.append(action)
+    arg_parser._action_groups[1]._group_actions = filtered
     return arg_parser

 if __name__ == '__main__':
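
The option filtering in the hunk above leans on argparse internals: _action_groups[1] is the parser's default optionals group, and removing an action from its _group_actions list drops it from the help output. A minimal standalone sketch of the same trick, with made-up option names (warcprox's real parser comes from warcprox.main._build_arg_parser()):

    import argparse

    parser = argparse.ArgumentParser(prog='example')
    parser.add_argument('-p', '--port', dest='port', type=int, default=8000)
    parser.add_argument('-z', '--gzip', dest='gzip', action='store_true')

    # _action_groups[1] is the default "optional arguments" group; removing
    # an action from _group_actions hides it from the options listing, though
    # parsing still accepts it, since parsing is driven by parser._actions
    filtered = [action for action in parser._action_groups[1]._group_actions
                if action.dest not in ('port',)]
    parser._action_groups[1]._group_actions = filtered

    parser.print_help()   # --port is gone from the options section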

View File

@@ -52,7 +52,7 @@ except:
 setuptools.setup(
         name='warcprox',
-        version='2.2.1b2.dev115',
+        version='2.2.1b2.dev123',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',

View File

@@ -238,6 +238,9 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
                 raise Exception('bad path')
             headers = b'HTTP/1.1 200 OK\r\n' + actual_headers + b'\r\n'
             logging.info('headers=%r payload=%r', headers, payload)
+        elif self.path == '/empty-response':
+            headers = b''
+            payload = b''
         else:
             payload = b'404 Not Found\n'
             headers = (b'HTTP/1.1 404 Not Found\r\n'
@@ -1661,6 +1664,20 @@ def test_long_warcprox_meta(
     with pytest.raises(StopIteration):
         next(rec_iter)

+def test_empty_response(
+        warcprox_, http_daemon, https_daemon, archiving_proxies,
+        playback_proxies):
+    url = 'http://localhost:%s/empty-response' % http_daemon.server_port
+    response = requests.get(url, proxies=archiving_proxies, verify=False)
+    assert response.status_code == 502
+    # this is the reason in python >= 3.5 but not in 3.4 and 2.7
+    # assert response.reason == 'Remote end closed connection without response'
+
+    url = 'https://localhost:%s/empty-response' % https_daemon.server_port
+    response = requests.get(url, proxies=archiving_proxies, verify=False)
+    assert response.status_code == 502
+
 def test_payload_digest(warcprox_, http_daemon):
     '''
     Tests that digest is of RFC2616 "entity body"
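
For context: http.client.RemoteDisconnected (python >= 3.5) is what the client raises when the server closes the connection without sending a status line, which is exactly what the /empty-response handler above simulates; 3.4 raises plain BadStatusLine with a different message (as does 2.7's httplib), hence the commented-out reason assertion. A self-contained sketch of the failure mode, separate from the test suite:

    import http.client, socket, threading

    def close_without_response(listener):
        conn, _ = listener.accept()
        conn.recv(1024)   # read the request
        conn.close()      # hang up without sending any response bytes

    listener = socket.socket()
    listener.bind(('127.0.0.1', 0))
    listener.listen(1)
    threading.Thread(target=close_without_response, args=(listener,)).start()

    conn = http.client.HTTPConnection('127.0.0.1', listener.getsockname()[1])
    conn.request('GET', '/empty-response')
    try:
        conn.getresponse()
    except http.client.RemoteDisconnected as e:
        print('caught: %r' % e)   # the error warcprox now maps to a 502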

View File

@@ -92,7 +92,7 @@ class RequestBlockedByRule(Exception):
     def __str__(self):
         return "%s: %s" % (self.__class__.__name__, self.msg)

-# monkey-patch log level TRACE
+# monkey-patch log levels TRACE and NOTICE
 TRACE = 5
 import logging
 def _logging_trace(msg, *args, **kwargs):
@@ -104,6 +104,17 @@ logging.trace = _logging_trace
 logging.Logger.trace = _logger_trace
 logging.addLevelName(TRACE, 'TRACE')

+NOTICE = (logging.INFO + logging.WARN) // 2
+import logging
+def _logging_notice(msg, *args, **kwargs):
+    logging.root.notice(msg, *args, **kwargs)
+def _logger_notice(self, msg, *args, **kwargs):
+    if self.isEnabledFor(NOTICE):
+        self._log(NOTICE, msg, args, **kwargs)
+logging.notice = _logging_notice
+logging.Logger.notice = _logger_notice
+logging.addLevelName(NOTICE, 'NOTICE')
+
 import warcprox.controller as controller
 import warcprox.playback as playback
 import warcprox.dedup as dedup
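
Once warcprox is imported, the patch above is in effect and every logger grows trace() and notice() methods. A short usage sketch:

    import logging
    import warcprox  # importing applies the TRACE and NOTICE monkey-patches

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('example')
    logger.notice('notable, but not a warning')  # NOTICE (25) > INFO (20): emitted
    logger.trace('very chatty detail')           # TRACE (5) < INFO (20): suppressed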

View File

@@ -44,7 +44,7 @@ class WarcproxController(object):
         Create warcprox controller.

         If supplied, `proxy` should be an instance of WarcProxy, and
-        `warc_writer_threads` should be an list of WarcWriterThread instances.
+        `warc_writer_threads` should be a list of WarcWriterThread instances.
         If not supplied, they are created with default values.

         If supplied, playback_proxy should be an instance of PlaybackProxy. If
@@ -254,6 +254,9 @@ class WarcproxController(object):
                 #     last_mem_dbg = datetime.datetime.utcnow()

                 time.sleep(0.5)
+
+            if self.options.profile:
+                self._dump_profiling()
         except:
             self.logger.critical(
                     "shutting down in response to fatal exception",
@@ -262,3 +265,36 @@ class WarcproxController(object):
         finally:
             self.shutdown()
+
+    def _dump_profiling(self):
+        import pstats, tempfile, os, io
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # proxy threads
+            files = []
+            for th_id, profiler in self.proxy.profilers.items():
+                file = os.path.join(tmpdir, '%s.dat' % th_id)
+                profiler.dump_stats(file)
+                files.append(file)
+            buf = io.StringIO()
+            stats = pstats.Stats(*files, stream=buf)
+            stats.sort_stats('cumulative')
+            stats.print_stats(0.1)
+            self.logger.notice(
+                    'aggregate performance profile of %s proxy threads:\n%s',
+                    len(files), buf.getvalue())
+
+            # warc writer threads
+            files = []
+            for wwt in self.warc_writer_threads:
+                file = os.path.join(tmpdir, '%s.dat' % wwt.ident)
+                wwt.profiler.dump_stats(file)
+                files.append(file)
+            buf = io.StringIO()
+            stats = pstats.Stats(*files, stream=buf)
+            stats.sort_stats('cumulative')
+            stats.print_stats(0.1)
+            self.logger.notice(
+                    'aggregate performance profile of %s warc writer threads:\n%s',
+                    len(self.warc_writer_threads), buf.getvalue())
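
The aggregation above works because pstats.Stats accepts any number of dump files and merges them into a single report. A reduced sketch of the same pattern, independent of warcprox:

    import cProfile, io, os, pstats, tempfile

    def fib(n):
        return n if n < 2 else fib(n - 1) + fib(n - 2)

    with tempfile.TemporaryDirectory() as tmpdir:
        files = []
        for i in range(3):                # stand-ins for three worker threads
            profiler = cProfile.Profile()
            profiler.enable()
            fib(18)
            profiler.disable()
            path = os.path.join(tmpdir, '%s.dat' % i)
            profiler.dump_stats(path)
            files.append(path)

        buf = io.StringIO()
        stats = pstats.Stats(*files, stream=buf)  # merges all three dumps
        stats.sort_stats('cumulative')
        stats.print_stats(0.1)                    # top 10% of entries
        print(buf.getvalue())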

View File

@@ -147,6 +147,9 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
             help=argparse.SUPPRESS)
     arg_parser.add_argument('--profile', action='store_true', default=False,
             help=argparse.SUPPRESS)
+    arg_parser.add_argument(
+            '--writer-threads', dest='writer_threads', type=int, default=None,
+            help=argparse.SUPPRESS)
     arg_parser.add_argument(
             '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy',
             default=None, help=(
@@ -273,12 +276,14 @@ def init_controller(args):
     # number of warc writer threads = sqrt(proxy.max_threads)
     # I came up with this out of thin air because it strikes me as reasonable
     # 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45
+    num_writer_threads = args.writer_threads or int(proxy.max_threads ** 0.5)
+    logging.debug('initializing %d warc writer threads', num_writer_threads)
     warc_writer_threads = [
             warcprox.writerthread.WarcWriterThread(
                 name='WarcWriterThread%03d' % i, recorded_url_q=recorded_url_q,
                 writer_pool=writer_pool, dedup_db=dedup_db,
                 listeners=listeners, options=options)
-            for i in range(int(proxy.max_threads ** 0.5))]
+            for i in range(num_writer_threads)]

     if args.rethinkdb_services_url:
         parsed = doublethink.parse_rethinkdb_url(
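
One nuance in the fallback expression above: `args.writer_threads or ...` treats 0 like None, so --writer-threads 0 silently falls back to the sqrt heuristic instead of meaning "no writer threads". A quick illustration:

    max_threads = 100
    for writer_threads in (None, 3, 0):
        n = writer_threads or int(max_threads ** 0.5)
        print(writer_threads, '=>', n)   # None => 10, 3 => 3, 0 => 10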

View File

@@ -363,9 +363,12 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
         try:
             return self._proxy_request()
-        except:
-            self.logger.error("exception proxying request", exc_info=True)
-            raise
+        except Exception as e:
+            self.logger.error(
+                    'error from remote server(?) %r: %r',
+                    self.requestline, e, exc_info=True)
+            self.send_error(502, str(e))
+            return

     def _proxy_request(self, extra_response_headers={}):
         '''
@@ -425,10 +428,6 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
                 buf = prox_rec_res.read(65536)

             self.log_request(prox_rec_res.status, prox_rec_res.recorder.len)
-        except Exception as e:
-            self.logger.error(
-                    "%r proxying %s %s", e, self.command, self.url,
-                    exc_info=True)
         finally:
             # Let's close off the remote end
             if prox_rec_res:
@@ -541,14 +540,32 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
     # This value is passed as the "backlog" argument to listen(2). The default
     # value from socketserver.TCPServer is 5. Increasing this value is part of
     # the solution to client connections being closed suddenly and this message
-    # appearing in kernel log on linux: "TCP: request_sock_TCP: # Possible SYN
-    # flooding on port 8000. Sending cookies. Check SNMP # counters." I think
+    # appearing in kernel log on linux: "TCP: request_sock_TCP: Possible SYN
+    # flooding on port 8000. Sending cookies. Check SNMP counters." I think
     # this comes into play because we don't always accept(2) immediately (see
     # PooledMixIn.get_request()).
     # See also https://blog.dubbelboer.com/2012/04/09/syn-cookies.html
     request_queue_size = 4096

-    def process_request_thread(self, request, client_address):
+    def __init__(self, max_threads, options=warcprox.Options()):
+        PooledMixIn.__init__(self, max_threads)
+        self.profilers = {}
+
+        if options.profile:
+            self.process_request_thread = self._profile_process_request_thread
+        else:
+            self.process_request_thread = self._process_request_thread
+
+    def _profile_process_request_thread(self, request, client_address):
+        if not threading.current_thread().ident in self.profilers:
+            import cProfile
+            self.profilers[threading.current_thread().ident] = cProfile.Profile()
+        profiler = self.profilers[threading.current_thread().ident]
+        profiler.enable()
+        self._process_request_thread(request, client_address)
+        profiler.disable()
+
+    def _process_request_thread(self, request, client_address):
         '''
         This an almost verbatim copy/paste of
         socketserver.ThreadingMixIn.process_request_thread.
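
PooledMitmProxy keeps one cProfile.Profile per worker thread, keyed by thread ident, because a profiler enabled in one thread only observes that thread's frames. A generic sketch of the same registry pattern (names here are illustrative, not warcprox API):

    import cProfile, threading

    profilers = {}   # thread ident -> cProfile.Profile

    def profiled_call(func, *args):
        ident = threading.current_thread().ident
        if ident not in profilers:
            profilers[ident] = cProfile.Profile()
        profiler = profilers[ident]
        profiler.enable()
        try:
            return func(*args)
        finally:
            profiler.disable()   # stats accumulate until dumped/aggregated

    threads = [threading.Thread(target=profiled_call, args=(sum, range(10**6)))
               for _ in range(4)]
    for t in threads: t.start()
    for t in threads: t.join()
    print(len(profilers), 'per-thread profilers collected')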

View File

@@ -311,7 +311,10 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
                 self.server.recorded_url_q.put(rec_custom)
                 self.send_response(204, 'OK')
             else:
-                self.send_error(400, 'Bad request')
+                self.send_error(400, message='Bad request', explain=(
+                    'Bad request. WARC-Type, Content-Length, and Content-Type '
+                    'request headers required for WARCPROX_WRITE_RECORD '
+                    'request.'))
             self.end_headers()
         except:
@@ -425,7 +428,8 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy):
             self.logger.info(
                     "max_threads=%s set by command line option",
                     options.max_threads)
-        warcprox.mitmproxy.PooledMitmProxy.__init__(self, options.max_threads)
+        warcprox.mitmproxy.PooledMitmProxy.__init__(
+                self, options.max_threads, options)
         SingleThreadedWarcProxy.__init__(
                 self, ca, recorded_url_q, stats_db, options)
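
send_error's explain parameter (available since python 3.4) supplies the body of the generated HTML error page, while message replaces the reason phrase on the status line. A small self-contained demo of the behavior the hunk above relies on:

    import threading, urllib.error, urllib.request
    from http.server import BaseHTTPRequestHandler, HTTPServer

    class Handler(BaseHTTPRequestHandler):
        def do_GET(self):
            self.send_error(400, message='Bad request',
                            explain='Required request headers were missing.')

    server = HTTPServer(('127.0.0.1', 0), Handler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    try:
        urllib.request.urlopen('http://127.0.0.1:%s/' % server.server_port)
    except urllib.error.HTTPError as e:
        print(e.code, e.reason)          # 400 Bad request
        print(e.read().decode()[:200])   # error page includes the explain text
    server.shutdown()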

View File

@@ -33,7 +33,6 @@ import time
 from datetime import datetime
 from hanzo import warctools
 import warcprox
-import cProfile
 import sys

 class WarcWriterThread(threading.Thread):
@@ -59,7 +58,11 @@ class WarcWriterThread(threading.Thread):
     def run(self):
         if self.options.profile:
-            cProfile.runctx('self._run()', globals(), locals(), sort='cumulative')
+            import cProfile
+            self.profiler = cProfile.Profile()
+            self.profiler.enable()
+            self._run()
+            self.profiler.disable()
         else:
             self._run()
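
The switch away from cProfile.runctx is what makes the controller-side aggregation possible: runctx prints its stats immediately and discards them, while a Profile object kept on the thread can be dumped and merged with its siblings later. A side-by-side sketch:

    import cProfile

    def work():
        sum(i * i for i in range(100000))

    # before: profile and print in one shot; nothing is kept for later
    cProfile.runctx('work()', globals(), locals(), sort='cumulative')

    # after: hold on to the Profile so its stats can be aggregated later
    profiler = cProfile.Profile()
    profiler.enable()
    work()
    profiler.disable()
    profiler.dump_stats('work.dat')   # suitable input for pstats.Stats(...)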