From b85566c405aa511c2c99b8798f19b5238a709f7b Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 28 Apr 2017 16:29:00 -0700 Subject: [PATCH 1/2] rename ServiceRegistry.leader() to ServiceRegistry.unique_service(); implement heartbeat in that method; only return healthy service; add a detailed docstring --- doublethink/services.py | 100 ++++++++++++++++++++++++++++++++++------ tests/test_svcreg.py | 10 ++-- 2 files changed, 90 insertions(+), 20 deletions(-) diff --git a/doublethink/services.py b/doublethink/services.py index c710386..8b7885b 100644 --- a/doublethink/services.py +++ b/doublethink/services.py @@ -174,25 +174,95 @@ class ServiceRegistry(object): 'unexpected result attempting to delete id=%s from ' 'rethinkdb services table: %s', id, result) - def leader(self, role_name, default=None): + def unique_service(self, role, candidate=None): ''' - Perform leader election for a role. + Retrieve a unique service, possibly setting or heartbeating it first. - If only 'role_name' is provided, simply looks up a leader for 'role_name', returning None on failure. + A "unique service" is a service with only one instance for a given + role. Uniqueness is enforced by using the role name as the primary key + `{'id':role, ...}`. - If 'default' is provided, it is considered to be a leader candidate. The supplied leader candidate - will become leader in the case that: - a) there is no registered leader - OR - b) the current leader has missed 3 or more heartbeat intervals. + Args: + role (str): role name + candidate (dict): if supplied, candidate info for the unique + service, explained below + + `candidate` normally represents "myself, this instance of the service". + When a service supplies `candidate`, it is nominating itself for + selection as the unique service, or retaining its claim to the role + (heartbeating). + + If `candidate` is supplied: + + First, atomically in a single rethinkdb query, checks if there is + already a unique healthy instance of this service in rethinkdb, and + if not, sets `candidate` as the unique service. + + Looks at the result of that query to determine if `candidate` is + the unique service or not. If it is, updates 'last_heartbeat' in + rethinkdb. + + To determine whether `candidate` is the unique service, checks that + all the fields other than 'first_heartbeat' and 'last_heartbeat' + have the same value in `candidate` as in the value returned from + rethinkdb. + + ***Important***: this means that the caller must ensure that none + of the fields of the unique service ever change. Don't store things + like 'load' or any other volatile value in there. If you try to do + that, heartbeats will end up not being sent, and the unique service + will flap among the candidates. + + Finally, retrieves the service from rethinkdb and returns it, if it is + healthy. + + Returns: + the unique service, if there is one and it is healthy, otherwise + None ''' - if default is not None: - default['id'] = role_name - default['last_heartbeat'] = r.now() - if not 'heartbeat_interval' in default: - raise Exception('Default service must contain a key called "heartbeat_interval"') - self.rr.table('services', read_mode='majority').get(role_name).replace(lambda row: r.branch(r.branch(row, row['last_heartbeat'] > r.now() - row['heartbeat_interval'] * 3, False), row, default)).run() - return self.rr.table('services', read_mode='majority').get(role_name).run() + if candidate is not None: + candidate['id'] = role + + if not 'heartbeat_interval' in candidate: + raise Exception( + "candidate is missing required field " + "'heartbeat_interval'") + val = candidate['heartbeat_interval'] + if not (isinstance(val, float) or isinstance(val, int)) or val <= 0: + raise Exception('heartbeat_interval must be a number > 0') + + candidate['first_heartbeat'] = r.now() + candidate['last_heartbeat'] = r.now() + if not 'host' in candidate: + candidate['host'] = socket.gethostname() + if not 'pid' in candidate: + candidate['pid'] = os.getpid() + + result = self.rr.table( + 'services', read_mode='majority').get(role).replace( + lambda row: r.branch( + r.branch( + row, + row['last_heartbeat'] > r.now() - row['heartbeat_interval'] * 3, + False), + row.merge({'last_heartbeat': r.now()}), + candidate), + return_changes='always').run() + new_val = result['changes'][0]['new_val'] + if all([new_val[k] == candidate[k] for k in candidate + if k not in ('first_heartbeat', 'last_heartbeat')]): + # candidate is the unique_service, send a heartbeat + del candidate['first_heartbeat'] # don't touch first_heartbeat + self.rr.table('services').get(role).update(candidate).run() + + results = list(self.rr.table( + 'services', read_mode='majority').get_all(role).filter( + lambda row: row['last_heartbeat'] > r.now() - row['heartbeat_interval'] * 3).run()) + if results: + return results[0] + else: + return None + def healthy_service(self, role): ''' diff --git a/tests/test_svcreg.py b/tests/test_svcreg.py index 738b029..6572df6 100644 --- a/tests/test_svcreg.py +++ b/tests/test_svcreg.py @@ -52,12 +52,12 @@ def rr(): assert result["dbs_created"] == 1 return RethinkerForTesting(db="doublethink_test_db") -def test_leader_election(rr): +def test_unique_service(rr): svcreg = doublethink.ServiceRegistry(rr) - assert svcreg.leader('example-role') == None + assert svcreg.unique_service('example-role') == None # this raises an exception: no heartbeat_interval. with pytest.raises(Exception) as excinfo: - svcreg.leader('example-role', default={}) + svcreg.unique_service('example-role', candidate={}) svc01 = { "role": "example-role", "load": 0.0, @@ -71,10 +71,10 @@ def test_leader_election(rr): "node": "test02.example.com" } # register svc01. output should be svc01. - output = svcreg.leader('example-role', default=svc01) + output = svcreg.unique_service('example-role', candidate=svc01) assert output['node'] == svc01['node'] # try to register svc02. Output should still be svc01. - output = svcreg.leader('example-role', default=svc02) + output = svcreg.unique_service('example-role', candidate=svc02) assert output['node'] == svc01['node'] svcreg.unregister('example-role') From 9c2e7a59e1a1f9b802bef953c4a4573304f0dcce Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 1 May 2017 12:04:26 -0700 Subject: [PATCH 2/2] remove stray code from abandoned heartbeat technique in unique_service() --- doublethink/services.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/doublethink/services.py b/doublethink/services.py index 8b7885b..1a22af5 100644 --- a/doublethink/services.py +++ b/doublethink/services.py @@ -245,8 +245,7 @@ class ServiceRegistry(object): row, row['last_heartbeat'] > r.now() - row['heartbeat_interval'] * 3, False), - row.merge({'last_heartbeat': r.now()}), - candidate), + row, candidate), return_changes='always').run() new_val = result['changes'][0]['new_val'] if all([new_val[k] == candidate[k] for k in candidate @@ -263,7 +262,6 @@ class ServiceRegistry(object): else: return None - def healthy_service(self, role): ''' Find least loaded healthy service in the registry.