diff --git a/setup.py b/setup.py index a00566f..5522b2a 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev166', + version='2.4b2.dev167', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/controller.py b/warcprox/controller.py index 7735df9..cfffd06 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -439,3 +439,18 @@ class WarcproxController(object): self.logger.notice( 'performance profile of %s:\n%s', processor, buf.getvalue()) + + if hasattr(processor, 'thread_profilers'): + files = [] + for th_id, profiler in processor.thread_profilers.items(): + file = os.path.join(tmpdir, '%s.dat' % th_id) + profiler.dump_stats(file) + files.append(file) + buf = io.StringIO() + stats = pstats.Stats(*files, stream=buf) + stats.sort_stats('cumulative') + stats.print_stats(0.1) + self.logger.notice( + 'aggregate performance profile of %s worker ' + 'threads of %s:\n%s', + len(files), processor, buf.getvalue()) diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 03aee9e..f4de35d 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -31,6 +31,7 @@ import logging import time import warcprox from concurrent import futures +import threading class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): logger = logging.getLogger("warcprox.writerthread.WarcWriterProcessor") @@ -43,6 +44,8 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): self.method_filter = set(method.upper() for method in self.options.method_filter or []) # set max_queued small, because self.inq is already handling queueing + self.thread_local = threading.local() + self.thread_profilers = {} # for us; but give it a little breathing room to make sure it can keep # worker threads busy self.pool = warcprox.ThreadPoolExecutor( @@ -58,10 +61,26 @@ class 
def _wrap_process_url(self, recorded_url):
    """Run ``self._process_url(recorded_url)``, optionally under a profiler.

    This is the callable submitted to the worker pool in place of
    ``_process_url``. It adds two per-thread concerns around the real work:

    * On the first call made from a given thread, the current thread is
      renamed to ``WarcWriterThread(tid=<warcprox.gettid()>)``. The
      ``name_set`` flag on ``self.thread_local`` records that this was done.
    * When ``self.options.profile`` is truthy, one ``cProfile.Profile`` is
      created per thread, stored on ``self.thread_local`` and registered in
      ``self.thread_profilers`` under the thread's ``ident``. The profiler
      is enabled only while ``_process_url`` runs.

    Args:
        recorded_url: the item to hand to ``self._process_url`` unchanged.

    Returns:
        None. Any exception raised by ``_process_url`` propagates to the
        caller after the profiler has been disabled.
    """
    local = self.thread_local

    # Rename the current thread once; later calls on this thread skip it.
    if not getattr(local, 'name_set', False):
        threading.current_thread().name = (
            'WarcWriterThread(tid=%s)' % warcprox.gettid())
        local.name_set = True

    if not self.options.profile:
        self._process_url(recorded_url)
        return

    # Lazily create this thread's profiler and publish it in the shared
    # registry the first time profiling is needed on this thread.
    if not hasattr(local, 'profiler'):
        import cProfile
        local.profiler = cProfile.Profile()
        self.thread_profilers[threading.current_thread().ident] = (
            local.profiler)

    local.profiler.enable()
    try:
        self._process_url(recorded_url)
    finally:
        # Always stop profiling, even when _process_url raises; otherwise
        # the profiler would stay enabled on this thread and keep recording
        # whatever the thread runs next.
        local.profiler.disable()