diff --git a/tests/test_writer.py b/tests/test_writer.py
index ee3dcca..cadbd96 100644
--- a/tests/test_writer.py
+++ b/tests/test_writer.py
@@ -36,6 +36,12 @@ import tempfile
 import logging
 import hashlib
 import queue
+import sys
+
+logging.basicConfig(
+        stream=sys.stdout, level=logging.TRACE,
+        format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
+        '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
 
 def lock_file(q, filename):
     """Try to lock file and return 1 if successful, else return 0.
@@ -254,3 +260,115 @@ def test_warc_writer_filename(tmpdir):
     assert warcs
     assert re.search(
             r'\d{17}_foo_\d{14}_00000.warc.open', wwriter.path)
+
+def test_close_for_prefix(tmpdir):
+    wwp = warcprox.writerthread.WarcWriterProcessor(
+            Options(directory=str(tmpdir)))
+    wwp.inq = queue.Queue(maxsize=1)
+    wwp.outq = queue.Queue(maxsize=1)
+
+    try:
+        wwp.start()
+
+        # write a record with no warc-prefix, i.e. to the default prefix
+        recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
+        recorder.read()
+        wwp.inq.put(RecordedUrl(
+            url='http://example.com/1', content_type='text/plain',
+            status=200, client_ip='127.0.0.2', request_data=b'abc',
+            response_recorder=recorder, remote_ip='127.0.0.3',
+            timestamp=datetime.utcnow(),
+            payload_digest=recorder.block_digest))
+        time.sleep(0.5)
+        rurl = wwp.outq.get() # blocks until the record has been processed
+
+        assert rurl.url == b'http://example.com/1'
+        assert len(tmpdir.listdir()) == 1
+        assert tmpdir.listdir()[0].basename.startswith('warcprox-')
+        assert tmpdir.listdir()[0].basename.endswith('-00000.warc.open')
+        assert tmpdir.listdir()[0].basename == wwp.writer_pool.default_warc_writer.finalname + '.open'
+
+        # request close of default warc (the request is queued, not immediate)
+        wwp.close_for_prefix()
+
+        # write a record to some other prefix
+        recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
+        recorder.read()
+        wwp.inq.put(RecordedUrl(
+            url='http://example.com/2', content_type='text/plain',
+            status=200, client_ip='127.0.0.2', request_data=b'abc',
+            response_recorder=recorder, remote_ip='127.0.0.3',
+            timestamp=datetime.utcnow(),
+            payload_digest=recorder.block_digest,
+            warcprox_meta={'warc-prefix': 'some-prefix'}))
+        time.sleep(0.5)
+        rurl = wwp.outq.get() # blocks until the record has been processed
+
+        assert rurl.url == b'http://example.com/2'
+        assert len(tmpdir.listdir()) == 2
+        basenames = sorted(f.basename for f in tmpdir.listdir())
+        assert basenames[0].startswith('some-prefix-')
+        assert basenames[0].endswith('-00000.warc.open')
+        assert basenames[1].startswith('warcprox-')
+        assert basenames[1].endswith('-00000.warc')
+
+        # request close of the warc with prefix "some-prefix"
+        wwp.close_for_prefix('some-prefix')
+
+        # write another record to the default prefix
+        recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
+        recorder.read()
+        wwp.inq.put(RecordedUrl(
+            url='http://example.com/3', content_type='text/plain',
+            status=200, client_ip='127.0.0.2', request_data=b'abc',
+            response_recorder=recorder, remote_ip='127.0.0.3',
+            timestamp=datetime.utcnow(),
+            payload_digest=recorder.block_digest))
+        time.sleep(0.5)
+        rurl = wwp.outq.get() # blocks until the record has been processed
+
+        assert rurl.url == b'http://example.com/3'
+        # now some-prefix warc is closed and a new default prefix warc is open
+        basenames = sorted(f.basename for f in tmpdir.listdir())
+        assert len(basenames) == 3
+        assert basenames[0].startswith('some-prefix-')
+        assert basenames[0].endswith('-00000.warc')
+        assert basenames[1].startswith('warcprox-')
+        assert basenames[1].endswith('-00000.warc')
+        assert basenames[2].startswith('warcprox-')
+        assert basenames[2].endswith('-00001.warc.open')
+
+        # write another record with prefix "some-prefix"
+        recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
+        recorder.read()
+        wwp.inq.put(RecordedUrl(
+            url='http://example.com/4', content_type='text/plain',
+            status=200, client_ip='127.0.0.2', request_data=b'abc',
+            response_recorder=recorder, remote_ip='127.0.0.3',
+            timestamp=datetime.utcnow(),
+            payload_digest=recorder.block_digest,
+            warcprox_meta={'warc-prefix': 'some-prefix'}))
+        time.sleep(0.5)
+        rurl = wwp.outq.get() # blocks until the record has been processed
+
+        assert rurl.url == b'http://example.com/4'
+        # new some-prefix warc will have a new random token and start over at
+        # serial 00000
+        basenames = sorted(f.basename for f in tmpdir.listdir())
+        assert len(basenames) == 4
+        assert basenames[0].startswith('some-prefix-')
+        assert basenames[1].startswith('some-prefix-')
+        # order of these two warcs depends on the random token, so we don't
+        # know which is which; one must end in '.warc', the other in '.open'
+        assert basenames[0][-5:] != basenames[1][-5:]
+        assert '-00000.' in basenames[0]
+        assert '-00000.' in basenames[1]
+
+        assert basenames[2].startswith('warcprox-')
+        assert basenames[2].endswith('-00000.warc')
+        assert basenames[3].startswith('warcprox-')
+        assert basenames[3].endswith('-00001.warc.open')
+
+    finally:
+        wwp.stop.set()
+        wwp.join()
diff --git a/warcprox/controller.py b/warcprox/controller.py
index 4311bdd..0b3daef 100644
--- a/warcprox/controller.py
+++ b/warcprox/controller.py
@@ -4,7 +4,7 @@ starting up and shutting down the various components of warcprox, and for
 sending heartbeats to the service registry if configured to do so; also has
 some memory profiling capabilities
 
-Copyright (C) 2013-2018 Internet Archive
+Copyright (C) 2013-2019 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
diff --git a/warcprox/main.py b/warcprox/main.py
index 8a96a7c..8dab727 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -4,7 +4,7 @@
 warcprox/main.py - entrypoint for warcprox executable, parses command line
 arguments, initializes components, starts controller, handles signals
 
-Copyright (C) 2013-2018 Internet Archive
+Copyright (C) 2013-2019 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
diff --git a/warcprox/writer.py
b/warcprox/writer.py
index 6aa9010..96293db 100644
--- a/warcprox/writer.py
+++ b/warcprox/writer.py
@@ -148,6 +148,7 @@ class WarcWriter:
                     record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
                     record.get_header(b'WARC-Payload-Digest'),
                     record.offset, self.path, record.get_header(warctools.WarcRecord.URL))
+        self.f.flush()
 
         return records
 
@@ -235,3 +236,21 @@ class WarcWriterPool:
             del self.warc_writers[prefix]
             writer.close()
 
+    def close_for_prefix(self, prefix=None):
+        '''
+        Close warc writer for the given warc prefix, or the default prefix if
+        `prefix` is `None` (or otherwise falsy).
+
+        If `prefix` is given but no writer is currently open for it, this is
+        a no-op; the default warc writer is only closed when no prefix is
+        specified.
+        '''
+        if not prefix:
+            self.default_warc_writer.close()
+            return
+        # remove the writer from the pool before closing it; an unknown
+        # prefix yields None and must not fall through to the default writer
+        writer = self.warc_writers.pop(prefix, None)
+        if writer is not None:
+            writer.close()
+
diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py
index 5ab5889..f6ac277 100644
--- a/warcprox/writerthread.py
+++ b/warcprox/writerthread.py
@@ -33,6 +33,10 @@ import warcprox
 from concurrent import futures
 from datetime import datetime
 import threading
+try:
+    import queue
+except ImportError:
+    import Queue as queue
 
 class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
     logger = logging.getLogger("warcprox.writerthread.WarcWriterProcessor")
@@ -44,6 +48,43 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
         self.writer_pool = warcprox.writer.WarcWriterPool(options)
         self.method_filter = set(method.upper() for method in self.options.method_filter or [])
         self.blackout_period = options.blackout_period or 0
+        # warc prefixes (or None for the default) awaiting close; filled by
+        # close_for_prefix() and drained by _get_process_put()
+        self.close_prefix_reqs = queue.Queue()
+
+    def _get_process_put(self):
+        '''
+        Close warcs for any pending close requests, then delegate to the
+        base class implementation.
+
+        Requests queued by `close_for_prefix()` are acted on here rather
+        than at the time of the request.
+        '''
+        while True:
+            try:
+                prefix = self.close_prefix_reqs.get_nowait()
+            except queue.Empty:
+                break
+            # kept outside the try so an exception raised while closing is
+            # never mistaken for an empty request queue
+            self.writer_pool.close_for_prefix(prefix)
+        # explicit two-argument super() also works on python 2, matching the
+        # py2-compatible `queue` import at the top of the module
+        super(WarcWriterProcessor, self)._get_process_put()
+
+    def close_for_prefix(self, prefix=None):
+        '''
+        Request close of warc writer for the given warc prefix, or the default
+        prefix if `prefix` is `None`.
+
+        The request is only queued here; the close itself happens the next
+        time `_get_process_put()` runs.
+
+        This API exists so that some code from outside of warcprox proper (in a
+        third-party plugin for example) can close open warcs promptly when it
+        knows they are finished.
+        '''
+        self.close_prefix_reqs.put(prefix)
 
     def _process_url(self, recorded_url):
         try: