From bed04af440c159da1a8f8ee4cdbd406f38e6678a Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 18 Jan 2018 11:12:52 -0800 Subject: [PATCH] postfetch chain info for /status and service reg including number of queued urls for each processor --- tests/test_warcprox.py | 7 +++++-- warcprox/controller.py | 23 ++++++++++++++++++++++- warcprox/warcproxy.py | 18 +++++++++++++----- 3 files changed, 40 insertions(+), 8 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 0c250e7..5e13f95 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -347,6 +347,9 @@ def warcprox_(request): logging.info('changing to working directory %r', work_dir) os.chdir(work_dir) + # we can't wait around all day in the tests + warcprox.BaseBatchPostfetchProcessor.MAX_BATCH_SEC = 0.5 + argv = ['warcprox', '--method-filter=GET', '--method-filter=POST', @@ -1319,7 +1322,7 @@ def test_status_api(warcprox_): 'queued_urls', 'queue_max_size', 'seconds_behind', 'threads', 'rates_5min', 'rates_1min', 'unaccepted_requests', 'rates_15min', 'active_requests','start_time','urls_processed', - 'warc_bytes_written'} + 'warc_bytes_written','postfetch_chain',} assert status['role'] == 'warcprox' assert status['version'] == warcprox.__version__ assert status['port'] == warcprox_.proxy.server_port @@ -1341,7 +1344,7 @@ def test_svcreg_status(warcprox_): 'first_heartbeat', 'ttl', 'last_heartbeat', 'threads', 'rates_5min', 'rates_1min', 'unaccepted_requests', 'rates_15min', 'active_requests','start_time','urls_processed', - 'warc_bytes_written',} + 'warc_bytes_written','postfetch_chain',} assert status['role'] == 'warcprox' assert status['version'] == warcprox.__version__ assert status['port'] == warcprox_.proxy.server_port diff --git a/warcprox/controller.py b/warcprox/controller.py index 4a0c09a..2d27958 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -132,7 +132,8 @@ class WarcproxController(object): self.stats_processor = Factory.stats_processor(self.options) - self.proxy = warcprox.warcproxy.WarcProxy(self.stats_processor, options) + self.proxy = warcprox.warcproxy.WarcProxy( + self.stats_processor, self.postfetch_status, options) self.playback_proxy = Factory.playback_proxy( self.proxy.ca, self.options) @@ -140,7 +141,27 @@ class WarcproxController(object): self.service_registry = Factory.service_registry(options) + def postfetch_status(self): + result = {'postfetch_chain': []} + for processor in self._postfetch_chain: + if processor.__class__ == warcprox.ListenerPostfetchProcessor: + name = processor.listener.__class__.__name__ + else: + name = processor.__class__.__name__ + + queued = len(processor.inq.queue) + if hasattr(processor, 'batch'): + queued += len(processor.batch) + + result['postfetch_chain'].append({ + 'processor': name, + 'queued_urls': len(processor.inq.queue)}) + return result + def chain(self, processor0, processor1): + ''' + Sets `processor0.outq` = `processor1.inq` = `queue.Queue()` + ''' assert not processor0.outq assert not processor1.inq q = warcprox.TimestampedQueue(maxsize=self.options.queue_size) diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 7bccba7..8850821 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -377,7 +377,11 @@ class RecordedUrl: class SingleThreadedWarcProxy(http_server.HTTPServer, object): logger = logging.getLogger("warcprox.warcproxy.WarcProxy") - def __init__(self, stats_db=None, options=warcprox.Options()): + def __init__( + self, stats_db=None, status_callback=None, + options=warcprox.Options()): + self.status_callback = status_callback + self.stats_db = stats_db self.options = options server_address = ( @@ -406,8 +410,6 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): self.recorded_url_q = warcprox.TimestampedQueue( maxsize=options.queue_size or 1000) - self.stats_db = stats_db - self.running_stats = warcprox.stats.RunningStats() def status(self): @@ -443,14 +445,20 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): 'urls_per_sec': urls_per_sec, 'warc_bytes_per_sec': warc_bytes_per_sec, } + # gets postfetch chain status from the controller + if self.status_callback: + result.update(self.status_callback()) return result class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy): logger = logging.getLogger("warcprox.warcproxy.WarcProxy") - def __init__(self, stats_db=None, options=warcprox.Options()): + def __init__( + self, stats_db=None, status_callback=None, + options=warcprox.Options()): warcprox.mitmproxy.PooledMitmProxy.__init__(self, options) - SingleThreadedWarcProxy.__init__(self, stats_db, options) + SingleThreadedWarcProxy.__init__( + self, stats_db, status_callback, options) def server_activate(self): http_server.HTTPServer.server_activate(self)