From 41bd6c72af1aff22e82348f2e7e10b776bac34b8 Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Tue, 25 Oct 2016 16:54:07 -0700
Subject: [PATCH] for big captures table, do insert with conflict="replace"

We're doing this because one time this happened:

rethinkdb.errors.ReqlOpIndeterminateError: Cannot perform write: The primary replica isn't connected to a quorum of replicas....

and on the next attempt this happened:

{'errors': 1, 'inserted': 1, 'first_error': 'Duplicate primary key `id`: ....

When we got the ReqlOpIndeterminateError, the operation had actually
partially succeeded: one of the records was inserted. After that, every
retry of the batch insert failed because it tried to insert that same
entry again. With conflict="replace" a duplicate key no longer causes an
error; the conflicting record is simply replaced.
---
 setup.py             |  2 +-
 warcprox/bigtable.py | 31 +++++++++++++++++++++----------
 2 files changed, 22 insertions(+), 11 deletions(-)

diff --git a/setup.py b/setup.py
index db9cadc..d54b2cf 100755
--- a/setup.py
+++ b/setup.py
@@ -51,7 +51,7 @@ except:
 
 setuptools.setup(
         name='warcprox',
-        version='2.0b2.dev34',
+        version='2.0b2.dev35',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',
diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py
index a1ac377..0b3a4e9 100644
--- a/warcprox/bigtable.py
+++ b/warcprox/bigtable.py
@@ -40,7 +40,9 @@ class RethinkCaptures:
     """Inserts in batches every 0.5 seconds"""
     logger = logging.getLogger("warcprox.bigtable.RethinkCaptures")
 
-    def __init__(self, r, table="captures", shards=None, replicas=None, options=warcprox.Options()):
+    def __init__(
+            self, r, table="captures", shards=None, replicas=None,
+            options=warcprox.Options()):
         self.r = r
         self.table = table
         self.shards = shards or len(r.servers)
@@ -62,15 +64,21 @@
         try:
             with self._batch_lock:
                 if len(self._batch) > 0:
-                    result = self.r.table(self.table).insert(self._batch).run()
-                    if result["inserted"] != len(self._batch) or sorted(
-                            result.values()) != [0,0,0,0,0,len(self._batch)]:
+                    result = self.r.table(self.table).insert(
+                            self._batch, conflict="replace").run()
+                    if result["inserted"] + result["replaced"] != len(self._batch):
                         raise Exception(
                                 "unexpected result %s saving batch of %s "
                                 "entries", result, len(self._batch))
-                    self.logger.debug(
-                            "saved %s entries to big capture table db",
-                            len(self._batch))
+                    if result["replaced"] > 0:
+                        self.logger.warn(
+                                "inserted %s entries and replaced %s entries "
+                                "in big captures table", result["inserted"],
+                                result["replaced"])
+                    else:
+                        self.logger.debug(
+                                "inserted %s entries to big captures table",
+                                len(self._batch))
                     self._batch = []
         except BaseException as e:
             self.logger.error(
@@ -82,7 +90,8 @@
             t = threading.Timer(0.5, self._insert_batch)
             t.name = "RethinkCaptures-batch-insert-timer-%s" % datetime.datetime.utcnow().isoformat()
             t.start()
-            # ensure self._timer joinable (already started) whenever close() happens to be called
+            # ensure self._timer joinable (already started) whenever
+            # close() happens to be called
             self._timer = t
         else:
             self.logger.info("finished")
@@ -101,7 +110,9 @@
 
     def find_response_by_digest(self, algo, raw_digest, bucket="__unspecified__"):
         if algo != "sha1":
-            raise Exception("digest type is {} but big capture table is indexed by sha1".format(algo))
+            raise Exception(
+                    "digest type is %s but big captures table is indexed by "
+                    "sha1" % algo)
         sha1base32 = base64.b32encode(raw_digest).decode("utf-8")
         results_iter = self.r.table(self.table).get_all([sha1base32, "response", bucket], index="sha1_warc_type").run()
         results = list(results_iter)
@@ -123,7 +134,7 @@
                         ).decode("utf-8")
             else:
                 self.logger.warn(
-                        "digest type is %s but big capture table is indexed "
+                        "digest type is %s but big captures table is indexed "
                         "by sha1",
                         recorded_url.response_recorder.payload_digest.name)
         else:
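
The failure mode described in the commit message can be reproduced with the plain
rethinkdb Python driver. The sketch below is illustrative only and is not part of
the patch: the local connection, the scratch table name "captures_test", and the
sample documents are assumptions, and the calls follow the 2016-era
"import rethinkdb as r" style of the driver. It shows how a retried batch trips
over the duplicate primary key with the default conflict behavior, and how
conflict="replace" lets the retry account for every document without an error.

    import rethinkdb as r  # classic driver API; newer releases use RethinkDB()

    conn = r.connect(host="localhost", port=28015, db="test")
    if "captures_test" not in r.table_list().run(conn):
        r.table_create("captures_test").run(conn)  # primary key defaults to "id"

    batch = [
        {"id": "sha1:AAAA", "url": "http://example.com/1"},
        {"id": "sha1:BBBB", "url": "http://example.com/2"},
    ]

    def reset_partial_state():
        # Simulate the partial success: only the first record lands before
        # the indeterminate error interrupts the original attempt.
        r.table("captures_test").delete().run(conn)
        r.table("captures_test").insert(batch[:1]).run(conn)

    # Retrying the whole batch with the default conflict behavior does not
    # raise, but the result reports the duplicate, much like the one quoted
    # in the commit message: {'errors': 1, 'inserted': 1, 'first_error': ...}
    reset_partial_state()
    result = r.table("captures_test").insert(batch).run(conn)
    print(result["errors"], result.get("first_error"))

    # With conflict="replace" the duplicate key is no longer an error: the
    # conflicting document is counted as "replaced" (or "unchanged" when the
    # retried document is identical, as here) and the new record is inserted.
    reset_partial_state()
    result = r.table("captures_test").insert(batch, conflict="replace").run(conn)
    print(result["inserted"], result["replaced"], result["unchanged"])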