From d48e2c462dbe9e0897e14576b4f5c4d09cbadd7f Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 16 Jun 2016 00:04:59 +0000 Subject: [PATCH] add a start() method to the two classes that save data to rethinkdb periodically in batches, instead of starting the timer in __init__ --- tests/test_warcprox.py | 2 ++ warcprox/bigtable.py | 18 ++++++++++++++++-- warcprox/controller.py | 10 ++++++++-- warcprox/dedup.py | 13 +++++++++++++ warcprox/stats.py | 33 ++++++++++++++++++++++++++------- 5 files changed, 65 insertions(+), 11 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 45933b5..db97674 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -188,6 +188,7 @@ def captures_db(request, rethinkdb_servers, rethinkdb_big_table): db = 'warcprox_test_captures_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) r = rethinkstuff.Rethinker(servers, db) captures_db = warcprox.bigtable.RethinkCaptures(r) + captures_db.start() def fin(): if captures_db: @@ -247,6 +248,7 @@ def stats_db(request, rethinkdb_servers): db = 'warcprox_test_stats_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) r = rethinkstuff.Rethinker(servers, db) sdb = warcprox.stats.RethinkStatsDb(r) + sdb.start() else: f = tempfile.NamedTemporaryFile(prefix='warcprox-test-stats-', suffix='.db', delete=False) f.close() diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 3f8989a..66b84f0 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -52,7 +52,11 @@ class RethinkCaptures: self._batch_lock = threading.RLock() with self._batch_lock: self._batch = [] - self._insert_batch() # starts repeating timer + self._timer = None + + def start(self): + """Starts batch insert repeating timer""" + self._insert_batch() def _insert_batch(self): try: @@ -165,9 +169,13 @@ class RethinkCaptures: self._batch.append(entry) def close(self): + self.stop() + + def stop(self): self.logger.info("closing rethinkdb captures table") self._stop.set() - self._timer.join() + if self._timer: + self._timer.join() class RethinkCapturesDedup: logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup") @@ -195,5 +203,11 @@ class RethinkCapturesDedup: else: return None + def start(self): + self.captures_db.start() + + def stop(self): + self.captures_db.stop() + def close(self): self.captures_db.close() diff --git a/warcprox/controller.py b/warcprox/controller.py index a813345..760a1e8 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -149,8 +149,14 @@ class WarcproxController(object): Start warcprox and run until shut down. Call warcprox_controller.stop.set() to initiate graceful shutdown. """ - proxy_thread = threading.Thread(target=self.proxy.serve_forever, name='ProxyThread') + if self.proxy.stats_db: + self.proxy.stats_db.start() + proxy_thread = threading.Thread( + target=self.proxy.serve_forever, name='ProxyThread') proxy_thread.start() + + if self.warc_writer_thread.dedup_db: + self.warc_writer_thread.dedup_db.start() self.warc_writer_thread.start() if self.playback_proxy is not None: @@ -199,7 +205,7 @@ class WarcproxController(object): self.warc_writer_thread.join() if self.proxy.stats_db: - self.proxy.stats_db.close() + self.proxy.stats_db.stop() if self.warc_writer_thread.dedup_db: self.warc_writer_thread.dedup_db.close() diff --git a/warcprox/dedup.py b/warcprox/dedup.py index eb71cb4..c5080d3 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -48,6 +48,12 @@ class DedupDb(object): self.db = dbm_gnu.open(dbm_file, 'c') self.options = options + def start(self): + pass + + def stop(self): + self.close() + def close(self): self.db.close() @@ -125,6 +131,13 @@ class RethinkDedupDb: repr(self.table), repr(self.r.dbname), self.shards, self.replicas) self.r.table_create(self.table, primary_key="key", shards=self.shards, replicas=self.replicas).run() + + def start(self): + pass + + def stop(self): + pass + def close(self): pass diff --git a/warcprox/stats.py b/warcprox/stats.py index 8dd3a86..7bf3fbc 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -71,6 +71,13 @@ class StatsDb: self.db = dbm_gnu.open(dbm_file, 'c') self.options = options + def start(self): + # method only exists to match RethinkStatsDb + pass + + def stop(self): + self.close() + def close(self): self.db.close() @@ -134,7 +141,7 @@ class StatsDb: self.db[b] = json.dumps(bucket_stats, separators=(',',':')).encode("utf-8") class RethinkStatsDb: - """Updates database in batch every 0.5 seconds""" + """Updates database in batch every 2.0 seconds""" logger = logging.getLogger("warcprox.stats.RethinkStatsDb") def __init__(self, rethinker, table="stats", shards=None, replicas=None, options=warcprox.Options()): @@ -149,7 +156,10 @@ class RethinkStatsDb: self._batch_lock = threading.RLock() with self._batch_lock: self._batch = {} + self._timer = None + def start(self): + """Starts batch update repeating timer.""" self._update_batch() # starts repeating timer def _update_batch(self): @@ -190,18 +200,27 @@ class RethinkStatsDb: def _ensure_db_table(self): dbs = self.r.db_list().run() if not self.r.dbname in dbs: - self.logger.info("creating rethinkdb database %s", repr(self.r.dbname)) + self.logger.info( + "creating rethinkdb database %s", repr(self.r.dbname)) self.r.db_create(self.r.dbname).run() tables = self.r.table_list().run() if not self.table in tables: - self.logger.info("creating rethinkdb table %s in database %s shards=%s replicas=%s", - repr(self.table), repr(self.r.dbname), self.shards, self.replicas) - self.r.table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run() + self.logger.info( + "creating rethinkdb table %s in database %s shards=%s " + "replicas=%s", repr(self.table), repr(self.r.dbname), + self.shards, self.replicas) + self.r.table_create( + self.table, primary_key="bucket", shards=self.shards, + replicas=self.replicas).run() def close(self): - self.logger.info("closing rethinkdb stats table") + self.stop() + + def stop(self): + self.logger.info("stopping rethinkdb stats table batch updates") self._stop.set() - self._timer.join() + if self._timer: + self._timer.join() def sync(self): pass