From 9af17ba7c339f435548b4a549628a60655260ab5 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 5 Nov 2015 02:23:36 +0000 Subject: [PATCH] update stats batch every 0.5 seconds, since rethinkdb updates were falling way behind sometimes --- warcprox/stats.py | 95 +++++++++++++++++++++++++++-------------------- 1 file changed, 55 insertions(+), 40 deletions(-) diff --git a/warcprox/stats.py b/warcprox/stats.py index 750ba5d..44b724b 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -14,7 +14,8 @@ import json from hanzo import warctools import random import warcprox -import concurrent.futures +import threading +import rethinkdb as r def _empty_bucket(bucket): return { @@ -111,19 +112,58 @@ class StatsDb: self.db[b] = json.dumps(bucket_stats, separators=(',',':')).encode("utf-8") class RethinkStatsDb: + """Updates database in batch every 0.5 seconds""" logger = logging.getLogger("warcprox.stats.RethinkStatsDb") - def __init__(self, r, table="stats", shards=None, replicas=None, options=warcprox.Options()): - self.r = r + def __init__(self, rethinker, table="stats", shards=None, replicas=None, options=warcprox.Options()): + self.r = rethinker self.table = table - self.shards = shards or len(r.servers) - self.replicas = replicas or min(3, len(r.servers)) + self.shards = shards or 1 # 1 shard by default because it's probably a small table + self.replicas = replicas or min(3, len(self.r.servers)) self._ensure_db_table() self.options = options - # only one worker thread to ensure consistency, see - # https://rethinkdb.com/docs/consistency/ - self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) + self._stop = threading.Event() + self._batch_lock = threading.RLock() + with self._batch_lock: + self._batch = {} + + self._update_batch() # starts repeating timer + + def _update_batch(self): + with self._batch_lock: + if len(self._batch) > 0: + # XXX can this be done in one query? + # r.db("archiveit_brozzler").table("test00").get_all(*["foo01","foo"])... + # >>> r.db("archiveit_brozzler").table("test00").get("foo01").replace(lambda old: r.branch(old.eq(None), {"id":"foo01", "a":{"b":88}}, old.merge({"a":{"b":old["a"]["b"].add(3)}}))).run(conn) + for k in self._batch: + result = self.r.table(self.table).get(k).replace( + lambda old: r.branch(old.eq(None), self._batch[k], old.merge( + { + "total": { + "urls": old["total"]["urls"].add(self._batch[k]["total"]["urls"]), + "wire_bytes": old["total"]["wire_bytes"].add(self._batch[k]["total"]["wire_bytes"]), + }, + "new": { + "urls": old["new"]["urls"].add(self._batch[k]["new"]["urls"]), + "wire_bytes": old["new"]["wire_bytes"].add(self._batch[k]["new"]["wire_bytes"]), + }, + "revisit": { + "urls": old["revisit"]["urls"].add(self._batch[k]["revisit"]["urls"]), + "wire_bytes": old["revisit"]["wire_bytes"].add(self._batch[k]["revisit"]["wire_bytes"]), + }, + } + ))).run() + if not result["inserted"] and not result["replaced"] or sorted(result.values()) != [0,0,0,0,0,1]: + raise Exception("unexpected result %s updating stats %s" % (result, self._batch[k])) + self._batch = {} + + if not self._stop.is_set(): + self._timer = threading.Timer(0.5, self._update_batch) + self._timer.name = "RethinkCaptures-batch-insert-timer" + self._timer.start() + else: + self.logger.info("finished") def _ensure_db_table(self): dbs = self.r.db_list().run() @@ -137,18 +177,9 @@ class RethinkStatsDb: self.r.table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run() def close(self): - self._executor.shutdown(wait=False) - last_update = 0 - while True: - time.sleep(0.5) - remaining_estimate = self._executor._work_queue.qsize() + self._executor._max_workers/2 - if remaining_estimate < self._executor._max_workers: - break - if time.time() - last_update >= 30: - self.logger.info("waiting for ~%s tasks to finish", remaining_estimate) - last_update = time.time() - self._executor.shutdown(wait=True) - self.logger.info("all tasks finished") + self.logger.info("closing rethinkdb stats table") + self._stop.set() + self._timer.join() def sync(self): pass @@ -164,15 +195,10 @@ class RethinkStatsDb: return bucket0_stats[bucket1] return bucket0_stats - - # >>> r.db("archiveit_brozzler").table("test00").get("foo01").replace(lambda old: r.branch(old.eq(None), {"id":"foo01", "a":{"b":88}}, old.merge({"a":{"b":old["a"]["b"].add(3)}}))).run(conn) - def _tally(self, buckets, size, is_revisit): - try: - threading.current_thread.name = 'RethinkStatsDb-futures-thread(tid={})'.format(warcprox.gettid()) - self.logger.debug("starting task self._tally(%s)", (buckets, size, is_revisit)) + with self._batch_lock: for bucket in buckets: - bucket_stats = self.value(bucket) or _empty_bucket(bucket) + bucket_stats = self._batch.setdefault(bucket, _empty_bucket(bucket)) bucket_stats["total"]["urls"] += 1 bucket_stats["total"]["wire_bytes"] += size @@ -184,15 +210,6 @@ class RethinkStatsDb: bucket_stats["new"]["urls"] += 1 bucket_stats["new"]["wire_bytes"] += size - self.logger.debug("saving %s", bucket_stats) - result = self.r.table(self.table).insert(bucket_stats, conflict="replace").run() - if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: - raise Exception("unexpected result %s saving %s", result, record) - - self.logger.debug("finished task self._tally(%s)", (buckets, size, is_revisit)) - except: - self.logger.error("unexpected problem tallying stats", exc_info=True) - def _extract_stats_info(self, recorded_url, records): buckets = ["__all__"] @@ -208,10 +225,8 @@ class RethinkStatsDb: return buckets, recorded_url.size, is_revisit def tally(self, recorded_url, records): - self._tally(self._extract_stats_info(recorded_url, records)) + self._tally(*self._extract_stats_info(recorded_url, records)) def notify(self, recorded_url, records): - args = self._extract_stats_info(recorded_url, records) - self.logger.debug("submitting task self._tally(%s)", args) - self._executor.submit(self._tally, *args) + self.tally(recorded_url, records)