remove --writer-threads option

Support for multiple writer threads was broken, and profiling had shown
it to be of dubious utility.
https://github.com/internetarchive/warcprox/issues/101
https://github.com/internetarchive/warcprox/wiki/benchmarking-number-of-threads
Noah Levitt 2019-01-07 15:54:35 -08:00
parent 1ea8a06a69
commit 0882a2b174
6 changed files with 100 additions and 200 deletions
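
With the writer thread pool gone, a WarcWriterProcessor drives one WarcWriterPool from its single postfetch thread, and each WarcWriter owns exactly one warc series per prefix. A rough usage sketch of the simplified classes touched by this commit (not part of the diff; recorded_url stands in for a RecordedUrl object produced by the proxy, and the option values are made up):

import warcprox
import warcprox.writer

# hypothetical options; directory, prefix and gzip are fields the writer reads
options = warcprox.Options(directory='./warcs', prefix='example', gzip=True)
pool = warcprox.writer.WarcWriterPool(options)

def archive(recorded_url):
    # picks (or lazily creates) the WarcWriter for the url's warc-prefix and
    # appends the records to that writer's currently open warc file
    return pool.write_records(recorded_url)

# the single writer thread periodically rolls over idle warcs ...
pool.maybe_idle_rollover()
# ... and at shutdown closes every open warc, dropping the '.open' suffix
pool.close_writers()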

tests/test_warcprox.py

@@ -3,7 +3,7 @@
 '''
 tests/test_warcprox.py - automated tests for warcprox
-Copyright (C) 2013-2018 Internet Archive
+Copyright (C) 2013-2019 Internet Archive
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -844,10 +844,9 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     # close the warc
     assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"]
     writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"]
-    warc = writer._available_warcs.queue[0]
-    warc_path = os.path.join(warc.directory, warc.finalname)
+    warc_path = os.path.join(writer.directory, writer.finalname)
     assert not os.path.exists(warc_path)
-    warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"].close_writer()
+    warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"].close()
     assert os.path.exists(warc_path)
     # read the warc
@@ -1701,7 +1700,7 @@ def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback
     assert response.status_code == 200
     assert not 'via' in playback_response
-    warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path
+    warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer.path
     with open(warc, 'rb') as f:
         for record in warcio.archiveiterator.ArchiveIterator(f):
             if record.rec_headers.get_header('warc-target-uri') == url:
@@ -1933,9 +1932,8 @@ def test_long_warcprox_meta(
     # check that warcprox-meta was parsed and honored ("warc-prefix" param)
     assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"]
     writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"]
-    warc = writer._available_warcs.queue[0]
-    warc_path = os.path.join(warc.directory, warc.finalname)
-    warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"].close_writer()
+    warc_path = os.path.join(writer.directory, writer.finalname)
+    warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"].close()
     assert os.path.exists(warc_path)
     # read the warc
@@ -2118,7 +2116,7 @@ def test_dedup_min_text_size(http_daemon, warcprox_, archiving_proxies):
     wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
     # check that response records were written
-    warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path
+    warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer.path
     with open(warc, 'rb') as f:
         rec_iter = iter(warcio.archiveiterator.ArchiveIterator(f))
         record = next(rec_iter)
@@ -2198,7 +2196,7 @@ def test_dedup_min_binary_size(http_daemon, warcprox_, archiving_proxies):
     wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
     # check that response records were written
-    warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path
+    warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer.path
     with open(warc, 'rb') as f:
         rec_iter = iter(warcio.archiveiterator.ArchiveIterator(f))
         record = next(rec_iter)

tests/test_writer.py

@@ -1,7 +1,7 @@
 '''
 tests/test_writer.py - warcprox warc writing tests
-Copyright (C) 2017 Internet Archive
+Copyright (C) 2017-2019 Internet Archive
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -49,7 +49,6 @@ def lock_file(q, filename):
     except IOError:
         q.put('FAILED TO OBTAIN LOCK')

 def test_warc_writer_locking(tmpdir):
     """Test if WarcWriter is locking WARC files.
     When we don't have the .open suffix, WarcWriter locks the file and the
@@ -64,7 +63,7 @@ def test_warc_writer_locking(tmpdir):
     dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
     wwriter = WarcWriter(Options(
-            directory=dirname, no_warc_open_suffix=True, writer_threads=1))
+            directory=dirname, no_warc_open_suffix=True))
     wwriter.write_records(recorded_url)
     warcs = [fn for fn in os.listdir(dirname) if fn.endswith('.warc')]
     assert warcs
@@ -75,7 +74,7 @@ def test_warc_writer_locking(tmpdir):
     p.start()
     p.join()
     assert q.get() == 'FAILED TO OBTAIN LOCK'
-    wwriter.close_writer()
+    wwriter.close()
     # locking must succeed after writer has closed the WARC file.
     p = Process(target=lock_file, args=(q, target_warc))
@@ -96,8 +95,7 @@ def test_special_dont_write_prefix():
         logging.debug('cd %s', tmpdir)
         os.chdir(tmpdir)
-        wwt = warcprox.writerthread.WarcWriterProcessor(
-                Options(prefix='-', writer_threads=1))
+        wwt = warcprox.writerthread.WarcWriterProcessor(Options(prefix='-'))
         wwt.inq = queue.Queue(maxsize=1)
         wwt.outq = queue.Queue(maxsize=1)
         try:
@@ -131,7 +129,7 @@ def test_special_dont_write_prefix():
         wwt.join()
         wwt = warcprox.writerthread.WarcWriterProcessor(
-                Options(writer_threads=1, blackout_period=60, prefix='foo'))
+                Options(blackout_period=60, prefix='foo'))
         wwt.inq = queue.Queue(maxsize=1)
         wwt.outq = queue.Queue(maxsize=1)
         try:
@@ -199,14 +197,12 @@ def test_special_dont_write_prefix():
         wwt.stop.set()
         wwt.join()

 def test_do_not_archive():
     with tempfile.TemporaryDirectory() as tmpdir:
         logging.debug('cd %s', tmpdir)
         os.chdir(tmpdir)
-        wwt = warcprox.writerthread.WarcWriterProcessor(
-                Options(writer_threads=1))
+        wwt = warcprox.writerthread.WarcWriterProcessor()
         wwt.inq = queue.Queue(maxsize=1)
         wwt.outq = queue.Queue(maxsize=1)
         try:
@@ -240,7 +236,6 @@ def test_do_not_archive():
         wwt.stop.set()
         wwt.join()

 def test_warc_writer_filename(tmpdir):
     """Test if WarcWriter is writing WARC files with custom filenames.
     """
@@ -253,11 +248,9 @@ def test_warc_writer_filename(tmpdir):
     dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
     wwriter = WarcWriter(Options(directory=dirname, prefix='foo',
-            warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}',
-            writer_threads=1))
+            warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}'))
     wwriter.write_records(recorded_url)
     warcs = [fn for fn in os.listdir(dirname)]
     assert warcs
     assert re.search(
-            r'\d{17}_foo_\d{14}_00000.warc.open',
-            wwriter._available_warcs.queue[0].path)
+            r'\d{17}_foo_\d{14}_00000.warc.open', wwriter.path)

warcprox/controller.py

@@ -143,10 +143,6 @@ class WarcproxController(object):
             self.playback_proxy = Factory.playback_proxy(
                     self.proxy.ca, self.options)

-        # https://github.com/internetarchive/warcprox/wiki/benchmarking-number-of-threads
-        if not self.options.writer_threads:
-            self.options.writer_threads = 1
-
         self.build_postfetch_chain(self.proxy.recorded_url_q)

         self.service_registry = Factory.service_registry(options)

warcprox/main.py

@@ -196,11 +196,6 @@ def _build_arg_parser(prog='warcprox', show_hidden=False):
             help=suppress(
                 'turn on performance profiling; summary statistics are dumped '
                 'every 10 minutes and at shutdown'))
-    hidden.add_argument(
-            '--writer-threads', dest='writer_threads', type=int, default=1,
-            help=suppress(
-                'number of warc writer threads; caution, see '
-                'https://github.com/internetarchive/warcprox/issues/101'))
     arg_parser.add_argument(
             '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy',
             default=None, help=(

warcprox/writer.py

@@ -1,7 +1,7 @@
 '''
 warcprox/writer.py - warc writer, manages and writes records to warc files
-Copyright (C) 2013-2017 Internet Archive
+Copyright (C) 2013-2019 Internet Archive
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -29,37 +29,41 @@ import warcprox
 import os
 import socket
 import random
-import threading
-try:
-    import queue
-except ImportError:
-    import Queue as queue
-import contextlib

-class _OneWritableWarc:
+class WarcWriter:
     '''
-    Utility class used by WarcWriter
+    A writer for one warc prefix, which rolls over to new warc file,
+    incrementing serial number, when size limit is hit. Should only be used
+    from one thread.
     '''
-    logger = logging.getLogger('warcprox.writer._OneWritableWarc')
+    logger = logging.getLogger('warcprox.writer.WarcWriter')

-    def __init__(self, options=warcprox.Options(), randomtoken='0'):
+    def __init__(self, options=warcprox.Options()):
+        self.options = options
+        self.gzip = options.gzip or False
+        self.record_builder = warcprox.warc.WarcRecordBuilder(
+                digest_algorithm=options.digest_algorithm or 'sha1',
+                base32=options.base32)
+
         self.f = None
         self.path = None
         self.finalname = None
         self.gzip = options.gzip or False
         self.prefix = options.prefix or 'warcprox'
         self.open_suffix = '' if options.no_warc_open_suffix else '.open'
-        self.randomtoken = randomtoken
         self.rollover_size = options.rollover_size or 1000000000
         self.rollover_idle_time = options.rollover_idle_time or None
         self.directory = options.directory or './warcs'
         self.filename_template = options.warc_filename or \
                 '{prefix}-{timestamp17}-{randomtoken}-{serialno}'
         self.last_activity = time.time()
+        self.serial = 0
+        self.randomtoken = ''.join(
+                random.sample('abcdefghijklmnopqrstuvwxyz0123456789', 8))

     # h3 default <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
-    def next_filename(self, serial):
+    def filename(self, serial):
         """WARC filename is configurable with CLI parameter --warc-filename.
         Default: '{prefix}-{timestamp17}-{randomtoken}-{serialno}'
         Available variables are: prefix, timestamp14, timestamp17, serialno,
@@ -81,13 +85,17 @@ class _OneWritableWarc:
         return fname

     def open(self, serial):
+        '''
+        Opens a new warc file with filename prefix `self.prefix` and serial
+        number `self.serial` and assigns file handle to `self.f`.
+        '''
         if not os.path.exists(self.directory):
             self.logger.info(
                     "warc destination directory %s doesn't exist, creating it",
                     self.directory)
             os.mkdir(self.directory)

-        self.finalname = self.next_filename(serial)
+        self.finalname = self.filename(serial)
         self.logger.trace('opening %s', self.finalname)
         self.path = os.path.sep.join(
                 [self.directory, self.finalname + self.open_suffix])
@@ -103,7 +111,53 @@ class _OneWritableWarc:
                     'could not lock file %s (%s)', self.path, exc)
         return self.f

+    def ensure_open(self):
+        '''
+        Ensures `self.f` is ready to write the next warc record.
+
+        Closes current warc if size limit has been reached. Then, if warc is
+        not open, opens one, and writes the warcinfo record.
+        '''
+        self.maybe_size_rollover()
+        if not self.f:
+            serial = self.serial
+            self.serial += 1
+            self.open(serial)
+            warcinfo = self.record_builder.build_warcinfo_record(self.finalname)
+            self.logger.debug('warcinfo.headers=%s', warcinfo.headers)
+            warcinfo.write_to(self.f, gzip=self.gzip)
+
+    def write_records(self, recorded_url):
+        '''
+        Returns tuple of records written, which are instances of
+        `hanzo.warctools.warc.WarcRecord`, decorated with `warc_filename` and
+        `offset` attributes.
+        '''
+        records = self.record_builder.build_warc_records(recorded_url)
+
+        self.ensure_open()
+        for record in records:
+            offset = self.f.tell()
+            record.write_to(self.f, gzip=self.gzip)
+            record.offset = offset
+            record.length = self.f.tell() - offset
+            record.warc_filename = self.finalname
+            self.logger.trace(
+                    'wrote warc record: warc_type=%s content_length=%s '
+                    'digest=%s offset=%d warc=%s url=%s', record.type,
+                    record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
+                    record.get_header(b'WARC-Payload-Digest'), record.offset,
+                    self.path, record.get_header(warctools.WarcRecord.URL))
+
+        return records
+
     def close(self):
+        '''
+        Closes out the active warc.
+
+        The next call to `write_records()` will write to a a new warc file with
+        the serial number incremented.
+        '''
         if self.path:
             self.logger.trace('closing %s', self.finalname)
             if self.open_suffix == '':
@@ -136,112 +190,16 @@ class _OneWritableWarc:
                     self.finalname, os.path.getsize(self.path))
             self.close()

-
-class WarcWriter:
-    logger = logging.getLogger('warcprox.writer.WarcWriter')
-
-    def __init__(self, options=warcprox.Options()):
-        self.options = options
-        self.gzip = options.gzip or False
-        self.record_builder = warcprox.warc.WarcRecordBuilder(
-                digest_algorithm=options.digest_algorithm or 'sha1',
-                base32=options.base32)
-
-        self._available_warcs = queue.Queue()
-        self._warc_count = 0
-        self._warc_count_lock = threading.Lock()
-
-        self._serial = 0
-        self._serial_lock = threading.Lock()
-
-        self._randomtoken = ''.join(
-                random.sample('abcdefghijklmnopqrstuvwxyz0123456789', 8))
-
-    def _bespeak_warc(self):
-        try:
-            return self._available_warcs.get(block=False)
-        except queue.Empty:
-            with self._warc_count_lock:
-                if self._warc_count < self.options.writer_threads:
-                    self._warc_count += 1
-                    return _OneWritableWarc(self.options, self._randomtoken)
-            # else we're maxed out, wait for one to free up
-            return self._available_warcs.get(block=True)
-
-    @contextlib.contextmanager
-    def _warc(self):
-        warc = self._bespeak_warc()
-
-        warc.maybe_size_rollover()
-
-        # lazy file open
-        if warc.f == None:
-            with self._serial_lock:
-                serial = self._serial
-                self._serial += 1
-            warc.open(serial)
-            warcinfo = self.record_builder.build_warcinfo_record(warc.finalname)
-            self.logger.debug('warcinfo.headers=%s', warcinfo.headers)
-            warcinfo.write_to(warc.f, gzip=self.gzip)
-
-        yield warc
-
-        # __exit__()
-        warc.f.flush()
-        warc.last_activity = time.time()
-        self._available_warcs.put(warc)
-
-    def write_records(self, recorded_url):
-        """Returns tuple of records written, which are instances of
-        hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and
-        "offset" attributes."""
-        records = self.record_builder.build_warc_records(recorded_url)
-
-        with self._warc() as warc:
-            for record in records:
-                offset = warc.f.tell()
-                record.write_to(warc.f, gzip=self.gzip)
-                record.offset = offset
-                record.length = warc.f.tell() - offset
-                record.warc_filename = warc.finalname
-                self.logger.trace(
-                        'wrote warc record: warc_type=%s content_length=%s '
-                        'digest=%s offset=%d warc=%s url=%s',
-                        record.type,
-                        record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
-                        record.get_header(b'WARC-Payload-Digest'),
-                        record.offset, warc.path,
-                        record.get_header(warctools.WarcRecord.URL))
-
-            return records
-
-    def maybe_idle_rollover(self):
-        warcs = []
-        while True:
-            try:
-                warc = self._available_warcs.get(block=False)
-                warcs.append(warc)
-            except queue.Empty:
-                break
-        for warc in warcs:
-            warc.maybe_idle_rollover()
-            self._available_warcs.put(warc)
-
-    def close_writer(self):
-        while self._warc_count > 0:
-            with self._warc_count_lock:
-                warc = self._available_warcs.get()
-                warc.close()
-                self._warc_count -= 1
-
 class WarcWriterPool:
+    '''
+    A `WarcWriter` per warc prefix. Should only be used from one thread.
+    '''
     logger = logging.getLogger("warcprox.writer.WarcWriterPool")

     def __init__(self, options=warcprox.Options()):
         self.default_warc_writer = WarcWriter(options)
         self.warc_writers = {} # {prefix:WarcWriter}
         self.options = options
-        self._lock = threading.RLock()
         self._last_maybe = time.time()

     # chooses writer for filename specified by warcprox_meta["warc-prefix"] if set
@@ -251,16 +209,17 @@ class WarcWriterPool:
             # self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url))
             options = warcprox.Options(**vars(self.options))
             options.prefix = recorded_url.warcprox_meta["warc-prefix"]
-            with self._lock:
-                if not options.prefix in self.warc_writers:
-                    self.warc_writers[options.prefix] = WarcWriter(options)
-                w = self.warc_writers[options.prefix]
+            if not options.prefix in self.warc_writers:
+                self.warc_writers[options.prefix] = WarcWriter(options)
+            w = self.warc_writers[options.prefix]
         return w

     def write_records(self, recorded_url):
-        """Returns tuple of records written, which are instances of
-        hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and
-        "offset" attributes."""
+        '''
+        Returns tuple of records written, which are instances of
+        `hanzo.warctools.warc.WarcRecord`, decorated with `warc_filename` and
+        `offset` attributes.
+        '''
         return self._writer(recorded_url).write_records(recorded_url)

     def maybe_idle_rollover(self):
@@ -271,7 +230,8 @@ class WarcWriterPool:
         self._last_maybe = time.time()

     def close_writers(self):
-        self.default_warc_writer.close_writer()
-        for w in self.warc_writers.values():
-            w.close_writer()
+        self.default_warc_writer.close()
+        for prefix, writer in list(self.warc_writers.items()):
+            del self.warc_writers[prefix]
+            writer.close()
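
The net effect of the writer.py refactor above: the old _OneWritableWarc helper is folded into WarcWriter, so serial numbers, size/idle rollover, and the warcinfo record are plain instance state with no locks or writer queue. A minimal lifecycle sketch matching the calls exercised in tests/test_writer.py (recorded_url is again a placeholder for a RecordedUrl; the option values are made up):

import warcprox
from warcprox.writer import WarcWriter

writer = WarcWriter(warcprox.Options(
        directory='./warcs', prefix='foo',
        warc_filename='{prefix}-{timestamp17}-{randomtoken}-{serialno}'))

def write_one(recorded_url):
    # write_records() calls ensure_open() internally: it rolls over on size if
    # needed, opens serial 00000, 00001, ... and writes a warcinfo record
    # before the first record of each new warc
    records = writer.write_records(recorded_url)
    print('writing to', writer.path)   # ends in '.open' while the warc is open
    return records

writer.close()   # renames the current warc to its final name, if one is open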

warcprox/writerthread.py

@@ -2,7 +2,7 @@
 warcprox/writerthread.py - warc writer thread, reads from the recorded url
 queue, writes warc records, runs final tasks after warc records are written
-Copyright (C) 2013-2018 Internet Archive
+Copyright (C) 2013-2019 Internet Archive
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -43,46 +43,8 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
         warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
         self.writer_pool = warcprox.writer.WarcWriterPool(options)
         self.method_filter = set(method.upper() for method in self.options.method_filter or [])
-        # set max_queued small, because self.inq is already handling queueing
-        self.thread_local = threading.local()
-        self.thread_profilers = {}
-        # for us; but give it a little breathing room to make sure it can keep
-        # worker threads busy
-        self.pool = warcprox.ThreadPoolExecutor(
-                max_workers=options.writer_threads or 1,
-                max_queued=10 * (options.writer_threads or 1))
-        self.batch = set()
         self.blackout_period = options.blackout_period or 0

-    def _startup(self):
-        self.logger.info('%s warc writer threads', self.pool._max_workers)
-        warcprox.BaseStandardPostfetchProcessor._startup(self)
-
-    def _get_process_put(self):
-        try:
-            recorded_url = self.inq.get(block=True, timeout=0.5)
-            self.batch.add(recorded_url)
-            self.pool.submit(self._wrap_process_url, recorded_url)
-        finally:
-            self.writer_pool.maybe_idle_rollover()
-
-    def _wrap_process_url(self, recorded_url):
-        if not getattr(self.thread_local, 'name_set', False):
-            threading.current_thread().name = 'WarcWriterThread(tid=%s)' % warcprox.gettid()
-            self.thread_local.name_set = True
-        if self.options.profile:
-            import cProfile
-            if not hasattr(self.thread_local, 'profiler'):
-                self.thread_local.profiler = cProfile.Profile()
-                tid = threading.current_thread().ident
-                self.thread_profilers[tid] = self.thread_local.profiler
-            self.thread_local.profiler.enable()
-            self._process_url(recorded_url)
-            self.thread_local.profiler.disable()
-        else:
-            self._process_url(recorded_url)
-
     def _process_url(self, recorded_url):
         try:
             records = []
@@ -97,10 +59,6 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
             logging.error(
                     'caught exception processing %s', recorded_url.url,
                     exc_info=True)
-        finally:
-            self.batch.remove(recorded_url)
-            if self.outq:
-                self.outq.put(recorded_url)

     def _filter_accepts(self, recorded_url):
         if not self.method_filter: