mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
split into multiple source files
This commit is contained in:
parent
000e4d9cf6
commit
6d667c77b5
@ -17,19 +17,19 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
'''
|
||||
|
||||
import rethinkdb as r
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
import types
|
||||
import socket
|
||||
import os
|
||||
import rethinkdb
|
||||
import datetime
|
||||
|
||||
from rethinkstuff.orm import Document
|
||||
from rethinkstuff.rethinker import Rethinker
|
||||
from rethinkstuff.services import ServiceRegistry
|
||||
|
||||
__all__ = ['Document', 'Rethinker', 'ServiceRegistry', 'UTC', 'utcnow']
|
||||
|
||||
try:
|
||||
UTC = datetime.timezone.utc
|
||||
except:
|
||||
UTC = r.make_timezone("00:00")
|
||||
UTC = rethinkdb.make_timezone("00:00")
|
||||
|
||||
def utcnow():
|
||||
"""Convenience function to get timezone-aware UTC datetime. RethinkDB
|
||||
@ -38,379 +38,3 @@ def utcnow():
|
||||
2 doesn't come with a timezone implementation."""
|
||||
return datetime.datetime.now(UTC)
|
||||
|
||||
class RethinkerWrapper(object):
|
||||
logger = logging.getLogger('rethinkstuff.RethinkerWrapper')
|
||||
def __init__(self, rethinker, wrapped):
|
||||
self.rethinker = rethinker
|
||||
self.wrapped = wrapped
|
||||
|
||||
def __getattr__(self, name):
|
||||
delegate = getattr(self.wrapped, name)
|
||||
return self.rethinker.wrap(delegate)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.rethinker.wrap(self.wrapped.__getitem__)(key)
|
||||
|
||||
def __repr__(self):
|
||||
return '<RethinkerWrapper{}>'.format(repr(self.wrapped))
|
||||
|
||||
def run(self, db=None):
|
||||
self.wrapped.run # raise AttributeError early
|
||||
while True:
|
||||
conn = self.rethinker._random_server_connection()
|
||||
is_iter = False
|
||||
try:
|
||||
result = self.wrapped.run(conn, db=db or self.rethinker.dbname)
|
||||
if hasattr(result, '__next__'):
|
||||
is_iter = True
|
||||
def gen():
|
||||
try:
|
||||
yield # empty yield, see comment below
|
||||
for x in result:
|
||||
yield x
|
||||
finally:
|
||||
result.close()
|
||||
conn.close()
|
||||
g = gen()
|
||||
# Start executing the generator, leaving off after the
|
||||
# empty yield. If we didn't do this, and the caller never
|
||||
# started the generator, the finally block would never run
|
||||
# and the connection would stay open.
|
||||
next(g)
|
||||
return g
|
||||
else:
|
||||
return result
|
||||
except r.ReqlTimeoutError as e:
|
||||
time.sleep(0.5)
|
||||
finally:
|
||||
if not is_iter:
|
||||
conn.close(noreply_wait=False)
|
||||
|
||||
class Rethinker(object):
|
||||
'''
|
||||
>>> r = Rethinker(db='my_db')
|
||||
>>> doc = r.table('my_table').get(1).run()
|
||||
'''
|
||||
logger = logging.getLogger('rethinkstuff.Rethinker')
|
||||
|
||||
def __init__(self, servers=['localhost'], db=None):
|
||||
if isinstance(servers, str):
|
||||
self.servers = [servers]
|
||||
else:
|
||||
self.servers = servers
|
||||
self.dbname = db
|
||||
|
||||
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
|
||||
# "Best practices: Managing connections: a connection per request"
|
||||
def _random_server_connection(self):
|
||||
while True:
|
||||
server = random.choice(self.servers)
|
||||
try:
|
||||
try:
|
||||
host, port = server.split(':')
|
||||
return r.connect(host=host, port=port)
|
||||
except ValueError:
|
||||
return r.connect(host=server)
|
||||
except Exception as e:
|
||||
self.logger.error(
|
||||
'will keep trying after failure connecting to '
|
||||
'rethinkdb server at %s: %s', server, e)
|
||||
time.sleep(0.5)
|
||||
|
||||
def wrap(self, delegate):
|
||||
if isinstance(delegate, (types.FunctionType, types.MethodType)):
|
||||
def wrapper(*args, **kwargs):
|
||||
result = delegate(*args, **kwargs)
|
||||
if result is not None:
|
||||
return RethinkerWrapper(self, result)
|
||||
else:
|
||||
return None
|
||||
return wrapper
|
||||
else:
|
||||
return delegate
|
||||
|
||||
def __getattr__(self, name):
|
||||
delegate = getattr(r, name)
|
||||
return self.wrap(delegate)
|
||||
|
||||
class ServiceRegistry(object):
|
||||
'''
|
||||
status_info is dict, should have at least these fields
|
||||
{
|
||||
'id': ..., # generated by rethinkdb
|
||||
'role': 'brozzler-worker',
|
||||
'load': 0.5, # load score
|
||||
'heartbeat_interval': 20.0,
|
||||
'host': 'wbgrp-svc999.us.archive.org', # set in svcreg.heartbeat() as a fallback
|
||||
'pid': 1234, # set in svcreg.heartbeat() as a fallback
|
||||
'first_heartbeat': '2015-10-30T03:39:40.080814', # set in svcreg.heartbeat()
|
||||
'last_heartbeat': '2015-10-30T05:54:35.422866', # set in svcreg.heartbeat()
|
||||
... plus anything else you want...
|
||||
}
|
||||
'''
|
||||
|
||||
logger = logging.getLogger('rethinkstuff.ServiceRegistry')
|
||||
|
||||
def __init__(self, rethinker):
|
||||
self.r = rethinker
|
||||
self._ensure_table()
|
||||
|
||||
def _ensure_table(self):
|
||||
dbs = self.r.db_list().run()
|
||||
if not self.r.dbname in dbs:
|
||||
self.logger.info('creating rethinkdb database %s', repr(self.r.dbname))
|
||||
self.r.db_create(self.r.dbname).run()
|
||||
tables = self.r.table_list().run()
|
||||
if not 'services' in tables:
|
||||
self.logger.info("creating rethinkdb table 'services' in database %s", repr(self.r.dbname))
|
||||
self.r.table_create('services', shards=1, replicas=min(3, len(self.r.servers))).run()
|
||||
# self.r.table('sites').index_create...?
|
||||
|
||||
def heartbeat(self, status_info):
|
||||
'''
|
||||
Returns updated status info on success, un-updated status info on
|
||||
failure.
|
||||
'''
|
||||
updated_status_info = dict(status_info)
|
||||
updated_status_info['last_heartbeat'] = r.now()
|
||||
if not 'first_heartbeat' in updated_status_info:
|
||||
updated_status_info['first_heartbeat'] = updated_status_info['last_heartbeat']
|
||||
if not 'host' in updated_status_info:
|
||||
updated_status_info['host'] = socket.gethostname()
|
||||
if not 'pid' in updated_status_info:
|
||||
updated_status_info['pid'] = os.getpid()
|
||||
try:
|
||||
result = self.r.table('services').insert(
|
||||
updated_status_info, conflict='replace',
|
||||
return_changes=True).run()
|
||||
return result['changes'][0]['new_val'] # XXX check
|
||||
except:
|
||||
self.logger.error('error updating service registry', exc_info=True)
|
||||
return status_info
|
||||
|
||||
def unregister(self, id):
|
||||
result = self.r.table('services').get(id).delete().run()
|
||||
if result != {'deleted':1,'errors':0,'inserted':0,'replaced':0,'skipped':0,'unchanged':0}:
|
||||
self.logger.warn('unexpected result attempting to delete id=%s from rethinkdb services table: %s', id, result)
|
||||
|
||||
def available_service(self, role):
|
||||
try:
|
||||
result = self.r.table('services').filter({"role":role}).filter(
|
||||
lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] #.default(20.0)
|
||||
).order_by("load")[0].run()
|
||||
return result
|
||||
except r.ReqlNonExistenceError:
|
||||
return None
|
||||
|
||||
def available_services(self, role=None):
|
||||
try:
|
||||
query = self.r.table('services')
|
||||
if role:
|
||||
query = query.filter({"role":role})
|
||||
query = query.filter(
|
||||
lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] #.default(20.0)
|
||||
).order_by("load")
|
||||
result = query.run()
|
||||
return result
|
||||
except r.ReqlNonExistenceError:
|
||||
return []
|
||||
|
||||
class WatchedDict(dict):
|
||||
def __init__(self, d, callback, field):
|
||||
self.callback = callback
|
||||
self.field = field
|
||||
for key in d:
|
||||
dict.__setitem__(self, key, watch(
|
||||
d[key], callback=self.callback, field=self.field))
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.callback(self.field)
|
||||
return dict.__setitem__(self, key, watch(
|
||||
value, callback=self.callback, field=self.field))
|
||||
|
||||
def __delitem__(self, key):
|
||||
self.callback(self.field)
|
||||
return dict.__delitem__(self, key)
|
||||
|
||||
def clear(self):
|
||||
self.callback(self.field)
|
||||
return dict.clear(self)
|
||||
|
||||
def pop(self, *args):
|
||||
self.callback(self.field)
|
||||
return dict.pop(self, *args)
|
||||
|
||||
def popitem(self):
|
||||
self.callback(self.field)
|
||||
return dict.popitem()
|
||||
|
||||
def setdefault(self, *args):
|
||||
self.callback(self.field)
|
||||
if len(args) == 2:
|
||||
return dict.setdefault(self, args[0], watch(
|
||||
args[1], callback=self.callback, field=self.field))
|
||||
else:
|
||||
return dict.setdefault(self, *args)
|
||||
|
||||
def update(self, *args, **kwargs):
|
||||
# looks a little tricky
|
||||
raise Exception('not implemented')
|
||||
|
||||
class WatchedList(list):
|
||||
def __init__(self, l, callback, field):
|
||||
self.callback = callback
|
||||
self.field = field
|
||||
for item in l:
|
||||
list.append(self, watch(item, callback=callback, field=self.field))
|
||||
|
||||
def __setitem__(self, index, value):
|
||||
self.callback(self.field)
|
||||
return list.__setitem__(self, index, watch(
|
||||
value, callback=self.callback, field=self.field))
|
||||
|
||||
def __delitem__(self, index):
|
||||
self.callback(self.field)
|
||||
return list.__delitem__(self, index)
|
||||
|
||||
def append(self, value):
|
||||
self.callback(self.field)
|
||||
return list.append(self, watch(
|
||||
value, callback=self.callback, field=self.field))
|
||||
|
||||
def extend(self, value):
|
||||
self.callback(self.field)
|
||||
return list.extend(self, watch(
|
||||
list(value), callback=self.callback, field=self.field))
|
||||
|
||||
def insert(self, index, value):
|
||||
self.callback(self.field)
|
||||
return list.insert(self, index, watch(
|
||||
value, callback=self.callback, field=self.field))
|
||||
|
||||
def remove(self, value):
|
||||
self.callback(self.field)
|
||||
return list.remove(self, value)
|
||||
|
||||
def pop(self, index=-1):
|
||||
self.callback(self.field)
|
||||
return list.pop(self, index)
|
||||
|
||||
def clear(self):
|
||||
self.callback(self.field)
|
||||
return list.clear(self)
|
||||
|
||||
def sort(self, key=None, reverse=False):
|
||||
self.callback(self.field)
|
||||
return list.sort(self, key, reverse)
|
||||
|
||||
def reverse(self):
|
||||
self.callback(self.field)
|
||||
return list.reverse(self)
|
||||
|
||||
def watch(obj, callback, field):
|
||||
if isinstance(obj, dict):
|
||||
return WatchedDict(obj, callback, field)
|
||||
elif isinstance(obj, list):
|
||||
return WatchedList(obj, callback, field)
|
||||
else:
|
||||
return obj
|
||||
|
||||
class Document(dict, object):
|
||||
'''
|
||||
Base class for documents in rethinkdb.
|
||||
|
||||
You should subclass this class for each of your rethinkdb tables. You can
|
||||
add custom functionality in your subclass if appropriate.
|
||||
|
||||
This class keeps track of changes made to the object and any nested fields.
|
||||
After you have made some changes, call update() to persist them to the
|
||||
database.
|
||||
|
||||
Changes in nested fields result in updates to their first-level ancestor
|
||||
field. For example, if your document starts as {'a': {'b': 'c'}}, then
|
||||
you run d['a']['x'] = 'y', then the update will replace the whole 'a'
|
||||
field. Nested field updates get too complicated any other way.
|
||||
|
||||
The primary key must be `id`, the rethinkdb default. (XXX we could find out
|
||||
what the primary key is from the "table_config" system table.)
|
||||
'''
|
||||
def __init__(self, rethinker, d={}):
|
||||
dict.__setattr__(self, '_r', rethinker)
|
||||
for k in d:
|
||||
dict.__setitem__(
|
||||
self, k, watch(d[k], callback=self._updated, field=k))
|
||||
self._clear_updates()
|
||||
|
||||
def _clear_updates(self):
|
||||
dict.__setattr__(self, '_updates', {})
|
||||
dict.__setattr__(self, '_deletes', set())
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
dict.__setitem__(
|
||||
self, key, watch(value, callback=self._updated, field=key))
|
||||
self._updated(key)
|
||||
|
||||
__setattr__ = __setitem__
|
||||
__getattr__ = dict.__getitem__
|
||||
|
||||
def __delitem__(self, key):
|
||||
dict.__delitem__(self, key)
|
||||
self._deletes.add(key)
|
||||
if key in self._updates:
|
||||
del self._updates[key]
|
||||
|
||||
# XXX do we need the other stuff like in WatchedDict?
|
||||
|
||||
def _updated(self, field):
|
||||
# callback for all updates
|
||||
self._updates[field] = self[field]
|
||||
if field in self._deletes:
|
||||
self._deletes.remove(field)
|
||||
|
||||
@property
|
||||
def table(self):
|
||||
'''
|
||||
Name of the rethinkdb table.
|
||||
|
||||
Defaults to the name of the class, lowercased. Can be overridden.
|
||||
'''
|
||||
return self.__class__.__name__.lower()
|
||||
|
||||
def table_create(self):
|
||||
'''
|
||||
Creates the table.
|
||||
|
||||
Subclasses may want to override to do more things, such as creating
|
||||
indexes.
|
||||
'''
|
||||
self._r.table_create(self.table).run()
|
||||
|
||||
def insert(self):
|
||||
result = self._r.table(self.table).insert(self).run()
|
||||
if 'generated_keys' in result:
|
||||
dict.__setitem__(self, 'id', result['generated_keys'][0])
|
||||
self._clear_updates()
|
||||
|
||||
def update(self):
|
||||
# hmm, masks dict.update()
|
||||
if self._updates:
|
||||
# r.literal() to replace, not merge with, nested fields
|
||||
updates = {
|
||||
field: r.literal(
|
||||
self._updates[field]) for field in self._updates}
|
||||
self._r.table(self.table).get(self.id).update(updates).run()
|
||||
if self._deletes:
|
||||
self._r.table(self.table).replace(
|
||||
r.row.without(self._deletes)).run()
|
||||
self._clear_updates()
|
||||
|
||||
def refresh(self):
|
||||
'''
|
||||
Refresh from the database.
|
||||
'''
|
||||
d = self._r.table(self.table).get(self.id).run()
|
||||
for k in d:
|
||||
dict.__setitem__(
|
||||
self, k, watch(d[k], callback=self._updated, field=k))
|
||||
|
||||
|
||||
|
220
rethinkstuff/orm.py
Normal file
220
rethinkstuff/orm.py
Normal file
@ -0,0 +1,220 @@
|
||||
'''
|
||||
rethinkstuff/orm.py - rethinkdb ORM
|
||||
|
||||
Copyright (C) 2017 Internet Archive
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
'''
|
||||
|
||||
import rethinkdb as r
|
||||
import logging
|
||||
|
||||
class WatchedDict(dict):
|
||||
def __init__(self, d, callback, field):
|
||||
self.callback = callback
|
||||
self.field = field
|
||||
for key in d:
|
||||
dict.__setitem__(self, key, watch(
|
||||
d[key], callback=self.callback, field=self.field))
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.callback(self.field)
|
||||
return dict.__setitem__(self, key, watch(
|
||||
value, callback=self.callback, field=self.field))
|
||||
|
||||
def __delitem__(self, key):
|
||||
self.callback(self.field)
|
||||
return dict.__delitem__(self, key)
|
||||
|
||||
def clear(self):
|
||||
self.callback(self.field)
|
||||
return dict.clear(self)
|
||||
|
||||
def pop(self, *args):
|
||||
self.callback(self.field)
|
||||
return dict.pop(self, *args)
|
||||
|
||||
def popitem(self):
|
||||
self.callback(self.field)
|
||||
return dict.popitem()
|
||||
|
||||
def setdefault(self, *args):
|
||||
self.callback(self.field)
|
||||
if len(args) == 2:
|
||||
return dict.setdefault(self, args[0], watch(
|
||||
args[1], callback=self.callback, field=self.field))
|
||||
else:
|
||||
return dict.setdefault(self, *args)
|
||||
|
||||
def update(self, *args, **kwargs):
|
||||
# looks a little tricky
|
||||
raise Exception('not implemented')
|
||||
|
||||
class WatchedList(list):
|
||||
def __init__(self, l, callback, field):
|
||||
self.callback = callback
|
||||
self.field = field
|
||||
for item in l:
|
||||
list.append(self, watch(item, callback=callback, field=self.field))
|
||||
|
||||
def __setitem__(self, index, value):
|
||||
self.callback(self.field)
|
||||
return list.__setitem__(self, index, watch(
|
||||
value, callback=self.callback, field=self.field))
|
||||
|
||||
def __delitem__(self, index):
|
||||
self.callback(self.field)
|
||||
return list.__delitem__(self, index)
|
||||
|
||||
def append(self, value):
|
||||
self.callback(self.field)
|
||||
return list.append(self, watch(
|
||||
value, callback=self.callback, field=self.field))
|
||||
|
||||
def extend(self, value):
|
||||
self.callback(self.field)
|
||||
return list.extend(self, watch(
|
||||
list(value), callback=self.callback, field=self.field))
|
||||
|
||||
def insert(self, index, value):
|
||||
self.callback(self.field)
|
||||
return list.insert(self, index, watch(
|
||||
value, callback=self.callback, field=self.field))
|
||||
|
||||
def remove(self, value):
|
||||
self.callback(self.field)
|
||||
return list.remove(self, value)
|
||||
|
||||
def pop(self, index=-1):
|
||||
self.callback(self.field)
|
||||
return list.pop(self, index)
|
||||
|
||||
def clear(self):
|
||||
self.callback(self.field)
|
||||
return list.clear(self)
|
||||
|
||||
def sort(self, key=None, reverse=False):
|
||||
self.callback(self.field)
|
||||
return list.sort(self, key, reverse)
|
||||
|
||||
def reverse(self):
|
||||
self.callback(self.field)
|
||||
return list.reverse(self)
|
||||
|
||||
def watch(obj, callback, field):
|
||||
if isinstance(obj, dict):
|
||||
return WatchedDict(obj, callback, field)
|
||||
elif isinstance(obj, list):
|
||||
return WatchedList(obj, callback, field)
|
||||
else:
|
||||
return obj
|
||||
|
||||
class Document(dict, object):
|
||||
'''
|
||||
Base class for ORM.
|
||||
|
||||
You should subclass this class for each of your rethinkdb tables. You can
|
||||
add custom functionality in your subclass if appropriate.
|
||||
|
||||
This class keeps track of changes made to the object and any nested fields.
|
||||
After you have made some changes, call update() to persist them to the
|
||||
database.
|
||||
|
||||
Changes in nested fields result in updates to their first-level ancestor
|
||||
field. For example, if your document starts as {'a': {'b': 'c'}}, then
|
||||
you run d['a']['x'] = 'y', then the update will replace the whole 'a'
|
||||
field. Nested field updates get too complicated any other way.
|
||||
|
||||
The primary key must be `id`, the rethinkdb default. (XXX we could find out
|
||||
what the primary key is from the "table_config" system table.)
|
||||
'''
|
||||
def __init__(self, rethinker, d={}):
|
||||
dict.__setattr__(self, '_r', rethinker)
|
||||
for k in d:
|
||||
dict.__setitem__(
|
||||
self, k, watch(d[k], callback=self._updated, field=k))
|
||||
self._clear_updates()
|
||||
|
||||
def _clear_updates(self):
|
||||
dict.__setattr__(self, '_updates', {})
|
||||
dict.__setattr__(self, '_deletes', set())
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
dict.__setitem__(
|
||||
self, key, watch(value, callback=self._updated, field=key))
|
||||
self._updated(key)
|
||||
|
||||
__setattr__ = __setitem__
|
||||
__getattr__ = dict.__getitem__
|
||||
|
||||
def __delitem__(self, key):
|
||||
dict.__delitem__(self, key)
|
||||
self._deletes.add(key)
|
||||
if key in self._updates:
|
||||
del self._updates[key]
|
||||
|
||||
# XXX do we need the other stuff like in WatchedDict?
|
||||
|
||||
def _updated(self, field):
|
||||
# callback for all updates
|
||||
self._updates[field] = self[field]
|
||||
if field in self._deletes:
|
||||
self._deletes.remove(field)
|
||||
|
||||
@property
|
||||
def table(self):
|
||||
'''
|
||||
Name of the rethinkdb table.
|
||||
|
||||
Defaults to the name of the class, lowercased. Can be overridden.
|
||||
'''
|
||||
return self.__class__.__name__.lower()
|
||||
|
||||
def table_create(self):
|
||||
'''
|
||||
Creates the table.
|
||||
|
||||
Subclasses may want to override this method to do more things, such as
|
||||
creating indexes.
|
||||
'''
|
||||
self._r.table_create(self.table).run()
|
||||
|
||||
def insert(self):
|
||||
result = self._r.table(self.table).insert(self).run()
|
||||
if 'generated_keys' in result:
|
||||
dict.__setitem__(self, 'id', result['generated_keys'][0])
|
||||
self._clear_updates()
|
||||
|
||||
def update(self):
|
||||
# hmm, masks dict.update()
|
||||
if self._updates:
|
||||
# r.literal() to replace, not merge with, nested fields
|
||||
updates = {
|
||||
field: r.literal(
|
||||
self._updates[field]) for field in self._updates}
|
||||
self._r.table(self.table).get(self.id).update(updates).run()
|
||||
if self._deletes:
|
||||
self._r.table(self.table).replace(
|
||||
r.row.without(self._deletes)).run()
|
||||
self._clear_updates()
|
||||
|
||||
def refresh(self):
|
||||
'''
|
||||
Refresh from the database.
|
||||
'''
|
||||
d = self._r.table(self.table).get(self.id).run()
|
||||
for k in d:
|
||||
dict.__setitem__(
|
||||
self, k, watch(d[k], callback=self._updated, field=k))
|
||||
|
||||
|
119
rethinkstuff/rethinker.py
Normal file
119
rethinkstuff/rethinker.py
Normal file
@ -0,0 +1,119 @@
|
||||
'''
|
||||
rethinkstuff/rethinker.py - rethinkdb connection-manager
|
||||
|
||||
Copyright (C) 2015-2017 Internet Archive
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
'''
|
||||
|
||||
import rethinkdb as r
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
import types
|
||||
|
||||
class RethinkerWrapper(object):
|
||||
logger = logging.getLogger('rethinkstuff.RethinkerWrapper')
|
||||
def __init__(self, rethinker, wrapped):
|
||||
self.rethinker = rethinker
|
||||
self.wrapped = wrapped
|
||||
|
||||
def __getattr__(self, name):
|
||||
delegate = getattr(self.wrapped, name)
|
||||
return self.rethinker.wrap(delegate)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.rethinker.wrap(self.wrapped.__getitem__)(key)
|
||||
|
||||
def __repr__(self):
|
||||
return '<RethinkerWrapper{}>'.format(repr(self.wrapped))
|
||||
|
||||
def run(self, db=None):
|
||||
self.wrapped.run # raise AttributeError early
|
||||
while True:
|
||||
conn = self.rethinker._random_server_connection()
|
||||
is_iter = False
|
||||
try:
|
||||
result = self.wrapped.run(conn, db=db or self.rethinker.dbname)
|
||||
if hasattr(result, '__next__'):
|
||||
is_iter = True
|
||||
def gen():
|
||||
try:
|
||||
yield # empty yield, see comment below
|
||||
for x in result:
|
||||
yield x
|
||||
finally:
|
||||
result.close()
|
||||
conn.close()
|
||||
g = gen()
|
||||
# Start executing the generator, leaving off after the
|
||||
# empty yield. If we didn't do this, and the caller never
|
||||
# started the generator, the finally block would never run
|
||||
# and the connection would stay open.
|
||||
next(g)
|
||||
return g
|
||||
else:
|
||||
return result
|
||||
except r.ReqlTimeoutError as e:
|
||||
time.sleep(0.5)
|
||||
finally:
|
||||
if not is_iter:
|
||||
conn.close(noreply_wait=False)
|
||||
|
||||
class Rethinker(object):
|
||||
'''
|
||||
>>> r = Rethinker(db='my_db')
|
||||
>>> doc = r.table('my_table').get(1).run()
|
||||
'''
|
||||
logger = logging.getLogger('rethinkstuff.Rethinker')
|
||||
|
||||
def __init__(self, servers=['localhost'], db=None):
|
||||
if isinstance(servers, str):
|
||||
self.servers = [servers]
|
||||
else:
|
||||
self.servers = servers
|
||||
self.dbname = db
|
||||
|
||||
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
|
||||
# "Best practices: Managing connections: a connection per request"
|
||||
def _random_server_connection(self):
|
||||
while True:
|
||||
server = random.choice(self.servers)
|
||||
try:
|
||||
try:
|
||||
host, port = server.split(':')
|
||||
return r.connect(host=host, port=port)
|
||||
except ValueError:
|
||||
return r.connect(host=server)
|
||||
except Exception as e:
|
||||
self.logger.error(
|
||||
'will keep trying after failure connecting to '
|
||||
'rethinkdb server at %s: %s', server, e)
|
||||
time.sleep(0.5)
|
||||
|
||||
def wrap(self, delegate):
|
||||
if isinstance(delegate, (types.FunctionType, types.MethodType)):
|
||||
def wrapper(*args, **kwargs):
|
||||
result = delegate(*args, **kwargs)
|
||||
if result is not None:
|
||||
return RethinkerWrapper(self, result)
|
||||
else:
|
||||
return None
|
||||
return wrapper
|
||||
else:
|
||||
return delegate
|
||||
|
||||
def __getattr__(self, name):
|
||||
delegate = getattr(r, name)
|
||||
return self.wrap(delegate)
|
||||
|
105
rethinkstuff/services.py
Normal file
105
rethinkstuff/services.py
Normal file
@ -0,0 +1,105 @@
|
||||
'''
|
||||
rethinkstuff/services.py - rethinkdb service registry
|
||||
|
||||
Copyright (C) 2015-2017 Internet Archive
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
'''
|
||||
|
||||
import rethinkdb as r
|
||||
import logging
|
||||
import socket
|
||||
import os
|
||||
|
||||
class ServiceRegistry(object):
|
||||
'''
|
||||
status_info is dict, should have at least these fields
|
||||
{
|
||||
'id': ..., # generated by rethinkdb
|
||||
'role': 'brozzler-worker',
|
||||
'load': 0.5, # load score
|
||||
'heartbeat_interval': 20.0,
|
||||
'host': 'wbgrp-svc999.us.archive.org', # set in svcreg.heartbeat() as a fallback
|
||||
'pid': 1234, # set in svcreg.heartbeat() as a fallback
|
||||
'first_heartbeat': '2015-10-30T03:39:40.080814', # set in svcreg.heartbeat()
|
||||
'last_heartbeat': '2015-10-30T05:54:35.422866', # set in svcreg.heartbeat()
|
||||
... plus anything else you want...
|
||||
}
|
||||
'''
|
||||
|
||||
logger = logging.getLogger('rethinkstuff.ServiceRegistry')
|
||||
|
||||
def __init__(self, rethinker):
|
||||
self.r = rethinker
|
||||
self._ensure_table()
|
||||
|
||||
def _ensure_table(self):
|
||||
dbs = self.r.db_list().run()
|
||||
if not self.r.dbname in dbs:
|
||||
self.logger.info('creating rethinkdb database %s', repr(self.r.dbname))
|
||||
self.r.db_create(self.r.dbname).run()
|
||||
tables = self.r.table_list().run()
|
||||
if not 'services' in tables:
|
||||
self.logger.info("creating rethinkdb table 'services' in database %s", repr(self.r.dbname))
|
||||
self.r.table_create('services', shards=1, replicas=min(3, len(self.r.servers))).run()
|
||||
# self.r.table('sites').index_create...?
|
||||
|
||||
def heartbeat(self, status_info):
|
||||
'''
|
||||
Returns updated status info on success, un-updated status info on
|
||||
failure.
|
||||
'''
|
||||
updated_status_info = dict(status_info)
|
||||
updated_status_info['last_heartbeat'] = r.now()
|
||||
if not 'first_heartbeat' in updated_status_info:
|
||||
updated_status_info['first_heartbeat'] = updated_status_info['last_heartbeat']
|
||||
if not 'host' in updated_status_info:
|
||||
updated_status_info['host'] = socket.gethostname()
|
||||
if not 'pid' in updated_status_info:
|
||||
updated_status_info['pid'] = os.getpid()
|
||||
try:
|
||||
result = self.r.table('services').insert(
|
||||
updated_status_info, conflict='replace',
|
||||
return_changes=True).run()
|
||||
return result['changes'][0]['new_val'] # XXX check
|
||||
except:
|
||||
self.logger.error('error updating service registry', exc_info=True)
|
||||
return status_info
|
||||
|
||||
def unregister(self, id):
|
||||
result = self.r.table('services').get(id).delete().run()
|
||||
if result != {'deleted':1,'errors':0,'inserted':0,'replaced':0,'skipped':0,'unchanged':0}:
|
||||
self.logger.warn('unexpected result attempting to delete id=%s from rethinkdb services table: %s', id, result)
|
||||
|
||||
def available_service(self, role):
|
||||
try:
|
||||
result = self.r.table('services').filter({"role":role}).filter(
|
||||
lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] #.default(20.0)
|
||||
).order_by("load")[0].run()
|
||||
return result
|
||||
except r.ReqlNonExistenceError:
|
||||
return None
|
||||
|
||||
def available_services(self, role=None):
|
||||
try:
|
||||
query = self.r.table('services')
|
||||
if role:
|
||||
query = query.filter({"role":role})
|
||||
query = query.filter(
|
||||
lambda svc: r.now().sub(svc["last_heartbeat"]) < 3 * svc["heartbeat_interval"] #.default(20.0)
|
||||
).order_by("load")
|
||||
result = query.run()
|
||||
return result
|
||||
except r.ReqlNonExistenceError:
|
||||
return []
|
||||
|
2
setup.py
2
setup.py
@ -3,7 +3,7 @@ import codecs
|
||||
|
||||
setuptools.setup(
|
||||
name='rethinkstuff',
|
||||
version='0.2.0.dev57',
|
||||
version='0.2.0.dev58',
|
||||
packages=['rethinkstuff'],
|
||||
classifiers=[
|
||||
'Programming Language :: Python :: 2.7',
|
||||
|
Loading…
x
Reference in New Issue
Block a user