Implement WarcWriterMultiThread

This commit is contained in:
Vangelis Banos 2018-02-05 10:38:48 +00:00 committed by Noah Levitt
parent e68be9354d
commit d6fdc07f38

View File

@ -30,6 +30,7 @@ except ImportError:
import logging
import time
import warcprox
from concurrent import futures
class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor):
logger = logging.getLogger("warcprox.writerthread.WarcWriterThread")
@ -93,3 +94,35 @@ class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor):
def _shutdown(self):
    """Close the writers held by this processor's writer pool."""
    writers = self.writer_pool
    writers.close_writers()
class WarcWriterMultiThread(WarcWriterThread):
    """
    Variant of `WarcWriterThread` that hands each recorded url to a thread
    pool instead of processing it on the calling thread.

    Each url taken from `self.inq` is tracked in `self.batch` while a pool
    worker runs `_process_url` on it, and is forwarded to `self.outq` (when
    one is set) once the worker is done with it, whether or not the write
    succeeded.
    """
    logger = logging.getLogger("warcprox.writerthread.WarcWriterMultiThread")

    def __init__(self, options=warcprox.Options()):
        """
        Set up the thread pool, the in-flight set and the warc writer pool.

        Args:
            options: settings object; passed to the base processor and to
                `warcprox.writer.WarcWriterPool`, and read for
                `method_filter`. Note the default instance is created once,
                when the class body is evaluated.
        """
        # Initializes BaseStandardPostfetchProcessor directly rather than
        # going through WarcWriterThread.__init__.
        warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
        self.pool = futures.ThreadPoolExecutor(max_workers=10)
        # urls that have been submitted to the pool and are not finished yet
        self.batch = set()
        self.options = options
        self.writer_pool = warcprox.writer.WarcWriterPool(options)
        # upper-cased so comparisons can be case-insensitive; empty when no
        # filter is configured
        self.method_filter = {
            method.upper() for method in self.options.method_filter or []}

    def _get_process_put(self):
        """
        Take one recorded url from `self.inq` and schedule it on the pool.

        Waits up to 0.5 seconds for an item. NOTE(review): `self.inq` is
        presumably a `queue.Queue`, in which case a timeout raises
        `queue.Empty` to the caller -- confirm against the base class.
        """
        recorded_url = self.inq.get(block=True, timeout=0.5)
        self.batch.add(recorded_url)
        self.pool.submit(self._process_url, recorded_url)

    def _process_url(self, recorded_url):
        """
        Write, log and release one recorded url; runs on a pool thread.

        Args:
            recorded_url: the item taken from `self.inq`. Its `warc_records`
                attribute is set to the records written (an empty list when
                `_should_archive` rejects it).

        Raises:
            Exception: whatever the write/log steps raise is logged here and
                re-raised. The Future returned by `submit()` is not kept, so
                without the log call the failure would go unnoticed.
        """
        try:
            records = []
            if self._should_archive(recorded_url):
                records = self.writer_pool.write_records(recorded_url)
            recorded_url.warc_records = records
            self._log(recorded_url, records)
            # try to release resources in a timely fashion
            if recorded_url.response_recorder and recorded_url.response_recorder.tempfile:
                recorded_url.response_recorder.tempfile.close()
        except Exception:
            self.logger.error(
                'unexpected problem processing recorded url %r',
                recorded_url, exc_info=True)
            raise
        finally:
            # discard() rather than remove(): a KeyError raised here would
            # skip the put() below and drop the url from the pipeline.
            self.batch.discard(recorded_url)
            if self.outq:
                self.outq.put(recorded_url)
            self.writer_pool.maybe_idle_rollover()