From dcccc3fa234d117525e22fb104308c934209906c Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 28 Apr 2017 15:23:55 -0700 Subject: [PATCH] lots of documentation on the service registry --- doublethink/services.py | 166 ++++++++++++++++++++++++++++++++++------ setup.py | 2 +- 2 files changed, 142 insertions(+), 26 deletions(-) diff --git a/doublethink/services.py b/doublethink/services.py index 7f80415..c710386 100644 --- a/doublethink/services.py +++ b/doublethink/services.py @@ -23,46 +23,125 @@ import os class ServiceRegistry(object): ''' - status_info is dict, should have at least these fields - { - 'id': ..., # generated by rethinkdb - 'role': 'brozzler-worker', - 'load': 0.5, # load score - 'heartbeat_interval': 20.0, - 'host': 'wbgrp-svc999.us.archive.org', # set in svcreg.heartbeat() as a fallback - 'pid': 1234, # set in svcreg.heartbeat() as a fallback - 'first_heartbeat': '2015-10-30T03:39:40.080814', # set in svcreg.heartbeat() - 'last_heartbeat': '2015-10-30T05:54:35.422866', # set in svcreg.heartbeat() - ... plus anything else you want... - } - ''' + Simple service registry which stores service information in the rethinkdb + table 'services'. + Services are responsible for keeping their status information up to date + by calling `heartbeat(status_info)` periodically. + + `status_info` is a dict and must have at least the fields 'role', 'load', + and 'heartbeat_interval'. Certain other fields are populated automatically + as in the example below. In addition, services may set arbitrary other + fields. + + Some information about required fields: + + 'role': The role of the service. `healthy_service()` and + `healthy_services()` look up services using this field. + 'heartbeat_interval': Specifies the expected time between heartbeats. If + a service's last heartbeat was more than `3 * heartbeat_interval` + seconds ago, it is considered to be "down". `healthy_services()` + and `healthy_service()` never return entries for services that are + considered "down". + 'load': An arbitrary numeric value. It is up to each service to populate + this field in a way that makes sense to the particular service. + `healthy_service(role)` returns the service with the lowest load + for the supplied role. Thus load values need to be comparable to + within the context of a single service, but comparing loads of + services of different roles does not necessarily make any sense. + + About the 'id' field: + + The only way that the service registry uniquely identifies a particular + instance of a service is using the 'id' field. + + Services can supply their own 'id', or let rethinkdb generate a random + one. + + If a service provides its own 'id', it should make it something + predictable and unique to each instance of the service. For example + `'%s:%s:%s' % (role, host, port)` might work for some services. + + If, on the other hand, a server lets rethinkdb generate 'id', it will + need to remember the result returned by calls to `heartbeat()` and + supply the `id` value from there with every subsequent heartbeat. + + Example service registry entry, with notes: + + { + 'id': 'd0bed0be-d000-d000-f00d-abeefface0ff' # generated by rethinkdb if not supplied + 'role': 'brozzler-worker', + 'load': 0.5, # load score + 'heartbeat_interval': 20.0, + 'host': 'wbgrp-svc999.us.archive.org', # set in svcreg.heartbeat() as a fallback + 'pid': 1234, # set in svcreg.heartbeat() as a fallback + 'first_heartbeat': '2015-10-30T03:39:40.080814', # set in svcreg.heartbeat() + 'last_heartbeat': '2015-10-30T05:54:35.422866', # set in svcreg.heartbeat() + # ... plus anything else you want... + } + ''' logger = logging.getLogger('doublethink.ServiceRegistry') def __init__(self, rr): + ''' + Initialize the service registry. + + Creates the database table if it does not exist. + + Args: + rr (doublethink.Rethinker): a doublethink.Rethinker, which must + have `dbname` set + ''' self.rr = rr self._ensure_table() def _ensure_table(self): dbs = self.rr.db_list().run() + assert self.rr.dbname if not self.rr.dbname in dbs: - self.logger.info('creating rethinkdb database %s', repr(self.rr.dbname)) + self.logger.info( + 'creating rethinkdb database %s', repr(self.rr.dbname)) self.rr.db_create(self.rr.dbname).run() tables = self.rr.table_list().run() if not 'services' in tables: - self.logger.info("creating rethinkdb table 'services' in database %s", repr(self.rr.dbname)) - self.rr.table_create('services', shards=1, replicas=min(3, len(self.rr.servers))).run() - # self.rr.table('sites').index_create...? + self.logger.info( + "creating rethinkdb table 'services' in database %s", + repr(self.rr.dbname)) + self.rr.table_create( + 'services', shards=1, + replicas=min(3, len(self.rr.servers))).run() + # self.rr.table('services').index_create...? def heartbeat(self, status_info): ''' - Returns updated status info on success, un-updated status info on - failure. + Update service status, indicating "up"-ness. + + Args: + status_info (dict): a dictionary representing the status of the + service + + `status_info` must have at least the fields 'role', 'load', and + 'heartbeat_interval'. Some additional fields are populated + automatically by this method. If the field 'id' is absent, it will be + generated by rethinkdb. + + See the ServiceRegistry class-level documentation for more information + about the various fields. + + Returns: + On success, returns the modified status info dict. On failure + communicating with rethinkdb, returns `status_info` unmodified. + + Raises: + Exception: if `status_info` is missing a required field, or a + `status_info['heartbeat_interval']` is not a number greater + than zero ''' for field in 'role', 'heartbeat_interval', 'load': if not field in status_info: raise Exception( - 'status_info is missing required field %s', field) + 'status_info is missing required field %s', + repr(field)) val = status_info['heartbeat_interval'] if not (isinstance(val, float) or isinstance(val, int)) or val <= 0: raise Exception('heartbeat_interval must be a number > 0') @@ -84,9 +163,16 @@ class ServiceRegistry(object): return status_info def unregister(self, id): + ''' + Remove the service with id `id` from the 'services' table. + ''' result = self.rr.table('services').get(id).delete().run() - if result != {'deleted':1,'errors':0,'inserted':0,'replaced':0,'skipped':0,'unchanged':0}: - self.logger.warn('unexpected result attempting to delete id=%s from rethinkdb services table: %s', id, result) + if result != { + 'deleted':1, 'errors':0,'inserted':0, + 'replaced':0,'skipped':0,'unchanged':0}: + self.logger.warn( + 'unexpected result attempting to delete id=%s from ' + 'rethinkdb services table: %s', id, result) def leader(self, role_name, default=None): ''' @@ -108,16 +194,43 @@ class ServiceRegistry(object): self.rr.table('services', read_mode='majority').get(role_name).replace(lambda row: r.branch(r.branch(row, row['last_heartbeat'] > r.now() - row['heartbeat_interval'] * 3, False), row, default)).run() return self.rr.table('services', read_mode='majority').get(role_name).run() - def available_service(self, role): + def healthy_service(self, role): + ''' + Find least loaded healthy service in the registry. + + A service is considered healthy if its 'last_heartbeat' is in the last + `3 * heartbeat_interval` seconds. + + Args: + role (str): role name + + Returns: + the healthy service with the supplied `role` with the smallest + value of 'load' + ''' try: result = self.rr.table('services').filter({"role":role}).filter( - lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] #.default(20.0) + lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] ).order_by("load")[0].run() return result except r.ReqlNonExistenceError: return None - def available_services(self, role=None): + def healthy_services(self, role=None): + ''' + Look up healthy services in the registry. + + A service is considered healthy if its `last_heartbeat` is in the last + `3 * heartbeat_interval` seconds. + + Args: + role (str, optional): role name + + Returns: + If `role` is supplied, returns list of healthy services for the + given role, otherwise returns list of all healthy services. May + return an empty list. + ''' try: query = self.rr.table('services') if role: @@ -130,3 +243,6 @@ class ServiceRegistry(object): except r.ReqlNonExistenceError: return [] + available_service = healthy_service + available_services = healthy_services + diff --git a/setup.py b/setup.py index 47f9fc0..d740c4d 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ import codecs setuptools.setup( name='doublethink', - version='0.2.0.dev74', + version='0.2.0.dev75', packages=['doublethink'], classifiers=[ 'Programming Language :: Python :: 2.7',