diff --git a/requirements.txt b/requirements.txt index 810de6a..b00387c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,13 +1,11 @@ certauth>=1.1.0 -rethinkdb git+https://github.com/internetarchive/warctools.git kafka-python -. -# -e . - git+https://github.com/nlevitt/surt.git@py3 -# -e /home/nlevitt/workspace/surt +git+https://github.com/nlevitt/rethinkstuff.git +. -https://github.com/nlevitt/pyrethink.git -# -e /home/nlevitt/workspace/pyrethink +# -e /home/nlevitt/workspace/surt +# -e /home/nlevitt/workspace/rethinkstuff +# -e . diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 551fdca..a1620e2 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -4,21 +4,18 @@ from __future__ import absolute_import import logging from hanzo import warctools -import rethinkdb -r = rethinkdb import random import warcprox import base64 import surt import os import hashlib -import pyrethink class RethinkCaptures: logger = logging.getLogger("warcprox.bigtables.RethinkCaptures") - def __init__(self, servers=["localhost"], db="warcprox", table="captures", shards=3, replicas=3, options=warcprox.Options()): - self.r = pyrethink.Rethinker(servers, db) + def __init__(self, r, table="captures", shards=3, replicas=3, options=warcprox.Options()): + self.r = r self.table = table self.shards = shards self.replicas = replicas @@ -26,22 +23,22 @@ class RethinkCaptures: self._ensure_db_table() def _ensure_db_table(self): - dbs = self.r.run(r.db_list()) + dbs = self.r.db_list().run() if not self.r.db in dbs: self.logger.info("creating rethinkdb database %s", repr(self.r.db)) - self.r.run(r.db_create(self.r.db)) - tables = self.r.run(r.table_list()) + self.r.db_create(self.r.db).run() + tables = self.r.table_list().run() if not self.table in tables: self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.r.db)) - self.r.run(r.table_create(self.table, shards=self.shards, replicas=self.replicas)) - self.r.run(r.table(self.table).index_create("abbr_canon_surt_timesamp", [r.row["abbr_canon_surt"], r.row["timestamp"]])) - self.r.run(r.table(self.table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"], r.row["bucket"]])) + self.r.table_create(self.table, shards=self.shards, replicas=self.replicas).run() + self.r.table(self.table).index_create("abbr_canon_surt_timesamp", [self.r.row["abbr_canon_surt"], self.r.row["timestamp"]]).run() + self.r.table(self.table).index_create("sha1_warc_type", [self.r.row["sha1base32"], self.r.row["warc_type"], self.r.row["bucket"]]).run() def find_response_by_digest(self, algo, raw_digest, bucket="__unspecified__"): if algo != "sha1": raise Exception("digest type is {} but big capture table is indexed by sha1".format(algo)) sha1base32 = base64.b32encode(raw_digest).decode("utf-8") - results_iter = self.r.results_iter(r.table(self.table).get_all([sha1base32, "response", bucket], index="sha1_warc_type")) + results_iter = self.r.table(self.table).get_all([sha1base32, "response", bucket], index="sha1_warc_type").run() results = list(results_iter) if len(results) > 1: raise Exception("expected 0 or 1 but found %s results for sha1base32=%s", len(results), sha1base32) @@ -90,7 +87,7 @@ class RethinkCaptures: "length": records[0].length, } - result = self.r.run(r.table(self.table).insert(entry)) + result = self.r.table(self.table).insert(entry).run() if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]: raise Exception("unexpected result %s saving %s", result, entry) self.logger.debug("big capture table db saved %s", entry) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 368054a..33e93af 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -1,5 +1,3 @@ -# vim:set sw=4 et: - from __future__ import absolute_import try: @@ -15,10 +13,7 @@ import os import json from hanzo import warctools import warcprox -import rethinkdb -r = rethinkdb import random -import pyrethink class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -88,8 +83,8 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): class RethinkDedupDb: logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") - def __init__(self, servers=["localhost"], db="warcprox", table="dedup", shards=3, replicas=3, options=warcprox.Options()): - self.r = pyrethink.Rethinker(servers, db) + def __init__(self, r, table="dedup", shards=3, replicas=3, options=warcprox.Options()): + self.r = r self.table = table self.shards = shards self.replicas = replicas @@ -97,14 +92,14 @@ class RethinkDedupDb: self.options = options def _ensure_db_table(self): - dbs = self.r.run(r.db_list()) + dbs = self.r.db_list().run() if not self.r.db in dbs: self.logger.info("creating rethinkdb database %s", repr(self.r.db)) - self.r.run(r.db_create(self.r.db)) - tables = self.r.run(r.table_list()) + self.r.db_create(self.r.db).run() + tables = self.r.table_list().run() if not self.table in tables: self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.r.db)) - self.r.run(r.table_create(self.table, primary_key="key", shards=self.shards, replicas=self.replicas)) + self.r.table_create(self.table, primary_key="key", shards=self.shards, replicas=self.replicas).run() def close(self): pass @@ -119,7 +114,7 @@ class RethinkDedupDb: url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') record = {'key':k,'url':url,'date':date,'id':record_id} - result = self.r.run(r.table(self.table).insert(record,conflict="replace")) + result = self.r.table(self.table).insert(record,conflict="replace").run() if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: raise Exception("unexpected result %s saving %s", result, record) self.logger.debug('dedup db saved %s:%s', k, record) @@ -127,7 +122,7 @@ class RethinkDedupDb: def lookup(self, digest_key, bucket=""): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key k = "{}|{}".format(k, bucket) - result = self.r.run(r.table(self.table).get(k)) + result = self.r.table(self.table).get(k).run() if result: for x in result: result[x] = result[x].encode("utf-8") diff --git a/warcprox/main.py b/warcprox/main.py index cd0cbbe..e647a7e 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -21,6 +21,7 @@ import threading import certauth.certauth import warcprox import re +import rethinkstuff def _build_arg_parser(prog=os.path.basename(sys.argv[0])): arg_parser = argparse.ArgumentParser(prog=prog, @@ -120,12 +121,13 @@ def main(argv=sys.argv): listeners = [] if args.rethinkdb_servers: + r = rethinkstuff.Rethinker(args.rethinkdb_servers.split(","), args.rethinkdb_db) if args.rethinkdb_big_table: - captures_db = warcprox.bigtable.RethinkCaptures(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options) + captures_db = warcprox.bigtable.RethinkCaptures(r, options=options) dedup_db = warcprox.bigtable.RethinkCapturesDedup(captures_db, options=options) listeners.append(captures_db) else: - dedup_db = warcprox.dedup.RethinkDedupDb(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options) + dedup_db = warcprox.dedup.RethinkDedupDb(r, options=options) listeners.append(dedup_db) elif args.dedup_db_file in (None, '', '/dev/null'): logging.info('deduplication disabled') @@ -135,7 +137,7 @@ def main(argv=sys.argv): listeners.append(dedup_db) if args.rethinkdb_servers: - stats_db = warcprox.stats.RethinkStatsDb(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options) + stats_db = warcprox.stats.RethinkStatsDb(r, options=options) listeners.append(stats_db) elif args.stats_db_file in (None, '', '/dev/null'): logging.info('statistics tracking disabled') @@ -183,5 +185,7 @@ def main(argv=sys.argv): if __name__ == '__main__': + import gc + gc.set_debug(gc.DEBUG_LEAK) main() diff --git a/warcprox/stats.py b/warcprox/stats.py index 3c16833..61c573d 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -12,11 +12,8 @@ import logging import os import json from hanzo import warctools -import rethinkdb -r = rethinkdb import random import warcprox -import pyrethink def _empty_bucket(bucket): return { @@ -106,8 +103,8 @@ class StatsDb: class RethinkStatsDb: logger = logging.getLogger("warcprox.stats.RethinkStatsDb") - def __init__(self, servers=["localhost"], db="warcprox", table="stats", shards=3, replicas=3, options=warcprox.Options()): - self.r = pyrethink.Rethinker(servers, db) + def __init__(self, r, table="stats", shards=3, replicas=3, options=warcprox.Options()): + self.r = r self.table = table self.shards = shards self.replicas = replicas @@ -115,14 +112,14 @@ class RethinkStatsDb: self.options = options def _ensure_db_table(self): - dbs = self.r.run(r.db_list()) + dbs = self.r.db_list().run() if not self.r.db in dbs: self.logger.info("creating rethinkdb database %s", repr(self.r.db)) - self.r.run(r.db_create(self.r.db)) - tables = self.r.run(r.table_list()) + self.r.db_create(self.r.db).run() + tables = self.r.table_list().run() if not self.table in tables: self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.r.db)) - self.r.run(r.table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas)) + self.r.table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run() def close(self): pass @@ -132,7 +129,7 @@ class RethinkStatsDb: def value(self, bucket0="__all__", bucket1=None, bucket2=None): # XXX use pluck? - bucket0_stats = self.r.run(r.table(self.table).get(bucket0)) + bucket0_stats = self.r.table(self.table).get(bucket0).run() self.logger.debug('stats db lookup of bucket=%s returned %s', bucket0, bucket0_stats) if bucket0_stats: if bucket1: @@ -166,7 +163,7 @@ class RethinkStatsDb: bucket_stats["new"]["wire_bytes"] += recorded_url.size self.logger.debug("saving %s", bucket_stats) - result = self.r.run(r.table(self.table).insert(bucket_stats, conflict="replace")) + result = self.r.table(self.table).insert(bucket_stats, conflict="replace").run() if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: raise Exception("unexpected result %s saving %s", result, record) diff --git a/warcprox/tests/test_warcprox.py b/warcprox/tests/test_warcprox.py index a994810..badf9ed 100755 --- a/warcprox/tests/test_warcprox.py +++ b/warcprox/tests/test_warcprox.py @@ -15,9 +15,8 @@ import shutil import requests import re import json -import rethinkdb -r = rethinkdb import random +import rethinkstuff from hanzo import warctools try: @@ -143,12 +142,13 @@ def captures_db(request, rethinkdb_servers, rethinkdb_big_table): servers = rethinkdb_servers.split(",") if rethinkdb_big_table: db = 'warcprox_test_captures_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - captures_db = warcprox.bigtable.RethinkCaptures(servers, db) + r = rethinkstuff.Rethinker(servers, db) + captures_db = warcprox.bigtable.RethinkCaptures(r) def fin(): if captures_db: logging.info('dropping rethinkdb database {}'.format(db)) - result = captures_db.r.run(r.db_drop(db)) + result = captures_db.r.db_drop(db).run() logging.info("result=%s", result) request.addfinalizer(fin) @@ -163,13 +163,14 @@ def rethink_dedup_db(request, rethinkdb_servers, captures_db): else: servers = rethinkdb_servers.split(",") db = 'warcprox_test_dedup_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - ddb = warcprox.dedup.RethinkDedupDb(servers, db) + r = rethinkstuff.Rethinker(servers, db) + ddb = warcprox.dedup.RethinkDedupDb(r) def fin(): if rethinkdb_servers: if not captures_db: logging.info('dropping rethinkdb database {}'.format(db)) - result = ddb.r.run(r.db_drop(db)) + result = ddb.r.db_drop(db).run() logging.info("result=%s", result) request.addfinalizer(fin) @@ -198,7 +199,8 @@ def stats_db(request, rethinkdb_servers): if rethinkdb_servers: servers = rethinkdb_servers.split(",") db = 'warcprox_test_stats_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - sdb = warcprox.stats.RethinkStatsDb(servers, db) + r = rethinkstuff.Rethinker(servers, db) + sdb = warcprox.stats.RethinkStatsDb(r) else: f = tempfile.NamedTemporaryFile(prefix='warcprox-test-stats-', suffix='.db', delete=False) f.close() @@ -208,7 +210,7 @@ def stats_db(request, rethinkdb_servers): def fin(): if rethinkdb_servers: logging.info('dropping rethinkdb database {}'.format(db)) - result = sdb.r.run(r.db_drop(db)) + result = sdb.r.db_drop(db).run() logging.info("result=%s", result) else: logging.info('deleting file {}'.format(stats_db_file))