From 6d667c77b55a435cbf856c1a015660811faee3c8 Mon Sep 17 00:00:00 2001
From: Noah Levitt <nlevitt@archive.org>
Date: Tue, 21 Feb 2017 16:27:49 -0800
Subject: [PATCH] split into multiple source files

---
 rethinkstuff/__init__.py  | 392 +-------------------------------------
 rethinkstuff/orm.py       | 220 +++++++++++++++++++++
 rethinkstuff/rethinker.py | 119 ++++++++++++
 rethinkstuff/services.py  | 105 ++++++++++
 setup.py                  |   2 +-
 5 files changed, 453 insertions(+), 385 deletions(-)
 create mode 100644 rethinkstuff/orm.py
 create mode 100644 rethinkstuff/rethinker.py
 create mode 100644 rethinkstuff/services.py

diff --git a/rethinkstuff/__init__.py b/rethinkstuff/__init__.py
index 3765b76..cd93753 100644
--- a/rethinkstuff/__init__.py
+++ b/rethinkstuff/__init__.py
@@ -17,19 +17,19 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
-import rethinkdb as r
-import logging
-import random
-import time
-import types
-import socket
-import os
+import rethinkdb
 import datetime
 
+from rethinkstuff.orm import Document
+from rethinkstuff.rethinker import Rethinker
+from rethinkstuff.services import ServiceRegistry
+
+__all__ = ['Document', 'Rethinker', 'ServiceRegistry', 'UTC', 'utcnow']
+
 try:
     UTC = datetime.timezone.utc
 except:
-    UTC = r.make_timezone("00:00")
+    UTC = rethinkdb.make_timezone("00:00")
 
 def utcnow():
     """Convenience function to get timezone-aware UTC datetime. RethinkDB
@@ -38,379 +38,3 @@ def utcnow():
     2 doesn't come with a timezone implementation."""
     return datetime.datetime.now(UTC)
 
-class RethinkerWrapper(object):
-    logger = logging.getLogger('rethinkstuff.RethinkerWrapper')
-    def __init__(self, rethinker, wrapped):
-        self.rethinker = rethinker
-        self.wrapped = wrapped
-
-    def __getattr__(self, name):
-        delegate = getattr(self.wrapped, name)
-        return self.rethinker.wrap(delegate)
-
-    def __getitem__(self, key):
-        return self.rethinker.wrap(self.wrapped.__getitem__)(key)
-
-    def __repr__(self):
-        return '<RethinkerWrapper{}>'.format(repr(self.wrapped))
-
-    def run(self, db=None):
-        self.wrapped.run  # raise AttributeError early
-        while True:
-            conn = self.rethinker._random_server_connection()
-            is_iter = False
-            try:
-                result = self.wrapped.run(conn, db=db or self.rethinker.dbname)
-                if hasattr(result, '__next__'):
-                    is_iter = True
-                    def gen():
-                        try:
-                            yield  # empty yield, see comment below
-                            for x in result:
-                                yield x
-                        finally:
-                            result.close()
-                            conn.close()
-                    g = gen()
-                    # Start executing the generator, leaving off after the
-                    # empty yield. If we didn't do this, and the caller never
-                    # started the generator, the finally block would never run
-                    # and the connection would stay open.
-                    next(g)
-                    return g
-                else:
-                    return result
-            except r.ReqlTimeoutError as e:
-                time.sleep(0.5)
-            finally:
-                if not is_iter:
-                    conn.close(noreply_wait=False)
-
-class Rethinker(object):
-    '''
-    >>> r = Rethinker(db='my_db')
-    >>> doc = r.table('my_table').get(1).run()
-    '''
-    logger = logging.getLogger('rethinkstuff.Rethinker')
-
-    def __init__(self, servers=['localhost'], db=None):
-        if isinstance(servers, str):
-            self.servers = [servers]
-        else:
-            self.servers = servers
-        self.dbname = db
-
-    # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
-    # "Best practices: Managing connections: a connection per request"
-    def _random_server_connection(self):
-        while True:
-            server = random.choice(self.servers)
-            try:
-                try:
-                    host, port = server.split(':')
-                    return r.connect(host=host, port=port)
-                except ValueError:
-                    return r.connect(host=server)
-            except Exception as e:
-                self.logger.error(
-                        'will keep trying after failure connecting to '
-                        'rethinkdb server at %s: %s', server, e)
-                time.sleep(0.5)
-
-    def wrap(self, delegate):
-        if isinstance(delegate, (types.FunctionType, types.MethodType)):
-            def wrapper(*args, **kwargs):
-                result = delegate(*args, **kwargs)
-                if result is not None:
-                    return RethinkerWrapper(self, result)
-                else:
-                    return None
-            return wrapper
-        else:
-            return delegate
-
-    def __getattr__(self, name):
-        delegate = getattr(r, name)
-        return self.wrap(delegate)
-
-class ServiceRegistry(object):
-    '''
-    status_info is dict, should have at least these fields
-    {
-        'id': ...,   # generated by rethinkdb
-        'role': 'brozzler-worker',
-        'load': 0.5, # load score
-        'heartbeat_interval': 20.0,
-        'host': 'wbgrp-svc999.us.archive.org',           # set in svcreg.heartbeat() as a fallback
-        'pid': 1234,                                     # set in svcreg.heartbeat() as a fallback
-        'first_heartbeat': '2015-10-30T03:39:40.080814', # set in svcreg.heartbeat()
-        'last_heartbeat': '2015-10-30T05:54:35.422866',  # set in svcreg.heartbeat()
-        ... plus anything else you want...
-    }
-    '''
-
-    logger = logging.getLogger('rethinkstuff.ServiceRegistry')
-
-    def __init__(self, rethinker):
-        self.r = rethinker
-        self._ensure_table()
-
-    def _ensure_table(self):
-        dbs = self.r.db_list().run()
-        if not self.r.dbname in dbs:
-            self.logger.info('creating rethinkdb database %s', repr(self.r.dbname))
-            self.r.db_create(self.r.dbname).run()
-        tables = self.r.table_list().run()
-        if not 'services' in tables:
-            self.logger.info("creating rethinkdb table 'services' in database %s", repr(self.r.dbname))
-            self.r.table_create('services', shards=1, replicas=min(3, len(self.r.servers))).run()
-            # self.r.table('sites').index_create...?
-
-    def heartbeat(self, status_info):
-        '''
-        Returns updated status info on success, un-updated status info on
-        failure.
-        '''
-        updated_status_info = dict(status_info)
-        updated_status_info['last_heartbeat'] = r.now()
-        if not 'first_heartbeat' in updated_status_info:
-            updated_status_info['first_heartbeat'] = updated_status_info['last_heartbeat']
-        if not 'host' in updated_status_info:
-            updated_status_info['host'] = socket.gethostname()
-        if not 'pid' in updated_status_info:
-            updated_status_info['pid'] = os.getpid()
-        try:
-            result = self.r.table('services').insert(
-                    updated_status_info, conflict='replace',
-                    return_changes=True).run()
-            return result['changes'][0]['new_val'] # XXX check
-        except:
-            self.logger.error('error updating service registry', exc_info=True)
-            return status_info
-
-    def unregister(self, id):
-        result = self.r.table('services').get(id).delete().run()
-        if result != {'deleted':1,'errors':0,'inserted':0,'replaced':0,'skipped':0,'unchanged':0}:
-            self.logger.warn('unexpected result attempting to delete id=%s from rethinkdb services table: %s', id, result)
-
-    def available_service(self, role):
-        try:
-            result = self.r.table('services').filter({"role":role}).filter(
-                lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"]   #.default(20.0)
-            ).order_by("load")[0].run()
-            return result
-        except r.ReqlNonExistenceError:
-            return None
-
-    def available_services(self, role=None):
-        try:
-            query = self.r.table('services')
-            if role:
-                query = query.filter({"role":role})
-            query = query.filter(
-                lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"]   #.default(20.0)
-            ).order_by("load")
-            result = query.run()
-            return result
-        except r.ReqlNonExistenceError:
-            return []
-
-class WatchedDict(dict):
-    def __init__(self, d, callback, field):
-        self.callback = callback
-        self.field = field
-        for key in d:
-            dict.__setitem__(self, key, watch(
-                d[key], callback=self.callback, field=self.field))
-
-    def __setitem__(self, key, value):
-        self.callback(self.field)
-        return dict.__setitem__(self, key, watch(
-            value, callback=self.callback, field=self.field))
-
-    def __delitem__(self, key):
-        self.callback(self.field)
-        return dict.__delitem__(self, key)
-
-    def clear(self):
-        self.callback(self.field)
-        return dict.clear(self)
-
-    def pop(self, *args):
-        self.callback(self.field)
-        return dict.pop(self, *args)
-
-    def popitem(self):
-        self.callback(self.field)
-        return dict.popitem()
-
-    def setdefault(self, *args):
-        self.callback(self.field)
-        if len(args) == 2:
-            return dict.setdefault(self, args[0], watch(
-                args[1], callback=self.callback, field=self.field))
-        else:
-            return dict.setdefault(self, *args)
-
-    def update(self, *args, **kwargs):
-        # looks a little tricky
-        raise Exception('not implemented')
-
-class WatchedList(list):
-    def __init__(self, l, callback, field):
-        self.callback = callback
-        self.field = field
-        for item in l:
-            list.append(self, watch(item, callback=callback, field=self.field))
-
-    def __setitem__(self, index, value):
-        self.callback(self.field)
-        return list.__setitem__(self, index, watch(
-            value, callback=self.callback, field=self.field))
-
-    def __delitem__(self, index):
-        self.callback(self.field)
-        return list.__delitem__(self, index)
-
-    def append(self, value):
-        self.callback(self.field)
-        return list.append(self, watch(
-            value, callback=self.callback, field=self.field))
-
-    def extend(self, value):
-        self.callback(self.field)
-        return list.extend(self, watch(
-            list(value), callback=self.callback, field=self.field))
-
-    def insert(self, index, value):
-        self.callback(self.field)
-        return list.insert(self, index, watch(
-            value, callback=self.callback, field=self.field))
-
-    def remove(self, value):
-        self.callback(self.field)
-        return list.remove(self, value)
-
-    def pop(self, index=-1):
-        self.callback(self.field)
-        return list.pop(self, index)
-
-    def clear(self):
-        self.callback(self.field)
-        return list.clear(self)
-
-    def sort(self, key=None, reverse=False):
-        self.callback(self.field)
-        return list.sort(self, key, reverse)
-
-    def reverse(self):
-        self.callback(self.field)
-        return list.reverse(self)
-
-def watch(obj, callback, field):
-    if isinstance(obj, dict):
-        return WatchedDict(obj, callback, field)
-    elif isinstance(obj, list):
-        return WatchedList(obj, callback, field)
-    else:
-        return obj
-
-class Document(dict, object):
-    '''
-    Base class for documents in rethinkdb.
-
-    You should subclass this class for each of your rethinkdb tables. You can
-    add custom functionality in your subclass if appropriate.
-
-    This class keeps track of changes made to the object and any nested fields.
-    After you have made some changes, call update() to persist them to the
-    database.
-
-    Changes in nested fields result in updates to their first-level ancestor
-    field. For example, if your document starts as {'a': {'b': 'c'}}, then
-    you run d['a']['x'] = 'y', then the update will replace the whole 'a'
-    field. Nested field updates get too complicated any other way.
-
-    The primary key must be `id`, the rethinkdb default. (XXX we could find out
-    what the primary key is from the "table_config" system table.)
-    '''
-    def __init__(self, rethinker, d={}):
-        dict.__setattr__(self, '_r', rethinker)
-        for k in d:
-            dict.__setitem__(
-                    self, k, watch(d[k], callback=self._updated, field=k))
-        self._clear_updates()
-
-    def _clear_updates(self):
-        dict.__setattr__(self, '_updates', {})
-        dict.__setattr__(self, '_deletes', set())
-
-    def __setitem__(self, key, value):
-        dict.__setitem__(
-                self, key, watch(value, callback=self._updated, field=key))
-        self._updated(key)
-
-    __setattr__ = __setitem__
-    __getattr__ = dict.__getitem__
-
-    def __delitem__(self, key):
-        dict.__delitem__(self, key)
-        self._deletes.add(key)
-        if key in self._updates:
-            del self._updates[key]
-
-    # XXX do we need the other stuff like in WatchedDict?
-
-    def _updated(self, field):
-        # callback for all updates
-        self._updates[field] = self[field]
-        if field in self._deletes:
-            self._deletes.remove(field)
-
-    @property
-    def table(self):
-        '''
-        Name of the rethinkdb table.
-
-        Defaults to the name of the class, lowercased. Can be overridden.
-        '''
-        return self.__class__.__name__.lower()
-
-    def table_create(self):
-        '''
-        Creates the table.
-
-        Subclasses may want to override to do more things, such as creating
-        indexes.
-        '''
-        self._r.table_create(self.table).run()
-
-    def insert(self):
-        result = self._r.table(self.table).insert(self).run()
-        if 'generated_keys' in result:
-            dict.__setitem__(self, 'id', result['generated_keys'][0])
-        self._clear_updates()
-
-    def update(self):
-        # hmm, masks dict.update()
-        if self._updates:
-            # r.literal() to replace, not merge with, nested fields
-            updates = {
-                    field: r.literal(
-                        self._updates[field]) for field in self._updates}
-            self._r.table(self.table).get(self.id).update(updates).run()
-        if self._deletes:
-            self._r.table(self.table).replace(
-                    r.row.without(self._deletes)).run()
-        self._clear_updates()
-
-    def refresh(self):
-        '''
-        Refresh from the database.
-        '''
-        d = self._r.table(self.table).get(self.id).run()
-        for k in d:
-            dict.__setitem__(
-                    self, k, watch(d[k], callback=self._updated, field=k))
-
-
diff --git a/rethinkstuff/orm.py b/rethinkstuff/orm.py
new file mode 100644
index 0000000..7dd6eb7
--- /dev/null
+++ b/rethinkstuff/orm.py
@@ -0,0 +1,220 @@
+'''
+rethinkstuff/orm.py - rethinkdb ORM
+
+Copyright (C) 2017 Internet Archive
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import rethinkdb as r
+import logging
+
+class WatchedDict(dict):
+    '''dict that calls callback(field) before each mutation it supports.'''
+    def __init__(self, d, callback, field):
+        self.callback = callback
+        self.field = field
+        for key in d:
+            dict.__setitem__(self, key, watch(  # raw set: no callback on load
+                d[key], callback=self.callback, field=self.field))
+
+    def __setitem__(self, key, value):
+        self.callback(self.field)
+        return dict.__setitem__(self, key, watch(
+            value, callback=self.callback, field=self.field))
+
+    def __delitem__(self, key):
+        self.callback(self.field)
+        return dict.__delitem__(self, key)
+
+    def clear(self):
+        self.callback(self.field)
+        return dict.clear(self)
+
+    def pop(self, *args):
+        self.callback(self.field)
+        return dict.pop(self, *args)
+
+    def popitem(self):
+        self.callback(self.field)
+        return dict.popitem(self)  # unbound call needs self, as in pop()
+
+    def setdefault(self, *args):
+        self.callback(self.field)
+        if len(args) == 2:
+            return dict.setdefault(self, args[0], watch(
+                args[1], callback=self.callback, field=self.field))
+        return dict.setdefault(self, *args)
+
+    def update(self, *args, **kwargs):
+        # unsupported for now: raising beats silently skipping the callback
+        raise Exception('not implemented')
+
+class WatchedList(list):
+    '''list that calls callback(field) before each mutation it supports.'''
+    def __init__(self, l, callback, field):
+        self.callback = callback
+        self.field = field
+        for item in l:
+            list.append(self, watch(item, callback=callback, field=self.field))
+
+    def __setitem__(self, index, value):
+        self.callback(self.field)
+        return list.__setitem__(self, index, watch(
+            value, callback=self.callback, field=self.field))
+
+    def __delitem__(self, index):
+        self.callback(self.field)
+        return list.__delitem__(self, index)
+
+    def append(self, value):
+        self.callback(self.field)
+        return list.append(self, watch(value, self.callback, self.field))
+
+    def extend(self, value):
+        self.callback(self.field)
+        return list.extend(self, watch(
+            list(value), callback=self.callback, field=self.field))
+
+    def insert(self, index, value):
+        self.callback(self.field)
+        return list.insert(self, index, watch(
+            value, callback=self.callback, field=self.field))
+
+    def remove(self, value):
+        self.callback(self.field)
+        return list.remove(self, value)
+
+    def pop(self, index=-1):
+        self.callback(self.field)
+        return list.pop(self, index)
+
+    def clear(self):
+        self.callback(self.field)
+        return list.__delitem__(self, slice(None))  # py2 has no list.clear
+
+    def sort(self, key=None, reverse=False):
+        self.callback(self.field)
+        return list.sort(self, key=key, reverse=reverse)  # keyword-only on py3
+
+    def reverse(self):
+        self.callback(self.field)
+        return list.reverse(self)
+
+def watch(obj, callback, field):
+    '''Wrap a dict or list so changes call callback(field); else return obj.'''
+    if isinstance(obj, dict):
+        return WatchedDict(obj, callback, field)
+    if isinstance(obj, list):
+        return WatchedList(obj, callback, field)
+    return obj
+
+class Document(dict, object):
+    '''
+    Base class for ORM.
+
+    You should subclass this class for each of your rethinkdb tables. You can
+    add custom functionality in your subclass if appropriate.
+
+    This class keeps track of changes made to the object and any nested fields.
+    After you have made some changes, call update() to persist them to the
+    database.
+
+    Changes in nested fields result in updates to their first-level ancestor
+    field. For example, if your document starts as {'a': {'b': 'c'}}, then
+    you run d['a']['x'] = 'y', then the update will replace the whole 'a'
+    field. Nested field updates get too complicated any other way.
+
+    The primary key must be `id`, the rethinkdb default. (XXX we could find out
+    what the primary key is from the "table_config" system table.)
+    '''
+    def __init__(self, rethinker, d={}):
+        dict.__setattr__(self, '_r', rethinker)  # real attribute, not a field
+        for k in d:
+            dict.__setitem__(  # raw set: initial contents are not "updates"
+                    self, k, watch(d[k], callback=self._updated, field=k))
+        self._clear_updates()
+
+    def _clear_updates(self):
+        dict.__setattr__(self, '_updates', {})  # field -> value to write
+        dict.__setattr__(self, '_deletes', set())  # fields to remove
+
+    def __setitem__(self, key, value):
+        dict.__setitem__(
+                self, key, watch(value, callback=self._updated, field=key))
+        self._updated(key)
+
+    __setattr__ = __setitem__  # doc.x = 1 stores field 'x'
+    __getattr__ = dict.__getitem__  # NOTE: missing field raises KeyError
+
+    def __delitem__(self, key):
+        dict.__delitem__(self, key)
+        self._deletes.add(key)
+        if key in self._updates:
+            del self._updates[key]
+
+    # XXX do we need the other stuff like in WatchedDict?
+
+    def _updated(self, field):
+        # callback for all updates
+        self._updates[field] = self[field]
+        if field in self._deletes:
+            self._deletes.remove(field)
+
+    @property
+    def table(self):
+        '''
+        Name of the rethinkdb table.
+
+        Defaults to the name of the class, lowercased. Can be overridden.
+        '''
+        return self.__class__.__name__.lower()
+
+    def table_create(self):
+        '''
+        Creates the table.
+
+        Subclasses may want to override this method to do more things, such as
+        creating indexes.
+        '''
+        self._r.table_create(self.table).run()
+
+    def insert(self):
+        result = self._r.table(self.table).insert(self).run()
+        if 'generated_keys' in result:
+            dict.__setitem__(self, 'id', result['generated_keys'][0])
+        self._clear_updates()
+
+    def update(self):
+        # hmm, masks dict.update()
+        if self._updates:
+            # r.literal() to replace, not merge with, nested fields
+            updates = {
+                    field: r.literal(
+                        self._updates[field]) for field in self._updates}
+            self._r.table(self.table).get(self.id).update(updates).run()
+        if self._deletes:  # scoped to this row; never replace the whole table
+            self._r.table(self.table).get(self.id).replace(
+                    r.row.without(self._deletes)).run()
+        self._clear_updates()
+
+    def refresh(self):
+        '''
+        Refresh from the database.
+        '''
+        d = self._r.table(self.table).get(self.id).run()
+        for k in d:
+            dict.__setitem__(
+                    self, k, watch(d[k], callback=self._updated, field=k))
+
+
diff --git a/rethinkstuff/rethinker.py b/rethinkstuff/rethinker.py
new file mode 100644
index 0000000..ed8b8b5
--- /dev/null
+++ b/rethinkstuff/rethinker.py
@@ -0,0 +1,119 @@
+'''
+rethinkstuff/rethinker.py - rethinkdb connection-manager
+
+Copyright (C) 2015-2017 Internet Archive
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import rethinkdb as r
+import logging
+import random
+import time
+import types
+
+class RethinkerWrapper(object):
+    logger = logging.getLogger('rethinkstuff.RethinkerWrapper')
+    def __init__(self, rethinker, wrapped):
+        self.rethinker = rethinker  # supplies connections and the default db
+        self.wrapped = wrapped  # object that attribute/item access is sent to
+
+    def __getattr__(self, name):
+        delegate = getattr(self.wrapped, name)
+        return self.rethinker.wrap(delegate)
+
+    def __getitem__(self, key):
+        return self.rethinker.wrap(self.wrapped.__getitem__)(key)
+
+    def __repr__(self):
+        return '<RethinkerWrapper{}>'.format(repr(self.wrapped))
+
+    def run(self, db=None):
+        self.wrapped.run  # raise AttributeError early
+        while True:  # loops again, on a new connection, after a timeout
+            conn = self.rethinker._random_server_connection()
+            is_iter = False  # True once the generator below owns conn
+            try:
+                result = self.wrapped.run(conn, db=db or self.rethinker.dbname)
+                if hasattr(result, '__next__'):
+                    is_iter = True
+                    def gen():
+                        try:
+                            yield  # empty yield, see comment below
+                            for x in result:
+                                yield x
+                        finally:
+                            result.close()
+                            conn.close()
+                    g = gen()
+                    # Advance the generator to its first (empty) yield so it
+                    # is inside the try block. If we didn't do this, and the
+                    # caller never started the generator, the finally block
+                    # would never run and the connection would stay open.
+                    next(g)
+                    return g
+                else:
+                    return result
+            except r.ReqlTimeoutError as e:
+                time.sleep(0.5)  # brief pause, then the while loop retries
+            finally:
+                if not is_iter:  # otherwise gen()'s finally closes conn
+                    conn.close(noreply_wait=False)
+
+class Rethinker(object):
+    '''
+    >>> r = Rethinker(db='my_db')
+    >>> doc = r.table('my_table').get(1).run()
+    '''
+    logger = logging.getLogger('rethinkstuff.Rethinker')
+
+    def __init__(self, servers=['localhost'], db=None):
+        if isinstance(servers, str):
+            servers = [servers]  # a single "host" or "host:port" string
+        # copy, so instances never share (or pollute) the default list
+        self.servers = list(servers)
+        self.dbname = db
+
+    # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
+    # "Best practices: Managing connections: a connection per request"
+    def _random_server_connection(self):
+        while True:  # keeps trying until some server accepts a connection
+            server = random.choice(self.servers)
+            try:
+                try:
+                    host, port = server.split(':')
+                    return r.connect(host=host, port=port)
+                except ValueError:  # not exactly one ':', treat as bare host
+                    return r.connect(host=server)
+            except Exception as e:
+                self.logger.error(
+                        'will keep trying after failure connecting to '
+                        'rethinkdb server at %s: %s', server, e)
+                time.sleep(0.5)
+
+    def wrap(self, delegate):
+        if isinstance(delegate, (types.FunctionType, types.MethodType)):
+            def wrapper(*args, **kwargs):
+                result = delegate(*args, **kwargs)
+                if result is not None:
+                    return RethinkerWrapper(self, result)
+                else:
+                    return None
+            return wrapper
+        else:
+            return delegate  # non-callables are handed back unwrapped
+
+    def __getattr__(self, name):
+        delegate = getattr(r, name)  # fall through to the rethinkdb module
+        return self.wrap(delegate)
+
diff --git a/rethinkstuff/services.py b/rethinkstuff/services.py
new file mode 100644
index 0000000..9f88a5f
--- /dev/null
+++ b/rethinkstuff/services.py
@@ -0,0 +1,105 @@
+'''
+rethinkstuff/services.py - rethinkdb service registry
+
+Copyright (C) 2015-2017 Internet Archive
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import rethinkdb as r
+import logging
+import socket
+import os
+
+class ServiceRegistry(object):
+    '''
+    status_info is dict, should have at least these fields
+    {
+        'id': ...,   # generated by rethinkdb
+        'role': 'brozzler-worker',
+        'load': 0.5, # load score
+        'heartbeat_interval': 20.0,
+        'host': 'wbgrp-svc999.us.archive.org',           # set in svcreg.heartbeat() as a fallback
+        'pid': 1234,                                     # set in svcreg.heartbeat() as a fallback
+        'first_heartbeat': '2015-10-30T03:39:40.080814', # set in svcreg.heartbeat()
+        'last_heartbeat': '2015-10-30T05:54:35.422866',  # set in svcreg.heartbeat()
+        ... plus anything else you want...
+    }
+    '''
+
+    logger = logging.getLogger('rethinkstuff.ServiceRegistry')
+
+    def __init__(self, rethinker):
+        self.r = rethinker
+        self._ensure_table()
+
+    def _ensure_table(self):
+        dbs = self.r.db_list().run()
+        if self.r.dbname not in dbs:
+            self.logger.info('creating rethinkdb database %s', repr(self.r.dbname))
+            self.r.db_create(self.r.dbname).run()
+        tables = self.r.table_list().run()
+        if 'services' not in tables:
+            self.logger.info("creating rethinkdb table 'services' in database %s", repr(self.r.dbname))
+            self.r.table_create('services', shards=1, replicas=min(3, len(self.r.servers))).run()
+            # self.r.table('sites').index_create...?
+
+    def heartbeat(self, status_info):
+        '''
+        Returns updated status info on success, un-updated status info on
+        failure.
+        '''
+        updated_status_info = dict(status_info)
+        updated_status_info['last_heartbeat'] = r.now()
+        if 'first_heartbeat' not in updated_status_info:
+            updated_status_info['first_heartbeat'] = updated_status_info['last_heartbeat']
+        if 'host' not in updated_status_info:
+            updated_status_info['host'] = socket.gethostname()
+        if 'pid' not in updated_status_info:
+            updated_status_info['pid'] = os.getpid()
+        try:
+            result = self.r.table('services').insert(
+                    updated_status_info, conflict='replace',
+                    return_changes=True).run()
+            return result['changes'][0]['new_val'] # XXX check
+        except Exception:  # best-effort; KeyboardInterrupt/SystemExit pass
+            self.logger.error('error updating service registry', exc_info=True)
+            return status_info
+
+    def unregister(self, id):
+        result = self.r.table('services').get(id).delete().run()
+        if result != {'deleted':1,'errors':0,'inserted':0,'replaced':0,'skipped':0,'unchanged':0}:
+            self.logger.warning('unexpected result attempting to delete id=%s from rethinkdb services table: %s', id, result)
+
+    def available_service(self, role):
+        try:
+            result = self.r.table('services').filter({"role":role}).filter(
+                lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"]   #.default(20.0)
+            ).order_by("load")[0].run()
+            return result
+        except r.ReqlNonExistenceError:
+            return None
+
+    def available_services(self, role=None):
+        try:
+            query = self.r.table('services')
+            if role:
+                query = query.filter({"role":role})
+            query = query.filter(
+                lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"]   #.default(20.0)
+            ).order_by("load")
+            result = query.run()
+            return result
+        except r.ReqlNonExistenceError:
+            return []
+
diff --git a/setup.py b/setup.py
index 60b25bc..62a5d10 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@ import codecs
 
 setuptools.setup(
     name='rethinkstuff',
-    version='0.2.0.dev57',
+    version='0.2.0.dev58',
     packages=['rethinkstuff'],
     classifiers=[
         'Programming Language :: Python :: 2.7',