mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
log status in close()
This commit is contained in:
parent
93a2e4ff85
commit
afdb6cf557
@ -137,16 +137,23 @@ class RethinkStatsDb:
|
|||||||
self.r.table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run()
|
self.r.table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run()
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.logger.info("waiting for ~%s tasks to finish",
|
self._executor.shutdown(wait=False)
|
||||||
self._executor._work_queue.qsize() + (self._executor._max_workers/2))
|
last_update = 0
|
||||||
|
while True:
|
||||||
|
time.sleep(0.5)
|
||||||
|
remaining_estimate = self._executor._work_queue.qsize() + self._executor._max_workers/2
|
||||||
|
if remaining_estimate < self._executor._max_workers:
|
||||||
|
break
|
||||||
|
if time.time() - last_update >= 30:
|
||||||
|
self.logger.info("waiting for ~%s tasks to finish", remaining_estimate)
|
||||||
|
last_update = time.time()
|
||||||
self._executor.shutdown(wait=True)
|
self._executor.shutdown(wait=True)
|
||||||
self.logger.info("shut down complete")
|
self.logger.info("all tasks finished")
|
||||||
|
|
||||||
def sync(self):
|
def sync(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def value(self, bucket0="__all__", bucket1=None, bucket2=None):
|
def value(self, bucket0="__all__", bucket1=None, bucket2=None):
|
||||||
# XXX use pluck?
|
|
||||||
bucket0_stats = self.r.table(self.table).get(bucket0).run()
|
bucket0_stats = self.r.table(self.table).get(bucket0).run()
|
||||||
self.logger.debug('stats db lookup of bucket=%s returned %s', bucket0, bucket0_stats)
|
self.logger.debug('stats db lookup of bucket=%s returned %s', bucket0, bucket0_stats)
|
||||||
if bucket0_stats:
|
if bucket0_stats:
|
||||||
@ -157,8 +164,12 @@ class RethinkStatsDb:
|
|||||||
return bucket0_stats[bucket1]
|
return bucket0_stats[bucket1]
|
||||||
return bucket0_stats
|
return bucket0_stats
|
||||||
|
|
||||||
|
|
||||||
|
# >>> r.db("archiveit_brozzler").table("test00").get("foo01").replace(lambda old: r.branch(old.eq(None), {"id":"foo01", "a":{"b":88}}, old.merge({"a":{"b":old["a"]["b"].add(3)}}))).run(conn)
|
||||||
|
|
||||||
def _tally(self, buckets, size, is_revisit):
|
def _tally(self, buckets, size, is_revisit):
|
||||||
try:
|
try:
|
||||||
|
threading.current_thread.name = 'RethinkStatsDb-futures-thread(tid={})'.format(warcprox.gettid())
|
||||||
self.logger.debug("starting task self._tally(%s)", (buckets, size, is_revisit))
|
self.logger.debug("starting task self._tally(%s)", (buckets, size, is_revisit))
|
||||||
for bucket in buckets:
|
for bucket in buckets:
|
||||||
bucket_stats = self.value(bucket) or _empty_bucket(bucket)
|
bucket_stats = self.value(bucket) or _empty_bucket(bucket)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user