From 6b6c0b3bac56bf834981127b88c3cab245a88040 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 18 Mar 2016 02:06:07 +0000 Subject: [PATCH] make sure batch insert timer thread survives rethinkdb outages --- warcprox/bigtable.py | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 9bb3d6d..8aea52c 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -31,22 +31,28 @@ class RethinkCaptures: self._insert_batch() # starts repeating timer def _insert_batch(self): - with self._batch_lock: - if len(self._batch) > 0: - result = self.r.table(self.table).insert(self._batch).run() - if result["inserted"] != len(self._batch) or sorted(result.values()) != [0,0,0,0,0,len(self._batch)]: - raise Exception("unexpected result %s saving batch of %s entries", result, len(self._batch)) - self.logger.info("saved %s entries to big capture table db", len(self._batch)) - self._batch = [] - - if not self._stop.is_set(): - t = threading.Timer(0.5, self._insert_batch) - t.name = "RethinkCaptures-batch-insert-timer-%s" % datetime.datetime.utcnow().isoformat() - t.start() - # ensure self._timer joinable (already started) whenever close() happens to be called - self._timer = t - else: - self.logger.info("finished") + try: + with self._batch_lock: + if len(self._batch) > 0: + result = self.r.table(self.table).insert(self._batch).run() + if result["inserted"] != len(self._batch) or sorted(result.values()) != [0,0,0,0,0,len(self._batch)]: + raise Exception("unexpected result %s saving batch of %s entries", result, len(self._batch)) + self.logger.info("saved %s entries to big capture table db", len(self._batch)) + self._batch = [] + except BaseException as e: + self.logger.error( + "caught exception trying to save %s entries, they will " + "be included in the next batch", len(self._batch), + exc_info=True) + finally: + if not self._stop.is_set(): + t = threading.Timer(0.5, self._insert_batch) + t.name = "RethinkCaptures-batch-insert-timer-%s" % datetime.datetime.utcnow().isoformat() + t.start() + # ensure self._timer joinable (already started) whenever close() happens to be called + self._timer = t + else: + self.logger.info("finished") def _ensure_db_table(self): dbs = self.r.db_list().run()