diff --git a/doublethink/cli.py b/doublethink/cli.py new file mode 100644 index 0000000..630ef6a --- /dev/null +++ b/doublethink/cli.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +''' +doublethink/cli.py - doublethink Command Line Tools + +Copyright (C) 2017 Internet Archive + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import os, sys +import argparse +import doublethink +import logging + +def purge_stale_services(argv=None): + """Command-line utility to periodically purge stale entries from the "services" table. + + It is designed to be used in conjunction with cron. + """ + argv = argv or sys.argv + arg_parser = argparse.ArgumentParser( + prog=os.path.basename(argv[0]), + description='doublethink-purge-stale-services: utility to periodically purge stale entries from the "services" table.') + + arg_parser.add_argument("-d", "--rethinkdb-db", required=True, + dest="database", + help="A RethinkDB database containing a 'services' table") + + arg_parser.add_argument("-s", "--rethinkdb-servers", + metavar="SERVERS", dest="servers", default='localhost', + help="rethinkdb servers, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org") + + arg_parser.add_argument( + '-v', '--verbose', dest='log_level', action='store_const', + default=logging.INFO, const=logging.DEBUG, help=( + 'verbose logging')) + args = arg_parser.parse_args(argv[1:]) + logging.basicConfig( + stream=sys.stdout, level=args.log_level, format=( + '%(asctime)s %(process)d %(levelname)s %(threadName)s ' + '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')) + + args.servers = [srv.strip() for srv in args.servers.split(",")] + + rethinker = doublethink.Rethinker(servers=args.servers, db=args.database) + registry = doublethink.services.ServiceRegistry(rethinker) + registry.purge_stale_services() + return 0 \ No newline at end of file diff --git a/doublethink/services.py b/doublethink/services.py index 38a0f93..2289190 100644 --- a/doublethink/services.py +++ b/doublethink/services.py @@ -312,3 +312,11 @@ class ServiceRegistry(object): available_service = healthy_service available_services = healthy_services + def purge_stale_services(self, ttls_until_deletion=2): + query = self.rr.table('services').filter( + lambda svc: r.now().sub(svc["last_heartbeat"]).gt(svc["ttl"] * ttls_until_deletion) + ).delete() + logging.debug("Running query: %s", query) + result = query.run() + logging.debug("Results: %s", result) + return result diff --git a/setup.py b/setup.py index 48d7ea7..f4b5174 100644 --- a/setup.py +++ b/setup.py @@ -18,4 +18,9 @@ setuptools.setup( description='rethinkdb python library', long_description=codecs.open( 'README.rst', mode='r', encoding='utf-8').read(), + entry_points={ + 'console_scripts': [ + 'doublethink-purge-stale-services=doublethink.cli:purge_stale_services', + ] + }, ) diff --git a/tests/run-tests.sh b/tests/run-tests.sh index 78464ba..ff06b7f 100755 --- a/tests/run-tests.sh +++ b/tests/run-tests.sh @@ -11,7 +11,7 @@ do docker run --rm -it --volume="$script_dir/..:/doublethink" internetarchive/rethinkdb /sbin/my_init -- \ bash -x -c "cd /tmp && git clone /doublethink \ && cd /tmp/doublethink \ - && (cd /doublethink && git diff) | patch -p1 \ + && (cd /doublethink && git diff HEAD) | patch -p1 \ && virtualenv -p $python /tmp/venv \ && source /tmp/venv/bin/activate \ && pip install pytest . \ diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..f26c6bd --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,64 @@ +''' +test_cli.py - unit tests for doublethink CLI + +Copyright (C) 2015-2017 Internet Archive + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import doublethink +import doublethink.cli +import logging +import sys +import pytest +import rethinkdb as r +import pkg_resources + +logging.basicConfig(stream=sys.stderr, level=logging.INFO, + format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s") + +class RethinkerForTesting(doublethink.Rethinker): + def __init__(self, *args, **kwargs): + super(RethinkerForTesting, self).__init__(*args, **kwargs) + + def _random_server_connection(self): + self.last_conn = super(RethinkerForTesting, self)._random_server_connection() + # logging.info("self.last_conn=%s", self.last_conn) + return self.last_conn + +@pytest.fixture(scope="module") +def rr(): + rr = RethinkerForTesting() + try: + rr.db_drop("doublethink_test_db").run() + except r.errors.ReqlOpFailedError: + pass + result = rr.db_create("doublethink_test_db").run() + assert not rr.last_conn.is_open() + assert result["dbs_created"] == 1 + return RethinkerForTesting(db="doublethink_test_db") + +def test_cli(capsys, rr): + entrypoint = pkg_resources.get_entry_map( + 'doublethink')['console_scripts']['doublethink-purge-stale-services'] + callable = entrypoint.resolve() + with pytest.raises(SystemExit) as exit: + callable(['doublethink-purge-stale-services']) + print(dir(exit)) + assert exit.value.code != 0 + out, err = capsys.readouterr() + with pytest.raises(SystemExit) as exit: + # this wrap with sys.exit matches what occurs in the generated command + sys.exit(callable(['doublethink-purge-stale-services', '-d', 'test'])) + assert exit.value.code == 0 + out, err = capsys.readouterr() diff --git a/tests/test_svcreg.py b/tests/test_svcreg.py index aae5e22..380d115 100644 --- a/tests/test_svcreg.py +++ b/tests/test_svcreg.py @@ -258,3 +258,13 @@ def test_svcreg_heartbeat_server_down(rr): assert not 'host' in svc0 assert not 'pid' in svc0 +def test_purge_stale_services(rr): + rr.table('services').delete().run() + rr.table('services').insert({ 'id': 'old-service', "last_heartbeat": r.now(), 'ttl': 0.4 }).run() + time.sleep(1) + rr.table('services').insert({ 'id': 'new-service', "last_heartbeat": r.now(), 'ttl': 0.4 }).run() + svcreg = doublethink.ServiceRegistry(rr) + assert rr.table('services').count().run() == 2 + svcreg.purge_stale_services() + assert rr.table('services').count().run() == 1 + rr.table('services').delete().run()