mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
update big captures table asynchronously
This commit is contained in:
parent
f1362e4da0
commit
3e1566cd6f
@ -1,5 +1,3 @@
|
||||
# vim:set sw=4 et:
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import logging
|
||||
@ -10,6 +8,7 @@ import base64
|
||||
import surt
|
||||
import os
|
||||
import hashlib
|
||||
import concurrent.futures
|
||||
|
||||
class RethinkCaptures:
|
||||
logger = logging.getLogger("warcprox.bigtables.RethinkCaptures")
|
||||
@ -22,6 +21,10 @@ class RethinkCaptures:
|
||||
self.options = options
|
||||
self._ensure_db_table()
|
||||
|
||||
# only one worker thread to ensure consistency, see
|
||||
# https://rethinkdb.com/docs/consistency/
|
||||
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
||||
|
||||
def _ensure_db_table(self):
|
||||
dbs = self.r.db_list().run()
|
||||
if not self.r.dbname in dbs:
|
||||
@ -49,7 +52,7 @@ class RethinkCaptures:
|
||||
self.logger.debug("returning %s for sha1base32=%s bucket=%s", result, sha1base32, bucket)
|
||||
return result
|
||||
|
||||
def notify(self, recorded_url, records):
|
||||
def _assemble_entry(self, recorded_url, records):
|
||||
if recorded_url.response_recorder:
|
||||
if recorded_url.response_recorder.payload_digest.name == "sha1":
|
||||
sha1base32 = base64.b32encode(recorded_url.response_recorder.payload_digest.digest()).decode("utf-8")
|
||||
@ -87,10 +90,26 @@ class RethinkCaptures:
|
||||
"length": records[0].length,
|
||||
}
|
||||
|
||||
result = self.r.table(self.table).insert(entry).run()
|
||||
if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]:
|
||||
raise Exception("unexpected result %s saving %s", result, entry)
|
||||
self.logger.debug("big capture table db saved %s", entry)
|
||||
return entry
|
||||
|
||||
def _save_entry(self, entry):
|
||||
try:
|
||||
result = self.r.table(self.table).insert(entry).run()
|
||||
if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]:
|
||||
raise Exception("unexpected result %s saving %s", result, entry)
|
||||
self.logger.debug("big capture table db saved %s", entry)
|
||||
except:
|
||||
self.logger.error("unexpected problem ", exc_info=True)
|
||||
|
||||
def notify(self, recorded_url, records):
|
||||
entry = self._assemble_entry(recorded_url, records)
|
||||
self._executor.submit(self._save_entry, entry)
|
||||
|
||||
def close(self):
|
||||
self.logger.info("waiting for ~%s tasks to finish",
|
||||
self._executor._work_queue.qsize() + (self._executor._max_workers/2))
|
||||
self._executor.shutdown(wait=True)
|
||||
self.logger.info("shut down complete")
|
||||
|
||||
class RethinkCapturesDedup:
|
||||
logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")
|
||||
@ -114,4 +133,4 @@ class RethinkCapturesDedup:
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
self.captures_db.close()
|
||||
|
Loading…
x
Reference in New Issue
Block a user