mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
Insert capture entries in batches every 0.5 seconds, since RethinkDB updates were sometimes falling far behind
This commit is contained in:
parent
afdb6cf557
commit
783e730e52
@ -8,9 +8,10 @@ import base64
|
||||
import surt
|
||||
import os
|
||||
import hashlib
|
||||
import concurrent.futures
|
||||
import threading
|
||||
|
||||
class RethinkCaptures:
|
||||
"""Inserts in batches every 0.5 seconds"""
|
||||
logger = logging.getLogger("warcprox.bigtables.RethinkCaptures")
|
||||
|
||||
def __init__(self, r, table="captures", shards=None, replicas=None, options=warcprox.Options()):
|
||||
@ -21,9 +22,28 @@ class RethinkCaptures:
|
||||
self.options = options
|
||||
self._ensure_db_table()
|
||||
|
||||
# only one worker thread to ensure consistency, see
|
||||
# https://rethinkdb.com/docs/consistency/
|
||||
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
||||
self._stop = threading.Event()
|
||||
self._batch_lock = threading.RLock()
|
||||
with self._batch_lock:
|
||||
self._batch = []
|
||||
self._insert_batch() # starts repeating timer
|
||||
|
||||
def _insert_batch(self):
|
||||
with self._batch_lock:
|
||||
if len(self._batch) > 0:
|
||||
result = self.r.table(self.table).insert(self._batch).run()
|
||||
if result["inserted"] != len(self._batch) or sorted(result.values()) != [0,0,0,0,0,len(self._batch)]:
|
||||
raise Exception("unexpected result %s saving batch of %s entries", result, len(self._batch))
|
||||
self.logger.info("big capture table db saved %s entries", len(self._batch))
|
||||
self.logger.info("saved %s", self._batch)
|
||||
self._batch = []
|
||||
|
||||
if not self._stop.is_set():
|
||||
self._timer = threading.Timer(0.5, self._insert_batch)
|
||||
self._timer.name = "RethinkCaptures-batch-insert-timer"
|
||||
self._timer.start()
|
||||
else:
|
||||
self.logger.info("finished")
|
||||
|
||||
def _ensure_db_table(self):
|
||||
dbs = self.r.db_list().run()
|
||||
@ -94,6 +114,7 @@ class RethinkCaptures:
|
||||
|
||||
def _save_entry(self, entry):
|
||||
try:
|
||||
threading.current_thread.name = 'RethinkCaptures-futures-thread(tid={})'.format(warcprox.gettid())
|
||||
result = self.r.table(self.table).insert(entry).run()
|
||||
if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]:
|
||||
raise Exception("unexpected result %s saving %s", result, entry)
|
||||
@ -103,13 +124,13 @@ class RethinkCaptures:
|
||||
|
||||
def notify(self, recorded_url, records):
    """Queue a capture entry for the next batch insert.

    Builds the entry with ``self._assemble_entry(recorded_url, records)``
    and appends it to ``self._batch`` under ``self._batch_lock``. The
    entry is written to the database by ``_insert_batch``.

    The entry is no longer also submitted to the executor for an
    individual insert: doing both would write every capture twice.
    """
    entry = self._assemble_entry(recorded_url, records)
    with self._batch_lock:
        self._batch.append(entry)
|
||||
|
||||
def close(self):
    """Shut down the executor, then stop the repeating batch-insert timer.

    Blocks until the executor has finished its queued tasks and until the
    timer thread currently referenced by ``self._timer`` has ended.
    """
    # The logged count is an estimate (hence the "~"): the executor's
    # current queue size plus half of its configured worker count.
    self.logger.info("waiting for ~%s tasks to finish",
            self._executor._work_queue.qsize() + (self._executor._max_workers/2))
    # wait=True makes shutdown() return only after pending tasks complete.
    self._executor.shutdown(wait=True)
    self.logger.info("shut down complete")
    self.logger.info("closing rethinkdb captures table")
    # Once _stop is set, _insert_batch does not schedule another timer, so
    # the timer joined below runs one more flush and then ends.
    self._stop.set()
    # NOTE(review): _insert_batch may replace self._timer concurrently if it
    # passed its _stop check just before set(); in that case this joins the
    # older timer and a final flush could still be pending - confirm.
    self._timer.join()
|
||||
|
||||
class RethinkCapturesDedup:
|
||||
logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")
|
||||
|
Loading…
x
Reference in New Issue
Block a user