postfetch chain info for /status and service reg

including number of queued urls for each processor
Noah Levitt 2018-01-18 11:12:52 -08:00
parent 93e2baab8f
commit bed04af440
3 changed files with 40 additions and 8 deletions
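
For orientation (illustrative, not part of the commit): the new postfetch_chain key carries one entry per processor in the postfetch chain, each giving the processor name and its queued-URL count. A /status payload might look roughly like the sketch below; the processor names and numbers are invented, only the field names come from the diff that follows.

# Illustrative sketch only -- names and counts are invented; the shape of
# 'postfetch_chain' follows postfetch_status() in the controller diff below.
status_excerpt = {
    'role': 'warcprox',
    'queued_urls': 0,
    'postfetch_chain': [
        {'processor': 'DedupDb', 'queued_urls': 0},
        {'processor': 'WarcWriterThread', 'queued_urls': 3},
    ],
}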

tests/test_warcprox.py

@@ -347,6 +347,9 @@ def warcprox_(request):
     logging.info('changing to working directory %r', work_dir)
     os.chdir(work_dir)
+    # we can't wait around all day in the tests
+    warcprox.BaseBatchPostfetchProcessor.MAX_BATCH_SEC = 0.5
     argv = ['warcprox',
             '--method-filter=GET',
             '--method-filter=POST',
@@ -1319,7 +1322,7 @@ def test_status_api(warcprox_):
             'queued_urls', 'queue_max_size', 'seconds_behind', 'threads',
             'rates_5min', 'rates_1min', 'unaccepted_requests', 'rates_15min',
             'active_requests','start_time','urls_processed',
-            'warc_bytes_written'}
+            'warc_bytes_written','postfetch_chain',}
     assert status['role'] == 'warcprox'
     assert status['version'] == warcprox.__version__
     assert status['port'] == warcprox_.proxy.server_port
@@ -1341,7 +1344,7 @@ def test_svcreg_status(warcprox_):
             'first_heartbeat', 'ttl', 'last_heartbeat', 'threads',
             'rates_5min', 'rates_1min', 'unaccepted_requests',
             'rates_15min', 'active_requests','start_time','urls_processed',
-            'warc_bytes_written',}
+            'warc_bytes_written','postfetch_chain',}
     assert status['role'] == 'warcprox'
     assert status['version'] == warcprox.__version__
     assert status['port'] == warcprox_.proxy.server_port
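
Usage sketch (not part of the commit): the tests above suggest the status JSON is served over HTTP on the proxy port at /status; assuming that endpoint and warcprox's default port 8000, a client could print the new per-processor queue depths like this.

# Hypothetical client sketch: assumes a local warcprox serving its status
# JSON at /status on the proxy port (8000 here); the endpoint path and port
# are assumptions inferred from the tests above, not guaranteed by this diff.
import json
import urllib.request

def print_postfetch_queues(port=8000):
    with urllib.request.urlopen('http://localhost:%d/status' % port) as f:
        status = json.load(f)
    for entry in status.get('postfetch_chain', []):
        print('%(processor)s: %(queued_urls)s queued' % entry)

if __name__ == '__main__':
    print_postfetch_queues()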

warcprox/controller.py

@@ -132,7 +132,8 @@ class WarcproxController(object):
         self.stats_processor = Factory.stats_processor(self.options)
-        self.proxy = warcprox.warcproxy.WarcProxy(self.stats_processor, options)
+        self.proxy = warcprox.warcproxy.WarcProxy(
+                self.stats_processor, self.postfetch_status, options)
         self.playback_proxy = Factory.playback_proxy(
                 self.proxy.ca, self.options)
@@ -140,7 +141,27 @@ class WarcproxController(object):
         self.service_registry = Factory.service_registry(options)
+    def postfetch_status(self):
+        result = {'postfetch_chain': []}
+        for processor in self._postfetch_chain:
+            if processor.__class__ == warcprox.ListenerPostfetchProcessor:
+                name = processor.listener.__class__.__name__
+            else:
+                name = processor.__class__.__name__
+            queued = len(processor.inq.queue)
+            if hasattr(processor, 'batch'):
+                queued += len(processor.batch)
+            result['postfetch_chain'].append({
+                'processor': name,
+                'queued_urls': queued})
+        return result
     def chain(self, processor0, processor1):
         '''
         Sets `processor0.outq` = `processor1.inq` = `queue.Queue()`
         '''
         assert not processor0.outq
         assert not processor1.inq
         q = warcprox.TimestampedQueue(maxsize=self.options.queue_size)
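
To illustrate the naming rule in postfetch_status() above (illustrative, not part of the commit): a plain processor reports its own class name, a ListenerPostfetchProcessor reports the class of the listener it wraps, and a batch processor also counts URLs sitting in its in-flight batch. A toy, self-contained version using stand-in classes:

# Toy illustration of the reporting logic above; these classes are
# simplified stand-ins, not warcprox's real processors.
import queue

class ListenerPostfetchProcessor(object):
    def __init__(self, listener):
        self.listener = listener
        self.inq = queue.Queue()

class CrawlLogger(object):        # stand-in listener
    pass

class BatchWriter(object):        # stand-in batch processor
    def __init__(self):
        self.inq = queue.Queue()
        self.batch = ['http://a/', 'http://b/']  # pulled off inq, not yet finished

def describe(processor):
    if isinstance(processor, ListenerPostfetchProcessor):
        name = processor.listener.__class__.__name__
    else:
        name = processor.__class__.__name__
    queued = processor.inq.qsize()    # same idea as len(processor.inq.queue)
    if hasattr(processor, 'batch'):
        queued += len(processor.batch)
    return {'processor': name, 'queued_urls': queued}

print(describe(ListenerPostfetchProcessor(CrawlLogger())))  # CrawlLogger, 0 queued
print(describe(BatchWriter()))                              # BatchWriter, 2 queued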

warcprox/warcproxy.py

@@ -377,7 +377,11 @@ class RecordedUrl:
 class SingleThreadedWarcProxy(http_server.HTTPServer, object):
     logger = logging.getLogger("warcprox.warcproxy.WarcProxy")
-    def __init__(self, stats_db=None, options=warcprox.Options()):
+    def __init__(
+            self, stats_db=None, status_callback=None,
+            options=warcprox.Options()):
+        self.status_callback = status_callback
+        self.stats_db = stats_db
         self.options = options
         server_address = (
@@ -406,8 +410,6 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
         self.recorded_url_q = warcprox.TimestampedQueue(
                 maxsize=options.queue_size or 1000)
-        self.stats_db = stats_db
         self.running_stats = warcprox.stats.RunningStats()
     def status(self):
@@ -443,14 +445,20 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
             'urls_per_sec': urls_per_sec,
             'warc_bytes_per_sec': warc_bytes_per_sec,
         }
+        # gets postfetch chain status from the controller
+        if self.status_callback:
+            result.update(self.status_callback())
         return result
 class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy):
     logger = logging.getLogger("warcprox.warcproxy.WarcProxy")
-    def __init__(self, stats_db=None, options=warcprox.Options()):
+    def __init__(
+            self, stats_db=None, status_callback=None,
+            options=warcprox.Options()):
         warcprox.mitmproxy.PooledMitmProxy.__init__(self, options)
-        SingleThreadedWarcProxy.__init__(self, stats_db, options)
+        SingleThreadedWarcProxy.__init__(
+                self, stats_db, status_callback, options)
     def server_activate(self):
         http_server.HTTPServer.server_activate(self)
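
The warcproxy.py changes complete the loop: the controller hands its bound postfetch_status method to the proxy, and status() merges whatever the callback returns into its own dict. A minimal sketch of that callback pattern (simplified stand-ins, not the real classes):

# Minimal sketch of the status_callback wiring added in this commit;
# Proxy and Controller here are simplified stand-ins.
class Proxy(object):
    def __init__(self, status_callback=None):
        self.status_callback = status_callback

    def status(self):
        result = {'role': 'warcprox', 'queued_urls': 0}
        if self.status_callback:
            # pull postfetch chain info from the controller on demand
            result.update(self.status_callback())
        return result

class Controller(object):
    def __init__(self):
        # pass the bound method so the proxy can fetch fresh numbers lazily
        self.proxy = Proxy(status_callback=self.postfetch_status)

    def postfetch_status(self):
        return {'postfetch_chain': [{'processor': 'Example', 'queued_urls': 0}]}

print(Controller().proxy.status())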