From 022f6e72157218520955d4aac2f4be1e759ba59d Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Thu, 27 Aug 2015 23:57:12 +0000
Subject: [PATCH] wrap rethinkdb operations and retry if appropriate (as best as we can tell)

---
 warcprox/__init__.py            | 37 +++++++++++++++-
 warcprox/bigtable.py            | 64 +++++++++++----------------
 warcprox/dedup.py               | 52 ++++++++--------------
 warcprox/stats.py               | 78 ++++++++++++++-------------------
 warcprox/tests/test_warcprox.py | 15 +++----
 5 files changed, 117 insertions(+), 129 deletions(-)

diff --git a/warcprox/__init__.py b/warcprox/__init__.py
index 994b919..703952e 100644
--- a/warcprox/__init__.py
+++ b/warcprox/__init__.py
@@ -1,5 +1,3 @@
-# vim:set sw=4 et:
-
 from argparse import Namespace as _Namespace
 
 def digest_str(hash_obj, base32):
@@ -19,6 +17,41 @@ class Options(_Namespace):
         except AttributeError:
             return None
 
+class Rethinker:
+    import logging
+    logger = logging.getLogger("warcprox.Rethinker")
+
+    def __init__(self, servers=["localhost"], db=None):
+        self.servers = servers
+        self.db = db
+
+    # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
+    # "Best practices: Managing connections: a connection per request"
+    def _random_server_connection(self):
+        import rethinkdb as r
+        import random
+        while True:
+            server = random.choice(self.servers)
+            try:
+                try:
+                    host, port = server.split(":")
+                    return r.connect(host=host, port=port)
+                except ValueError:
+                    return r.connect(host=server)
+            except Exception as e:
+                self.logger.error("will keep trying to get a connection after failure connecting to %s", server, exc_info=True)
+                import time
+                time.sleep(0.5)
+
+    def run(self, query):
+        import rethinkdb as r
+        while True:
+            with self._random_server_connection() as conn:
+                try:
+                    return query.run(conn, db=self.db)
+                except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e:
+                    self.logger.error("will retry rethinkdb query/operation %s which failed like so:", query, exc_info=True)
+
 version_bytes = _read_version_bytes().strip()
 version_str = version_bytes.decode('utf-8')
 
diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py
index aecb4ed..3c695db 100644
--- a/warcprox/bigtable.py
+++ b/warcprox/bigtable.py
@@ -16,52 +16,39 @@ class RethinkCaptures:
     logger = logging.getLogger("warcprox.dedup.RethinkCaptures")
 
     def __init__(self, servers=["localhost"], db="warcprox", table="captures", shards=3, replicas=3, options=warcprox.Options()):
-        self.servers = servers
-        self.db = db
+        self.r = warcprox.Rethinker(servers, db)
         self.table = table
         self.shards = shards
         self.replicas = replicas
         self.options = options
         self._ensure_db_table()
 
-    # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
-    # "Best practices: Managing connections: a connection per request"
-    def _random_server_connection(self):
-        server = random.choice(self.servers)
-        try:
-            host, port = server.split(":")
-            return r.connect(host=host, port=port)
-        except ValueError:
-            return r.connect(host=server)
-
     def _ensure_db_table(self):
-        with self._random_server_connection() as conn:
-            dbs = r.db_list().run(conn)
-            if not self.db in dbs:
-                self.logger.info("creating rethinkdb database %s", repr(self.db))
-                r.db_create(self.db).run(conn)
-            tables = r.db(self.db).table_list().run(conn)
-            if not self.table in tables:
-                self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db))
-                r.db(self.db).table_create(self.table, shards=self.shards, replicas=self.replicas).run(conn)
-                r.db(self.db).table(self.table).index_create("abbr_canon_surt_timesamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]).run(conn)
-                r.db(self.db).table(self.table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"], r.row["bucket"]]).run(conn)
+        dbs = self.r.run(r.db_list())
+        if not self.r.db in dbs:
+            self.logger.info("creating rethinkdb database %s", repr(self.r.db))
+            self.r.run(r.db_create(self.r.db))
+        tables = self.r.run(r.table_list())
+        if not self.table in tables:
+            self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.r.db))
+            self.r.run(r.table_create(self.table, shards=self.shards, replicas=self.replicas))
+            self.r.run(r.table(self.table).index_create("abbr_canon_surt_timesamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]))
+            self.r.run(r.table(self.table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"], r.row["bucket"]]))
 
     def find_response_by_digest(self, algo, raw_digest, bucket="__unspecified__"):
         if algo != "sha1":
             raise Exception("digest type is {} but big capture table is indexed by sha1".format(algo))
         sha1base32 = base64.b32encode(raw_digest).decode("utf-8")
-        with self._random_server_connection() as conn:
-            cursor = r.db(self.db).table(self.table).get_all([sha1base32, "response", bucket], index="sha1_warc_type").run(conn)
-            results = list(cursor)
-            if len(results) > 1:
-                raise Exception("expected 0 or 1 but found %s results for sha1base32=%s", len(results), sha1base32)
-            elif len(results) == 1:
-                result = results[0]
-            else:
-                result = None
-            self.logger.info("returning %s for sha1base32=%s", result, sha1base32)
-            return result
+        cursor = self.r.run(r.table(self.table).get_all([sha1base32, "response", bucket], index="sha1_warc_type"))
+        results = list(cursor)
+        if len(results) > 1:
+            raise Exception("expected 0 or 1 but found %s results for sha1base32=%s", len(results), sha1base32)
+        elif len(results) == 1:
+            result = results[0]
+        else:
+            result = None
+        self.logger.info("returning %s for sha1base32=%s", result, sha1base32)
+        return result
 
     def notify(self, recorded_url, records):
         if not recorded_url.response_recorder:
@@ -94,11 +81,10 @@ class RethinkCaptures:
             "bucket": bucket,
         }
 
-        with self._random_server_connection() as conn:
-            result = r.db(self.db).table(self.table).insert(entry).run(conn)
-            if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]:
-                raise Exception("unexpected result %s saving %s", result, entry)
-            self.logger.info("big capture table db saved %s", entry)
+        result = self.r.run(r.table(self.table).insert(entry))
+        if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]:
+            raise Exception("unexpected result %s saving %s", result, entry)
+        self.logger.info("big capture table db saved %s", entry)
 
 class RethinkCapturesDedup:
     logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")
diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 7148773..4eea112 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -88,34 +88,22 @@ class RethinkDedupDb:
     logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
 
     def __init__(self, servers=["localhost"], db="warcprox", table="dedup", shards=3, replicas=3, options=warcprox.Options()):
-        self.servers = servers
-        self.db = db
+        self.r = warcprox.Rethinker(servers, db)
         self.table = table
         self.shards = shards
         self.replicas = replicas
         self._ensure_db_table()
         self.options = options
 
-    # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
-    # "Best practices: Managing connections: a connection per request"
-    def _random_server_connection(self):
-        server = random.choice(self.servers)
-        try:
-            host, port = server.split(":")
-            return r.connect(host=host, port=port)
-        except ValueError:
-            return r.connect(host=server)
-
     def _ensure_db_table(self):
-        with self._random_server_connection() as conn:
-            dbs = r.db_list().run(conn)
-            if not self.db in dbs:
-                self.logger.info("creating rethinkdb database %s", repr(self.db))
-                r.db_create(self.db).run(conn)
-            tables = r.db(self.db).table_list().run(conn)
-            if not self.table in tables:
-                self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db))
-                r.db(self.db).table_create(self.table, primary_key="key", shards=self.shards, replicas=self.replicas).run(conn)
+        dbs = self.r.run(r.db_list())
+        if not self.r.db in dbs:
+            self.logger.info("creating rethinkdb database %s", repr(self.r.db))
+            self.r.run(r.db_create(self.r.db))
+        tables = self.r.run(r.table_list())
+        if not self.table in tables:
+            self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.r.db))
+            self.r.run(r.table_create(self.table, primary_key="key", shards=self.shards, replicas=self.replicas))
 
     def close(self):
         pass
@@ -130,22 +118,20 @@ class RethinkDedupDb:
         url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
         date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
         record = {'key':k,'url':url,'date':date,'id':record_id}
-        with self._random_server_connection() as conn:
-            result = r.db(self.db).table(self.table).insert(record,conflict="replace").run(conn)
-            if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
-                raise Exception("unexpected result %s saving %s", result, record)
-            self.logger.debug('dedup db saved %s:%s', k, record)
+        result = self.r.run(r.table(self.table).insert(record,conflict="replace"))
+        if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
+            raise Exception("unexpected result %s saving %s", result, record)
+        self.logger.debug('dedup db saved %s:%s', k, record)
 
     def lookup(self, digest_key, bucket=""):
         k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
         k = "{}|{}".format(k, bucket)
-        with self._random_server_connection() as conn:
-            result = r.db(self.db).table(self.table).get(k).run(conn)
-            if result:
-                for x in result:
-                    result[x] = result[x].encode("utf-8")
-            self.logger.debug('dedup db lookup of key=%s returning %s', k, result)
-            return result
+        result = self.r.run(r.table(self.table).get(k))
+        if result:
+            for x in result:
+                result[x] = result[x].encode("utf-8")
+        self.logger.debug('dedup db lookup of key=%s returning %s', k, result)
+        return result
 
     def notify(self, recorded_url, records):
         if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
diff --git a/warcprox/stats.py b/warcprox/stats.py
index d246d69..6f6c04d 100644
--- a/warcprox/stats.py
+++ b/warcprox/stats.py
@@ -108,34 +108,22 @@ class RethinkStatsDb:
     logger = logging.getLogger("warcprox.stats.RethinkStatsDb")
 
    def __init__(self, servers=["localhost"], db="warcprox", table="stats", shards=3, replicas=3, options=warcprox.Options()):
-        self.servers = servers
-        self.db = db
+        self.r = warcprox.Rethinker(servers, db)
         self.table = table
         self.shards = shards
         self.replicas = replicas
         self._ensure_db_table()
         self.options = options
 
-    # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
-    # "Best practices: Managing connections: a connection per request"
-    def _random_server_connection(self):
-        server = random.choice(self.servers)
-        try:
-            host, port = server.split(":")
-            return r.connect(host=host, port=port)
-        except ValueError:
-            return r.connect(host=server)
-
     def _ensure_db_table(self):
-        with self._random_server_connection() as conn:
-            dbs = r.db_list().run(conn)
-            if not self.db in dbs:
-                self.logger.info("creating rethinkdb database %s", repr(self.db))
-                r.db_create(self.db).run(conn)
-            tables = r.db(self.db).table_list().run(conn)
-            if not self.table in tables:
-                self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db))
-                r.db(self.db).table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run(conn)
+        dbs = self.r.run(r.db_list())
+        if not self.r.db in dbs:
+            self.logger.info("creating rethinkdb database %s", repr(self.r.db))
+            self.r.run(r.db_create(self.r.db))
+        tables = self.r.run(r.table_list())
+        if not self.table in tables:
+            self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.r.db))
+            self.r.run(r.table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas))
 
     def close(self):
         pass
@@ -145,16 +133,15 @@ class RethinkStatsDb:
 
     def value(self, bucket0="__all__", bucket1=None, bucket2=None):
         # XXX use pluck?
-        with self._random_server_connection() as conn:
-            bucket0_stats = r.db(self.db).table(self.table).get(bucket0).run(conn)
-            self.logger.debug('stats db lookup of bucket=%s returned %s', bucket0, bucket0_stats)
-            if bucket0_stats:
-                if bucket1:
-                    if bucket2:
-                        return bucket0_stats[bucket1][bucket2]
-                    else:
-                        return bucket0_stats[bucket1]
-            return bucket0_stats
+        bucket0_stats = self.r.run(r.table(self.table).get(bucket0))
+        self.logger.debug('stats db lookup of bucket=%s returned %s', bucket0, bucket0_stats)
+        if bucket0_stats:
+            if bucket1:
+                if bucket2:
+                    return bucket0_stats[bucket1][bucket2]
+                else:
+                    return bucket0_stats[bucket1]
+        return bucket0_stats
 
     def tally(self, recorded_url, records):
         buckets = ["__all__"]
@@ -166,24 +153,23 @@ class RethinkStatsDb:
         else:
             buckets.append("__unspecified__")
 
-        with self._random_server_connection() as conn:
-            for bucket in buckets:
-                bucket_stats = self.value(bucket) or _empty_bucket(bucket)
+        for bucket in buckets:
+            bucket_stats = self.value(bucket) or _empty_bucket(bucket)
 
-                bucket_stats["total"]["urls"] += 1
-                bucket_stats["total"]["wire_bytes"] += recorded_url.size
+            bucket_stats["total"]["urls"] += 1
+            bucket_stats["total"]["wire_bytes"] += recorded_url.size
 
-                if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT:
-                    bucket_stats["revisit"]["urls"] += 1
-                    bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
-                else:
-                    bucket_stats["new"]["urls"] += 1
-                    bucket_stats["new"]["wire_bytes"] += recorded_url.size
+            if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT:
+                bucket_stats["revisit"]["urls"] += 1
+                bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
+            else:
+                bucket_stats["new"]["urls"] += 1
+                bucket_stats["new"]["wire_bytes"] += recorded_url.size
 
-                self.logger.debug("saving %s", bucket_stats)
-                result = r.db(self.db).table(self.table).insert(bucket_stats, conflict="replace").run(conn)
-                if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
-                    raise Exception("unexpected result %s saving %s", result, record)
+            self.logger.debug("saving %s", bucket_stats)
+            result = self.r.run(r.table(self.table).insert(bucket_stats, conflict="replace"))
+            if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
+                raise Exception("unexpected result %s saving %s", result, bucket_stats)
 
     def notify(self, recorded_url, records):
         self.tally(recorded_url, records)
diff --git a/warcprox/tests/test_warcprox.py b/warcprox/tests/test_warcprox.py
index e588754..0331c7b 100755
--- a/warcprox/tests/test_warcprox.py
+++ b/warcprox/tests/test_warcprox.py
@@ -148,9 +148,8 @@ def captures_db(request, rethinkdb_servers, rethinkdb_big_table):
     def fin():
         if captures_db:
             logging.info('dropping rethinkdb database {}'.format(db))
-            with captures_db._random_server_connection() as conn:
-                result = r.db_drop(db).run(conn)
-                logging.info("result=%s", result)
+            result = captures_db.r.run(r.db_drop(db))
+            logging.info("result=%s", result)
     request.addfinalizer(fin)
 
     return captures_db
@@ -170,9 +169,8 @@ def rethink_dedup_db(request, rethinkdb_servers, captures_db):
        if rethinkdb_servers:
            if not captures_db:
                logging.info('dropping rethinkdb database {}'.format(db))
-                with ddb._random_server_connection() as conn:
-                    result = r.db_drop(db).run(conn)
-                    logging.info("result=%s", result)
+                result = ddb.r.run(r.db_drop(db))
+                logging.info("result=%s", result)
     request.addfinalizer(fin)
 
     return ddb
@@ -210,9 +208,8 @@ def stats_db(request, rethinkdb_servers):
     def fin():
         if rethinkdb_servers:
             logging.info('dropping rethinkdb database {}'.format(db))
-            with sdb._random_server_connection() as conn:
-                result = r.db_drop(db).run(conn)
-                logging.info("result=%s", result)
+            result = sdb.r.run(r.db_drop(db))
+            logging.info("result=%s", result)
         else:
             logging.info('deleting file {}'.format(stats_db_file))
             os.unlink(stats_db_file)
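
A minimal usage sketch of the Rethinker wrapper introduced in warcprox/__init__.py, for reference; the cluster addresses, database, and table name below are made up for illustration, and the behavior shown is only what the wrapper above implements (connect to a randomly chosen server per query, retry on errors that look transient):

    import rethinkdb as r
    import warcprox

    # hypothetical three-node cluster; entries may be "host" or "host:port"
    rr = warcprox.Rethinker(
            servers=["db0.example.com", "db1.example.com:28015", "db2.example.com"],
            db="warcprox")

    # each run() call opens its own connection and passes db= as the default
    # database, so queries can name tables without an explicit r.db(...)
    rr.run(r.table_create("example", shards=3, replicas=3))
    rr.run(r.table("example").insert({"id": "foo", "n": 1}))
    doc = rr.run(r.table("example").get("foo"))  # None if the key is absent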