From fd8119051705c2ea8f15e7e163c970db4e972879 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 5 Feb 2018 17:22:09 -0800 Subject: [PATCH] refactor the multithreaded warc writing main functional change is that only as many warc files are created as are needed to keep up with the throughput --- tests/test_warcprox.py | 45 +++-- tests/test_writer.py | 16 +- warcprox/__init__.py | 8 + warcprox/controller.py | 18 +- warcprox/writer.py | 365 +++++++++++++++------------------ warcprox/writerthread.py | 72 +++----- 6 files changed, 218 insertions(+), 306 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 0e0b298..da36df3 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -767,10 +767,12 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4) # close the warc - assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"] - writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"] - warc_path = os.path.join(writer.directory, writer._f_finalname) - warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"].close_writer() + assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"] + writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"] + warc = writer._available_warcs.queue[0] + warc_path = os.path.join(warc.directory, warc.finalname) + assert not os.path.exists(warc_path) + warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"].close_writer() assert os.path.exists(warc_path) # read the warc @@ -1389,20 +1391,16 @@ def test_controller_with_defaults(): assert controller.proxy.server_port == 8000 assert controller.proxy.running_stats assert not controller.proxy.stats_db - wwt = controller.warc_writer_thread - assert wwt - assert wwt.inq - assert wwt.outq - assert wwt.writer_pool - assert wwt.writer_pool.default_warc_writer - assert wwt.writer_pool.default_warc_writer.directory == './warcs' - assert wwt.writer_pool.default_warc_writer.rollover_idle_time is None - assert wwt.writer_pool.default_warc_writer.rollover_size == 1000000000 - assert wwt.writer_pool.default_warc_writer.prefix == 'warcprox' - assert wwt.writer_pool.default_warc_writer.gzip is False - assert wwt.writer_pool.default_warc_writer.record_builder - assert not wwt.writer_pool.default_warc_writer.record_builder.base32 - assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' + wwp = controller.warc_writer_processor + assert wwp + assert wwp.inq + assert wwp.outq + assert wwp.writer_pool + assert wwp.writer_pool.default_warc_writer + assert wwp.writer_pool.default_warc_writer.gzip is False + assert wwp.writer_pool.default_warc_writer.record_builder + assert not wwp.writer_pool.default_warc_writer.record_builder.base32 + assert wwp.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' def test_load_plugin(): options = warcprox.Options(port=0, plugins=[ @@ -1482,7 +1480,7 @@ def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback assert response.status_code == 200 assert not 'via' in playback_response - warc = warcprox_.warc_writer_thread.writer_pool.default_warc_writer._fpath + warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path with open(warc, 'rb') as f: for record in warcio.archiveiterator.ArchiveIterator(f): if
record.rec_headers.get_header('warc-target-uri') == url: @@ -1700,10 +1698,11 @@ def test_long_warcprox_meta( wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) # check that warcprox-meta was parsed and honored ("warc-prefix" param) - assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"] - writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"] - warc_path = os.path.join(writer.directory, writer._f_finalname) - warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"].close_writer() + assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"] + writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"] + warc = writer._available_warcs.queue[0] + warc_path = os.path.join(warc.directory, warc.finalname) + warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"].close_writer() assert os.path.exists(warc_path) # read the warc diff --git a/tests/test_writer.py b/tests/test_writer.py index 0a18c33..fb39378 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -61,7 +61,8 @@ def test_warc_writer_locking(tmpdir): timestamp=datetime.utcnow()) dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) - wwriter = WarcWriter(Options(directory=dirname, no_warc_open_suffix=True)) + wwriter = WarcWriter(Options( + directory=dirname, no_warc_open_suffix=True, writer_threads=1)) wwriter.write_records(recorded_url) warcs = [fn for fn in os.listdir(dirname) if fn.endswith('.warc')] assert warcs @@ -93,7 +94,8 @@ def test_special_dont_write_prefix(): logging.debug('cd %s', tmpdir) os.chdir(tmpdir) - wwt = warcprox.writerthread.WarcWriterThread(Options(prefix='-')) + wwt = warcprox.writerthread.WarcWriterProcessor( + Options(prefix='-', writer_threads=1)) wwt.inq = warcprox.TimestampedQueue(maxsize=1) wwt.outq = warcprox.TimestampedQueue(maxsize=1) try: @@ -126,7 +128,8 @@ def test_special_dont_write_prefix(): wwt.stop.set() wwt.join() - wwt = warcprox.writerthread.WarcWriterThread() + wwt = warcprox.writerthread.WarcWriterProcessor( + Options(writer_threads=1)) wwt.inq = warcprox.TimestampedQueue(maxsize=1) wwt.outq = warcprox.TimestampedQueue(maxsize=1) try: @@ -172,8 +175,11 @@ def test_warc_writer_filename(tmpdir): dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) wwriter = WarcWriter(Options(directory=dirname, prefix='foo', - warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}')) + warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}', + writer_threads=1)) wwriter.write_records(recorded_url) warcs = [fn for fn in os.listdir(dirname)] assert warcs - assert re.search('\d{17}_foo_\d{14}_00000.warc.open', wwriter._fpath) + assert re.search( + r'\d{17}_foo_\d{14}_00000.warc.open', + wwriter._available_warcs.queue[0].path) diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 39b31ea..33af61a 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -237,6 +237,14 @@ class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor): self.logger.error( '%s raised exception', listener.stop, exc_info=True) +def timestamp17(): + now = datetime.datetime.utcnow() + return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000) + +def timestamp14(): + now = datetime.datetime.utcnow() + return '{:%Y%m%d%H%M%S}'.format(now) + # monkey-patch log levels TRACE and NOTICE TRACE = 5 def _logger_trace(self, msg, *args, **kwargs): diff --git 
a/warcprox/controller.py b/warcprox/controller.py index b33b42e..30446c3 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -57,7 +57,6 @@ class Factory: @staticmethod def stats_processor(options): - # return warcprox.stats.StatsProcessor(options) if options.rethinkdb_stats_url: stats_processor = warcprox.stats.RethinkStatsProcessor(options) elif options.stats_db_file in (None, '', '/dev/null'): @@ -68,11 +67,8 @@ class Factory: return stats_processor @staticmethod - def warc_writer(options): - if options.writer_threads: - return warcprox.writerthread.WarcWriterMultiThread(options) - else: - return warcprox.writerthread.WarcWriterThread(options) + def warc_writer_processor(options): + return warcprox.writerthread.WarcWriterProcessor(options) @staticmethod def playback_proxy(ca, options): @@ -145,6 +141,12 @@ class WarcproxController(object): self.playback_proxy = Factory.playback_proxy( self.proxy.ca, self.options) + # default number of warc writer threads = sqrt(proxy.max_threads) + # pulled out of thin air because it strikes me as reasonable + # 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45 + if not self.options.writer_threads: + self.options.writer_threads = int(self.proxy.max_threads ** 0.5) + self.build_postfetch_chain(self.proxy.recorded_url_q) self.service_registry = Factory.service_registry(options) @@ -184,8 +186,8 @@ class WarcproxController(object): if self.dedup_db: self._postfetch_chain.append(self.dedup_db.loader()) - self.warc_writer_thread = Factory.warc_writer(self.options) - self._postfetch_chain.append(self.warc_writer_thread) + self.warc_writer_processor = Factory.warc_writer_processor(self.options) + self._postfetch_chain.append(self.warc_writer_processor) if self.dedup_db: self._postfetch_chain.append(self.dedup_db.storer()) diff --git a/warcprox/writer.py b/warcprox/writer.py index 7779c6f..dbabafb 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -22,296 +22,220 @@ USA. 
from __future__ import absolute_import import logging -from datetime import datetime from hanzo import warctools import fcntl import time import warcprox import os import socket -import string import random import threading try: import queue except ImportError: import Queue as queue +import contextlib +class _OneWritableWarc: + logger = logging.getLogger('warcprox.writer._OneWritableWarc') -class WarcWriter: - logger = logging.getLogger('warcprox.writer.WarcWriter') - - def __init__(self, options=warcprox.Options()): - + ''' + Utility class used by WarcWriter + ''' + def __init__(self, options=warcprox.Options(), randomtoken='0'): + self.f = None + self.path = None + self.finalname = None + self.gzip = options.gzip or False + self.prefix = options.prefix or 'warcprox' + self.open_suffix = '' if options.no_warc_open_suffix else '.open' + self.randomtoken = randomtoken self.rollover_size = options.rollover_size or 1000000000 self.rollover_idle_time = options.rollover_idle_time or None - self._last_activity = time.time() - - self.gzip = options.gzip or False - self.warc_filename = options.warc_filename or \ - '{prefix}-{timestamp17}-{randomtoken}-{serialno}' - digest_algorithm = options.digest_algorithm or 'sha1' - base32 = options.base32 - self.record_builder = warcprox.warc.WarcRecordBuilder( - digest_algorithm=digest_algorithm, base32=base32) - - # warc path and filename stuff self.directory = options.directory or './warcs' - self.prefix = options.prefix or 'warcprox' - - self._f = None - self._fpath = None - self._f_finalname = None - self._f_open_suffix = '' if options.no_warc_open_suffix else '.open' - self._serial = 0 - self._lock = threading.RLock() - - self._randomtoken = "".join(random.Random().sample(string.digits + string.ascii_lowercase, 8)) - - if not os.path.exists(self.directory): - self.logger.info("warc destination directory {} doesn't exist, creating it".format(self.directory)) - os.mkdir(self.directory) - - def timestamp17(self): - now = datetime.utcnow() - return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000) - - def timestamp14(self): - now = datetime.utcnow() - return '{:%Y%m%d%H%M%S}'.format(now) - - def close_writer(self): - with self._lock: - if self._fpath: - self.logger.info('closing %s', self._f_finalname) - if self._f_open_suffix == '': - try: - fcntl.lockf(self._f, fcntl.LOCK_UN) - except IOError as exc: - self.logger.error('could not unlock file %s (%s)', - self._fpath, exc) - self._f.close() - finalpath = os.path.sep.join( - [self.directory, self._f_finalname]) - os.rename(self._fpath, finalpath) - - self._fpath = None - self._f = None - - def serial(self): - return '{:05d}'.format(self._serial) + self.filename_template = options.warc_filename or \ + '{prefix}-{timestamp17}-{randomtoken}-{serialno}' + self.last_activity = time.time() # h3 default - def _warc_filename(self): + def next_filename(self, serial): """WARC filename is configurable with CLI parameter --warc-filename. - Default: '{prefix}-{timestamp17}-{serialno}-{randomtoken}' + Default: '{prefix}-{timestamp17}-{randomtoken}-{serialno}' Available variables are: prefix, timestamp14, timestamp17, serialno, randomtoken, hostname, shorthostname. Extension ``.warc`` or ``.warc.gz`` is appended automatically. 
""" hostname = socket.getfqdn() shorthostname = hostname.split('.')[0] - fname = self.warc_filename.format(prefix=self.prefix, - timestamp14=self.timestamp14(), - timestamp17=self.timestamp17(), - serialno=self.serial(), - randomtoken=self._randomtoken, - hostname=hostname, - shorthostname=shorthostname) + fname = self.filename_template.format( + prefix=self.prefix, timestamp14=warcprox.timestamp14(), + timestamp17=warcprox.timestamp17(), + serialno='{:05d}'.format(serial), + randomtoken=self.randomtoken, hostname=hostname, + shorthostname=shorthostname) if self.gzip: fname = fname + '.warc.gz' else: fname = fname + '.warc' return fname - def _writer(self): - with self._lock: - if self._fpath and os.path.getsize( - self._fpath) > self.rollover_size: - self.close_writer() + def open(self, serial): + if not os.path.exists(self.directory): + self.logger.info( + "warc destination directory %s doesn't exist, creating it", + self.directory) + os.mkdir(self.directory) - if self._f == None: - self._f_finalname = self._warc_filename() - self._fpath = os.path.sep.join([ - self.directory, self._f_finalname + self._f_open_suffix]) + self.finalname = self.next_filename(serial) + self.path = os.path.sep.join( + [self.directory, self.finalname + self.open_suffix]) - self._f = open(self._fpath, 'wb') - # if no '.open' suffix is used for WARC, acquire an exclusive - # file lock. - if self._f_open_suffix == '': - try: - fcntl.lockf(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) - except IOError as exc: - self.logger.error('could not lock file %s (%s)', - self._fpath, exc) + self.f = open(self.path, 'wb') + # if no '.open' suffix is used for WARC, acquire an exclusive + # file lock. + if self.open_suffix == '': + try: + fcntl.lockf(self.f, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError as exc: + self.logger.error( + 'could not lock file %s (%s)', self.path, exc) + return self.f - warcinfo_record = self.record_builder.build_warcinfo_record( - self._f_finalname) - self.logger.debug( - 'warcinfo_record.headers=%s', warcinfo_record.headers) - warcinfo_record.write_to(self._f, gzip=self.gzip) + def close(self): + if self.path: + self.logger.trace('closing %s', self.finalname) + if self.open_suffix == '': + try: + fcntl.lockf(self.f, fcntl.LOCK_UN) + except IOError as exc: + self.logger.error( + 'could not unlock file %s (%s)', self.path, exc) + self.f.close() + finalpath = os.path.sep.join( + [self.directory, self.finalname]) + os.rename(self.path, finalpath) - self._serial += 1 - - return self._f - - def write_records(self, recorded_url): - """Returns tuple of records written, which are instances of - hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and - "offset" attributes.""" - records = self.record_builder.build_warc_records(recorded_url) - - with self._lock: - writer = self._writer() - - for record in records: - offset = writer.tell() - record.write_to(writer, gzip=self.gzip) - record.offset = offset - record.length = writer.tell() - offset - record.warc_filename = self._f_finalname - self.logger.debug( - 'wrote warc record: warc_type=%s content_length=%s ' - 'url=%s warc=%s offset=%d', - record.get_header(warctools.WarcRecord.TYPE), - record.get_header(warctools.WarcRecord.CONTENT_LENGTH), - record.get_header(warctools.WarcRecord.URL), - self._fpath, record.offset) - - self._f.flush() - self._last_activity = time.time() - - return records + self.path = None + self.f = None def maybe_idle_rollover(self): - with self._lock: - if (self._fpath is not None - and self.rollover_idle_time is not None - 
and self.rollover_idle_time > 0 - and time.time() - self._last_activity > self.rollover_idle_time): - self.logger.info( - 'rolling over %s after %s seconds idle', - self._f_finalname, time.time() - self._last_activity) - self.close_writer() + if (self.path and self.rollover_idle_time + and self.rollover_idle_time > 0 + and time.time() - self.last_activity > self.rollover_idle_time): + self.logger.info( + 'rolling over %s after %0.1f seconds idle', + self.finalname, time.time() - self.last_activity) + self.close() -class MultiWarcWriter(WarcWriter): - logger = logging.getLogger("warcprox.writer.MultiWarcWriter") + def maybe_size_rollover(self): + if self.path and os.path.getsize(self.path) > self.rollover_size: + self.logger.info( + 'rolling over %s because it has reached %s bytes in size', + self.finalname, os.path.getsize(self.path)) + self.close() + +class WarcWriter: + logger = logging.getLogger('warcprox.writer.WarcWriter') def __init__(self, options=warcprox.Options()): - super().__init__(options) - self._thread_num = options.writer_threads - self._f = [None] * self._thread_num - self._fpath = [None] * self._thread_num - self._f_finalname = [None] * self._thread_num - self._lock = [threading.RLock()] * self._thread_num - self._available_threads = queue.Queue() - for i in range(self._thread_num): - self._available_threads.put(i) + self.options = options - def _writer(self, curr): - with self._lock[curr]: - if self._fpath[curr] and os.path.getsize( - self._fpath[curr]) > self.rollover_size: - self.close_writer(curr) + self.gzip = options.gzip or False + self.record_builder = warcprox.warc.WarcRecordBuilder( + digest_algorithm=options.digest_algorithm or 'sha1', + base32=options.base32) - if self._f[curr] == None: - self._f_finalname[curr] = self._warc_filename() - self._fpath[curr] = os.path.sep.join([ - self.directory, self._f_finalname[curr] + self._f_open_suffix]) + self._available_warcs = queue.Queue() + self._warc_count = 0 + self._warc_count_lock = threading.Lock() - self._f[curr] = open(self._fpath[curr], 'wb') - # if no '.open' suffix is used for WARC, acquire an exclusive - # file lock. 
- if self._f_open_suffix == '': - try: - fcntl.lockf(self._f[curr], fcntl.LOCK_EX | fcntl.LOCK_NB) - except IOError as exc: - self.logger.error('could not lock file %s (%s)', - self._fpath, exc) + self._serial = 0 + self._serial_lock = threading.Lock() - warcinfo_record = self.record_builder.build_warcinfo_record( - self._f_finalname[curr]) - self.logger.debug( - 'warcinfo_record.headers=%s', warcinfo_record.headers) - warcinfo_record.write_to(self._f[curr], gzip=self.gzip) + self._randomtoken = ''.join( + random.sample('abcdefghijklmnopqrstuvwxyz0123456789', 8)) + def _bespeak_warc(self): + try: + return self._available_warcs.get(block=False) + except queue.Empty: + with self._warc_count_lock: + if self._warc_count < self.options.writer_threads: + self._warc_count += 1 + return _OneWritableWarc(self.options, self._randomtoken) + # else we're maxed out, wait for one to free up + return self._available_warcs.get(block=True) + + @contextlib.contextmanager + def _warc(self): + warc = self._bespeak_warc() + + warc.maybe_size_rollover() + + # lazy file open + if warc.f == None: + with self._serial_lock: + serial = self._serial self._serial += 1 + warc.open(serial) + warcinfo = self.record_builder.build_warcinfo_record(warc.finalname) + self.logger.debug('warcinfo.headers=%s', warcinfo.headers) + warcinfo.write_to(warc.f, gzip=self.gzip) - return self._f[curr] + yield warc + + # __exit__() + warc.f.flush() + warc.last_activity = time.time() + self._available_warcs.put(warc) def write_records(self, recorded_url): """Returns tuple of records written, which are instances of hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and "offset" attributes.""" records = self.record_builder.build_warc_records(recorded_url) - curr = self._available_threads.get() - # we could also remove that lock?? The queue guaranties that no two - # threads have the same curr open. - with self._lock[curr]: - writer = self._writer(curr) + with self._warc() as warc: for record in records: - offset = writer.tell() - record.write_to(writer, gzip=self.gzip) + offset = warc.f.tell() + record.write_to(warc.f, gzip=self.gzip) record.offset = offset - record.length = writer.tell() - offset - record.warc_filename = self._f_finalname[curr] + record.length = warc.f.tell() - offset + record.warc_filename = warc.finalname self.logger.debug( 'wrote warc record: warc_type=%s content_length=%s ' 'url=%s warc=%s offset=%d', record.get_header(warctools.WarcRecord.TYPE), record.get_header(warctools.WarcRecord.CONTENT_LENGTH), record.get_header(warctools.WarcRecord.URL), - self._fpath[curr], record.offset) + warc.path, record.offset) - self._f[curr].flush() - self._last_activity = time.time() - self._available_threads.put(curr) return records def maybe_idle_rollover(self): - for curr in range(0, self._thread_num): - with self._lock[curr]: - if (self._fpath[curr] is not None - and self.rollover_idle_time is not None - and self.rollover_idle_time > 0 - and time.time() - self._last_activity > self.rollover_idle_time): - self.logger.info( - 'rolling over %s after %s seconds idle', - self._f_finalname[curr], time.time() - self._last_activity) - self.close_writer(curr) + warcs = [] + while True: + try: + warc = self._available_warcs.get(block=False) + warcs.append(warc) + except queue.Empty: + break + for warc in warcs: + warc.maybe_idle_rollover() + self._available_warcs.put(warc) - def close_writer(self, curr=None): - """When this method is invoked without any argument (program termination) - close all writer. 
- """ - if not curr: - for curr in range(0, self._thread_num): - self.close_writer(curr) - return - - with self._lock[curr]: - if self._fpath[curr]: - self.logger.info('closing %s', self._f_finalname[curr]) - if self._f_open_suffix == '': - try: - fcntl.lockf(self._f[curr], fcntl.LOCK_UN) - except IOError as exc: - self.logger.error('could not unlock file %s (%s)', - self._fpath[curr], exc) - self._f[curr].close() - finalpath = os.path.sep.join( - [self.directory, self._f_finalname[curr]]) - os.rename(self._fpath[curr], finalpath) + def close_writer(self): + while self._warc_count > 0: + with self._warc_count_lock: + warc = self._available_warcs.get() + warc.close() + self._warc_count -= 1 class WarcWriterPool: logger = logging.getLogger("warcprox.writer.WarcWriterPool") def __init__(self, options=warcprox.Options()): - if options.writer_threads: - self.default_warc_writer = MultiWarcWriter(options=options) - else: - self.default_warc_writer = WarcWriter(options=options) + self.default_warc_writer = WarcWriter(options) self.warc_writers = {} # {prefix:WarcWriter} self.options = options self._lock = threading.RLock() @@ -326,12 +250,7 @@ class WarcWriterPool: options.prefix = recorded_url.warcprox_meta["warc-prefix"] with self._lock: if not options.prefix in self.warc_writers: - if self.options.writer_threads: - self.warc_writers[options.prefix] = MultiWarcWriter( - options=options) - else: - self.warc_writers[options.prefix] = WarcWriter( - options=options) + self.warc_writers[options.prefix] = WarcWriter(options) w = self.warc_writers[options.prefix] return w diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index c78e8c0..1010161 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -32,32 +32,42 @@ import time import warcprox from concurrent import futures -class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor): - logger = logging.getLogger("warcprox.writerthread.WarcWriterThread") +class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): + logger = logging.getLogger("warcprox.writerthread.WarcWriterProcessor") _ALWAYS_ACCEPT = {'WARCPROX_WRITE_RECORD'} def __init__(self, options=warcprox.Options()): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) - self.options = options self.writer_pool = warcprox.writer.WarcWriterPool(options) self.method_filter = set(method.upper() for method in self.options.method_filter or []) + self.pool = futures.ThreadPoolExecutor(max_workers=options.writer_threads or 1) + self.batch = set() def _get_process_put(self): - try: - warcprox.BaseStandardPostfetchProcessor._get_process_put(self) - finally: - self.writer_pool.maybe_idle_rollover() + recorded_url = self.inq.get(block=True, timeout=0.5) + self.batch.add(recorded_url) + self.pool.submit(self._process_url, recorded_url) def _process_url(self, recorded_url): - records = [] - if self._should_archive(recorded_url): - records = self.writer_pool.write_records(recorded_url) - recorded_url.warc_records = records - self._log(recorded_url, records) - # try to release resources in a timely fashion - if recorded_url.response_recorder and recorded_url.response_recorder.tempfile: - recorded_url.response_recorder.tempfile.close() + try: + records = [] + if self._should_archive(recorded_url): + records = self.writer_pool.write_records(recorded_url) + recorded_url.warc_records = records + self._log(recorded_url, records) + # try to release resources in a timely fashion + if recorded_url.response_recorder and 
recorded_url.response_recorder.tempfile: + recorded_url.response_recorder.tempfile.close() + except: + logging.error( + 'caught exception processing %s', recorded_url.url, + exc_info=True) + finally: + self.batch.remove(recorded_url) + if self.outq: + self.outq.put(recorded_url) + self.writer_pool.maybe_idle_rollover() def _filter_accepts(self, recorded_url): if not self.method_filter: @@ -94,35 +104,3 @@ class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor): def _shutdown(self): self.writer_pool.close_writers() - -class WarcWriterMultiThread(WarcWriterThread): - logger = logging.getLogger("warcprox.writerthread.WarcWriterMultiThread") - - def __init__(self, options=warcprox.Options()): - warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) - self.pool = futures.ThreadPoolExecutor(max_workers=options.writer_threads) - self.batch = set() - self.options = options - self.writer_pool = warcprox.writer.WarcWriterPool(options) - self.method_filter = set(method.upper() for method in self.options.method_filter or []) - - def _get_process_put(self): - recorded_url = self.inq.get(block=True, timeout=0.5) - self.batch.add(recorded_url) - self.pool.submit(self._process_url, recorded_url) - - def _process_url(self, recorded_url): - try: - records = [] - if self._should_archive(recorded_url): - records = self.writer_pool.write_records(recorded_url) - recorded_url.warc_records = records - self._log(recorded_url, records) - # try to release resources in a timely fashion - if recorded_url.response_recorder and recorded_url.response_recorder.tempfile: - recorded_url.response_recorder.tempfile.close() - finally: - self.batch.remove(recorded_url) - if self.outq: - self.outq.put(recorded_url) - self.writer_pool.maybe_idle_rollover()
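
The refactored WarcWriter above replaces MultiWarcWriter's fixed per-thread file arrays with a small pool of open WARCs that grows only on demand. Below is a minimal standalone sketch of that bounded, lazy allocation pattern; BoundedLazyPool and its names are illustrative only, not warcprox APIs. An idle item is reused when one is available, a new one is created only while the count is still under the cap, and otherwise the caller blocks until another thread releases an item, which is what keeps the number of open WARC files proportional to actual write throughput.

    import threading
    try:
        import queue
    except ImportError:
        import Queue as queue  # python 2

    class BoundedLazyPool:
        '''Illustrative sketch only, not part of warcprox.'''
        def __init__(self, max_items, factory):
            self._available = queue.Queue()
            self._count = 0
            self._count_lock = threading.Lock()
            self._max_items = max_items
            self._factory = factory

        def acquire(self):
            try:
                # fast path: reuse an item another thread has finished with
                return self._available.get(block=False)
            except queue.Empty:
                with self._count_lock:
                    if self._count < self._max_items:
                        self._count += 1
                        return self._factory()
                # maxed out, wait for an item to be released
                return self._available.get(block=True)

        def release(self, item):
            self._available.put(item)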
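
The writer_threads default added in controller.py is int(self.proxy.max_threads ** 0.5). The sample figures in that comment can be checked with a couple of lines of plain Python; note that int() truncates, so the last two entries evaluate to 31 and 44 rather than the rounded 32 and 45 shown in the comment.

    # prints e.g. "1 => 1", "100 => 10", "1000 => 31"
    for max_threads in (1, 2, 5, 10, 50, 100, 200, 500, 1000, 2000):
        print('%s => %s' % (max_threads, int(max_threads ** 0.5)))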
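
The next_filename() docstring lists the variables available to --warc-filename; the template is expanded with ordinary str.format(), roughly as sketched below. The values here are made up for illustration, and '.warc' or '.warc.gz' is appended separately depending on --gzip.

    template = '{prefix}-{timestamp17}-{randomtoken}-{serialno}'
    fname = template.format(
        prefix='warcprox', timestamp14='20180205172209',
        timestamp17='20180205172209123', serialno='{:05d}'.format(0),
        randomtoken='a1b2c3d4', hostname='crawl01.example.org',
        shorthostname='crawl01')
    # fname == 'warcprox-20180205172209123-a1b2c3d4-00000'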