diff --git a/warcprox/controller.py b/warcprox/controller.py index 85fa42d..b33b42e 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -69,7 +69,10 @@ class Factory: @staticmethod def warc_writer(options): - return warcprox.writerthread.WarcWriterThread(options) + if options.writer_threads: + return warcprox.writerthread.WarcWriterMultiThread(options) + else: + return warcprox.writerthread.WarcWriterThread(options) @staticmethod def playback_proxy(ca, options): diff --git a/warcprox/writer.py b/warcprox/writer.py index 7b3414f..7779c6f 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -32,6 +32,11 @@ import socket import string import random import threading +try: + import queue +except ImportError: + import Queue as queue + class WarcWriter: logger = logging.getLogger('warcprox.writer.WarcWriter') @@ -194,16 +199,20 @@ class MultiWarcWriter(WarcWriter): def __init__(self, options=warcprox.Options()): super().__init__(options) - self._f = [None] * 3 - self._fpath = [None] * 3 - self._f_finalname = [None] * 3 - self._lock = [threading.RLock()] * 3 + self._thread_num = options.writer_threads + self._f = [None] * self._thread_num + self._fpath = [None] * self._thread_num + self._f_finalname = [None] * self._thread_num + self._lock = [threading.RLock()] * self._thread_num + self._available_threads = queue.Queue() + for i in range(self._thread_num): + self._available_threads.put(i) def _writer(self, curr): with self._lock[curr]: if self._fpath[curr] and os.path.getsize( self._fpath[curr]) > self.rollover_size: - self.close_writer() + self.close_writer(curr) if self._f[curr] == None: self._f_finalname[curr] = self._warc_filename() @@ -235,8 +244,9 @@ class MultiWarcWriter(WarcWriter): hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and "offset" attributes.""" records = self.record_builder.build_warc_records(recorded_url) - curr = random.choice([0, 1, 2]) - + curr = self._available_threads.get() + # we could also remove that lock?? The queue guaranties that no two + # threads have the same curr open. with self._lock[curr]: writer = self._writer(curr) @@ -256,11 +266,11 @@ class MultiWarcWriter(WarcWriter): self._f[curr].flush() self._last_activity = time.time() - + self._available_threads.put(curr) return records def maybe_idle_rollover(self): - for curr in range(0, 3): + for curr in range(0, self._thread_num): with self._lock[curr]: if (self._fpath[curr] is not None and self.rollover_idle_time is not None @@ -271,7 +281,15 @@ class MultiWarcWriter(WarcWriter): self._f_finalname[curr], time.time() - self._last_activity) self.close_writer(curr) - def close_writer(self, curr): + def close_writer(self, curr=None): + """When this method is invoked without any argument (program termination) + close all writer. + """ + if not curr: + for curr in range(0, self._thread_num): + self.close_writer(curr) + return + with self._lock[curr]: if self._fpath[curr]: self.logger.info('closing %s', self._f_finalname[curr]) @@ -290,8 +308,10 @@ class WarcWriterPool: logger = logging.getLogger("warcprox.writer.WarcWriterPool") def __init__(self, options=warcprox.Options()): - # self.default_warc_writer = WarcWriter(options=options) - self.default_warc_writer = MultiWarcWriter(options=options) + if options.writer_threads: + self.default_warc_writer = MultiWarcWriter(options=options) + else: + self.default_warc_writer = WarcWriter(options=options) self.warc_writers = {} # {prefix:WarcWriter} self.options = options self._lock = threading.RLock() @@ -306,7 +326,11 @@ class WarcWriterPool: options.prefix = recorded_url.warcprox_meta["warc-prefix"] with self._lock: if not options.prefix in self.warc_writers: - self.warc_writers[options.prefix] = WarcWriter( + if self.options.writer_threads: + self.warc_writers[options.prefix] = MultiWarcWriter( + options=options) + else: + self.warc_writers[options.prefix] = WarcWriter( options=options) w = self.warc_writers[options.prefix] return w diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 6347e05..c78e8c0 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -100,7 +100,7 @@ class WarcWriterMultiThread(WarcWriterThread): def __init__(self, options=warcprox.Options()): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) - self.pool = futures.ThreadPoolExecutor(max_workers=10) + self.pool = futures.ThreadPoolExecutor(max_workers=options.writer_threads) self.batch = set() self.options = options self.writer_pool = warcprox.writer.WarcWriterPool(options)