mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
catch a failed service registry heartbeat, with unit test
This commit is contained in:
parent
2e76c1e570
commit
82faefde56
@ -94,7 +94,9 @@ class Rethinker(object):
|
||||
except ValueError:
|
||||
return r.connect(host=server)
|
||||
except Exception as e:
|
||||
self.logger.error('will keep trying to get a connection after failure connecting to %s: %s', server, e)
|
||||
self.logger.error(
|
||||
'will keep trying after failure connecting to '
|
||||
'rethinkdb server at %s: %s', server, e)
|
||||
time.sleep(0.5)
|
||||
|
||||
def wrap(self, delegate):
|
||||
@ -147,18 +149,26 @@ class ServiceRegistry(object):
|
||||
# self.r.table('sites').index_create...?
|
||||
|
||||
def heartbeat(self, status_info):
|
||||
'''Returns id of rethinkdb document (generated by rethinkdb on first insert)'''
|
||||
status_info['last_heartbeat'] = r.now()
|
||||
if not 'first_heartbeat' in status_info:
|
||||
status_info['first_heartbeat'] = status_info['last_heartbeat']
|
||||
status_info['last_heartbeat'] = r.now()
|
||||
if not 'host' in status_info:
|
||||
status_info['host'] = socket.gethostname()
|
||||
if not 'pid' in status_info:
|
||||
status_info['pid'] = os.getpid()
|
||||
result = self.r.table('services').insert(status_info, conflict='replace', return_changes=True).run()
|
||||
# XXX check
|
||||
return result['changes'][0]['new_val']
|
||||
'''
|
||||
Returns updated status info on success, un-updated status info on
|
||||
failure.
|
||||
'''
|
||||
updated_status_info = dict(status_info)
|
||||
updated_status_info['last_heartbeat'] = r.now()
|
||||
if not 'first_heartbeat' in updated_status_info:
|
||||
updated_status_info['first_heartbeat'] = updated_status_info['last_heartbeat']
|
||||
if not 'host' in updated_status_info:
|
||||
updated_status_info['host'] = socket.gethostname()
|
||||
if not 'pid' in updated_status_info:
|
||||
updated_status_info['pid'] = os.getpid()
|
||||
try:
|
||||
result = self.r.table('services').insert(
|
||||
updated_status_info, conflict='replace',
|
||||
return_changes=True).run()
|
||||
return result['changes'][0]['new_val'] # XXX check
|
||||
except:
|
||||
self.logger.error('error updating service registry', exc_info=True)
|
||||
return status_info
|
||||
|
||||
def unregister(self, id):
|
||||
result = self.r.table('services').get(id).delete().run()
|
||||
|
@ -213,6 +213,34 @@ def test_service_registry(r):
|
||||
assert len(svcreg.available_services("yes-such-role")) == 2
|
||||
assert len(svcreg.available_services()) == 4
|
||||
|
||||
def test_svcreg_heartbeat_server_down(r):
|
||||
class MockRethinker:
|
||||
def table(self, *args, **kwargs):
|
||||
raise Exception('catch me if you can')
|
||||
|
||||
class SortOfFakeServiceRegistry(rethinkstuff.ServiceRegistry):
|
||||
def __init__(self, rethinker):
|
||||
self.r = rethinker
|
||||
# self._ensure_table() # not doing this here
|
||||
|
||||
# no such rethinkdb server
|
||||
r = MockRethinker()
|
||||
svcreg = SortOfFakeServiceRegistry(r)
|
||||
svc0 = {
|
||||
"role": "role-foo",
|
||||
"load": 100.0,
|
||||
"heartbeat_interval": 0.4,
|
||||
}
|
||||
# no exception thrown
|
||||
svc0 = svcreg.heartbeat(svc0)
|
||||
|
||||
# check that status_info was *not* updated
|
||||
assert not 'id' in svc0
|
||||
assert not 'last_heartbeat' in svc0
|
||||
assert not 'first_heartbeat' in svc0
|
||||
assert not 'host' in svc0
|
||||
assert not 'pid' in svc0
|
||||
|
||||
def test_utcnow():
|
||||
now_notz = datetime.datetime.utcnow() # has no timezone :(
|
||||
assert not now_notz.tzinfo
|
||||
|
Loading…
x
Reference in New Issue
Block a user