Set number of threads using --writer-threads cli option

When the option is not set, use existing single threader writer
architecture.
If available, load ``WarcWriterMultiThread`` with pool size equal to
``--writer-threads``.
This commit is contained in:
Vangelis Banos 2018-02-05 13:05:43 +00:00 committed by Noah Levitt
parent e6f6993baf
commit d2bdc9e213
3 changed files with 42 additions and 15 deletions

View File

@ -69,7 +69,10 @@ class Factory:
@staticmethod
def warc_writer(options):
return warcprox.writerthread.WarcWriterThread(options)
if options.writer_threads:
return warcprox.writerthread.WarcWriterMultiThread(options)
else:
return warcprox.writerthread.WarcWriterThread(options)
@staticmethod
def playback_proxy(ca, options):

View File

@ -32,6 +32,11 @@ import socket
import string
import random
import threading
try:
import queue
except ImportError:
import Queue as queue
class WarcWriter:
logger = logging.getLogger('warcprox.writer.WarcWriter')
@ -194,16 +199,20 @@ class MultiWarcWriter(WarcWriter):
def __init__(self, options=warcprox.Options()):
super().__init__(options)
self._f = [None] * 3
self._fpath = [None] * 3
self._f_finalname = [None] * 3
self._lock = [threading.RLock()] * 3
self._thread_num = options.writer_threads
self._f = [None] * self._thread_num
self._fpath = [None] * self._thread_num
self._f_finalname = [None] * self._thread_num
self._lock = [threading.RLock()] * self._thread_num
self._available_threads = queue.Queue()
for i in range(self._thread_num):
self._available_threads.put(i)
def _writer(self, curr):
with self._lock[curr]:
if self._fpath[curr] and os.path.getsize(
self._fpath[curr]) > self.rollover_size:
self.close_writer()
self.close_writer(curr)
if self._f[curr] == None:
self._f_finalname[curr] = self._warc_filename()
@ -235,8 +244,9 @@ class MultiWarcWriter(WarcWriter):
hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and
"offset" attributes."""
records = self.record_builder.build_warc_records(recorded_url)
curr = random.choice([0, 1, 2])
curr = self._available_threads.get()
# we could also remove that lock?? The queue guaranties that no two
# threads have the same curr open.
with self._lock[curr]:
writer = self._writer(curr)
@ -256,11 +266,11 @@ class MultiWarcWriter(WarcWriter):
self._f[curr].flush()
self._last_activity = time.time()
self._available_threads.put(curr)
return records
def maybe_idle_rollover(self):
for curr in range(0, 3):
for curr in range(0, self._thread_num):
with self._lock[curr]:
if (self._fpath[curr] is not None
and self.rollover_idle_time is not None
@ -271,7 +281,15 @@ class MultiWarcWriter(WarcWriter):
self._f_finalname[curr], time.time() - self._last_activity)
self.close_writer(curr)
def close_writer(self, curr):
def close_writer(self, curr=None):
"""When this method is invoked without any argument (program termination)
close all writer.
"""
if not curr:
for curr in range(0, self._thread_num):
self.close_writer(curr)
return
with self._lock[curr]:
if self._fpath[curr]:
self.logger.info('closing %s', self._f_finalname[curr])
@ -290,8 +308,10 @@ class WarcWriterPool:
logger = logging.getLogger("warcprox.writer.WarcWriterPool")
def __init__(self, options=warcprox.Options()):
# self.default_warc_writer = WarcWriter(options=options)
self.default_warc_writer = MultiWarcWriter(options=options)
if options.writer_threads:
self.default_warc_writer = MultiWarcWriter(options=options)
else:
self.default_warc_writer = WarcWriter(options=options)
self.warc_writers = {} # {prefix:WarcWriter}
self.options = options
self._lock = threading.RLock()
@ -306,7 +326,11 @@ class WarcWriterPool:
options.prefix = recorded_url.warcprox_meta["warc-prefix"]
with self._lock:
if not options.prefix in self.warc_writers:
self.warc_writers[options.prefix] = WarcWriter(
if self.options.writer_threads:
self.warc_writers[options.prefix] = MultiWarcWriter(
options=options)
else:
self.warc_writers[options.prefix] = WarcWriter(
options=options)
w = self.warc_writers[options.prefix]
return w

View File

@ -100,7 +100,7 @@ class WarcWriterMultiThread(WarcWriterThread):
def __init__(self, options=warcprox.Options()):
warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
self.pool = futures.ThreadPoolExecutor(max_workers=10)
self.pool = futures.ThreadPoolExecutor(max_workers=options.writer_threads)
self.batch = set()
self.options = options
self.writer_pool = warcprox.writer.WarcWriterPool(options)