diff --git a/setup.py b/setup.py
index 40a5d9d..9ad4da4 100755
--- a/setup.py
+++ b/setup.py
@@ -50,7 +50,7 @@ except:
 
 setuptools.setup(
         name='warcprox',
-        version='2.1b1.dev90',
+        version='2.1b1.dev91',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',
diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index dd80a86..55e655f 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -1429,6 +1429,45 @@ def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback
         elif record.rec_type == 'request':
             assert record.http_headers.get_header('via') == '1.1 warcprox'
 
+def test_summy_merge():
+    d1 = {
+        'a': {
+            'metadata': 'some value',
+            'a1': 5,
+            'a2': 6,
+        },
+        'b': {
+            'b1': 9,
+        }
+    }
+
+    d2 = {
+        'a': {
+            'a1': 7,
+            'a3': 8,
+        },
+        'c': {
+            'c1': 10,
+        }
+    }
+
+    merged = {
+        'a': {
+            'metadata': 'some value',
+            'a1': 12,
+            'a2': 6,
+            'a3': 8,
+        },
+        'b': {
+            'b1': 9,
+        },
+        'c': {
+            'c1': 10,
+        },
+    }
+
+    assert warcprox.stats.summy_merge(d1, d2) == merged
+
 
 if __name__ == '__main__':
     pytest.main()
diff --git a/warcprox/stats.py b/warcprox/stats.py
index 6e0afdd..682d197 100644
--- a/warcprox/stats.py
+++ b/warcprox/stats.py
@@ -32,6 +32,7 @@ import rethinkdb as r
 import datetime
 import urlcanon
 import sqlite3
+import copy
 
 def _empty_bucket(bucket):
     return {
@@ -50,6 +51,28 @@ def _empty_bucket(bucket):
         },
     }
 
+def summy_merge(e, f):
+    if isinstance(e, (int, float)) and isinstance(f, (int, float)):
+        return e + f
+    elif (e is not None and not hasattr(e, 'keys')) or (
+            f is not None and not hasattr(f, 'keys')):
+        return e or f
+    else:
+        result = {}
+        all_keys = set(e.keys()).union(f.keys())
+        for k in all_keys:
+            m = e.get(k)
+            n = f.get(k)
+            if m is None and isinstance(n, (int, float)):
+                m = 0
+            elif n is None and isinstance(m, (int, float)):
+                n = 0
+            else:
+                m = m or {}
+                n = n or {}
+            result[k] = summy_merge(m, n)
+        return result
+
 class StatsDb:
     logger = logging.getLogger("warcprox.stats.StatsDb")
 
@@ -207,45 +230,51 @@ class RethinkStatsDb(StatsDb):
         """Starts batch update repeating timer."""
         self._update_batch() # starts repeating timer
 
-    def _bucket_batch_update_reql(self, bucket):
+    def _bucket_batch_update_reql(self, bucket, batch):
         return self.rr.table(self.table).get(bucket).replace(
             lambda old: r.branch(
-                old.eq(None), self._batch[bucket], old.merge({
+                old.eq(None), batch[bucket], old.merge({
                     "total": {
                         "urls": old["total"]["urls"].add(
-                            self._batch[bucket]["total"]["urls"]),
+                            batch[bucket]["total"]["urls"]),
                         "wire_bytes": old["total"]["wire_bytes"].add(
-                            self._batch[bucket]["total"]["wire_bytes"]),
+                            batch[bucket]["total"]["wire_bytes"]),
                         },
                     "new": {
                         "urls": old["new"]["urls"].add(
-                            self._batch[bucket]["new"]["urls"]),
+                            batch[bucket]["new"]["urls"]),
                         "wire_bytes": old["new"]["wire_bytes"].add(
-                            self._batch[bucket]["new"]["wire_bytes"]),
+                            batch[bucket]["new"]["wire_bytes"]),
                         },
                     "revisit": {
                         "urls": old["revisit"]["urls"].add(
-                            self._batch[bucket]["revisit"]["urls"]),
+                            batch[bucket]["revisit"]["urls"]),
                         "wire_bytes": old["revisit"]["wire_bytes"].add(
-                            self._batch[bucket]["revisit"]["wire_bytes"]),
+                            batch[bucket]["revisit"]["wire_bytes"]),
                         },
                     })))
 
     def _update_batch(self):
+        with self._batch_lock:
+            batch_copy = copy.deepcopy(self._batch)
+            self._batch = {}
         try:
-            with self._batch_lock:
-                if len(self._batch) > 0:
-                    # XXX can all the buckets be done in one query?
-                    for bucket in self._batch:
-                        result = self._bucket_batch_update_reql(bucket).run()
-                        if (not result["inserted"] and not result["replaced"]
-                                or sorted(result.values()) != [0,0,0,0,0,1]):
-                            raise Exception(
-                                    "unexpected result %s updating stats %s" % (
-                                        result, self._batch[bucket]))
-                    self._batch = {}
+            if len(batch_copy) > 0:
+                # XXX can all the buckets be done in one query?
+                for bucket in batch_copy:
+                    result = self._bucket_batch_update_reql(
+                            bucket, batch_copy).run()
+                    if (not result["inserted"] and not result["replaced"]
+                            or sorted(result.values()) != [0,0,0,0,0,1]):
+                        raise Exception(
+                                "unexpected result %s updating stats %s" % (
+                                    result, batch_copy[bucket]))
         except Exception as e:
             self.logger.error("problem updating stats", exc_info=True)
+            # now we need to restore the stats that didn't get saved to the
+            # batch so that they are saved in the next call to _update_batch()
+            with self._batch_lock:
+                self._batch = summy_merge(self._batch, batch_copy)
         finally:
             if not self._stop.is_set():
                 self._timer = threading.Timer(2.0, self._update_batch)
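
Note on the semantics the new test pins down: summy_merge() recursively sums
numeric leaves of two nested dicts, unions keys at every level, and passes
non-numeric leaves (such as the 'metadata' string) through from whichever side
has one. That is what lets the except branch fold a failed batch_copy back
into self._batch without clobbering stats that accumulated under _batch_lock
while the RethinkDB write was in flight. A minimal sketch of that recovery
path, assuming the patched warcprox.stats module is importable (the dict
shapes are illustrative, mirroring _empty_bucket(); '__all__' is one of
warcprox's stats buckets):

    import warcprox.stats

    # stats that failed to flush to RethinkDB
    batch_copy = {'__all__': {'total': {'urls': 3, 'wire_bytes': 1400}}}
    # stats that accumulated in self._batch while the write was in flight
    new_batch = {'__all__': {'total': {'urls': 1, 'wire_bytes': 200},
                             'new': {'urls': 1, 'wire_bytes': 200}}}

    # folding the failed batch back in loses nothing from either side
    restored = warcprox.stats.summy_merge(new_batch, batch_copy)
    assert restored == {'__all__': {'total': {'urls': 4, 'wire_bytes': 1600},
                                    'new': {'urls': 1, 'wire_bytes': 200}}}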