2017-02-21 16:27:49 -08:00
|
|
|
'''
|
2017-02-28 16:23:59 -08:00
|
|
|
doublethink/services.py - rethinkdb service registry
|
2017-02-21 16:27:49 -08:00
|
|
|
|
2023-05-18 17:16:04 -07:00
|
|
|
Copyright (C) 2015-2023 Internet Archive
|
2017-02-21 16:27:49 -08:00
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
'''
|
|
|
|
|
2023-05-18 17:16:04 -07:00
|
|
|
import rethinkdb as rdb
|
2017-02-21 16:27:49 -08:00
|
|
|
import logging
|
|
|
|
import socket
|
|
|
|
import os
|
2017-05-17 12:11:26 -07:00
|
|
|
import doublethink
|
2017-02-21 16:27:49 -08:00
|
|
|
|
2023-05-18 17:16:04 -07:00
|
|
|
# Module-level RethinkDB handle. The registry uses it to build ReQL terms
# (r.now(), r.branch()) and to reference r.ReqlNonExistenceError.
r = rdb.RethinkDB()
|
|
|
|
|
2017-02-21 16:27:49 -08:00
|
|
|
class ServiceRegistry(object):
    '''
    Simple rethinkdb service registry.

    Services are responsible for keeping their status information up to date
    by calling `heartbeat(status_info)` periodically.

    `status_info` is a dict and must have at least the fields 'role', 'load',
    and 'ttl'. Certain other fields are populated automatically as in the
    example below. In addition, services may set arbitrary other fields.

    Some information about required fields:

    'role': The role of the service. `healthy_service()` and
        `healthy_services()` look up services using this field.
    'ttl': If a service's last heartbeat was more than 'ttl' seconds ago, it
        is considered to be "down". `healthy_services()` and
        `healthy_service()` never return entries for services that are
        considered "down". A sensible convention is to heartbeat 3 times per
        'ttl', that is, every `ttl/3` seconds.
    'load': An arbitrary numeric value. It is up to each service to populate
        this field in a way that makes sense to the particular service.
        `healthy_service(role)` returns the service with the lowest load
        for the supplied role. Thus load values need to be comparable to
        each other within the context of a single role, but comparing loads
        of services of different roles might not make any sense.

    About the 'id' field:

    The only way that the service registry uniquely identifies a particular
    instance of a service is using the 'id' field.

    Services can supply their own 'id', or let rethinkdb generate a random
    one.

    If a service provides its own 'id', it should make it something
    predictable and unique to each instance of the service. For example
    `'%s:%s:%s' % (role, host, port)` might work for some services.

    If, on the other hand, a service lets rethinkdb generate 'id', it will
    need to remember the result returned by calls to `heartbeat()` and
    supply the `id` value from there with every subsequent heartbeat.

    Example service registry entry, with notes:

        {
            'id': 'd0bed0be-d000-d000-f00d-abeefface0ff', # generated by rethinkdb if not supplied
            'role': 'brozzler-worker',
            'load': 0.5, # load score
            'ttl': 60.0,
            'host': 'wbgrp-svc999.us.archive.org', # set in svcreg.heartbeat() as a fallback
            'pid': 1234, # set in svcreg.heartbeat() as a fallback
            'first_heartbeat': '2015-10-30T03:39:40.080814', # set in svcreg.heartbeat()
            'last_heartbeat': '2015-10-30T05:54:35.422866', # set in svcreg.heartbeat()
            # ... plus anything else you want...
        }
    '''
    logger = logging.getLogger('doublethink.ServiceRegistry')

    def __init__(self, rr, table='services'):
        '''
        Initialize the service registry.

        Creates the database table if it does not exist.

        Args:
            rr (doublethink.Rethinker): a doublethink.Rethinker, which must
                have `dbname` set
            table (str): name of the rethinkdb table holding the registry
                entries
        '''
        self.rr = rr
        self.table = table
        self._ensure_table()

    def _ensure_table(self):
        '''
        Create the database, the registry table and its 'role' index if they
        do not exist yet.
        '''
        # check the precondition before issuing any query
        assert self.rr.dbname
        dbs = self.rr.db_list().run()
        if self.rr.dbname not in dbs:
            self.logger.info(
                    'creating rethinkdb database %s', repr(self.rr.dbname))
            self.rr.db_create(self.rr.dbname).run()
        tables = self.rr.table_list().run()
        if self.table not in tables:
            self.logger.info(
                    "creating rethinkdb table %r in database %r",
                    self.table, self.rr.dbname)
            self.rr.table_create(
                    self.table, shards=1,
                    replicas=min(3, len(self.rr.servers))).run()
            # secondary index used by healthy_service()/healthy_services()
            self.rr.table(self.table).index_create('role').run()

    def heartbeat(self, status_info):
        '''
        Update service status, indicating "up"-ness.

        Args:
            status_info (dict): a dictionary representing the status of the
                service

        `status_info` must have at least the fields 'role', 'load', and
        'ttl'. Some additional fields are populated automatically by this
        method. If the field 'id' is absent, it will be generated by rethinkdb.

        See the ServiceRegistry class-level documentation for more information
        about the various fields.

        Returns:
            On success, returns the modified status info dict. On failure
            communicating with rethinkdb, returns `status_info` unmodified.

        Raises:
            Exception: if `status_info` is missing a required field, or if
                `status_info['ttl']` is not a number greater than zero
        '''
        for field in 'role', 'ttl', 'load':
            if field not in status_info:
                raise Exception(
                        'status_info is missing required field %s'
                        % repr(field))
        val = status_info['ttl']
        if not isinstance(val, (float, int)) or val <= 0:
            raise Exception('ttl must be a number > 0')

        # work on a copy so the caller's dict is left untouched
        updated_status_info = dict(status_info)
        updated_status_info['last_heartbeat'] = r.now()
        if 'first_heartbeat' not in updated_status_info:
            updated_status_info['first_heartbeat'] = updated_status_info['last_heartbeat']
        if 'host' not in updated_status_info:
            updated_status_info['host'] = socket.gethostname()
        if 'pid' not in updated_status_info:
            updated_status_info['pid'] = os.getpid()
        try:
            result = self.rr.table(self.table).insert(
                    updated_status_info, conflict='replace',
                    return_changes=True).run()
            return result['changes'][0]['new_val'] # XXX check
        except Exception:
            # best effort: a failed heartbeat is logged, not raised
            self.logger.error('error updating service registry', exc_info=True)
            return status_info

    def unregister(self, id):
        '''
        Remove the service with id `id` from the service registry.

        Logs a warning if rethinkdb does not report exactly one deleted
        document.
        '''
        result = self.rr.table(self.table).get(id).delete().run()
        if result != {
                'deleted':1, 'errors':0,'inserted':0,
                'replaced':0,'skipped':0,'unchanged':0}:
            self.logger.warning(
                    'unexpected result attempting to delete id=%s from '
                    'rethinkdb services table: %s', id, result)

    def unique_service(self, role, candidate=None):
        '''
        Retrieve a unique service, possibly setting or heartbeating it first.

        A "unique service" is a service with only one instance for a given
        role. Uniqueness is enforced by using the role name as the primary key
        `{'id':role, ...}`.

        Args:
            role (str): role name
            candidate (dict): if supplied, candidate info for the unique
                service, explained below

        `candidate` normally represents "myself, this instance of the service".
        When a service supplies `candidate`, it is nominating itself for
        selection as the unique service, or retaining its claim to the role
        (heartbeating).

        If `candidate` is supplied:

            First, atomically in a single rethinkdb query, checks if there is
            already a unique healthy instance of this service in rethinkdb, and
            if not, sets `candidate` as the unique service.

            Looks at the result of that query to determine if `candidate` is
            the unique service or not. If it is, updates 'last_heartbeat' in
            rethinkdb.

            To determine whether `candidate` is the unique service, checks that
            all the fields other than 'first_heartbeat' and 'last_heartbeat'
            have the same value in `candidate` as in the value returned from
            rethinkdb.

            ***Important***: this means that the caller must ensure that none
            of the fields of the unique service ever change. Don't store things
            like 'load' or any other volatile value in there. If you try to do
            that, heartbeats will end up not being sent, and the unique service
            will flap among the candidates.

            Note that `candidate` is modified in place: 'id' and
            'last_heartbeat' are set, 'host' and 'pid' are filled in if
            absent, and 'first_heartbeat' is set and then removed again if
            the candidate turns out to be the unique service.

        Finally, retrieves the service from rethinkdb and returns it, if it is
        healthy.

        Returns:
            the unique service, if there is one and it is healthy, otherwise
            None

        Raises:
            Exception: if `candidate` is supplied and has no 'ttl', or its
                'ttl' is not a number greater than zero
        '''
        # use the same concept of 'now' for all queries
        now = doublethink.utcnow()
        if candidate is not None:
            candidate['id'] = role

            if 'ttl' not in candidate:
                raise Exception("candidate is missing required field 'ttl'")
            val = candidate['ttl']
            if not isinstance(val, (float, int)) or val <= 0:
                raise Exception("'ttl' must be a number > 0")

            candidate['first_heartbeat'] = now
            candidate['last_heartbeat'] = now
            if 'host' not in candidate:
                candidate['host'] = socket.gethostname()
            if 'pid' not in candidate:
                candidate['pid'] = os.getpid()

            # keep the existing row if it exists and is still healthy,
            # otherwise replace it with the candidate
            result = self.rr.table(
                    self.table, read_mode='majority').get(role).replace(
                        lambda row: r.branch(
                            r.branch(
                                row,
                                row['last_heartbeat'] > now - row['ttl'],
                                False),
                            row, candidate),
                        return_changes='always').run()
            new_val = result['changes'][0]['new_val']
            if all(new_val.get(k) == candidate[k] for k in candidate
                   if k not in ('first_heartbeat', 'last_heartbeat')):
                # candidate is the unique_service, send a heartbeat
                del candidate['first_heartbeat'] # don't touch first_heartbeat
                self.rr.table(self.table).get(role).update(candidate).run()

        results = list(self.rr.table(
                self.table, read_mode='majority').get_all(role).filter(
                    lambda row: row['last_heartbeat'] > now - row['ttl']).run())
        if results:
            return results[0]
        return None

    def healthy_service(self, role):
        '''
        Find least loaded healthy service in the registry.

        A service is considered healthy if its 'last_heartbeat' was less than
        'ttl' seconds ago

        Args:
            role (str): role name

        Returns:
            the healthy service with the supplied `role` with the smallest
            value of 'load', or None if the query raises
            `ReqlNonExistenceError` (no such service)
        '''
        try:
            result = self.rr.table(self.table).get_all(role, index='role').filter(
                lambda svc: r.now().sub(svc["last_heartbeat"]) < svc["ttl"]
            ).order_by("load")[0].run()
            return result
        except r.ReqlNonExistenceError:
            return None

    def healthy_services(self, role=None):
        '''
        Look up healthy services in the registry.

        A service is considered healthy if its 'last_heartbeat' was less than
        'ttl' seconds ago

        Args:
            role (str, optional): role name

        Returns:
            If `role` is supplied, returns list of healthy services for the
            given role, otherwise returns list of all healthy services. May
            return an empty list.
        '''
        try:
            query = self.rr.table(self.table)
            if role:
                query = query.get_all(role, index='role')
            query = query.filter(
                lambda svc: r.now().sub(svc["last_heartbeat"]) < svc["ttl"]
            ).order_by("load")
            result = query.run()
            return result
        except r.ReqlNonExistenceError:
            return []

    # aliases for healthy_service() and healthy_services()
    available_service = healthy_service
    available_services = healthy_services

    def purge_stale_services(self, ttls_until_deletion=2):
        '''
        Delete registry entries that have not heartbeated for a long time.

        An entry is deleted when the time since its 'last_heartbeat' is
        greater than its 'ttl' multiplied by `ttls_until_deletion`.

        Args:
            ttls_until_deletion (number): how many 'ttl' periods must have
                elapsed since the last heartbeat before an entry is deleted

        Returns:
            the result of the rethinkdb delete query
        '''
        query = self.rr.table(self.table).filter(
            lambda svc: r.now().sub(svc["last_heartbeat"]).gt(
                svc["ttl"] * ttls_until_deletion)
        ).delete()
        self.logger.debug("Running query: %s", query)
        result = query.run()
        self.logger.debug("Results: %s", result)
        return result
|