WarcWriterProcessor.close_for_prefix()

New API that allows code outside of warcprox proper (in a
third-party plugin, for example) to close open warcs promptly when it
knows they are finished.
This commit is contained in:
Noah Levitt 2019-01-08 11:27:11 -08:00
parent 79d09d013b
commit 150c1e67c6
5 changed files with 158 additions and 2 deletions

View File

@ -36,6 +36,12 @@ import tempfile
import logging
import hashlib
import queue
import sys
# Send log output for this test module to stdout, with process, thread and
# source location in every line.
# NOTE(review): `logging.TRACE` is not a standard library level; presumably
# it is registered when warcprox is imported -- confirm.
logging.basicConfig(
stream=sys.stdout, level=logging.TRACE,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
def lock_file(q, filename):
"""Try to lock file and return 1 if successful, else return 0.
@ -254,3 +260,115 @@ def test_warc_writer_filename(tmpdir):
assert warcs
assert re.search(
r'\d{17}_foo_\d{14}_00000.warc.open', wwriter.path)
def test_close_for_prefix(tmpdir):
    '''
    End-to-end check of `WarcWriterProcessor.close_for_prefix()`.

    Alternates writes between the default warc prefix and "some-prefix",
    requesting a close in between, and works out from the file names in
    `tmpdir` which warcs are still open ("*.warc.open") and which have been
    finalized ("*.warc").
    '''
    wwp = warcprox.writerthread.WarcWriterProcessor(
            Options(directory=str(tmpdir)))
    wwp.inq = queue.Queue(maxsize=1)
    wwp.outq = queue.Queue(maxsize=1)

    def write_and_wait(url, warc_prefix=None):
        '''
        Push one recorded url through the writer and wait for it to come
        out the other side. `warcprox_meta` is only passed along when a
        `warc_prefix` is given, so default-prefix records are built exactly
        as if the argument had been omitted.
        '''
        recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
        recorder.read()
        kwargs = dict(
                url=url, content_type='text/plain', status=200,
                client_ip='127.0.0.2', request_data=b'abc',
                response_recorder=recorder, remote_ip='127.0.0.3',
                timestamp=datetime.utcnow(),
                payload_digest=recorder.block_digest)
        if warc_prefix is not None:
            kwargs['warcprox_meta'] = {'warc-prefix': warc_prefix}
        wwp.inq.put(RecordedUrl(**kwargs))
        # Do not drop this pause: presumably it gives the writer thread time
        # to act on a queued close request, which it only picks up between
        # urls -- TODO confirm against the base class loop.
        time.sleep(0.5)
        # Bounded wait so that a dead writer thread fails the test with
        # queue.Empty instead of hanging the whole test run.
        rurl = wwp.outq.get(timeout=10)
        assert rurl.url == url.encode('ascii')

    def warc_names():
        '''Sorted base names of everything currently in `tmpdir`.'''
        return sorted(f.basename for f in tmpdir.listdir())

    try:
        wwp.start()

        # write a record to the default prefix
        write_and_wait('http://example.com/1')
        basenames = warc_names()
        assert len(basenames) == 1
        assert basenames[0].startswith('warcprox-')
        assert basenames[0].endswith('-00000.warc.open')
        assert basenames[0] == (
                wwp.writer_pool.default_warc_writer.finalname + '.open')

        # request close of default warc
        wwp.close_for_prefix()

        # write a record to some other prefix
        write_and_wait('http://example.com/2', warc_prefix='some-prefix')
        basenames = warc_names()
        assert len(basenames) == 2
        assert basenames[0].startswith('some-prefix-')
        assert basenames[0].endswith('-00000.warc.open')
        assert basenames[1].startswith('warcprox-')
        assert basenames[1].endswith('-00000.warc')

        # request close of warc with prefix
        wwp.close_for_prefix('some-prefix')

        # write another record to the default prefix
        write_and_wait('http://example.com/3')

        # now some-prefix warc is closed and a new default prefix warc is open
        basenames = warc_names()
        assert len(basenames) == 3
        assert basenames[0].startswith('some-prefix-')
        assert basenames[0].endswith('-00000.warc')
        assert basenames[1].startswith('warcprox-')
        assert basenames[1].endswith('-00000.warc')
        assert basenames[2].startswith('warcprox-')
        assert basenames[2].endswith('-00001.warc.open')

        # write another record with prefix "some-prefix"
        write_and_wait('http://example.com/4', warc_prefix='some-prefix')

        # new some-prefix warc will have a new random token and start over at
        # serial 00000
        basenames = warc_names()
        assert len(basenames) == 4
        assert basenames[0].startswith('some-prefix-')
        assert basenames[1].startswith('some-prefix-')
        # order of these two warcs depends on random token so we don't know
        # which is which; the last five characters differ because one name
        # ends in ".warc" and the other in ".open"
        assert basenames[0][-5:] != basenames[1][-5:]
        assert '-00000.' in basenames[0]
        assert '-00000.' in basenames[1]
        assert basenames[2].startswith('warcprox-')
        assert basenames[2].endswith('-00000.warc')
        assert basenames[3].startswith('warcprox-')
        assert basenames[3].endswith('-00001.warc.open')
    finally:
        wwp.stop.set()
        wwp.join()

View File

@ -4,7 +4,7 @@ starting up and shutting down the various components of warcprox, and for
sending heartbeats to the service registry if configured to do so; also has
some memory profiling capabilities
Copyright (C) 2013-2018 Internet Archive
Copyright (C) 2013-2019 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License

View File

@ -4,7 +4,7 @@
warcprox/main.py - entrypoint for warcprox executable, parses command line
arguments, initializes components, starts controller, handles signals
Copyright (C) 2013-2018 Internet Archive
Copyright (C) 2013-2019 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License

View File

@ -148,6 +148,7 @@ class WarcWriter:
record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
record.get_header(b'WARC-Payload-Digest'), record.offset,
self.path, record.get_header(warctools.WarcRecord.URL))
self.f.flush()
return records
@ -235,3 +236,15 @@ class WarcWriterPool:
del self.warc_writers[prefix]
writer.close()
def close_for_prefix(self, prefix=None):
    '''
    Close the warc writer for the given warc prefix, or the default warc
    writer if `prefix` is `None` (or otherwise falsy).

    A prefix-specific writer is removed from `self.warc_writers` before it
    is closed. A prefix that has no writer in `self.warc_writers` is a
    no-op; it used to fall through and close the default writer, which
    rolled over an unrelated warc.
    '''
    if not prefix:
        self.default_warc_writer.close()
        return

    writer = self.warc_writers.pop(prefix, None)
    if writer is not None:
        writer.close()
    elif getattr(self.default_warc_writer, 'prefix', None) == prefix:
        # Naming the default prefix explicitly keeps closing the default
        # writer, as it did before.
        # NOTE(review): assumes the default writer exposes its prefix as
        # `.prefix` -- confirm against WarcWriter.
        self.default_warc_writer.close()

View File

@ -33,6 +33,10 @@ import warcprox
from concurrent import futures
from datetime import datetime
import threading
try:
import queue
except ImportError:
import Queue as queue
class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
logger = logging.getLogger("warcprox.writerthread.WarcWriterProcessor")
@ -44,6 +48,27 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
self.writer_pool = warcprox.writer.WarcWriterPool(options)
self.method_filter = set(method.upper() for method in self.options.method_filter or [])
self.blackout_period = options.blackout_period or 0
self.close_prefix_reqs = queue.Queue()
def _get_process_put(self):
    '''
    Act on every pending close request, then hand over to the parent
    class's `_get_process_put()`.

    Requests queued by `close_for_prefix()` are taken off
    `self.close_prefix_reqs` without blocking until the queue is empty,
    and each one is passed to `self.writer_pool.close_for_prefix()`.
    '''
    pending = self.close_prefix_reqs
    while True:
        try:
            requested_prefix = pending.get_nowait()
        except queue.Empty:
            # nothing left to close
            break
        else:
            self.writer_pool.close_for_prefix(requested_prefix)
    super()._get_process_put()
def close_for_prefix(self, prefix=None):
    '''
    Ask for the warc writer of the given warc prefix to be closed; with
    `prefix` set to `None` the request is for the default prefix.

    The request is only put on `self.close_prefix_reqs` here; nothing is
    closed by this call itself.

    Intended for code living outside of warcprox proper (a third-party
    plugin, for example) that knows a warc is finished and wants it closed
    promptly.
    '''
    pending_requests = self.close_prefix_reqs
    pending_requests.put(prefix)
def _process_url(self, recorded_url):
try: