# === warcprox note ===
# tests/test_queue.py - tests that ensure warcprox.TimestampedQueue doesn't
# break any of the queue.Queue functionality
#
# This file copied and slightly modified from the cpython source:
# https://github.com/python/cpython/blob/38c707e7e0322f9139bb51ad73ede1e3b46985ef/Lib/test/test_queue.py
# See https://github.com/python/cpython/blob/38c707e7e0322f9139bb51ad73ede1e3b46985ef/LICENSE
# for licensing information.
#
# Changes for warcprox are clearly delimited. The modifications are:
#
# Copyright (C) 2017 Internet Archive
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301,
# USA.
# === end warcprox note ===

# Some simple queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
# === commented out for warcprox ===
# import queue
# === end commented out for warcprox ===
# === added for warcprox ===
# make it work with py2
try:
    import queue
except ImportError:
    # Python 2 names the module "Queue"; only a missing module should
    # trigger the fallback, so do not swallow unrelated errors here.
    import Queue as queue
import threading
# === end added for warcprox ===
import time
import unittest
# === commented out for warcprox ===
# from test import support
# threading = support.import_module('threading')
# === end commented out for warcprox ===

QUEUE_SIZE = 5

def qfull(q):
    """Return True if the bounded queue *q* currently holds maxsize items.

    A queue whose maxsize is not positive is never reported as full.
    """
    return q.maxsize > 0 and q.qsize() == q.maxsize

# A thread to run a function that unclogs a blocked Queue.
class _TriggerThread(threading.Thread):
    """Thread that waits briefly and then calls ``fn(*args)``.

    ``startedEvent`` is set immediately before ``fn`` is invoked, so a caller
    can tell whether its own blocking call returned before the trigger ran.
    """

    def __init__(self, fn, args):
        self.fn = fn
        self.args = args
        self.startedEvent = threading.Event()
        threading.Thread.__init__(self)

    def run(self):
        # The sleep isn't necessary, but is intended to give the blocking
        # function in the main thread a chance at actually blocking before
        # we unclog it.  But if the sleep is longer than the timeout-based
        # tests wait in their blocking functions, those tests will fail.
        # So we give them much longer timeout values compared to the
        # sleep here (I aimed at 10 seconds for blocking functions --
        # they should never actually wait that long - they should make
        # progress as soon as we call self.fn()).
        time.sleep(0.1)
        self.startedEvent.set()
        self.fn(*self.args)


# Execute a function that blocks, and in a separate thread, a function that
# triggers the release.  Returns the result of the blocking function.  Caution:
# block_func must guarantee to block until trigger_func is called, and
# trigger_func must guarantee to change queue state so that block_func can make
# enough progress to return.  In particular, a block_func that just raises an
# exception regardless of whether trigger_func is called will lead to
# timing-dependent sporadic failures, and one of those went rarely seen but
# undiagnosed for years.  Now block_func must be unexceptional.  If block_func
# is supposed to raise an exception, call do_exceptional_blocking_test()
# instead.

class BlockingTestMixin:
    """Helpers that pair a blocking call with a trigger run in another thread."""

    def tearDown(self):
        # Drop the reference to the last trigger thread.
        self.t = None

    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
        """Run ``block_func(*block_args)`` while a thread runs the trigger.

        Fails the test if the blocking call returned before the trigger thread
        got as far as calling ``trigger_func``, or if the trigger thread is
        still alive 10 seconds after being joined.  Returns the result of
        ``block_func``.
        """
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        try:
            self.result = block_func(*block_args)
            # If block_func returned before our thread made the call, we failed!
            if not self.t.startedEvent.is_set():
                self.fail("blocking function '%r' appeared not to block" %
                          (block_func,))
        finally:
            # Always give the trigger thread a chance to finish, even when
            # block_func raised or the check above failed, so it does not
            # keep running into the next test.
            self.t.join(10) # make sure the thread terminates
        if self.t.is_alive():
            self.fail("trigger function '%r' appeared to not return" %
                      (trigger_func,))
        return self.result

    # Call this instead if block_func is supposed to raise an exception.
    def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
                                   trigger_args, expected_exception_class):
        """Like do_blocking_test(), but ``block_func`` is expected to raise.

        The expected exception is re-raised to the caller; if ``block_func``
        returns normally the test fails.  The trigger thread is always joined.
        """
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        try:
            try:
                block_func(*block_args)
            except expected_exception_class:
                raise
            else:
                self.fail("expected exception of kind %r" %
                          (expected_exception_class,))
        finally:
            self.t.join(10) # make sure the thread terminates
            if self.t.is_alive():
                self.fail("trigger function '%r' appeared to not return" %
                          (trigger_func,))
            if not self.t.startedEvent.is_set():
                self.fail("trigger thread ended but event never set")


class BaseQueueTestMixin(BlockingTestMixin):
    """Queue behaviour tests; subclasses set ``type2test`` to the queue class."""

    def setUp(self):
        # Running total accumulated by worker(), guarded by cumlock.
        self.cum = 0
        self.cumlock = threading.Lock()

    def simple_queue_test(self, q):
        """Exercise ordering, full/empty detection and blocking put/get on *q*.

        *q* must be empty and is expected to have maxsize QUEUE_SIZE; it is
        left holding a single item when the method returns.
        """
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        self.assertTrue(q.empty())
        self.assertFalse(q.full())
        # I guess we better check things actually queue correctly a little :)
        q.put(111)
        q.put(333)
        q.put(222)
        target_order = dict(Queue = [111, 333, 222],
                            # === warcprox addition ===
                            TimestampedQueue = [111, 333, 222],
                            # === end warcprox addition ===
                            LifoQueue = [222, 333, 111],
                            PriorityQueue = [111, 222, 333])
        actual_order = [q.get(), q.get(), q.get()]
        self.assertEqual(actual_order, target_order[q.__class__.__name__],
                         "Didn't seem to queue the correct data!")
        for i in range(QUEUE_SIZE-1):
            q.put(i)
            self.assertTrue(q.qsize(), "Queue should not be empty")
        self.assertFalse(qfull(q), "Queue should not be full")
        last = 2 * QUEUE_SIZE
        full = 3 * 2 * QUEUE_SIZE
        q.put(last)
        self.assertTrue(qfull(q), "Queue should be full")
        self.assertFalse(q.empty())
        self.assertTrue(q.full())
        try:
            q.put(full, block=0)
        except queue.Full:
            pass
        else:
            self.fail("Didn't appear to block with a full queue")
        try:
            q.put(full, timeout=0.01)
        except queue.Full:
            pass
        else:
            self.fail("Didn't appear to time-out with a full queue")
        # Test a blocking put
        self.do_blocking_test(q.put, (full,), q.get, ())
        self.do_blocking_test(q.put, (full, True, 10), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        try:
            q.get(block=0)
        except queue.Empty:
            pass
        else:
            self.fail("Didn't appear to block with an empty queue")
        try:
            q.get(timeout=0.01)
        except queue.Empty:
            pass
        else:
            self.fail("Didn't appear to time-out with an empty queue")
        # Test a blocking get
        self.do_blocking_test(q.get, (), q.put, ('empty',))
        self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))

    def worker(self, q):
        """Consume items from *q*, adding them to self.cum, until a negative one."""
        while True:
            x = q.get()
            if x < 0:
                q.task_done()
                return
            with self.cumlock:
                self.cum += x
            q.task_done()

    def queue_join_test(self, q):
        """Check that q.join() blocks until two workers have processed 0..99."""
        self.cum = 0
        threads = []
        for i in (0,1):
            thread = threading.Thread(target=self.worker, args=(q,))
            thread.start()
            threads.append(thread)
        for i in range(100):
            q.put(i)
        q.join()
        self.assertEqual(self.cum, sum(range(100)),
                         "q.join() did not block until all tasks were done")
        for i in (0,1):
            q.put(-1)         # instruct the threads to close
        q.join()                # verify that you can join twice
        # Each worker exits after consuming one -1; wait for them so no
        # worker thread is left running once this test returns.
        for thread in threads:
            thread.join()

    def test_queue_task_done(self):
        # Test to make sure a queue task completed successfully.
        q = self.type2test()
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_queue_join(self):
        # Test that a queue join()s successfully, and before anything else
        # (done twice for insurance).
        q = self.type2test()
        self.queue_join_test(q)
        self.queue_join_test(q)
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_simple_queue(self):
        # Do it a couple of times on the same queue.
        # Done twice to make sure works with same instance reused.
        q = self.type2test(QUEUE_SIZE)
        self.simple_queue_test(q)
        self.simple_queue_test(q)

    def test_negative_timeout_raises_exception(self):
        q = self.type2test(QUEUE_SIZE)
        with self.assertRaises(ValueError):
            q.put(1, timeout=-1)
        with self.assertRaises(ValueError):
            q.get(1, timeout=-1)

    def test_nowait(self):
        q = self.type2test(QUEUE_SIZE)
        for i in range(QUEUE_SIZE):
            q.put_nowait(1)
        with self.assertRaises(queue.Full):
            q.put_nowait(1)

        for i in range(QUEUE_SIZE):
            q.get_nowait()
        with self.assertRaises(queue.Empty):
            q.get_nowait()

    def test_shrinking_queue(self):
        # issue 10110
        q = self.type2test(3)
        q.put(1)
        q.put(2)
        q.put(3)
        with self.assertRaises(queue.Full):
            q.put_nowait(4)
        self.assertEqual(q.qsize(), 3)
        # === changed for warcprox to make test pass on python2 ===
        # q.maxsize = 2                   # shrink the queue
        # NOTE: with the warcprox change maxsize is re-assigned to the value
        # it already has, so the queue is not actually shrunk here
        # (presumably because python2's Queue does not report Full once
        # qsize exceeds maxsize -- verify before restoring the original).
        q.maxsize = 3
        # === end changed for warcprox ===
        with self.assertRaises(queue.Full):
            q.put_nowait(4)

# === commented out for warcprox ===
# class QueueTest(BaseQueueTestMixin, unittest.TestCase):
#     type2test = queue.Queue
#
# class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase):
#     type2test = queue.LifoQueue
#
# class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase):
#     type2test = queue.PriorityQueue
#
# === end commented out for warcprox ===

# === warcprox additions ===
import warcprox
class TimestampedQueueTest(BaseQueueTestMixin, unittest.TestCase):
    type2test = warcprox.TimestampedQueue
# === end warcprox additions ===

# === commented out for warcprox ===
# # A Queue subclass that can provoke failure at a moment's notice :)
# class FailingQueueException(Exception):
#     pass
#
# class FailingQueue(queue.Queue):
#     def __init__(self, *args):
#         self.fail_next_put = False
#         self.fail_next_get = False
#         queue.Queue.__init__(self, *args)
#     def _put(self, item):
#         if self.fail_next_put:
#             self.fail_next_put = False
#             raise FailingQueueException("You Lose")
#         return queue.Queue._put(self, item)
#     def _get(self):
#         if self.fail_next_get:
#             self.fail_next_get = False
#             raise FailingQueueException("You Lose")
#         return queue.Queue._get(self)
#
# class FailingQueueTest(BlockingTestMixin, unittest.TestCase):
#
#     def failing_queue_test(self, q):
#         if q.qsize():
#             raise RuntimeError("Call this function with an empty queue")
#         for i in range(QUEUE_SIZE-1):
#             q.put(i)
#         # Test a failing non-blocking put.
#         q.fail_next_put = True
#         try:
#             q.put("oops", block=0)
#             self.fail("The queue didn't fail when it should have")
#         except FailingQueueException:
#             pass
#         q.fail_next_put = True
#         try:
#             q.put("oops", timeout=0.1)
#             self.fail("The queue didn't fail when it should have")
#         except FailingQueueException:
#             pass
#         q.put("last")
#         self.assertTrue(qfull(q), "Queue should be full")
#         # Test a failing blocking put
#         q.fail_next_put = True
#         try:
#             self.do_blocking_test(q.put, ("full",), q.get, ())
#             self.fail("The queue didn't fail when it should have")
#         except FailingQueueException:
#             pass
#         # Check the Queue isn't damaged.
#         # put failed, but get succeeded - re-add
#         q.put("last")
#         # Test a failing timeout put
#         q.fail_next_put = True
#         try:
#             self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
#                                               FailingQueueException)
#             self.fail("The queue didn't fail when it should have")
#         except FailingQueueException:
#             pass
#         # Check the Queue isn't damaged.
#         # put failed, but get succeeded - re-add
#         q.put("last")
#         self.assertTrue(qfull(q), "Queue should be full")
#         q.get()
#         self.assertTrue(not qfull(q), "Queue should not be full")
#         q.put("last")
#         self.assertTrue(qfull(q), "Queue should be full")
#         # Test a blocking put
#         self.do_blocking_test(q.put, ("full",), q.get, ())
#         # Empty it
#         for i in range(QUEUE_SIZE):
#             q.get()
#         self.assertTrue(not q.qsize(), "Queue should be empty")
#         q.put("first")
#         q.fail_next_get = True
#         try:
#             q.get()
#             self.fail("The queue didn't fail when it should have")
#         except FailingQueueException:
#             pass
#         self.assertTrue(q.qsize(), "Queue should not be empty")
#         q.fail_next_get = True
#         try:
#             q.get(timeout=0.1)
#             self.fail("The queue didn't fail when it should have")
#         except FailingQueueException:
#             pass
#         self.assertTrue(q.qsize(), "Queue should not be empty")
#         q.get()
#         self.assertTrue(not q.qsize(), "Queue should be empty")
#         q.fail_next_get = True
#         try:
#             self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
#                                               FailingQueueException)
#             self.fail("The queue didn't fail when it should have")
#         except FailingQueueException:
#             pass
#         # put succeeded, but get failed.
#         self.assertTrue(q.qsize(), "Queue should not be empty")
#         q.get()
#         self.assertTrue(not q.qsize(), "Queue should be empty")
#
#     def test_failing_queue(self):
#         # Test to make sure a queue is functioning correctly.
#         # Done twice to the same instance.
#         q = FailingQueue(QUEUE_SIZE)
#         self.failing_queue_test(q)
#         self.failing_queue_test(q)
# === end commented out for warcprox ===

if __name__ == "__main__":
    unittest.main()