new convention, call Rethinker object rr (mnemonic RethinkeR, or *double*[think]), leave rethinkdb as r per the rethinkdb convention

This commit is contained in:
Noah Levitt 2017-03-01 11:20:27 -08:00
parent 536bf8d1d8
commit c14bae6050
6 changed files with 100 additions and 100 deletions

View File

@ -25,9 +25,9 @@ Usage Example
:: ::
import doublethink import doublethink
r = doublethink.Rethinker(['db0.foo.com', 'db0.foo.com:38015', 'db1.foo.com'], 'my_db') rr = doublethink.Rethinker(['db0.foo.com', 'db0.foo.com:38015', 'db1.foo.com'], 'my_db')
r.table('mytable').insert({'foo':'bar','baz':2}).run() rr.table('mytable').insert({'foo':'bar','baz':2}).run()
for result in r.table('mytable'): for result in rr.table('mytable'):
print("result={}".format(result)) print("result={}".format(result))
ORM ORM
@ -42,16 +42,16 @@ Usage Example
import doublethink import doublethink
r = doublethink.Rethinker(['db0.foo.com', 'db0.foo.com:38015', 'db1.foo.com'], 'my_db') rr = doublethink.Rethinker(['db0.foo.com', 'db0.foo.com:38015', 'db1.foo.com'], 'my_db')
class MyTable(doublethink.Document): class MyTable(doublethink.Document):
pass pass
MyTable.table_create() MyTable.table_create(rr)
doc1 = MyTable(r, {'animal': 'elephant', 'size': 'large'}) doc1 = MyTable(rr, {'animal': 'elephant', 'size': 'large'})
doc1.save() doc1.save()
doc1_copy = MyTable.get(r, doc1.id) doc1_copy = MyTable.get(rr, doc1.id)
doc1_copy.food = 'bread' doc1_copy.food = 'bread'
doc1_copy.save() doc1_copy.save()

View File

@ -158,25 +158,25 @@ class Document(dict, object):
return cls.__name__.lower() return cls.__name__.lower()
@classmethod @classmethod
def load(cls, rethinker, pk): def load(cls, rr, pk):
''' '''
Retrieves a document from the database, by primary key. Retrieves a document from the database, by primary key.
''' '''
doc = cls(rethinker) doc = cls(rr)
doc[doc.pk_field] = pk doc[doc.pk_field] = pk
doc.refresh() doc.refresh()
return doc return doc
@classmethod @classmethod
def table_create(cls, rethinker): def table_create(cls, rr):
''' '''
Creates the table. Subclasses may want to override this method to do Creates the table. Subclasses may want to override this method to do
more things, such as creating secondary indexes. more things, such as creating secondary indexes.
''' '''
rethinker.table_create(cls.table).run() rr.table_create(cls.table).run()
def __init__(self, rethinker, d={}): def __init__(self, rr, d={}):
self._r = rethinker dict.__setattr__(self, 'rr', rr)
self._pk = None self._pk = None
self._clear_updates() self._clear_updates()
for k in d: for k in d:
@ -235,14 +235,14 @@ class Document(dict, object):
''' '''
if not self._pk: if not self._pk:
try: try:
pk = self._r.db('rethinkdb').table('table_config').filter({ pk = self.rr.db('rethinkdb').table('table_config').filter({
'db': self._r.dbname, 'name': self.table}).get_field( 'db': self.rr.dbname, 'name': self.table}).get_field(
'primary_key')[0].run() 'primary_key')[0].run()
self._pk = pk self._pk = pk
except Exception as e: except Exception as e:
raise Exception( raise Exception(
'problem determining primary key for table %s.%s: %s', 'problem determining primary key for table %s.%s: %s',
self._r.dbname, self.table, e) self.rr.dbname, self.table, e)
return self._pk return self._pk
@property @property
@ -270,7 +270,7 @@ class Document(dict, object):
# r.literal() to replace, not merge with, nested fields # r.literal() to replace, not merge with, nested fields
updates = {field: r.literal(self._updates[field]) updates = {field: r.literal(self._updates[field])
for field in self._updates} for field in self._updates}
query = self._r.table(self.table).get( query = self.rr.table(self.table).get(
self.pk_value).update(updates) self.pk_value).update(updates)
result = query.run() result = query.run()
if result['skipped']: # primary key not found if result['skipped']: # primary key not found
@ -280,7 +280,7 @@ class Document(dict, object):
'unexpected result %s from rethinkdb query %s' % ( 'unexpected result %s from rethinkdb query %s' % (
result, query)) result, query))
if not should_insert and self._deletes: if not should_insert and self._deletes:
query = self._r.table(self.table).replace( query = self.rr.table(self.table).replace(
r.row.without(self._deletes)) r.row.without(self._deletes))
result = query.run() result = query.run()
if result['errors']: # primary key not found if result['errors']: # primary key not found
@ -293,7 +293,7 @@ class Document(dict, object):
should_insert = True should_insert = True
if should_insert: if should_insert:
query = self._r.table(self.table).insert(self) query = self.rr.table(self.table).insert(self)
result = query.run() result = query.run()
if result['inserted'] != 1: if result['inserted'] != 1:
raise Exception( raise Exception(
@ -309,7 +309,7 @@ class Document(dict, object):
''' '''
Refresh the document from the database. Refresh the document from the database.
''' '''
d = self._r.table(self.table).get(self.pk_value).run() d = self.rr.table(self.table).get(self.pk_value).run()
if d is None: if d is None:
raise KeyError raise KeyError
for k in d: for k in d:

View File

@ -24,16 +24,16 @@ import types
class RethinkerWrapper(object): class RethinkerWrapper(object):
logger = logging.getLogger('doublethink.RethinkerWrapper') logger = logging.getLogger('doublethink.RethinkerWrapper')
def __init__(self, rethinker, wrapped): def __init__(self, rr, wrapped):
self.rethinker = rethinker self.rr = rr
self.wrapped = wrapped self.wrapped = wrapped
def __getattr__(self, name): def __getattr__(self, name):
delegate = getattr(self.wrapped, name) delegate = getattr(self.wrapped, name)
return self.rethinker.wrap(delegate) return self.rr.wrap(delegate)
def __getitem__(self, key): def __getitem__(self, key):
return self.rethinker.wrap(self.wrapped.__getitem__)(key) return self.rr.wrap(self.wrapped.__getitem__)(key)
def __repr__(self): def __repr__(self):
return '<RethinkerWrapper{}>'.format(repr(self.wrapped)) return '<RethinkerWrapper{}>'.format(repr(self.wrapped))
@ -41,10 +41,10 @@ class RethinkerWrapper(object):
def run(self, db=None): def run(self, db=None):
self.wrapped.run # raise AttributeError early self.wrapped.run # raise AttributeError early
while True: while True:
conn = self.rethinker._random_server_connection() conn = self.rr._random_server_connection()
is_iter = False is_iter = False
try: try:
result = self.wrapped.run(conn, db=db or self.rethinker.dbname) result = self.wrapped.run(conn, db=db or self.rr.dbname)
if hasattr(result, '__next__'): if hasattr(result, '__next__'):
is_iter = True is_iter = True
def gen(): def gen():
@ -72,8 +72,8 @@ class RethinkerWrapper(object):
class Rethinker(object): class Rethinker(object):
''' '''
>>> r = Rethinker(db='my_db') >>> rr = Rethinker(db='my_db')
>>> doc = r.table('my_table').get(1).run() >>> doc = rr.table('my_table').get(1).run()
''' '''
logger = logging.getLogger('doublethink.Rethinker') logger = logging.getLogger('doublethink.Rethinker')

View File

@ -39,20 +39,20 @@ class ServiceRegistry(object):
logger = logging.getLogger('doublethink.ServiceRegistry') logger = logging.getLogger('doublethink.ServiceRegistry')
def __init__(self, rethinker): def __init__(self, rr):
self.r = rethinker self.rr = rr
self._ensure_table() self._ensure_table()
def _ensure_table(self): def _ensure_table(self):
dbs = self.r.db_list().run() dbs = self.rr.db_list().run()
if not self.r.dbname in dbs: if not self.rr.dbname in dbs:
self.logger.info('creating rethinkdb database %s', repr(self.r.dbname)) self.logger.info('creating rethinkdb database %s', repr(self.rr.dbname))
self.r.db_create(self.r.dbname).run() self.rr.db_create(self.rr.dbname).run()
tables = self.r.table_list().run() tables = self.rr.table_list().run()
if not 'services' in tables: if not 'services' in tables:
self.logger.info("creating rethinkdb table 'services' in database %s", repr(self.r.dbname)) self.logger.info("creating rethinkdb table 'services' in database %s", repr(self.rr.dbname))
self.r.table_create('services', shards=1, replicas=min(3, len(self.r.servers))).run() self.rr.table_create('services', shards=1, replicas=min(3, len(self.rr.servers))).run()
# self.r.table('sites').index_create...? # self.rr.table('sites').index_create...?
def heartbeat(self, status_info): def heartbeat(self, status_info):
''' '''
@ -68,7 +68,7 @@ class ServiceRegistry(object):
if not 'pid' in updated_status_info: if not 'pid' in updated_status_info:
updated_status_info['pid'] = os.getpid() updated_status_info['pid'] = os.getpid()
try: try:
result = self.r.table('services').insert( result = self.rr.table('services').insert(
updated_status_info, conflict='replace', updated_status_info, conflict='replace',
return_changes=True).run() return_changes=True).run()
return result['changes'][0]['new_val'] # XXX check return result['changes'][0]['new_val'] # XXX check
@ -77,13 +77,13 @@ class ServiceRegistry(object):
return status_info return status_info
def unregister(self, id): def unregister(self, id):
result = self.r.table('services').get(id).delete().run() result = self.rr.table('services').get(id).delete().run()
if result != {'deleted':1,'errors':0,'inserted':0,'replaced':0,'skipped':0,'unchanged':0}: if result != {'deleted':1,'errors':0,'inserted':0,'replaced':0,'skipped':0,'unchanged':0}:
self.logger.warn('unexpected result attempting to delete id=%s from rethinkdb services table: %s', id, result) self.logger.warn('unexpected result attempting to delete id=%s from rethinkdb services table: %s', id, result)
def available_service(self, role): def available_service(self, role):
try: try:
result = self.r.table('services').filter({"role":role}).filter( result = self.rr.table('services').filter({"role":role}).filter(
lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] #.default(20.0) lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] #.default(20.0)
).order_by("load")[0].run() ).order_by("load")[0].run()
return result return result
@ -92,7 +92,7 @@ class ServiceRegistry(object):
def available_services(self, role=None): def available_services(self, role=None):
try: try:
query = self.r.table('services') query = self.rr.table('services')
if role: if role:
query = query.filter({"role":role}) query = query.filter({"role":role})
query = query.filter( query = query.filter(

View File

@ -3,7 +3,7 @@ import codecs
setuptools.setup( setuptools.setup(
name='doublethink', name='doublethink',
version='0.2.0.dev65', version='0.2.0.dev66',
packages=['doublethink'], packages=['doublethink'],
classifiers=[ classifiers=[
'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 2.7',

View File

@ -22,7 +22,7 @@ import sys
import types import types
import gc import gc
import pytest import pytest
import rethinkdb import rethinkdb as r
import time import time
import socket import socket
import os import os
@ -41,82 +41,82 @@ class RethinkerForTesting(doublethink.Rethinker):
return self.last_conn return self.last_conn
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def r(): def rr():
r = RethinkerForTesting() rr = RethinkerForTesting()
try: try:
r.db_drop("doublethink_test_db").run() rr.db_drop("doublethink_test_db").run()
except rethinkdb.errors.ReqlOpFailedError: except r.errors.ReqlOpFailedError:
pass pass
result = r.db_create("doublethink_test_db").run() result = rr.db_create("doublethink_test_db").run()
assert not r.last_conn.is_open() assert not rr.last_conn.is_open()
assert result["dbs_created"] == 1 assert result["dbs_created"] == 1
return RethinkerForTesting(db="doublethink_test_db") return RethinkerForTesting(db="doublethink_test_db")
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def my_table(r): def my_table(rr):
assert r.table_list().run() == [] assert rr.table_list().run() == []
result = r.table_create("my_table").run() result = rr.table_create("my_table").run()
assert not r.last_conn.is_open() assert not rr.last_conn.is_open()
assert result["tables_created"] == 1 assert result["tables_created"] == 1
def test_rethinker(r, my_table): def test_rethinker(rr, my_table):
assert r.table("my_table").index_create("foo").run() == {"created": 1} assert rr.table("my_table").index_create("foo").run() == {"created": 1}
assert not r.last_conn.is_open() assert not rr.last_conn.is_open()
result = r.table("my_table").insert(({"foo":i,"bar":"repeat"*i} for i in range(2000))).run() result = rr.table("my_table").insert(({"foo":i,"bar":"repeat"*i} for i in range(2000))).run()
assert not r.last_conn.is_open() assert not rr.last_conn.is_open()
assert len(result["generated_keys"]) == 2000 assert len(result["generated_keys"]) == 2000
assert result["inserted"] == 2000 assert result["inserted"] == 2000
result = r.table("my_table").run() result = rr.table("my_table").run()
assert r.last_conn.is_open() # should still be open this time assert rr.last_conn.is_open() # should still be open this time
assert isinstance(result, types.GeneratorType) assert isinstance(result, types.GeneratorType)
n = 0 n = 0
for x in result: for x in result:
n += 1 n += 1
pass pass
# connection should be closed after finished iterating over results # connection should be closed after finished iterating over results
assert not r.last_conn.is_open() assert not rr.last_conn.is_open()
assert n == 2000 assert n == 2000
result = r.table("my_table").run() result = rr.table("my_table").run()
assert r.last_conn.is_open() # should still be open this time assert rr.last_conn.is_open() # should still be open this time
assert isinstance(result, types.GeneratorType) assert isinstance(result, types.GeneratorType)
next(result) next(result)
result = None result = None
gc.collect() gc.collect()
# connection should be closed after result is garbage-collected # connection should be closed after result is garbage-collected
assert not r.last_conn.is_open() assert not rr.last_conn.is_open()
result = r.table("my_table").run() result = rr.table("my_table").run()
assert r.last_conn.is_open() # should still be open this time assert rr.last_conn.is_open() # should still be open this time
assert isinstance(result, types.GeneratorType) assert isinstance(result, types.GeneratorType)
result = None result = None
gc.collect() gc.collect()
# connection should be closed after result is garbage-collected # connection should be closed after result is garbage-collected
assert not r.last_conn.is_open() assert not rr.last_conn.is_open()
def test_too_many_errors(r): def test_too_many_errors(rr):
with pytest.raises(rethinkdb.errors.ReqlOpFailedError): with pytest.raises(r.errors.ReqlOpFailedError):
r.table_create("too_many_replicas", replicas=99).run() rr.table_create("too_many_replicas", replicas=99).run()
with pytest.raises(rethinkdb.errors.ReqlOpFailedError): with pytest.raises(r.errors.ReqlOpFailedError):
r.table_create("too_many_shards", shards=99).run() rr.table_create("too_many_shards", shards=99).run()
def test_slice(r, my_table): def test_slice(rr, my_table):
"""Tests RethinkerWrapper.__getitem__()""" """Tests RethinkerWrapper.__getitem__()"""
result = r.table("my_table")[5:10].run() result = rr.table("my_table")[5:10].run()
assert r.last_conn.is_open() # should still be open this time assert rr.last_conn.is_open() # should still be open this time
assert isinstance(result, types.GeneratorType) assert isinstance(result, types.GeneratorType)
n = 0 n = 0
for x in result: for x in result:
n += 1 n += 1
pass pass
# connection should be closed after finished iterating over results # connection should be closed after finished iterating over results
assert not r.last_conn.is_open() assert not rr.last_conn.is_open()
assert n == 5 assert n == 5
def test_service_registry(r): def test_service_registry(rr):
svcreg = doublethink.ServiceRegistry(r) svcreg = doublethink.ServiceRegistry(rr)
assert svcreg.available_service("yes-such-role") == None assert svcreg.available_service("yes-such-role") == None
assert svcreg.available_services("yes-such-role") == [] assert svcreg.available_services("yes-such-role") == []
assert svcreg.available_services() == [] assert svcreg.available_services() == []
@ -235,19 +235,19 @@ def test_service_registry(r):
assert len(svcreg.available_services("yes-such-role")) == 2 assert len(svcreg.available_services("yes-such-role")) == 2
assert len(svcreg.available_services()) == 4 assert len(svcreg.available_services()) == 4
def test_svcreg_heartbeat_server_down(r): def test_svcreg_heartbeat_server_down(rr):
class MockRethinker: class MockRethinker:
def table(self, *args, **kwargs): def table(self, *args, **kwargs):
raise Exception('catch me if you can') raise Exception('catch me if you can')
class SortOfFakeServiceRegistry(doublethink.ServiceRegistry): class SortOfFakeServiceRegistry(doublethink.ServiceRegistry):
def __init__(self, rethinker): def __init__(self, rethinker):
self.r = rethinker self.rr = rethinker
# self._ensure_table() # not doing this here # self._ensure_table() # not doing this here
# no such rethinkdb server # no such rethinkdb server
r = MockRethinker() rr = MockRethinker()
svcreg = SortOfFakeServiceRegistry(r) svcreg = SortOfFakeServiceRegistry(rr)
svc0 = { svc0 = {
"role": "role-foo", "role": "role-foo",
"load": 100.0, "load": 100.0,
@ -279,19 +279,19 @@ def test_utcnow():
## XXX what else can we test without jumping through hoops? ## XXX what else can we test without jumping through hoops?
def test_orm(r): def test_orm(rr):
class SomeDoc(doublethink.Document): class SomeDoc(doublethink.Document):
table = 'some_doc' table = 'some_doc'
SomeDoc.table_create(r) SomeDoc.table_create(rr)
with pytest.raises(Exception): with pytest.raises(Exception):
SomeDoc.table_create(r) SomeDoc.table_create(rr)
# test that overriding Document.table works # test that overriding Document.table works
assert 'some_doc' in r.table_list().run() assert 'some_doc' in rr.table_list().run()
assert not 'somedoc' in r.table_list().run() assert not 'somedoc' in rr.table_list().run()
d = SomeDoc(rethinker=r, d={ d = SomeDoc(rr, d={
'a': 'b', 'a': 'b',
'c': {'d': 'e'}, 'c': {'d': 'e'},
'f': ['g', 'h'], 'f': ['g', 'h'],
@ -410,7 +410,7 @@ def test_orm(r):
assert d._updates == {} assert d._updates == {}
assert d._deletes == set() assert d._deletes == set()
d_copy = SomeDoc.load(r, d.id) d_copy = SomeDoc.load(rr, d.id)
assert d == d_copy assert d == d_copy
d['zuh'] = 'toot' d['zuh'] = 'toot'
d.save() d.save()
@ -450,7 +450,7 @@ def test_orm(r):
assert d._updates == {} assert d._updates == {}
assert d._deletes == set() assert d._deletes == set()
d_copy = SomeDoc.load(r, d.id) d_copy = SomeDoc.load(rr, d.id)
assert d == d_copy assert d == d_copy
d['yuh'] = 'soot' d['yuh'] = 'soot'
d.save() d.save()
@ -458,19 +458,19 @@ def test_orm(r):
d_copy.refresh() d_copy.refresh()
assert d == d_copy assert d == d_copy
def test_orm_pk(r): def test_orm_pk(rr):
class NonstandardPrimaryKey(doublethink.Document): class NonstandardPrimaryKey(doublethink.Document):
@classmethod @classmethod
def table_create(cls, rethinker): def table_create(cls, rethinker):
rethinker.table_create(cls.table, primary_key='not_id').run() rethinker.table_create(cls.table, primary_key='not_id').run()
with pytest.raises(Exception): with pytest.raises(Exception):
NonstandardPrimaryKey.load(r, 'no_such_thing') NonstandardPrimaryKey.load(rr, 'no_such_thing')
NonstandardPrimaryKey.table_create(r) NonstandardPrimaryKey.table_create(rr)
# new empty doc # new empty doc
f = NonstandardPrimaryKey(r, {}) f = NonstandardPrimaryKey(rr, {})
f.save() f.save()
assert f.pk_value assert f.pk_value
assert 'not_id' in f assert 'not_id' in f
@ -478,19 +478,19 @@ def test_orm_pk(r):
assert len(f.keys()) == 1 assert len(f.keys()) == 1
with pytest.raises(KeyError): with pytest.raises(KeyError):
NonstandardPrimaryKey.load(r, 'no_such_thing') NonstandardPrimaryKey.load(rr, 'no_such_thing')
# new doc with (only) primary key # new doc with (only) primary key
d = NonstandardPrimaryKey(r, {'not_id': 1}) d = NonstandardPrimaryKey(rr, {'not_id': 1})
assert d.not_id == 1 assert d.not_id == 1
assert d.pk_value == 1 assert d.pk_value == 1
d.save() d.save()
d_copy = NonstandardPrimaryKey.load(r, 1) d_copy = NonstandardPrimaryKey.load(rr, 1)
assert d == d_copy assert d == d_copy
# new doc with something in it # new doc with something in it
e = NonstandardPrimaryKey(r, {'some_field': 'something'}) e = NonstandardPrimaryKey(rr, {'some_field': 'something'})
with pytest.raises(KeyError): with pytest.raises(KeyError):
e['not_id'] e['not_id']
assert e.not_id is None assert e.not_id is None
@ -498,7 +498,7 @@ def test_orm_pk(r):
e.save() e.save()
assert e.not_id assert e.not_id
e_copy = NonstandardPrimaryKey.load(r, e.not_id) e_copy = NonstandardPrimaryKey.load(rr, e.not_id)
assert e == e_copy assert e == e_copy
e_copy['blah'] = 'toot' e_copy['blah'] = 'toot'
e_copy.save() e_copy.save()