mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
simplify recovery of the stats batch in case of an exception while saving it (not sure what was wrong with summy_merge, but this is simpler)
This commit is contained in:
parent
c0ee9c6093
commit
b23e485898
2
setup.py
2
setup.py
@ -50,7 +50,7 @@ except:
|
||||
|
||||
setuptools.setup(
|
||||
name='warcprox',
|
||||
version='2.1b1.dev91',
|
||||
version='2.1b1.dev92',
|
||||
description='WARC writing MITM HTTP/S proxy',
|
||||
url='https://github.com/internetarchive/warcprox',
|
||||
author='Noah Levitt',
|
||||
|
@ -1429,45 +1429,6 @@ def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback
|
||||
elif record.rec_type == 'request':
|
||||
assert record.http_headers.get_header('via') == '1.1 warcprox'
|
||||
|
||||
def test_summy_merge():
    """summy_merge sums numeric leaves and unions keys across nested dicts.

    Non-numeric leaves (like the 'metadata' string) are carried through
    unchanged, and keys present on only one side appear as-is in the result.
    """
    left = {
        'a': {'metadata': 'some value', 'a1': 5, 'a2': 6},
        'b': {'b1': 9},
    }
    right = {
        'a': {'a1': 7, 'a3': 8},
        'c': {'c1': 10},
    }
    expected = {
        'a': {'metadata': 'some value', 'a1': 12, 'a2': 6, 'a3': 8},
        'b': {'b1': 9},
        'c': {'c1': 10},
    }
    assert warcprox.stats.summy_merge(left, right) == expected
|
||||
|
||||
# Allow running this test module directly (python test_file.py) instead of
# invoking it through the pytest command-line runner.
if __name__ == '__main__':
    pytest.main()
|
||||
|
||||
|
@ -51,28 +51,6 @@ def _empty_bucket(bucket):
|
||||
},
|
||||
}
|
||||
|
||||
def summy_merge(e, f):
    """Recursively merge two nested stats structures, summing numeric leaves.

    Dicts are merged key-by-key (union of keys); numeric leaves are added;
    for any other pair of leaves (e.g. a metadata string vs None or vs
    another string) the first truthy value wins, matching the previous
    behavior. ``None`` is treated as "missing", so merging with ``None``
    returns the other side unchanged.

    Fixes two defects in the earlier version:
    - falsy numeric leaves were clobbered: ``m = m or {}`` turned a pair of
      zeros into ``{}`` instead of ``0``, corrupting counters later used
      with ``+=``
    - a top-level call with ``None`` on either side crashed on
      ``None.keys()``
    """
    # treat None as absent: the other side passes through untouched
    if e is None:
        return f
    if f is None:
        return e
    if isinstance(e, (int, float)) and isinstance(f, (int, float)):
        return e + f
    if not hasattr(e, 'keys') or not hasattr(f, 'keys'):
        # mismatched or non-mergeable leaves (e.g. a metadata string):
        # keep the first truthy value, as before
        return e or f
    result = {}
    for k in set(e.keys()) | set(f.keys()):
        result[k] = summy_merge(e.get(k), f.get(k))
    return result
|
||||
|
||||
class StatsDb:
|
||||
logger = logging.getLogger("warcprox.stats.StatsDb")
|
||||
|
||||
@ -274,7 +252,7 @@ class RethinkStatsDb(StatsDb):
|
||||
# now we need to restore the stats that didn't get saved to the
|
||||
# batch so that they are saved in the next call to _update_batch()
|
||||
with self._batch_lock:
|
||||
self._batch = summy_merge(self._batch, batch_copy)
|
||||
self._add_to_batch(batch_copy)
|
||||
finally:
|
||||
if not self._stop.is_set():
|
||||
self._timer = threading.Timer(2.0, self._update_batch)
|
||||
@ -344,6 +322,18 @@ class RethinkStatsDb(StatsDb):
|
||||
bucket_stats["new"]["urls"] += 1
|
||||
bucket_stats["new"]["wire_bytes"] += recorded_url.size
|
||||
|
||||
def _add_to_batch(self, add_me):
|
||||
with self._batch_lock:
|
||||
for bucket in add_me:
|
||||
bucket_stats = self._batch.setdefault(
|
||||
bucket, _empty_bucket(bucket))
|
||||
bucket_stats["total"]["urls"] += add_me[bucket]["total"]["urls"]
|
||||
bucket_stats["total"]["wire_bytes"] += add_me[bucket]["total"]["wire_bytes"]
|
||||
bucket_stats["revisit"]["urls"] += add_me[bucket]["revisit"]["urls"]
|
||||
bucket_stats["revisit"]["wire_bytes"] += add_me[bucket]["revisit"]["wire_bytes"]
|
||||
bucket_stats["new"]["urls"] += add_me[bucket]["new"]["urls"]
|
||||
bucket_stats["new"]["wire_bytes"] += add_me[bucket]["new"]["wire_bytes"]
|
||||
|
||||
def notify(self, recorded_url, records):
|
||||
self.tally(recorded_url, records)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user