diff --git a/doublethink/services.py b/doublethink/services.py index 1a22af5..aeca2d2 100644 --- a/doublethink/services.py +++ b/doublethink/services.py @@ -30,7 +30,7 @@ class ServiceRegistry(object): by calling `heartbeat(status_info)` periodically. `status_info` is a dict and must have at least the fields 'role', 'load', - and 'heartbeat_interval'. Certain other fields are populated automatically + and 'ttl'. Certain other fields are populated automatically as in the example below. In addition, services may set arbitrary other fields. @@ -38,11 +38,10 @@ class ServiceRegistry(object): 'role': The role of the service. `healthy_service()` and `healthy_services()` look up services using this field. - 'heartbeat_interval': Specifies the expected time between heartbeats. If - a service's last heartbeat was more than `3 * heartbeat_interval` - seconds ago, it is considered to be "down". `healthy_services()` - and `healthy_service()` never return entries for services that are - considered "down". + 'ttl': Specifies the expected time between heartbeats. If a service's + last heartbeat was more than `ttl` seconds ago, it is considered + to be "down". `healthy_services()` and `healthy_service()` never + return entries for services that are considered "down". 'load': An arbitrary numeric value. It is up to each service to populate this field in a way that makes sense to the particular service. `healthy_service(role)` returns the service with the lowest load @@ -72,7 +71,7 @@ class ServiceRegistry(object): 'id': 'd0bed0be-d000-d000-f00d-abeefface0ff' # generated by rethinkdb if not supplied 'role': 'brozzler-worker', 'load': 0.5, # load score - 'heartbeat_interval': 20.0, + 'ttl': 20.0, 'host': 'wbgrp-svc999.us.archive.org', # set in svcreg.heartbeat() as a fallback 'pid': 1234, # set in svcreg.heartbeat() as a fallback 'first_heartbeat': '2015-10-30T03:39:40.080814', # set in svcreg.heartbeat() @@ -121,7 +120,7 @@ class ServiceRegistry(object): service `status_info` must have at least the fields 'role', 'load', and - 'heartbeat_interval'. Some additional fields are populated + 'ttl'. Some additional fields are populated automatically by this method. If the field 'id' is absent, it will be generated by rethinkdb. @@ -134,17 +133,17 @@ class ServiceRegistry(object): Raises: Exception: if `status_info` is missing a required field, or a - `status_info['heartbeat_interval']` is not a number greater + `status_info['ttl']` is not a number greater than zero ''' - for field in 'role', 'heartbeat_interval', 'load': + for field in 'role', 'ttl', 'load': if not field in status_info: raise Exception( 'status_info is missing required field %s', repr(field)) - val = status_info['heartbeat_interval'] + val = status_info['ttl'] if not (isinstance(val, float) or isinstance(val, int)) or val <= 0: - raise Exception('heartbeat_interval must be a number > 0') + raise Exception('ttl must be a number > 0') updated_status_info = dict(status_info) updated_status_info['last_heartbeat'] = r.now() if not 'first_heartbeat' in updated_status_info: @@ -223,13 +222,13 @@ class ServiceRegistry(object): if candidate is not None: candidate['id'] = role - if not 'heartbeat_interval' in candidate: + if not 'ttl' in candidate: raise Exception( "candidate is missing required field " - "'heartbeat_interval'") - val = candidate['heartbeat_interval'] + "'ttl'") + val = candidate['ttl'] if not (isinstance(val, float) or isinstance(val, int)) or val <= 0: - raise Exception('heartbeat_interval must be a number > 0') + raise Exception('"ttl" must be a number > 0') candidate['first_heartbeat'] = r.now() candidate['last_heartbeat'] = r.now() @@ -243,7 +242,7 @@ class ServiceRegistry(object): lambda row: r.branch( r.branch( row, - row['last_heartbeat'] > r.now() - row['heartbeat_interval'] * 3, + row['last_heartbeat'] > r.now() - row['ttl'], False), row, candidate), return_changes='always').run() @@ -256,7 +255,7 @@ class ServiceRegistry(object): results = list(self.rr.table( 'services', read_mode='majority').get_all(role).filter( - lambda row: row['last_heartbeat'] > r.now() - row['heartbeat_interval'] * 3).run()) + lambda row: row['last_heartbeat'] > r.now() - row['ttl']).run()) if results: return results[0] else: @@ -267,7 +266,7 @@ class ServiceRegistry(object): Find least loaded healthy service in the registry. A service is considered healthy if its 'last_heartbeat' is in the last - `3 * heartbeat_interval` seconds. + `ttl` seconds. Args: role (str): role name @@ -278,7 +277,7 @@ class ServiceRegistry(object): ''' try: result = self.rr.table('services').filter({"role":role}).filter( - lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] + lambda svc: r.now().sub(svc["last_heartbeat"]) < svc["ttl"] ).order_by("load")[0].run() return result except r.ReqlNonExistenceError: @@ -289,7 +288,7 @@ class ServiceRegistry(object): Look up healthy services in the registry. A service is considered healthy if its `last_heartbeat` is in the last - `3 * heartbeat_interval` seconds. + `ttl` seconds. Args: role (str, optional): role name @@ -304,7 +303,7 @@ class ServiceRegistry(object): if role: query = query.filter({"role":role}) query = query.filter( - lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] #.default(20.0) + lambda svc: r.now().sub(svc["last_heartbeat"]) < svc["ttl"] #.default(20.0) ).order_by("load") result = query.run() return result diff --git a/tests/test_svcreg.py b/tests/test_svcreg.py index bbe7ace..4f3364c 100644 --- a/tests/test_svcreg.py +++ b/tests/test_svcreg.py @@ -55,17 +55,17 @@ def rr(): def test_unique_service(rr): svcreg = doublethink.ServiceRegistry(rr) assert svcreg.unique_service('example-role') == None - # this raises an exception: no heartbeat_interval. + # this raises an exception: no ttl. with pytest.raises(Exception) as excinfo: svcreg.unique_service('example-role', candidate={}) svc01 = { "role": "example-role", - "heartbeat_interval": 0.4, + "ttl": 0.4, "node": "test01.example.com" } svc02 = { "role": "example-role", - "heartbeat_interval": 0.4, + "ttl": 0.4, "node": "test02.example.com" } # register svc01. output should be svc01. @@ -93,23 +93,23 @@ def test_service_registry(rr): with pytest.raises(Exception) as excinfo: svcreg.heartbeat({"role":"foo","load":1}) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"role":"foo","heartbeat_interval":1.0}) + svcreg.heartbeat({"role":"foo","ttl":1.0}) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"heartbeat_interval":1.0,"load":1}) + svcreg.heartbeat({"ttl":1.0,"load":1}) - # invalid heartbeat interval (we accept anything for load and role) + # invalid ttl (we accept anything for load and role) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"heartbeat_interval":-1,"role":"foo","load":1}) + svcreg.heartbeat({"ttl":-1,"role":"foo","load":1}) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"heartbeat_interval":"strang","role":"foo","load":1}) + svcreg.heartbeat({"ttl":"strang","role":"foo","load":1}) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"heartbeat_interval":[],"role":"foo","load":1}) + svcreg.heartbeat({"ttl":[],"role":"foo","load":1}) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"heartbeat_interval":[1],"role":"foo","load":1}) + svcreg.heartbeat({"ttl":[1],"role":"foo","load":1}) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"heartbeat_interval":{},"role":"foo","load":1}) + svcreg.heartbeat({"ttl":{},"role":"foo","load":1}) with pytest.raises(Exception) as excinfo: - svcreg.heartbeat({"heartbeat_interval":{1:2},"role":"foo","load":1}) + svcreg.heartbeat({"ttl":{1:2},"role":"foo","load":1}) assert svcreg.available_service("yes-such-role") == None assert svcreg.available_services("yes-such-role") == [] @@ -117,12 +117,12 @@ def test_service_registry(rr): svc0 = { "role": "yes-such-role", "load": 100.0, - "heartbeat_interval": 0.4, + "ttl": 0.4, } svc1 = { "role": "yes-such-role", "load": 200.0, - "heartbeat_interval": 0.4, + "ttl": 0.4, } svc0 = svcreg.heartbeat(svc0) svc1 = svcreg.heartbeat(svc1) @@ -188,12 +188,12 @@ def test_service_registry(rr): svc0 = { "role": "yes-such-role", "load": 100.0, - "heartbeat_interval": 0.4, + "ttl": 0.4, } svc1 = { "role": "yes-such-role", "load": 200.0, - "heartbeat_interval": 0.4, + "ttl": 0.4, } svc0 = svcreg.heartbeat(svc0) svc1 = svcreg.heartbeat(svc1) @@ -205,22 +205,22 @@ def test_service_registry(rr): svc0 = { "role": "yes-such-role", "load": 100.0, - "heartbeat_interval": 0.4, + "ttl": 0.4, } svc1 = { "role": "yes-such-role", "load": 200.0, - "heartbeat_interval": 0.4, + "ttl": 0.4, } svc2 = { "role": "another-such-role", "load": 200.0, - "heartbeat_interval": 0.4, + "ttl": 0.4, } svc3 = { "role": "yet-another-such-role", "load": 200.0, - "heartbeat_interval": 0.4, + "ttl": 0.4, } svc0 = svcreg.heartbeat(svc0) svc1 = svcreg.heartbeat(svc1) @@ -245,7 +245,7 @@ def test_svcreg_heartbeat_server_down(rr): svc0 = { "role": "role-foo", "load": 100.0, - "heartbeat_interval": 0.4, + "ttl": 0.4, } # no exception thrown svc0 = svcreg.heartbeat(svc0)