diff --git a/setup.py b/setup.py index db9cadc..d54b2cf 100755 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.0b2.dev34', + version='2.0b2.dev35', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index a1ac377..0b3a4e9 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -40,7 +40,9 @@ class RethinkCaptures: """Inserts in batches every 0.5 seconds""" logger = logging.getLogger("warcprox.bigtable.RethinkCaptures") - def __init__(self, r, table="captures", shards=None, replicas=None, options=warcprox.Options()): + def __init__( + self, r, table="captures", shards=None, replicas=None, + options=warcprox.Options()): self.r = r self.table = table self.shards = shards or len(r.servers) @@ -62,15 +64,21 @@ class RethinkCaptures: try: with self._batch_lock: if len(self._batch) > 0: - result = self.r.table(self.table).insert(self._batch).run() - if result["inserted"] != len(self._batch) or sorted( - result.values()) != [0,0,0,0,0,len(self._batch)]: + result = self.r.table(self.table).insert( + self._batch, conflict="replace").run() + if result["inserted"] + result["replaced"] != len(self._batch): raise Exception( "unexpected result %s saving batch of %s " "entries", result, len(self._batch)) - self.logger.debug( - "saved %s entries to big capture table db", - len(self._batch)) + if result["replaced"] > 0: + self.logger.warn( + "inserted %s entries and replaced %s entries " + "in big captures table", result["inserted"], + result["replaced"]) + else: + self.logger.debug( + "inserted %s entries to big captures table", + len(self._batch)) self._batch = [] except BaseException as e: self.logger.error( @@ -82,7 +90,8 @@ class RethinkCaptures: t = threading.Timer(0.5, self._insert_batch) t.name = "RethinkCaptures-batch-insert-timer-%s" % datetime.datetime.utcnow().isoformat() t.start() - # ensure self._timer joinable (already started) whenever close() happens to be called + # ensure self._timer joinable (already started) whenever + # close() happens to be called self._timer = t else: self.logger.info("finished") @@ -101,7 +110,9 @@ class RethinkCaptures: def find_response_by_digest(self, algo, raw_digest, bucket="__unspecified__"): if algo != "sha1": - raise Exception("digest type is {} but big capture table is indexed by sha1".format(algo)) + raise Exception( + "digest type is %s but big captures table is indexed by " + "sha1" % algo) sha1base32 = base64.b32encode(raw_digest).decode("utf-8") results_iter = self.r.table(self.table).get_all([sha1base32, "response", bucket], index="sha1_warc_type").run() results = list(results_iter) @@ -123,7 +134,7 @@ class RethinkCaptures: ).decode("utf-8") else: self.logger.warn( - "digest type is %s but big capture table is indexed " + "digest type is %s but big captures table is indexed " "by sha1", recorded_url.response_recorder.payload_digest.name) else: