mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
rewrite Rethinker to mimic standard rethinkdb api; rename package rethinkstuff
This commit is contained in:
parent
cd11a4bf83
commit
3b52b05329
14
README.md
14
README.md
@ -1,4 +1,4 @@
|
||||
# pyrethink
|
||||
# rethinkstuff
|
||||
Rudimentary rethinkdb python library with some smarts (and maybe some dumbs)
|
||||
|
||||
## What? Why?
|
||||
@ -14,13 +14,9 @@ Not really a connection pool, because it doesn't keep any connections open, but
|
||||
|
||||
## Usage
|
||||
```
|
||||
import rethinkdb as r
|
||||
import pyrethink
|
||||
|
||||
rr = pyrethink.Rethinker(['db0.foo.com', 'db0.foo.com:38015', 'db1.foo.com'], 'my_db')
|
||||
|
||||
rr.run(r.table('my_table').insert({'foo':'bar','baz':2}))
|
||||
|
||||
for result in rr.results_iter(r.table('my_table')):
|
||||
import rethinkstuff
|
||||
r = rethinkstuff.Rethinker(['db0.foo.com', 'db0.foo.com:38015', 'db1.foo.com'], 'my_db')
|
||||
r.table('my_table').insert({'foo':'bar','baz':2}).run()
|
||||
for result in r.table('my_table'):
|
||||
print("result={}".format(result))
|
||||
```
|
||||
|
@ -1,61 +0,0 @@
|
||||
from __future__ import absolute_import
|
||||
import rethinkdb as r
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
|
||||
class Rethinker:
|
||||
logger = logging.getLogger('pyrethink.Rethinker')
|
||||
|
||||
def __init__(self, servers=['localhost'], db=None):
|
||||
self.servers = servers
|
||||
self.db = db
|
||||
|
||||
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
|
||||
# "Best practices: Managing connections: a connection per request"
|
||||
def _random_server_connection(self):
|
||||
while True:
|
||||
server = random.choice(self.servers)
|
||||
try:
|
||||
try:
|
||||
host, port = server.split(':')
|
||||
return r.connect(host=host, port=port)
|
||||
except ValueError:
|
||||
return r.connect(host=server)
|
||||
except Exception as e:
|
||||
self.logger.error('will keep trying to get a connection after failure connecting to %s', server, exc_info=True)
|
||||
time.sleep(0.5)
|
||||
|
||||
def run(self, query):
|
||||
while True:
|
||||
with self._random_server_connection() as conn:
|
||||
try:
|
||||
return query.run(conn, db=self.db)
|
||||
except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e:
|
||||
self.logger.error('will retry rethinkdb query/operation %s which failed like so:', exc_info=True)
|
||||
|
||||
def results_iter(self, query):
|
||||
"""Generator for query results that closes the connection after
|
||||
iterating over the results, for proper support of cursors, which fetch
|
||||
from the server more than once."""
|
||||
success = False
|
||||
results = None
|
||||
try:
|
||||
while not success:
|
||||
with self._random_server_connection() as conn:
|
||||
try:
|
||||
results = query.run(conn, db=self.db)
|
||||
success = True
|
||||
for result in results:
|
||||
yield result
|
||||
except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e:
|
||||
if not success:
|
||||
self.logger.error('will retry rethinkdb query/operation %s which failed like so:', exc_info=True)
|
||||
else:
|
||||
# initial query was successful, subsequent fetch
|
||||
# perhaps failed, only caller can know what to do
|
||||
raise
|
||||
finally:
|
||||
if results and hasattr(results, 'close'):
|
||||
results.close()
|
||||
|
82
rethinkstuff/__init__.py
Normal file
82
rethinkstuff/__init__.py
Normal file
@ -0,0 +1,82 @@
|
||||
from __future__ import absolute_import
|
||||
import rethinkdb as r
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
import types
|
||||
|
||||
class RethinkerWrapper:
|
||||
def __init__(self, rethinker, wrapped):
|
||||
self.rethinker = rethinker
|
||||
self.wrapped = wrapped
|
||||
|
||||
def __getattr__(self, name):
|
||||
delegate = getattr(self.wrapped, name)
|
||||
return self.rethinker.wrap(delegate)
|
||||
|
||||
def run(self, db=None):
|
||||
self.wrapped.run # raise AttributeError early
|
||||
while True:
|
||||
conn = self.rethinker._random_server_connection()
|
||||
is_iter = False
|
||||
try:
|
||||
result = self.wrapped.run(conn, db=db or self.rethinker.db)
|
||||
if hasattr(result, "__next__"):
|
||||
def gen():
|
||||
try:
|
||||
for x in result:
|
||||
yield x
|
||||
finally:
|
||||
conn.close()
|
||||
return gen()
|
||||
else:
|
||||
return result
|
||||
except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e:
|
||||
pass
|
||||
finally:
|
||||
if not is_iter:
|
||||
conn.close(noreply_wait=False)
|
||||
|
||||
|
||||
class Rethinker:
|
||||
"""
|
||||
>>> r = Rethinker(db="my_db")
|
||||
>>> doc = r.table("my_table").get(1).run()
|
||||
"""
|
||||
logger = logging.getLogger('rethinkstuff.Rethinker')
|
||||
|
||||
def __init__(self, servers=['localhost'], db=None):
|
||||
self.servers = servers
|
||||
self.db = db
|
||||
|
||||
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
|
||||
# "Best practices: Managing connections: a connection per request"
|
||||
def _random_server_connection(self):
|
||||
while True:
|
||||
server = random.choice(self.servers)
|
||||
try:
|
||||
try:
|
||||
host, port = server.split(':')
|
||||
return r.connect(host=host, port=port)
|
||||
except ValueError:
|
||||
return r.connect(host=server)
|
||||
except Exception as e:
|
||||
self.logger.error('will keep trying to get a connection after failure connecting to %s', server, exc_info=True)
|
||||
time.sleep(0.5)
|
||||
|
||||
def wrap(self, delegate):
|
||||
if isinstance(delegate, (types.FunctionType, types.MethodType)):
|
||||
def wrapper(*args, **kwargs):
|
||||
result = delegate(*args, **kwargs)
|
||||
if result is not None:
|
||||
return RethinkerWrapper(self, result)
|
||||
else:
|
||||
return None
|
||||
return wrapper
|
||||
else:
|
||||
return delegate
|
||||
|
||||
def __getattr__(self, name):
|
||||
delegate = getattr(r, name)
|
||||
return self.wrap(delegate)
|
||||
|
Loading…
x
Reference in New Issue
Block a user