more stats available from /status (and in rethinkdb services table)

Noah Levitt 2017-12-28 17:07:02 -08:00
parent a85c665ce9
commit c966f7f6e8
6 changed files with 151 additions and 27 deletions

setup.py

@@ -52,7 +52,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.3.1b4.dev132',
version='2.3.1b4.dev133',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',

warcprox/controller.py

@@ -149,12 +149,7 @@ class WarcproxController(object):
'ttl': self.HEARTBEAT_INTERVAL * 3,
'port': self.proxy.server_port,
}
status_info['load'] = 1.0 * self.proxy.recorded_url_q.qsize() / (
self.proxy.recorded_url_q.maxsize or 100)
status_info['queued_urls'] = self.proxy.recorded_url_q.qsize()
status_info['queue_max_size'] = self.proxy.recorded_url_q.maxsize
status_info['seconds_behind'] = self.proxy.recorded_url_q.seconds_behind()
status_info['threads'] = self.proxy.pool._max_workers
status_info.update(self.proxy.status())
self.status_info = self.service_registry.heartbeat(status_info)
self.logger.log(
@@ -241,6 +236,9 @@ class WarcproxController(object):
try:
while not self.stop.is_set():
if self.proxy.running_stats:
self.proxy.running_stats.snap()
if self.service_registry and (
not hasattr(self, "status_info") or (
datetime.datetime.now(utc)
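The controller previously assembled the queue metrics by hand for the service-registry heartbeat; after this change it merges whatever self.proxy.status() reports, so the heartbeat record and the /status endpoint can't drift apart. A minimal sketch of the delegation pattern, with illustrative class names (not warcprox's actual ones):

# minimal sketch of the delegation above; Proxy and Controller here are
# illustrative stand-ins, not warcprox's actual classes
class Proxy:
    def status(self):
        # each component reports its own metrics
        return {'queued_urls': 7, 'threads': 100}

class Controller:
    HEARTBEAT_INTERVAL = 20.0

    def __init__(self, proxy):
        self.proxy = proxy

    def heartbeat_record(self):
        status_info = {
            'role': 'warcprox',
            'ttl': self.HEARTBEAT_INTERVAL * 3,
        }
        # one update() call replaces the hand-maintained field list
        status_info.update(self.proxy.status())
        return status_info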

warcprox/main.py

@@ -212,6 +212,8 @@ def init_controller(args):
exit(1)
listeners = []
running_stats = warcprox.stats.RunningStats()
listeners.append(running_stats)
if args.rethinkdb_dedup_url:
dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
@@ -245,8 +247,9 @@ def init_controller(args):
ca = certauth.certauth.CertificateAuthority(args.cacert, args.certs_dir,
ca_name=ca_name)
proxy = warcprox.warcproxy.WarcProxy(ca=ca, recorded_url_q=recorded_url_q,
stats_db=stats_db, options=options)
proxy = warcprox.warcproxy.WarcProxy(
ca=ca, recorded_url_q=recorded_url_q, stats_db=stats_db,
running_stats=running_stats, options=options)
if args.playback_port is not None:
playback_index_db = warcprox.playback.PlaybackIndexDb(
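RunningStats is registered here as an ordinary listener, so it receives the same per-URL callback as the dedup and stats databases (see RunningStats.notify in warcprox/stats.py below). A minimal sketch of the listener protocol this relies on; CountingListener is hypothetical:

class CountingListener:
    '''Hypothetical listener: any object with a notify(recorded_url,
    records) method can be appended to the listeners list.'''
    def __init__(self):
        self.urls = 0

    def notify(self, recorded_url, records):
        # called once per fetched URL, after its WARC records are written
        self.urls += 1

listeners = [CountingListener()]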

warcprox/mitmproxy.py

@@ -451,6 +451,7 @@ class PooledMixIn(socketserver.ThreadingMixIn):
on system resource limits.
'''
self.active_requests = set()
self.unaccepted_requests = 0
if not max_threads:
# man getrlimit: "RLIMIT_NPROC The maximum number of processes (or,
# more precisely on Linux, threads) that can be created for the
@@ -475,6 +476,17 @@ class PooledMixIn(socketserver.ThreadingMixIn):
self.max_threads = max_threads
self.pool = concurrent.futures.ThreadPoolExecutor(max_threads)
def status(self):
if hasattr(super(), 'status'):
result = super().status()
else:
result = {}
result.update({
'threads': self.pool._max_workers,
'active_requests': len(self.active_requests),
'unaccepted_requests': self.unaccepted_requests})
return result
def process_request(self, request, client_address):
self.active_requests.add(request)
future = self.pool.submit(
@@ -503,12 +515,14 @@ class PooledMixIn(socketserver.ThreadingMixIn):
self.logger.trace(
'someone is connecting active_requests=%s',
len(self.active_requests))
self.unaccepted_requests += 1
while len(self.active_requests) > self.max_threads:
time.sleep(0.05)
res = self.socket.accept()
self.logger.trace(
'accepted after %.1f sec active_requests=%s socket=%s',
time.time() - start, len(self.active_requests), res[0])
self.unaccepted_requests -= 1
return res
class MitmProxy(http_server.HTTPServer):
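Both PooledMixIn here and SingleThreadedWarcProxy (below) define status() with the same hasattr(super(), 'status') guard, so a class inheriting from both chains the calls up the MRO and each base contributes its own fields. A self-contained sketch of the pattern with illustrative classes:

# sketch of the cooperative status() chaining; these classes are
# illustrative, not warcprox's
class QueueStatus:
    def status(self):
        result = super().status() if hasattr(super(), 'status') else {}
        result.update({'queued_urls': 3})
        return result

class ThreadPoolStatus:
    def status(self):
        result = super().status() if hasattr(super(), 'status') else {}
        result.update({'threads': 100, 'active_requests': 2})
        return result

class Combined(QueueStatus, ThreadPoolStatus):
    pass

print(Combined().status())
# {'threads': 100, 'active_requests': 2, 'queued_urls': 3}

The hasattr() guard lets each class work whether or not the next class in the MRO also defines status().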

warcprox/stats.py

@@ -21,18 +21,20 @@ USA.
from __future__ import absolute_import
from hanzo import warctools
import collections
import copy
import datetime
import doublethink
import json
import logging
import os
import json
from hanzo import warctools
import warcprox
import threading
import rethinkdb as r
import datetime
import urlcanon
import sqlite3
import copy
import doublethink
import threading
import time
import urlcanon
import warcprox
def _empty_bucket(bucket):
return {
@@ -338,3 +340,83 @@ class RethinkStatsDb(StatsDb):
def notify(self, recorded_url, records):
self.tally(recorded_url, records)
class RunningStats:
'''
In-memory stats for measuring overall warcprox performance.
'''
def __init__(self):
self.urls = 0
self.warc_bytes = 0
self._lock = threading.RLock()
self.first_snap_time = time.time()
# snapshot every minute since the beginning of time
self.minute_snaps = [(self.first_snap_time, 0, 0)]
# snapshot every 10 seconds for the last 2 minutes (fill with zeroes)
self.ten_sec_snaps = collections.deque()
for i in range(0, 13):
self.ten_sec_snaps.append(
(self.first_snap_time - 120 + i * 10, 0, 0))
def notify(self, recorded_url, records):
with self._lock:
self.urls += 1
if records:
self.warc_bytes += records[-1].offset + records[-1].length - records[0].offset
def snap(self):
now = time.time()
last_snap_time = self.minute_snaps[-1][0]
need_minute_snap = (now - self.first_snap_time) // 60 > (self.minute_snaps[-1][0] - self.first_snap_time) // 60
need_ten_sec_snap = (now - self.ten_sec_snaps[0][0]) // 10 > (self.ten_sec_snaps[-1][0] - self.ten_sec_snaps[0][0]) // 10
if need_minute_snap:
self.minute_snaps.append((now, self.urls, self.warc_bytes))
logging.debug('added minute snap %r', self.minute_snaps[-1])
if need_ten_sec_snap:
self.ten_sec_snaps.popleft()
self.ten_sec_snaps.append((now, self.urls, self.warc_bytes))
logging.trace('rotated in ten second snap %r', self.ten_sec_snaps[-1])
def _closest_ten_sec_snap(self, t):
# it's a deque so iterating over it is faster than indexed lookup
closest_snap = (0, 0, 0)
for snap in self.ten_sec_snaps:
if abs(t - snap[0]) < abs(t - closest_snap[0]):
closest_snap = snap
return closest_snap
def _closest_minute_snap(self, t):
minutes_ago = int((time.time() - t) / 60)
# jump to approximately where we expect the closest snap
i = max(0, len(self.minute_snaps) - minutes_ago)
# move back to the last one earlier than `t`
while self.minute_snaps[i][0] > t and i > 0:
i -= 1
closest_snap = self.minute_snaps[i]
# move forward until we start getting farther away from `t`
while i < len(self.minute_snaps):
if abs(t - self.minute_snaps[i][0]) <= abs(t - closest_snap[0]):
closest_snap = self.minute_snaps[i]
else:
break
i += 1
return closest_snap
def current_rates(self, time_period_minutes):
assert time_period_minutes > 0
with self._lock:
now = time.time()
urls = self.urls
warc_bytes = self.warc_bytes
t = now - time_period_minutes * 60
if time_period_minutes <= 2:
start_snap = self._closest_ten_sec_snap(t)
else:
start_snap = self._closest_minute_snap(t)
elapsed = now - start_snap[0]
logging.trace(
'elapsed=%0.1fs urls=%s warc_bytes=%s', elapsed,
urls - start_snap[1], warc_bytes - start_snap[2])
return elapsed, (urls - start_snap[1]) / elapsed, (warc_bytes - start_snap[2]) / elapsed
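A short usage sketch of RunningStats: notify() feeds the counters, a periodic snap() (the controller calls it from its main loop, per the controller diff above) rotates the snapshots, and current_rates() reads back smoothed throughput. FakeRecord stands in for the warctools record objects and is not a real class. Note also that logging.trace() is not part of the standard logging module; warcprox adds a TRACE level elsewhere in the package.

import time
import warcprox.stats

class FakeRecord:
    # hypothetical stand-in for a warctools WARC record
    def __init__(self, offset, length):
        self.offset, self.length = offset, length

stats = warcprox.stats.RunningStats()
stats.notify(None, [FakeRecord(offset=0, length=2048)])  # 2048 warc bytes
time.sleep(10)
stats.snap()  # rotates in a fresh ten-second snapshot
elapsed, urls_per_sec, warc_bytes_per_sec = stats.current_rates(1)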

warcprox/warcproxy.py

@@ -232,14 +232,9 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
'host': socket.gethostname(),
'address': self.connection.getsockname()[0],
'port': self.connection.getsockname()[1],
'load': 1.0 * self.server.recorded_url_q.qsize() / (
self.server.recorded_url_q.maxsize or 100),
'queued_urls': self.server.recorded_url_q.qsize(),
'queue_max_size': self.server.recorded_url_q.maxsize,
'seconds_behind': self.server.recorded_url_q.seconds_behind(),
'pid': os.getpid(),
'threads': self.server.pool._max_workers,
}
status_info.update(self.server.status())
payload = json.dumps(
status_info, indent=2).encode('utf-8') + b'\n'
self.send_response(200, 'OK')
@@ -381,7 +376,7 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
def __init__(
self, ca=None, recorded_url_q=None, stats_db=None,
options=warcprox.Options()):
running_stats=None, options=warcprox.Options()):
server_address = (
options.address or 'localhost',
options.port if options.port is not None else 8000)
@@ -415,15 +410,47 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
maxsize=options.queue_size or 1000)
self.stats_db = stats_db
self.running_stats = running_stats
self.options = options
def status(self):
if hasattr(super(), 'status'):
result = super().status()
else:
result = {}
result.update({
'load': 1.0 * self.recorded_url_q.qsize() / (
self.recorded_url_q.maxsize or 100),
'queued_urls': self.recorded_url_q.qsize(),
'queue_max_size': self.recorded_url_q.maxsize,
'seconds_behind': self.recorded_url_q.seconds_behind(),
})
elapsed, urls_per_sec, warc_bytes_per_sec = self.running_stats.current_rates(1)
result['rates_1min'] = {
'actual_elapsed': elapsed,
'urls_per_sec': urls_per_sec,
'warc_bytes_per_sec': warc_bytes_per_sec,
}
elapsed, urls_per_sec, warc_bytes_per_sec = self.running_stats.current_rates(5)
result['rates_5min'] = {
'actual_elapsed': elapsed,
'urls_per_sec': urls_per_sec,
'warc_bytes_per_sec': warc_bytes_per_sec,
}
elapsed, urls_per_sec, warc_bytes_per_sec = self.running_stats.current_rates(15)
result['rates_15min'] = {
'actual_elapsed': elapsed,
'urls_per_sec': urls_per_sec,
'warc_bytes_per_sec': warc_bytes_per_sec,
}
return result
class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy):
logger = logging.getLogger("warcprox.warcproxy.WarcProxy")
def __init__(
self, ca=None, recorded_url_q=None, stats_db=None,
options=warcprox.Options()):
running_stats=None, options=warcprox.Options()):
if options.max_threads:
self.logger.info(
"max_threads=%s set by command line option",
@@ -431,7 +458,7 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy):
warcprox.mitmproxy.PooledMitmProxy.__init__(
self, options.max_threads, options)
SingleThreadedWarcProxy.__init__(
self, ca, recorded_url_q, stats_db, options)
self, ca, recorded_url_q, stats_db, running_stats, options)
def server_activate(self):
http_server.HTTPServer.server_activate(self)
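With these pieces combined, the /status response carries the queue, thread-pool, and rolling-rate fields from all three status() implementations. A hedged sketch of reading it; that warcprox answers a plain GET for /status on its listening port (8000 by default, per the diff above) is an assumption here:

import json
import urllib.request

# assumes a warcprox instance listening on the default localhost:8000
with urllib.request.urlopen('http://localhost:8000/status') as response:
    status = json.loads(response.read().decode('utf-8'))

# fields contributed by the different status() implementations above
print(status['queued_urls'], status['seconds_behind'])
print(status['threads'], status['active_requests'], status['unaccepted_requests'])
print(status['rates_1min']['urls_per_sec'], status['rates_15min']['warc_bytes_per_sec'])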