diff --git a/requirements.txt b/requirements.txt index a320b31..810de6a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,13 @@ certauth>=1.1.0 rethinkdb git+https://github.com/internetarchive/warctools.git -git+https://github.com/nlevitt/surt.git@py3 kafka-python + +. +# -e . + +git+https://github.com/nlevitt/surt.git@py3 +# -e /home/nlevitt/workspace/surt + +https://github.com/nlevitt/pyrethink.git +# -e /home/nlevitt/workspace/pyrethink diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 875bea2..e437074 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -17,44 +17,10 @@ class Options(_Namespace): except AttributeError: return None -class Rethinker: - import logging - logger = logging.getLogger("warcprox.Rethinker") - - def __init__(self, servers=["localhost"], db=None): - self.servers = servers - self.db = db - - # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py - # "Best practices: Managing connections: a connection per request" - def _random_server_connection(self): - import rethinkdb as r - import random - while True: - server = random.choice(self.servers) - try: - try: - host, port = server.split(":") - return r.connect(host=host, port=port) - except ValueError: - return r.connect(host=server) - except Exception as e: - self.logger.error("will keep trying to get a connection after failure connecting to %s", server, exc_info=True) - import time - time.sleep(0.5) - - def run(self, query): - import rethinkdb as r - while True: - with self._random_server_connection() as conn: - try: - return query.run(conn, db=self.db) - except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e: - self.logger.error("will retry rethinkdb query/operation %s which failed like so:", query, exc_info=True) - version_bytes = _read_version_bytes().strip() version_str = version_bytes.decode('utf-8') +# XXX linux-specific def gettid(): try: import ctypes @@ -63,7 +29,7 @@ def gettid(): tid = libc.syscall(SYS_gettid) return tid except: - logging.warn("gettid failed?") + logging.warn("gettid failed?", exc_info=True) import warcprox.controller as controller import warcprox.playback as playback diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 917edab..551fdca 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -12,12 +12,13 @@ import base64 import surt import os import hashlib +import pyrethink class RethinkCaptures: logger = logging.getLogger("warcprox.bigtables.RethinkCaptures") def __init__(self, servers=["localhost"], db="warcprox", table="captures", shards=3, replicas=3, options=warcprox.Options()): - self.r = warcprox.Rethinker(servers, db) + self.r = pyrethink.Rethinker(servers, db) self.table = table self.shards = shards self.replicas = replicas @@ -40,8 +41,8 @@ class RethinkCaptures: if algo != "sha1": raise Exception("digest type is {} but big capture table is indexed by sha1".format(algo)) sha1base32 = base64.b32encode(raw_digest).decode("utf-8") - cursor = self.r.run(r.table(self.table).get_all([sha1base32, "response", bucket], index="sha1_warc_type")) - results = list(cursor) + results_iter = self.r.results_iter(r.table(self.table).get_all([sha1base32, "response", bucket], index="sha1_warc_type")) + results = list(results_iter) if len(results) > 1: raise Exception("expected 0 or 1 but found %s results for sha1base32=%s", len(results), sha1base32) elif len(results) == 1: diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 4eea112..368054a 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -18,6 +18,7 @@ import warcprox import rethinkdb r = rethinkdb import random +import pyrethink class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -88,7 +89,7 @@ class RethinkDedupDb: logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") def __init__(self, servers=["localhost"], db="warcprox", table="dedup", shards=3, replicas=3, options=warcprox.Options()): - self.r = warcprox.Rethinker(servers, db) + self.r = pyrethink.Rethinker(servers, db) self.table = table self.shards = shards self.replicas = replicas diff --git a/warcprox/stats.py b/warcprox/stats.py index 852975f..3c16833 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -16,6 +16,7 @@ import rethinkdb r = rethinkdb import random import warcprox +import pyrethink def _empty_bucket(bucket): return { @@ -106,7 +107,7 @@ class RethinkStatsDb: logger = logging.getLogger("warcprox.stats.RethinkStatsDb") def __init__(self, servers=["localhost"], db="warcprox", table="stats", shards=3, replicas=3, options=warcprox.Options()): - self.r = warcprox.Rethinker(servers, db) + self.r = pyrethink.Rethinker(servers, db) self.table = table self.shards = shards self.replicas = replicas