add a start() method to the two classes that save data to rethinkdb periodically in batches, instead of starting the timer in __init__

Noah Levitt 2016-06-16 00:04:59 +00:00
parent 4bb3556709
commit d48e2c462d
5 changed files with 65 additions and 11 deletions
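
The shape of the change, as a minimal sketch (illustrative only; the class below is not warcprox's actual code): the batching classes still flush to rethinkdb on a repeating timer, but arming that timer moves out of __init__ into an explicit start(), and stop() only joins the timer if one was ever started. The test fixtures and the controller below are updated to call start() themselves.

import threading

class BatchedWriterSketch:
    """Hypothetical class illustrating the lifecycle this commit adopts."""

    def __init__(self, flush_interval=2.0):
        self._batch_lock = threading.RLock()
        with self._batch_lock:
            self._batch = []
        self._stop = threading.Event()
        self._timer = None              # no timer armed at construction time
        self._interval = flush_interval

    def start(self):
        """Arm the repeating flush timer (previously done in __init__)."""
        self._flush_batch()

    def _flush_batch(self):
        with self._batch_lock:
            batch, self._batch = self._batch, []
        # ... write `batch` to the datastore here ...
        if not self._stop.is_set():
            # reschedule ourselves until stop() is called
            self._timer = threading.Timer(self._interval, self._flush_batch)
            self._timer.start()

    def stop(self):
        self._stop.set()
        if self._timer:                 # guard: stop() may be called without start()
            self._timer.join()

    def close(self):
        self.stop()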

View File

@@ -188,6 +188,7 @@ def captures_db(request, rethinkdb_servers, rethinkdb_big_table):
db = 'warcprox_test_captures_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
r = rethinkstuff.Rethinker(servers, db)
captures_db = warcprox.bigtable.RethinkCaptures(r)
captures_db.start()
def fin():
if captures_db:
@@ -247,6 +248,7 @@ def stats_db(request, rethinkdb_servers):
db = 'warcprox_test_stats_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
r = rethinkstuff.Rethinker(servers, db)
sdb = warcprox.stats.RethinkStatsDb(r)
sdb.start()
else:
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-stats-', suffix='.db', delete=False)
f.close()

View File

@@ -52,7 +52,11 @@ class RethinkCaptures:
self._batch_lock = threading.RLock()
with self._batch_lock:
self._batch = []
self._insert_batch() # starts repeating timer
self._timer = None
def start(self):
"""Starts batch insert repeating timer"""
self._insert_batch()
def _insert_batch(self):
try:
@@ -165,9 +169,13 @@ class RethinkCaptures:
self._batch.append(entry)
def close(self):
self.stop()
def stop(self):
self.logger.info("closing rethinkdb captures table")
self._stop.set()
self._timer.join()
if self._timer:
self._timer.join()
class RethinkCapturesDedup:
logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")
@@ -195,5 +203,11 @@ class RethinkCapturesDedup:
else:
return None
def start(self):
self.captures_db.start()
def stop(self):
self.captures_db.stop()
def close(self):
self.captures_db.close()
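
With this, RethinkCaptures no longer arms its batch-insert timer at construction, and RethinkCapturesDedup simply forwards start()/stop()/close() to the underlying captures table. A hedged usage sketch mirroring the test fixture above (the server list and database name are placeholders):

import rethinkstuff
import warcprox.bigtable

r = rethinkstuff.Rethinker(['localhost'], 'my_captures_db')  # placeholder servers/db
captures_db = warcprox.bigtable.RethinkCaptures(r)
captures_db.start()      # arms the repeating batch-insert timer
try:
    ...                  # normal operation; entries accumulate in _batch and are flushed periodically
finally:
    captures_db.close()  # close() -> stop(): sets _stop and joins the timer if one was started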

View File

@@ -149,8 +149,14 @@ class WarcproxController(object):
Start warcprox and run until shut down. Call
warcprox_controller.stop.set() to initiate graceful shutdown.
"""
proxy_thread = threading.Thread(target=self.proxy.serve_forever, name='ProxyThread')
if self.proxy.stats_db:
self.proxy.stats_db.start()
proxy_thread = threading.Thread(
target=self.proxy.serve_forever, name='ProxyThread')
proxy_thread.start()
if self.warc_writer_thread.dedup_db:
self.warc_writer_thread.dedup_db.start()
self.warc_writer_thread.start()
if self.playback_proxy is not None:
@@ -199,7 +205,7 @@ class WarcproxController(object):
self.warc_writer_thread.join()
if self.proxy.stats_db:
self.proxy.stats_db.close()
self.proxy.stats_db.stop()
if self.warc_writer_thread.dedup_db:
self.warc_writer_thread.dedup_db.close()
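
Taken together, the two hunks give the controller ownership of the databases' lifecycles: each backend is start()ed before the thread that feeds it, and stopped or closed on shutdown after that thread has drained. A condensed outline of that ordering (a sketch, not the full method; the method name is assumed from the docstring, and shutdown waiting and the playback proxy are elided):

def run_until_shutdown(self):
    if self.proxy.stats_db:
        self.proxy.stats_db.start()                  # arm the stats batch timer first...
    proxy_thread = threading.Thread(
            target=self.proxy.serve_forever, name='ProxyThread')
    proxy_thread.start()                             # ...then the thread that feeds it

    if self.warc_writer_thread.dedup_db:
        self.warc_writer_thread.dedup_db.start()     # same ordering for dedup
    self.warc_writer_thread.start()

    # ... wait here until warcprox_controller.stop is set ...

    self.warc_writer_thread.join()                   # let the writer drain first
    if self.proxy.stats_db:
        self.proxy.stats_db.stop()                   # then stop its batch timer
    if self.warc_writer_thread.dedup_db:
        self.warc_writer_thread.dedup_db.close()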

View File

@@ -48,6 +48,12 @@ class DedupDb(object):
self.db = dbm_gnu.open(dbm_file, 'c')
self.options = options
def start(self):
pass
def stop(self):
self.close()
def close(self):
self.db.close()
@@ -125,6 +131,13 @@ class RethinkDedupDb:
repr(self.table), repr(self.r.dbname), self.shards, self.replicas)
self.r.table_create(self.table, primary_key="key", shards=self.shards, replicas=self.replicas).run()
def start(self):
pass
def stop(self):
pass
def close(self):
pass
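
These no-op implementations exist so every dedup backend, gdbm-backed or rethinkdb-backed, exposes the same start()/stop()/close() surface and the controller can drive whichever was configured without type checks. Illustrative only; dedup_db stands for whatever backend the options selected:

dedup_db.start()   # pass for DedupDb and RethinkDedupDb, arms the batch timer for RethinkCapturesDedup
...                # proxying and deduplication happen on other threads
dedup_db.close()   # closes the gdbm file for DedupDb, a no-op for RethinkDedupDb,
                   # and stops and joins the batch timer for RethinkCapturesDedup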

View File

@@ -71,6 +71,13 @@ class StatsDb:
self.db = dbm_gnu.open(dbm_file, 'c')
self.options = options
def start(self):
# method only exists to match RethinkStatsDb
pass
def stop(self):
self.close()
def close(self):
self.db.close()
@@ -134,7 +141,7 @@ class StatsDb:
self.db[b] = json.dumps(bucket_stats, separators=(',',':')).encode("utf-8")
class RethinkStatsDb:
"""Updates database in batch every 0.5 seconds"""
"""Updates database in batch every 2.0 seconds"""
logger = logging.getLogger("warcprox.stats.RethinkStatsDb")
def __init__(self, rethinker, table="stats", shards=None, replicas=None, options=warcprox.Options()):
@@ -149,7 +156,10 @@ class RethinkStatsDb:
self._batch_lock = threading.RLock()
with self._batch_lock:
self._batch = {}
self._timer = None
def start(self):
"""Starts batch update repeating timer."""
self._update_batch() # starts repeating timer
def _update_batch(self):
@@ -190,18 +200,27 @@ def _ensure_db_table(self):
def _ensure_db_table(self):
dbs = self.r.db_list().run()
if not self.r.dbname in dbs:
self.logger.info("creating rethinkdb database %s", repr(self.r.dbname))
self.logger.info(
"creating rethinkdb database %s", repr(self.r.dbname))
self.r.db_create(self.r.dbname).run()
tables = self.r.table_list().run()
if not self.table in tables:
self.logger.info("creating rethinkdb table %s in database %s shards=%s replicas=%s",
repr(self.table), repr(self.r.dbname), self.shards, self.replicas)
self.r.table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run()
self.logger.info(
"creating rethinkdb table %s in database %s shards=%s "
"replicas=%s", repr(self.table), repr(self.r.dbname),
self.shards, self.replicas)
self.r.table_create(
self.table, primary_key="bucket", shards=self.shards,
replicas=self.replicas).run()
def close(self):
self.logger.info("closing rethinkdb stats table")
self.stop()
def stop(self):
self.logger.info("stopping rethinkdb stats table batch updates")
self._stop.set()
self._timer.join()
if self._timer:
self._timer.join()
def sync(self):
pass
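
Because RethinkStatsDb no longer creates its timer in __init__, stop() and close() have to tolerate an instance that was never start()ed, which is what the new self._timer = None and the if self._timer: guard are for. A hedged example (r is a configured rethinkstuff.Rethinker, as in the test fixtures above):

sdb = warcprox.stats.RethinkStatsDb(r)
sdb.close()       # safe even without start(): _stop is set, _timer is still None, so nothing to join

sdb = warcprox.stats.RethinkStatsDb(r)
sdb.start()       # arms the repeating batch-update timer (now roughly every 2.0 seconds)
sdb.stop()        # sets _stop and joins the most recently scheduled timer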