mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
Merge pull request #2 from jkafader/master
added a leader election feature and tests
This commit is contained in:
commit
efc999f7d1
@ -81,6 +81,26 @@ class ServiceRegistry(object):
|
||||
if result != {'deleted':1,'errors':0,'inserted':0,'replaced':0,'skipped':0,'unchanged':0}:
|
||||
self.logger.warn('unexpected result attempting to delete id=%s from rethinkdb services table: %s', id, result)
|
||||
|
||||
def leader(self, role_name, default=None):
|
||||
'''
|
||||
Perform leader election for a role.
|
||||
|
||||
If only 'role_name' is provided, simply looks up a leader for 'role_name', returning None on failure.
|
||||
|
||||
If 'default' is provided, it is considered to be a leader candidate. The supplied leader candidate
|
||||
will become leader in the case that:
|
||||
a) there is no registered leader
|
||||
OR
|
||||
b) the current leader has missed 3 or more heartbeat intervals.
|
||||
'''
|
||||
if default is not None:
|
||||
default['id'] = role_name
|
||||
default['last_heartbeat'] = r.now()
|
||||
if not 'heartbeat_interval' in default:
|
||||
raise Exception('Default service must contain a key called "heartbeat_interval"')
|
||||
self.rr.table('services', read_mode='majority').get(role_name).replace(lambda row: r.branch(r.branch(row, row['last_heartbeat'] > r.now() - row['heartbeat_interval'] * 3, False), row, default)).run()
|
||||
return self.rr.table('services', read_mode='majority').get(role_name).run()
|
||||
|
||||
def available_service(self, role):
|
||||
try:
|
||||
result = self.rr.table('services').filter({"role":role}).filter(
|
||||
|
@ -52,6 +52,32 @@ def rr():
|
||||
assert result["dbs_created"] == 1
|
||||
return RethinkerForTesting(db="doublethink_test_db")
|
||||
|
||||
def test_leader_election(rr):
|
||||
svcreg = doublethink.ServiceRegistry(rr)
|
||||
assert svcreg.leader('example-role') == None
|
||||
# this raises an exception: no heartbeat_interval.
|
||||
with pytest.raises(Exception) as excinfo:
|
||||
svcreg.leader('example-role', default={})
|
||||
svc01 = {
|
||||
"role": "example-role",
|
||||
"load": 0.0,
|
||||
"heartbeat_interval": 0.4,
|
||||
"node": "test01.example.com"
|
||||
}
|
||||
svc02 = {
|
||||
"role": "example-role",
|
||||
"load": 0.0,
|
||||
"heartbeat_interval": 0.4,
|
||||
"node": "test02.example.com"
|
||||
}
|
||||
# register svc01. output should be svc01.
|
||||
output = svcreg.leader('example-role', default=svc01)
|
||||
assert output['node'] == svc01['node']
|
||||
# try to register svc02. Output should still be svc01.
|
||||
output = svcreg.leader('example-role', default=svc02)
|
||||
assert output['node'] == svc01['node']
|
||||
svcreg.unregister('example-role')
|
||||
|
||||
def test_service_registry(rr):
|
||||
svcreg = doublethink.ServiceRegistry(rr)
|
||||
assert svcreg.available_service("yes-such-role") == None
|
||||
|
Loading…
x
Reference in New Issue
Block a user