diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 9fe0e6e..4feeb7d 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -9,6 +9,7 @@ import surt import os import hashlib import threading +import datetime class RethinkCaptures: """Inserts in batches every 0.5 seconds""" @@ -34,14 +35,15 @@ class RethinkCaptures: result = self.r.table(self.table).insert(self._batch).run() if result["inserted"] != len(self._batch) or sorted(result.values()) != [0,0,0,0,0,len(self._batch)]: raise Exception("unexpected result %s saving batch of %s entries", result, len(self._batch)) - self.logger.info("big capture table db saved %s entries", len(self._batch)) - self.logger.info("saved %s", self._batch) + self.logger.info("saved %s entries to big capture table db", len(self._batch)) self._batch = [] if not self._stop.is_set(): - self._timer = threading.Timer(0.5, self._insert_batch) - self._timer.name = "RethinkCaptures-batch-insert-timer" - self._timer.start() + t = threading.Timer(0.5, self._insert_batch) + t.name = "RethinkCaptures-batch-insert-timer-%s" % datetime.datetime.utcnow().isoformat() + t.start() + # ensure self._timer joinable (already started) whenever close() happens to be called + self._timer = t else: self.logger.info("finished") @@ -65,7 +67,7 @@ class RethinkCaptures: results = list(results_iter) if len(results) > 0: if len(results) > 1: - self.logger.error("expected 0 or 1 but found %s results for sha1base32=%s bucket=%s (will use first result)", len(results), sha1base32, bucket) + self.logger.debug("expected 0 or 1 but found %s results for sha1base32=%s bucket=%s (will use first result)", len(results), sha1base32, bucket) result = results[0] else: result = None @@ -112,16 +114,6 @@ class RethinkCaptures: return entry - def _save_entry(self, entry): - try: - threading.current_thread.name = 'RethinkCaptures-futures-thread(tid={})'.format(warcprox.gettid()) - result = self.r.table(self.table).insert(entry).run() - if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]: - raise Exception("unexpected result %s saving %s", result, entry) - self.logger.debug("big capture table db saved %s", entry) - except: - self.logger.error("unexpected problem ", exc_info=True) - def notify(self, recorded_url, records): entry = self._assemble_entry(recorded_url, records) with self._batch_lock: