From 5570ca9241043e91e68c10eff0b25c03a9bc8030 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 30 Oct 2015 19:50:33 +0000 Subject: [PATCH] service registry, with unit tests! --- rethinkstuff/__init__.py | 74 ++++++++++++++++++++++++++++++++++++---- tests/test_rethinker.py | 49 ++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 7 deletions(-) diff --git a/rethinkstuff/__init__.py b/rethinkstuff/__init__.py index 538fd37..ab8f540 100644 --- a/rethinkstuff/__init__.py +++ b/rethinkstuff/__init__.py @@ -4,6 +4,7 @@ import logging import random import time import types +import socket class RethinkerWrapper(object): logger = logging.getLogger('rethinkstuff.RethinkerWrapper') @@ -19,7 +20,7 @@ class RethinkerWrapper(object): return self.rethinker.wrap(self.wrapped.__getitem__)(key) def __repr__(self): - return "".format(repr(self.wrapped)) + return ''.format(repr(self.wrapped)) def run(self, db=None): self.wrapped.run # raise AttributeError early @@ -28,7 +29,7 @@ class RethinkerWrapper(object): is_iter = False try: result = self.wrapped.run(conn, db=db or self.rethinker.dbname) - if hasattr(result, "__next__"): + if hasattr(result, '__next__'): is_iter = True def gen(): try: @@ -54,14 +55,17 @@ class RethinkerWrapper(object): conn.close(noreply_wait=False) class Rethinker(object): - """ - >>> r = Rethinker(db="my_db") - >>> doc = r.table("my_table").get(1).run() - """ + ''' + >>> r = Rethinker(db='my_db') + >>> doc = r.table('my_table').get(1).run() + ''' logger = logging.getLogger('rethinkstuff.Rethinker') def __init__(self, servers=['localhost'], db=None): - self.servers = servers + if isinstance(servers, str): + self.servers = [servers] + else: + self.servers = servers self.dbname = db # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py @@ -95,3 +99,59 @@ class Rethinker(object): delegate = getattr(r, name) return self.wrap(delegate) +class ServiceRegistry(object): + ''' + status_info is dict, should have at least these fields + { + 'id': ..., # generated by rethinkdb + 'role': 'brozzler-worker', + 'load': 0.5, # load score + 'heartbeat_interval': 20.0, + 'host': 'wbgrp-svc999.us.archive.org', # set in svcreg.heartbeat() as a fallback + 'last_heartbeat': '2015-10-30T05:54:35.422866', # set in svcreg.heartbeat() + ... plus anything else you want... + } + ''' + + logger = logging.getLogger('rethinkstuff.ServiceRegistry') + + def __init__(self, rethinker): + self.r = rethinker + self._ensure_table() + + def _ensure_table(self): + dbs = self.r.db_list().run() + if not self.r.dbname in dbs: + self.logger.info('creating rethinkdb database %s', repr(self.r.dbname)) + self.r.db_create(self.r.dbname).run() + tables = self.r.table_list().run() + if not 'services' in tables: + self.logger.info("creating rethinkdb table 'services' in database %s", repr(self.r.dbname)) + self.r.table_create('services', shards=1, replicas=min(len(self.r.servers), 1)).run() + # self.r.table('sites').index_create...? + + def heartbeat(self, status_info): + '''Returns id of rethinkdb document (generated by rethinkdb on first insert)''' + status_info['last_heartbeat'] = r.now() + if not 'host' in status_info: + status_info['host'] = socket.gethostname() + result = self.r.table('services').insert(status_info, conflict='replace').run() + if 'generated_keys' in result: + return result['generated_keys'][0] + else: + return status_info['id'] + + def unregister(self, id): + result = self.r.table('services').get(id).delete().run() + if result != {'deleted':1,'errors':0,'inserted':0,'replaced':0,'skipped':0,'unchanged':0}: + self.logger.warn('unexpected result attempting to delete id=%s from rethinkdb services table: %s', id, result) + + def available_service(self, role): + try: + result = self.r.table('services').filter({"role":role}).filter( + lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] #.default(20.0) + ).order_by("load")[0].run() + return result + except r.ReqlNonExistenceError: + return None + diff --git a/tests/test_rethinker.py b/tests/test_rethinker.py index 3648b38..22dd378 100644 --- a/tests/test_rethinker.py +++ b/tests/test_rethinker.py @@ -5,6 +5,7 @@ import types import gc import pytest import rethinkdb +import time logging.basicConfig(stream=sys.stderr, level=logging.INFO, format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s") @@ -88,3 +89,51 @@ def test_slice(r, my_table): # connection should be closed after finished iterating over results assert not r.last_conn.is_open() assert n == 5 + +def test_svcreg(r): + # import pdb; pdb.set_trace() + svcreg = rethinkstuff.ServiceRegistry(r) + assert svcreg.available_service("yes-such-role") == None + svc0 = { + "role": "yes-such-role", + "load": 100.0, + "heartbeat_interval": 0.2, + } + svc1 = { + "role": "yes-such-role", + "load": 200.0, + "heartbeat_interval": 0.2, + } + svc0["id"] = svcreg.heartbeat(svc0) + svc1["id"] = svcreg.heartbeat(svc1) + assert svc0["id"] is not None + assert svc1["id"] is not None + assert svc0["id"] != svc1["id"] + time.sleep(0.1) + assert svcreg.available_service("no-such-role") == None + assert svcreg.available_service("yes-such-role")["id"] == svc0["id"] + + svc1["load"] = 50.0 + svcreg.heartbeat(svc1) + time.sleep(0.1) + assert svcreg.available_service("no-such-role") == None + assert svcreg.available_service("yes-such-role")["id"] == svc1["id"] + + svc1["load"] = 200.0 + svcreg.heartbeat(svc1) + time.sleep(0.1) + assert svcreg.available_service("no-such-role") == None + assert svcreg.available_service("yes-such-role")["id"] == svc0["id"] + svcreg.heartbeat(svc1) + time.sleep(0.1) + + svcreg.heartbeat(svc1) + time.sleep(0.4) + assert svcreg.available_service("no-such-role") == None + assert svcreg.available_service("yes-such-role")["id"] == svc1["id"] + + svcreg.unregister(svc1["id"]) + time.sleep(0.1) + assert svcreg.available_service("no-such-role") == None + assert svcreg.available_service("yes-such-role") == None +