From 3b52b05329b273e5f879253234295f395040d6a0 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 21 Sep 2015 22:17:15 +0000 Subject: [PATCH] rewrite Rethinker to mimic standard rethinkdb api; rename package rethinkstuff --- README.md | 14 +++---- pyrethink/__init__.py | 61 ------------------------------ rethinkstuff/__init__.py | 82 ++++++++++++++++++++++++++++++++++++++++ setup.py | 5 ++- 4 files changed, 90 insertions(+), 72 deletions(-) delete mode 100644 pyrethink/__init__.py create mode 100644 rethinkstuff/__init__.py diff --git a/README.md b/README.md index bc62e1c..fd2d4e2 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# pyrethink +# rethinkstuff Rudimentary rethinkdb python library with some smarts (and maybe some dumbs) ## What? Why? @@ -14,13 +14,9 @@ Not really a connection pool, because it doesn't keep any connections open, but ## Usage ``` -import rethinkdb as r -import pyrethink - -rr = pyrethink.Rethinker(['db0.foo.com', 'db0.foo.com:38015', 'db1.foo.com'], 'my_db') - -rr.run(r.table('my_table').insert({'foo':'bar','baz':2})) - -for result in rr.results_iter(r.table('my_table')): +import rethinkstuff +r = rethinkstuff.Rethinker(['db0.foo.com', 'db0.foo.com:38015', 'db1.foo.com'], 'my_db') +r.table('my_table').insert({'foo':'bar','baz':2}).run() +for result in r.table('my_table'): print("result={}".format(result)) ``` diff --git a/pyrethink/__init__.py b/pyrethink/__init__.py deleted file mode 100644 index df9e5f2..0000000 --- a/pyrethink/__init__.py +++ /dev/null @@ -1,61 +0,0 @@ -from __future__ import absolute_import -import rethinkdb as r -import logging -import random -import time - -class Rethinker: - logger = logging.getLogger('pyrethink.Rethinker') - - def __init__(self, servers=['localhost'], db=None): - self.servers = servers - self.db = db - - # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py - # "Best practices: Managing connections: a connection per request" - def _random_server_connection(self): - while True: - server = random.choice(self.servers) - try: - try: - host, port = server.split(':') - return r.connect(host=host, port=port) - except ValueError: - return r.connect(host=server) - except Exception as e: - self.logger.error('will keep trying to get a connection after failure connecting to %s', server, exc_info=True) - time.sleep(0.5) - - def run(self, query): - while True: - with self._random_server_connection() as conn: - try: - return query.run(conn, db=self.db) - except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e: - self.logger.error('will retry rethinkdb query/operation %s which failed like so:', exc_info=True) - - def results_iter(self, query): - """Generator for query results that closes the connection after - iterating over the results, for proper support of cursors, which fetch - from the server more than once.""" - success = False - results = None - try: - while not success: - with self._random_server_connection() as conn: - try: - results = query.run(conn, db=self.db) - success = True - for result in results: - yield result - except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e: - if not success: - self.logger.error('will retry rethinkdb query/operation %s which failed like so:', exc_info=True) - else: - # initial query was successful, subsequent fetch - # perhaps failed, only caller can know what to do - raise - finally: - if results and hasattr(results, 'close'): - results.close() - diff --git a/rethinkstuff/__init__.py b/rethinkstuff/__init__.py new file mode 100644 index 0000000..d248c59 --- /dev/null +++ b/rethinkstuff/__init__.py @@ -0,0 +1,82 @@ +from __future__ import absolute_import +import rethinkdb as r +import logging +import random +import time +import types + +class RethinkerWrapper: + def __init__(self, rethinker, wrapped): + self.rethinker = rethinker + self.wrapped = wrapped + + def __getattr__(self, name): + delegate = getattr(self.wrapped, name) + return self.rethinker.wrap(delegate) + + def run(self, db=None): + self.wrapped.run # raise AttributeError early + while True: + conn = self.rethinker._random_server_connection() + is_iter = False + try: + result = self.wrapped.run(conn, db=db or self.rethinker.db) + if hasattr(result, "__next__"): + def gen(): + try: + for x in result: + yield x + finally: + conn.close() + return gen() + else: + return result + except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e: + pass + finally: + if not is_iter: + conn.close(noreply_wait=False) + + +class Rethinker: + """ + >>> r = Rethinker(db="my_db") + >>> doc = r.table("my_table").get(1).run() + """ + logger = logging.getLogger('rethinkstuff.Rethinker') + + def __init__(self, servers=['localhost'], db=None): + self.servers = servers + self.db = db + + # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py + # "Best practices: Managing connections: a connection per request" + def _random_server_connection(self): + while True: + server = random.choice(self.servers) + try: + try: + host, port = server.split(':') + return r.connect(host=host, port=port) + except ValueError: + return r.connect(host=server) + except Exception as e: + self.logger.error('will keep trying to get a connection after failure connecting to %s', server, exc_info=True) + time.sleep(0.5) + + def wrap(self, delegate): + if isinstance(delegate, (types.FunctionType, types.MethodType)): + def wrapper(*args, **kwargs): + result = delegate(*args, **kwargs) + if result is not None: + return RethinkerWrapper(self, result) + else: + return None + return wrapper + else: + return delegate + + def __getattr__(self, name): + delegate = getattr(r, name) + return self.wrap(delegate) + diff --git a/setup.py b/setup.py index d3ee4a9..a5f109b 100644 --- a/setup.py +++ b/setup.py @@ -1,10 +1,11 @@ import setuptools setuptools.setup( - name='pyrethink', - packages=['pyrethink'], + name='rethinkstuff', + packages=['rethinkstuff'], classifiers=[ 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3.4', ], + install_requires=['rethinkdb'], )