mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
service registry, with unit tests!
This commit is contained in:
parent
7c7b0dc991
commit
5570ca9241
@ -4,6 +4,7 @@ import logging
|
||||
import random
|
||||
import time
|
||||
import types
|
||||
import socket
|
||||
|
||||
class RethinkerWrapper(object):
|
||||
logger = logging.getLogger('rethinkstuff.RethinkerWrapper')
|
||||
@ -19,7 +20,7 @@ class RethinkerWrapper(object):
|
||||
return self.rethinker.wrap(self.wrapped.__getitem__)(key)
|
||||
|
||||
def __repr__(self):
|
||||
return "<RethinkerWrapper{}>".format(repr(self.wrapped))
|
||||
return '<RethinkerWrapper{}>'.format(repr(self.wrapped))
|
||||
|
||||
def run(self, db=None):
|
||||
self.wrapped.run # raise AttributeError early
|
||||
@ -28,7 +29,7 @@ class RethinkerWrapper(object):
|
||||
is_iter = False
|
||||
try:
|
||||
result = self.wrapped.run(conn, db=db or self.rethinker.dbname)
|
||||
if hasattr(result, "__next__"):
|
||||
if hasattr(result, '__next__'):
|
||||
is_iter = True
|
||||
def gen():
|
||||
try:
|
||||
@ -54,14 +55,17 @@ class RethinkerWrapper(object):
|
||||
conn.close(noreply_wait=False)
|
||||
|
||||
class Rethinker(object):
|
||||
"""
|
||||
>>> r = Rethinker(db="my_db")
|
||||
>>> doc = r.table("my_table").get(1).run()
|
||||
"""
|
||||
'''
|
||||
>>> r = Rethinker(db='my_db')
|
||||
>>> doc = r.table('my_table').get(1).run()
|
||||
'''
|
||||
logger = logging.getLogger('rethinkstuff.Rethinker')
|
||||
|
||||
def __init__(self, servers=['localhost'], db=None):
|
||||
self.servers = servers
|
||||
if isinstance(servers, str):
|
||||
self.servers = [servers]
|
||||
else:
|
||||
self.servers = servers
|
||||
self.dbname = db
|
||||
|
||||
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
|
||||
@ -95,3 +99,59 @@ class Rethinker(object):
|
||||
delegate = getattr(r, name)
|
||||
return self.wrap(delegate)
|
||||
|
||||
class ServiceRegistry(object):
|
||||
'''
|
||||
status_info is dict, should have at least these fields
|
||||
{
|
||||
'id': ..., # generated by rethinkdb
|
||||
'role': 'brozzler-worker',
|
||||
'load': 0.5, # load score
|
||||
'heartbeat_interval': 20.0,
|
||||
'host': 'wbgrp-svc999.us.archive.org', # set in svcreg.heartbeat() as a fallback
|
||||
'last_heartbeat': '2015-10-30T05:54:35.422866', # set in svcreg.heartbeat()
|
||||
... plus anything else you want...
|
||||
}
|
||||
'''
|
||||
|
||||
logger = logging.getLogger('rethinkstuff.ServiceRegistry')
|
||||
|
||||
def __init__(self, rethinker):
|
||||
self.r = rethinker
|
||||
self._ensure_table()
|
||||
|
||||
def _ensure_table(self):
|
||||
dbs = self.r.db_list().run()
|
||||
if not self.r.dbname in dbs:
|
||||
self.logger.info('creating rethinkdb database %s', repr(self.r.dbname))
|
||||
self.r.db_create(self.r.dbname).run()
|
||||
tables = self.r.table_list().run()
|
||||
if not 'services' in tables:
|
||||
self.logger.info("creating rethinkdb table 'services' in database %s", repr(self.r.dbname))
|
||||
self.r.table_create('services', shards=1, replicas=min(len(self.r.servers), 1)).run()
|
||||
# self.r.table('sites').index_create...?
|
||||
|
||||
def heartbeat(self, status_info):
|
||||
'''Returns id of rethinkdb document (generated by rethinkdb on first insert)'''
|
||||
status_info['last_heartbeat'] = r.now()
|
||||
if not 'host' in status_info:
|
||||
status_info['host'] = socket.gethostname()
|
||||
result = self.r.table('services').insert(status_info, conflict='replace').run()
|
||||
if 'generated_keys' in result:
|
||||
return result['generated_keys'][0]
|
||||
else:
|
||||
return status_info['id']
|
||||
|
||||
def unregister(self, id):
|
||||
result = self.r.table('services').get(id).delete().run()
|
||||
if result != {'deleted':1,'errors':0,'inserted':0,'replaced':0,'skipped':0,'unchanged':0}:
|
||||
self.logger.warn('unexpected result attempting to delete id=%s from rethinkdb services table: %s', id, result)
|
||||
|
||||
def available_service(self, role):
|
||||
try:
|
||||
result = self.r.table('services').filter({"role":role}).filter(
|
||||
lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] #.default(20.0)
|
||||
).order_by("load")[0].run()
|
||||
return result
|
||||
except r.ReqlNonExistenceError:
|
||||
return None
|
||||
|
||||
|
@ -5,6 +5,7 @@ import types
|
||||
import gc
|
||||
import pytest
|
||||
import rethinkdb
|
||||
import time
|
||||
|
||||
logging.basicConfig(stream=sys.stderr, level=logging.INFO,
|
||||
format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")
|
||||
@ -88,3 +89,51 @@ def test_slice(r, my_table):
|
||||
# connection should be closed after finished iterating over results
|
||||
assert not r.last_conn.is_open()
|
||||
assert n == 5
|
||||
|
||||
def test_svcreg(r):
|
||||
# import pdb; pdb.set_trace()
|
||||
svcreg = rethinkstuff.ServiceRegistry(r)
|
||||
assert svcreg.available_service("yes-such-role") == None
|
||||
svc0 = {
|
||||
"role": "yes-such-role",
|
||||
"load": 100.0,
|
||||
"heartbeat_interval": 0.2,
|
||||
}
|
||||
svc1 = {
|
||||
"role": "yes-such-role",
|
||||
"load": 200.0,
|
||||
"heartbeat_interval": 0.2,
|
||||
}
|
||||
svc0["id"] = svcreg.heartbeat(svc0)
|
||||
svc1["id"] = svcreg.heartbeat(svc1)
|
||||
assert svc0["id"] is not None
|
||||
assert svc1["id"] is not None
|
||||
assert svc0["id"] != svc1["id"]
|
||||
time.sleep(0.1)
|
||||
assert svcreg.available_service("no-such-role") == None
|
||||
assert svcreg.available_service("yes-such-role")["id"] == svc0["id"]
|
||||
|
||||
svc1["load"] = 50.0
|
||||
svcreg.heartbeat(svc1)
|
||||
time.sleep(0.1)
|
||||
assert svcreg.available_service("no-such-role") == None
|
||||
assert svcreg.available_service("yes-such-role")["id"] == svc1["id"]
|
||||
|
||||
svc1["load"] = 200.0
|
||||
svcreg.heartbeat(svc1)
|
||||
time.sleep(0.1)
|
||||
assert svcreg.available_service("no-such-role") == None
|
||||
assert svcreg.available_service("yes-such-role")["id"] == svc0["id"]
|
||||
svcreg.heartbeat(svc1)
|
||||
time.sleep(0.1)
|
||||
|
||||
svcreg.heartbeat(svc1)
|
||||
time.sleep(0.4)
|
||||
assert svcreg.available_service("no-such-role") == None
|
||||
assert svcreg.available_service("yes-such-role")["id"] == svc1["id"]
|
||||
|
||||
svcreg.unregister(svc1["id"])
|
||||
time.sleep(0.1)
|
||||
assert svcreg.available_service("no-such-role") == None
|
||||
assert svcreg.available_service("yes-such-role") == None
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user