From a877fa0fd809929b891299a9296215224af27fe3 Mon Sep 17 00:00:00 2001 From: James Kafader Date: Tue, 26 Sep 2017 15:51:11 -0700 Subject: [PATCH 1/7] Adds cron garbage collector --- doublethink/services.py | 5 +++++ scripts/purge_stale_services.py | 35 +++++++++++++++++++++++++++++++++ setup.py | 1 + 3 files changed, 41 insertions(+) create mode 100644 scripts/purge_stale_services.py diff --git a/doublethink/services.py b/doublethink/services.py index 38a0f93..616b306 100644 --- a/doublethink/services.py +++ b/doublethink/services.py @@ -312,3 +312,8 @@ class ServiceRegistry(object): available_service = healthy_service available_services = healthy_services + def purge_stale_services(self, grace_period=0): + result = self.rr.table('services').filter( + lambda svc: r.now().sub(svc["last_heartbeat"]).gt(svc["ttl"] + grace_period) + ).delete().run() + return result diff --git a/scripts/purge_stale_services.py b/scripts/purge_stale_services.py new file mode 100644 index 0000000..31794c5 --- /dev/null +++ b/scripts/purge_stale_services.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +import sys +from optparse import OptionParser +from doublethink import Rethinker +from doublethink.services import ServiceRegistry + +usage = """usage: %prog [options] db +where 'db' is the the name of a RethinkDB database that contains a "services" table. + +This script can be used to periodically purge stale entries from the "services" table. + +It is designed to be used in conjunction with cron. + +Example: +%prog -s rethink-host0,rethink-host1,rethink-host2 doublethink_database""" +parser = OptionParser(usage=usage) +parser.add_option("-s", "--rethinkdb-servers", + metavar="SERVERS", dest="servers", + help="a comma-separated list of hostnames of rethinkdb servers. Required. [default: none]") +parser.add_option("-g", "--grace-period", + metavar="SECONDS", dest="grace_period", type="int", + help="leave records that have been stale for up to SECONDS seconds. [default: 0]") +(options, args) = parser.parse_args() + +if len(args) < 1: + sys.exit('"db" is a required argument and should be the name of a RethinkDB database that contains a "services" table. See "--help" for a list of options') + +if not options.servers: + sys.exit('--rethinkdb-servers (-s) is a required argument. It should be a comma-separated list of rethinkdb servers. See --help for more information') + +options.servers = [srv.strip() for srv in options.servers.split(",")] + +rethinker = Rethinker(servers=options.servers, db=args[0]) +registry = ServiceRegistry(rethinker) +registry.purge_stale_services(grace_period=options.grace_period) \ No newline at end of file diff --git a/setup.py b/setup.py index 48d7ea7..e16e560 100644 --- a/setup.py +++ b/setup.py @@ -18,4 +18,5 @@ setuptools.setup( description='rethinkdb python library', long_description=codecs.open( 'README.rst', mode='r', encoding='utf-8').read(), + scripts=glob.glob('scripts/*.py'), ) From 872ef2d93b20e3a46fd7474c30e325728b765a3f Mon Sep 17 00:00:00 2001 From: James Kafader Date: Tue, 26 Sep 2017 16:43:37 -0700 Subject: [PATCH 2/7] changed after reviewing merge request --- doublethink/cli.py | 45 +++++++++++++++++++++++++++++++++ doublethink/services.py | 4 +-- scripts/purge_stale_services.py | 35 ------------------------- setup.py | 6 ++++- 4 files changed, 52 insertions(+), 38 deletions(-) create mode 100644 doublethink/cli.py delete mode 100644 scripts/purge_stale_services.py diff --git a/doublethink/cli.py b/doublethink/cli.py new file mode 100644 index 0000000..de6543b --- /dev/null +++ b/doublethink/cli.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +''' +doublethink/orm.py - rethinkdb ORM Command Line Interface + +Copyright (C) 2017 Internet Archive + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import os, sys +import argparse +import doublethink + +def purge_stale_services(argv=None): + """Command-line utility to periodically purge stale entries from the "services" table. + + It is designed to be used in conjunction with cron. + """ + argv = argv or sys.argv + arg_parser = argparse.ArgumentParser( + prog=os.path.basename(argv[0]), + description='purge-stale-services: utility to periodically purge stale entries from the "services" table.') + + arg_parser.add_argument("db", help="A RethinkDB database containing a 'services' table") + + arg_parser.add_argument("-s", "--rethinkdb-servers", + metavar="SERVERS", dest="servers", required=True, + help="a comma-separated list of hostnames of rethinkdb servers. Required.") + args = arg_parser.parse_args(argv[1:]) + + args.servers = [srv.strip() for srv in args.servers.split(",")] + + rethinker = doublethink.Rethinker(servers=args.servers, db=args) + registry = doublethink.services.ServiceRegistry(rethinker) + registry.purge_stale_services() \ No newline at end of file diff --git a/doublethink/services.py b/doublethink/services.py index 616b306..aeeed29 100644 --- a/doublethink/services.py +++ b/doublethink/services.py @@ -312,8 +312,8 @@ class ServiceRegistry(object): available_service = healthy_service available_services = healthy_services - def purge_stale_services(self, grace_period=0): + def purge_stale_services(self, ttls_until_deletion=2): result = self.rr.table('services').filter( - lambda svc: r.now().sub(svc["last_heartbeat"]).gt(svc["ttl"] + grace_period) + lambda svc: r.now().sub(svc["last_heartbeat"]).gt(svc["ttl"] * ttls_until_deletion) ).delete().run() return result diff --git a/scripts/purge_stale_services.py b/scripts/purge_stale_services.py deleted file mode 100644 index 31794c5..0000000 --- a/scripts/purge_stale_services.py +++ /dev/null @@ -1,35 +0,0 @@ -#!/usr/bin/env python -import sys -from optparse import OptionParser -from doublethink import Rethinker -from doublethink.services import ServiceRegistry - -usage = """usage: %prog [options] db -where 'db' is the the name of a RethinkDB database that contains a "services" table. - -This script can be used to periodically purge stale entries from the "services" table. - -It is designed to be used in conjunction with cron. - -Example: -%prog -s rethink-host0,rethink-host1,rethink-host2 doublethink_database""" -parser = OptionParser(usage=usage) -parser.add_option("-s", "--rethinkdb-servers", - metavar="SERVERS", dest="servers", - help="a comma-separated list of hostnames of rethinkdb servers. Required. [default: none]") -parser.add_option("-g", "--grace-period", - metavar="SECONDS", dest="grace_period", type="int", - help="leave records that have been stale for up to SECONDS seconds. [default: 0]") -(options, args) = parser.parse_args() - -if len(args) < 1: - sys.exit('"db" is a required argument and should be the name of a RethinkDB database that contains a "services" table. See "--help" for a list of options') - -if not options.servers: - sys.exit('--rethinkdb-servers (-s) is a required argument. It should be a comma-separated list of rethinkdb servers. See --help for more information') - -options.servers = [srv.strip() for srv in options.servers.split(",")] - -rethinker = Rethinker(servers=options.servers, db=args[0]) -registry = ServiceRegistry(rethinker) -registry.purge_stale_services(grace_period=options.grace_period) \ No newline at end of file diff --git a/setup.py b/setup.py index e16e560..3e28469 100644 --- a/setup.py +++ b/setup.py @@ -18,5 +18,9 @@ setuptools.setup( description='rethinkdb python library', long_description=codecs.open( 'README.rst', mode='r', encoding='utf-8').read(), - scripts=glob.glob('scripts/*.py'), + entry_points={ + 'console_scripts': [ + 'purge-stale-services=doublethink.cli:purge_stale_services', + ] + }, ) From 8f5232ac739b3d24b3c4ad39479093f3cd5e148f Mon Sep 17 00:00:00 2001 From: James Kafader Date: Tue, 26 Sep 2017 17:00:17 -0700 Subject: [PATCH 3/7] a few more revisions after consultation with noah. --- doublethink/cli.py | 14 ++++++++++++-- doublethink/services.py | 7 +++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/doublethink/cli.py b/doublethink/cli.py index de6543b..ef76019 100644 --- a/doublethink/cli.py +++ b/doublethink/cli.py @@ -31,12 +31,22 @@ def purge_stale_services(argv=None): prog=os.path.basename(argv[0]), description='purge-stale-services: utility to periodically purge stale entries from the "services" table.') - arg_parser.add_argument("db", help="A RethinkDB database containing a 'services' table") + arg_parser.add_argument("-d", "--rethinkdb-db", required=True, + help="A RethinkDB database containing a 'services' table") arg_parser.add_argument("-s", "--rethinkdb-servers", metavar="SERVERS", dest="servers", required=True, - help="a comma-separated list of hostnames of rethinkdb servers. Required.") + help="rethinkdb servers, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org") + + arg_parser.add_argument( + '-v', '--verbose', dest='log_level', action='store_const', + default=logging.INFO, const=logging.DEBUG, help=( + 'verbose logging')) args = arg_parser.parse_args(argv[1:]) + logging.basicConfig( + stream=sys.stdout, level=args.log_level, format=( + '%(asctime)s %(process)d %(levelname)s %(threadName)s ' + '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')) args.servers = [srv.strip() for srv in args.servers.split(",")] diff --git a/doublethink/services.py b/doublethink/services.py index aeeed29..2289190 100644 --- a/doublethink/services.py +++ b/doublethink/services.py @@ -313,7 +313,10 @@ class ServiceRegistry(object): available_services = healthy_services def purge_stale_services(self, ttls_until_deletion=2): - result = self.rr.table('services').filter( + query = self.rr.table('services').filter( lambda svc: r.now().sub(svc["last_heartbeat"]).gt(svc["ttl"] * ttls_until_deletion) - ).delete().run() + ).delete() + logging.debug("Running query: %s", query) + result = query.run() + logging.debug("Results: %s", result) return result From a57b4484d38224d3287570675b64df1d971531ef Mon Sep 17 00:00:00 2001 From: James Kafader Date: Tue, 3 Oct 2017 13:56:32 -0700 Subject: [PATCH 4/7] initial (failing) version of tests file for CLI, changes to CLI to get it minimally working --- doublethink/cli.py | 6 ++++-- tests/test_cli.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) create mode 100644 tests/test_cli.py diff --git a/doublethink/cli.py b/doublethink/cli.py index ef76019..517e52e 100644 --- a/doublethink/cli.py +++ b/doublethink/cli.py @@ -20,6 +20,7 @@ limitations under the License. import os, sys import argparse import doublethink +import logging def purge_stale_services(argv=None): """Command-line utility to periodically purge stale entries from the "services" table. @@ -32,10 +33,11 @@ def purge_stale_services(argv=None): description='purge-stale-services: utility to periodically purge stale entries from the "services" table.') arg_parser.add_argument("-d", "--rethinkdb-db", required=True, + dest="database", help="A RethinkDB database containing a 'services' table") arg_parser.add_argument("-s", "--rethinkdb-servers", - metavar="SERVERS", dest="servers", required=True, + metavar="SERVERS", dest="servers", default='localhost', help="rethinkdb servers, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org") arg_parser.add_argument( @@ -50,6 +52,6 @@ def purge_stale_services(argv=None): args.servers = [srv.strip() for srv in args.servers.split(",")] - rethinker = doublethink.Rethinker(servers=args.servers, db=args) + rethinker = doublethink.Rethinker(servers=args.servers, db=args.database) registry = doublethink.services.ServiceRegistry(rethinker) registry.purge_stale_services() \ No newline at end of file diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..fb7f43a --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,42 @@ +''' +test_cli.py - unit tests for doublethink CLI + +Copyright (C) 2015-2017 Internet Archive + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import doublethink +import logging +import sys +import pytest +import rethinkdb as r + +logging.basicConfig(stream=sys.stderr, level=logging.INFO, + format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s") + +@pytest.fixture(scope="module") +def rr(): + rr = RethinkerForTesting() + try: + rr.db_drop("doublethink_test_db").run() + except r.errors.ReqlOpFailedError: + pass + result = rr.db_create("doublethink_test_db").run() + assert not rr.last_conn.is_open() + assert result["dbs_created"] == 1 + return RethinkerForTesting(db="doublethink_test_db") + +def test_cli(rr): + print(rr) + doublethink.cli.purge_stale_services(['test']) From dd5b2122cf1e03c0303d56332f29a3b2b9fe0610 Mon Sep 17 00:00:00 2001 From: James Kafader Date: Tue, 3 Oct 2017 13:56:52 -0700 Subject: [PATCH 5/7] improve the git diff here so this runs --- tests/run-tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/run-tests.sh b/tests/run-tests.sh index 78464ba..ff06b7f 100755 --- a/tests/run-tests.sh +++ b/tests/run-tests.sh @@ -11,7 +11,7 @@ do docker run --rm -it --volume="$script_dir/..:/doublethink" internetarchive/rethinkdb /sbin/my_init -- \ bash -x -c "cd /tmp && git clone /doublethink \ && cd /tmp/doublethink \ - && (cd /doublethink && git diff) | patch -p1 \ + && (cd /doublethink && git diff HEAD) | patch -p1 \ && virtualenv -p $python /tmp/venv \ && source /tmp/venv/bin/activate \ && pip install pytest . \ From df7c0b8e32c448b1bd3a2a9471e17b181aabdac3 Mon Sep 17 00:00:00 2001 From: James Kafader Date: Tue, 3 Oct 2017 14:38:31 -0700 Subject: [PATCH 6/7] added tests for purging stale services and minimal tests for command line tool --- doublethink/cli.py | 3 ++- tests/test_cli.py | 27 ++++++++++++++++++++++++--- tests/test_svcreg.py | 10 ++++++++++ 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/doublethink/cli.py b/doublethink/cli.py index 517e52e..fab424c 100644 --- a/doublethink/cli.py +++ b/doublethink/cli.py @@ -54,4 +54,5 @@ def purge_stale_services(argv=None): rethinker = doublethink.Rethinker(servers=args.servers, db=args.database) registry = doublethink.services.ServiceRegistry(rethinker) - registry.purge_stale_services() \ No newline at end of file + registry.purge_stale_services() + sys.exit(0) \ No newline at end of file diff --git a/tests/test_cli.py b/tests/test_cli.py index fb7f43a..45c88b4 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -17,14 +17,25 @@ limitations under the License. ''' import doublethink +import doublethink.cli import logging import sys import pytest import rethinkdb as r +import pkg_resources logging.basicConfig(stream=sys.stderr, level=logging.INFO, format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s") +class RethinkerForTesting(doublethink.Rethinker): + def __init__(self, *args, **kwargs): + super(RethinkerForTesting, self).__init__(*args, **kwargs) + + def _random_server_connection(self): + self.last_conn = super(RethinkerForTesting, self)._random_server_connection() + # logging.info("self.last_conn=%s", self.last_conn) + return self.last_conn + @pytest.fixture(scope="module") def rr(): rr = RethinkerForTesting() @@ -37,6 +48,16 @@ def rr(): assert result["dbs_created"] == 1 return RethinkerForTesting(db="doublethink_test_db") -def test_cli(rr): - print(rr) - doublethink.cli.purge_stale_services(['test']) +def test_cli(capsys, rr): + entrypoint = pkg_resources.get_entry_map( + 'doublethink')['console_scripts']['purge-stale-services'] + callable = entrypoint.resolve() + with pytest.raises(SystemExit) as exit: + callable(['purge-stale-services']) + print(dir(exit)) + assert exit.value.code != 0 + out, err = capsys.readouterr() + with pytest.raises(SystemExit) as exit: + callable(['purge-stale-services', '-d', 'test']) + assert exit.value.code == 0 + out, err = capsys.readouterr() diff --git a/tests/test_svcreg.py b/tests/test_svcreg.py index aae5e22..380d115 100644 --- a/tests/test_svcreg.py +++ b/tests/test_svcreg.py @@ -258,3 +258,13 @@ def test_svcreg_heartbeat_server_down(rr): assert not 'host' in svc0 assert not 'pid' in svc0 +def test_purge_stale_services(rr): + rr.table('services').delete().run() + rr.table('services').insert({ 'id': 'old-service', "last_heartbeat": r.now(), 'ttl': 0.4 }).run() + time.sleep(1) + rr.table('services').insert({ 'id': 'new-service', "last_heartbeat": r.now(), 'ttl': 0.4 }).run() + svcreg = doublethink.ServiceRegistry(rr) + assert rr.table('services').count().run() == 2 + svcreg.purge_stale_services() + assert rr.table('services').count().run() == 1 + rr.table('services').delete().run() From e1b4153712b958e878a85ebf117d2689e8ad15ab Mon Sep 17 00:00:00 2001 From: James Kafader Date: Tue, 3 Oct 2017 16:13:42 -0700 Subject: [PATCH 7/7] clean up small items, typos, change command name, clean up tests in re: exit code testing. --- doublethink/cli.py | 6 +++--- setup.py | 2 +- tests/test_cli.py | 7 ++++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/doublethink/cli.py b/doublethink/cli.py index fab424c..630ef6a 100644 --- a/doublethink/cli.py +++ b/doublethink/cli.py @@ -1,6 +1,6 @@ #!/usr/bin/env python ''' -doublethink/orm.py - rethinkdb ORM Command Line Interface +doublethink/cli.py - doublethink Command Line Tools Copyright (C) 2017 Internet Archive @@ -30,7 +30,7 @@ def purge_stale_services(argv=None): argv = argv or sys.argv arg_parser = argparse.ArgumentParser( prog=os.path.basename(argv[0]), - description='purge-stale-services: utility to periodically purge stale entries from the "services" table.') + description='doublethink-purge-stale-services: utility to periodically purge stale entries from the "services" table.') arg_parser.add_argument("-d", "--rethinkdb-db", required=True, dest="database", @@ -55,4 +55,4 @@ def purge_stale_services(argv=None): rethinker = doublethink.Rethinker(servers=args.servers, db=args.database) registry = doublethink.services.ServiceRegistry(rethinker) registry.purge_stale_services() - sys.exit(0) \ No newline at end of file + return 0 \ No newline at end of file diff --git a/setup.py b/setup.py index 3e28469..f4b5174 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ setuptools.setup( 'README.rst', mode='r', encoding='utf-8').read(), entry_points={ 'console_scripts': [ - 'purge-stale-services=doublethink.cli:purge_stale_services', + 'doublethink-purge-stale-services=doublethink.cli:purge_stale_services', ] }, ) diff --git a/tests/test_cli.py b/tests/test_cli.py index 45c88b4..f26c6bd 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -50,14 +50,15 @@ def rr(): def test_cli(capsys, rr): entrypoint = pkg_resources.get_entry_map( - 'doublethink')['console_scripts']['purge-stale-services'] + 'doublethink')['console_scripts']['doublethink-purge-stale-services'] callable = entrypoint.resolve() with pytest.raises(SystemExit) as exit: - callable(['purge-stale-services']) + callable(['doublethink-purge-stale-services']) print(dir(exit)) assert exit.value.code != 0 out, err = capsys.readouterr() with pytest.raises(SystemExit) as exit: - callable(['purge-stale-services', '-d', 'test']) + # this wrap with sys.exit matches what occurs in the generated command + sys.exit(callable(['doublethink-purge-stale-services', '-d', 'test'])) assert exit.value.code == 0 out, err = capsys.readouterr()