diff --git a/doublethink/services.py b/doublethink/services.py index e59c65e..9402519 100644 --- a/doublethink/services.py +++ b/doublethink/services.py @@ -31,25 +31,24 @@ class ServiceRegistry(object): by calling `heartbeat(status_info)` periodically. `status_info` is a dict and must have at least the fields 'role', 'load', - and 'heartbeat_interval'. Certain other fields are populated automatically - as in the example below. In addition, services may set arbitrary other - fields. + and 'ttl'. Certain other fields are populated automatically as in the + example below. In addition, services may set arbitrary other fields. Some information about required fields: 'role': The role of the service. `healthy_service()` and `healthy_services()` look up services using this field. - 'heartbeat_interval': Specifies the expected time between heartbeats. If - a service's last heartbeat was more than `3 * heartbeat_interval` - seconds ago, it is considered to be "down". `healthy_services()` - and `healthy_service()` never return entries for services that are - considered "down". + 'ttl': If a service's last heartbeat was more than 'ttl' seconds ago, it + is considered to be "down". `healthy_services()` and + `healthy_service()` never return entries for services that are + considered "down". A sensible convention is to heartbeat 3 times per + 'ttl', that is, every `ttl/3` seconds. 'load': An arbitrary numeric value. It is up to each service to populate this field in a way that makes sense to the particular service. `healthy_service(role)` returns the service with the lowest load for the supplied role. Thus load values need to be comparable to within the context of a single service, but comparing loads of - services of different roles does not necessarily make any sense. + services of different roles might not make any sense. About the 'id' field: @@ -73,7 +72,7 @@ class ServiceRegistry(object): 'id': 'd0bed0be-d000-d000-f00d-abeefface0ff' # generated by rethinkdb if not supplied 'role': 'brozzler-worker', 'load': 0.5, # load score - 'heartbeat_interval': 20.0, + 'ttl': 60.0, 'host': 'wbgrp-svc999.us.archive.org', # set in svcreg.heartbeat() as a fallback 'pid': 1234, # set in svcreg.heartbeat() as a fallback 'first_heartbeat': '2015-10-30T03:39:40.080814', # set in svcreg.heartbeat() @@ -122,9 +121,8 @@ class ServiceRegistry(object): service `status_info` must have at least the fields 'role', 'load', and - 'heartbeat_interval'. Some additional fields are populated - automatically by this method. If the field 'id' is absent, it will be - generated by rethinkdb. + 'ttl'. Some additional fields are populated automatically by this + method. If the field 'id' is absent, it will be generated by rethinkdb. See the ServiceRegistry class-level documentation for more information about the various fields. @@ -135,17 +133,16 @@ class ServiceRegistry(object): Raises: Exception: if `status_info` is missing a required field, or a - `status_info['heartbeat_interval']` is not a number greater - than zero + `status_info['ttl']` is not a number greater than zero ''' - for field in 'role', 'heartbeat_interval', 'load': + for field in 'role', 'ttl', 'load': if not field in status_info: raise Exception( 'status_info is missing required field %s', repr(field)) - val = status_info['heartbeat_interval'] + val = status_info['ttl'] if not (isinstance(val, float) or isinstance(val, int)) or val <= 0: - raise Exception('heartbeat_interval must be a number > 0') + raise Exception('ttl must be a number > 0') updated_status_info = dict(status_info) updated_status_info['last_heartbeat'] = r.now() if not 'first_heartbeat' in updated_status_info: @@ -226,13 +223,11 @@ class ServiceRegistry(object): if candidate is not None: candidate['id'] = role - if not 'heartbeat_interval' in candidate: - raise Exception( - "candidate is missing required field " - "'heartbeat_interval'") - val = candidate['heartbeat_interval'] + if not 'ttl' in candidate: + raise Exception("candidate is missing required field 'ttl'") + val = candidate['ttl'] if not (isinstance(val, float) or isinstance(val, int)) or val <= 0: - raise Exception('heartbeat_interval must be a number > 0') + raise Exception("'ttl' must be a number > 0") candidate['first_heartbeat'] = now candidate['last_heartbeat'] = now @@ -246,7 +241,7 @@ class ServiceRegistry(object): lambda row: r.branch( r.branch( row, - row['last_heartbeat'] > now - row['heartbeat_interval'] * 3, + row['last_heartbeat'] > now - row['ttl'], False), row, candidate), return_changes='always').run() @@ -259,7 +254,7 @@ class ServiceRegistry(object): results = list(self.rr.table( 'services', read_mode='majority').get_all(role).filter( - lambda row: row['last_heartbeat'] > now - row['heartbeat_interval'] * 3).run()) + lambda row: row['last_heartbeat'] > now - row['ttl']).run()) if results: return results[0] else: @@ -269,8 +264,8 @@ class ServiceRegistry(object): ''' Find least loaded healthy service in the registry. - A service is considered healthy if its 'last_heartbeat' is in the last - `3 * heartbeat_interval` seconds. + A service is considered healthy if its 'last_heartbeat' was less than + 'ttl' seconds ago Args: role (str): role name @@ -281,7 +276,7 @@ class ServiceRegistry(object): ''' try: result = self.rr.table('services').filter({"role":role}).filter( - lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] + lambda svc: r.now().sub(svc["last_heartbeat"]) < svc["ttl"] ).order_by("load")[0].run() return result except r.ReqlNonExistenceError: @@ -291,8 +286,8 @@ class ServiceRegistry(object): ''' Look up healthy services in the registry. - A service is considered healthy if its `last_heartbeat` is in the last - `3 * heartbeat_interval` seconds. + A service is considered healthy if its 'last_heartbeat' was less than + 'ttl' seconds ago Args: role (str, optional): role name @@ -307,7 +302,7 @@ class ServiceRegistry(object): if role: query = query.filter({"role":role}) query = query.filter( - lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] #.default(20.0) + lambda svc: r.now().sub(svc["last_heartbeat"]) < svc["ttl"] #.default(20.0) ).order_by("load") result = query.run() return result diff --git a/tests/test_svcreg.py b/tests/test_svcreg.py index bbe7ace..68f4d91 100644 --- a/tests/test_svcreg.py +++ b/tests/test_svcreg.py @@ -55,17 +55,17 @@ def rr(): def test_unique_service(rr): svcreg = doublethink.ServiceRegistry(rr) assert svcreg.unique_service('example-role') == None - # this raises an exception: no heartbeat_interval. + # this raises an exception: no ttl. with pytest.raises(Exception) as excinfo: svcreg.unique_service('example-role', candidate={}) svc01 = { "role": "example-role", - "heartbeat_interval": 0.4, + "ttl": 1.2, "node": "test01.example.com" } svc02 = { "role": "example-role", - "heartbeat_interval": 0.4, + "ttl": 1.2, "node": "test02.example.com" } # register svc01. output should be svc01. @@ -93,23 +93,23 @@ def test_service_registry(rr): with pytest.raises(Exception) as excinfo: svcreg.heartbeat({"role":"foo","load":1}) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"role":"foo","heartbeat_interval":1.0}) + svcreg.heartbeat({"role":"foo","ttl":1.0}) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"heartbeat_interval":1.0,"load":1}) + svcreg.heartbeat({"ttl":1.0,"load":1}) - # invalid heartbeat interval (we accept anything for load and role) + # invalid ttl (we accept anything for load and role) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"heartbeat_interval":-1,"role":"foo","load":1}) + svcreg.heartbeat({"ttl":-1,"role":"foo","load":1}) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"heartbeat_interval":"strang","role":"foo","load":1}) + svcreg.heartbeat({"ttl":"strang","role":"foo","load":1}) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"heartbeat_interval":[],"role":"foo","load":1}) + svcreg.heartbeat({"ttl":[],"role":"foo","load":1}) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"heartbeat_interval":[1],"role":"foo","load":1}) + svcreg.heartbeat({"ttl":[1],"role":"foo","load":1}) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"heartbeat_interval":{},"role":"foo","load":1}) + svcreg.heartbeat({"ttl":{},"role":"foo","load":1}) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"heartbeat_interval":{1:2},"role":"foo","load":1}) + svcreg.heartbeat({"ttl":{1:2},"role":"foo","load":1}) assert svcreg.available_service("yes-such-role") == None assert svcreg.available_services("yes-such-role") == [] @@ -117,12 +117,12 @@ def test_service_registry(rr): svc0 = { "role": "yes-such-role", "load": 100.0, - "heartbeat_interval": 0.4, + "ttl": 1.2, } svc1 = { "role": "yes-such-role", "load": 200.0, - "heartbeat_interval": 0.4, + "ttl": 1.2, } svc0 = svcreg.heartbeat(svc0) svc1 = svcreg.heartbeat(svc1) @@ -188,12 +188,12 @@ def test_service_registry(rr): svc0 = { "role": "yes-such-role", "load": 100.0, - "heartbeat_interval": 0.4, + "ttl": 1.2, } svc1 = { "role": "yes-such-role", "load": 200.0, - "heartbeat_interval": 0.4, + "ttl": 1.2, } svc0 = svcreg.heartbeat(svc0) svc1 = svcreg.heartbeat(svc1) @@ -205,22 +205,22 @@ def test_service_registry(rr): svc0 = { "role": "yes-such-role", "load": 100.0, - "heartbeat_interval": 0.4, + "ttl": 1.2, } svc1 = { "role": "yes-such-role", "load": 200.0, - "heartbeat_interval": 0.4, + "ttl": 1.2, } svc2 = { "role": "another-such-role", "load": 200.0, - "heartbeat_interval": 0.4, + "ttl": 1.2, } svc3 = { "role": "yet-another-such-role", "load": 200.0, - "heartbeat_interval": 0.4, + "ttl": 1.2, } svc0 = svcreg.heartbeat(svc0) svc1 = svcreg.heartbeat(svc1) @@ -245,7 +245,7 @@ def test_svcreg_heartbeat_server_down(rr): svc0 = { "role": "role-foo", "load": 100.0, - "heartbeat_interval": 0.4, + "ttl": 1.2, } # no exception thrown svc0 = svcreg.heartbeat(svc0)