diff --git a/warcprox/stats.py b/warcprox/stats.py index 660d4c7..750ba5d 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -137,16 +137,23 @@ class RethinkStatsDb: self.r.table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run() def close(self): - self.logger.info("waiting for ~%s tasks to finish", - self._executor._work_queue.qsize() + (self._executor._max_workers/2)) + self._executor.shutdown(wait=False) + last_update = 0 + while True: + time.sleep(0.5) + remaining_estimate = self._executor._work_queue.qsize() + self._executor._max_workers/2 + if remaining_estimate < self._executor._max_workers: + break + if time.time() - last_update >= 30: + self.logger.info("waiting for ~%s tasks to finish", remaining_estimate) + last_update = time.time() self._executor.shutdown(wait=True) - self.logger.info("shut down complete") + self.logger.info("all tasks finished") def sync(self): pass def value(self, bucket0="__all__", bucket1=None, bucket2=None): - # XXX use pluck? bucket0_stats = self.r.table(self.table).get(bucket0).run() self.logger.debug('stats db lookup of bucket=%s returned %s', bucket0, bucket0_stats) if bucket0_stats: @@ -157,8 +164,12 @@ class RethinkStatsDb: return bucket0_stats[bucket1] return bucket0_stats + + # >>> r.db("archiveit_brozzler").table("test00").get("foo01").replace(lambda old: r.branch(old.eq(None), {"id":"foo01", "a":{"b":88}}, old.merge({"a":{"b":old["a"]["b"].add(3)}}))).run(conn) + def _tally(self, buckets, size, is_revisit): try: + threading.current_thread.name = 'RethinkStatsDb-futures-thread(tid={})'.format(warcprox.gettid()) self.logger.debug("starting task self._tally(%s)", (buckets, size, is_revisit)) for bucket in buckets: bucket_stats = self.value(bucket) or _empty_bucket(bucket)