mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
fail after 20 "recoverable" exception in iterator
it turns out that when iterating over results sometimes (always?) errors that are recoverable when running a query are not recoverable, so we've been ending up in infinite loops
This commit is contained in:
parent
a9f764fb45
commit
c5b1b0a620
@ -39,6 +39,33 @@ class RethinkerWrapper(object):
|
|||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<RethinkerWrapper{}>'.format(repr(self.wrapped))
|
return '<RethinkerWrapper{}>'.format(repr(self.wrapped))
|
||||||
|
|
||||||
|
def _result_iter(self, conn, result):
|
||||||
|
error_count = 0
|
||||||
|
try:
|
||||||
|
yield # empty yield, see comment below
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
yield next(result)
|
||||||
|
except StopIteration:
|
||||||
|
break
|
||||||
|
except r.ReqlOpFailedError as e:
|
||||||
|
if e.args and re.match(
|
||||||
|
'^Cannot perform.*replica.*', e.args[0]):
|
||||||
|
if error_count < 20:
|
||||||
|
error_count += 1
|
||||||
|
self.logger.warn(
|
||||||
|
'will keep trying after potentially '
|
||||||
|
'recoverable error (%s/20): %s',
|
||||||
|
error_count, e)
|
||||||
|
time.sleep(0.5)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
result.close()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
def run(self, db=None):
|
def run(self, db=None):
|
||||||
self.wrapped.run # raise AttributeError early
|
self.wrapped.run # raise AttributeError early
|
||||||
while True:
|
while True:
|
||||||
@ -48,32 +75,7 @@ class RethinkerWrapper(object):
|
|||||||
result = self.wrapped.run(conn, db=db or self.rr.dbname)
|
result = self.wrapped.run(conn, db=db or self.rr.dbname)
|
||||||
if hasattr(result, '__next__'):
|
if hasattr(result, '__next__'):
|
||||||
is_iter = True
|
is_iter = True
|
||||||
|
g = self._result_iter(conn, result)
|
||||||
def gen():
|
|
||||||
try:
|
|
||||||
yield # empty yield, see comment below
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
x = next(result)
|
|
||||||
yield x
|
|
||||||
except StopIteration:
|
|
||||||
break
|
|
||||||
except r.ReqlOpFailedError as e:
|
|
||||||
if e.args and re.match(
|
|
||||||
'^Cannot perform.*replica.*',
|
|
||||||
e.args[0]):
|
|
||||||
self.logger.error(
|
|
||||||
'will keep trying after '
|
|
||||||
'potentially recoverable '
|
|
||||||
'error: %s', e)
|
|
||||||
time.sleep(0.5)
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
finally:
|
|
||||||
result.close()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
g = gen()
|
|
||||||
# Start executing the generator, leaving off after the
|
# Start executing the generator, leaving off after the
|
||||||
# empty yield. If we didn't do this, and the caller never
|
# empty yield. If we didn't do this, and the caller never
|
||||||
# started the generator, the finally block would never run
|
# started the generator, the finally block would never run
|
||||||
@ -87,7 +89,7 @@ class RethinkerWrapper(object):
|
|||||||
except r.ReqlOpFailedError as e:
|
except r.ReqlOpFailedError as e:
|
||||||
if e.args and re.match(
|
if e.args and re.match(
|
||||||
'^Cannot perform.*replica.*', e.args[0]):
|
'^Cannot perform.*replica.*', e.args[0]):
|
||||||
self.logger.error(
|
self.logger.warn(
|
||||||
'will keep trying after potentially recoverable '
|
'will keep trying after potentially recoverable '
|
||||||
'error: %s', e)
|
'error: %s', e)
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
|
@ -174,7 +174,7 @@ class MockRethinker(doublethink.Rethinker):
|
|||||||
count = {'value': 0}
|
count = {'value': 0}
|
||||||
def next_(*args, **kwargs):
|
def next_(*args, **kwargs):
|
||||||
count['value'] += 1
|
count['value'] += 1
|
||||||
if count['value'] <= 2:
|
if count['value'] % 2 == 1:
|
||||||
raise e
|
raise e
|
||||||
else:
|
else:
|
||||||
return mock.MagicMock()
|
return mock.MagicMock()
|
||||||
@ -209,5 +209,9 @@ def test_error_handling():
|
|||||||
next(it) # exception here
|
next(it) # exception here
|
||||||
|
|
||||||
it = rr.table('recoverable_err_in_iterator').run() # no exception yet
|
it = rr.table('recoverable_err_in_iterator').run() # no exception yet
|
||||||
next(it) # no exception
|
# next(it)
|
||||||
|
for i in range(20):
|
||||||
|
next(it) # no exception
|
||||||
|
with pytest.raises(r.ReqlOpFailedError):
|
||||||
|
next(it) # out of retries
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user