make sure batch insert timer thread survives rethinkdb outages

This commit is contained in:
Noah Levitt 2016-03-18 02:06:07 +00:00
parent 42a81d8f8f
commit 6b6c0b3bac

View File

@ -31,22 +31,28 @@ class RethinkCaptures:
self._insert_batch() # starts repeating timer self._insert_batch() # starts repeating timer
def _insert_batch(self): def _insert_batch(self):
with self._batch_lock: try:
if len(self._batch) > 0: with self._batch_lock:
result = self.r.table(self.table).insert(self._batch).run() if len(self._batch) > 0:
if result["inserted"] != len(self._batch) or sorted(result.values()) != [0,0,0,0,0,len(self._batch)]: result = self.r.table(self.table).insert(self._batch).run()
raise Exception("unexpected result %s saving batch of %s entries", result, len(self._batch)) if result["inserted"] != len(self._batch) or sorted(result.values()) != [0,0,0,0,0,len(self._batch)]:
self.logger.info("saved %s entries to big capture table db", len(self._batch)) raise Exception("unexpected result %s saving batch of %s entries", result, len(self._batch))
self._batch = [] self.logger.info("saved %s entries to big capture table db", len(self._batch))
self._batch = []
if not self._stop.is_set(): except BaseException as e:
t = threading.Timer(0.5, self._insert_batch) self.logger.error(
t.name = "RethinkCaptures-batch-insert-timer-%s" % datetime.datetime.utcnow().isoformat() "caught exception trying to save %s entries, they will "
t.start() "be included in the next batch", len(self._batch),
# ensure self._timer joinable (already started) whenever close() happens to be called exc_info=True)
self._timer = t finally:
else: if not self._stop.is_set():
self.logger.info("finished") t = threading.Timer(0.5, self._insert_batch)
t.name = "RethinkCaptures-batch-insert-timer-%s" % datetime.datetime.utcnow().isoformat()
t.start()
# ensure self._timer joinable (already started) whenever close() happens to be called
self._timer = t
else:
self.logger.info("finished")
def _ensure_db_table(self): def _ensure_db_table(self):
dbs = self.r.db_list().run() dbs = self.r.db_list().run()