Mirror of https://github.com/internetarchive/warcprox.git (synced 2025-01-18 13:22:09 +01:00)
In RethinkStatsDb, avoid holding the lock (which makes all warc writer threads block) while doing rethinkdb operations
parent 24082c2e8c
commit c0ee9c6093
setup.py
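The substantive change is in RethinkStatsDb._update_batch(): the batch is snapshotted and reset while holding the lock, the slow rethinkdb queries then run without it, and anything that failed to flush is merged back into the live batch afterwards. As a rough illustration of that pattern, here is a minimal sketch in plain Python; it is not warcprox's actual code, and the class and method names (BatchFlusher, record, flush, _flush_to_db) are made up for illustration:

    import copy
    import threading

    class BatchFlusher:
        def __init__(self):
            self._batch = {}
            self._batch_lock = threading.Lock()

        def record(self, bucket, n=1):
            # fast path used by many threads; only touches the in-memory dict
            with self._batch_lock:
                self._batch[bucket] = self._batch.get(bucket, 0) + n

        def flush(self):
            # snapshot and reset the batch while holding the lock, so other
            # threads block only for the time it takes to copy a dict
            with self._batch_lock:
                batch_copy = copy.deepcopy(self._batch)
                self._batch = {}
            try:
                # slow I/O happens without the lock held
                self._flush_to_db(batch_copy)
            except Exception:
                # fold the unsaved counts back into the live batch so the next
                # flush() retries them (warcprox does this with summy_merge,
                # since its batch values are nested dicts rather than ints)
                with self._batch_lock:
                    for bucket, count in batch_copy.items():
                        self._batch[bucket] = self._batch.get(bucket, 0) + count

        def _flush_to_db(self, batch):
            pass  # stand-in for the rethinkdb update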
@@ -50,7 +50,7 @@ except:
 setuptools.setup(
         name='warcprox',
-        version='2.1b1.dev90',
+        version='2.1b1.dev91',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',
@@ -1429,6 +1429,45 @@ def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback
         elif record.rec_type == 'request':
             assert record.http_headers.get_header('via') == '1.1 warcprox'
 
+def test_summy_merge():
+    d1 = {
+        'a': {
+            'metadata': 'some value',
+            'a1': 5,
+            'a2': 6,
+        },
+        'b': {
+            'b1': 9,
+        }
+    }
+    d2 = {
+        'a': {
+            'a1': 7,
+            'a3': 8,
+        },
+        'c': {
+            'c1': 10,
+        }
+    }
+    merged = {
+        'a': {
+            'metadata': 'some value',
+            'a1': 12,
+            'a2': 6,
+            'a3': 8,
+        },
+        'b': {
+            'b1': 9,
+        },
+        'c': {
+            'c1': 10,
+        },
+    }
+    assert warcprox.stats.summy_merge(d1, d2) == merged
+
 if __name__ == '__main__':
     pytest.main()
@@ -32,6 +32,7 @@ import rethinkdb as r
 import datetime
 import urlcanon
 import sqlite3
+import copy
 
 def _empty_bucket(bucket):
     return {
@@ -50,6 +51,28 @@ def _empty_bucket(bucket):
         },
     }
 
+def summy_merge(e, f):
+    if isinstance(e, (int, float)) and isinstance(f, (int, float)):
+        return e + f
+    elif (e is not None and not hasattr(e, 'keys')) or (
+            f is not None and not hasattr(f, 'keys')):
+        return e or f
+    else:
+        result = {}
+        all_keys = set(e.keys()).union(f.keys())
+        for k in all_keys:
+            m = e.get(k)
+            n = f.get(k)
+            if m is None and isinstance(n, (int, float)):
+                m = 0
+            elif n is None and isinstance(m, (int, float)):
+                n = 0
+            else:
+                m = m or {}
+                n = n or {}
+            result[k] = summy_merge(m, n)
+        return result
+
 class StatsDb:
     logger = logging.getLogger("warcprox.stats.StatsDb")
@@ -207,45 +230,51 @@ class RethinkStatsDb(StatsDb):
         """Starts batch update repeating timer."""
         self._update_batch() # starts repeating timer
 
-    def _bucket_batch_update_reql(self, bucket):
+    def _bucket_batch_update_reql(self, bucket, batch):
         return self.rr.table(self.table).get(bucket).replace(
                 lambda old: r.branch(
-                    old.eq(None), self._batch[bucket], old.merge({
+                    old.eq(None), batch[bucket], old.merge({
                         "total": {
                             "urls": old["total"]["urls"].add(
-                                self._batch[bucket]["total"]["urls"]),
+                                batch[bucket]["total"]["urls"]),
                             "wire_bytes": old["total"]["wire_bytes"].add(
-                                self._batch[bucket]["total"]["wire_bytes"]),
+                                batch[bucket]["total"]["wire_bytes"]),
                         },
                         "new": {
                             "urls": old["new"]["urls"].add(
-                                self._batch[bucket]["new"]["urls"]),
+                                batch[bucket]["new"]["urls"]),
                             "wire_bytes": old["new"]["wire_bytes"].add(
-                                self._batch[bucket]["new"]["wire_bytes"]),
+                                batch[bucket]["new"]["wire_bytes"]),
                         },
                         "revisit": {
                             "urls": old["revisit"]["urls"].add(
-                                self._batch[bucket]["revisit"]["urls"]),
+                                batch[bucket]["revisit"]["urls"]),
                             "wire_bytes": old["revisit"]["wire_bytes"].add(
-                                self._batch[bucket]["revisit"]["wire_bytes"]),
+                                batch[bucket]["revisit"]["wire_bytes"]),
                         },
                     })))
 
     def _update_batch(self):
+        with self._batch_lock:
+            batch_copy = copy.deepcopy(self._batch)
+            self._batch = {}
         try:
-            with self._batch_lock:
-                if len(self._batch) > 0:
-                    # XXX can all the buckets be done in one query?
-                    for bucket in self._batch:
-                        result = self._bucket_batch_update_reql(bucket).run()
-                        if (not result["inserted"] and not result["replaced"]
-                                or sorted(result.values()) != [0,0,0,0,0,1]):
-                            raise Exception(
-                                    "unexpected result %s updating stats %s" % (
-                                        result, self._batch[bucket]))
-                    self._batch = {}
+            if len(batch_copy) > 0:
+                # XXX can all the buckets be done in one query?
+                for bucket in batch_copy:
+                    result = self._bucket_batch_update_reql(
+                            bucket, batch_copy).run()
+                    if (not result["inserted"] and not result["replaced"]
+                            or sorted(result.values()) != [0,0,0,0,0,1]):
+                        raise Exception(
+                                "unexpected result %s updating stats %s" % (
+                                    result, batch_copy[bucket]))
         except Exception as e:
             self.logger.error("problem updating stats", exc_info=True)
+            # now we need to restore the stats that didn't get saved to the
+            # batch so that they are saved in the next call to _update_batch()
+            with self._batch_lock:
+                self._batch = summy_merge(self._batch, batch_copy)
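The merge-back in the except branch means counts recorded while the lock was released are not lost: whatever accumulated in self._batch during the failed flush is combined with the snapshot via summy_merge and retried on the next timer tick. A small worked example, assuming summy_merge as defined in the hunk above ('bucket1' is a hypothetical bucket name):

    from warcprox.stats import summy_merge

    # snapshot that failed to reach rethinkdb
    batch_copy = {'bucket1': {'total': {'urls': 3, 'wire_bytes': 1200}}}
    # stats recorded by other threads while the flush was running
    live_batch = {'bucket1': {'total': {'urls': 2, 'wire_bytes': 800}}}

    summy_merge(live_batch, batch_copy)
    # -> {'bucket1': {'total': {'urls': 5, 'wire_bytes': 2000}}}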