Merge pull request #8 from internetarchive/adds-cron-garbage-collector

Adds cron garbage collector
This commit is contained in:
Noah Levitt 2017-10-03 16:36:38 -07:00 committed by GitHub
commit e5b2e2c327
6 changed files with 146 additions and 1 deletions

58
doublethink/cli.py Normal file
View File

@ -0,0 +1,58 @@
#!/usr/bin/env python
'''
doublethink/cli.py - doublethink Command Line Tools
Copyright (C) 2017 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import os, sys
import argparse
import doublethink
import logging
def purge_stale_services(argv=None):
"""Command-line utility to periodically purge stale entries from the "services" table.
It is designed to be used in conjunction with cron.
"""
argv = argv or sys.argv
arg_parser = argparse.ArgumentParser(
prog=os.path.basename(argv[0]),
description='doublethink-purge-stale-services: utility to periodically purge stale entries from the "services" table.')
arg_parser.add_argument("-d", "--rethinkdb-db", required=True,
dest="database",
help="A RethinkDB database containing a 'services' table")
arg_parser.add_argument("-s", "--rethinkdb-servers",
metavar="SERVERS", dest="servers", default='localhost',
help="rethinkdb servers, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org")
arg_parser.add_argument(
'-v', '--verbose', dest='log_level', action='store_const',
default=logging.INFO, const=logging.DEBUG, help=(
'verbose logging'))
args = arg_parser.parse_args(argv[1:])
logging.basicConfig(
stream=sys.stdout, level=args.log_level, format=(
'%(asctime)s %(process)d %(levelname)s %(threadName)s '
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s'))
args.servers = [srv.strip() for srv in args.servers.split(",")]
rethinker = doublethink.Rethinker(servers=args.servers, db=args.database)
registry = doublethink.services.ServiceRegistry(rethinker)
registry.purge_stale_services()
return 0

View File

@ -312,3 +312,11 @@ class ServiceRegistry(object):
available_service = healthy_service
available_services = healthy_services
def purge_stale_services(self, ttls_until_deletion=2):
query = self.rr.table('services').filter(
lambda svc: r.now().sub(svc["last_heartbeat"]).gt(svc["ttl"] * ttls_until_deletion)
).delete()
logging.debug("Running query: %s", query)
result = query.run()
logging.debug("Results: %s", result)
return result

View File

@ -18,4 +18,9 @@ setuptools.setup(
description='rethinkdb python library',
long_description=codecs.open(
'README.rst', mode='r', encoding='utf-8').read(),
entry_points={
'console_scripts': [
'doublethink-purge-stale-services=doublethink.cli:purge_stale_services',
]
},
)

View File

@ -11,7 +11,7 @@ do
docker run --rm -it --volume="$script_dir/..:/doublethink" internetarchive/rethinkdb /sbin/my_init -- \
bash -x -c "cd /tmp && git clone /doublethink \
&& cd /tmp/doublethink \
&& (cd /doublethink && git diff) | patch -p1 \
&& (cd /doublethink && git diff HEAD) | patch -p1 \
&& virtualenv -p $python /tmp/venv \
&& source /tmp/venv/bin/activate \
&& pip install pytest . \

64
tests/test_cli.py Normal file
View File

@ -0,0 +1,64 @@
'''
test_cli.py - unit tests for doublethink CLI
Copyright (C) 2015-2017 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import doublethink
import doublethink.cli
import logging
import sys
import pytest
import rethinkdb as r
import pkg_resources
logging.basicConfig(stream=sys.stderr, level=logging.INFO,
format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")
class RethinkerForTesting(doublethink.Rethinker):
def __init__(self, *args, **kwargs):
super(RethinkerForTesting, self).__init__(*args, **kwargs)
def _random_server_connection(self):
self.last_conn = super(RethinkerForTesting, self)._random_server_connection()
# logging.info("self.last_conn=%s", self.last_conn)
return self.last_conn
@pytest.fixture(scope="module")
def rr():
rr = RethinkerForTesting()
try:
rr.db_drop("doublethink_test_db").run()
except r.errors.ReqlOpFailedError:
pass
result = rr.db_create("doublethink_test_db").run()
assert not rr.last_conn.is_open()
assert result["dbs_created"] == 1
return RethinkerForTesting(db="doublethink_test_db")
def test_cli(capsys, rr):
entrypoint = pkg_resources.get_entry_map(
'doublethink')['console_scripts']['doublethink-purge-stale-services']
callable = entrypoint.resolve()
with pytest.raises(SystemExit) as exit:
callable(['doublethink-purge-stale-services'])
print(dir(exit))
assert exit.value.code != 0
out, err = capsys.readouterr()
with pytest.raises(SystemExit) as exit:
# this wrap with sys.exit matches what occurs in the generated command
sys.exit(callable(['doublethink-purge-stale-services', '-d', 'test']))
assert exit.value.code == 0
out, err = capsys.readouterr()

View File

@ -258,3 +258,13 @@ def test_svcreg_heartbeat_server_down(rr):
assert not 'host' in svc0
assert not 'pid' in svc0
def test_purge_stale_services(rr):
rr.table('services').delete().run()
rr.table('services').insert({ 'id': 'old-service', "last_heartbeat": r.now(), 'ttl': 0.4 }).run()
time.sleep(1)
rr.table('services').insert({ 'id': 'new-service', "last_heartbeat": r.now(), 'ttl': 0.4 }).run()
svcreg = doublethink.ServiceRegistry(rr)
assert rr.table('services').count().run() == 2
svcreg.purge_stale_services()
assert rr.table('services').count().run() == 1
rr.table('services').delete().run()