From d6fdc07f384936c511df20ae3234a088d7fc8a29 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 5 Feb 2018 10:38:48 +0000 Subject: [PATCH] Implement WarcWriterMultiThread --- warcprox/writerthread.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 632ea2c..6347e05 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -30,6 +30,7 @@ except ImportError: import logging import time import warcprox +from concurrent import futures class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor): logger = logging.getLogger("warcprox.writerthread.WarcWriterThread") @@ -93,3 +94,35 @@ class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor): def _shutdown(self): self.writer_pool.close_writers() + +class WarcWriterMultiThread(WarcWriterThread): + logger = logging.getLogger("warcprox.writerthread.WarcWriterMultiThread") + + def __init__(self, options=warcprox.Options()): + warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) + self.pool = futures.ThreadPoolExecutor(max_workers=10) + self.batch = set() + self.options = options + self.writer_pool = warcprox.writer.WarcWriterPool(options) + self.method_filter = set(method.upper() for method in self.options.method_filter or []) + + def _get_process_put(self): + recorded_url = self.inq.get(block=True, timeout=0.5) + self.batch.add(recorded_url) + self.pool.submit(self._process_url, recorded_url) + + def _process_url(self, recorded_url): + try: + records = [] + if self._should_archive(recorded_url): + records = self.writer_pool.write_records(recorded_url) + recorded_url.warc_records = records + self._log(recorded_url, records) + # try to release resources in a timely fashion + if recorded_url.response_recorder and recorded_url.response_recorder.tempfile: + recorded_url.response_recorder.tempfile.close() + finally: + self.batch.remove(recorded_url) + if self.outq: + self.outq.put(recorded_url) + self.writer_pool.maybe_idle_rollover()