mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
wrap rethinkdb operations and retry if appropriate (as best as we can tell)
This commit is contained in:
parent
44a62111fb
commit
022f6e7215
@ -1,5 +1,3 @@
|
||||
# vim:set sw=4 et:
|
||||
|
||||
from argparse import Namespace as _Namespace
|
||||
|
||||
def digest_str(hash_obj, base32):
|
||||
@ -19,6 +17,40 @@ class Options(_Namespace):
|
||||
except AttributeError:
|
||||
return None
|
||||
|
||||
class Rethinker:
|
||||
import logging
|
||||
logger = logging.getLogger("warcprox.Rethinker")
|
||||
|
||||
def __init__(self, servers=["localhost"], db=None):
|
||||
self.servers = servers
|
||||
self.db = db
|
||||
|
||||
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
|
||||
# "Best practices: Managing connections: a connection per request"
|
||||
def _random_server_connection(self):
|
||||
import rethinkdb as r
|
||||
import random
|
||||
while True:
|
||||
server = random.choice(self.servers)
|
||||
try:
|
||||
try:
|
||||
host, port = server.split(":")
|
||||
return r.connect(host=host, port=port)
|
||||
except ValueError:
|
||||
return r.connect(host=server)
|
||||
except Exception as e:
|
||||
self.logger.error("will keep trying to get a connection after failure connecting to %s", server, exc_info=True)
|
||||
import time
|
||||
time.sleep(0.5)
|
||||
|
||||
def run(self, query):
|
||||
while True:
|
||||
with self._random_server_connection() as conn:
|
||||
try:
|
||||
return query.run(conn, db=self.db)
|
||||
except (ReqlAvailabilityError, ReqlTimeoutError) as e:
|
||||
self.logger.error("will retry rethinkdb query/operation %s which failed like so:", exc_info=True)
|
||||
|
||||
version_bytes = _read_version_bytes().strip()
|
||||
version_str = version_bytes.decode('utf-8')
|
||||
|
||||
|
@ -16,52 +16,39 @@ class RethinkCaptures:
|
||||
logger = logging.getLogger("warcprox.dedup.RethinkCaptures")
|
||||
|
||||
def __init__(self, servers=["localhost"], db="warcprox", table="captures", shards=3, replicas=3, options=warcprox.Options()):
|
||||
self.servers = servers
|
||||
self.db = db
|
||||
self.r = warcprox.Rethinker(servers, db)
|
||||
self.table = table
|
||||
self.shards = shards
|
||||
self.replicas = replicas
|
||||
self.options = options
|
||||
self._ensure_db_table()
|
||||
|
||||
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
|
||||
# "Best practices: Managing connections: a connection per request"
|
||||
def _random_server_connection(self):
|
||||
server = random.choice(self.servers)
|
||||
try:
|
||||
host, port = server.split(":")
|
||||
return r.connect(host=host, port=port)
|
||||
except ValueError:
|
||||
return r.connect(host=server)
|
||||
|
||||
def _ensure_db_table(self):
|
||||
with self._random_server_connection() as conn:
|
||||
dbs = r.db_list().run(conn)
|
||||
if not self.db in dbs:
|
||||
self.logger.info("creating rethinkdb database %s", repr(self.db))
|
||||
r.db_create(self.db).run(conn)
|
||||
tables = r.db(self.db).table_list().run(conn)
|
||||
if not self.table in tables:
|
||||
self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db))
|
||||
r.db(self.db).table_create(self.table, shards=self.shards, replicas=self.replicas).run(conn)
|
||||
r.db(self.db).table(self.table).index_create("abbr_canon_surt_timesamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]).run(conn)
|
||||
r.db(self.db).table(self.table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"], r.row["bucket"]]).run(conn)
|
||||
dbs = self.r.run(r.db_list())
|
||||
if not self.r.db in dbs:
|
||||
self.logger.info("creating rethinkdb database %s", repr(self.r.db))
|
||||
self.r.run(r.db_create(self.r.db))
|
||||
tables = self.r.run(r.table_list())
|
||||
if not self.table in tables:
|
||||
self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.r.db))
|
||||
self.r.run(r.table_create(self.table, shards=self.shards, replicas=self.replicas))
|
||||
self.r.run(r.table(self.table).index_create("abbr_canon_surt_timesamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]))
|
||||
self.r.run(r.table(self.table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"], r.row["bucket"]]))
|
||||
|
||||
def find_response_by_digest(self, algo, raw_digest, bucket="__unspecified__"):
|
||||
if algo != "sha1":
|
||||
raise Exception("digest type is {} but big capture table is indexed by sha1".format(algo))
|
||||
sha1base32 = base64.b32encode(raw_digest).decode("utf-8")
|
||||
with self._random_server_connection() as conn:
|
||||
cursor = r.db(self.db).table(self.table).get_all([sha1base32, "response", bucket], index="sha1_warc_type").run(conn)
|
||||
results = list(cursor)
|
||||
if len(results) > 1:
|
||||
raise Exception("expected 0 or 1 but found %s results for sha1base32=%s", len(results), sha1base32)
|
||||
elif len(results) == 1:
|
||||
result = results[0]
|
||||
else:
|
||||
result = None
|
||||
self.logger.info("returning %s for sha1base32=%s", result, sha1base32)
|
||||
return result
|
||||
cursor = self.r.run(r.table(self.table).get_all([sha1base32, "response", bucket], index="sha1_warc_type"))
|
||||
results = list(cursor)
|
||||
if len(results) > 1:
|
||||
raise Exception("expected 0 or 1 but found %s results for sha1base32=%s", len(results), sha1base32)
|
||||
elif len(results) == 1:
|
||||
result = results[0]
|
||||
else:
|
||||
result = None
|
||||
self.logger.info("returning %s for sha1base32=%s", result, sha1base32)
|
||||
return result
|
||||
|
||||
def notify(self, recorded_url, records):
|
||||
if not recorded_url.response_recorder:
|
||||
@ -94,11 +81,10 @@ class RethinkCaptures:
|
||||
"bucket": bucket,
|
||||
}
|
||||
|
||||
with self._random_server_connection() as conn:
|
||||
result = r.db(self.db).table(self.table).insert(entry).run(conn)
|
||||
if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]:
|
||||
raise Exception("unexpected result %s saving %s", result, entry)
|
||||
self.logger.info("big capture table db saved %s", entry)
|
||||
result = self.r.run(r.table(self.table).insert(entry))
|
||||
if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]:
|
||||
raise Exception("unexpected result %s saving %s", result, entry)
|
||||
self.logger.info("big capture table db saved %s", entry)
|
||||
|
||||
class RethinkCapturesDedup:
|
||||
logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")
|
||||
|
@ -88,34 +88,22 @@ class RethinkDedupDb:
|
||||
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
|
||||
|
||||
def __init__(self, servers=["localhost"], db="warcprox", table="dedup", shards=3, replicas=3, options=warcprox.Options()):
|
||||
self.servers = servers
|
||||
self.db = db
|
||||
self.r = warcprox.Rethinker(servers, db)
|
||||
self.table = table
|
||||
self.shards = shards
|
||||
self.replicas = replicas
|
||||
self._ensure_db_table()
|
||||
self.options = options
|
||||
|
||||
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
|
||||
# "Best practices: Managing connections: a connection per request"
|
||||
def _random_server_connection(self):
|
||||
server = random.choice(self.servers)
|
||||
try:
|
||||
host, port = server.split(":")
|
||||
return r.connect(host=host, port=port)
|
||||
except ValueError:
|
||||
return r.connect(host=server)
|
||||
|
||||
def _ensure_db_table(self):
|
||||
with self._random_server_connection() as conn:
|
||||
dbs = r.db_list().run(conn)
|
||||
if not self.db in dbs:
|
||||
self.logger.info("creating rethinkdb database %s", repr(self.db))
|
||||
r.db_create(self.db).run(conn)
|
||||
tables = r.db(self.db).table_list().run(conn)
|
||||
if not self.table in tables:
|
||||
self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db))
|
||||
r.db(self.db).table_create(self.table, primary_key="key", shards=self.shards, replicas=self.replicas).run(conn)
|
||||
dbs = self.r.run(r.db_list())
|
||||
if not self.r.db in dbs:
|
||||
self.logger.info("creating rethinkdb database %s", repr(self.r.db))
|
||||
self.r.run(r.db_create(self.r.db))
|
||||
tables = self.r.run(r.table_list())
|
||||
if not self.table in tables:
|
||||
self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.r.db))
|
||||
self.r.run(r.table_create(self.table, primary_key="key", shards=self.shards, replicas=self.replicas))
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
@ -130,22 +118,20 @@ class RethinkDedupDb:
|
||||
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
|
||||
date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
|
||||
record = {'key':k,'url':url,'date':date,'id':record_id}
|
||||
with self._random_server_connection() as conn:
|
||||
result = r.db(self.db).table(self.table).insert(record,conflict="replace").run(conn)
|
||||
if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
|
||||
raise Exception("unexpected result %s saving %s", result, record)
|
||||
self.logger.debug('dedup db saved %s:%s', k, record)
|
||||
result = self.r.run(r.table(self.table).insert(record,conflict="replace"))
|
||||
if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
|
||||
raise Exception("unexpected result %s saving %s", result, record)
|
||||
self.logger.debug('dedup db saved %s:%s', k, record)
|
||||
|
||||
def lookup(self, digest_key, bucket=""):
|
||||
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
|
||||
k = "{}|{}".format(k, bucket)
|
||||
with self._random_server_connection() as conn:
|
||||
result = r.db(self.db).table(self.table).get(k).run(conn)
|
||||
if result:
|
||||
for x in result:
|
||||
result[x] = result[x].encode("utf-8")
|
||||
self.logger.debug('dedup db lookup of key=%s returning %s', k, result)
|
||||
return result
|
||||
result = self.r.run(r.table(self.table).get(k))
|
||||
if result:
|
||||
for x in result:
|
||||
result[x] = result[x].encode("utf-8")
|
||||
self.logger.debug('dedup db lookup of key=%s returning %s', k, result)
|
||||
return result
|
||||
|
||||
def notify(self, recorded_url, records):
|
||||
if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
|
||||
|
@ -108,34 +108,22 @@ class RethinkStatsDb:
|
||||
logger = logging.getLogger("warcprox.stats.RethinkStatsDb")
|
||||
|
||||
def __init__(self, servers=["localhost"], db="warcprox", table="stats", shards=3, replicas=3, options=warcprox.Options()):
|
||||
self.servers = servers
|
||||
self.db = db
|
||||
self.r = warcprox.Rethinker(servers, db)
|
||||
self.table = table
|
||||
self.shards = shards
|
||||
self.replicas = replicas
|
||||
self._ensure_db_table()
|
||||
self.options = options
|
||||
|
||||
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
|
||||
# "Best practices: Managing connections: a connection per request"
|
||||
def _random_server_connection(self):
|
||||
server = random.choice(self.servers)
|
||||
try:
|
||||
host, port = server.split(":")
|
||||
return r.connect(host=host, port=port)
|
||||
except ValueError:
|
||||
return r.connect(host=server)
|
||||
|
||||
def _ensure_db_table(self):
|
||||
with self._random_server_connection() as conn:
|
||||
dbs = r.db_list().run(conn)
|
||||
if not self.db in dbs:
|
||||
self.logger.info("creating rethinkdb database %s", repr(self.db))
|
||||
r.db_create(self.db).run(conn)
|
||||
tables = r.db(self.db).table_list().run(conn)
|
||||
if not self.table in tables:
|
||||
self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db))
|
||||
r.db(self.db).table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run(conn)
|
||||
dbs = self.r.run(r.db_list())
|
||||
if not self.r.db in dbs:
|
||||
self.logger.info("creating rethinkdb database %s", repr(self.r.db))
|
||||
self.r.run(r.db_create(self.r.db))
|
||||
tables = self.r.run(r.table_list())
|
||||
if not self.table in tables:
|
||||
self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.r.db))
|
||||
self.r.run(r.table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas))
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
@ -145,16 +133,15 @@ class RethinkStatsDb:
|
||||
|
||||
def value(self, bucket0="__all__", bucket1=None, bucket2=None):
|
||||
# XXX use pluck?
|
||||
with self._random_server_connection() as conn:
|
||||
bucket0_stats = r.db(self.db).table(self.table).get(bucket0).run(conn)
|
||||
self.logger.debug('stats db lookup of bucket=%s returned %s', bucket0, bucket0_stats)
|
||||
if bucket0_stats:
|
||||
if bucket1:
|
||||
if bucket2:
|
||||
return bucket0_stats[bucket1][bucket2]
|
||||
else:
|
||||
return bucket0_stats[bucket1]
|
||||
return bucket0_stats
|
||||
bucket0_stats = self.r.run(r.table(self.table).get(bucket0))
|
||||
self.logger.debug('stats db lookup of bucket=%s returned %s', bucket0, bucket0_stats)
|
||||
if bucket0_stats:
|
||||
if bucket1:
|
||||
if bucket2:
|
||||
return bucket0_stats[bucket1][bucket2]
|
||||
else:
|
||||
return bucket0_stats[bucket1]
|
||||
return bucket0_stats
|
||||
|
||||
def tally(self, recorded_url, records):
|
||||
buckets = ["__all__"]
|
||||
@ -166,24 +153,23 @@ class RethinkStatsDb:
|
||||
else:
|
||||
buckets.append("__unspecified__")
|
||||
|
||||
with self._random_server_connection() as conn:
|
||||
for bucket in buckets:
|
||||
bucket_stats = self.value(bucket) or _empty_bucket(bucket)
|
||||
for bucket in buckets:
|
||||
bucket_stats = self.value(bucket) or _empty_bucket(bucket)
|
||||
|
||||
bucket_stats["total"]["urls"] += 1
|
||||
bucket_stats["total"]["wire_bytes"] += recorded_url.size
|
||||
bucket_stats["total"]["urls"] += 1
|
||||
bucket_stats["total"]["wire_bytes"] += recorded_url.size
|
||||
|
||||
if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT:
|
||||
bucket_stats["revisit"]["urls"] += 1
|
||||
bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
|
||||
else:
|
||||
bucket_stats["new"]["urls"] += 1
|
||||
bucket_stats["new"]["wire_bytes"] += recorded_url.size
|
||||
if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT:
|
||||
bucket_stats["revisit"]["urls"] += 1
|
||||
bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
|
||||
else:
|
||||
bucket_stats["new"]["urls"] += 1
|
||||
bucket_stats["new"]["wire_bytes"] += recorded_url.size
|
||||
|
||||
self.logger.debug("saving %s", bucket_stats)
|
||||
result = r.db(self.db).table(self.table).insert(bucket_stats, conflict="replace").run(conn)
|
||||
if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
|
||||
raise Exception("unexpected result %s saving %s", result, record)
|
||||
self.logger.debug("saving %s", bucket_stats)
|
||||
result = self.r.run(r.table(self.table).insert(bucket_stats, conflict="replace"))
|
||||
if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
|
||||
raise Exception("unexpected result %s saving %s", result, record)
|
||||
|
||||
def notify(self, recorded_url, records):
|
||||
self.tally(recorded_url, records)
|
||||
|
@ -148,9 +148,8 @@ def captures_db(request, rethinkdb_servers, rethinkdb_big_table):
|
||||
def fin():
|
||||
if captures_db:
|
||||
logging.info('dropping rethinkdb database {}'.format(db))
|
||||
with captures_db._random_server_connection() as conn:
|
||||
result = r.db_drop(db).run(conn)
|
||||
logging.info("result=%s", result)
|
||||
result = captures_db.r.run(r.db_drop(db))
|
||||
logging.info("result=%s", result)
|
||||
request.addfinalizer(fin)
|
||||
|
||||
return captures_db
|
||||
@ -170,9 +169,8 @@ def rethink_dedup_db(request, rethinkdb_servers, captures_db):
|
||||
if rethinkdb_servers:
|
||||
if not captures_db:
|
||||
logging.info('dropping rethinkdb database {}'.format(db))
|
||||
with ddb._random_server_connection() as conn:
|
||||
result = r.db_drop(db).run(conn)
|
||||
logging.info("result=%s", result)
|
||||
result = ddb.r.run(r.db_drop(db))
|
||||
logging.info("result=%s", result)
|
||||
request.addfinalizer(fin)
|
||||
|
||||
return ddb
|
||||
@ -210,9 +208,8 @@ def stats_db(request, rethinkdb_servers):
|
||||
def fin():
|
||||
if rethinkdb_servers:
|
||||
logging.info('dropping rethinkdb database {}'.format(db))
|
||||
with sdb._random_server_connection() as conn:
|
||||
result = r.db_drop(db).run(conn)
|
||||
logging.info("result=%s", result)
|
||||
result = sdb.r.run(r.db_drop(db))
|
||||
logging.info("result=%s", result)
|
||||
else:
|
||||
logging.info('deleting file {}'.format(stats_db_file))
|
||||
os.unlink(stats_db_file)
|
||||
|
Loading…
x
Reference in New Issue
Block a user