From c966f7f6e8afdc20a3183a73a61213e61504be23 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 28 Dec 2017 17:07:02 -0800 Subject: [PATCH] more stats available from /status (and in rethinkdb services table) --- setup.py | 2 +- warcprox/controller.py | 10 ++--- warcprox/main.py | 7 ++- warcprox/mitmproxy.py | 14 ++++++ warcprox/stats.py | 98 ++++++++++++++++++++++++++++++++++++++---- warcprox/warcproxy.py | 47 +++++++++++++++----- 6 files changed, 151 insertions(+), 27 deletions(-) diff --git a/setup.py b/setup.py index 86fdc12..9c63748 100755 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ except: setuptools.setup( name='warcprox', - version='2.3.1b4.dev132', + version='2.3.1b4.dev133', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/controller.py b/warcprox/controller.py index 22e0328..2b0c8a8 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -149,12 +149,7 @@ class WarcproxController(object): 'ttl': self.HEARTBEAT_INTERVAL * 3, 'port': self.proxy.server_port, } - status_info['load'] = 1.0 * self.proxy.recorded_url_q.qsize() / ( - self.proxy.recorded_url_q.maxsize or 100) - status_info['queued_urls'] = self.proxy.recorded_url_q.qsize() - status_info['queue_max_size'] = self.proxy.recorded_url_q.maxsize - status_info['seconds_behind'] = self.proxy.recorded_url_q.seconds_behind() - status_info['threads'] = self.proxy.pool._max_workers + status_info.update(self.proxy.status()) self.status_info = self.service_registry.heartbeat(status_info) self.logger.log( @@ -241,6 +236,9 @@ class WarcproxController(object): try: while not self.stop.is_set(): + if self.proxy.running_stats: + self.proxy.running_stats.snap() + if self.service_registry and ( not hasattr(self, "status_info") or ( datetime.datetime.now(utc) diff --git a/warcprox/main.py b/warcprox/main.py index 47c4bb4..348dfbf 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -212,6 +212,8 @@ def
init_controller(args): exit(1) listeners = [] + running_stats = warcprox.stats.RunningStats() + listeners.append(running_stats) if args.rethinkdb_dedup_url: dedup_db = warcprox.dedup.RethinkDedupDb(options=options) @@ -245,8 +247,9 @@ def init_controller(args): ca = certauth.certauth.CertificateAuthority(args.cacert, args.certs_dir, ca_name=ca_name) - proxy = warcprox.warcproxy.WarcProxy(ca=ca, recorded_url_q=recorded_url_q, - stats_db=stats_db, options=options) + proxy = warcprox.warcproxy.WarcProxy( + ca=ca, recorded_url_q=recorded_url_q, stats_db=stats_db, + running_stats=running_stats, options=options) if args.playback_port is not None: playback_index_db = warcprox.playback.PlaybackIndexDb( diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index bfe3a7d..130196a 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -451,6 +451,7 @@ class PooledMixIn(socketserver.ThreadingMixIn): on system resource limits. ''' self.active_requests = set() + self.unaccepted_requests = 0 if not max_threads: # man getrlimit: "RLIMIT_NPROC The maximum number of processes (or, # more precisely on Linux, threads) that can be created for the @@ -475,6 +476,17 @@ class PooledMixIn(socketserver.ThreadingMixIn): self.max_threads = max_threads self.pool = concurrent.futures.ThreadPoolExecutor(max_threads) + def status(self): + if hasattr(super(), 'status'): + result = super().status() + else: + result = {} + result.update({ + 'threads': self.pool._max_workers, + 'active_requests': len(self.active_requests), + 'unaccepted_requests': self.unaccepted_requests}) + return result + def process_request(self, request, client_address): self.active_requests.add(request) future = self.pool.submit( @@ -503,12 +515,14 @@ class PooledMixIn(socketserver.ThreadingMixIn): self.logger.trace( 'someone is connecting active_requests=%s', len(self.active_requests)) + self.unaccepted_requests += 1 while len(self.active_requests) > self.max_threads: time.sleep(0.05) res = self.socket.accept() 
self.logger.trace( 'accepted after %.1f sec active_requests=%s socket=%s', time.time() - start, len(self.active_requests), res[0]) + self.unaccepted_requests -= 1 return res class MitmProxy(http_server.HTTPServer): diff --git a/warcprox/stats.py b/warcprox/stats.py index 910c794..6047443 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -21,18 +21,20 @@ USA. from __future__ import absolute_import +from hanzo import warctools +import collections +import copy +import datetime +import doublethink +import json import logging import os -import json -from hanzo import warctools -import warcprox -import threading import rethinkdb as r -import datetime -import urlcanon import sqlite3 -import copy -import doublethink +import threading +import time +import urlcanon +import warcprox def _empty_bucket(bucket): return { @@ -338,3 +340,83 @@ class RethinkStatsDb(StatsDb): def notify(self, recorded_url, records): self.tally(recorded_url, records) +class RunningStats: + ''' + In-memory stats for measuring overall warcprox performance. 
+ ''' + def __init__(self): + self.urls = 0 + self.warc_bytes = 0 + self._lock = threading.RLock() + self.first_snap_time = time.time() + # snapshot every minute since the beginning of time + self.minute_snaps = [(self.first_snap_time, 0, 0)] + # snapshot every 10 seconds for the last 2 minutes (fill with zeroes) + self.ten_sec_snaps = collections.deque() + for i in range(0, 13): + self.ten_sec_snaps.append( + (self.first_snap_time - 120 + i * 10, 0, 0)) + + def notify(self, recorded_url, records): + with self._lock: + self.urls += 1 + if records: + self.warc_bytes += records[-1].offset + records[-1].length - records[0].offset + + def snap(self): + now = time.time() + last_snap_time = self.minute_snaps[-1][0] + need_minute_snap = (now - self.first_snap_time) // 60 > (self.minute_snaps[-1][0] - self.first_snap_time) // 60 + need_ten_sec_snap = (now - self.ten_sec_snaps[0][0]) // 10 > (self.ten_sec_snaps[-1][0] - self.ten_sec_snaps[0][0]) // 10 + if need_minute_snap: + self.minute_snaps.append((now, self.urls, self.warc_bytes)) + logging.debug('added minute snap %r', self.minute_snaps[-1]) + if need_ten_sec_snap: + self.ten_sec_snaps.popleft() + self.ten_sec_snaps.append((now, self.urls, self.warc_bytes)) + logging.trace('rotated in ten second snap %r', self.ten_sec_snaps[-1]) + + def _closest_ten_sec_snap(self, t): + # it's a deque so iterating over it is faster than indexed lookup + closest_snap = (0, 0, 0) + for snap in self.ten_sec_snaps: + if abs(t - snap[0]) < abs(t - closest_snap[0]): + closest_snap = snap + return closest_snap + + def _closest_minute_snap(self, t): + minutes_ago = int((time.time() - t) / 60) + # jump to approximately where we expect the closest snap + i = max(0, len(self.minute_snaps) - minutes_ago) + # move back to the last one earlier than `t` + while self.minute_snaps[i][0] > t and i > 0: + i -= 1 + closest_snap = self.minute_snaps[i] + # move forward until we start getting farther away from `t` + while i < len(self.minute_snaps): + if 
abs(t - self.minute_snaps[i][0]) <= abs(t - closest_snap[0]): + closest_snap = self.minute_snaps[i] + else: + break + i += 1 + return closest_snap + + def current_rates(self, time_period_minutes): + assert time_period_minutes > 0 + with self._lock: + now = time.time() + urls = self.urls + warc_bytes = self.warc_bytes + + t = now - time_period_minutes * 60 + if time_period_minutes <= 2: + start_snap = self._closest_ten_sec_snap(t) + else: + start_snap = self._closest_minute_snap(t) + + elapsed = now - start_snap[0] + logging.trace( + 'elapsed=%0.1fs urls=%s warc_bytes=%s', elapsed, + urls - start_snap[1], warc_bytes - start_snap[2]) + return elapsed, (urls - start_snap[1]) / elapsed, (warc_bytes - start_snap[2]) / elapsed + diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index d0e9520..eec5e50 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -232,14 +232,9 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): 'host': socket.gethostname(), 'address': self.connection.getsockname()[0], 'port': self.connection.getsockname()[1], - 'load': 1.0 * self.server.recorded_url_q.qsize() / ( - self.server.recorded_url_q.maxsize or 100), - 'queued_urls': self.server.recorded_url_q.qsize(), - 'queue_max_size': self.server.recorded_url_q.maxsize, - 'seconds_behind': self.server.recorded_url_q.seconds_behind(), 'pid': os.getpid(), - 'threads': self.server.pool._max_workers, } + status_info.update(self.server.status()) payload = json.dumps( status_info, indent=2).encode('utf-8') + b'\n' self.send_response(200, 'OK') @@ -381,7 +376,7 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): def __init__( self, ca=None, recorded_url_q=None, stats_db=None, - options=warcprox.Options()): + running_stats=None, options=warcprox.Options()): server_address = ( options.address or 'localhost', options.port if options.port is not None else 8000) @@ -415,15 +410,47 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): 
maxsize=options.queue_size or 1000) self.stats_db = stats_db - + self.running_stats = running_stats self.options = options + def status(self): + if hasattr(super(), 'status'): + result = super().status() + else: + result = {} + result.update({ + 'load': 1.0 * self.recorded_url_q.qsize() / ( + self.recorded_url_q.maxsize or 100), + 'queued_urls': self.recorded_url_q.qsize(), + 'queue_max_size': self.recorded_url_q.maxsize, + 'seconds_behind': self.recorded_url_q.seconds_behind(), + }) + elapsed, urls_per_sec, warc_bytes_per_sec = self.running_stats.current_rates(1) + result['rates_1min'] = { + 'actual_elapsed': elapsed, + 'urls_per_sec': urls_per_sec, + 'warc_bytes_per_sec': warc_bytes_per_sec, + } + elapsed, urls_per_sec, warc_bytes_per_sec = self.running_stats.current_rates(5) + result['rates_5min'] = { + 'actual_elapsed': elapsed, + 'urls_per_sec': urls_per_sec, + 'warc_bytes_per_sec': warc_bytes_per_sec, + } + elapsed, urls_per_sec, warc_bytes_per_sec = self.running_stats.current_rates(15) + result['rates_15min'] = { + 'actual_elapsed': elapsed, + 'urls_per_sec': urls_per_sec, + 'warc_bytes_per_sec': warc_bytes_per_sec, + } + return result + class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy): logger = logging.getLogger("warcprox.warcproxy.WarcProxy") def __init__( self, ca=None, recorded_url_q=None, stats_db=None, - options=warcprox.Options()): + running_stats=None, options=warcprox.Options()): if options.max_threads: self.logger.info( "max_threads=%s set by command line option", @@ -431,7 +458,7 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy): warcprox.mitmproxy.PooledMitmProxy.__init__( self, options.max_threads, options) SingleThreadedWarcProxy.__init__( - self, ca, recorded_url_q, stats_db, options) + self, ca, recorded_url_q, stats_db, running_stats, options) def server_activate(self): http_server.HTTPServer.server_activate(self)