rename "heartbeat_interval" -> "ttl", simplify mathematics.

This commit is contained in:
James Kafader 2017-05-16 14:31:32 -07:00
parent 28b8c2eaac
commit 6dc3967bd6
2 changed files with 42 additions and 43 deletions

View File

@ -30,7 +30,7 @@ class ServiceRegistry(object):
by calling `heartbeat(status_info)` periodically. by calling `heartbeat(status_info)` periodically.
`status_info` is a dict and must have at least the fields 'role', 'load', `status_info` is a dict and must have at least the fields 'role', 'load',
and 'heartbeat_interval'. Certain other fields are populated automatically and 'ttl'. Certain other fields are populated automatically
as in the example below. In addition, services may set arbitrary other as in the example below. In addition, services may set arbitrary other
fields. fields.
@ -38,11 +38,10 @@ class ServiceRegistry(object):
'role': The role of the service. `healthy_service()` and 'role': The role of the service. `healthy_service()` and
`healthy_services()` look up services using this field. `healthy_services()` look up services using this field.
'heartbeat_interval': Specifies the expected time between heartbeats. If 'ttl': Specifies the expected time between heartbeats. If a service's
a service's last heartbeat was more than `3 * heartbeat_interval` last heartbeat was more than `ttl` seconds ago, it is considered
seconds ago, it is considered to be "down". `healthy_services()` to be "down". `healthy_services()` and `healthy_service()` never
and `healthy_service()` never return entries for services that are return entries for services that are considered "down".
considered "down".
'load': An arbitrary numeric value. It is up to each service to populate 'load': An arbitrary numeric value. It is up to each service to populate
this field in a way that makes sense to the particular service. this field in a way that makes sense to the particular service.
`healthy_service(role)` returns the service with the lowest load `healthy_service(role)` returns the service with the lowest load
@ -72,7 +71,7 @@ class ServiceRegistry(object):
'id': 'd0bed0be-d000-d000-f00d-abeefface0ff' # generated by rethinkdb if not supplied 'id': 'd0bed0be-d000-d000-f00d-abeefface0ff' # generated by rethinkdb if not supplied
'role': 'brozzler-worker', 'role': 'brozzler-worker',
'load': 0.5, # load score 'load': 0.5, # load score
'heartbeat_interval': 20.0, 'ttl': 20.0,
'host': 'wbgrp-svc999.us.archive.org', # set in svcreg.heartbeat() as a fallback 'host': 'wbgrp-svc999.us.archive.org', # set in svcreg.heartbeat() as a fallback
'pid': 1234, # set in svcreg.heartbeat() as a fallback 'pid': 1234, # set in svcreg.heartbeat() as a fallback
'first_heartbeat': '2015-10-30T03:39:40.080814', # set in svcreg.heartbeat() 'first_heartbeat': '2015-10-30T03:39:40.080814', # set in svcreg.heartbeat()
@ -121,7 +120,7 @@ class ServiceRegistry(object):
service service
`status_info` must have at least the fields 'role', 'load', and `status_info` must have at least the fields 'role', 'load', and
'heartbeat_interval'. Some additional fields are populated 'ttl'. Some additional fields are populated
automatically by this method. If the field 'id' is absent, it will be automatically by this method. If the field 'id' is absent, it will be
generated by rethinkdb. generated by rethinkdb.
@ -134,17 +133,17 @@ class ServiceRegistry(object):
Raises: Raises:
Exception: if `status_info` is missing a required field, or a Exception: if `status_info` is missing a required field, or a
`status_info['heartbeat_interval']` is not a number greater `status_info['ttl']` is not a number greater
than zero than zero
''' '''
for field in 'role', 'heartbeat_interval', 'load': for field in 'role', 'ttl', 'load':
if not field in status_info: if not field in status_info:
raise Exception( raise Exception(
'status_info is missing required field %s', 'status_info is missing required field %s',
repr(field)) repr(field))
val = status_info['heartbeat_interval'] val = status_info['ttl']
if not (isinstance(val, float) or isinstance(val, int)) or val <= 0: if not (isinstance(val, float) or isinstance(val, int)) or val <= 0:
raise Exception('heartbeat_interval must be a number > 0') raise Exception('ttl must be a number > 0')
updated_status_info = dict(status_info) updated_status_info = dict(status_info)
updated_status_info['last_heartbeat'] = r.now() updated_status_info['last_heartbeat'] = r.now()
if not 'first_heartbeat' in updated_status_info: if not 'first_heartbeat' in updated_status_info:
@ -223,13 +222,13 @@ class ServiceRegistry(object):
if candidate is not None: if candidate is not None:
candidate['id'] = role candidate['id'] = role
if not 'heartbeat_interval' in candidate: if not 'ttl' in candidate:
raise Exception( raise Exception(
"candidate is missing required field " "candidate is missing required field "
"'heartbeat_interval'") "'ttl'")
val = candidate['heartbeat_interval'] val = candidate['ttl']
if not (isinstance(val, float) or isinstance(val, int)) or val <= 0: if not (isinstance(val, float) or isinstance(val, int)) or val <= 0:
raise Exception('heartbeat_interval must be a number > 0') raise Exception('"ttl" must be a number > 0')
candidate['first_heartbeat'] = r.now() candidate['first_heartbeat'] = r.now()
candidate['last_heartbeat'] = r.now() candidate['last_heartbeat'] = r.now()
@ -243,7 +242,7 @@ class ServiceRegistry(object):
lambda row: r.branch( lambda row: r.branch(
r.branch( r.branch(
row, row,
row['last_heartbeat'] > r.now() - row['heartbeat_interval'] * 3, row['last_heartbeat'] > r.now() - row['ttl'],
False), False),
row, candidate), row, candidate),
return_changes='always').run() return_changes='always').run()
@ -256,7 +255,7 @@ class ServiceRegistry(object):
results = list(self.rr.table( results = list(self.rr.table(
'services', read_mode='majority').get_all(role).filter( 'services', read_mode='majority').get_all(role).filter(
lambda row: row['last_heartbeat'] > r.now() - row['heartbeat_interval'] * 3).run()) lambda row: row['last_heartbeat'] > r.now() - row['ttl']).run())
if results: if results:
return results[0] return results[0]
else: else:
@ -267,7 +266,7 @@ class ServiceRegistry(object):
Find least loaded healthy service in the registry. Find least loaded healthy service in the registry.
A service is considered healthy if its 'last_heartbeat' is in the last A service is considered healthy if its 'last_heartbeat' is in the last
`3 * heartbeat_interval` seconds. `ttl` seconds.
Args: Args:
role (str): role name role (str): role name
@ -278,7 +277,7 @@ class ServiceRegistry(object):
''' '''
try: try:
result = self.rr.table('services').filter({"role":role}).filter( result = self.rr.table('services').filter({"role":role}).filter(
lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] lambda svc: r.now().sub(svc["last_heartbeat"]) < svc["ttl"]
).order_by("load")[0].run() ).order_by("load")[0].run()
return result return result
except r.ReqlNonExistenceError: except r.ReqlNonExistenceError:
@ -289,7 +288,7 @@ class ServiceRegistry(object):
Look up healthy services in the registry. Look up healthy services in the registry.
A service is considered healthy if its `last_heartbeat` is in the last A service is considered healthy if its `last_heartbeat` is in the last
`3 * heartbeat_interval` seconds. `ttl` seconds.
Args: Args:
role (str, optional): role name role (str, optional): role name
@ -304,7 +303,7 @@ class ServiceRegistry(object):
if role: if role:
query = query.filter({"role":role}) query = query.filter({"role":role})
query = query.filter( query = query.filter(
lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] #.default(20.0) lambda svc: r.now().sub(svc["last_heartbeat"]) < svc["ttl"] #.default(20.0)
).order_by("load") ).order_by("load")
result = query.run() result = query.run()
return result return result

View File

@ -55,17 +55,17 @@ def rr():
def test_unique_service(rr): def test_unique_service(rr):
svcreg = doublethink.ServiceRegistry(rr) svcreg = doublethink.ServiceRegistry(rr)
assert svcreg.unique_service('example-role') == None assert svcreg.unique_service('example-role') == None
# this raises an exception: no heartbeat_interval. # this raises an exception: no ttl.
with pytest.raises(Exception) as excinfo: with pytest.raises(Exception) as excinfo:
svcreg.unique_service('example-role', candidate={}) svcreg.unique_service('example-role', candidate={})
svc01 = { svc01 = {
"role": "example-role", "role": "example-role",
"heartbeat_interval": 0.4, "ttl": 0.4,
"node": "test01.example.com" "node": "test01.example.com"
} }
svc02 = { svc02 = {
"role": "example-role", "role": "example-role",
"heartbeat_interval": 0.4, "ttl": 0.4,
"node": "test02.example.com" "node": "test02.example.com"
} }
# register svc01. output should be svc01. # register svc01. output should be svc01.
@ -93,23 +93,23 @@ def test_service_registry(rr):
with pytest.raises(Exception) as excinfo: with pytest.raises(Exception) as excinfo:
svcreg.heartbeat({"role":"foo","load":1}) svcreg.heartbeat({"role":"foo","load":1})
with pytest.raises(Exception) as excinfo: with pytest.raises(Exception) as excinfo:
svcreg.heartbeat({"role":"foo","heartbeat_interval":1.0}) svcreg.heartbeat({"role":"foo","ttl":1.0})
with pytest.raises(Exception) as excinfo: with pytest.raises(Exception) as excinfo:
svcreg.heartbeat({"heartbeat_interval":1.0,"load":1}) svcreg.heartbeat({"ttl":1.0,"load":1})
# invalid heartbeat interval (we accept anything for load and role) # invalid ttl (we accept anything for load and role)
with pytest.raises(Exception) as excinfo: with pytest.raises(Exception) as excinfo:
svcreg.heartbeat({"heartbeat_interval":-1,"role":"foo","load":1}) svcreg.heartbeat({"ttl":-1,"role":"foo","load":1})
with pytest.raises(Exception) as excinfo: with pytest.raises(Exception) as excinfo:
svcreg.heartbeat({"heartbeat_interval":"strang","role":"foo","load":1}) svcreg.heartbeat({"ttl":"strang","role":"foo","load":1})
with pytest.raises(Exception) as excinfo: with pytest.raises(Exception) as excinfo:
svcreg.heartbeat({"heartbeat_interval":[],"role":"foo","load":1}) svcreg.heartbeat({"ttl":[],"role":"foo","load":1})
with pytest.raises(Exception) as excinfo: with pytest.raises(Exception) as excinfo:
svcreg.heartbeat({"heartbeat_interval":[1],"role":"foo","load":1}) svcreg.heartbeat({"ttl":[1],"role":"foo","load":1})
with pytest.raises(Exception) as excinfo: with pytest.raises(Exception) as excinfo:
svcreg.heartbeat({"heartbeat_interval":{},"role":"foo","load":1}) svcreg.heartbeat({"ttl":{},"role":"foo","load":1})
with pytest.raises(Exception) as excinfo: with pytest.raises(Exception) as excinfo:
svcreg.heartbeat({"heartbeat_interval":{1:2},"role":"foo","load":1}) svcreg.heartbeat({"ttl":{1:2},"role":"foo","load":1})
assert svcreg.available_service("yes-such-role") == None assert svcreg.available_service("yes-such-role") == None
assert svcreg.available_services("yes-such-role") == [] assert svcreg.available_services("yes-such-role") == []
@ -117,12 +117,12 @@ def test_service_registry(rr):
svc0 = { svc0 = {
"role": "yes-such-role", "role": "yes-such-role",
"load": 100.0, "load": 100.0,
"heartbeat_interval": 0.4, "ttl": 0.4,
} }
svc1 = { svc1 = {
"role": "yes-such-role", "role": "yes-such-role",
"load": 200.0, "load": 200.0,
"heartbeat_interval": 0.4, "ttl": 0.4,
} }
svc0 = svcreg.heartbeat(svc0) svc0 = svcreg.heartbeat(svc0)
svc1 = svcreg.heartbeat(svc1) svc1 = svcreg.heartbeat(svc1)
@ -188,12 +188,12 @@ def test_service_registry(rr):
svc0 = { svc0 = {
"role": "yes-such-role", "role": "yes-such-role",
"load": 100.0, "load": 100.0,
"heartbeat_interval": 0.4, "ttl": 0.4,
} }
svc1 = { svc1 = {
"role": "yes-such-role", "role": "yes-such-role",
"load": 200.0, "load": 200.0,
"heartbeat_interval": 0.4, "ttl": 0.4,
} }
svc0 = svcreg.heartbeat(svc0) svc0 = svcreg.heartbeat(svc0)
svc1 = svcreg.heartbeat(svc1) svc1 = svcreg.heartbeat(svc1)
@ -205,22 +205,22 @@ def test_service_registry(rr):
svc0 = { svc0 = {
"role": "yes-such-role", "role": "yes-such-role",
"load": 100.0, "load": 100.0,
"heartbeat_interval": 0.4, "ttl": 0.4,
} }
svc1 = { svc1 = {
"role": "yes-such-role", "role": "yes-such-role",
"load": 200.0, "load": 200.0,
"heartbeat_interval": 0.4, "ttl": 0.4,
} }
svc2 = { svc2 = {
"role": "another-such-role", "role": "another-such-role",
"load": 200.0, "load": 200.0,
"heartbeat_interval": 0.4, "ttl": 0.4,
} }
svc3 = { svc3 = {
"role": "yet-another-such-role", "role": "yet-another-such-role",
"load": 200.0, "load": 200.0,
"heartbeat_interval": 0.4, "ttl": 0.4,
} }
svc0 = svcreg.heartbeat(svc0) svc0 = svcreg.heartbeat(svc0)
svc1 = svcreg.heartbeat(svc1) svc1 = svcreg.heartbeat(svc1)
@ -245,7 +245,7 @@ def test_svcreg_heartbeat_server_down(rr):
svc0 = { svc0 = {
"role": "role-foo", "role": "role-foo",
"load": 100.0, "load": 100.0,
"heartbeat_interval": 0.4, "ttl": 0.4,
} }
# no exception thrown # no exception thrown
svc0 = svcreg.heartbeat(svc0) svc0 = svcreg.heartbeat(svc0)