From 0882a2b174a2540546a931eab8b83f3d5c0eb71d Mon Sep 17 00:00:00 2001
From: Noah Levitt <nlevitt@archive.org>
Date: Mon, 7 Jan 2019 15:54:35 -0800
Subject: [PATCH 1/4] remove --writer-threads option

Support for multiple writer threads was broken, and profiling had shown
it to be of dubious utility.
https://github.com/internetarchive/warcprox/issues/101
https://github.com/internetarchive/warcprox/wiki/benchmarking-number-of-threads
---
 tests/test_warcprox.py   |  18 ++--
 tests/test_writer.py     |  23 ++---
 warcprox/controller.py   |   4 -
 warcprox/main.py         |   5 -
 warcprox/writer.py       | 206 ++++++++++++++++-----------------------
 warcprox/writerthread.py |  44 +--------
 6 files changed, 100 insertions(+), 200 deletions(-)

diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index a69553d..7c6d21a 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -3,7 +3,7 @@
 '''
 tests/test_warcprox.py - automated tests for warcprox
 
-Copyright (C) 2013-2018 Internet Archive
+Copyright (C) 2013-2019 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -844,10 +844,9 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     # close the warc
     assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"]
     writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"]
-    warc = writer._available_warcs.queue[0]
-    warc_path = os.path.join(warc.directory, warc.finalname)
+    warc_path = os.path.join(writer.directory, writer.finalname)
     assert not os.path.exists(warc_path)
-    warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"].close_writer()
+    warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"].close()
     assert os.path.exists(warc_path)
 
     # read the warc
@@ -1701,7 +1700,7 @@ def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback
     assert response.status_code == 200
     assert not 'via' in playback_response
 
-    warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path
+    warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer.path
     with open(warc, 'rb') as f:
         for record in warcio.archiveiterator.ArchiveIterator(f):
             if record.rec_headers.get_header('warc-target-uri') == url:
@@ -1933,9 +1932,8 @@ def test_long_warcprox_meta(
     # check that warcprox-meta was parsed and honored ("warc-prefix" param)
     assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"]
     writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"]
-    warc = writer._available_warcs.queue[0]
-    warc_path = os.path.join(warc.directory, warc.finalname)
-    warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"].close_writer()
+    warc_path = os.path.join(writer.directory, writer.finalname)
+    warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"].close()
     assert os.path.exists(warc_path)
 
     # read the warc
@@ -2118,7 +2116,7 @@ def test_dedup_min_text_size(http_daemon, warcprox_, archiving_proxies):
     wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
 
     # check that response records were written
-    warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path
+    warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer.path
     with open(warc, 'rb') as f:
         rec_iter = iter(warcio.archiveiterator.ArchiveIterator(f))
         record = next(rec_iter)
@@ -2198,7 +2196,7 @@ def test_dedup_min_binary_size(http_daemon, warcprox_, archiving_proxies):
     wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
 
     # check that response records were written
-    warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path
+    warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer.path
     with open(warc, 'rb') as f:
         rec_iter = iter(warcio.archiveiterator.ArchiveIterator(f))
         record = next(rec_iter)
diff --git a/tests/test_writer.py b/tests/test_writer.py
index f1d2466..ee3dcca 100644
--- a/tests/test_writer.py
+++ b/tests/test_writer.py
@@ -1,7 +1,7 @@
 '''
 tests/test_writer.py - warcprox warc writing tests
 
-Copyright (C) 2017 Internet Archive
+Copyright (C) 2017-2019 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -49,7 +49,6 @@ def lock_file(q, filename):
     except IOError:
         q.put('FAILED TO OBTAIN LOCK')
 
-
 def test_warc_writer_locking(tmpdir):
     """Test if WarcWriter is locking WARC files.
     When we don't have the .open suffix, WarcWriter locks the file and the
@@ -64,7 +63,7 @@ def test_warc_writer_locking(tmpdir):
 
     dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
     wwriter = WarcWriter(Options(
-        directory=dirname, no_warc_open_suffix=True, writer_threads=1))
+        directory=dirname, no_warc_open_suffix=True))
     wwriter.write_records(recorded_url)
     warcs = [fn for fn in os.listdir(dirname) if fn.endswith('.warc')]
     assert warcs
@@ -75,7 +74,7 @@ def test_warc_writer_locking(tmpdir):
     p.start()
     p.join()
     assert q.get() == 'FAILED TO OBTAIN LOCK'
-    wwriter.close_writer()
+    wwriter.close()
 
     # locking must succeed after writer has closed the WARC file.
     p = Process(target=lock_file, args=(q, target_warc))
@@ -96,8 +95,7 @@ def test_special_dont_write_prefix():
         logging.debug('cd %s', tmpdir)
         os.chdir(tmpdir)
 
-        wwt = warcprox.writerthread.WarcWriterProcessor(
-                Options(prefix='-', writer_threads=1))
+        wwt = warcprox.writerthread.WarcWriterProcessor(Options(prefix='-'))
         wwt.inq = queue.Queue(maxsize=1)
         wwt.outq = queue.Queue(maxsize=1)
         try:
@@ -131,7 +129,7 @@ def test_special_dont_write_prefix():
             wwt.join()
 
         wwt = warcprox.writerthread.WarcWriterProcessor(
-                Options(writer_threads=1, blackout_period=60, prefix='foo'))
+                Options(blackout_period=60, prefix='foo'))
         wwt.inq = queue.Queue(maxsize=1)
         wwt.outq = queue.Queue(maxsize=1)
         try:
@@ -199,14 +197,12 @@ def test_special_dont_write_prefix():
             wwt.stop.set()
             wwt.join()
 
-
 def test_do_not_archive():
     with tempfile.TemporaryDirectory() as tmpdir:
         logging.debug('cd %s', tmpdir)
         os.chdir(tmpdir)
 
-        wwt = warcprox.writerthread.WarcWriterProcessor(
-                Options(writer_threads=1))
+        wwt = warcprox.writerthread.WarcWriterProcessor()
         wwt.inq = queue.Queue(maxsize=1)
         wwt.outq = queue.Queue(maxsize=1)
         try:
@@ -240,7 +236,6 @@ def test_do_not_archive():
             wwt.stop.set()
             wwt.join()
 
-
 def test_warc_writer_filename(tmpdir):
     """Test if WarcWriter is writing WARC files with custom filenames.
     """
@@ -253,11 +248,9 @@ def test_warc_writer_filename(tmpdir):
 
     dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
     wwriter = WarcWriter(Options(directory=dirname, prefix='foo',
-        warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}',
-        writer_threads=1))
+        warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}'))
     wwriter.write_records(recorded_url)
     warcs = [fn for fn in os.listdir(dirname)]
     assert warcs
     assert re.search(
-            r'\d{17}_foo_\d{14}_00000.warc.open',
-            wwriter._available_warcs.queue[0].path)
+            r'\d{17}_foo_\d{14}_00000.warc.open', wwriter.path)
diff --git a/warcprox/controller.py b/warcprox/controller.py
index 80eca1c..4311bdd 100644
--- a/warcprox/controller.py
+++ b/warcprox/controller.py
@@ -143,10 +143,6 @@ class WarcproxController(object):
         self.playback_proxy = Factory.playback_proxy(
             self.proxy.ca, self.options)
 
-        # https://github.com/internetarchive/warcprox/wiki/benchmarking-number-of-threads
-        if not self.options.writer_threads:
-            self.options.writer_threads = 1
-
         self.build_postfetch_chain(self.proxy.recorded_url_q)
 
         self.service_registry = Factory.service_registry(options)
diff --git a/warcprox/main.py b/warcprox/main.py
index bf8d11e..8a96a7c 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -196,11 +196,6 @@ def _build_arg_parser(prog='warcprox', show_hidden=False):
             help=suppress(
                 'turn on performance profiling; summary statistics are dumped '
                 'every 10 minutes and at shutdown'))
-    hidden.add_argument(
-            '--writer-threads', dest='writer_threads', type=int, default=1,
-            help=suppress(
-                'number of warc writer threads; caution, see '
-                'https://github.com/internetarchive/warcprox/issues/101'))
     arg_parser.add_argument(
             '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy',
             default=None, help=(
diff --git a/warcprox/writer.py b/warcprox/writer.py
index ffdca8f..6aa9010 100644
--- a/warcprox/writer.py
+++ b/warcprox/writer.py
@@ -1,7 +1,7 @@
 '''
 warcprox/writer.py - warc writer, manages and writes records to warc files
 
-Copyright (C) 2013-2017 Internet Archive
+Copyright (C) 2013-2019 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -29,37 +29,41 @@ import warcprox
 import os
 import socket
 import random
-import threading
-try:
-    import queue
-except ImportError:
-    import Queue as queue
-import contextlib
 
-class _OneWritableWarc:
+class WarcWriter:
     '''
-    Utility class used by WarcWriter
+    A writer for one warc prefix, which rolls over to new warc file,
+    incrementing serial number, when size limit is hit. Should only be used
+    from one thread.
     '''
+    logger = logging.getLogger('warcprox.writer.WarcWriter')
 
-    logger = logging.getLogger('warcprox.writer._OneWritableWarc')
+    def __init__(self, options=warcprox.Options()):
+        self.options = options
+
+        self.gzip = options.gzip or False
+        self.record_builder = warcprox.warc.WarcRecordBuilder(
+                digest_algorithm=options.digest_algorithm or 'sha1',
+                base32=options.base32)
 
-    def __init__(self, options=warcprox.Options(), randomtoken='0'):
         self.f = None
         self.path = None
         self.finalname = None
         self.gzip = options.gzip or False
         self.prefix = options.prefix or 'warcprox'
         self.open_suffix = '' if options.no_warc_open_suffix else '.open'
-        self.randomtoken = randomtoken
         self.rollover_size = options.rollover_size or 1000000000
         self.rollover_idle_time = options.rollover_idle_time or None
         self.directory = options.directory or './warcs'
         self.filename_template = options.warc_filename or \
                 '{prefix}-{timestamp17}-{randomtoken}-{serialno}'
         self.last_activity = time.time()
+        self.serial = 0
+        self.randomtoken = ''.join(
+                random.sample('abcdefghijklmnopqrstuvwxyz0123456789', 8))
 
     # h3 default <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
-    def next_filename(self, serial):
+    def filename(self, serial):
         """WARC filename is configurable with CLI parameter --warc-filename.
         Default: '{prefix}-{timestamp17}-{randomtoken}-{serialno}'
         Available variables are: prefix, timestamp14, timestamp17, serialno,
@@ -81,13 +85,17 @@ class _OneWritableWarc:
         return fname
 
     def open(self, serial):
+        '''
+        Opens a new warc file with filename prefix `self.prefix` and serial
+        number `self.serial` and assigns file handle to `self.f`.
+        '''
         if not os.path.exists(self.directory):
             self.logger.info(
                     "warc destination directory %s doesn't exist, creating it",
                     self.directory)
             os.mkdir(self.directory)
 
-        self.finalname = self.next_filename(serial)
+        self.finalname = self.filename(serial)
         self.logger.trace('opening %s', self.finalname)
         self.path = os.path.sep.join(
                 [self.directory, self.finalname + self.open_suffix])
@@ -103,7 +111,53 @@ class _OneWritableWarc:
                         'could not lock file %s (%s)', self.path, exc)
         return self.f
 
+    def ensure_open(self):
+        '''
+        Ensures `self.f` is ready to write the next warc record.
+
+        Closes current warc if size limit has been reached. Then, if warc is
+        not open, opens one, and writes the warcinfo record.
+        '''
+        self.maybe_size_rollover()
+        if not self.f:
+            serial = self.serial
+            self.serial += 1
+            self.open(serial)
+            warcinfo = self.record_builder.build_warcinfo_record(self.finalname)
+            self.logger.debug('warcinfo.headers=%s', warcinfo.headers)
+            warcinfo.write_to(self.f, gzip=self.gzip)
+
+    def write_records(self, recorded_url):
+        '''
+        Returns tuple of records written, which are instances of
+        `hanzo.warctools.warc.WarcRecord`, decorated with `warc_filename` and
+        `offset` attributes.
+        '''
+        records = self.record_builder.build_warc_records(recorded_url)
+
+        self.ensure_open()
+        for record in records:
+            offset = self.f.tell()
+            record.write_to(self.f, gzip=self.gzip)
+            record.offset = offset
+            record.length = self.f.tell() - offset
+            record.warc_filename = self.finalname
+            self.logger.trace(
+                    'wrote warc record: warc_type=%s content_length=%s '
+                    'digest=%s offset=%d warc=%s url=%s', record.type,
+                    record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
+                    record.get_header(b'WARC-Payload-Digest'), record.offset,
+                    self.path, record.get_header(warctools.WarcRecord.URL))
+
+        return records
+
     def close(self):
+        '''
+        Closes out the active warc.
+
+        The next call to `write_records()` will write to a a new warc file with
+        the serial number incremented.
+        '''
         if self.path:
             self.logger.trace('closing %s', self.finalname)
             if self.open_suffix == '':
@@ -136,112 +190,16 @@ class _OneWritableWarc:
                     self.finalname, os.path.getsize(self.path))
             self.close()
 
-class WarcWriter:
-    logger = logging.getLogger('warcprox.writer.WarcWriter')
-
-    def __init__(self, options=warcprox.Options()):
-        self.options = options
-
-        self.gzip = options.gzip or False
-        self.record_builder = warcprox.warc.WarcRecordBuilder(
-                digest_algorithm=options.digest_algorithm or 'sha1',
-                base32=options.base32)
-
-        self._available_warcs = queue.Queue()
-        self._warc_count = 0
-        self._warc_count_lock = threading.Lock()
-
-        self._serial = 0
-        self._serial_lock = threading.Lock()
-
-        self._randomtoken = ''.join(
-                random.sample('abcdefghijklmnopqrstuvwxyz0123456789', 8))
-
-    def _bespeak_warc(self):
-        try:
-            return self._available_warcs.get(block=False)
-        except queue.Empty:
-            with self._warc_count_lock:
-                if self._warc_count < self.options.writer_threads:
-                    self._warc_count += 1
-                    return _OneWritableWarc(self.options, self._randomtoken)
-            # else we're maxed out, wait for one to free up
-            return self._available_warcs.get(block=True)
-
-    @contextlib.contextmanager
-    def _warc(self):
-        warc = self._bespeak_warc()
-
-        warc.maybe_size_rollover()
-
-        # lazy file open
-        if warc.f == None:
-            with self._serial_lock:
-                serial = self._serial
-                self._serial += 1
-            warc.open(serial)
-            warcinfo = self.record_builder.build_warcinfo_record(warc.finalname)
-            self.logger.debug('warcinfo.headers=%s', warcinfo.headers)
-            warcinfo.write_to(warc.f, gzip=self.gzip)
-
-        yield warc
-
-        # __exit__()
-        warc.f.flush()
-        warc.last_activity = time.time()
-        self._available_warcs.put(warc)
-
-    def write_records(self, recorded_url):
-        """Returns tuple of records written, which are instances of
-        hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and
-        "offset" attributes."""
-        records = self.record_builder.build_warc_records(recorded_url)
-
-        with self._warc() as warc:
-            for record in records:
-                offset = warc.f.tell()
-                record.write_to(warc.f, gzip=self.gzip)
-                record.offset = offset
-                record.length = warc.f.tell() - offset
-                record.warc_filename = warc.finalname
-                self.logger.trace(
-                        'wrote warc record: warc_type=%s content_length=%s '
-                        'digest=%s offset=%d warc=%s url=%s',
-                        record.type,
-                        record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
-                        record.get_header(b'WARC-Payload-Digest'),
-                        record.offset, warc.path,
-                        record.get_header(warctools.WarcRecord.URL))
-
-        return records
-
-    def maybe_idle_rollover(self):
-        warcs = []
-        while True:
-            try:
-                warc = self._available_warcs.get(block=False)
-                warcs.append(warc)
-            except queue.Empty:
-                break
-        for warc in warcs:
-            warc.maybe_idle_rollover()
-            self._available_warcs.put(warc)
-
-    def close_writer(self):
-        while self._warc_count > 0:
-            with self._warc_count_lock:
-                warc = self._available_warcs.get()
-                warc.close()
-                self._warc_count -= 1
-
 class WarcWriterPool:
+    '''
+    A `WarcWriter` per warc prefix. Should only be used from one thread.
+    '''
     logger = logging.getLogger("warcprox.writer.WarcWriterPool")
 
     def __init__(self, options=warcprox.Options()):
         self.default_warc_writer = WarcWriter(options)
         self.warc_writers = {}  # {prefix:WarcWriter}
         self.options = options
-        self._lock = threading.RLock()
         self._last_maybe = time.time()
 
     # chooses writer for filename specified by warcprox_meta["warc-prefix"] if set
@@ -251,16 +209,17 @@ class WarcWriterPool:
             # self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url))
             options = warcprox.Options(**vars(self.options))
             options.prefix = recorded_url.warcprox_meta["warc-prefix"]
-            with self._lock:
-                if not options.prefix in self.warc_writers:
-                    self.warc_writers[options.prefix] = WarcWriter(options)
-                w = self.warc_writers[options.prefix]
+            if not options.prefix in self.warc_writers:
+                self.warc_writers[options.prefix] = WarcWriter(options)
+            w = self.warc_writers[options.prefix]
         return w
 
     def write_records(self, recorded_url):
-        """Returns tuple of records written, which are instances of
-        hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and
-        "offset" attributes."""
+        '''
+        Returns tuple of records written, which are instances of
+        `hanzo.warctools.warc.WarcRecord`, decorated with `warc_filename` and
+        `offset` attributes.
+        '''
         return self._writer(recorded_url).write_records(recorded_url)
 
     def maybe_idle_rollover(self):
@@ -271,7 +230,8 @@ class WarcWriterPool:
             self._last_maybe = time.time()
 
     def close_writers(self):
-        self.default_warc_writer.close_writer()
-        for w in self.warc_writers.values():
-            w.close_writer()
+        self.default_warc_writer.close()
+        for prefix, writer in list(self.warc_writers.items()):
+            del self.warc_writers[prefix]
+            writer.close()
 
diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py
index 75aeb05..5ab5889 100644
--- a/warcprox/writerthread.py
+++ b/warcprox/writerthread.py
@@ -2,7 +2,7 @@
 warcprox/writerthread.py - warc writer thread, reads from the recorded url
 queue, writes warc records, runs final tasks after warc records are written
 
-Copyright (C) 2013-2018 Internet Archive
+Copyright (C) 2013-2019 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -43,46 +43,8 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
         warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
         self.writer_pool = warcprox.writer.WarcWriterPool(options)
         self.method_filter = set(method.upper() for method in self.options.method_filter or [])
-
-        # set max_queued small, because self.inq is already handling queueing
-        self.thread_local = threading.local()
-        self.thread_profilers = {}
-        # for us; but give it a little breathing room to make sure it can keep
-        # worker threads busy
-        self.pool = warcprox.ThreadPoolExecutor(
-                max_workers=options.writer_threads or 1,
-                max_queued=10 * (options.writer_threads or 1))
-        self.batch = set()
         self.blackout_period = options.blackout_period or 0
 
-    def _startup(self):
-        self.logger.info('%s warc writer threads', self.pool._max_workers)
-        warcprox.BaseStandardPostfetchProcessor._startup(self)
-
-    def _get_process_put(self):
-        try:
-            recorded_url = self.inq.get(block=True, timeout=0.5)
-            self.batch.add(recorded_url)
-            self.pool.submit(self._wrap_process_url, recorded_url)
-        finally:
-            self.writer_pool.maybe_idle_rollover()
-
-    def _wrap_process_url(self, recorded_url):
-        if not getattr(self.thread_local, 'name_set', False):
-            threading.current_thread().name = 'WarcWriterThread(tid=%s)' % warcprox.gettid()
-            self.thread_local.name_set = True
-        if self.options.profile:
-            import cProfile
-            if not hasattr(self.thread_local, 'profiler'):
-                self.thread_local.profiler = cProfile.Profile()
-                tid = threading.current_thread().ident
-                self.thread_profilers[tid] = self.thread_local.profiler
-            self.thread_local.profiler.enable()
-            self._process_url(recorded_url)
-            self.thread_local.profiler.disable()
-        else:
-            self._process_url(recorded_url)
-
     def _process_url(self, recorded_url):
         try:
             records = []
@@ -97,10 +59,6 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
             logging.error(
                     'caught exception processing %s', recorded_url.url,
                     exc_info=True)
-        finally:
-            self.batch.remove(recorded_url)
-            if self.outq:
-                self.outq.put(recorded_url)
 
     def _filter_accepts(self, recorded_url):
         if not self.method_filter:

From 79d09d013bfa6f4a3e5aa386ca5dc2e206802d18 Mon Sep 17 00:00:00 2001
From: Noah Levitt <nlevitt@archive.org>
Date: Tue, 8 Jan 2019 11:15:20 -0800
Subject: [PATCH 2/4] ThreadPoolExecutor no longer used

it was part of the multi-threaded warc writer implementation
---
 warcprox/__init__.py | 13 +------------
 1 file changed, 1 insertion(+), 12 deletions(-)

diff --git a/warcprox/__init__.py b/warcprox/__init__.py
index 6a8e00e..e2c8df7 100644
--- a/warcprox/__init__.py
+++ b/warcprox/__init__.py
@@ -1,7 +1,7 @@
 """
 warcprox/__init__.py - warcprox package main file, contains some utility code
 
-Copyright (C) 2013-2018 Internet Archive
+Copyright (C) 2013-2019 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -57,17 +57,6 @@ class Jsonner(json.JSONEncoder):
         else:
             return json.JSONEncoder.default(self, o)
 
-class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
-    '''
-    `concurrent.futures.ThreadPoolExecutor` supporting a queue of limited size.
-
-    If `max_queued` is set, calls to `submit()` will block if necessary until a
-    free slot is available.
-    '''
-    def __init__(self, max_queued=None, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self._work_queue = queue.Queue(maxsize=max_queued or 0)
-
 # XXX linux-specific
 def gettid():
     try:

From 150c1e67c61952417858df8f849960387cb286e3 Mon Sep 17 00:00:00 2001
From: Noah Levitt <nlevitt@archive.org>
Date: Tue, 8 Jan 2019 11:27:11 -0800
Subject: [PATCH 3/4] WarcWriterProcessor.close_for_prefix()

New API to allow some code from outside of warcprox proper (in a
third-party plugin for example) can close open warcs promptly when it
knows they are finished.
---
 tests/test_writer.py     | 118 +++++++++++++++++++++++++++++++++++++++
 warcprox/controller.py   |   2 +-
 warcprox/main.py         |   2 +-
 warcprox/writer.py       |  13 +++++
 warcprox/writerthread.py |  25 +++++++++
 5 files changed, 158 insertions(+), 2 deletions(-)

diff --git a/tests/test_writer.py b/tests/test_writer.py
index ee3dcca..cadbd96 100644
--- a/tests/test_writer.py
+++ b/tests/test_writer.py
@@ -36,6 +36,12 @@ import tempfile
 import logging
 import hashlib
 import queue
+import sys
+
+logging.basicConfig(
+        stream=sys.stdout, level=logging.TRACE,
+        format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
+        '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
 
 def lock_file(q, filename):
     """Try to lock file and return 1 if successful, else return 0.
@@ -254,3 +260,115 @@ def test_warc_writer_filename(tmpdir):
     assert warcs
     assert re.search(
             r'\d{17}_foo_\d{14}_00000.warc.open', wwriter.path)
+
+def test_close_for_prefix(tmpdir):
+    wwp = warcprox.writerthread.WarcWriterProcessor(
+            Options(directory=str(tmpdir)))
+    wwp.inq = queue.Queue(maxsize=1)
+    wwp.outq = queue.Queue(maxsize=1)
+
+    try:
+        wwp.start()
+
+        # write a record to the default prefix
+        recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
+        recorder.read()
+        wwp.inq.put(RecordedUrl(
+            url='http://example.com/1', content_type='text/plain',
+            status=200, client_ip='127.0.0.2', request_data=b'abc',
+            response_recorder=recorder, remote_ip='127.0.0.3',
+            timestamp=datetime.utcnow(),
+            payload_digest=recorder.block_digest))
+        time.sleep(0.5)
+        rurl = wwp.outq.get() # wait for it to finish
+
+        assert rurl.url == b'http://example.com/1'
+        assert len(tmpdir.listdir()) == 1
+        assert tmpdir.listdir()[0].basename.startswith('warcprox-')
+        assert tmpdir.listdir()[0].basename.endswith('-00000.warc.open')
+        assert tmpdir.listdir()[0].basename == wwp.writer_pool.default_warc_writer.finalname + '.open'
+
+        # request close of default warc
+        wwp.close_for_prefix()
+
+        # write a record to some other prefix
+        recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
+        recorder.read()
+        wwp.inq.put(RecordedUrl(
+            url='http://example.com/2', content_type='text/plain',
+            status=200, client_ip='127.0.0.2', request_data=b'abc',
+            response_recorder=recorder, remote_ip='127.0.0.3',
+            timestamp=datetime.utcnow(),
+            payload_digest=recorder.block_digest,
+            warcprox_meta={'warc-prefix': 'some-prefix'}))
+        time.sleep(0.5)
+        rurl = wwp.outq.get() # wait for it to finish
+
+        assert rurl.url == b'http://example.com/2'
+        assert len(tmpdir.listdir()) == 2
+        basenames = sorted(f.basename for f in tmpdir.listdir())
+        assert basenames[0].startswith('some-prefix-')
+        assert basenames[0].endswith('-00000.warc.open')
+        assert basenames[1].startswith('warcprox-')
+        assert basenames[1].endswith('-00000.warc')
+
+        # request close of warc with prefix
+        wwp.close_for_prefix('some-prefix')
+
+        # write another record to the default prefix
+        recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
+        recorder.read()
+        wwp.inq.put(RecordedUrl(
+            url='http://example.com/3', content_type='text/plain',
+            status=200, client_ip='127.0.0.2', request_data=b'abc',
+            response_recorder=recorder, remote_ip='127.0.0.3',
+            timestamp=datetime.utcnow(),
+            payload_digest=recorder.block_digest))
+        time.sleep(0.5)
+        rurl = wwp.outq.get() # wait for it to finish
+
+        assert rurl.url == b'http://example.com/3'
+        # now some-prefix warc is closed and a new default prefix warc is open
+        basenames = sorted(f.basename for f in tmpdir.listdir())
+        assert len(basenames) == 3
+        assert basenames[0].startswith('some-prefix-')
+        assert basenames[0].endswith('-00000.warc')
+        assert basenames[1].startswith('warcprox-')
+        assert basenames[1].endswith('-00000.warc')
+        assert basenames[2].startswith('warcprox-')
+        assert basenames[2].endswith('-00001.warc.open')
+
+        # write another record to with prefix "some-prefix"
+        recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
+        recorder.read()
+        wwp.inq.put(RecordedUrl(
+            url='http://example.com/4', content_type='text/plain',
+            status=200, client_ip='127.0.0.2', request_data=b'abc',
+            response_recorder=recorder, remote_ip='127.0.0.3',
+            timestamp=datetime.utcnow(),
+            payload_digest=recorder.block_digest,
+            warcprox_meta={'warc-prefix': 'some-prefix'}))
+        time.sleep(0.5)
+        rurl = wwp.outq.get() # wait for it to finish
+
+        assert rurl.url == b'http://example.com/4'
+        # new some-prefix warc will have a new random token and start over at
+        # serial 00000
+        basenames = sorted(f.basename for f in tmpdir.listdir())
+        assert len(basenames) == 4
+        assert basenames[0].startswith('some-prefix-')
+        assert basenames[1].startswith('some-prefix-')
+        # order of these two warcs depends on random token so we don't know
+        # which is which
+        assert basenames[0][-5:] != basenames[1][-5:]
+        assert '-00000.' in basenames[0]
+        assert '-00000.' in basenames[1]
+
+        assert basenames[2].startswith('warcprox-')
+        assert basenames[2].endswith('-00000.warc')
+        assert basenames[3].startswith('warcprox-')
+        assert basenames[3].endswith('-00001.warc.open')
+
+    finally:
+        wwp.stop.set()
+        wwp.join()
diff --git a/warcprox/controller.py b/warcprox/controller.py
index 4311bdd..0b3daef 100644
--- a/warcprox/controller.py
+++ b/warcprox/controller.py
@@ -4,7 +4,7 @@ starting up and shutting down the various components of warcprox, and for
 sending heartbeats to the service registry if configured to do so; also has
 some memory profiling capabilities
 
-Copyright (C) 2013-2018 Internet Archive
+Copyright (C) 2013-2019 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
diff --git a/warcprox/main.py b/warcprox/main.py
index 8a96a7c..8dab727 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -4,7 +4,7 @@
 warcprox/main.py - entrypoint for warcprox executable, parses command line
 arguments, initializes components, starts controller, handles signals
 
-Copyright (C) 2013-2018 Internet Archive
+Copyright (C) 2013-2019 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
diff --git a/warcprox/writer.py b/warcprox/writer.py
index 6aa9010..96293db 100644
--- a/warcprox/writer.py
+++ b/warcprox/writer.py
@@ -148,6 +148,7 @@ class WarcWriter:
                     record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
                     record.get_header(b'WARC-Payload-Digest'), record.offset,
                     self.path, record.get_header(warctools.WarcRecord.URL))
+        self.f.flush()
 
         return records
 
@@ -235,3 +236,15 @@ class WarcWriterPool:
             del self.warc_writers[prefix]
             writer.close()
 
+    def close_for_prefix(self, prefix=None):
+        '''
+        Close warc writer for the given warc prefix, or the default prefix if
+        `prefix` is `None`.
+        '''
+        if prefix and prefix in self.warc_writers:
+            writer = self.warc_writers[prefix]
+            del self.warc_writers[prefix]
+            writer.close()
+        else:
+            self.default_warc_writer.close()
+
diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py
index 5ab5889..f6ac277 100644
--- a/warcprox/writerthread.py
+++ b/warcprox/writerthread.py
@@ -33,6 +33,10 @@ import warcprox
 from concurrent import futures
 from datetime import datetime
 import threading
+try:
+    import queue
+except ImportError:
+    import Queue as queue
 
 class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
     logger = logging.getLogger("warcprox.writerthread.WarcWriterProcessor")
@@ -44,6 +48,27 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
         self.writer_pool = warcprox.writer.WarcWriterPool(options)
         self.method_filter = set(method.upper() for method in self.options.method_filter or [])
         self.blackout_period = options.blackout_period or 0
+        self.close_prefix_reqs = queue.Queue()
+
+    def _get_process_put(self):
+        while True:
+            try:
+                prefix = self.close_prefix_reqs.get_nowait()
+                self.writer_pool.close_for_prefix(prefix)
+            except queue.Empty:
+                break
+        super()._get_process_put()
+
+    def close_for_prefix(self, prefix=None):
+        '''
+        Request close of warc writer for the given warc prefix, or the default
+        prefix if `prefix` is `None`.
+
+        This API exists so that some code from outside of warcprox proper (in a
+        third-party plugin for example) can close open warcs promptly when it
+        knows they are finished.
+        '''
+        self.close_prefix_reqs.put(prefix)
 
     def _process_url(self, recorded_url):
         try:

From 8fd1af1d042e261963cf89797324df521785e455 Mon Sep 17 00:00:00 2001
From: Noah Levitt <nlevitt@archive.org>
Date: Wed, 9 Jan 2019 22:47:04 +0000
Subject: [PATCH 4/4] offer WarcproxController to plugin constructors

because plugin needs to get at stuff, especially the warc writer
processor, for this close api to be useful
---
 warcprox/__init__.py   | 11 +++++++----
 warcprox/controller.py | 16 ++++++++++------
 2 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/warcprox/__init__.py b/warcprox/__init__.py
index e2c8df7..852d3fc 100644
--- a/warcprox/__init__.py
+++ b/warcprox/__init__.py
@@ -81,11 +81,14 @@ class RequestBlockedByRule(Exception):
 class BasePostfetchProcessor(threading.Thread):
     logger = logging.getLogger("warcprox.BasePostfetchProcessor")
 
-    def __init__(self, options=Options()):
+    def __init__(self, options=Options(), controller=None, **kwargs):
         threading.Thread.__init__(self, name=self.__class__.__name__)
         self.options = options
+        self.controller = controller
+
         self.stop = threading.Event()
-        # these should be set before thread is started
+
+        # these should be set by the caller before thread is started
         self.inq = None
         self.outq = None
         self.profiler = None
@@ -205,8 +208,8 @@ class BaseBatchPostfetchProcessor(BasePostfetchProcessor):
         raise Exception('not implemented')
 
 class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor):
-    def __init__(self, listener, options=Options()):
-        BaseStandardPostfetchProcessor.__init__(self, options)
+    def __init__(self, listener, options=Options(), controller=None, **kwargs):
+        BaseStandardPostfetchProcessor.__init__(self, options, controller, **kwargs)
         self.listener = listener
         self.name = listener.__class__.__name__
 
diff --git a/warcprox/controller.py b/warcprox/controller.py
index 0b3daef..fcdaa58 100644
--- a/warcprox/controller.py
+++ b/warcprox/controller.py
@@ -93,15 +93,19 @@ class Factory:
             return None
 
     @staticmethod
-    def plugin(qualname, options):
+    def plugin(qualname, options, controller=None):
         try:
             (module_name, class_name) = qualname.rsplit('.', 1)
             module_ = importlib.import_module(module_name)
             class_ = getattr(module_, class_name)
-            try: # new plugins take `options` argument
-                plugin = class_(options)
-            except: # backward-compatibility
-                plugin = class_()
+            try:
+                # new plugins take `options` and `controller` arguments
+                plugin = class_(options, controller)
+            except:
+                try: # medium plugins take `options` argument
+                    plugin = class_(options)
+                except: # old plugins take no arguments
+                    plugin = class_()
             # check that this is either a listener or a batch processor
             assert hasattr(plugin, 'notify') ^ hasattr(plugin, '_startup')
             return plugin
@@ -229,7 +233,7 @@ class WarcproxController(object):
                         crawl_logger, self.options))
 
         for qualname in self.options.plugins or []:
-            plugin = Factory.plugin(qualname, self.options)
+            plugin = Factory.plugin(qualname, self.options, self)
             if hasattr(plugin, 'notify'):
                 self._postfetch_chain.append(
                         warcprox.ListenerPostfetchProcessor(