# rethinkstuff/__init__.py
#
# NOTE: the module docstring / license header of this file lies outside the
# visible part of the patch. The classes that used to be defined in this
# module (RethinkerWrapper, Rethinker, ServiceRegistry, WatchedDict,
# WatchedList, Document and the watch() helper) now live in
# rethinkstuff/orm.py, rethinkstuff/rethinker.py and rethinkstuff/services.py;
# the public ones are re-exported below.

import rethinkdb
import datetime

from rethinkstuff.orm import Document
from rethinkstuff.rethinker import Rethinker
from rethinkstuff.services import ServiceRegistry

__all__ = ['Document', 'Rethinker', 'ServiceRegistry', 'UTC', 'utcnow']

try:
    UTC = datetime.timezone.utc
except AttributeError:
    # The datetime module has no `timezone` attribute here (python 2), so
    # fall back to the timezone implementation shipped with the driver. Only
    # AttributeError is caught, so unrelated failures are not hidden.
    UTC = rethinkdb.make_timezone("00:00")

def utcnow():
    """Return the current time as a timezone-aware datetime in UTC.

    Convenience wrapper around ``datetime.datetime.now(UTC)``, where ``UTC``
    is the module-level timezone object defined above.
    """
    return datetime.datetime.now(UTC)
'''
rethinkstuff/orm.py - rethinkdb ORM

Copyright (C) 2017 Internet Archive

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

import logging

class WatchedDict(dict):
    '''
    Dict that calls `callback(field)` whenever it is modified.

    Values that are themselves dicts or lists are wrapped with watch(), using
    the same callback and field, so that changes at any depth are reported
    against the same `field`.

    The callback is invoked *before* the modification is applied.
    '''
    def __init__(self, d, callback, field):
        self.callback = callback
        self.field = field
        for key in d:
            # dict.__setitem__ directly: populating is not a "change"
            dict.__setitem__(self, key, watch(
                d[key], callback=self.callback, field=self.field))

    def __setitem__(self, key, value):
        self.callback(self.field)
        return dict.__setitem__(self, key, watch(
            value, callback=self.callback, field=self.field))

    def __delitem__(self, key):
        self.callback(self.field)
        return dict.__delitem__(self, key)

    def clear(self):
        self.callback(self.field)
        return dict.clear(self)

    def pop(self, *args):
        self.callback(self.field)
        return dict.pop(self, *args)

    def popitem(self):
        self.callback(self.field)
        # `self` must be passed explicitly when calling through the class
        return dict.popitem(self)

    def setdefault(self, *args):
        self.callback(self.field)
        if len(args) == 2:
            return dict.setdefault(self, args[0], watch(
                args[1], callback=self.callback, field=self.field))
        return dict.setdefault(self, *args)

    def update(self, *args, **kwargs):
        '''
        Same calling conventions as dict.update(); new values are watched.
        '''
        # dict() normalizes mappings, iterables of pairs and keyword
        # arguments, and raises TypeError for bad input before anything is
        # reported or changed
        new_items = dict(*args, **kwargs)
        self.callback(self.field)
        for key in new_items:
            dict.__setitem__(self, key, watch(
                new_items[key], callback=self.callback, field=self.field))

class WatchedList(list):
    '''
    List that calls `callback(field)` whenever it is modified.

    Items that are themselves dicts or lists are wrapped with watch(), using
    the same callback and field.

    The callback is invoked *before* the modification is applied.
    '''
    def __init__(self, l, callback, field):
        self.callback = callback
        self.field = field
        for item in l:
            # list.append directly: populating is not a "change"
            list.append(self, watch(item, callback=callback, field=self.field))

    def __setitem__(self, index, value):
        self.callback(self.field)
        return list.__setitem__(self, index, watch(
            value, callback=self.callback, field=self.field))

    def __delitem__(self, index):
        self.callback(self.field)
        return list.__delitem__(self, index)

    def __iadd__(self, other):
        # without this, `lst += [...]` would use list.__iadd__ and the change
        # would go unreported
        self.extend(other)
        return self

    def append(self, value):
        self.callback(self.field)
        return list.append(self, watch(
            value, callback=self.callback, field=self.field))

    def extend(self, value):
        self.callback(self.field)
        return list.extend(self, [
            watch(item, callback=self.callback, field=self.field)
            for item in value])

    def insert(self, index, value):
        self.callback(self.field)
        return list.insert(self, index, watch(
            value, callback=self.callback, field=self.field))

    def remove(self, value):
        self.callback(self.field)
        return list.remove(self, value)

    def pop(self, index=-1):
        self.callback(self.field)
        return list.pop(self, index)

    def clear(self):
        self.callback(self.field)
        # list.clear() does not exist in python 2; deleting the full slice
        # through list.__delitem__ works everywhere and does not trigger the
        # callback a second time
        return list.__delitem__(self, slice(None))

    def sort(self, key=None, reverse=False):
        self.callback(self.field)
        # must be passed by keyword: python 3 rejects positional arguments
        # and in python 2 the first positional argument is `cmp`, not `key`
        return list.sort(self, key=key, reverse=reverse)

    def reverse(self):
        self.callback(self.field)
        return list.reverse(self)

def watch(obj, callback, field):
    '''
    Wraps `obj` in a WatchedDict or WatchedList if it is a dict or a list.

    Anything else is returned unchanged.
    '''
    if isinstance(obj, dict):
        return WatchedDict(obj, callback, field)
    elif isinstance(obj, list):
        return WatchedList(obj, callback, field)
    else:
        return obj

class Document(dict):
    '''
    Base class for ORM.

    You should subclass this class for each of your rethinkdb tables. You can
    add custom functionality in your subclass if appropriate.

    This class keeps track of changes made to the object and any nested fields.
    After you have made some changes, call update() to persist them to the
    database.

    Changes in nested fields result in updates to their first-level ancestor
    field. For example, if your document starts as {'a': {'b': 'c'}}, then
    you run d['a']['x'] = 'y', then the update will replace the whole 'a'
    field. Nested field updates get too complicated any other way.

    Fields can be read and written as attributes as well as items. Reading a
    missing field as an attribute raises KeyError (attribute reads are routed
    straight to dict.__getitem__).

    The primary key must be `id`, the rethinkdb default. (XXX we could find out
    what the primary key is from the "table_config" system table.)
    '''
    def __init__(self, rethinker, d=None):
        # dict.__setattr__ because our own __setattr__ stores document fields
        dict.__setattr__(self, '_r', rethinker)
        if d is None:
            d = {}
        for k in d:
            dict.__setitem__(
                self, k, watch(d[k], callback=self._updated, field=k))
        self._clear_updates()

    def _clear_updates(self):
        # fields changed / fields removed since the last sync with the db
        dict.__setattr__(self, '_updates', {})
        dict.__setattr__(self, '_deletes', set())

    def __setitem__(self, key, value):
        dict.__setitem__(
            self, key, watch(value, callback=self._updated, field=key))
        self._updated(key)

    __setattr__ = __setitem__
    __getattr__ = dict.__getitem__

    def __delitem__(self, key):
        dict.__delitem__(self, key)
        self._deletes.add(key)
        if key in self._updates:
            del self._updates[key]

    # XXX do we need the other stuff like in WatchedDict?

    def _updated(self, field):
        # callback for all updates
        self._updates[field] = self[field]
        if field in self._deletes:
            self._deletes.remove(field)

    @property
    def table(self):
        '''
        Name of the rethinkdb table.

        Defaults to the name of the class, lowercased. Can be overridden.
        '''
        return self.__class__.__name__.lower()

    def table_create(self):
        '''
        Creates the table.

        Subclasses may want to override this method to do more things, such as
        creating indexes.
        '''
        self._r.table_create(self.table).run()

    def insert(self):
        '''
        Inserts this document into the table.

        If the result contains generated keys, the first one is stored as the
        `id` field. Pending changes are cleared.
        '''
        result = self._r.table(self.table).insert(self).run()
        if 'generated_keys' in result:
            dict.__setitem__(self, 'id', result['generated_keys'][0])
        self._clear_updates()

    def update(self):
        '''
        Persists changed and deleted fields of this document to the database.

        Note that this masks dict.update().
        '''
        # Imported here, in the only place this module needs the driver, so
        # that the change tracking classes can be imported without it.
        import rethinkdb as r

        if self._updates:
            # r.literal() to replace, not merge with, nested fields
            updates = {
                field: r.literal(
                    self._updates[field]) for field in self._updates}
            self._r.table(self.table).get(self.id).update(updates).run()
        if self._deletes:
            # get(self.id) restricts the replace to this document; without it
            # the fields would be stripped from every document in the table.
            # Field names are passed as separate arguments instead of a set.
            self._r.table(self.table).get(self.id).replace(
                r.row.without(*self._deletes)).run()
        self._clear_updates()

    def refresh(self):
        '''
        Refresh from the database.

        Fields returned by the database overwrite the local ones. Pending
        changes are not cleared, and local fields absent from the database
        result are left in place.
        '''
        d = self._r.table(self.table).get(self.id).run()
        for k in d:
            dict.__setitem__(
                self, k, watch(d[k], callback=self._updated, field=k))
'''
rethinkstuff/rethinker.py - rethinkdb connection-manager

Copyright (C) 2015-2017 Internet Archive

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

import rethinkdb as r
import logging
import random
import time
import types

try:
    _string_types = (basestring,)   # python 2: covers str and unicode
except NameError:
    _string_types = (str,)

class RethinkerWrapper(object):
    '''
    Wraps a rethinkdb query object on behalf of a Rethinker.

    Attribute and item access are forwarded to the wrapped object, and
    function or method results are wrapped again (see Rethinker.wrap()), so a
    query can be built by chaining as usual. run() takes care of obtaining
    and closing the connection.
    '''
    logger = logging.getLogger('rethinkstuff.RethinkerWrapper')

    def __init__(self, rethinker, wrapped):
        self.rethinker = rethinker
        self.wrapped = wrapped

    def __getattr__(self, name):
        delegate = getattr(self.wrapped, name)
        return self.rethinker.wrap(delegate)

    def __getitem__(self, key):
        return self.rethinker.wrap(self.wrapped.__getitem__)(key)

    def __repr__(self):
        return '<RethinkerWrapper{}>'.format(repr(self.wrapped))

    def run(self, db=None):
        '''
        Runs the wrapped query on a connection to a randomly chosen server.

        Uses database `db` if supplied, otherwise the Rethinker's database.
        Retries, after a half second pause, for as long as the query raises
        ReqlTimeoutError.

        If the result has a `__next__` attribute, a generator over the
        result is returned; the result and the connection are closed when
        the generator is exhausted or closed. Otherwise the result is
        returned as is and the connection is closed before returning.
        '''
        self.wrapped.run  # raise AttributeError early
        while True:
            conn = self.rethinker._random_server_connection()
            is_iter = False
            try:
                result = self.wrapped.run(
                        conn, db=db or self.rethinker.dbname)
                if hasattr(result, '__next__'):
                    is_iter = True
                    def gen():
                        try:
                            yield  # empty yield, see comment below
                            for x in result:
                                yield x
                        finally:
                            # close the connection even if closing the
                            # result fails
                            try:
                                result.close()
                            finally:
                                conn.close()
                    g = gen()
                    # Start executing the generator, leaving off after the
                    # empty yield. If we didn't do this, and the caller never
                    # started the generator, the finally block would never run
                    # and the connection would stay open.
                    next(g)
                    return g
                else:
                    return result
            except r.ReqlTimeoutError:
                time.sleep(0.5)
            finally:
                # in the iterator case the generator owns the connection
                if not is_iter:
                    conn.close(noreply_wait=False)

class Rethinker(object):
    '''
    >>> r = Rethinker(db='my_db')
    >>> doc = r.table('my_table').get(1).run()
    '''
    logger = logging.getLogger('rethinkstuff.Rethinker')

    def __init__(self, servers=None, db=None):
        '''
        `servers` is a single "host" or "host:port" string, or a sequence of
        such strings; defaults to ['localhost']. `db` is the name of the
        database queries run against by default.
        '''
        if servers is None:
            self.servers = ['localhost']
        elif isinstance(servers, _string_types):
            self.servers = [servers]
        else:
            # private copy, so the list is not shared with the caller or
            # with other instances
            self.servers = list(servers)
        self.dbname = db

    # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
    # "Best practices: Managing connections: a connection per request"
    def _random_server_connection(self):
        '''
        Returns a new connection to a randomly chosen server.

        On failure, logs the error, waits half a second and tries again
        (possibly with a different server), indefinitely.
        '''
        while True:
            server = random.choice(self.servers)
            try:
                # Decide between "host:port" and plain "host" before
                # connecting, so that an error raised by r.connect() is never
                # mistaken for a server string without a port.
                parts = server.split(':')
                if len(parts) == 2:
                    return r.connect(host=parts[0], port=parts[1])
                return r.connect(host=server)
            except Exception as e:
                self.logger.error(
                        'will keep trying after failure connecting to '
                        'rethinkdb server at %s: %s', server, e)
                time.sleep(0.5)

    def wrap(self, delegate):
        '''
        If `delegate` is a function or method, returns a function that calls
        it and wraps any result other than None in a RethinkerWrapper.
        Anything else is returned unchanged.
        '''
        if not isinstance(delegate, (types.FunctionType, types.MethodType)):
            return delegate

        def wrapper(*args, **kwargs):
            result = delegate(*args, **kwargs)
            if result is not None:
                return RethinkerWrapper(self, result)
            else:
                return None
        return wrapper

    def __getattr__(self, name):
        # anything not defined here is looked up on the rethinkdb module
        delegate = getattr(r, name)
        return self.wrap(delegate)
'''
rethinkstuff/services.py - rethinkdb service registry

Copyright (C) 2015-2017 Internet Archive

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

import rethinkdb as r
import logging
import socket
import os

def _is_alive(svc):
    # reql predicate: the last heartbeat is less than three heartbeat
    # intervals old
    return r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] #.default(20.0)

class ServiceRegistry(object):
    '''
    status_info is dict, should have at least these fields
    {
        'id': ..., # generated by rethinkdb
        'role': 'brozzler-worker',
        'load': 0.5, # load score
        'heartbeat_interval': 20.0,
        'host': 'wbgrp-svc999.us.archive.org', # set in svcreg.heartbeat() as a fallback
        'pid': 1234, # set in svcreg.heartbeat() as a fallback
        'first_heartbeat': '2015-10-30T03:39:40.080814', # set in svcreg.heartbeat()
        'last_heartbeat': '2015-10-30T05:54:35.422866', # set in svcreg.heartbeat()
        ... plus anything else you want...
    }
    '''

    logger = logging.getLogger('rethinkstuff.ServiceRegistry')

    def __init__(self, rethinker):
        self.r = rethinker
        self._ensure_table()

    def _ensure_table(self):
        '''
        Creates the database and the 'services' table if they do not exist.
        '''
        dbs = self.r.db_list().run()
        if self.r.dbname not in dbs:
            self.logger.info('creating rethinkdb database %s', repr(self.r.dbname))
            self.r.db_create(self.r.dbname).run()
        tables = self.r.table_list().run()
        if 'services' not in tables:
            self.logger.info("creating rethinkdb table 'services' in database %s", repr(self.r.dbname))
            self.r.table_create('services', shards=1, replicas=min(3, len(self.r.servers))).run()
            # self.r.table('sites').index_create...?

    def heartbeat(self, status_info):
        '''
        Returns updated status info on success, un-updated status info on
        failure.
        '''
        updated_status_info = dict(status_info)
        updated_status_info['last_heartbeat'] = r.now()
        if 'first_heartbeat' not in updated_status_info:
            updated_status_info['first_heartbeat'] = updated_status_info['last_heartbeat']
        if 'host' not in updated_status_info:
            updated_status_info['host'] = socket.gethostname()
        if 'pid' not in updated_status_info:
            updated_status_info['pid'] = os.getpid()
        try:
            result = self.r.table('services').insert(
                updated_status_info, conflict='replace',
                return_changes=True).run()
            return result['changes'][0]['new_val'] # XXX check
        except Exception:
            # best effort: a failed heartbeat is logged, not raised; but
            # KeyboardInterrupt and SystemExit are no longer swallowed
            self.logger.error('error updating service registry', exc_info=True)
            return status_info

    def unregister(self, id):
        '''
        Deletes the service with the supplied id from the registry. Logs a
        warning if the result is not exactly one deleted document.
        '''
        result = self.r.table('services').get(id).delete().run()
        if result != {'deleted':1,'errors':0,'inserted':0,'replaced':0,'skipped':0,'unchanged':0}:
            self.logger.warning('unexpected result attempting to delete id=%s from rethinkdb services table: %s', id, result)

    def available_service(self, role):
        '''
        Returns the service with the supplied role that has the lowest load
        among those with a recent enough heartbeat, or None if the query
        raises ReqlNonExistenceError.
        '''
        try:
            result = self.r.table('services').filter({"role":role}).filter(
                _is_alive).order_by("load")[0].run()
            return result
        except r.ReqlNonExistenceError:
            return None

    def available_services(self, role=None):
        '''
        Returns the services with a recent enough heartbeat, ordered by load,
        restricted to the supplied role if any. Returns an empty list if the
        query raises ReqlNonExistenceError.
        '''
        try:
            query = self.r.table('services')
            if role:
                query = query.filter({"role":role})
            query = query.filter(_is_alive).order_by("load")
            result = query.run()
            return result
        except r.ReqlNonExistenceError:
            return []

# NOTE: the patch this module comes from also changes setup.py; the visible
# part of that change bumps the package version from 0.2.0.dev57 to
# 0.2.0.dev58.