mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
Use only one worker thread for asynchronous RethinkDB stats updates, to fix a race condition that caused some numbers to be lost
This commit is contained in:
parent
4930cc2d24
commit
f1362e4da0
@@ -120,7 +120,10 @@ class RethinkStatsDb:
|
|||||||
self.replicas = replicas or min(3, len(r.servers))
|
self.replicas = replicas or min(3, len(r.servers))
|
||||||
self._ensure_db_table()
|
self._ensure_db_table()
|
||||||
self.options = options
|
self.options = options
|
||||||
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
|
|
||||||
|
# only one worker thread to ensure consistency, see
|
||||||
|
# https://rethinkdb.com/docs/consistency/
|
||||||
|
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
||||||
|
|
||||||
def _ensure_db_table(self):
|
def _ensure_db_table(self):
|
||||||
dbs = self.r.db_list().run()
|
dbs = self.r.db_list().run()
|
||||||
@@ -156,7 +159,7 @@ class RethinkStatsDb:
|
|||||||
|
|
||||||
def _tally(self, buckets, size, is_revisit):
|
def _tally(self, buckets, size, is_revisit):
|
||||||
try:
|
try:
|
||||||
self.logger.info("starting task self._tally(%s)", (buckets, size, is_revisit))
|
self.logger.debug("starting task self._tally(%s)", (buckets, size, is_revisit))
|
||||||
for bucket in buckets:
|
for bucket in buckets:
|
||||||
bucket_stats = self.value(bucket) or _empty_bucket(bucket)
|
bucket_stats = self.value(bucket) or _empty_bucket(bucket)
|
||||||
|
|
||||||
@@ -175,7 +178,7 @@ class RethinkStatsDb:
|
|||||||
if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
|
if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
|
||||||
raise Exception("unexpected result %s saving %s", result, record)
|
raise Exception("unexpected result %s saving %s", result, record)
|
||||||
|
|
||||||
self.logger.info("finished task self._tally(%s)", (buckets, size, is_revisit))
|
self.logger.debug("finished task self._tally(%s)", (buckets, size, is_revisit))
|
||||||
except:
|
except:
|
||||||
self.logger.error("unexpected problem tallying stats", exc_info=True)
|
self.logger.error("unexpected problem tallying stats", exc_info=True)
|
||||||
|
|
||||||
@@ -198,6 +201,6 @@ class RethinkStatsDb:
|
|||||||
|
|
||||||
def notify(self, recorded_url, records):
|
def notify(self, recorded_url, records):
|
||||||
args = self._extract_stats_info(recorded_url, records)
|
args = self._extract_stats_info(recorded_url, records)
|
||||||
self.logger.info("submitting task self._tally(%s)", args)
|
self.logger.debug("submitting task self._tally(%s)", args)
|
||||||
self._executor.submit(self._tally, *args)
|
self._executor.submit(self._tally, *args)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user