diff --git a/doublethink/__init__.py b/doublethink/__init__.py index cfb220c..2c1ad55 100644 --- a/doublethink/__init__.py +++ b/doublethink/__init__.py @@ -57,7 +57,7 @@ def parse_rethinkdb_url(s): `table` and `database` may be None Raises: - ValueError if url cannot be pasrsed a a rethinkdb url + ValueError if url cannot be parsed as a rethinkdb url There is some precedent for this kind of url (though only with a single host): diff --git a/doublethink/services.py b/doublethink/services.py index 2289190..50bb370 100644 --- a/doublethink/services.py +++ b/doublethink/services.py @@ -24,8 +24,7 @@ import doublethink class ServiceRegistry(object): ''' - Simple service registry which stores service information in the rethinkdb - table 'services'. + Simple rethinkdb service registry. Services are responsible for keeping their status information up to date by calling `heartbeat(status_info)` periodically. @@ -82,7 +81,7 @@ class ServiceRegistry(object): ''' logger = logging.getLogger('doublethink.ServiceRegistry') - def __init__(self, rr): + def __init__(self, rr, table='services'): ''' Initialize the service registry. @@ -93,6 +92,7 @@ class ServiceRegistry(object): have `dbname` set ''' self.rr = rr + self.table = table self._ensure_table() def _ensure_table(self): @@ -103,14 +103,14 @@ class ServiceRegistry(object): 'creating rethinkdb database %s', repr(self.rr.dbname)) self.rr.db_create(self.rr.dbname).run() tables = self.rr.table_list().run() - if not 'services' in tables: + if not self.table in tables: self.logger.info( - "creating rethinkdb table 'services' in database %s", - repr(self.rr.dbname)) + "creating rethinkdb table %r in database %r", + self.table, self.rr.dbname) self.rr.table_create( - 'services', shards=1, + self.table, shards=1, replicas=min(3, len(self.rr.servers))).run() - self.rr.table('services').index_create('role').run() + self.rr.table(self.table).index_create('role').run() def heartbeat(self, status_info): ''' @@ -152,7 +152,7 @@ class ServiceRegistry(object): if not 'pid' in updated_status_info: updated_status_info['pid'] = os.getpid() try: - result = self.rr.table('services').insert( + result = self.rr.table(self.table).insert( updated_status_info, conflict='replace', return_changes=True).run() return result['changes'][0]['new_val'] # XXX check @@ -162,9 +162,9 @@ class ServiceRegistry(object): def unregister(self, id): ''' - Remove the service with id `id` from the 'services' table. + Remove the service with id `id` from the service registry. ''' - result = self.rr.table('services').get(id).delete().run() + result = self.rr.table(self.table).get(id).delete().run() if result != { 'deleted':1, 'errors':0,'inserted':0, 'replaced':0,'skipped':0,'unchanged':0}: @@ -237,7 +237,7 @@ class ServiceRegistry(object): candidate['pid'] = os.getpid() result = self.rr.table( - 'services', read_mode='majority').get(role).replace( + self.table, read_mode='majority').get(role).replace( lambda row: r.branch( r.branch( row, @@ -250,10 +250,10 @@ class ServiceRegistry(object): if k not in ('first_heartbeat', 'last_heartbeat')]): # candidate is the unique_service, send a heartbeat del candidate['first_heartbeat'] # don't touch first_heartbeat - self.rr.table('services').get(role).update(candidate).run() + self.rr.table(self.table).get(role).update(candidate).run() results = list(self.rr.table( - 'services', read_mode='majority').get_all(role).filter( + self.table, read_mode='majority').get_all(role).filter( lambda row: row['last_heartbeat'] > now - row['ttl']).run()) if results: return results[0] @@ -275,7 +275,7 @@ class ServiceRegistry(object): value of 'load' ''' try: - result = self.rr.table('services').get_all(role, index='role').filter( + result = self.rr.table(self.table).get_all(role, index='role').filter( lambda svc: r.now().sub(svc["last_heartbeat"]) < svc["ttl"] ).order_by("load")[0].run() return result @@ -298,7 +298,7 @@ class ServiceRegistry(object): return an empty list. ''' try: - query = self.rr.table('services') + query = self.rr.table(self.table) if role: query = query.get_all(role, index='role') query = query.filter( @@ -313,7 +313,7 @@ class ServiceRegistry(object): available_services = healthy_services def purge_stale_services(self, ttls_until_deletion=2): - query = self.rr.table('services').filter( + query = self.rr.table(self.table).filter( lambda svc: r.now().sub(svc["last_heartbeat"]).gt(svc["ttl"] * ttls_until_deletion) ).delete() logging.debug("Running query: %s", query) diff --git a/setup.py b/setup.py index 4c80ddc..4a14464 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ import codecs setuptools.setup( name='doublethink', - version='0.2.0.dev86', + version='0.2.0.dev87', packages=['doublethink'], classifiers=[ 'Programming Language :: Python :: 2.7',