mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
include warc writer worker threads in profiling
This commit is contained in:
parent
cc8fb4c608
commit
ea4fc0f10a
2
setup.py
2
setup.py
@ -40,7 +40,7 @@ except:
|
|||||||
|
|
||||||
setuptools.setup(
|
setuptools.setup(
|
||||||
name='warcprox',
|
name='warcprox',
|
||||||
version='2.4b2.dev166',
|
version='2.4b2.dev167',
|
||||||
description='WARC writing MITM HTTP/S proxy',
|
description='WARC writing MITM HTTP/S proxy',
|
||||||
url='https://github.com/internetarchive/warcprox',
|
url='https://github.com/internetarchive/warcprox',
|
||||||
author='Noah Levitt',
|
author='Noah Levitt',
|
||||||
|
@ -439,3 +439,18 @@ class WarcproxController(object):
|
|||||||
self.logger.notice(
|
self.logger.notice(
|
||||||
'performance profile of %s:\n%s', processor,
|
'performance profile of %s:\n%s', processor,
|
||||||
buf.getvalue())
|
buf.getvalue())
|
||||||
|
|
||||||
|
if hasattr(processor, 'thread_profilers'):
|
||||||
|
files = []
|
||||||
|
for th_id, profiler in processor.thread_profilers.items():
|
||||||
|
file = os.path.join(tmpdir, '%s.dat' % th_id)
|
||||||
|
profiler.dump_stats(file)
|
||||||
|
files.append(file)
|
||||||
|
buf = io.StringIO()
|
||||||
|
stats = pstats.Stats(*files, stream=buf)
|
||||||
|
stats.sort_stats('cumulative')
|
||||||
|
stats.print_stats(0.1)
|
||||||
|
self.logger.notice(
|
||||||
|
'aggregate performance profile of %s worker '
|
||||||
|
'threads of %s:\n%s',
|
||||||
|
len(files), processor, buf.getvalue())
|
||||||
|
@ -31,6 +31,7 @@ import logging
|
|||||||
import time
|
import time
|
||||||
import warcprox
|
import warcprox
|
||||||
from concurrent import futures
|
from concurrent import futures
|
||||||
|
import threading
|
||||||
|
|
||||||
class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
|
class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
|
||||||
logger = logging.getLogger("warcprox.writerthread.WarcWriterProcessor")
|
logger = logging.getLogger("warcprox.writerthread.WarcWriterProcessor")
|
||||||
@ -43,6 +44,8 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
|
|||||||
self.method_filter = set(method.upper() for method in self.options.method_filter or [])
|
self.method_filter = set(method.upper() for method in self.options.method_filter or [])
|
||||||
|
|
||||||
# set max_queued small, because self.inq is already handling queueing
|
# set max_queued small, because self.inq is already handling queueing
|
||||||
|
self.thread_local = threading.local()
|
||||||
|
self.thread_profilers = {}
|
||||||
# for us; but give it a little breathing room to make sure it can keep
|
# for us; but give it a little breathing room to make sure it can keep
|
||||||
# worker threads busy
|
# worker threads busy
|
||||||
self.pool = warcprox.ThreadPoolExecutor(
|
self.pool = warcprox.ThreadPoolExecutor(
|
||||||
@ -58,10 +61,26 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
|
|||||||
try:
|
try:
|
||||||
recorded_url = self.inq.get(block=True, timeout=0.5)
|
recorded_url = self.inq.get(block=True, timeout=0.5)
|
||||||
self.batch.add(recorded_url)
|
self.batch.add(recorded_url)
|
||||||
self.pool.submit(self._process_url, recorded_url)
|
self.pool.submit(self._wrap_process_url, recorded_url)
|
||||||
finally:
|
finally:
|
||||||
self.writer_pool.maybe_idle_rollover()
|
self.writer_pool.maybe_idle_rollover()
|
||||||
|
|
||||||
|
def _wrap_process_url(self, recorded_url):
|
||||||
|
if not getattr(self.thread_local, 'name_set', False):
|
||||||
|
threading.current_thread().name = 'WarcWriterThread(tid=%s)' % warcprox.gettid()
|
||||||
|
self.thread_local.name_set = True
|
||||||
|
if self.options.profile:
|
||||||
|
import cProfile
|
||||||
|
if not hasattr(self.thread_local, 'profiler'):
|
||||||
|
self.thread_local.profiler = cProfile.Profile()
|
||||||
|
tid = threading.current_thread().ident
|
||||||
|
self.thread_profilers[tid] = self.thread_local.profiler
|
||||||
|
self.thread_local.profiler.enable()
|
||||||
|
self._process_url(recorded_url)
|
||||||
|
self.thread_local.profiler.disable()
|
||||||
|
else:
|
||||||
|
self._process_url(recorded_url)
|
||||||
|
|
||||||
def _process_url(self, recorded_url):
|
def _process_url(self, recorded_url):
|
||||||
try:
|
try:
|
||||||
records = []
|
records = []
|
||||||
|
Loading…
x
Reference in New Issue
Block a user