mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
make service registry table name configurable
This commit is contained in:
parent
c02c4b7d2c
commit
5cbfe18f9e
@ -57,7 +57,7 @@ def parse_rethinkdb_url(s):
|
|||||||
`table` and `database` may be None
|
`table` and `database` may be None
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
ValueError if url cannot be pasrsed a a rethinkdb url
|
ValueError if url cannot be parsed as a rethinkdb url
|
||||||
|
|
||||||
There is some precedent for this kind of url (though only with a single
|
There is some precedent for this kind of url (though only with a single
|
||||||
host):
|
host):
|
||||||
|
@ -24,8 +24,7 @@ import doublethink
|
|||||||
|
|
||||||
class ServiceRegistry(object):
|
class ServiceRegistry(object):
|
||||||
'''
|
'''
|
||||||
Simple service registry which stores service information in the rethinkdb
|
Simple rethinkdb service registry.
|
||||||
table 'services'.
|
|
||||||
|
|
||||||
Services are responsible for keeping their status information up to date
|
Services are responsible for keeping their status information up to date
|
||||||
by calling `heartbeat(status_info)` periodically.
|
by calling `heartbeat(status_info)` periodically.
|
||||||
@ -82,7 +81,7 @@ class ServiceRegistry(object):
|
|||||||
'''
|
'''
|
||||||
logger = logging.getLogger('doublethink.ServiceRegistry')
|
logger = logging.getLogger('doublethink.ServiceRegistry')
|
||||||
|
|
||||||
def __init__(self, rr):
|
def __init__(self, rr, table='services'):
|
||||||
'''
|
'''
|
||||||
Initialize the service registry.
|
Initialize the service registry.
|
||||||
|
|
||||||
@ -93,6 +92,7 @@ class ServiceRegistry(object):
|
|||||||
have `dbname` set
|
have `dbname` set
|
||||||
'''
|
'''
|
||||||
self.rr = rr
|
self.rr = rr
|
||||||
|
self.table = table
|
||||||
self._ensure_table()
|
self._ensure_table()
|
||||||
|
|
||||||
def _ensure_table(self):
|
def _ensure_table(self):
|
||||||
@ -103,14 +103,14 @@ class ServiceRegistry(object):
|
|||||||
'creating rethinkdb database %s', repr(self.rr.dbname))
|
'creating rethinkdb database %s', repr(self.rr.dbname))
|
||||||
self.rr.db_create(self.rr.dbname).run()
|
self.rr.db_create(self.rr.dbname).run()
|
||||||
tables = self.rr.table_list().run()
|
tables = self.rr.table_list().run()
|
||||||
if not 'services' in tables:
|
if not self.table in tables:
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
"creating rethinkdb table 'services' in database %s",
|
"creating rethinkdb table %r in database %r",
|
||||||
repr(self.rr.dbname))
|
self.table, self.rr.dbname)
|
||||||
self.rr.table_create(
|
self.rr.table_create(
|
||||||
'services', shards=1,
|
self.table, shards=1,
|
||||||
replicas=min(3, len(self.rr.servers))).run()
|
replicas=min(3, len(self.rr.servers))).run()
|
||||||
self.rr.table('services').index_create('role').run()
|
self.rr.table(self.table).index_create('role').run()
|
||||||
|
|
||||||
def heartbeat(self, status_info):
|
def heartbeat(self, status_info):
|
||||||
'''
|
'''
|
||||||
@ -152,7 +152,7 @@ class ServiceRegistry(object):
|
|||||||
if not 'pid' in updated_status_info:
|
if not 'pid' in updated_status_info:
|
||||||
updated_status_info['pid'] = os.getpid()
|
updated_status_info['pid'] = os.getpid()
|
||||||
try:
|
try:
|
||||||
result = self.rr.table('services').insert(
|
result = self.rr.table(self.table).insert(
|
||||||
updated_status_info, conflict='replace',
|
updated_status_info, conflict='replace',
|
||||||
return_changes=True).run()
|
return_changes=True).run()
|
||||||
return result['changes'][0]['new_val'] # XXX check
|
return result['changes'][0]['new_val'] # XXX check
|
||||||
@ -162,9 +162,9 @@ class ServiceRegistry(object):
|
|||||||
|
|
||||||
def unregister(self, id):
|
def unregister(self, id):
|
||||||
'''
|
'''
|
||||||
Remove the service with id `id` from the 'services' table.
|
Remove the service with id `id` from the service registry.
|
||||||
'''
|
'''
|
||||||
result = self.rr.table('services').get(id).delete().run()
|
result = self.rr.table(self.table).get(id).delete().run()
|
||||||
if result != {
|
if result != {
|
||||||
'deleted':1, 'errors':0,'inserted':0,
|
'deleted':1, 'errors':0,'inserted':0,
|
||||||
'replaced':0,'skipped':0,'unchanged':0}:
|
'replaced':0,'skipped':0,'unchanged':0}:
|
||||||
@ -237,7 +237,7 @@ class ServiceRegistry(object):
|
|||||||
candidate['pid'] = os.getpid()
|
candidate['pid'] = os.getpid()
|
||||||
|
|
||||||
result = self.rr.table(
|
result = self.rr.table(
|
||||||
'services', read_mode='majority').get(role).replace(
|
self.table, read_mode='majority').get(role).replace(
|
||||||
lambda row: r.branch(
|
lambda row: r.branch(
|
||||||
r.branch(
|
r.branch(
|
||||||
row,
|
row,
|
||||||
@ -250,10 +250,10 @@ class ServiceRegistry(object):
|
|||||||
if k not in ('first_heartbeat', 'last_heartbeat')]):
|
if k not in ('first_heartbeat', 'last_heartbeat')]):
|
||||||
# candidate is the unique_service, send a heartbeat
|
# candidate is the unique_service, send a heartbeat
|
||||||
del candidate['first_heartbeat'] # don't touch first_heartbeat
|
del candidate['first_heartbeat'] # don't touch first_heartbeat
|
||||||
self.rr.table('services').get(role).update(candidate).run()
|
self.rr.table(self.table).get(role).update(candidate).run()
|
||||||
|
|
||||||
results = list(self.rr.table(
|
results = list(self.rr.table(
|
||||||
'services', read_mode='majority').get_all(role).filter(
|
self.table, read_mode='majority').get_all(role).filter(
|
||||||
lambda row: row['last_heartbeat'] > now - row['ttl']).run())
|
lambda row: row['last_heartbeat'] > now - row['ttl']).run())
|
||||||
if results:
|
if results:
|
||||||
return results[0]
|
return results[0]
|
||||||
@ -275,7 +275,7 @@ class ServiceRegistry(object):
|
|||||||
value of 'load'
|
value of 'load'
|
||||||
'''
|
'''
|
||||||
try:
|
try:
|
||||||
result = self.rr.table('services').get_all(role, index='role').filter(
|
result = self.rr.table(self.table).get_all(role, index='role').filter(
|
||||||
lambda svc: r.now().sub(svc["last_heartbeat"]) < svc["ttl"]
|
lambda svc: r.now().sub(svc["last_heartbeat"]) < svc["ttl"]
|
||||||
).order_by("load")[0].run()
|
).order_by("load")[0].run()
|
||||||
return result
|
return result
|
||||||
@ -298,7 +298,7 @@ class ServiceRegistry(object):
|
|||||||
return an empty list.
|
return an empty list.
|
||||||
'''
|
'''
|
||||||
try:
|
try:
|
||||||
query = self.rr.table('services')
|
query = self.rr.table(self.table)
|
||||||
if role:
|
if role:
|
||||||
query = query.get_all(role, index='role')
|
query = query.get_all(role, index='role')
|
||||||
query = query.filter(
|
query = query.filter(
|
||||||
@ -313,7 +313,7 @@ class ServiceRegistry(object):
|
|||||||
available_services = healthy_services
|
available_services = healthy_services
|
||||||
|
|
||||||
def purge_stale_services(self, ttls_until_deletion=2):
|
def purge_stale_services(self, ttls_until_deletion=2):
|
||||||
query = self.rr.table('services').filter(
|
query = self.rr.table(self.table).filter(
|
||||||
lambda svc: r.now().sub(svc["last_heartbeat"]).gt(svc["ttl"] * ttls_until_deletion)
|
lambda svc: r.now().sub(svc["last_heartbeat"]).gt(svc["ttl"] * ttls_until_deletion)
|
||||||
).delete()
|
).delete()
|
||||||
logging.debug("Running query: %s", query)
|
logging.debug("Running query: %s", query)
|
||||||
|
2
setup.py
2
setup.py
@ -3,7 +3,7 @@ import codecs
|
|||||||
|
|
||||||
setuptools.setup(
|
setuptools.setup(
|
||||||
name='doublethink',
|
name='doublethink',
|
||||||
version='0.2.0.dev86',
|
version='0.2.0.dev87',
|
||||||
packages=['doublethink'],
|
packages=['doublethink'],
|
||||||
classifiers=[
|
classifiers=[
|
||||||
'Programming Language :: Python :: 2.7',
|
'Programming Language :: Python :: 2.7',
|
||||||
|
Loading…
x
Reference in New Issue
Block a user