diff --git a/rethinkstuff/__init__.py b/rethinkstuff/__init__.py index 8be31a7..a4d5488 100644 --- a/rethinkstuff/__init__.py +++ b/rethinkstuff/__init__.py @@ -94,7 +94,9 @@ class Rethinker(object): except ValueError: return r.connect(host=server) except Exception as e: - self.logger.error('will keep trying to get a connection after failure connecting to %s: %s', server, e) + self.logger.error( + 'will keep trying after failure connecting to ' + 'rethinkdb server at %s: %s', server, e) time.sleep(0.5) def wrap(self, delegate): @@ -147,18 +149,26 @@ class ServiceRegistry(object): # self.r.table('sites').index_create...? def heartbeat(self, status_info): - '''Returns id of rethinkdb document (generated by rethinkdb on first insert)''' - status_info['last_heartbeat'] = r.now() - if not 'first_heartbeat' in status_info: - status_info['first_heartbeat'] = status_info['last_heartbeat'] - status_info['last_heartbeat'] = r.now() - if not 'host' in status_info: - status_info['host'] = socket.gethostname() - if not 'pid' in status_info: - status_info['pid'] = os.getpid() - result = self.r.table('services').insert(status_info, conflict='replace', return_changes=True).run() - # XXX check - return result['changes'][0]['new_val'] + ''' + Returns updated status info on success, un-updated status info on + failure. + ''' + updated_status_info = dict(status_info) + updated_status_info['last_heartbeat'] = r.now() + if not 'first_heartbeat' in updated_status_info: + updated_status_info['first_heartbeat'] = updated_status_info['last_heartbeat'] + if not 'host' in updated_status_info: + updated_status_info['host'] = socket.gethostname() + if not 'pid' in updated_status_info: + updated_status_info['pid'] = os.getpid() + try: + result = self.r.table('services').insert( + updated_status_info, conflict='replace', + return_changes=True).run() + return result['changes'][0]['new_val'] # XXX check + except: + self.logger.error('error updating service registry', exc_info=True) + return status_info def unregister(self, id): result = self.r.table('services').get(id).delete().run() diff --git a/tests/test_rethinker.py b/tests/test_rethinker.py index 4ca224b..be5b15d 100644 --- a/tests/test_rethinker.py +++ b/tests/test_rethinker.py @@ -213,6 +213,34 @@ def test_service_registry(r): assert len(svcreg.available_services("yes-such-role")) == 2 assert len(svcreg.available_services()) == 4 +def test_svcreg_heartbeat_server_down(r): + class MockRethinker: + def table(self, *args, **kwargs): + raise Exception('catch me if you can') + + class SortOfFakeServiceRegistry(rethinkstuff.ServiceRegistry): + def __init__(self, rethinker): + self.r = rethinker + # self._ensure_table() # not doing this here + + # no such rethinkdb server + r = MockRethinker() + svcreg = SortOfFakeServiceRegistry(r) + svc0 = { + "role": "role-foo", + "load": 100.0, + "heartbeat_interval": 0.4, + } + # no exception thrown + svc0 = svcreg.heartbeat(svc0) + + # check that status_info was *not* updated + assert not 'id' in svc0 + assert not 'last_heartbeat' in svc0 + assert not 'first_heartbeat' in svc0 + assert not 'host' in svc0 + assert not 'pid' in svc0 + def test_utcnow(): now_notz = datetime.datetime.utcnow() # has no timezone :( assert not now_notz.tzinfo