mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
add missing import, remove unused method, tweak logging, and avoid an exception at shutdown caused by joining a timer thread that was never started
This commit is contained in:
parent
f38ce708bf
commit
7eb82ab8a2
@ -9,6 +9,7 @@ import surt
|
|||||||
import os
|
import os
|
||||||
import hashlib
|
import hashlib
|
||||||
import threading
|
import threading
|
||||||
|
import datetime
|
||||||
|
|
||||||
class RethinkCaptures:
|
class RethinkCaptures:
|
||||||
"""Inserts in batches every 0.5 seconds"""
|
"""Inserts in batches every 0.5 seconds"""
|
||||||
@ -34,14 +35,15 @@ class RethinkCaptures:
|
|||||||
result = self.r.table(self.table).insert(self._batch).run()
|
result = self.r.table(self.table).insert(self._batch).run()
|
||||||
if result["inserted"] != len(self._batch) or sorted(result.values()) != [0,0,0,0,0,len(self._batch)]:
|
if result["inserted"] != len(self._batch) or sorted(result.values()) != [0,0,0,0,0,len(self._batch)]:
|
||||||
raise Exception("unexpected result %s saving batch of %s entries", result, len(self._batch))
|
raise Exception("unexpected result %s saving batch of %s entries", result, len(self._batch))
|
||||||
self.logger.info("big capture table db saved %s entries", len(self._batch))
|
self.logger.info("saved %s entries to big capture table db", len(self._batch))
|
||||||
self.logger.info("saved %s", self._batch)
|
|
||||||
self._batch = []
|
self._batch = []
|
||||||
|
|
||||||
if not self._stop.is_set():
|
if not self._stop.is_set():
|
||||||
self._timer = threading.Timer(0.5, self._insert_batch)
|
t = threading.Timer(0.5, self._insert_batch)
|
||||||
self._timer.name = "RethinkCaptures-batch-insert-timer"
|
t.name = "RethinkCaptures-batch-insert-timer-%s" % datetime.datetime.utcnow().isoformat()
|
||||||
self._timer.start()
|
t.start()
|
||||||
|
# ensure self._timer joinable (already started) whenever close() happens to be called
|
||||||
|
self._timer = t
|
||||||
else:
|
else:
|
||||||
self.logger.info("finished")
|
self.logger.info("finished")
|
||||||
|
|
||||||
@ -65,7 +67,7 @@ class RethinkCaptures:
|
|||||||
results = list(results_iter)
|
results = list(results_iter)
|
||||||
if len(results) > 0:
|
if len(results) > 0:
|
||||||
if len(results) > 1:
|
if len(results) > 1:
|
||||||
self.logger.error("expected 0 or 1 but found %s results for sha1base32=%s bucket=%s (will use first result)", len(results), sha1base32, bucket)
|
self.logger.debug("expected 0 or 1 but found %s results for sha1base32=%s bucket=%s (will use first result)", len(results), sha1base32, bucket)
|
||||||
result = results[0]
|
result = results[0]
|
||||||
else:
|
else:
|
||||||
result = None
|
result = None
|
||||||
@ -112,16 +114,6 @@ class RethinkCaptures:
|
|||||||
|
|
||||||
return entry
|
return entry
|
||||||
|
|
||||||
def _save_entry(self, entry):
|
|
||||||
try:
|
|
||||||
threading.current_thread.name = 'RethinkCaptures-futures-thread(tid={})'.format(warcprox.gettid())
|
|
||||||
result = self.r.table(self.table).insert(entry).run()
|
|
||||||
if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]:
|
|
||||||
raise Exception("unexpected result %s saving %s", result, entry)
|
|
||||||
self.logger.debug("big capture table db saved %s", entry)
|
|
||||||
except:
|
|
||||||
self.logger.error("unexpected problem ", exc_info=True)
|
|
||||||
|
|
||||||
def notify(self, recorded_url, records):
|
def notify(self, recorded_url, records):
|
||||||
entry = self._assemble_entry(recorded_url, records)
|
entry = self._assemble_entry(recorded_url, records)
|
||||||
with self._batch_lock:
|
with self._batch_lock:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user