From 0882a2b174a2540546a931eab8b83f3d5c0eb71d Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 7 Jan 2019 15:54:35 -0800 Subject: [PATCH 1/3] remove --writer-threads option Support for multiple writer threads was broken, and profiling had shown it to be of dubious utility. https://github.com/internetarchive/warcprox/issues/101 https://github.com/internetarchive/warcprox/wiki/benchmarking-number-of-threads --- tests/test_warcprox.py | 18 ++-- tests/test_writer.py | 23 ++--- warcprox/controller.py | 4 - warcprox/main.py | 5 - warcprox/writer.py | 206 ++++++++++++++++----------------------- warcprox/writerthread.py | 44 +-------- 6 files changed, 100 insertions(+), 200 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index a69553d..7c6d21a 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -3,7 +3,7 @@ ''' tests/test_warcprox.py - automated tests for warcprox -Copyright (C) 2013-2018 Internet Archive +Copyright (C) 2013-2019 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -844,10 +844,9 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, # close the warc assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"] writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"] - warc = writer._available_warcs.queue[0] - warc_path = os.path.join(warc.directory, warc.finalname) + warc_path = os.path.join(writer.directory, writer.finalname) assert not os.path.exists(warc_path) - warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"].close_writer() + warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"].close() assert os.path.exists(warc_path) # read the warc @@ -1701,7 +1700,7 @@ def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback assert response.status_code == 200 assert not 'via' in playback_response - warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path + warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer.path with open(warc, 'rb') as f: for record in warcio.archiveiterator.ArchiveIterator(f): if record.rec_headers.get_header('warc-target-uri') == url: @@ -1933,9 +1932,8 @@ def test_long_warcprox_meta( # check that warcprox-meta was parsed and honored ("warc-prefix" param) assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"] writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"] - warc = writer._available_warcs.queue[0] - warc_path = os.path.join(warc.directory, warc.finalname) - warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"].close_writer() + warc_path = os.path.join(writer.directory, writer.finalname) + warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"].close() assert os.path.exists(warc_path) # read the warc @@ -2118,7 +2116,7 @@ def test_dedup_min_text_size(http_daemon, warcprox_, archiving_proxies): wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) # check that response records were written - warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path + warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer.path with open(warc, 'rb') as f: rec_iter = iter(warcio.archiveiterator.ArchiveIterator(f)) record = next(rec_iter) @@ -2198,7 +2196,7 @@ def test_dedup_min_binary_size(http_daemon, warcprox_, archiving_proxies): wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) # check that response records were written - warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path + warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer.path with open(warc, 'rb') as f: rec_iter = iter(warcio.archiveiterator.ArchiveIterator(f)) record = next(rec_iter) diff --git a/tests/test_writer.py b/tests/test_writer.py index f1d2466..ee3dcca 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -1,7 +1,7 @@ ''' tests/test_writer.py - warcprox warc writing tests -Copyright (C) 2017 Internet Archive +Copyright (C) 2017-2019 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -49,7 +49,6 @@ def lock_file(q, filename): except IOError: q.put('FAILED TO OBTAIN LOCK') - def test_warc_writer_locking(tmpdir): """Test if WarcWriter is locking WARC files. When we don't have the .open suffix, WarcWriter locks the file and the @@ -64,7 +63,7 @@ def test_warc_writer_locking(tmpdir): dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) wwriter = WarcWriter(Options( - directory=dirname, no_warc_open_suffix=True, writer_threads=1)) + directory=dirname, no_warc_open_suffix=True)) wwriter.write_records(recorded_url) warcs = [fn for fn in os.listdir(dirname) if fn.endswith('.warc')] assert warcs @@ -75,7 +74,7 @@ def test_warc_writer_locking(tmpdir): p.start() p.join() assert q.get() == 'FAILED TO OBTAIN LOCK' - wwriter.close_writer() + wwriter.close() # locking must succeed after writer has closed the WARC file. p = Process(target=lock_file, args=(q, target_warc)) @@ -96,8 +95,7 @@ def test_special_dont_write_prefix(): logging.debug('cd %s', tmpdir) os.chdir(tmpdir) - wwt = warcprox.writerthread.WarcWriterProcessor( - Options(prefix='-', writer_threads=1)) + wwt = warcprox.writerthread.WarcWriterProcessor(Options(prefix='-')) wwt.inq = queue.Queue(maxsize=1) wwt.outq = queue.Queue(maxsize=1) try: @@ -131,7 +129,7 @@ def test_special_dont_write_prefix(): wwt.join() wwt = warcprox.writerthread.WarcWriterProcessor( - Options(writer_threads=1, blackout_period=60, prefix='foo')) + Options(blackout_period=60, prefix='foo')) wwt.inq = queue.Queue(maxsize=1) wwt.outq = queue.Queue(maxsize=1) try: @@ -199,14 +197,12 @@ def test_special_dont_write_prefix(): wwt.stop.set() wwt.join() - def test_do_not_archive(): with tempfile.TemporaryDirectory() as tmpdir: logging.debug('cd %s', tmpdir) os.chdir(tmpdir) - wwt = warcprox.writerthread.WarcWriterProcessor( - Options(writer_threads=1)) + wwt = warcprox.writerthread.WarcWriterProcessor() wwt.inq = queue.Queue(maxsize=1) wwt.outq = queue.Queue(maxsize=1) try: @@ -240,7 +236,6 @@ def test_do_not_archive(): wwt.stop.set() wwt.join() - def test_warc_writer_filename(tmpdir): """Test if WarcWriter is writing WARC files with custom filenames. """ @@ -253,11 +248,9 @@ def test_warc_writer_filename(tmpdir): dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) wwriter = WarcWriter(Options(directory=dirname, prefix='foo', - warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}', - writer_threads=1)) + warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}')) wwriter.write_records(recorded_url) warcs = [fn for fn in os.listdir(dirname)] assert warcs assert re.search( - r'\d{17}_foo_\d{14}_00000.warc.open', - wwriter._available_warcs.queue[0].path) + r'\d{17}_foo_\d{14}_00000.warc.open', wwriter.path) diff --git a/warcprox/controller.py b/warcprox/controller.py index 80eca1c..4311bdd 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -143,10 +143,6 @@ class WarcproxController(object): self.playback_proxy = Factory.playback_proxy( self.proxy.ca, self.options) - # https://github.com/internetarchive/warcprox/wiki/benchmarking-number-of-threads - if not self.options.writer_threads: - self.options.writer_threads = 1 - self.build_postfetch_chain(self.proxy.recorded_url_q) self.service_registry = Factory.service_registry(options) diff --git a/warcprox/main.py b/warcprox/main.py index bf8d11e..8a96a7c 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -196,11 +196,6 @@ def _build_arg_parser(prog='warcprox', show_hidden=False): help=suppress( 'turn on performance profiling; summary statistics are dumped ' 'every 10 minutes and at shutdown')) - hidden.add_argument( - '--writer-threads', dest='writer_threads', type=int, default=1, - help=suppress( - 'number of warc writer threads; caution, see ' - 'https://github.com/internetarchive/warcprox/issues/101')) arg_parser.add_argument( '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy', default=None, help=( diff --git a/warcprox/writer.py b/warcprox/writer.py index ffdca8f..6aa9010 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -1,7 +1,7 @@ ''' warcprox/writer.py - warc writer, manages and writes records to warc files -Copyright (C) 2013-2017 Internet Archive +Copyright (C) 2013-2019 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -29,37 +29,41 @@ import warcprox import os import socket import random -import threading -try: - import queue -except ImportError: - import Queue as queue -import contextlib -class _OneWritableWarc: +class WarcWriter: ''' - Utility class used by WarcWriter + A writer for one warc prefix, which rolls over to new warc file, + incrementing serial number, when size limit is hit. Should only be used + from one thread. ''' + logger = logging.getLogger('warcprox.writer.WarcWriter') - logger = logging.getLogger('warcprox.writer._OneWritableWarc') + def __init__(self, options=warcprox.Options()): + self.options = options + + self.gzip = options.gzip or False + self.record_builder = warcprox.warc.WarcRecordBuilder( + digest_algorithm=options.digest_algorithm or 'sha1', + base32=options.base32) - def __init__(self, options=warcprox.Options(), randomtoken='0'): self.f = None self.path = None self.finalname = None self.gzip = options.gzip or False self.prefix = options.prefix or 'warcprox' self.open_suffix = '' if options.no_warc_open_suffix else '.open' - self.randomtoken = randomtoken self.rollover_size = options.rollover_size or 1000000000 self.rollover_idle_time = options.rollover_idle_time or None self.directory = options.directory or './warcs' self.filename_template = options.warc_filename or \ '{prefix}-{timestamp17}-{randomtoken}-{serialno}' self.last_activity = time.time() + self.serial = 0 + self.randomtoken = ''.join( + random.sample('abcdefghijklmnopqrstuvwxyz0123456789', 8)) # h3 default - def next_filename(self, serial): + def filename(self, serial): """WARC filename is configurable with CLI parameter --warc-filename. Default: '{prefix}-{timestamp17}-{randomtoken}-{serialno}' Available variables are: prefix, timestamp14, timestamp17, serialno, @@ -81,13 +85,17 @@ class _OneWritableWarc: return fname def open(self, serial): + ''' + Opens a new warc file with filename prefix `self.prefix` and serial + number `self.serial` and assigns file handle to `self.f`. + ''' if not os.path.exists(self.directory): self.logger.info( "warc destination directory %s doesn't exist, creating it", self.directory) os.mkdir(self.directory) - self.finalname = self.next_filename(serial) + self.finalname = self.filename(serial) self.logger.trace('opening %s', self.finalname) self.path = os.path.sep.join( [self.directory, self.finalname + self.open_suffix]) @@ -103,7 +111,53 @@ class _OneWritableWarc: 'could not lock file %s (%s)', self.path, exc) return self.f + def ensure_open(self): + ''' + Ensures `self.f` is ready to write the next warc record. + + Closes current warc if size limit has been reached. Then, if warc is + not open, opens one, and writes the warcinfo record. + ''' + self.maybe_size_rollover() + if not self.f: + serial = self.serial + self.serial += 1 + self.open(serial) + warcinfo = self.record_builder.build_warcinfo_record(self.finalname) + self.logger.debug('warcinfo.headers=%s', warcinfo.headers) + warcinfo.write_to(self.f, gzip=self.gzip) + + def write_records(self, recorded_url): + ''' + Returns tuple of records written, which are instances of + `hanzo.warctools.warc.WarcRecord`, decorated with `warc_filename` and + `offset` attributes. + ''' + records = self.record_builder.build_warc_records(recorded_url) + + self.ensure_open() + for record in records: + offset = self.f.tell() + record.write_to(self.f, gzip=self.gzip) + record.offset = offset + record.length = self.f.tell() - offset + record.warc_filename = self.finalname + self.logger.trace( + 'wrote warc record: warc_type=%s content_length=%s ' + 'digest=%s offset=%d warc=%s url=%s', record.type, + record.get_header(warctools.WarcRecord.CONTENT_LENGTH), + record.get_header(b'WARC-Payload-Digest'), record.offset, + self.path, record.get_header(warctools.WarcRecord.URL)) + + return records + def close(self): + ''' + Closes out the active warc. + + The next call to `write_records()` will write to a a new warc file with + the serial number incremented. + ''' if self.path: self.logger.trace('closing %s', self.finalname) if self.open_suffix == '': @@ -136,112 +190,16 @@ class _OneWritableWarc: self.finalname, os.path.getsize(self.path)) self.close() -class WarcWriter: - logger = logging.getLogger('warcprox.writer.WarcWriter') - - def __init__(self, options=warcprox.Options()): - self.options = options - - self.gzip = options.gzip or False - self.record_builder = warcprox.warc.WarcRecordBuilder( - digest_algorithm=options.digest_algorithm or 'sha1', - base32=options.base32) - - self._available_warcs = queue.Queue() - self._warc_count = 0 - self._warc_count_lock = threading.Lock() - - self._serial = 0 - self._serial_lock = threading.Lock() - - self._randomtoken = ''.join( - random.sample('abcdefghijklmnopqrstuvwxyz0123456789', 8)) - - def _bespeak_warc(self): - try: - return self._available_warcs.get(block=False) - except queue.Empty: - with self._warc_count_lock: - if self._warc_count < self.options.writer_threads: - self._warc_count += 1 - return _OneWritableWarc(self.options, self._randomtoken) - # else we're maxed out, wait for one to free up - return self._available_warcs.get(block=True) - - @contextlib.contextmanager - def _warc(self): - warc = self._bespeak_warc() - - warc.maybe_size_rollover() - - # lazy file open - if warc.f == None: - with self._serial_lock: - serial = self._serial - self._serial += 1 - warc.open(serial) - warcinfo = self.record_builder.build_warcinfo_record(warc.finalname) - self.logger.debug('warcinfo.headers=%s', warcinfo.headers) - warcinfo.write_to(warc.f, gzip=self.gzip) - - yield warc - - # __exit__() - warc.f.flush() - warc.last_activity = time.time() - self._available_warcs.put(warc) - - def write_records(self, recorded_url): - """Returns tuple of records written, which are instances of - hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and - "offset" attributes.""" - records = self.record_builder.build_warc_records(recorded_url) - - with self._warc() as warc: - for record in records: - offset = warc.f.tell() - record.write_to(warc.f, gzip=self.gzip) - record.offset = offset - record.length = warc.f.tell() - offset - record.warc_filename = warc.finalname - self.logger.trace( - 'wrote warc record: warc_type=%s content_length=%s ' - 'digest=%s offset=%d warc=%s url=%s', - record.type, - record.get_header(warctools.WarcRecord.CONTENT_LENGTH), - record.get_header(b'WARC-Payload-Digest'), - record.offset, warc.path, - record.get_header(warctools.WarcRecord.URL)) - - return records - - def maybe_idle_rollover(self): - warcs = [] - while True: - try: - warc = self._available_warcs.get(block=False) - warcs.append(warc) - except queue.Empty: - break - for warc in warcs: - warc.maybe_idle_rollover() - self._available_warcs.put(warc) - - def close_writer(self): - while self._warc_count > 0: - with self._warc_count_lock: - warc = self._available_warcs.get() - warc.close() - self._warc_count -= 1 - class WarcWriterPool: + ''' + A `WarcWriter` per warc prefix. Should only be used from one thread. + ''' logger = logging.getLogger("warcprox.writer.WarcWriterPool") def __init__(self, options=warcprox.Options()): self.default_warc_writer = WarcWriter(options) self.warc_writers = {} # {prefix:WarcWriter} self.options = options - self._lock = threading.RLock() self._last_maybe = time.time() # chooses writer for filename specified by warcprox_meta["warc-prefix"] if set @@ -251,16 +209,17 @@ class WarcWriterPool: # self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url)) options = warcprox.Options(**vars(self.options)) options.prefix = recorded_url.warcprox_meta["warc-prefix"] - with self._lock: - if not options.prefix in self.warc_writers: - self.warc_writers[options.prefix] = WarcWriter(options) - w = self.warc_writers[options.prefix] + if not options.prefix in self.warc_writers: + self.warc_writers[options.prefix] = WarcWriter(options) + w = self.warc_writers[options.prefix] return w def write_records(self, recorded_url): - """Returns tuple of records written, which are instances of - hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and - "offset" attributes.""" + ''' + Returns tuple of records written, which are instances of + `hanzo.warctools.warc.WarcRecord`, decorated with `warc_filename` and + `offset` attributes. + ''' return self._writer(recorded_url).write_records(recorded_url) def maybe_idle_rollover(self): @@ -271,7 +230,8 @@ class WarcWriterPool: self._last_maybe = time.time() def close_writers(self): - self.default_warc_writer.close_writer() - for w in self.warc_writers.values(): - w.close_writer() + self.default_warc_writer.close() + for prefix, writer in list(self.warc_writers.items()): + del self.warc_writers[prefix] + writer.close() diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 75aeb05..5ab5889 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -2,7 +2,7 @@ warcprox/writerthread.py - warc writer thread, reads from the recorded url queue, writes warc records, runs final tasks after warc records are written -Copyright (C) 2013-2018 Internet Archive +Copyright (C) 2013-2019 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -43,46 +43,8 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) self.writer_pool = warcprox.writer.WarcWriterPool(options) self.method_filter = set(method.upper() for method in self.options.method_filter or []) - - # set max_queued small, because self.inq is already handling queueing - self.thread_local = threading.local() - self.thread_profilers = {} - # for us; but give it a little breathing room to make sure it can keep - # worker threads busy - self.pool = warcprox.ThreadPoolExecutor( - max_workers=options.writer_threads or 1, - max_queued=10 * (options.writer_threads or 1)) - self.batch = set() self.blackout_period = options.blackout_period or 0 - def _startup(self): - self.logger.info('%s warc writer threads', self.pool._max_workers) - warcprox.BaseStandardPostfetchProcessor._startup(self) - - def _get_process_put(self): - try: - recorded_url = self.inq.get(block=True, timeout=0.5) - self.batch.add(recorded_url) - self.pool.submit(self._wrap_process_url, recorded_url) - finally: - self.writer_pool.maybe_idle_rollover() - - def _wrap_process_url(self, recorded_url): - if not getattr(self.thread_local, 'name_set', False): - threading.current_thread().name = 'WarcWriterThread(tid=%s)' % warcprox.gettid() - self.thread_local.name_set = True - if self.options.profile: - import cProfile - if not hasattr(self.thread_local, 'profiler'): - self.thread_local.profiler = cProfile.Profile() - tid = threading.current_thread().ident - self.thread_profilers[tid] = self.thread_local.profiler - self.thread_local.profiler.enable() - self._process_url(recorded_url) - self.thread_local.profiler.disable() - else: - self._process_url(recorded_url) - def _process_url(self, recorded_url): try: records = [] @@ -97,10 +59,6 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): logging.error( 'caught exception processing %s', recorded_url.url, exc_info=True) - finally: - self.batch.remove(recorded_url) - if self.outq: - self.outq.put(recorded_url) def _filter_accepts(self, recorded_url): if not self.method_filter: From 79d09d013bfa6f4a3e5aa386ca5dc2e206802d18 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 8 Jan 2019 11:15:20 -0800 Subject: [PATCH 2/3] ThreadPoolExecutor no longer used it was part of the multi-threaded warc writer implementation --- warcprox/__init__.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 6a8e00e..e2c8df7 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -1,7 +1,7 @@ """ warcprox/__init__.py - warcprox package main file, contains some utility code -Copyright (C) 2013-2018 Internet Archive +Copyright (C) 2013-2019 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -57,17 +57,6 @@ class Jsonner(json.JSONEncoder): else: return json.JSONEncoder.default(self, o) -class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor): - ''' - `concurrent.futures.ThreadPoolExecutor` supporting a queue of limited size. - - If `max_queued` is set, calls to `submit()` will block if necessary until a - free slot is available. - ''' - def __init__(self, max_queued=None, *args, **kwargs): - super().__init__(*args, **kwargs) - self._work_queue = queue.Queue(maxsize=max_queued or 0) - # XXX linux-specific def gettid(): try: From 150c1e67c61952417858df8f849960387cb286e3 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 8 Jan 2019 11:27:11 -0800 Subject: [PATCH 3/3] WarcWriterProcessor.close_for_prefix() New API to allow some code from outside of warcprox proper (in a third-party plugin for example) can close open warcs promptly when it knows they are finished. --- tests/test_writer.py | 118 +++++++++++++++++++++++++++++++++++++++ warcprox/controller.py | 2 +- warcprox/main.py | 2 +- warcprox/writer.py | 13 +++++ warcprox/writerthread.py | 25 +++++++++ 5 files changed, 158 insertions(+), 2 deletions(-) diff --git a/tests/test_writer.py b/tests/test_writer.py index ee3dcca..cadbd96 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -36,6 +36,12 @@ import tempfile import logging import hashlib import queue +import sys + +logging.basicConfig( + stream=sys.stdout, level=logging.TRACE, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s ' + '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') def lock_file(q, filename): """Try to lock file and return 1 if successful, else return 0. @@ -254,3 +260,115 @@ def test_warc_writer_filename(tmpdir): assert warcs assert re.search( r'\d{17}_foo_\d{14}_00000.warc.open', wwriter.path) + +def test_close_for_prefix(tmpdir): + wwp = warcprox.writerthread.WarcWriterProcessor( + Options(directory=str(tmpdir))) + wwp.inq = queue.Queue(maxsize=1) + wwp.outq = queue.Queue(maxsize=1) + + try: + wwp.start() + + # write a record to the default prefix + recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) + recorder.read() + wwp.inq.put(RecordedUrl( + url='http://example.com/1', content_type='text/plain', + status=200, client_ip='127.0.0.2', request_data=b'abc', + response_recorder=recorder, remote_ip='127.0.0.3', + timestamp=datetime.utcnow(), + payload_digest=recorder.block_digest)) + time.sleep(0.5) + rurl = wwp.outq.get() # wait for it to finish + + assert rurl.url == b'http://example.com/1' + assert len(tmpdir.listdir()) == 1 + assert tmpdir.listdir()[0].basename.startswith('warcprox-') + assert tmpdir.listdir()[0].basename.endswith('-00000.warc.open') + assert tmpdir.listdir()[0].basename == wwp.writer_pool.default_warc_writer.finalname + '.open' + + # request close of default warc + wwp.close_for_prefix() + + # write a record to some other prefix + recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) + recorder.read() + wwp.inq.put(RecordedUrl( + url='http://example.com/2', content_type='text/plain', + status=200, client_ip='127.0.0.2', request_data=b'abc', + response_recorder=recorder, remote_ip='127.0.0.3', + timestamp=datetime.utcnow(), + payload_digest=recorder.block_digest, + warcprox_meta={'warc-prefix': 'some-prefix'})) + time.sleep(0.5) + rurl = wwp.outq.get() # wait for it to finish + + assert rurl.url == b'http://example.com/2' + assert len(tmpdir.listdir()) == 2 + basenames = sorted(f.basename for f in tmpdir.listdir()) + assert basenames[0].startswith('some-prefix-') + assert basenames[0].endswith('-00000.warc.open') + assert basenames[1].startswith('warcprox-') + assert basenames[1].endswith('-00000.warc') + + # request close of warc with prefix + wwp.close_for_prefix('some-prefix') + + # write another record to the default prefix + recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) + recorder.read() + wwp.inq.put(RecordedUrl( + url='http://example.com/3', content_type='text/plain', + status=200, client_ip='127.0.0.2', request_data=b'abc', + response_recorder=recorder, remote_ip='127.0.0.3', + timestamp=datetime.utcnow(), + payload_digest=recorder.block_digest)) + time.sleep(0.5) + rurl = wwp.outq.get() # wait for it to finish + + assert rurl.url == b'http://example.com/3' + # now some-prefix warc is closed and a new default prefix warc is open + basenames = sorted(f.basename for f in tmpdir.listdir()) + assert len(basenames) == 3 + assert basenames[0].startswith('some-prefix-') + assert basenames[0].endswith('-00000.warc') + assert basenames[1].startswith('warcprox-') + assert basenames[1].endswith('-00000.warc') + assert basenames[2].startswith('warcprox-') + assert basenames[2].endswith('-00001.warc.open') + + # write another record to with prefix "some-prefix" + recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) + recorder.read() + wwp.inq.put(RecordedUrl( + url='http://example.com/4', content_type='text/plain', + status=200, client_ip='127.0.0.2', request_data=b'abc', + response_recorder=recorder, remote_ip='127.0.0.3', + timestamp=datetime.utcnow(), + payload_digest=recorder.block_digest, + warcprox_meta={'warc-prefix': 'some-prefix'})) + time.sleep(0.5) + rurl = wwp.outq.get() # wait for it to finish + + assert rurl.url == b'http://example.com/4' + # new some-prefix warc will have a new random token and start over at + # serial 00000 + basenames = sorted(f.basename for f in tmpdir.listdir()) + assert len(basenames) == 4 + assert basenames[0].startswith('some-prefix-') + assert basenames[1].startswith('some-prefix-') + # order of these two warcs depends on random token so we don't know + # which is which + assert basenames[0][-5:] != basenames[1][-5:] + assert '-00000.' in basenames[0] + assert '-00000.' in basenames[1] + + assert basenames[2].startswith('warcprox-') + assert basenames[2].endswith('-00000.warc') + assert basenames[3].startswith('warcprox-') + assert basenames[3].endswith('-00001.warc.open') + + finally: + wwp.stop.set() + wwp.join() diff --git a/warcprox/controller.py b/warcprox/controller.py index 4311bdd..0b3daef 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -4,7 +4,7 @@ starting up and shutting down the various components of warcprox, and for sending heartbeats to the service registry if configured to do so; also has some memory profiling capabilities -Copyright (C) 2013-2018 Internet Archive +Copyright (C) 2013-2019 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License diff --git a/warcprox/main.py b/warcprox/main.py index 8a96a7c..8dab727 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -4,7 +4,7 @@ warcprox/main.py - entrypoint for warcprox executable, parses command line arguments, initializes components, starts controller, handles signals -Copyright (C) 2013-2018 Internet Archive +Copyright (C) 2013-2019 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License diff --git a/warcprox/writer.py b/warcprox/writer.py index 6aa9010..96293db 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -148,6 +148,7 @@ class WarcWriter: record.get_header(warctools.WarcRecord.CONTENT_LENGTH), record.get_header(b'WARC-Payload-Digest'), record.offset, self.path, record.get_header(warctools.WarcRecord.URL)) + self.f.flush() return records @@ -235,3 +236,15 @@ class WarcWriterPool: del self.warc_writers[prefix] writer.close() + def close_for_prefix(self, prefix=None): + ''' + Close warc writer for the given warc prefix, or the default prefix if + `prefix` is `None`. + ''' + if prefix and prefix in self.warc_writers: + writer = self.warc_writers[prefix] + del self.warc_writers[prefix] + writer.close() + else: + self.default_warc_writer.close() + diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 5ab5889..f6ac277 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -33,6 +33,10 @@ import warcprox from concurrent import futures from datetime import datetime import threading +try: + import queue +except ImportError: + import Queue as queue class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): logger = logging.getLogger("warcprox.writerthread.WarcWriterProcessor") @@ -44,6 +48,27 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): self.writer_pool = warcprox.writer.WarcWriterPool(options) self.method_filter = set(method.upper() for method in self.options.method_filter or []) self.blackout_period = options.blackout_period or 0 + self.close_prefix_reqs = queue.Queue() + + def _get_process_put(self): + while True: + try: + prefix = self.close_prefix_reqs.get_nowait() + self.writer_pool.close_for_prefix(prefix) + except queue.Empty: + break + super()._get_process_put() + + def close_for_prefix(self, prefix=None): + ''' + Request close of warc writer for the given warc prefix, or the default + prefix if `prefix` is `None`. + + This API exists so that some code from outside of warcprox proper (in a + third-party plugin for example) can close open warcs promptly when it + knows they are finished. + ''' + self.close_prefix_reqs.put(prefix) def _process_url(self, recorded_url): try: