Merge pull request #3 from internetarchive/unique-service

rename ServiceRegistry.leader() to ServiceRegistry.unique_service(); … 👍
This commit is contained in:
jkafader 2017-05-01 12:35:43 -07:00 committed by GitHub
commit 2815983e40
2 changed files with 88 additions and 20 deletions

View File

@ -174,25 +174,93 @@ class ServiceRegistry(object):
'unexpected result attempting to delete id=%s from '
'rethinkdb services table: %s', id, result)
def unique_service(self, role, candidate=None):
    '''
    Retrieve a unique service, possibly setting or heartbeating it first.

    A "unique service" is a service with only one instance for a given
    role. Uniqueness is enforced by using the role name as the primary key
    `{'id':role, ...}`.

    Args:
        role (str): role name
        candidate (dict): if supplied, candidate info for the unique
            service, explained below. The dict passed in is not modified;
            a shallow copy is used internally.

    `candidate` normally represents "myself, this instance of the service".
    When a service supplies `candidate`, it is nominating itself for
    selection as the unique service, or retaining its claim to the role
    (heartbeating).

    If `candidate` is supplied:

        First, atomically in a single rethinkdb query, checks if there is
        already a unique healthy instance of this service in rethinkdb, and
        if not, sets `candidate` as the unique service. An instance is
        considered healthy if its 'last_heartbeat' is more recent than
        three of its own 'heartbeat_interval's ago.

        Looks at the result of that query to determine if `candidate` is
        the unique service or not. If it is, updates 'last_heartbeat' in
        rethinkdb.

        To determine whether `candidate` is the unique service, checks that
        all the fields other than 'first_heartbeat' and 'last_heartbeat'
        have the same value in `candidate` as in the value returned from
        rethinkdb. A field that is present in `candidate` but missing from
        the stored document counts as a mismatch.

        ***Important***: this means that the caller must ensure that none
        of the fields of the unique service ever change. Don't store things
        like 'load' or any other volatile value in there. If you try to do
        that, heartbeats will end up not being sent, and the unique service
        will flap among the candidates.

    Finally, retrieves the service from rethinkdb and returns it, if it is
    healthy.

    Returns:
        the unique service, if there is one and it is healthy, otherwise
        None

    Raises:
        Exception: if `candidate` is supplied without a
            'heartbeat_interval', or with one that is not a number > 0
    '''
    if candidate is not None:
        # Work on a copy so the caller's dict does not end up holding
        # reql terms (r.now()) or losing/gaining keys as a side effect.
        candidate = dict(candidate)
        candidate['id'] = role

        if 'heartbeat_interval' not in candidate:
            raise Exception(
                    "candidate is missing required field "
                    "'heartbeat_interval'")
        val = candidate['heartbeat_interval']
        if not isinstance(val, (int, float)) or val <= 0:
            raise Exception('heartbeat_interval must be a number > 0')

        # r.now() is evaluated by the rethinkdb server, not locally
        candidate['first_heartbeat'] = r.now()
        candidate['last_heartbeat'] = r.now()
        if 'host' not in candidate:
            candidate['host'] = socket.gethostname()
        if 'pid' not in candidate:
            candidate['pid'] = os.getpid()

        # Single atomic replace: keep the existing row if it exists and is
        # healthy, otherwise install the candidate in its place.
        result = self.rr.table(
                'services', read_mode='majority').get(role).replace(
                    lambda row: r.branch(
                        r.branch(
                            row,
                            row['last_heartbeat'] > r.now() - row['heartbeat_interval'] * 3,
                            False),
                        row, candidate),
                    return_changes='always').run()
        new_val = result['changes'][0]['new_val']

        # Heartbeat timestamps are server-side values and always differ,
        # so they are excluded from the identity comparison.
        identity_keys = [
                k for k in candidate
                if k not in ('first_heartbeat', 'last_heartbeat')]
        if all(k in new_val and new_val[k] == candidate[k]
               for k in identity_keys):
            # candidate is the unique_service, send a heartbeat
            # (first_heartbeat is left out so it is never overwritten)
            heartbeat = {
                    k: v for k, v in candidate.items()
                    if k != 'first_heartbeat'}
            self.rr.table('services').get(role).update(heartbeat).run()

    # Whoever holds the role now, only report it if it is still healthy.
    results = list(self.rr.table(
            'services', read_mode='majority').get_all(role).filter(
                lambda row: row['last_heartbeat'] > r.now() - row['heartbeat_interval'] * 3).run())
    return results[0] if results else None
def healthy_service(self, role):
'''

View File

@ -52,12 +52,12 @@ def rr():
assert result["dbs_created"] == 1
return RethinkerForTesting(db="doublethink_test_db")
def test_leader_election(rr):
def test_unique_service(rr):
svcreg = doublethink.ServiceRegistry(rr)
assert svcreg.leader('example-role') == None
assert svcreg.unique_service('example-role') == None
# this raises an exception: no heartbeat_interval.
with pytest.raises(Exception) as excinfo:
svcreg.leader('example-role', default={})
svcreg.unique_service('example-role', candidate={})
svc01 = {
"role": "example-role",
"load": 0.0,
@ -71,10 +71,10 @@ def test_leader_election(rr):
"node": "test02.example.com"
}
# register svc01. output should be svc01.
output = svcreg.leader('example-role', default=svc01)
output = svcreg.unique_service('example-role', candidate=svc01)
assert output['node'] == svc01['node']
# try to register svc02. Output should still be svc01.
output = svcreg.leader('example-role', default=svc02)
output = svcreg.unique_service('example-role', candidate=svc02)
assert output['node'] == svc01['node']
svcreg.unregister('example-role')