diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 07e1923..9fe0e6e 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -8,9 +8,10 @@ import base64 import surt import os import hashlib -import concurrent.futures +import threading class RethinkCaptures: + """Inserts in batches every 0.5 seconds""" logger = logging.getLogger("warcprox.bigtables.RethinkCaptures") def __init__(self, r, table="captures", shards=None, replicas=None, options=warcprox.Options()): @@ -21,9 +22,28 @@ class RethinkCaptures: self.options = options self._ensure_db_table() - # only one worker thread to ensure consistency, see - # https://rethinkdb.com/docs/consistency/ - self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) + self._stop = threading.Event() + self._batch_lock = threading.RLock() + with self._batch_lock: + self._batch = [] + self._insert_batch() # starts repeating timer + + def _insert_batch(self): + with self._batch_lock: + if len(self._batch) > 0: + result = self.r.table(self.table).insert(self._batch).run() + if result["inserted"] != len(self._batch) or sorted(result.values()) != [0,0,0,0,0,len(self._batch)]: + raise Exception("unexpected result %s saving batch of %s entries", result, len(self._batch)) + self.logger.info("big capture table db saved %s entries", len(self._batch)) + self.logger.info("saved %s", self._batch) + self._batch = [] + + if not self._stop.is_set(): + self._timer = threading.Timer(0.5, self._insert_batch) + self._timer.name = "RethinkCaptures-batch-insert-timer" + self._timer.start() + else: + self.logger.info("finished") def _ensure_db_table(self): dbs = self.r.db_list().run() @@ -94,6 +114,7 @@ class RethinkCaptures: def _save_entry(self, entry): try: + threading.current_thread.name = 'RethinkCaptures-futures-thread(tid={})'.format(warcprox.gettid()) result = self.r.table(self.table).insert(entry).run() if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]: raise Exception("unexpected result %s saving %s", result, entry) @@ -103,13 +124,13 @@ class RethinkCaptures: def notify(self, recorded_url, records): entry = self._assemble_entry(recorded_url, records) - self._executor.submit(self._save_entry, entry) + with self._batch_lock: + self._batch.append(entry) def close(self): - self.logger.info("waiting for ~%s tasks to finish", - self._executor._work_queue.qsize() + (self._executor._max_workers/2)) - self._executor.shutdown(wait=True) - self.logger.info("shut down complete") + self.logger.info("closing rethinkdb captures table") + self._stop.set() + self._timer.join() class RethinkCapturesDedup: logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")