From a5cac606fa421230dde281d8aa0fca2361302e4d Mon Sep 17 00:00:00 2001 From: James Kafader Date: Tue, 18 Apr 2017 16:56:53 -0700 Subject: [PATCH] added a leader election feature and tests --- doublethink/services.py | 20 ++++++++++++++++++++ tests/test_svcreg.py | 26 ++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/doublethink/services.py b/doublethink/services.py index c1cfc8b..f8a04da 100644 --- a/doublethink/services.py +++ b/doublethink/services.py @@ -81,6 +81,26 @@ class ServiceRegistry(object): if result != {'deleted':1,'errors':0,'inserted':0,'replaced':0,'skipped':0,'unchanged':0}: self.logger.warn('unexpected result attempting to delete id=%s from rethinkdb services table: %s', id, result) + def leader(self, role_name, default=None): + ''' + Perform leader election for a role. + + If only 'role_name' is provided, simply looks up a leader for 'role_name', returning None on failure. + + If 'default' is provided, it is considered to be a leader candidate. The supplied leader candidate + will become leader in the case that: + a) there is no registered leader + OR + b) the current leader has missed 3 or more heartbeat intervals. + ''' + if default is not None: + default['id'] = role_name + default['last_heartbeat'] = r.now() + if not 'heartbeat_interval' in default: + raise Exception('Default service must contain a key called "heartbeat_interval"') + self.rr.table('services', read_mode='majority').get(role_name).replace(lambda row: r.branch(r.branch(row, row['last_heartbeat'] > r.now() - row['heartbeat_interval'] * 3, False), row, default)).run() + return self.rr.table('services', read_mode='majority').get(role_name).run() + def available_service(self, role): try: result = self.rr.table('services').filter({"role":role}).filter( diff --git a/tests/test_svcreg.py b/tests/test_svcreg.py index fb45bcd..6e8be6d 100644 --- a/tests/test_svcreg.py +++ b/tests/test_svcreg.py @@ -52,6 +52,32 @@ def rr(): assert result["dbs_created"] == 1 return RethinkerForTesting(db="doublethink_test_db") +def test_leader_election(rr): + svcreg = doublethink.ServiceRegistry(rr) + assert svcreg.leader('example-role') == None + # this raises an exception: no heartbeat_interval. + with pytest.raises(Exception) as excinfo: + svcreg.leader('example-role', default={}) + svc01 = { + "role": "example-role", + "load": 0.0, + "heartbeat_interval": 0.4, + "node": "test01.example.com" + } + svc02 = { + "role": "example-role", + "load": 0.0, + "heartbeat_interval": 0.4, + "node": "test02.example.com" + } + # register svc01. output should be svc01. + output = svcreg.leader('example-role', default=svc01) + assert output['node'] == svc01['node'] + # try to register svc02. Output should still be svc01. + output = svcreg.leader('example-role', default=svc02) + assert output['node'] == svc01['node'] + svcreg.unregister('example-role') + def test_service_registry(rr): svcreg = doublethink.ServiceRegistry(rr) assert svcreg.available_service("yes-such-role") == None