1
0
mirror of https://github.com/internetarchive/warcprox.git synced 2025-01-18 13:22:09 +01:00

for big captures table, do insert with conflict="replace"

We're doing this because one time this happened:
rethinkdb.errors.ReqlOpIndeterminateError: Cannot perform write: The primary replica isn't connected to a quorum of replicas....
and on the next attempt this happened:
{'errors': 1, 'inserted': 1, 'first_error': 'Duplicate primary key `id`: ....

When we got ReqlOpIndeterminateError, the operation had actually partially
succeeded: one of the records was inserted. After that, the batch insert
failed every time because it kept trying to insert the same entry. With
this change there will be no error from a duplicate key.
This commit is contained in:
Noah Levitt 2016-10-25 16:54:07 -07:00
parent 1671080755
commit 41bd6c72af
2 changed files with 22 additions and 11 deletions

@ -51,7 +51,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.0b2.dev34',
version='2.0b2.dev35',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',

@ -40,7 +40,9 @@ class RethinkCaptures:
"""Inserts in batches every 0.5 seconds"""
logger = logging.getLogger("warcprox.bigtable.RethinkCaptures")
def __init__(self, r, table="captures", shards=None, replicas=None, options=warcprox.Options()):
def __init__(
self, r, table="captures", shards=None, replicas=None,
options=warcprox.Options()):
self.r = r
self.table = table
self.shards = shards or len(r.servers)
@ -62,15 +64,21 @@ class RethinkCaptures:
try:
with self._batch_lock:
if len(self._batch) > 0:
result = self.r.table(self.table).insert(self._batch).run()
if result["inserted"] != len(self._batch) or sorted(
result.values()) != [0,0,0,0,0,len(self._batch)]:
result = self.r.table(self.table).insert(
self._batch, conflict="replace").run()
if result["inserted"] + result["replaced"] != len(self._batch):
raise Exception(
"unexpected result %s saving batch of %s "
"entries", result, len(self._batch))
self.logger.debug(
"saved %s entries to big capture table db",
len(self._batch))
if result["replaced"] > 0:
self.logger.warn(
"inserted %s entries and replaced %s entries "
"in big captures table", result["inserted"],
result["replaced"])
else:
self.logger.debug(
"inserted %s entries to big captures table",
len(self._batch))
self._batch = []
except BaseException as e:
self.logger.error(
@ -82,7 +90,8 @@ class RethinkCaptures:
t = threading.Timer(0.5, self._insert_batch)
t.name = "RethinkCaptures-batch-insert-timer-%s" % datetime.datetime.utcnow().isoformat()
t.start()
# ensure self._timer joinable (already started) whenever close() happens to be called
# ensure self._timer joinable (already started) whenever
# close() happens to be called
self._timer = t
else:
self.logger.info("finished")
@ -101,7 +110,9 @@ class RethinkCaptures:
def find_response_by_digest(self, algo, raw_digest, bucket="__unspecified__"):
if algo != "sha1":
raise Exception("digest type is {} but big capture table is indexed by sha1".format(algo))
raise Exception(
"digest type is %s but big captures table is indexed by "
"sha1" % algo)
sha1base32 = base64.b32encode(raw_digest).decode("utf-8")
results_iter = self.r.table(self.table).get_all([sha1base32, "response", bucket], index="sha1_warc_type").run()
results = list(results_iter)
@ -123,7 +134,7 @@ class RethinkCaptures:
).decode("utf-8")
else:
self.logger.warn(
"digest type is %s but big capture table is indexed "
"digest type is %s but big captures table is indexed "
"by sha1",
recorded_url.response_recorder.payload_digest.name)
else: