From f1362e4da037f83006998efaafff13549ddde268 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 28 Oct 2015 23:38:15 +0000 Subject: [PATCH] use only one worker thread for asynchronous rethinkdb stats updates, to fix race condition causing some numbers to be lost --- warcprox/stats.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/warcprox/stats.py b/warcprox/stats.py index 9be1b54..660d4c7 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -120,7 +120,10 @@ class RethinkStatsDb: self.replicas = replicas or min(3, len(r.servers)) self._ensure_db_table() self.options = options - self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=10) + + # only one worker thread to ensure consistency, see + # https://rethinkdb.com/docs/consistency/ + self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) def _ensure_db_table(self): dbs = self.r.db_list().run() @@ -156,7 +159,7 @@ class RethinkStatsDb: def _tally(self, buckets, size, is_revisit): try: - self.logger.info("starting task self._tally(%s)", (buckets, size, is_revisit)) + self.logger.debug("starting task self._tally(%s)", (buckets, size, is_revisit)) for bucket in buckets: bucket_stats = self.value(bucket) or _empty_bucket(bucket) @@ -175,7 +178,7 @@ class RethinkStatsDb: if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: raise Exception("unexpected result %s saving %s", result, record) - self.logger.info("finished task self._tally(%s)", (buckets, size, is_revisit)) + self.logger.debug("finished task self._tally(%s)", (buckets, size, is_revisit)) except: self.logger.error("unexpected problem tallying stats", exc_info=True) @@ -198,6 +201,6 @@ class RethinkStatsDb: def notify(self, recorded_url, records): args = self._extract_stats_info(recorded_url, records) - self.logger.info("submitting task self._tally(%s)", args) + self.logger.debug("submitting task self._tally(%s)", args) self._executor.submit(self._tally, *args)