mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
Merge pull request #5 from internetarchive/rename-heartbeat-interval-to-ttl
rename "heartbeat_interval" -> "ttl", simplify mathematics.
This commit is contained in:
commit
3dbd3f8ae1
@ -31,25 +31,24 @@ class ServiceRegistry(object):
|
|||||||
by calling `heartbeat(status_info)` periodically.
|
by calling `heartbeat(status_info)` periodically.
|
||||||
|
|
||||||
`status_info` is a dict and must have at least the fields 'role', 'load',
|
`status_info` is a dict and must have at least the fields 'role', 'load',
|
||||||
and 'heartbeat_interval'. Certain other fields are populated automatically
|
and 'ttl'. Certain other fields are populated automatically as in the
|
||||||
as in the example below. In addition, services may set arbitrary other
|
example below. In addition, services may set arbitrary other fields.
|
||||||
fields.
|
|
||||||
|
|
||||||
Some information about required fields:
|
Some information about required fields:
|
||||||
|
|
||||||
'role': The role of the service. `healthy_service()` and
|
'role': The role of the service. `healthy_service()` and
|
||||||
`healthy_services()` look up services using this field.
|
`healthy_services()` look up services using this field.
|
||||||
'heartbeat_interval': Specifies the expected time between heartbeats. If
|
'ttl': If a service's last heartbeat was more than 'ttl' seconds ago, it
|
||||||
a service's last heartbeat was more than `3 * heartbeat_interval`
|
is considered to be "down". `healthy_services()` and
|
||||||
seconds ago, it is considered to be "down". `healthy_services()`
|
`healthy_service()` never return entries for services that are
|
||||||
and `healthy_service()` never return entries for services that are
|
considered "down". A sensible convention is to heartbeat 3 times per
|
||||||
considered "down".
|
'ttl', that is, every `ttl/3` seconds.
|
||||||
'load': An arbitrary numeric value. It is up to each service to populate
|
'load': An arbitrary numeric value. It is up to each service to populate
|
||||||
this field in a way that makes sense to the particular service.
|
this field in a way that makes sense to the particular service.
|
||||||
`healthy_service(role)` returns the service with the lowest load
|
`healthy_service(role)` returns the service with the lowest load
|
||||||
for the supplied role. Thus load values need to be comparable to
|
for the supplied role. Thus load values need to be comparable to
|
||||||
within the context of a single service, but comparing loads of
|
within the context of a single service, but comparing loads of
|
||||||
services of different roles does not necessarily make any sense.
|
services of different roles might not make any sense.
|
||||||
|
|
||||||
About the 'id' field:
|
About the 'id' field:
|
||||||
|
|
||||||
@ -73,7 +72,7 @@ class ServiceRegistry(object):
|
|||||||
'id': 'd0bed0be-d000-d000-f00d-abeefface0ff' # generated by rethinkdb if not supplied
|
'id': 'd0bed0be-d000-d000-f00d-abeefface0ff' # generated by rethinkdb if not supplied
|
||||||
'role': 'brozzler-worker',
|
'role': 'brozzler-worker',
|
||||||
'load': 0.5, # load score
|
'load': 0.5, # load score
|
||||||
'heartbeat_interval': 20.0,
|
'ttl': 60.0,
|
||||||
'host': 'wbgrp-svc999.us.archive.org', # set in svcreg.heartbeat() as a fallback
|
'host': 'wbgrp-svc999.us.archive.org', # set in svcreg.heartbeat() as a fallback
|
||||||
'pid': 1234, # set in svcreg.heartbeat() as a fallback
|
'pid': 1234, # set in svcreg.heartbeat() as a fallback
|
||||||
'first_heartbeat': '2015-10-30T03:39:40.080814', # set in svcreg.heartbeat()
|
'first_heartbeat': '2015-10-30T03:39:40.080814', # set in svcreg.heartbeat()
|
||||||
@ -122,9 +121,8 @@ class ServiceRegistry(object):
|
|||||||
service
|
service
|
||||||
|
|
||||||
`status_info` must have at least the fields 'role', 'load', and
|
`status_info` must have at least the fields 'role', 'load', and
|
||||||
'heartbeat_interval'. Some additional fields are populated
|
'ttl'. Some additional fields are populated automatically by this
|
||||||
automatically by this method. If the field 'id' is absent, it will be
|
method. If the field 'id' is absent, it will be generated by rethinkdb.
|
||||||
generated by rethinkdb.
|
|
||||||
|
|
||||||
See the ServiceRegistry class-level documentation for more information
|
See the ServiceRegistry class-level documentation for more information
|
||||||
about the various fields.
|
about the various fields.
|
||||||
@ -135,17 +133,16 @@ class ServiceRegistry(object):
|
|||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
Exception: if `status_info` is missing a required field, or a
|
Exception: if `status_info` is missing a required field, or a
|
||||||
`status_info['heartbeat_interval']` is not a number greater
|
`status_info['ttl']` is not a number greater than zero
|
||||||
than zero
|
|
||||||
'''
|
'''
|
||||||
for field in 'role', 'heartbeat_interval', 'load':
|
for field in 'role', 'ttl', 'load':
|
||||||
if not field in status_info:
|
if not field in status_info:
|
||||||
raise Exception(
|
raise Exception(
|
||||||
'status_info is missing required field %s',
|
'status_info is missing required field %s',
|
||||||
repr(field))
|
repr(field))
|
||||||
val = status_info['heartbeat_interval']
|
val = status_info['ttl']
|
||||||
if not (isinstance(val, float) or isinstance(val, int)) or val <= 0:
|
if not (isinstance(val, float) or isinstance(val, int)) or val <= 0:
|
||||||
raise Exception('heartbeat_interval must be a number > 0')
|
raise Exception('ttl must be a number > 0')
|
||||||
updated_status_info = dict(status_info)
|
updated_status_info = dict(status_info)
|
||||||
updated_status_info['last_heartbeat'] = r.now()
|
updated_status_info['last_heartbeat'] = r.now()
|
||||||
if not 'first_heartbeat' in updated_status_info:
|
if not 'first_heartbeat' in updated_status_info:
|
||||||
@ -226,13 +223,11 @@ class ServiceRegistry(object):
|
|||||||
if candidate is not None:
|
if candidate is not None:
|
||||||
candidate['id'] = role
|
candidate['id'] = role
|
||||||
|
|
||||||
if not 'heartbeat_interval' in candidate:
|
if not 'ttl' in candidate:
|
||||||
raise Exception(
|
raise Exception("candidate is missing required field 'ttl'")
|
||||||
"candidate is missing required field "
|
val = candidate['ttl']
|
||||||
"'heartbeat_interval'")
|
|
||||||
val = candidate['heartbeat_interval']
|
|
||||||
if not (isinstance(val, float) or isinstance(val, int)) or val <= 0:
|
if not (isinstance(val, float) or isinstance(val, int)) or val <= 0:
|
||||||
raise Exception('heartbeat_interval must be a number > 0')
|
raise Exception("'ttl' must be a number > 0")
|
||||||
|
|
||||||
candidate['first_heartbeat'] = now
|
candidate['first_heartbeat'] = now
|
||||||
candidate['last_heartbeat'] = now
|
candidate['last_heartbeat'] = now
|
||||||
@ -246,7 +241,7 @@ class ServiceRegistry(object):
|
|||||||
lambda row: r.branch(
|
lambda row: r.branch(
|
||||||
r.branch(
|
r.branch(
|
||||||
row,
|
row,
|
||||||
row['last_heartbeat'] > now - row['heartbeat_interval'] * 3,
|
row['last_heartbeat'] > now - row['ttl'],
|
||||||
False),
|
False),
|
||||||
row, candidate),
|
row, candidate),
|
||||||
return_changes='always').run()
|
return_changes='always').run()
|
||||||
@ -259,7 +254,7 @@ class ServiceRegistry(object):
|
|||||||
|
|
||||||
results = list(self.rr.table(
|
results = list(self.rr.table(
|
||||||
'services', read_mode='majority').get_all(role).filter(
|
'services', read_mode='majority').get_all(role).filter(
|
||||||
lambda row: row['last_heartbeat'] > now - row['heartbeat_interval'] * 3).run())
|
lambda row: row['last_heartbeat'] > now - row['ttl']).run())
|
||||||
if results:
|
if results:
|
||||||
return results[0]
|
return results[0]
|
||||||
else:
|
else:
|
||||||
@ -269,8 +264,8 @@ class ServiceRegistry(object):
|
|||||||
'''
|
'''
|
||||||
Find least loaded healthy service in the registry.
|
Find least loaded healthy service in the registry.
|
||||||
|
|
||||||
A service is considered healthy if its 'last_heartbeat' is in the last
|
A service is considered healthy if its 'last_heartbeat' was less than
|
||||||
`3 * heartbeat_interval` seconds.
|
'ttl' seconds ago
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
role (str): role name
|
role (str): role name
|
||||||
@ -281,7 +276,7 @@ class ServiceRegistry(object):
|
|||||||
'''
|
'''
|
||||||
try:
|
try:
|
||||||
result = self.rr.table('services').filter({"role":role}).filter(
|
result = self.rr.table('services').filter({"role":role}).filter(
|
||||||
lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"]
|
lambda svc: r.now().sub(svc["last_heartbeat"]) < svc["ttl"]
|
||||||
).order_by("load")[0].run()
|
).order_by("load")[0].run()
|
||||||
return result
|
return result
|
||||||
except r.ReqlNonExistenceError:
|
except r.ReqlNonExistenceError:
|
||||||
@ -291,8 +286,8 @@ class ServiceRegistry(object):
|
|||||||
'''
|
'''
|
||||||
Look up healthy services in the registry.
|
Look up healthy services in the registry.
|
||||||
|
|
||||||
A service is considered healthy if its `last_heartbeat` is in the last
|
A service is considered healthy if its 'last_heartbeat' was less than
|
||||||
`3 * heartbeat_interval` seconds.
|
'ttl' seconds ago
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
role (str, optional): role name
|
role (str, optional): role name
|
||||||
@ -307,7 +302,7 @@ class ServiceRegistry(object):
|
|||||||
if role:
|
if role:
|
||||||
query = query.filter({"role":role})
|
query = query.filter({"role":role})
|
||||||
query = query.filter(
|
query = query.filter(
|
||||||
lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] #.default(20.0)
|
lambda svc: r.now().sub(svc["last_heartbeat"]) < svc["ttl"] #.default(20.0)
|
||||||
).order_by("load")
|
).order_by("load")
|
||||||
result = query.run()
|
result = query.run()
|
||||||
return result
|
return result
|
||||||
|
@ -55,17 +55,17 @@ def rr():
|
|||||||
def test_unique_service(rr):
|
def test_unique_service(rr):
|
||||||
svcreg = doublethink.ServiceRegistry(rr)
|
svcreg = doublethink.ServiceRegistry(rr)
|
||||||
assert svcreg.unique_service('example-role') == None
|
assert svcreg.unique_service('example-role') == None
|
||||||
# this raises an exception: no heartbeat_interval.
|
# this raises an exception: no ttl.
|
||||||
with pytest.raises(Exception) as excinfo:
|
with pytest.raises(Exception) as excinfo:
|
||||||
svcreg.unique_service('example-role', candidate={})
|
svcreg.unique_service('example-role', candidate={})
|
||||||
svc01 = {
|
svc01 = {
|
||||||
"role": "example-role",
|
"role": "example-role",
|
||||||
"heartbeat_interval": 0.4,
|
"ttl": 1.2,
|
||||||
"node": "test01.example.com"
|
"node": "test01.example.com"
|
||||||
}
|
}
|
||||||
svc02 = {
|
svc02 = {
|
||||||
"role": "example-role",
|
"role": "example-role",
|
||||||
"heartbeat_interval": 0.4,
|
"ttl": 1.2,
|
||||||
"node": "test02.example.com"
|
"node": "test02.example.com"
|
||||||
}
|
}
|
||||||
# register svc01. output should be svc01.
|
# register svc01. output should be svc01.
|
||||||
@ -93,23 +93,23 @@ def test_service_registry(rr):
|
|||||||
with pytest.raises(Exception) as excinfo:
|
with pytest.raises(Exception) as excinfo:
|
||||||
svcreg.heartbeat({"role":"foo","load":1})
|
svcreg.heartbeat({"role":"foo","load":1})
|
||||||
with pytest.raises(Exception) as excinfo:
|
with pytest.raises(Exception) as excinfo:
|
||||||
svcreg.heartbeat({"role":"foo","heartbeat_interval":1.0})
|
svcreg.heartbeat({"role":"foo","ttl":1.0})
|
||||||
with pytest.raises(Exception) as excinfo:
|
with pytest.raises(Exception) as excinfo:
|
||||||
svcreg.heartbeat({"heartbeat_interval":1.0,"load":1})
|
svcreg.heartbeat({"ttl":1.0,"load":1})
|
||||||
|
|
||||||
# invalid heartbeat interval (we accept anything for load and role)
|
# invalid ttl (we accept anything for load and role)
|
||||||
with pytest.raises(Exception) as excinfo:
|
with pytest.raises(Exception) as excinfo:
|
||||||
svcreg.heartbeat({"heartbeat_interval":-1,"role":"foo","load":1})
|
svcreg.heartbeat({"ttl":-1,"role":"foo","load":1})
|
||||||
with pytest.raises(Exception) as excinfo:
|
with pytest.raises(Exception) as excinfo:
|
||||||
svcreg.heartbeat({"heartbeat_interval":"strang","role":"foo","load":1})
|
svcreg.heartbeat({"ttl":"strang","role":"foo","load":1})
|
||||||
with pytest.raises(Exception) as excinfo:
|
with pytest.raises(Exception) as excinfo:
|
||||||
svcreg.heartbeat({"heartbeat_interval":[],"role":"foo","load":1})
|
svcreg.heartbeat({"ttl":[],"role":"foo","load":1})
|
||||||
with pytest.raises(Exception) as excinfo:
|
with pytest.raises(Exception) as excinfo:
|
||||||
svcreg.heartbeat({"heartbeat_interval":[1],"role":"foo","load":1})
|
svcreg.heartbeat({"ttl":[1],"role":"foo","load":1})
|
||||||
with pytest.raises(Exception) as excinfo:
|
with pytest.raises(Exception) as excinfo:
|
||||||
svcreg.heartbeat({"heartbeat_interval":{},"role":"foo","load":1})
|
svcreg.heartbeat({"ttl":{},"role":"foo","load":1})
|
||||||
with pytest.raises(Exception) as excinfo:
|
with pytest.raises(Exception) as excinfo:
|
||||||
svcreg.heartbeat({"heartbeat_interval":{1:2},"role":"foo","load":1})
|
svcreg.heartbeat({"ttl":{1:2},"role":"foo","load":1})
|
||||||
|
|
||||||
assert svcreg.available_service("yes-such-role") == None
|
assert svcreg.available_service("yes-such-role") == None
|
||||||
assert svcreg.available_services("yes-such-role") == []
|
assert svcreg.available_services("yes-such-role") == []
|
||||||
@ -117,12 +117,12 @@ def test_service_registry(rr):
|
|||||||
svc0 = {
|
svc0 = {
|
||||||
"role": "yes-such-role",
|
"role": "yes-such-role",
|
||||||
"load": 100.0,
|
"load": 100.0,
|
||||||
"heartbeat_interval": 0.4,
|
"ttl": 1.2,
|
||||||
}
|
}
|
||||||
svc1 = {
|
svc1 = {
|
||||||
"role": "yes-such-role",
|
"role": "yes-such-role",
|
||||||
"load": 200.0,
|
"load": 200.0,
|
||||||
"heartbeat_interval": 0.4,
|
"ttl": 1.2,
|
||||||
}
|
}
|
||||||
svc0 = svcreg.heartbeat(svc0)
|
svc0 = svcreg.heartbeat(svc0)
|
||||||
svc1 = svcreg.heartbeat(svc1)
|
svc1 = svcreg.heartbeat(svc1)
|
||||||
@ -188,12 +188,12 @@ def test_service_registry(rr):
|
|||||||
svc0 = {
|
svc0 = {
|
||||||
"role": "yes-such-role",
|
"role": "yes-such-role",
|
||||||
"load": 100.0,
|
"load": 100.0,
|
||||||
"heartbeat_interval": 0.4,
|
"ttl": 1.2,
|
||||||
}
|
}
|
||||||
svc1 = {
|
svc1 = {
|
||||||
"role": "yes-such-role",
|
"role": "yes-such-role",
|
||||||
"load": 200.0,
|
"load": 200.0,
|
||||||
"heartbeat_interval": 0.4,
|
"ttl": 1.2,
|
||||||
}
|
}
|
||||||
svc0 = svcreg.heartbeat(svc0)
|
svc0 = svcreg.heartbeat(svc0)
|
||||||
svc1 = svcreg.heartbeat(svc1)
|
svc1 = svcreg.heartbeat(svc1)
|
||||||
@ -205,22 +205,22 @@ def test_service_registry(rr):
|
|||||||
svc0 = {
|
svc0 = {
|
||||||
"role": "yes-such-role",
|
"role": "yes-such-role",
|
||||||
"load": 100.0,
|
"load": 100.0,
|
||||||
"heartbeat_interval": 0.4,
|
"ttl": 1.2,
|
||||||
}
|
}
|
||||||
svc1 = {
|
svc1 = {
|
||||||
"role": "yes-such-role",
|
"role": "yes-such-role",
|
||||||
"load": 200.0,
|
"load": 200.0,
|
||||||
"heartbeat_interval": 0.4,
|
"ttl": 1.2,
|
||||||
}
|
}
|
||||||
svc2 = {
|
svc2 = {
|
||||||
"role": "another-such-role",
|
"role": "another-such-role",
|
||||||
"load": 200.0,
|
"load": 200.0,
|
||||||
"heartbeat_interval": 0.4,
|
"ttl": 1.2,
|
||||||
}
|
}
|
||||||
svc3 = {
|
svc3 = {
|
||||||
"role": "yet-another-such-role",
|
"role": "yet-another-such-role",
|
||||||
"load": 200.0,
|
"load": 200.0,
|
||||||
"heartbeat_interval": 0.4,
|
"ttl": 1.2,
|
||||||
}
|
}
|
||||||
svc0 = svcreg.heartbeat(svc0)
|
svc0 = svcreg.heartbeat(svc0)
|
||||||
svc1 = svcreg.heartbeat(svc1)
|
svc1 = svcreg.heartbeat(svc1)
|
||||||
@ -245,7 +245,7 @@ def test_svcreg_heartbeat_server_down(rr):
|
|||||||
svc0 = {
|
svc0 = {
|
||||||
"role": "role-foo",
|
"role": "role-foo",
|
||||||
"load": 100.0,
|
"load": 100.0,
|
||||||
"heartbeat_interval": 0.4,
|
"ttl": 1.2,
|
||||||
}
|
}
|
||||||
# no exception thrown
|
# no exception thrown
|
||||||
svc0 = svcreg.heartbeat(svc0)
|
svc0 = svcreg.heartbeat(svc0)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user