recover properly from exception updating stats in rethinkdb

This commit is contained in:
Noah Levitt 2017-06-12 16:51:45 -07:00
parent 1500341875
commit 808950abb4
2 changed files with 23 additions and 20 deletions

View File

@ -50,7 +50,7 @@ except:
setuptools.setup( setuptools.setup(
name='warcprox', name='warcprox',
version='2.1b1.dev88', version='2.1b1.dev89',
description='WARC writing MITM HTTP/S proxy', description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox', url='https://github.com/internetarchive/warcprox',
author='Noah Levitt', author='Noah Levitt',

View File

@ -232,25 +232,28 @@ class RethinkStatsDb(StatsDb):
}))) })))
def _update_batch(self): def _update_batch(self):
with self._batch_lock: try:
if len(self._batch) > 0: with self._batch_lock:
# XXX can all the buckets be done in one query? if len(self._batch) > 0:
for bucket in self._batch: # XXX can all the buckets be done in one query?
result = self._bucket_batch_update_reql(bucket).run() for bucket in self._batch:
if (not result["inserted"] and not result["replaced"] result = self._bucket_batch_update_reql(bucket).run()
or sorted(result.values()) != [0,0,0,0,0,1]): if (not result["inserted"] and not result["replaced"]
raise Exception( or sorted(result.values()) != [0,0,0,0,0,1]):
"unexpected result %s updating stats %s" % ( raise Exception(
result, self._batch[bucket])) "unexpected result %s updating stats %s" % (
self._batch = {} result, self._batch[bucket]))
self._batch = {}
if not self._stop.is_set(): except Exception as e:
self._timer = threading.Timer(2.0, self._update_batch) self.logger.error("problem updating stats", exc_info=True)
self._timer.name = "RethinkStats-batch-update-timer-%s" % ( finally:
datetime.datetime.utcnow().isoformat()) if not self._stop.is_set():
self._timer.start() self._timer = threading.Timer(2.0, self._update_batch)
else: self._timer.name = "RethinkStats-batch-update-timer-%s" % (
self.logger.info("finished") datetime.datetime.utcnow().isoformat())
self._timer.start()
else:
self.logger.info("finished")
def _ensure_db_table(self): def _ensure_db_table(self):
dbs = self.rr.db_list().run() dbs = self.rr.db_list().run()