"""Integration tests for rethinkstuff (Rethinker, ServiceRegistry, utcnow).

These tests talk to a live RethinkDB server through rethinkstuff.Rethinker
and create a database named "my_db" on it.
"""

import datetime
import gc
import logging
import os
import socket
import sys
import time
import types

import pytest
import rethinkdb

import rethinkstuff

logging.basicConfig(stream=sys.stderr, level=logging.INFO,
        format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")


class RethinkerForTesting(rethinkstuff.Rethinker):
    """Rethinker that remembers the most recent connection it handed out.

    The tests inspect `last_conn` to check whether the connection used by a
    query was closed or left open after `run()`.
    """

    def __init__(self, *args, **kwargs):
        super(RethinkerForTesting, self).__init__(*args, **kwargs)

    def _random_server_connection(self):
        """Delegate to the parent class and record the returned connection."""
        self.last_conn = super(
                RethinkerForTesting, self)._random_server_connection()
        # logging.info("self.last_conn=%s", self.last_conn)
        return self.last_conn


def _new_service(role, load):
    """Build a fresh service-status dict with a 0.2 second heartbeat interval."""
    return {
        "role": role,
        "load": load,
        "heartbeat_interval": 0.2,
    }


@pytest.fixture(scope="module")
def r():
    """Create database "my_db" and return a Rethinker bound to it."""
    r = RethinkerForTesting()
    result = r.db_create("my_db").run()
    assert not r.last_conn.is_open()
    assert result["dbs_created"] == 1
    return RethinkerForTesting(db="my_db")


@pytest.fixture(scope="module")
def my_table(r):
    """Create table "my_table" in the (expected to be empty) test database."""
    assert r.table_list().run() == []
    result = r.table_create("my_table").run()
    assert not r.last_conn.is_open()
    assert result["tables_created"] == 1


def test_rethinker(r, my_table):
    """Check query results and when the underlying connection gets closed."""
    assert r.table("my_table").index_create("foo").run() == {"created": 1}
    assert not r.last_conn.is_open()

    result = r.table("my_table").insert(
            ({"foo": i, "bar": "repeat" * i} for i in range(2000))).run()
    assert not r.last_conn.is_open()
    assert len(result["generated_keys"]) == 2000
    assert result["inserted"] == 2000

    result = r.table("my_table").run()
    assert r.last_conn.is_open()  # should still be open this time
    assert isinstance(result, types.GeneratorType)
    n = sum(1 for _ in result)
    # connection should be closed after finished iterating over results
    assert not r.last_conn.is_open()
    assert n == 2000

    # partially consumed generator
    result = r.table("my_table").run()
    assert r.last_conn.is_open()  # should still be open this time
    assert isinstance(result, types.GeneratorType)
    next(result)
    result = None
    gc.collect()
    # connection should be closed after result is garbage-collected
    assert not r.last_conn.is_open()

    # generator that was never iterated at all
    result = r.table("my_table").run()
    assert r.last_conn.is_open()  # should still be open this time
    assert isinstance(result, types.GeneratorType)
    result = None
    gc.collect()
    # connection should be closed after result is garbage-collected
    assert not r.last_conn.is_open()


def test_too_many_errors(r):
    """Table creation with 99 replicas or 99 shards raises ReqlOpFailedError."""
    with pytest.raises(rethinkdb.errors.ReqlOpFailedError):
        r.table_create("too_many_replicas", replicas=99).run()
    with pytest.raises(rethinkdb.errors.ReqlOpFailedError):
        r.table_create("too_many_shards", shards=99).run()


def test_slice(r, my_table):
    """Tests RethinkerWrapper.__getitem__()"""
    result = r.table("my_table")[5:10].run()
    assert r.last_conn.is_open()  # should still be open this time
    assert isinstance(result, types.GeneratorType)
    n = sum(1 for _ in result)
    # connection should be closed after finished iterating over results
    assert not r.last_conn.is_open()
    assert n == 5


def test_service_registry(r):
    """Exercise ServiceRegistry heartbeat, lookup and unregister.

    NOTE(review): several of the expected counts below (1 while two services
    have been heartbeated; 2 and 4 at the end after further registrations)
    depend on the registry's expiry/identity rules, which are not visible
    from this file. They are kept exactly as written -- confirm against
    rethinkstuff.ServiceRegistry.
    """
    svcreg = rethinkstuff.ServiceRegistry(r)
    assert svcreg.available_service("yes-such-role") is None
    assert svcreg.available_services("yes-such-role") == []
    assert svcreg.available_services() == []

    svc0 = svcreg.heartbeat(_new_service("yes-such-role", 100.0))
    svc1 = svcreg.heartbeat(_new_service("yes-such-role", 200.0))
    assert "id" in svc0
    assert "id" in svc1
    assert svc0["id"] != svc1["id"]
    assert svc0["host"] == socket.gethostname()
    assert svc1["host"] == socket.gethostname()
    assert "pid" in svc0
    assert "pid" in svc1
    assert svc0["pid"] == os.getpid()
    assert svc1["pid"] == os.getpid()
    assert "first_heartbeat" in svc0
    assert "first_heartbeat" in svc1
    assert "last_heartbeat" in svc0
    assert "last_heartbeat" in svc1

    time.sleep(0.1)
    assert svcreg.available_service("no-such-role") is None
    assert svcreg.available_services("no-such-role") == []
    # svc0 reports the lower load (100.0 vs 200.0)
    assert svcreg.available_service("yes-such-role")["id"] == svc0["id"]
    assert len(svcreg.available_services("yes-such-role")) == 1
    assert len(svcreg.available_services()) == 1

    # svc1 now reports the lower load (50.0 vs 100.0)
    svc1["load"] = 50.0
    svc1 = svcreg.heartbeat(svc1)
    time.sleep(0.1)
    assert svcreg.available_service("no-such-role") is None
    assert svcreg.available_service("yes-such-role")["id"] == svc1["id"]
    assert len(svcreg.available_services("yes-such-role")) == 1
    assert len(svcreg.available_services()) == 1

    # svc0 reports the lower load again (100.0 vs 200.0)
    svc1["load"] = 200.0
    svc1 = svcreg.heartbeat(svc1)
    time.sleep(0.1)
    assert svcreg.available_service("no-such-role") is None
    assert svcreg.available_service("yes-such-role")["id"] == svc0["id"]
    assert len(svcreg.available_services("yes-such-role")) == 1
    assert len(svcreg.available_services()) == 1

    # only svc1 keeps sending heartbeats from here on; svc0 stays silent
    svc1 = svcreg.heartbeat(svc1)
    time.sleep(0.1)
    svc1 = svcreg.heartbeat(svc1)
    time.sleep(0.4)
    assert svcreg.available_service("no-such-role") is None
    assert svcreg.available_service("yes-such-role")["id"] == svc1["id"]
    assert len(svcreg.available_services("yes-such-role")) == 1
    assert len(svcreg.available_services()) == 1

    svcreg.unregister(svc1["id"])
    time.sleep(0.1)
    assert svcreg.available_service("no-such-role") is None
    assert svcreg.available_service("yes-such-role") is None
    assert svcreg.available_services("yes-such-role") == []
    assert svcreg.available_services() == []

    svc0 = svcreg.heartbeat(_new_service("yes-such-role", 100.0))
    svc1 = svcreg.heartbeat(_new_service("yes-such-role", 200.0))
    assert len(svcreg.available_services("yes-such-role")) == 2
    assert len(svcreg.available_services()) == 2

    svc0 = svcreg.heartbeat(_new_service("yes-such-role", 100.0))
    svc1 = svcreg.heartbeat(_new_service("yes-such-role", 200.0))
    svc2 = svcreg.heartbeat(_new_service("another-such-role", 200.0))
    svc3 = svcreg.heartbeat(_new_service("yet-another-such-role", 200.0))
    assert len(svcreg.available_services("yes-such-role")) == 2
    assert len(svcreg.available_services()) == 4


def test_utcnow():
    """rethinkstuff.utcnow() is timezone-aware and agrees with the UTC clock."""
    now_notz = datetime.datetime.utcnow()  # has no timezone :(
    assert now_notz.tzinfo is None

    now_tz = rethinkstuff.utcnow()  # solution to that problem
    assert now_tz.tzinfo is not None

    # Aware and naive datetimes cannot be subtracted directly, and calling
    # .timestamp() on the naive value would interpret it as *local* time, so
    # the comparison would be off by the host's UTC offset. Instead reduce the
    # aware value to naive UTC wall-clock time and compare like with like.
    utc_wall_clock = now_tz.replace(tzinfo=None) - now_tz.utcoffset()
    assert abs((utc_wall_clock - now_notz).total_seconds()) < 0.1

    ## XXX what else can we test without jumping through hoops?