diff --git a/tests/test_queue.py b/tests/test_queue.py
deleted file mode 100644
index 1c220db..0000000
--- a/tests/test_queue.py
+++ /dev/null
@@ -1,416 +0,0 @@
-# === warcprox note ===
-# tests/test_queue.py - tests that ensure warcprox.TimestampedQueue doesn't
-# break any of the queue.Queue functionality
-#
-# This file copied and slightly modified from the cpython source:
-# https://github.com/python/cpython/blob/38c707e7e0322f9139bb51ad73ede1e3b46985ef/Lib/test/test_queue.py
-# See https://github.com/python/cpython/blob/38c707e7e0322f9139bb51ad73ede1e3b46985ef/LICENSE
-# for licensing information.
-#
-# Changes for warcprox are clearly delimited. The modifications are:
-#
-# Copyright (C) 2017 Internet Archive
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
-# USA.
-# === end warcprox note ===
-
-# Some simple queue module tests, plus some failure conditions
-# to ensure the Queue locks remain stable.
-# === commented out for warcprox ===
-# import queue
-# === end commented out for warcprox ===
-
-# === added for warcprox ===
-# make it work with py2
-try:
-    import queue
-except:
-    import Queue as queue
-import threading
-# === end added for warcprox ===
-
-import time
-import unittest
-
-# === commented out for warcprox ===
-# from test import support
-# threading = support.import_module('threading')
-# === end commented out for warcprox ===
-
-QUEUE_SIZE = 5
-
-def qfull(q):
-    return q.maxsize > 0 and q.qsize() == q.maxsize
-
-# A thread to run a function that unclogs a blocked Queue.
-class _TriggerThread(threading.Thread):
-    def __init__(self, fn, args):
-        self.fn = fn
-        self.args = args
-        self.startedEvent = threading.Event()
-        threading.Thread.__init__(self)
-
-    def run(self):
-        # The sleep isn't necessary, but is intended to give the blocking
-        # function in the main thread a chance at actually blocking before
-        # we unclog it. But if the sleep is longer than the timeout-based
-        # tests wait in their blocking functions, those tests will fail.
-        # So we give them much longer timeout values compared to the
-        # sleep here (I aimed at 10 seconds for blocking functions --
-        # they should never actually wait that long - they should make
-        # progress as soon as we call self.fn()).
-        time.sleep(0.1)
-        self.startedEvent.set()
-        self.fn(*self.args)
-
-
-# Execute a function that blocks, and in a separate thread, a function that
-# triggers the release. Returns the result of the blocking function. Caution:
-# block_func must guarantee to block until trigger_func is called, and
-# trigger_func must guarantee to change queue state so that block_func can make
-# enough progress to return. In particular, a block_func that just raises an
-# exception regardless of whether trigger_func is called will lead to
-# timing-dependent sporadic failures, and one of those went rarely seen but
-# undiagnosed for years. Now block_func must be unexceptional. If block_func
-# is supposed to raise an exception, call do_exceptional_blocking_test()
-# instead.
-
-class BlockingTestMixin:
-
-    def tearDown(self):
-        self.t = None
-
-    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
-        self.t = _TriggerThread(trigger_func, trigger_args)
-        self.t.start()
-        self.result = block_func(*block_args)
-        # If block_func returned before our thread made the call, we failed!
-        if not self.t.startedEvent.is_set():
-            self.fail("blocking function '%r' appeared not to block" %
-                      block_func)
-        self.t.join(10) # make sure the thread terminates
-        if self.t.is_alive():
-            self.fail("trigger function '%r' appeared to not return" %
-                      trigger_func)
-        return self.result
-
-    # Call this instead if block_func is supposed to raise an exception.
-    def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
-                                     trigger_args, expected_exception_class):
-        self.t = _TriggerThread(trigger_func, trigger_args)
-        self.t.start()
-        try:
-            try:
-                block_func(*block_args)
-            except expected_exception_class:
-                raise
-            else:
-                self.fail("expected exception of kind %r" %
-                          expected_exception_class)
-        finally:
-            self.t.join(10) # make sure the thread terminates
-            if self.t.is_alive():
-                self.fail("trigger function '%r' appeared to not return" %
-                          trigger_func)
-            if not self.t.startedEvent.is_set():
-                self.fail("trigger thread ended but event never set")
-
-
-class BaseQueueTestMixin(BlockingTestMixin):
-    def setUp(self):
-        self.cum = 0
-        self.cumlock = threading.Lock()
-
-    def simple_queue_test(self, q):
-        if q.qsize():
-            raise RuntimeError("Call this function with an empty queue")
-        self.assertTrue(q.empty())
-        self.assertFalse(q.full())
-        # I guess we better check things actually queue correctly a little :)
-        q.put(111)
-        q.put(333)
-        q.put(222)
-        target_order = dict(Queue = [111, 333, 222],
-                            # === warcprox addition ===
-                            TimestampedQueue = [111, 333, 222],
-                            # === end warcprox addition ===
-                            LifoQueue = [222, 333, 111],
-                            PriorityQueue = [111, 222, 333])
-        actual_order = [q.get(), q.get(), q.get()]
-        self.assertEqual(actual_order, target_order[q.__class__.__name__],
-                         "Didn't seem to queue the correct data!")
-        for i in range(QUEUE_SIZE-1):
-            q.put(i)
-            self.assertTrue(q.qsize(), "Queue should not be empty")
-        self.assertTrue(not qfull(q), "Queue should not be full")
-        last = 2 * QUEUE_SIZE
-        full = 3 * 2 * QUEUE_SIZE
-        q.put(last)
-        self.assertTrue(qfull(q), "Queue should be full")
-        self.assertFalse(q.empty())
-        self.assertTrue(q.full())
-        try:
-            q.put(full, block=0)
-            self.fail("Didn't appear to block with a full queue")
-        except queue.Full:
-            pass
-        try:
-            q.put(full, timeout=0.01)
-            self.fail("Didn't appear to time-out with a full queue")
-        except queue.Full:
-            pass
-        # Test a blocking put
-        self.do_blocking_test(q.put, (full,), q.get, ())
-        self.do_blocking_test(q.put, (full, True, 10), q.get, ())
-        # Empty it
-        for i in range(QUEUE_SIZE):
-            q.get()
-        self.assertTrue(not q.qsize(), "Queue should be empty")
-        try:
-            q.get(block=0)
-            self.fail("Didn't appear to block with an empty queue")
-        except queue.Empty:
-            pass
-        try:
-            q.get(timeout=0.01)
-            self.fail("Didn't appear to time-out with an empty queue")
-        except queue.Empty:
-            pass
-        # Test a blocking get
-        self.do_blocking_test(q.get, (), q.put, ('empty',))
-        self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
-
-
-    def worker(self, q):
-        while True:
-            x = q.get()
-            if x < 0:
-                q.task_done()
-                return
-            with self.cumlock:
-                self.cum += x
-            q.task_done()
-
-    def queue_join_test(self, q):
-        self.cum = 0
-        for i in (0,1):
-            threading.Thread(target=self.worker, args=(q,)).start()
-        for i in range(100):
-            q.put(i)
-        q.join()
-        self.assertEqual(self.cum, sum(range(100)),
-                         "q.join() did not block until all tasks were done")
-        for i in (0,1):
-            q.put(-1)         # instruct the threads to close
-        q.join()                # verify that you can join twice
-
-    def test_queue_task_done(self):
-        # Test to make sure a queue task completed successfully.
-        q = self.type2test()
-        try:
-            q.task_done()
-        except ValueError:
-            pass
-        else:
-            self.fail("Did not detect task count going negative")
-
-    def test_queue_join(self):
-        # Test that a queue join()s successfully, and before anything else
-        # (done twice for insurance).
-        q = self.type2test()
-        self.queue_join_test(q)
-        self.queue_join_test(q)
-        try:
-            q.task_done()
-        except ValueError:
-            pass
-        else:
-            self.fail("Did not detect task count going negative")
-
-    def test_simple_queue(self):
-        # Do it a couple of times on the same queue.
-        # Done twice to make sure works with same instance reused.
-        q = self.type2test(QUEUE_SIZE)
-        self.simple_queue_test(q)
-        self.simple_queue_test(q)
-
-    def test_negative_timeout_raises_exception(self):
-        q = self.type2test(QUEUE_SIZE)
-        with self.assertRaises(ValueError):
-            q.put(1, timeout=-1)
-        with self.assertRaises(ValueError):
-            q.get(1, timeout=-1)
-
-    def test_nowait(self):
-        q = self.type2test(QUEUE_SIZE)
-        for i in range(QUEUE_SIZE):
-            q.put_nowait(1)
-        with self.assertRaises(queue.Full):
-            q.put_nowait(1)
-
-        for i in range(QUEUE_SIZE):
-            q.get_nowait()
-        with self.assertRaises(queue.Empty):
-            q.get_nowait()
-
-    def test_shrinking_queue(self):
-        # issue 10110
-        q = self.type2test(3)
-        q.put(1)
-        q.put(2)
-        q.put(3)
-        with self.assertRaises(queue.Full):
-            q.put_nowait(4)
-        self.assertEqual(q.qsize(), 3)
-        # === changed for warcprox to make test pass on python2 ===
-        # q.maxsize = 2 # shrink the queue
-        q.maxsize = 3 # shrink the queue
-        # === end changed for warcprox ===
-        with self.assertRaises(queue.Full):
-            q.put_nowait(4)
-
-# === commented out for warcprox ===
-# class QueueTest(BaseQueueTestMixin, unittest.TestCase):
-#     type2test = queue.Queue
-#
-# class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase):
-#     type2test = queue.LifoQueue
-#
-# class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase):
-#     type2test = queue.PriorityQueue
-#
-# === end commented out for warcprox ===
-
-# === warcprox additions ===
-import warcprox
-class TimestampedQueueTest(BaseQueueTestMixin, unittest.TestCase):
-    type2test = warcprox.TimestampedQueue
-# === end warcprox additions
-
-# === commented out for warcprox ===
-# # A Queue subclass that can provoke failure at a moment's notice :)
-# class FailingQueueException(Exception):
-#     pass
-#
-# class FailingQueue(queue.Queue):
-#     def __init__(self, *args):
-#         self.fail_next_put = False
-#         self.fail_next_get = False
-#         queue.Queue.__init__(self, *args)
-#     def _put(self, item):
-#         if self.fail_next_put:
-#             self.fail_next_put = False
-#             raise FailingQueueException("You Lose")
-#         return queue.Queue._put(self, item)
-#     def _get(self):
-#         if self.fail_next_get:
-#             self.fail_next_get = False
-#             raise FailingQueueException("You Lose")
-#         return queue.Queue._get(self)
-#
-# class FailingQueueTest(BlockingTestMixin, unittest.TestCase):
-#
-#     def failing_queue_test(self, q):
-#         if q.qsize():
-#             raise RuntimeError("Call this function with an empty queue")
-#         for i in range(QUEUE_SIZE-1):
-#             q.put(i)
-#         # Test a failing non-blocking put.
-#         q.fail_next_put = True
-#         try:
-#             q.put("oops", block=0)
-#             self.fail("The queue didn't fail when it should have")
-#         except FailingQueueException:
-#             pass
-#         q.fail_next_put = True
-#         try:
-#             q.put("oops", timeout=0.1)
-#             self.fail("The queue didn't fail when it should have")
-#         except FailingQueueException:
-#             pass
-#         q.put("last")
-#         self.assertTrue(qfull(q), "Queue should be full")
-#         # Test a failing blocking put
-#         q.fail_next_put = True
-#         try:
-#             self.do_blocking_test(q.put, ("full",), q.get, ())
-#             self.fail("The queue didn't fail when it should have")
-#         except FailingQueueException:
-#             pass
-#         # Check the Queue isn't damaged.
-#         # put failed, but get succeeded - re-add
-#         q.put("last")
-#         # Test a failing timeout put
-#         q.fail_next_put = True
-#         try:
-#             self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
-#                                               FailingQueueException)
-#             self.fail("The queue didn't fail when it should have")
-#         except FailingQueueException:
-#             pass
-#         # Check the Queue isn't damaged.
-#         # put failed, but get succeeded - re-add
-#         q.put("last")
-#         self.assertTrue(qfull(q), "Queue should be full")
-#         q.get()
-#         self.assertTrue(not qfull(q), "Queue should not be full")
-#         q.put("last")
-#         self.assertTrue(qfull(q), "Queue should be full")
-#         # Test a blocking put
-#         self.do_blocking_test(q.put, ("full",), q.get, ())
-#         # Empty it
-#         for i in range(QUEUE_SIZE):
-#             q.get()
-#         self.assertTrue(not q.qsize(), "Queue should be empty")
-#         q.put("first")
-#         q.fail_next_get = True
-#         try:
-#             q.get()
-#             self.fail("The queue didn't fail when it should have")
-#         except FailingQueueException:
-#             pass
-#         self.assertTrue(q.qsize(), "Queue should not be empty")
-#         q.fail_next_get = True
-#         try:
-#             q.get(timeout=0.1)
-#             self.fail("The queue didn't fail when it should have")
-#         except FailingQueueException:
-#             pass
-#         self.assertTrue(q.qsize(), "Queue should not be empty")
-#         q.get()
-#         self.assertTrue(not q.qsize(), "Queue should be empty")
-#         q.fail_next_get = True
-#         try:
-#             self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
-#                                               FailingQueueException)
-#             self.fail("The queue didn't fail when it should have")
-#         except FailingQueueException:
-#             pass
-#         # put succeeded, but get failed.
-#         self.assertTrue(q.qsize(), "Queue should not be empty")
-#         q.get()
-#         self.assertTrue(not q.qsize(), "Queue should be empty")
-#
-#     def test_failing_queue(self):
-#         # Test to make sure a queue is functioning correctly.
-#         # Done twice to the same instance.
-#         q = FailingQueue(QUEUE_SIZE)
-#         self.failing_queue_test(q)
-#         self.failing_queue_test(q)
-# === end commented out for warcprox ===
-
-if __name__ == "__main__":
-    unittest.main()
diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index dbec93d..031db85 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -1551,7 +1551,8 @@ def test_status_api(warcprox_):
        'queued_urls', 'queue_max_size', 'seconds_behind', 'threads',
        'rates_5min', 'rates_1min', 'unaccepted_requests', 'rates_15min',
        'active_requests','start_time','urls_processed',
-        'warc_bytes_written','postfetch_chain',}
+        'warc_bytes_written', 'postfetch_chain',
+        'earliest_still_active_fetch_start',}
     assert status['role'] == 'warcprox'
     assert status['version'] == warcprox.__version__
     assert status['port'] == warcprox_.proxy.server_port
@@ -1573,26 +1574,14 @@ def test_svcreg_status(warcprox_):
        'first_heartbeat', 'ttl', 'last_heartbeat', 'threads',
        'rates_5min', 'rates_1min', 'unaccepted_requests', 'rates_15min',
        'active_requests','start_time','urls_processed',
-        'warc_bytes_written','postfetch_chain',}
+        'warc_bytes_written', 'postfetch_chain',
+        'earliest_still_active_fetch_start',}
     assert status['role'] == 'warcprox'
     assert status['version'] == warcprox.__version__
     assert status['port'] == warcprox_.proxy.server_port
     assert status['pid'] == os.getpid()
     assert status['threads'] == warcprox_.proxy.pool._max_workers
 
-def test_timestamped_queue():
-    # see also test_queue.py
-    q = warcprox.TimestampedQueue()
-    q.put('monkey')
-    q.put('flonkey')
-    timestamp_item = q.get_with_timestamp()
-    assert isinstance(timestamp_item, tuple)
-    assert isinstance(timestamp_item[0], datetime.datetime)
-    assert timestamp_item[1] == 'monkey'
-    assert timestamp_item[0] < q.oldest_timestamp()
-    time.sleep(1)
-    assert q.seconds_behind() > 1
-
 def test_controller_with_defaults():
     # tests some initialization code that we rarely touch otherwise
     controller = warcprox.controller.WarcproxController()
diff --git a/tests/test_writer.py b/tests/test_writer.py
index ed5c699..f1d2466 100644
--- a/tests/test_writer.py
+++ b/tests/test_writer.py
@@ -35,8 +35,9 @@ import io
 import tempfile
 import logging
 import hashlib
+import queue
 
-def lock_file(queue, filename):
+def lock_file(q, filename):
     """Try to lock file and return 1 if successful, else return 0.
     It is necessary to run this method in a different process to test locking.
     """
@@ -44,9 +45,9 @@
     try:
         fi = open(filename, 'ab')
         fcntl.lockf(fi, fcntl.LOCK_EX | fcntl.LOCK_NB)
         fi.close()
-        queue.put('OBTAINED LOCK')
+        q.put('OBTAINED LOCK')
     except IOError:
-        queue.put('FAILED TO OBTAIN LOCK')
+        q.put('FAILED TO OBTAIN LOCK')
 
 def test_warc_writer_locking(tmpdir):
@@ -69,18 +70,18 @@
     assert warcs
     target_warc = os.path.join(dirname, warcs[0])
     # launch another process and try to lock WARC file
-    queue = Queue()
-    p = Process(target=lock_file, args=(queue, target_warc))
+    q = Queue()
+    p = Process(target=lock_file, args=(q, target_warc))
     p.start()
     p.join()
-    assert queue.get() == 'FAILED TO OBTAIN LOCK'
+    assert q.get() == 'FAILED TO OBTAIN LOCK'
 
     wwriter.close_writer()
     # locking must succeed after writer has closed the WARC file.
-    p = Process(target=lock_file, args=(queue, target_warc))
+    p = Process(target=lock_file, args=(q, target_warc))
     p.start()
     p.join()
-    assert queue.get() == 'OBTAINED LOCK'
+    assert q.get() == 'OBTAINED LOCK'
 
 def wait(callback, timeout):
     start = time.time()
@@ -97,8 +98,8 @@ def test_special_dont_write_prefix():
 
     wwt = warcprox.writerthread.WarcWriterProcessor(
             Options(prefix='-', writer_threads=1))
-    wwt.inq = warcprox.TimestampedQueue(maxsize=1)
-    wwt.outq = warcprox.TimestampedQueue(maxsize=1)
+    wwt.inq = queue.Queue(maxsize=1)
+    wwt.outq = queue.Queue(maxsize=1)
     try:
         wwt.start()
         # not to be written due to default prefix
@@ -131,8 +132,8 @@ def test_special_dont_write_prefix():
 
     wwt = warcprox.writerthread.WarcWriterProcessor(
             Options(writer_threads=1, blackout_period=60, prefix='foo'))
-    wwt.inq = warcprox.TimestampedQueue(maxsize=1)
-    wwt.outq = warcprox.TimestampedQueue(maxsize=1)
+    wwt.inq = queue.Queue(maxsize=1)
+    wwt.outq = queue.Queue(maxsize=1)
     try:
         wwt.start()
         # to be written due to default prefix
@@ -206,8 +207,8 @@ def test_do_not_archive():
 
     wwt = warcprox.writerthread.WarcWriterProcessor(
             Options(writer_threads=1))
-    wwt.inq = warcprox.TimestampedQueue(maxsize=1)
-    wwt.outq = warcprox.TimestampedQueue(maxsize=1)
+    wwt.inq = queue.Queue(maxsize=1)
+    wwt.outq = queue.Queue(maxsize=1)
     try:
         wwt.start()
         # to be written -- default do_not_archive False
diff --git a/warcprox/__init__.py b/warcprox/__init__.py
index 67cf654..6a8e00e 100644
--- a/warcprox/__init__.py
+++ b/warcprox/__init__.py
@@ -31,6 +31,8 @@ try:
     import queue
 except ImportError:
     import Queue as queue
+import base64
+import json
 
 __version__ = _get_distribution('warcprox').version
 
@@ -47,6 +49,15 @@ class Options(_Namespace):
         except AttributeError:
             return None
 
+class Jsonner(json.JSONEncoder):
+    def default(self, o):
+        if isinstance(o, datetime.datetime):
+            return o.isoformat()
+        elif isinstance(o, bytes):
+            return base64.b64encode(o).decode('ascii')
+        else:
+            return json.JSONEncoder.default(self, o)
+
 class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
     '''
     `concurrent.futures.ThreadPoolExecutor` supporting a queue of limited size.
@@ -58,36 +70,6 @@ class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
         super().__init__(*args, **kwargs)
         self._work_queue = queue.Queue(maxsize=max_queued or 0)
 
-class TimestampedQueue(queue.Queue):
-    """
-    A queue.Queue that exposes the time enqueued of the oldest item in the
-    queue.
-    """
-    def put(self, item, block=True, timeout=None):
-        return queue.Queue.put(
-                self, (datetime.datetime.utcnow(), item), block, timeout)
-
-    def get(self, block=True, timeout=None):
-        timestamp, item = self.get_with_timestamp(block, timeout)
-        return item
-
-    get_with_timestamp = queue.Queue.get
-
-    def oldest_timestamp(self):
-        with self.mutex:
-            if self.queue:
-                timestamp, item = self.queue[0]
-            else:
-                return None
-        return timestamp
-
-    def seconds_behind(self):
-        timestamp = self.oldest_timestamp()
-        if timestamp:
-            return (datetime.datetime.utcnow() - timestamp).total_seconds()
-        else:
-            return 0.0
-
 # XXX linux-specific
 def gettid():
     try:
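Note on `Jsonner`: the status payload built in warcproxy.py further down now carries a raw `datetime.datetime` (and, defensively, `bytes`) value, which the stock `json` encoder refuses to serialize; `Jsonner` converts both at dump time. Because it calls `base64.b64encode`, `base64` must be in scope, hence the second import added in the hunk above. A minimal standalone sketch of the encoder's behavior (sample values invented for illustration):

```python
import base64
import datetime
import json

class Jsonner(json.JSONEncoder):
    # same shape as the encoder added in warcprox/__init__.py
    def default(self, o):
        if isinstance(o, datetime.datetime):
            return o.isoformat()      # e.g. "2017-11-14T23:05:01.123456"
        elif isinstance(o, bytes):
            return base64.b64encode(o).decode('ascii')
        else:
            return json.JSONEncoder.default(self, o)

print(json.dumps(
        {'earliest_still_active_fetch_start': datetime.datetime.utcnow(),
         'digest': b'\x9f\x86\xd0\x81'},
        cls=Jsonner, indent=2))
```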
-    """
-    def put(self, item, block=True, timeout=None):
-        return queue.Queue.put(
-                self, (datetime.datetime.utcnow(), item), block, timeout)
-
-    def get(self, block=True, timeout=None):
-        timestamp, item = self.get_with_timestamp(block, timeout)
-        return item
-
-    get_with_timestamp = queue.Queue.get
-
-    def oldest_timestamp(self):
-        with self.mutex:
-            if self.queue:
-                timestamp, item = self.queue[0]
-            else:
-                return None
-        return timestamp
-
-    def seconds_behind(self):
-        timestamp = self.oldest_timestamp()
-        if timestamp:
-            return (datetime.datetime.utcnow() - timestamp).total_seconds()
-        else:
-            return 0.0
-
 # XXX linux-specific
 def gettid():
     try:
diff --git a/warcprox/controller.py b/warcprox/controller.py
index ed43bb6..5289325 100644
--- a/warcprox/controller.py
+++ b/warcprox/controller.py
@@ -35,6 +35,7 @@ import certauth
 import functools
 import doublethink
 import importlib
+import queue
 
 class Factory:
     @staticmethod
@@ -149,8 +150,35 @@ class WarcproxController(object):
 
         self.service_registry = Factory.service_registry(options)
 
+    def earliest_still_active_fetch_start(self):
+        '''
+        Looks at urls currently in flight, either being fetched or being
+        processed at some step of the postfetch chain, finds the one with the
+        earliest fetch start time, and returns that time.
+        '''
+        earliest = None
+        for timestamp in list(self.proxy.active_requests.values()):
+            if earliest is None or timestamp < earliest:
+                earliest = timestamp
+        for processor in self._postfetch_chain:
+            with processor.inq.mutex:
+                queued = list(processor.inq.queue)
+            for recorded_url in queued:
+                if earliest is None or recorded_url.timestamp < earliest:
+                    earliest = recorded_url.timestamp
+        return earliest
+
     def postfetch_status(self):
-        result = {'postfetch_chain': []}
+        earliest = self.earliest_still_active_fetch_start()
+        if earliest:
+            seconds_behind = (datetime.datetime.utcnow() - earliest).total_seconds()
+        else:
+            seconds_behind = 0
+        result = {
+            'earliest_still_active_fetch_start': earliest,
+            'seconds_behind': seconds_behind,
+            'postfetch_chain': []
+        }
         for processor in self._postfetch_chain:
             if processor.__class__ == warcprox.ListenerPostfetchProcessor:
                 name = processor.listener.__class__.__name__
@@ -171,7 +199,7 @@
         '''
         assert not processor0.outq
         assert not processor1.inq
-        q = warcprox.TimestampedQueue(maxsize=self.options.queue_size)
+        q = queue.Queue(maxsize=self.options.queue_size)
         processor0.outq = q
         processor1.inq = q
 
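This controller method is the heart of the change: `seconds_behind` is now measured from the earliest fetch still in flight, whether it is being fetched (tracked by `active_requests` in the mitmproxy.py hunk below) or sitting in some postfetch queue, instead of from the enqueue time of the oldest item in the recorded-url queue. A toy model of the computation, runnable on its own (the names here are illustrative stand-ins, not warcprox API):

```python
import datetime
import queue
import types

# In-flight requests map to their fetch start time; queued items carry the
# .timestamp that was set when the fetch began.
active_requests = {'socket-1': datetime.datetime.utcnow()}
inq = queue.Queue()
inq.put(types.SimpleNamespace(timestamp=datetime.datetime.utcnow()))

def earliest_still_active_fetch_start():
    earliest = None
    for ts in list(active_requests.values()):
        if earliest is None or ts < earliest:
            earliest = ts
    with inq.mutex:  # snapshot the underlying deque without racing put()/get()
        queued = list(inq.queue)
    for item in queued:
        if earliest is None or item.timestamp < earliest:
            earliest = item.timestamp
    return earliest

earliest = earliest_still_active_fetch_start()
seconds_behind = (
    (datetime.datetime.utcnow() - earliest).total_seconds() if earliest else 0)
print(earliest, seconds_behind)
```

Grabbing `inq.mutex` before copying `inq.queue` mirrors what the controller does: it freezes the queue contents for the snapshot, so the walk over timestamps never sees a half-updated deque.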
diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py
index 83c3ea1..3e9477a 100644
--- a/warcprox/mitmproxy.py
+++ b/warcprox/mitmproxy.py
@@ -515,7 +515,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
 class PooledMixIn(socketserver.ThreadingMixIn):
     logger = logging.getLogger("warcprox.mitmproxy.PooledMixIn")
     def __init__(self, max_threads=None):
-        self.active_requests = set()
+        self.active_requests = {}
         self.unaccepted_requests = 0
         self.max_threads = max_threads or 100
         self.pool = concurrent.futures.ThreadPoolExecutor(self.max_threads)
@@ -533,15 +533,15 @@
         return result
 
     def process_request(self, request, client_address):
-        self.active_requests.add(request)
+        self.active_requests[request] = datetime.datetime.utcnow()
         future = self.pool.submit(
             self.process_request_thread, request, client_address)
         future.add_done_callback(
-            lambda f: self.active_requests.discard(request))
+            lambda f: self.active_requests.pop(request, None))
         if future.done():
             # avoid theoretical timing issue, in case process_request_thread
             # managed to finish before future.add_done_callback() ran
-            self.active_requests.discard(request)
+            self.active_requests.pop(request, None)
 
     def get_request(self):
         '''
diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py
index e2649f0..2568ed8 100644
--- a/warcprox/warcproxy.py
+++ b/warcprox/warcproxy.py
@@ -245,7 +245,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
         }
         status_info.update(self.server.status())
         payload = json.dumps(
-            status_info, indent=2).encode('utf-8') + b'\n'
+            status_info, cls=warcprox.Jsonner,
+            indent=2).encode('utf-8') + b'\n'
         self.send_response(200, 'OK')
         self.send_header('Content-type', 'application/json')
         self.send_header('Content-Length', len(payload))
@@ -460,8 +461,7 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
                 certs_dir=options.certs_dir or './warcprox-ca',
                 ca_name=ca_name)
 
-        self.recorded_url_q = warcprox.TimestampedQueue(
-            maxsize=options.queue_size or 1000)
+        self.recorded_url_q = queue.Queue(maxsize=options.queue_size or 1000)
 
         self.running_stats = warcprox.stats.RunningStats()
 
@@ -475,7 +475,6 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
             self.recorded_url_q.maxsize or 100),
             'queued_urls': self.recorded_url_q.qsize(),
             'queue_max_size': self.recorded_url_q.maxsize,
-            'seconds_behind': self.recorded_url_q.seconds_behind(),
             'urls_processed': self.running_stats.urls,
             'warc_bytes_written': self.running_stats.warc_bytes,
             'start_time': self.running_stats.first_snap_time,
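With `seconds_behind` removed from `SingleThreadedWarcProxy.status()` and recomputed in `postfetch_status()`, clients see the same field, now derived from `earliest_still_active_fetch_start` and serialized through `Jsonner`. A quick client-side sanity check might look like the sketch below (host and port are assumptions; the `/status` GET path follows `test_status_api` above):

```python
# Hedged sketch of polling the status API; assumes a warcprox instance is
# listening on localhost:8000 -- adjust to your deployment.
import json
import urllib.request

with urllib.request.urlopen('http://localhost:8000/status') as response:
    status = json.loads(response.read().decode('utf-8'))

# ISO 8601 string (Jsonner-encoded datetime), or null when nothing is in flight
print(status.get('earliest_still_active_fetch_start'))
print(status.get('seconds_behind'))
```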