mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
Merge pull request #13 from internetarchive/conn-penalty-box
minimize impact of down server
This commit is contained in:
commit
fc9f08f615
@ -113,21 +113,22 @@ class Rethinker(object):
|
|||||||
else:
|
else:
|
||||||
self.servers = servers
|
self.servers = servers
|
||||||
self.dbname = db
|
self.dbname = db
|
||||||
|
self.last_error = {} # {server: time}
|
||||||
|
|
||||||
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
|
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
|
||||||
# "Best practices: Managing connections: a connection per request"
|
# "Best practices: Managing connections: a connection per request"
|
||||||
def _random_server_connection(self):
|
def _random_server_connection(self):
|
||||||
retry_wait = 0.01
|
retry_wait = 0.01
|
||||||
while True:
|
while True:
|
||||||
server = random.choice(self.servers)
|
server = random.choice(self._server_whitelist())
|
||||||
try:
|
try:
|
||||||
try:
|
try:
|
||||||
host, port = server.split(':')
|
host, port = server.split(':')
|
||||||
return r.connect(
|
return r.connect(host=host, port=port)
|
||||||
host=host, port=port, timeout=max(0.1, retry_wait))
|
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return r.connect(host=server, timeout=max(0.1, retry_wait))
|
return r.connect(host=server)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
self.last_error[server] = time.time()
|
||||||
self.logger.warn(
|
self.logger.warn(
|
||||||
'will keep trying after failure connecting to '
|
'will keep trying after failure connecting to '
|
||||||
'rethinkdb server at %s: %s (sleeping for %s sec)',
|
'rethinkdb server at %s: %s (sleeping for %s sec)',
|
||||||
@ -135,6 +136,24 @@ class Rethinker(object):
|
|||||||
time.sleep(retry_wait)
|
time.sleep(retry_wait)
|
||||||
retry_wait = min(retry_wait * 2, 10.0)
|
retry_wait = min(retry_wait * 2, 10.0)
|
||||||
|
|
||||||
|
# https://en.wikipedia.org/wiki/Penalty_(ice_hockey)#Major_penalty
|
||||||
|
PENALTY_BOX_TIME = 300
|
||||||
|
def _server_whitelist(self):
|
||||||
|
'''
|
||||||
|
Returns list of servers that have not errored in the last five minutes.
|
||||||
|
If all servers have errored in the last five minutes, returns list with
|
||||||
|
one item, the server that errored least recently.
|
||||||
|
'''
|
||||||
|
whitelist = []
|
||||||
|
for server in self.servers:
|
||||||
|
if (server not in self.last_error
|
||||||
|
or self.last_error[server] < time.time() - self.PENALTY_BOX_TIME):
|
||||||
|
whitelist.append(server)
|
||||||
|
if not whitelist:
|
||||||
|
whitelist.append(sorted(
|
||||||
|
self.last_error.items(), key=lambda kv: kv[1])[0][0])
|
||||||
|
return whitelist
|
||||||
|
|
||||||
def wrap(self, delegate):
|
def wrap(self, delegate):
|
||||||
if isinstance(delegate, (types.FunctionType, types.MethodType)):
|
if isinstance(delegate, (types.FunctionType, types.MethodType)):
|
||||||
def wrapper(*args, **kwargs):
|
def wrapper(*args, **kwargs):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user