use nicer rethinkdbstuff.Rethinker api

This commit is contained in:
Noah Levitt 2015-09-22 01:31:24 +00:00
parent f90c3a6403
commit 3b9345e7d7
6 changed files with 48 additions and 55 deletions

View File

@ -1,13 +1,11 @@
certauth>=1.1.0
rethinkdb
git+https://github.com/internetarchive/warctools.git
kafka-python
.
# -e .
git+https://github.com/nlevitt/surt.git@py3
# -e /home/nlevitt/workspace/surt
git+https://github.com/nlevitt/rethinkstuff.git
.
https://github.com/nlevitt/pyrethink.git
# -e /home/nlevitt/workspace/pyrethink
# -e /home/nlevitt/workspace/surt
# -e /home/nlevitt/workspace/rethinkstuff
# -e .

View File

@ -4,21 +4,18 @@ from __future__ import absolute_import
import logging
from hanzo import warctools
import rethinkdb
r = rethinkdb
import random
import warcprox
import base64
import surt
import os
import hashlib
import pyrethink
class RethinkCaptures:
logger = logging.getLogger("warcprox.bigtables.RethinkCaptures")
def __init__(self, servers=["localhost"], db="warcprox", table="captures", shards=3, replicas=3, options=warcprox.Options()):
self.r = pyrethink.Rethinker(servers, db)
def __init__(self, r, table="captures", shards=3, replicas=3, options=warcprox.Options()):
self.r = r
self.table = table
self.shards = shards
self.replicas = replicas
@ -26,22 +23,22 @@ class RethinkCaptures:
self._ensure_db_table()
def _ensure_db_table(self):
dbs = self.r.run(r.db_list())
dbs = self.r.db_list().run()
if not self.r.db in dbs:
self.logger.info("creating rethinkdb database %s", repr(self.r.db))
self.r.run(r.db_create(self.r.db))
tables = self.r.run(r.table_list())
self.r.db_create(self.r.db).run()
tables = self.r.table_list().run()
if not self.table in tables:
self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.r.db))
self.r.run(r.table_create(self.table, shards=self.shards, replicas=self.replicas))
self.r.run(r.table(self.table).index_create("abbr_canon_surt_timesamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]))
self.r.run(r.table(self.table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"], r.row["bucket"]]))
self.r.table_create(self.table, shards=self.shards, replicas=self.replicas).run()
self.r.table(self.table).index_create("abbr_canon_surt_timesamp", [self.r.row["abbr_canon_surt"], self.r.row["timestamp"]]).run()
self.r.table(self.table).index_create("sha1_warc_type", [self.r.row["sha1base32"], self.r.row["warc_type"], self.r.row["bucket"]]).run()
def find_response_by_digest(self, algo, raw_digest, bucket="__unspecified__"):
if algo != "sha1":
raise Exception("digest type is {} but big capture table is indexed by sha1".format(algo))
sha1base32 = base64.b32encode(raw_digest).decode("utf-8")
results_iter = self.r.results_iter(r.table(self.table).get_all([sha1base32, "response", bucket], index="sha1_warc_type"))
results_iter = self.r.table(self.table).get_all([sha1base32, "response", bucket], index="sha1_warc_type").run()
results = list(results_iter)
if len(results) > 1:
raise Exception("expected 0 or 1 but found %s results for sha1base32=%s", len(results), sha1base32)
@ -90,7 +87,7 @@ class RethinkCaptures:
"length": records[0].length,
}
result = self.r.run(r.table(self.table).insert(entry))
result = self.r.table(self.table).insert(entry).run()
if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]:
raise Exception("unexpected result %s saving %s", result, entry)
self.logger.debug("big capture table db saved %s", entry)

View File

@ -1,5 +1,3 @@
# vim:set sw=4 et:
from __future__ import absolute_import
try:
@ -15,10 +13,7 @@ import os
import json
from hanzo import warctools
import warcprox
import rethinkdb
r = rethinkdb
import random
import pyrethink
class DedupDb(object):
logger = logging.getLogger("warcprox.dedup.DedupDb")
@ -88,8 +83,8 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
class RethinkDedupDb:
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
def __init__(self, servers=["localhost"], db="warcprox", table="dedup", shards=3, replicas=3, options=warcprox.Options()):
self.r = pyrethink.Rethinker(servers, db)
def __init__(self, r, table="dedup", shards=3, replicas=3, options=warcprox.Options()):
self.r = r
self.table = table
self.shards = shards
self.replicas = replicas
@ -97,14 +92,14 @@ class RethinkDedupDb:
self.options = options
def _ensure_db_table(self):
dbs = self.r.run(r.db_list())
dbs = self.r.db_list().run()
if not self.r.db in dbs:
self.logger.info("creating rethinkdb database %s", repr(self.r.db))
self.r.run(r.db_create(self.r.db))
tables = self.r.run(r.table_list())
self.r.db_create(self.r.db).run()
tables = self.r.table_list().run()
if not self.table in tables:
self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.r.db))
self.r.run(r.table_create(self.table, primary_key="key", shards=self.shards, replicas=self.replicas))
self.r.table_create(self.table, primary_key="key", shards=self.shards, replicas=self.replicas).run()
def close(self):
pass
@ -119,7 +114,7 @@ class RethinkDedupDb:
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
record = {'key':k,'url':url,'date':date,'id':record_id}
result = self.r.run(r.table(self.table).insert(record,conflict="replace"))
result = self.r.table(self.table).insert(record,conflict="replace").run()
if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
raise Exception("unexpected result %s saving %s", result, record)
self.logger.debug('dedup db saved %s:%s', k, record)
@ -127,7 +122,7 @@ class RethinkDedupDb:
def lookup(self, digest_key, bucket=""):
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
k = "{}|{}".format(k, bucket)
result = self.r.run(r.table(self.table).get(k))
result = self.r.table(self.table).get(k).run()
if result:
for x in result:
result[x] = result[x].encode("utf-8")

View File

@ -21,6 +21,7 @@ import threading
import certauth.certauth
import warcprox
import re
import rethinkstuff
def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
arg_parser = argparse.ArgumentParser(prog=prog,
@ -120,12 +121,13 @@ def main(argv=sys.argv):
listeners = []
if args.rethinkdb_servers:
r = rethinkstuff.Rethinker(args.rethinkdb_servers.split(","), args.rethinkdb_db)
if args.rethinkdb_big_table:
captures_db = warcprox.bigtable.RethinkCaptures(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options)
captures_db = warcprox.bigtable.RethinkCaptures(r, options=options)
dedup_db = warcprox.bigtable.RethinkCapturesDedup(captures_db, options=options)
listeners.append(captures_db)
else:
dedup_db = warcprox.dedup.RethinkDedupDb(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options)
dedup_db = warcprox.dedup.RethinkDedupDb(r, options=options)
listeners.append(dedup_db)
elif args.dedup_db_file in (None, '', '/dev/null'):
logging.info('deduplication disabled')
@ -135,7 +137,7 @@ def main(argv=sys.argv):
listeners.append(dedup_db)
if args.rethinkdb_servers:
stats_db = warcprox.stats.RethinkStatsDb(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options)
stats_db = warcprox.stats.RethinkStatsDb(r, options=options)
listeners.append(stats_db)
elif args.stats_db_file in (None, '', '/dev/null'):
logging.info('statistics tracking disabled')
@ -183,5 +185,7 @@ def main(argv=sys.argv):
if __name__ == '__main__':
import gc
gc.set_debug(gc.DEBUG_LEAK)
main()

View File

@ -12,11 +12,8 @@ import logging
import os
import json
from hanzo import warctools
import rethinkdb
r = rethinkdb
import random
import warcprox
import pyrethink
def _empty_bucket(bucket):
return {
@ -106,8 +103,8 @@ class StatsDb:
class RethinkStatsDb:
logger = logging.getLogger("warcprox.stats.RethinkStatsDb")
def __init__(self, servers=["localhost"], db="warcprox", table="stats", shards=3, replicas=3, options=warcprox.Options()):
self.r = pyrethink.Rethinker(servers, db)
def __init__(self, r, table="stats", shards=3, replicas=3, options=warcprox.Options()):
self.r = r
self.table = table
self.shards = shards
self.replicas = replicas
@ -115,14 +112,14 @@ class RethinkStatsDb:
self.options = options
def _ensure_db_table(self):
dbs = self.r.run(r.db_list())
dbs = self.r.db_list().run()
if not self.r.db in dbs:
self.logger.info("creating rethinkdb database %s", repr(self.r.db))
self.r.run(r.db_create(self.r.db))
tables = self.r.run(r.table_list())
self.r.db_create(self.r.db).run()
tables = self.r.table_list().run()
if not self.table in tables:
self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.r.db))
self.r.run(r.table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas))
self.r.table_create(self.table, primary_key="bucket", shards=self.shards, replicas=self.replicas).run()
def close(self):
pass
@ -132,7 +129,7 @@ class RethinkStatsDb:
def value(self, bucket0="__all__", bucket1=None, bucket2=None):
# XXX use pluck?
bucket0_stats = self.r.run(r.table(self.table).get(bucket0))
bucket0_stats = self.r.table(self.table).get(bucket0).run()
self.logger.debug('stats db lookup of bucket=%s returned %s', bucket0, bucket0_stats)
if bucket0_stats:
if bucket1:
@ -166,7 +163,7 @@ class RethinkStatsDb:
bucket_stats["new"]["wire_bytes"] += recorded_url.size
self.logger.debug("saving %s", bucket_stats)
result = self.r.run(r.table(self.table).insert(bucket_stats, conflict="replace"))
result = self.r.table(self.table).insert(bucket_stats, conflict="replace").run()
if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
raise Exception("unexpected result %s saving %s", result, record)

View File

@ -15,9 +15,8 @@ import shutil
import requests
import re
import json
import rethinkdb
r = rethinkdb
import random
import rethinkstuff
from hanzo import warctools
try:
@ -143,12 +142,13 @@ def captures_db(request, rethinkdb_servers, rethinkdb_big_table):
servers = rethinkdb_servers.split(",")
if rethinkdb_big_table:
db = 'warcprox_test_captures_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
captures_db = warcprox.bigtable.RethinkCaptures(servers, db)
r = rethinkstuff.Rethinker(servers, db)
captures_db = warcprox.bigtable.RethinkCaptures(r)
def fin():
if captures_db:
logging.info('dropping rethinkdb database {}'.format(db))
result = captures_db.r.run(r.db_drop(db))
result = captures_db.r.db_drop(db).run()
logging.info("result=%s", result)
request.addfinalizer(fin)
@ -163,13 +163,14 @@ def rethink_dedup_db(request, rethinkdb_servers, captures_db):
else:
servers = rethinkdb_servers.split(",")
db = 'warcprox_test_dedup_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
ddb = warcprox.dedup.RethinkDedupDb(servers, db)
r = rethinkstuff.Rethinker(servers, db)
ddb = warcprox.dedup.RethinkDedupDb(r)
def fin():
if rethinkdb_servers:
if not captures_db:
logging.info('dropping rethinkdb database {}'.format(db))
result = ddb.r.run(r.db_drop(db))
result = ddb.r.db_drop(db).run()
logging.info("result=%s", result)
request.addfinalizer(fin)
@ -198,7 +199,8 @@ def stats_db(request, rethinkdb_servers):
if rethinkdb_servers:
servers = rethinkdb_servers.split(",")
db = 'warcprox_test_stats_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8))
sdb = warcprox.stats.RethinkStatsDb(servers, db)
r = rethinkstuff.Rethinker(servers, db)
sdb = warcprox.stats.RethinkStatsDb(r)
else:
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-stats-', suffix='.db', delete=False)
f.close()
@ -208,7 +210,7 @@ def stats_db(request, rethinkdb_servers):
def fin():
if rethinkdb_servers:
logging.info('dropping rethinkdb database {}'.format(db))
result = sdb.r.run(r.db_drop(db))
result = sdb.r.db_drop(db).run()
logging.info("result=%s", result)
else:
logging.info('deleting file {}'.format(stats_db_file))