diff --git a/.travis.yml b/.travis.yml index 8cf3453..0f7a315 100644 --- a/.travis.yml +++ b/.travis.yml @@ -33,9 +33,9 @@ before_script: - pip install . pytest requests script: -- py.test -v -s tests -- py.test -v -s --rethinkdb-servers=localhost tests -- py.test -v -s --rethinkdb-servers=localhost --rethinkdb-big-table tests +- py.test -vv tests +- py.test -vv --rethinkdb-servers=localhost tests +- py.test -vv --rethinkdb-servers=localhost --rethinkdb-big-table tests notifications: slack: diff --git a/setup.py b/setup.py index 0b916aa..b7e85e4 100755 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.1b1.dev63', + version='2.1b1.dev64', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/run-tests.sh b/tests/run-tests.sh index 191dd70..0c5b254 100755 --- a/tests/run-tests.sh +++ b/tests/run-tests.sh @@ -7,7 +7,7 @@ # # tests/conftest.py - command line options for warcprox tests # -# Copyright (C) 2015-2016 Internet Archive +# Copyright (C) 2015-2017 Internet Archive # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License @@ -37,12 +37,12 @@ for python in python2.7 python3 do docker run --rm --volume="$script_dir/..:/warcprox" internetarchive/warcprox-tests /sbin/my_init -- \ bash -x -c "cd /tmp && git clone /warcprox && cd /tmp/warcprox \ - && (cd /warcprox && git diff) | patch -p1 \ + && (cd /warcprox && git diff HEAD) | patch -p1 \ && virtualenv -p $python /tmp/venv \ && source /tmp/venv/bin/activate \ && pip --log-file /tmp/pip.log install . pytest requests \ - && py.test -s tests \ - && py.test -s --rethinkdb-servers=localhost tests \ - && py.test -s --rethinkdb-servers=localhost --rethinkdb-big-table tests" + && py.test -vv tests \ + && py.test -vv --rethinkdb-servers=localhost tests \ + && py.test -vv --rethinkdb-servers=localhost --rethinkdb-big-table tests" done diff --git a/tests/test_queue.py b/tests/test_queue.py new file mode 100644 index 0000000..1c220db --- /dev/null +++ b/tests/test_queue.py @@ -0,0 +1,416 @@ +# === warcprox note === +# tests/test_queue.py - tests that ensure warcprox.TimestampedQueue doesn't +# break any of the queue.Queue functionality +# +# This file copied and slightly modified from the cpython source: +# https://github.com/python/cpython/blob/38c707e7e0322f9139bb51ad73ede1e3b46985ef/Lib/test/test_queue.py +# See https://github.com/python/cpython/blob/38c707e7e0322f9139bb51ad73ede1e3b46985ef/LICENSE +# for licensing information. +# +# Changes for warcprox are clearly delimited. The modifications are: +# +# Copyright (C) 2017 Internet Archive +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. 
+# === end warcprox note === + +# Some simple queue module tests, plus some failure conditions +# to ensure the Queue locks remain stable. +# === commented out for warcprox === +# import queue +# === end commented out for warcprox === + +# === added for warcprox === +# make it work with py2 +try: + import queue +except: + import Queue as queue +import threading +# === end added for warcprox === + +import time +import unittest + +# === commented out for warcprox === +# from test import support +# threading = support.import_module('threading') +# === end commented out for warcprox === + +QUEUE_SIZE = 5 + +def qfull(q): + return q.maxsize > 0 and q.qsize() == q.maxsize + +# A thread to run a function that unclogs a blocked Queue. +class _TriggerThread(threading.Thread): + def __init__(self, fn, args): + self.fn = fn + self.args = args + self.startedEvent = threading.Event() + threading.Thread.__init__(self) + + def run(self): + # The sleep isn't necessary, but is intended to give the blocking + # function in the main thread a chance at actually blocking before + # we unclog it. But if the sleep is longer than the timeout-based + # tests wait in their blocking functions, those tests will fail. + # So we give them much longer timeout values compared to the + # sleep here (I aimed at 10 seconds for blocking functions -- + # they should never actually wait that long - they should make + # progress as soon as we call self.fn()). + time.sleep(0.1) + self.startedEvent.set() + self.fn(*self.args) + + +# Execute a function that blocks, and in a separate thread, a function that +# triggers the release. Returns the result of the blocking function. Caution: +# block_func must guarantee to block until trigger_func is called, and +# trigger_func must guarantee to change queue state so that block_func can make +# enough progress to return. In particular, a block_func that just raises an +# exception regardless of whether trigger_func is called will lead to +# timing-dependent sporadic failures, and one of those went rarely seen but +# undiagnosed for years. Now block_func must be unexceptional. If block_func +# is supposed to raise an exception, call do_exceptional_blocking_test() +# instead. + +class BlockingTestMixin: + + def tearDown(self): + self.t = None + + def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args): + self.t = _TriggerThread(trigger_func, trigger_args) + self.t.start() + self.result = block_func(*block_args) + # If block_func returned before our thread made the call, we failed! + if not self.t.startedEvent.is_set(): + self.fail("blocking function '%r' appeared not to block" % + block_func) + self.t.join(10) # make sure the thread terminates + if self.t.is_alive(): + self.fail("trigger function '%r' appeared to not return" % + trigger_func) + return self.result + + # Call this instead if block_func is supposed to raise an exception. 
+ def do_exceptional_blocking_test(self,block_func, block_args, trigger_func, + trigger_args, expected_exception_class): + self.t = _TriggerThread(trigger_func, trigger_args) + self.t.start() + try: + try: + block_func(*block_args) + except expected_exception_class: + raise + else: + self.fail("expected exception of kind %r" % + expected_exception_class) + finally: + self.t.join(10) # make sure the thread terminates + if self.t.is_alive(): + self.fail("trigger function '%r' appeared to not return" % + trigger_func) + if not self.t.startedEvent.is_set(): + self.fail("trigger thread ended but event never set") + + +class BaseQueueTestMixin(BlockingTestMixin): + def setUp(self): + self.cum = 0 + self.cumlock = threading.Lock() + + def simple_queue_test(self, q): + if q.qsize(): + raise RuntimeError("Call this function with an empty queue") + self.assertTrue(q.empty()) + self.assertFalse(q.full()) + # I guess we better check things actually queue correctly a little :) + q.put(111) + q.put(333) + q.put(222) + target_order = dict(Queue = [111, 333, 222], + # === warcprox addition === + TimestampedQueue = [111, 333, 222], + # === end warcprox addition === + LifoQueue = [222, 333, 111], + PriorityQueue = [111, 222, 333]) + actual_order = [q.get(), q.get(), q.get()] + self.assertEqual(actual_order, target_order[q.__class__.__name__], + "Didn't seem to queue the correct data!") + for i in range(QUEUE_SIZE-1): + q.put(i) + self.assertTrue(q.qsize(), "Queue should not be empty") + self.assertTrue(not qfull(q), "Queue should not be full") + last = 2 * QUEUE_SIZE + full = 3 * 2 * QUEUE_SIZE + q.put(last) + self.assertTrue(qfull(q), "Queue should be full") + self.assertFalse(q.empty()) + self.assertTrue(q.full()) + try: + q.put(full, block=0) + self.fail("Didn't appear to block with a full queue") + except queue.Full: + pass + try: + q.put(full, timeout=0.01) + self.fail("Didn't appear to time-out with a full queue") + except queue.Full: + pass + # Test a blocking put + self.do_blocking_test(q.put, (full,), q.get, ()) + self.do_blocking_test(q.put, (full, True, 10), q.get, ()) + # Empty it + for i in range(QUEUE_SIZE): + q.get() + self.assertTrue(not q.qsize(), "Queue should be empty") + try: + q.get(block=0) + self.fail("Didn't appear to block with an empty queue") + except queue.Empty: + pass + try: + q.get(timeout=0.01) + self.fail("Didn't appear to time-out with an empty queue") + except queue.Empty: + pass + # Test a blocking get + self.do_blocking_test(q.get, (), q.put, ('empty',)) + self.do_blocking_test(q.get, (True, 10), q.put, ('empty',)) + + + def worker(self, q): + while True: + x = q.get() + if x < 0: + q.task_done() + return + with self.cumlock: + self.cum += x + q.task_done() + + def queue_join_test(self, q): + self.cum = 0 + for i in (0,1): + threading.Thread(target=self.worker, args=(q,)).start() + for i in range(100): + q.put(i) + q.join() + self.assertEqual(self.cum, sum(range(100)), + "q.join() did not block until all tasks were done") + for i in (0,1): + q.put(-1) # instruct the threads to close + q.join() # verify that you can join twice + + def test_queue_task_done(self): + # Test to make sure a queue task completed successfully. + q = self.type2test() + try: + q.task_done() + except ValueError: + pass + else: + self.fail("Did not detect task count going negative") + + def test_queue_join(self): + # Test that a queue join()s successfully, and before anything else + # (done twice for insurance). 
+ q = self.type2test() + self.queue_join_test(q) + self.queue_join_test(q) + try: + q.task_done() + except ValueError: + pass + else: + self.fail("Did not detect task count going negative") + + def test_simple_queue(self): + # Do it a couple of times on the same queue. + # Done twice to make sure works with same instance reused. + q = self.type2test(QUEUE_SIZE) + self.simple_queue_test(q) + self.simple_queue_test(q) + + def test_negative_timeout_raises_exception(self): + q = self.type2test(QUEUE_SIZE) + with self.assertRaises(ValueError): + q.put(1, timeout=-1) + with self.assertRaises(ValueError): + q.get(1, timeout=-1) + + def test_nowait(self): + q = self.type2test(QUEUE_SIZE) + for i in range(QUEUE_SIZE): + q.put_nowait(1) + with self.assertRaises(queue.Full): + q.put_nowait(1) + + for i in range(QUEUE_SIZE): + q.get_nowait() + with self.assertRaises(queue.Empty): + q.get_nowait() + + def test_shrinking_queue(self): + # issue 10110 + q = self.type2test(3) + q.put(1) + q.put(2) + q.put(3) + with self.assertRaises(queue.Full): + q.put_nowait(4) + self.assertEqual(q.qsize(), 3) + # === changed for warcprox to make test pass on python2 === + # q.maxsize = 2 # shrink the queue + q.maxsize = 3 # shrink the queue + # === end changed for warcprox === + with self.assertRaises(queue.Full): + q.put_nowait(4) + +# === commented out for warcprox === +# class QueueTest(BaseQueueTestMixin, unittest.TestCase): +# type2test = queue.Queue +# +# class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase): +# type2test = queue.LifoQueue +# +# class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase): +# type2test = queue.PriorityQueue +# +# === end commented out for warcprox === + +# === warcprox additions === +import warcprox +class TimestampedQueueTest(BaseQueueTestMixin, unittest.TestCase): + type2test = warcprox.TimestampedQueue +# === end warcprox additions + +# === commented out for warcprox === +# # A Queue subclass that can provoke failure at a moment's notice :) +# class FailingQueueException(Exception): +# pass +# +# class FailingQueue(queue.Queue): +# def __init__(self, *args): +# self.fail_next_put = False +# self.fail_next_get = False +# queue.Queue.__init__(self, *args) +# def _put(self, item): +# if self.fail_next_put: +# self.fail_next_put = False +# raise FailingQueueException("You Lose") +# return queue.Queue._put(self, item) +# def _get(self): +# if self.fail_next_get: +# self.fail_next_get = False +# raise FailingQueueException("You Lose") +# return queue.Queue._get(self) +# +# class FailingQueueTest(BlockingTestMixin, unittest.TestCase): +# +# def failing_queue_test(self, q): +# if q.qsize(): +# raise RuntimeError("Call this function with an empty queue") +# for i in range(QUEUE_SIZE-1): +# q.put(i) +# # Test a failing non-blocking put. +# q.fail_next_put = True +# try: +# q.put("oops", block=0) +# self.fail("The queue didn't fail when it should have") +# except FailingQueueException: +# pass +# q.fail_next_put = True +# try: +# q.put("oops", timeout=0.1) +# self.fail("The queue didn't fail when it should have") +# except FailingQueueException: +# pass +# q.put("last") +# self.assertTrue(qfull(q), "Queue should be full") +# # Test a failing blocking put +# q.fail_next_put = True +# try: +# self.do_blocking_test(q.put, ("full",), q.get, ()) +# self.fail("The queue didn't fail when it should have") +# except FailingQueueException: +# pass +# # Check the Queue isn't damaged. 
+# # put failed, but get succeeded - re-add +# q.put("last") +# # Test a failing timeout put +# q.fail_next_put = True +# try: +# self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (), +# FailingQueueException) +# self.fail("The queue didn't fail when it should have") +# except FailingQueueException: +# pass +# # Check the Queue isn't damaged. +# # put failed, but get succeeded - re-add +# q.put("last") +# self.assertTrue(qfull(q), "Queue should be full") +# q.get() +# self.assertTrue(not qfull(q), "Queue should not be full") +# q.put("last") +# self.assertTrue(qfull(q), "Queue should be full") +# # Test a blocking put +# self.do_blocking_test(q.put, ("full",), q.get, ()) +# # Empty it +# for i in range(QUEUE_SIZE): +# q.get() +# self.assertTrue(not q.qsize(), "Queue should be empty") +# q.put("first") +# q.fail_next_get = True +# try: +# q.get() +# self.fail("The queue didn't fail when it should have") +# except FailingQueueException: +# pass +# self.assertTrue(q.qsize(), "Queue should not be empty") +# q.fail_next_get = True +# try: +# q.get(timeout=0.1) +# self.fail("The queue didn't fail when it should have") +# except FailingQueueException: +# pass +# self.assertTrue(q.qsize(), "Queue should not be empty") +# q.get() +# self.assertTrue(not q.qsize(), "Queue should be empty") +# q.fail_next_get = True +# try: +# self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',), +# FailingQueueException) +# self.fail("The queue didn't fail when it should have") +# except FailingQueueException: +# pass +# # put succeeded, but get failed. +# self.assertTrue(q.qsize(), "Queue should not be empty") +# q.get() +# self.assertTrue(not q.qsize(), "Queue should be empty") +# +# def test_failing_queue(self): +# # Test to make sure a queue is functioning correctly. +# # Done twice to the same instance. 
+# q = FailingQueue(QUEUE_SIZE) +# self.failing_queue_test(q) +# self.failing_queue_test(q) +# === end commented out for warcprox === + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 4ef17c0..5bd425b 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -44,6 +44,7 @@ import traceback import signal from collections import Counter import socket +import datetime try: import http.server as http_server @@ -352,7 +353,7 @@ def warcprox_(request, captures_db, dedup_db, stats_db, service_registry): ca_dir = tempfile.mkdtemp(prefix='warcprox-test-', suffix='-ca') ca = certauth.certauth.CertificateAuthority(ca_file, ca_dir, 'warcprox-test') - recorded_url_q = queue.Queue() + recorded_url_q = warcprox.TimestampedQueue() options = warcprox.Options(port=0, playback_port=0, onion_tor_socks_proxy='localhost:9050') @@ -1287,10 +1288,42 @@ def test_status_api(warcprox_): response_dict = json.loads(response.content.decode('ascii')) assert set(response_dict.keys()) == { 'role', 'version', 'host', 'address', 'port', 'pid', 'load', - 'queue_size'} + 'queued_urls', 'queue_max_size', 'seconds_behind'} assert response_dict['role'] == 'warcprox' assert response_dict['version'] == warcprox.__version__ assert response_dict['port'] == warcprox_.proxy.server_port + assert response_dict['pid'] == os.getpid() + +def test_svcreg_status(warcprox_, service_registry): + if service_registry: + start = time.time() + while time.time() - start < 15: + svc = service_registry.available_service('warcprox') + if svc: + break + time.sleep(0.5) + assert svc + assert set(svc.keys()) == { + 'id', 'role', 'version', 'host', 'port', 'pid', 'load', + 'queued_urls', 'queue_max_size', 'seconds_behind', + 'first_heartbeat', 'heartbeat_interval', 'last_heartbeat'} + assert svc['role'] == 'warcprox' + assert svc['version'] == warcprox.__version__ + assert svc['port'] == warcprox_.proxy.server_port + assert svc['pid'] == os.getpid() + +def test_timestamped_queue(): + # see also test_queue.py + q = warcprox.TimestampedQueue() + q.put('monkey') + q.put('flonkey') + timestamp_item = q.get_with_timestamp() + assert isinstance(timestamp_item, tuple) + assert isinstance(timestamp_item[0], datetime.datetime) + assert timestamp_item[1] == 'monkey' + assert timestamp_item[0] < q.oldest_timestamp() + time.sleep(1) + assert q.seconds_behind() > 1 if __name__ == '__main__': pytest.main() diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 1eeb9a4..2bf0f02 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -1,7 +1,7 @@ """ warcprox/__init__.py - warcprox package main file, contains some utility code -Copyright (C) 2013-2016 Internet Archive +Copyright (C) 2013-2017 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -22,6 +22,11 @@ USA. from argparse import Namespace as _Namespace from pkg_resources import get_distribution as _get_distribution __version__ = _get_distribution('warcprox').version +try: + import queue +except ImportError: + import Queue as queue +import datetime def digest_str(hash_obj, base32): import base64 @@ -36,6 +41,36 @@ class Options(_Namespace): except AttributeError: return None +class TimestampedQueue(queue.Queue): + """ + A queue.Queue that exposes the time enqueued of the oldest item in the + queue. 
+ """ + def put(self, item, block=True, timeout=None): + return queue.Queue.put( + self, (datetime.datetime.utcnow(), item), block, timeout) + + def get(self, block=True, timeout=None): + timestamp, item = self.get_with_timestamp(block, timeout) + return item + + get_with_timestamp = queue.Queue.get + + def oldest_timestamp(self): + with self.mutex: + if self.queue: + timestamp, item = self.queue[0] + else: + return None + return timestamp + + def seconds_behind(self): + timestamp = self.oldest_timestamp() + if timestamp: + return (datetime.datetime.utcnow() - timestamp).total_seconds() + else: + return 0.0 + # XXX linux-specific def gettid(): try: diff --git a/warcprox/controller.py b/warcprox/controller.py index 9796e71..24bcc54 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -4,7 +4,7 @@ starting up and shutting down the various components of warcprox, and for sending heartbeats to the service registry if configured to do so; also has some memory profiling capabilities -Copyright (C) 2013-2016 Internet Archive +Copyright (C) 2013-2017 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -140,11 +140,15 @@ class WarcproxController(object): else: status_info = { 'role': 'warcprox', + 'version': warcprox.__version__, 'heartbeat_interval': self.HEARTBEAT_INTERVAL, 'port': self.options.port, } - status_info['load'] = 1.0 * self.proxy.recorded_url_q.qsize() / (self.proxy.recorded_url_q.maxsize or 100) - status_info['queue_size'] = self.proxy.recorded_url_q.qsize() + status_info['load'] = 1.0 * self.proxy.recorded_url_q.qsize() / ( + self.proxy.recorded_url_q.maxsize or 100) + status_info['queued_urls'] = self.proxy.recorded_url_q.qsize() + status_info['queue_max_size'] = self.proxy.recorded_url_q.maxsize + status_info['seconds_behind'] = self.proxy.recorded_url_q.seconds_behind() self.status_info = self.service_registry.heartbeat(status_info) self.logger.log( diff --git a/warcprox/main.py b/warcprox/main.py index 9797ddb..61c9ef5 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -3,7 +3,7 @@ warcprox/main.py - entrypoint for warcprox executable, parses command line arguments, initializes components, starts controller, handles signals -Copyright (C) 2013-2016 Internet Archive +Copyright (C) 2013-2017 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -181,7 +181,7 @@ def init_controller(args): args.kafka_broker_list, args.kafka_capture_feed_topic) listeners.append(kafka_capture_feed) - recorded_url_q = queue.Queue(maxsize=args.queue_size) + recorded_url_q = warcprox.TimestampedQueue(maxsize=args.queue_size) ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64] ca = certauth.certauth.CertificateAuthority(args.cacert, args.certs_dir, diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 16f4413..5505215 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -209,7 +209,9 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): 'port': self.connection.getsockname()[1], 'load': 1.0 * self.server.recorded_url_q.qsize() / ( self.server.recorded_url_q.maxsize or 100), - 'queue_size': self.server.recorded_url_q.qsize(), + 'queued_urls': self.server.recorded_url_q.qsize(), + 'queue_max_size': self.server.recorded_url_q.maxsize, + 'seconds_behind': self.server.recorded_url_q.seconds_behind(), 'pid': os.getpid(), } payload = json.dumps( @@ -374,7 +376,8 @@ class 
SingleThreadedWarcProxy(http_server.HTTPServer, object): if recorded_url_q is not None: self.recorded_url_q = recorded_url_q else: - self.recorded_url_q = queue.Queue(maxsize=options.queue_size or 1000) + self.recorded_url_q = warcprox.TimestampedQueue( + maxsize=options.queue_size or 1000) self.stats_db = stats_db
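
A minimal usage sketch of the new warcprox.TimestampedQueue (the method names put, get, get_with_timestamp, oldest_timestamp, and seconds_behind come from the warcprox/__init__.py hunk above; the example itself is illustrative and not part of the patch):

    import time
    import warcprox

    q = warcprox.TimestampedQueue(maxsize=1000)
    q.put('http://example.com/')   # stored internally as (utcnow(), 'http://example.com/')
    q.put('http://example.org/')

    time.sleep(1)
    print(q.oldest_timestamp())    # enqueue time (utc) of 'http://example.com/'
    print(q.seconds_behind())      # ~1.0, age in seconds of the oldest queued item

    timestamp, item = q.get_with_timestamp()   # timestamped flavor of get()
    item = q.get()                              # plain get() hides the timestamp

    print(q.seconds_behind())      # 0.0 once the queue is empty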
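
The queue-related fields reported in the service registry heartbeat and the status API change accordingly; a sketch of how they are derived (this mirrors the controller.py and warcproxy.py hunks above, where recorded_url_q is the proxy's TimestampedQueue; the numbers in the comments are only examples):

    # 'queue_size' is replaced by three fields:
    status_info['queued_urls'] = recorded_url_q.qsize()              # e.g. 150
    status_info['queue_max_size'] = recorded_url_q.maxsize           # e.g. 1000 (options.queue_size or 1000)
    status_info['seconds_behind'] = recorded_url_q.seconds_behind()  # age of the oldest queued url
    # 'load' is computed as before: the fraction of queue capacity in use
    status_info['load'] = 1.0 * recorded_url_q.qsize() / (recorded_url_q.maxsize or 100)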