Merge pull request #9 from internetarchive/err-iterating

handle recoverable errors that happen while iterating over result
This commit is contained in:
jkafader 2018-09-18 15:56:55 -07:00 committed by GitHub
commit 99398a83ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 114 additions and 3 deletions

View File

@ -16,6 +16,8 @@ matrix:
allow_failures:
- python: nightly
- python: 3.7-dev
- python: 2.7
- python: pypy
services:
- docker
@ -25,7 +27,7 @@ before_install:
- docker run -d --publish=28015:28015 rethinkdb
install:
- pip install . pytest
- pip install .[test]
script:
- py.test -v tests

View File

@ -48,14 +48,31 @@ class RethinkerWrapper(object):
result = self.wrapped.run(conn, db=db or self.rr.dbname)
if hasattr(result, '__next__'):
is_iter = True
def gen():
try:
yield # empty yield, see comment below
for x in result:
yield x
while True:
try:
x = next(result)
yield x
except StopIteration:
break
except r.ReqlOpFailedError as e:
if e.args and re.match(
'^Cannot perform.*replica.*',
e.args[0]):
self.logger.error(
'will keep trying after '
'potentially recoverable '
'error: %s', e)
time.sleep(0.5)
else:
raise
finally:
result.close()
conn.close()
g = gen()
# Start executing the generator, leaving off after the
# empty yield. If we didn't do this, and the caller never

View File

@ -1,6 +1,13 @@
import setuptools
import codecs
test_deps = ['pytest']
try:
import unittest.mock
except:
test_deps.append('mock')
setuptools.setup(
name='doublethink',
version='0.2.0.dev88',
@ -10,8 +17,10 @@ setuptools.setup(
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
],
install_requires=['rethinkdb'],
extras_require={'test': test_deps},
url='https://github.com/internetarchive/doublethink',
author='Noah Levitt',
author_email='nlevitt@archive.org',

View File

@ -24,6 +24,10 @@ import gc
import pytest
import rethinkdb as r
import datetime
try:
from unittest import mock
except:
import mock
logging.basicConfig(stream=sys.stderr, level=logging.INFO,
format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")
@ -128,3 +132,82 @@ def test_utcnow():
## XXX what else can we test without jumping through hoops?
class SpecificException(Exception):
pass
class MockRethinker(doublethink.Rethinker):
def __init__(self, *args, **kwargs):
self.m = mock.MagicMock()
def table(name):
mm = mock.MagicMock()
if name in ('err_running_query', 'recoverable_err_running_query'):
if name == 'recoverable_err_running_query':
e = r.ReqlOpFailedError(
'Cannot perform read: The primary replica '
"isn't connected... THIS IS A TEST!")
else:
e = SpecificException
# dict because: https://stackoverflow.com/questions/3190706/nonlocal-keyword-in-python-2-x
count = {'value': 0}
def run(*args, **kwargs):
count['value'] += 1
if count['value'] <= 2:
raise e
else:
return mock.MagicMock()
mm.run = run
elif name in ('err_in_iterator', 'recoverable_err_in_iterator'):
if name == 'recoverable_err_in_iterator':
e = r.ReqlOpFailedError(
'Cannot perform read: The primary replica '
"isn't connected... THIS IS A TEST!")
else:
e = SpecificException
def run(*args, **kwargs):
mmm = mock.MagicMock()
# dict because: https://stackoverflow.com/questions/3190706/nonlocal-keyword-in-python-2-x
count = {'value': 0}
def next_(*args, **kwargs):
count['value'] += 1
if count['value'] <= 2:
raise e
else:
return mock.MagicMock()
mmm.__iter__ = lambda *args, **kwargs: mmm
mmm.__next__ = next_
mmm.next = next_
return mmm
mm.run = run
return mm
self.m.table = table
def _random_server_connection(self):
return mock.Mock()
def __getattr__(self, name):
delegate = getattr(self.m, name)
return self.wrap(delegate)
def test_error_handling():
rr = MockRethinker(db='my_db')
with pytest.raises(SpecificException):
rr.table('err_running_query').run()
# should not raise exception
rr.table('recoverable_err_running_query').run()
it = rr.table('err_in_iterator').run() # no exception yet
with pytest.raises(SpecificException):
next(it) # exception here
it = rr.table('recoverable_err_in_iterator').run() # no exception yet
next(it) # no exception