update stats in batches every 0.5 seconds, since rethinkdb updates were sometimes falling way behind
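
In outline, the change replaces one rethinkdb write per recorded URL with an in-memory batch that a repeating timer flushes every 0.5 seconds. A minimal standalone sketch of the pattern (BatchedStats, tally() and _flush() are illustrative names, not the actual warcprox API):

import threading

class BatchedStats:
    def __init__(self, flush_interval=0.5):
        self.flush_interval = flush_interval
        self._batch_lock = threading.RLock()
        self._batch = {}          # accumulates tallies between flushes
        self._stop = threading.Event()
        self._flush()             # starts the repeating timer

    def tally(self, key, n=1):
        # called from proxy threads; no database round trip here
        with self._batch_lock:
            self._batch[key] = self._batch.get(key, 0) + n

    def _flush(self):
        # swap the batch out under the lock, then write it in one shot
        with self._batch_lock:
            batch, self._batch = self._batch, {}
        if batch:
            pass  # the single batched database update would happen here
        if not self._stop.is_set():
            self._timer = threading.Timer(self.flush_interval, self._flush)
            self._timer.start()

    def close(self):
        self._stop.set()
        self._timer.join()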

Noah Levitt 2015-11-05 02:23:36 +00:00
parent 783e730e52
commit 9af17ba7c3


@@ -14,7 +14,8 @@ import json
 from hanzo import warctools
 import random
 import warcprox
 import concurrent.futures
 import threading
+import rethinkdb as r

 def _empty_bucket(bucket):
     return {
@@ -111,19 +112,58 @@ class StatsDb:
         self.db[b] = json.dumps(bucket_stats, separators=(',',':')).encode("utf-8")

 class RethinkStatsDb:
+    """Updates database in batch every 0.5 seconds"""
     logger = logging.getLogger("warcprox.stats.RethinkStatsDb")

-    def __init__(self, r, table="stats", shards=None, replicas=None, options=warcprox.Options()):
-        self.r = r
+    def __init__(self, rethinker, table="stats", shards=None, replicas=None, options=warcprox.Options()):
+        self.r = rethinker
         self.table = table
-        self.shards = shards or len(r.servers)
-        self.replicas = replicas or min(3, len(r.servers))
+        self.shards = shards or 1 # 1 shard by default because it's probably a small table
+        self.replicas = replicas or min(3, len(self.r.servers))
         self._ensure_db_table()
         self.options = options

-        # only one worker thread to ensure consistency, see
-        # https://rethinkdb.com/docs/consistency/
-        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
+        self._stop = threading.Event()
+        self._batch_lock = threading.RLock()
+        with self._batch_lock:
+            self._batch = {}
+
+        self._update_batch() # starts repeating timer
+    def _update_batch(self):
+        with self._batch_lock:
+            if len(self._batch) > 0:
+                # XXX can this be done in one query?
+                # r.db("archiveit_brozzler").table("test00").get_all(*["foo01","foo"])...
+                # >>> r.db("archiveit_brozzler").table("test00").get("foo01").replace(lambda old: r.branch(old.eq(None), {"id":"foo01", "a":{"b":88}}, old.merge({"a":{"b":old["a"]["b"].add(3)}}))).run(conn)
+                for k in self._batch:
+                    result = self.r.table(self.table).get(k).replace(
+                        lambda old: r.branch(old.eq(None), self._batch[k], old.merge(
+                            {
+                                "total": {
+                                    "urls": old["total"]["urls"].add(self._batch[k]["total"]["urls"]),
+                                    "wire_bytes": old["total"]["wire_bytes"].add(self._batch[k]["total"]["wire_bytes"]),
+                                },
+                                "new": {
+                                    "urls": old["new"]["urls"].add(self._batch[k]["new"]["urls"]),
+                                    "wire_bytes": old["new"]["wire_bytes"].add(self._batch[k]["new"]["wire_bytes"]),
+                                },
+                                "revisit": {
+                                    "urls": old["revisit"]["urls"].add(self._batch[k]["revisit"]["urls"]),
+                                    "wire_bytes": old["revisit"]["wire_bytes"].add(self._batch[k]["revisit"]["wire_bytes"]),
+                                },
+                            }
+                        ))).run()
+                    if not result["inserted"] and not result["replaced"] or sorted(result.values()) != [0,0,0,0,0,1]:
+                        raise Exception("unexpected result %s updating stats %s" % (result, self._batch[k]))
+                self._batch = {}
+
+        if not self._stop.is_set():
+            self._timer = threading.Timer(0.5, self._update_batch)
+            self._timer.name = "RethinkStatsDb-batch-update-timer"
+            self._timer.start()
+        else:
+            self.logger.info("finished")

     def _ensure_db_table(self):
         dbs = self.r.db_list().run()
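
An aside on the "XXX can this be done in one query?" note in _update_batch above: ReQL's for_each() looks like it could fold the per-bucket loop into a single query. This is an untested sketch, not part of the commit; it assumes the batch is first copied to a plain list of bucket documents (each carrying its "bucket" primary key) under self._batch_lock, and that the rethinker wrapper exposes expr() the same way it exposes table():

import rethinkdb as r

def _update_batch_one_query(self, batch_docs):
    # batch_docs: list(self._batch.values()), captured under self._batch_lock
    return self.r.expr(batch_docs).for_each(
        lambda doc: r.table(self.table).get(doc["bucket"]).replace(
            lambda old: r.branch(old.eq(None), doc, old.merge({
                category: {
                    "urls": old[category]["urls"].add(doc[category]["urls"]),
                    "wire_bytes": old[category]["wire_bytes"].add(doc[category]["wire_bytes"]),
                } for category in ("total", "new", "revisit")
            })))).run()

One write summary would come back for the whole batch, so the per-key result check would have to aggregate inserted/replaced counts rather than expect [0,0,0,0,0,1] per query.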
@@ -137,18 +177,9 @@ class RethinkStatsDb:
             self.r.table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run()

     def close(self):
-        self._executor.shutdown(wait=False)
-        last_update = 0
-        while True:
-            time.sleep(0.5)
-            remaining_estimate = self._executor._work_queue.qsize() + self._executor._max_workers/2
-            if remaining_estimate < self._executor._max_workers:
-                break
-            if time.time() - last_update >= 30:
-                self.logger.info("waiting for ~%s tasks to finish", remaining_estimate)
-                last_update = time.time()
-        self._executor.shutdown(wait=True)
-        self.logger.info("all tasks finished")
+        self.logger.info("closing rethinkdb stats table")
+        self._stop.set()
+        self._timer.join()

     def sync(self):
         pass
@@ -164,15 +195,10 @@ class RethinkStatsDb:
                 return bucket0_stats[bucket1]
             return bucket0_stats

-    # >>> r.db("archiveit_brozzler").table("test00").get("foo01").replace(lambda old: r.branch(old.eq(None), {"id":"foo01", "a":{"b":88}}, old.merge({"a":{"b":old["a"]["b"].add(3)}}))).run(conn)
     def _tally(self, buckets, size, is_revisit):
-        try:
-            threading.current_thread.name = 'RethinkStatsDb-futures-thread(tid={})'.format(warcprox.gettid())
-            self.logger.debug("starting task self._tally(%s)", (buckets, size, is_revisit))
+        with self._batch_lock:
             for bucket in buckets:
-                bucket_stats = self.value(bucket) or _empty_bucket(bucket)
+                bucket_stats = self._batch.setdefault(bucket, _empty_bucket(bucket))

                 bucket_stats["total"]["urls"] += 1
                 bucket_stats["total"]["wire_bytes"] += size
@@ -184,15 +210,6 @@ class RethinkStatsDb:
                 bucket_stats["new"]["urls"] += 1
                 bucket_stats["new"]["wire_bytes"] += size

-            self.logger.debug("saving %s", bucket_stats)
-            result = self.r.table(self.table).insert(bucket_stats, conflict="replace").run()
-            if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
-                raise Exception("unexpected result %s saving %s", result, record)
-            self.logger.debug("finished task self._tally(%s)", (buckets, size, is_revisit))
-        except:
-            self.logger.error("unexpected problem tallying stats", exc_info=True)

     def _extract_stats_info(self, recorded_url, records):
         buckets = ["__all__"]
@@ -208,10 +225,8 @@ class RethinkStatsDb:
         return buckets, recorded_url.size, is_revisit

     def tally(self, recorded_url, records):
-        self._tally(self._extract_stats_info(recorded_url, records))
+        self._tally(*self._extract_stats_info(recorded_url, records))

     def notify(self, recorded_url, records):
-        args = self._extract_stats_info(recorded_url, records)
-        self.logger.debug("submitting task self._tally(%s)", args)
-        self._executor.submit(self._tally, *args)
+        self.tally(recorded_url, records)
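
Worth noting: besides the batching, this fixes a latent bug in tally(). _extract_stats_info() returns a (buckets, size, is_revisit) tuple, and the old code passed that tuple to _tally() as a single positional argument; the new code unpacks it with *. A tiny illustration (hypothetical f(), not warcprox code):

def f(buckets, size, is_revisit):
    return len(buckets), size, is_revisit

info = (["__all__"], 1234, False)
f(*info)  # unpacks into three arguments -> (1, 1234, False)
f(info)   # TypeError: f() missing 2 required positional arguments

With the executor gone, notify() now tallies synchronously, which is cheap because it only touches the in-memory batch under the lock; the actual database write happens on the timer thread.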