From 3e1566cd6f650653e74fe0c52d182705c1206bea Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 28 Oct 2015 23:49:44 +0000 Subject: [PATCH] update big captures table asynchronously --- warcprox/bigtable.py | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index ec75360..e020364 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -1,5 +1,3 @@ -# vim:set sw=4 et: - from __future__ import absolute_import import logging @@ -10,6 +8,7 @@ import base64 import surt import os import hashlib +import concurrent.futures class RethinkCaptures: logger = logging.getLogger("warcprox.bigtables.RethinkCaptures") @@ -22,6 +21,10 @@ class RethinkCaptures: self.options = options self._ensure_db_table() + # only one worker thread to ensure consistency, see + # https://rethinkdb.com/docs/consistency/ + self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) + def _ensure_db_table(self): dbs = self.r.db_list().run() if not self.r.dbname in dbs: @@ -49,7 +52,7 @@ class RethinkCaptures: self.logger.debug("returning %s for sha1base32=%s bucket=%s", result, sha1base32, bucket) return result - def notify(self, recorded_url, records): + def _assemble_entry(self, recorded_url, records): if recorded_url.response_recorder: if recorded_url.response_recorder.payload_digest.name == "sha1": sha1base32 = base64.b32encode(recorded_url.response_recorder.payload_digest.digest()).decode("utf-8") @@ -87,10 +90,26 @@ class RethinkCaptures: "length": records[0].length, } - result = self.r.table(self.table).insert(entry).run() - if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]: - raise Exception("unexpected result %s saving %s", result, entry) - self.logger.debug("big capture table db saved %s", entry) + return entry + + def _save_entry(self, entry): + try: + result = self.r.table(self.table).insert(entry).run() + if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]: + raise Exception("unexpected result %s saving %s", result, entry) + self.logger.debug("big capture table db saved %s", entry) + except: + self.logger.error("unexpected problem ", exc_info=True) + + def notify(self, recorded_url, records): + entry = self._assemble_entry(recorded_url, records) + self._executor.submit(self._save_entry, entry) + + def close(self): + self.logger.info("waiting for ~%s tasks to finish", + self._executor._work_queue.qsize() + (self._executor._max_workers/2)) + self._executor.shutdown(wait=True) + self.logger.info("shut down complete") class RethinkCapturesDedup: logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup") @@ -114,4 +133,4 @@ class RethinkCapturesDedup: return None def close(self): - pass + self.captures_db.close()