Merge pull request #109 from nlevitt/warc-close-api

Warc close api
This commit is contained in:
Noah Levitt 2019-01-17 17:15:13 -08:00 committed by GitHub
commit 16e3302d36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 273 additions and 221 deletions

View File

@ -3,7 +3,7 @@
'''
tests/test_warcprox.py - automated tests for warcprox
Copyright (C) 2013-2018 Internet Archive
Copyright (C) 2013-2019 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -844,10 +844,9 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
# close the warc
assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"]
writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"]
warc = writer._available_warcs.queue[0]
warc_path = os.path.join(warc.directory, warc.finalname)
warc_path = os.path.join(writer.directory, writer.finalname)
assert not os.path.exists(warc_path)
warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"].close_writer()
warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"].close()
assert os.path.exists(warc_path)
# read the warc
@ -1701,7 +1700,7 @@ def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback
assert response.status_code == 200
assert not 'via' in playback_response
warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path
warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer.path
with open(warc, 'rb') as f:
for record in warcio.archiveiterator.ArchiveIterator(f):
if record.rec_headers.get_header('warc-target-uri') == url:
@ -1933,9 +1932,8 @@ def test_long_warcprox_meta(
# check that warcprox-meta was parsed and honored ("warc-prefix" param)
assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"]
writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"]
warc = writer._available_warcs.queue[0]
warc_path = os.path.join(warc.directory, warc.finalname)
warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"].close_writer()
warc_path = os.path.join(writer.directory, writer.finalname)
warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"].close()
assert os.path.exists(warc_path)
# read the warc
@ -2118,7 +2116,7 @@ def test_dedup_min_text_size(http_daemon, warcprox_, archiving_proxies):
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
# check that response records were written
warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path
warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer.path
with open(warc, 'rb') as f:
rec_iter = iter(warcio.archiveiterator.ArchiveIterator(f))
record = next(rec_iter)
@ -2198,7 +2196,7 @@ def test_dedup_min_binary_size(http_daemon, warcprox_, archiving_proxies):
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
# check that response records were written
warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path
warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer.path
with open(warc, 'rb') as f:
rec_iter = iter(warcio.archiveiterator.ArchiveIterator(f))
record = next(rec_iter)

View File

@ -1,7 +1,7 @@
'''
tests/test_writer.py - warcprox warc writing tests
Copyright (C) 2017 Internet Archive
Copyright (C) 2017-2019 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -36,6 +36,12 @@ import tempfile
import logging
import hashlib
import queue
import sys
logging.basicConfig(
stream=sys.stdout, level=logging.TRACE,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s '
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
def lock_file(q, filename):
"""Try to lock file and return 1 if successful, else return 0.
@ -49,7 +55,6 @@ def lock_file(q, filename):
except IOError:
q.put('FAILED TO OBTAIN LOCK')
def test_warc_writer_locking(tmpdir):
"""Test if WarcWriter is locking WARC files.
When we don't have the .open suffix, WarcWriter locks the file and the
@ -64,7 +69,7 @@ def test_warc_writer_locking(tmpdir):
dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
wwriter = WarcWriter(Options(
directory=dirname, no_warc_open_suffix=True, writer_threads=1))
directory=dirname, no_warc_open_suffix=True))
wwriter.write_records(recorded_url)
warcs = [fn for fn in os.listdir(dirname) if fn.endswith('.warc')]
assert warcs
@ -75,7 +80,7 @@ def test_warc_writer_locking(tmpdir):
p.start()
p.join()
assert q.get() == 'FAILED TO OBTAIN LOCK'
wwriter.close_writer()
wwriter.close()
# locking must succeed after writer has closed the WARC file.
p = Process(target=lock_file, args=(q, target_warc))
@ -96,8 +101,7 @@ def test_special_dont_write_prefix():
logging.debug('cd %s', tmpdir)
os.chdir(tmpdir)
wwt = warcprox.writerthread.WarcWriterProcessor(
Options(prefix='-', writer_threads=1))
wwt = warcprox.writerthread.WarcWriterProcessor(Options(prefix='-'))
wwt.inq = queue.Queue(maxsize=1)
wwt.outq = queue.Queue(maxsize=1)
try:
@ -131,7 +135,7 @@ def test_special_dont_write_prefix():
wwt.join()
wwt = warcprox.writerthread.WarcWriterProcessor(
Options(writer_threads=1, blackout_period=60, prefix='foo'))
Options(blackout_period=60, prefix='foo'))
wwt.inq = queue.Queue(maxsize=1)
wwt.outq = queue.Queue(maxsize=1)
try:
@ -199,14 +203,12 @@ def test_special_dont_write_prefix():
wwt.stop.set()
wwt.join()
def test_do_not_archive():
with tempfile.TemporaryDirectory() as tmpdir:
logging.debug('cd %s', tmpdir)
os.chdir(tmpdir)
wwt = warcprox.writerthread.WarcWriterProcessor(
Options(writer_threads=1))
wwt = warcprox.writerthread.WarcWriterProcessor()
wwt.inq = queue.Queue(maxsize=1)
wwt.outq = queue.Queue(maxsize=1)
try:
@ -240,7 +242,6 @@ def test_do_not_archive():
wwt.stop.set()
wwt.join()
def test_warc_writer_filename(tmpdir):
"""Test if WarcWriter is writing WARC files with custom filenames.
"""
@ -253,11 +254,121 @@ def test_warc_writer_filename(tmpdir):
dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
wwriter = WarcWriter(Options(directory=dirname, prefix='foo',
warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}',
writer_threads=1))
warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}'))
wwriter.write_records(recorded_url)
warcs = [fn for fn in os.listdir(dirname)]
assert warcs
assert re.search(
r'\d{17}_foo_\d{14}_00000.warc.open',
wwriter._available_warcs.queue[0].path)
r'\d{17}_foo_\d{14}_00000.warc.open', wwriter.path)
def test_close_for_prefix(tmpdir):
    """
    Exercise `WarcWriterProcessor.close_for_prefix()`.

    Checks that a close request finalizes the open warc for the given prefix
    (or for the default prefix when called with no argument), and that the
    next record written to that prefix opens a fresh warc. For the default
    prefix the serial number continues (00001); for a named prefix the writer
    is discarded, so a new writer starts over at serial 00000 with a new
    random token.
    """
    wwp = warcprox.writerthread.WarcWriterProcessor(
            Options(directory=str(tmpdir)))
    # inq/outq must be assigned before the processor thread starts
    wwp.inq = queue.Queue(maxsize=1)
    wwp.outq = queue.Queue(maxsize=1)
    try:
        wwp.start()

        # write a record to the default prefix
        recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
        recorder.read()
        wwp.inq.put(RecordedUrl(
            url='http://example.com/1', content_type='text/plain',
            status=200, client_ip='127.0.0.2', request_data=b'abc',
            response_recorder=recorder, remote_ip='127.0.0.3',
            timestamp=datetime.utcnow(),
            payload_digest=recorder.block_digest))
        time.sleep(0.5)
        rurl = wwp.outq.get() # wait for it to finish
        assert rurl.url == b'http://example.com/1'

        # the only file should be the open default-prefix warc
        assert len(tmpdir.listdir()) == 1
        assert tmpdir.listdir()[0].basename.startswith('warcprox-')
        assert tmpdir.listdir()[0].basename.endswith('-00000.warc.open')
        assert tmpdir.listdir()[0].basename == wwp.writer_pool.default_warc_writer.finalname + '.open'

        # request close of default warc
        wwp.close_for_prefix()

        # write a record to some other prefix
        recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
        recorder.read()
        wwp.inq.put(RecordedUrl(
            url='http://example.com/2', content_type='text/plain',
            status=200, client_ip='127.0.0.2', request_data=b'abc',
            response_recorder=recorder, remote_ip='127.0.0.3',
            timestamp=datetime.utcnow(),
            payload_digest=recorder.block_digest,
            warcprox_meta={'warc-prefix': 'some-prefix'}))
        time.sleep(0.5)
        rurl = wwp.outq.get() # wait for it to finish
        assert rurl.url == b'http://example.com/2'

        # default warc is now closed (no .open suffix), some-prefix warc open
        assert len(tmpdir.listdir()) == 2
        basenames = sorted(f.basename for f in tmpdir.listdir())
        assert basenames[0].startswith('some-prefix-')
        assert basenames[0].endswith('-00000.warc.open')
        assert basenames[1].startswith('warcprox-')
        assert basenames[1].endswith('-00000.warc')

        # request close of warc with prefix
        wwp.close_for_prefix('some-prefix')

        # write another record to the default prefix
        recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
        recorder.read()
        wwp.inq.put(RecordedUrl(
            url='http://example.com/3', content_type='text/plain',
            status=200, client_ip='127.0.0.2', request_data=b'abc',
            response_recorder=recorder, remote_ip='127.0.0.3',
            timestamp=datetime.utcnow(),
            payload_digest=recorder.block_digest))
        time.sleep(0.5)
        rurl = wwp.outq.get() # wait for it to finish
        assert rurl.url == b'http://example.com/3'

        # now some-prefix warc is closed and a new default prefix warc is open
        # (default writer survives close, so its serial advances to 00001)
        basenames = sorted(f.basename for f in tmpdir.listdir())
        assert len(basenames) == 3
        assert basenames[0].startswith('some-prefix-')
        assert basenames[0].endswith('-00000.warc')
        assert basenames[1].startswith('warcprox-')
        assert basenames[1].endswith('-00000.warc')
        assert basenames[2].startswith('warcprox-')
        assert basenames[2].endswith('-00001.warc.open')

        # write another record to with prefix "some-prefix"
        recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
        recorder.read()
        wwp.inq.put(RecordedUrl(
            url='http://example.com/4', content_type='text/plain',
            status=200, client_ip='127.0.0.2', request_data=b'abc',
            response_recorder=recorder, remote_ip='127.0.0.3',
            timestamp=datetime.utcnow(),
            payload_digest=recorder.block_digest,
            warcprox_meta={'warc-prefix': 'some-prefix'}))
        time.sleep(0.5)
        rurl = wwp.outq.get() # wait for it to finish
        assert rurl.url == b'http://example.com/4'

        # new some-prefix warc will have a new random token and start over at
        # serial 00000
        basenames = sorted(f.basename for f in tmpdir.listdir())
        assert len(basenames) == 4
        assert basenames[0].startswith('some-prefix-')
        assert basenames[1].startswith('some-prefix-')
        # order of these two warcs depends on random token so we don't know
        # which is which
        assert basenames[0][-5:] != basenames[1][-5:]
        assert '-00000.' in basenames[0]
        assert '-00000.' in basenames[1]
        assert basenames[2].startswith('warcprox-')
        assert basenames[2].endswith('-00000.warc')
        assert basenames[3].startswith('warcprox-')
        assert basenames[3].endswith('-00001.warc.open')
    finally:
        wwp.stop.set()
        wwp.join()

View File

@ -1,7 +1,7 @@
"""
warcprox/__init__.py - warcprox package main file, contains some utility code
Copyright (C) 2013-2018 Internet Archive
Copyright (C) 2013-2019 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -57,17 +57,6 @@ class Jsonner(json.JSONEncoder):
else:
return json.JSONEncoder.default(self, o)
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
'''
`concurrent.futures.ThreadPoolExecutor` supporting a queue of limited size.
If `max_queued` is set, calls to `submit()` will block if necessary until a
free slot is available.
'''
def __init__(self, max_queued=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self._work_queue = queue.Queue(maxsize=max_queued or 0)
# XXX linux-specific
def gettid():
try:
@ -92,11 +81,14 @@ class RequestBlockedByRule(Exception):
class BasePostfetchProcessor(threading.Thread):
logger = logging.getLogger("warcprox.BasePostfetchProcessor")
def __init__(self, options=Options()):
def __init__(self, options=Options(), controller=None, **kwargs):
threading.Thread.__init__(self, name=self.__class__.__name__)
self.options = options
self.controller = controller
self.stop = threading.Event()
# these should be set before thread is started
# these should be set by the caller before thread is started
self.inq = None
self.outq = None
self.profiler = None
@ -216,8 +208,8 @@ class BaseBatchPostfetchProcessor(BasePostfetchProcessor):
raise Exception('not implemented')
class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor):
def __init__(self, listener, options=Options()):
BaseStandardPostfetchProcessor.__init__(self, options)
def __init__(self, listener, options=Options(), controller=None, **kwargs):
BaseStandardPostfetchProcessor.__init__(self, options, controller, **kwargs)
self.listener = listener
self.name = listener.__class__.__name__

View File

@ -4,7 +4,7 @@ starting up and shutting down the various components of warcprox, and for
sending heartbeats to the service registry if configured to do so; also has
some memory profiling capabilities
Copyright (C) 2013-2018 Internet Archive
Copyright (C) 2013-2019 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -93,15 +93,19 @@ class Factory:
return None
@staticmethod
def plugin(qualname, options):
def plugin(qualname, options, controller=None):
try:
(module_name, class_name) = qualname.rsplit('.', 1)
module_ = importlib.import_module(module_name)
class_ = getattr(module_, class_name)
try: # new plugins take `options` argument
plugin = class_(options)
except: # backward-compatibility
plugin = class_()
try:
# new plugins take `options` and `controller` arguments
plugin = class_(options, controller)
except:
try: # medium plugins take `options` argument
plugin = class_(options)
except: # old plugins take no arguments
plugin = class_()
# check that this is either a listener or a batch processor
assert hasattr(plugin, 'notify') ^ hasattr(plugin, '_startup')
return plugin
@ -143,10 +147,6 @@ class WarcproxController(object):
self.playback_proxy = Factory.playback_proxy(
self.proxy.ca, self.options)
# https://github.com/internetarchive/warcprox/wiki/benchmarking-number-of-threads
if not self.options.writer_threads:
self.options.writer_threads = 1
self.build_postfetch_chain(self.proxy.recorded_url_q)
self.service_registry = Factory.service_registry(options)
@ -233,7 +233,7 @@ class WarcproxController(object):
crawl_logger, self.options))
for qualname in self.options.plugins or []:
plugin = Factory.plugin(qualname, self.options)
plugin = Factory.plugin(qualname, self.options, self)
if hasattr(plugin, 'notify'):
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(

View File

@ -4,7 +4,7 @@
warcprox/main.py - entrypoint for warcprox executable, parses command line
arguments, initializes components, starts controller, handles signals
Copyright (C) 2013-2018 Internet Archive
Copyright (C) 2013-2019 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -196,11 +196,6 @@ def _build_arg_parser(prog='warcprox', show_hidden=False):
help=suppress(
'turn on performance profiling; summary statistics are dumped '
'every 10 minutes and at shutdown'))
hidden.add_argument(
'--writer-threads', dest='writer_threads', type=int, default=1,
help=suppress(
'number of warc writer threads; caution, see '
'https://github.com/internetarchive/warcprox/issues/101'))
arg_parser.add_argument(
'--onion-tor-socks-proxy', dest='onion_tor_socks_proxy',
default=None, help=(

View File

@ -1,7 +1,7 @@
'''
warcprox/writer.py - warc writer, manages and writes records to warc files
Copyright (C) 2013-2017 Internet Archive
Copyright (C) 2013-2019 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -29,37 +29,41 @@ import warcprox
import os
import socket
import random
import threading
try:
import queue
except ImportError:
import Queue as queue
import contextlib
class _OneWritableWarc:
class WarcWriter:
'''
Utility class used by WarcWriter
A writer for one warc prefix, which rolls over to new warc file,
incrementing serial number, when size limit is hit. Should only be used
from one thread.
'''
logger = logging.getLogger('warcprox.writer.WarcWriter')
logger = logging.getLogger('warcprox.writer._OneWritableWarc')
def __init__(self, options=warcprox.Options()):
self.options = options
self.gzip = options.gzip or False
self.record_builder = warcprox.warc.WarcRecordBuilder(
digest_algorithm=options.digest_algorithm or 'sha1',
base32=options.base32)
def __init__(self, options=warcprox.Options(), randomtoken='0'):
self.f = None
self.path = None
self.finalname = None
self.gzip = options.gzip or False
self.prefix = options.prefix or 'warcprox'
self.open_suffix = '' if options.no_warc_open_suffix else '.open'
self.randomtoken = randomtoken
self.rollover_size = options.rollover_size or 1000000000
self.rollover_idle_time = options.rollover_idle_time or None
self.directory = options.directory or './warcs'
self.filename_template = options.warc_filename or \
'{prefix}-{timestamp17}-{randomtoken}-{serialno}'
self.last_activity = time.time()
self.serial = 0
self.randomtoken = ''.join(
random.sample('abcdefghijklmnopqrstuvwxyz0123456789', 8))
# h3 default <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
def next_filename(self, serial):
def filename(self, serial):
"""WARC filename is configurable with CLI parameter --warc-filename.
Default: '{prefix}-{timestamp17}-{randomtoken}-{serialno}'
Available variables are: prefix, timestamp14, timestamp17, serialno,
@ -81,13 +85,17 @@ class _OneWritableWarc:
return fname
def open(self, serial):
'''
Opens a new warc file with filename prefix `self.prefix` and serial
number `self.serial` and assigns file handle to `self.f`.
'''
if not os.path.exists(self.directory):
self.logger.info(
"warc destination directory %s doesn't exist, creating it",
self.directory)
os.mkdir(self.directory)
self.finalname = self.next_filename(serial)
self.finalname = self.filename(serial)
self.logger.trace('opening %s', self.finalname)
self.path = os.path.sep.join(
[self.directory, self.finalname + self.open_suffix])
@ -103,7 +111,54 @@ class _OneWritableWarc:
'could not lock file %s (%s)', self.path, exc)
return self.f
def ensure_open(self):
    '''
    Guarantee that `self.f` is an open warc file ready to receive the next
    record.

    First gives the current warc a chance to roll over if it has hit the
    size limit. If no warc file is open afterwards, opens one (consuming
    and incrementing the serial number) and writes the leading warcinfo
    record to it.
    '''
    self.maybe_size_rollover()
    if self.f:
        # already have an open warc, nothing to do
        return
    # grab the current serial number and bump it for the next warc
    current_serial, self.serial = self.serial, self.serial + 1
    self.open(current_serial)
    warcinfo = self.record_builder.build_warcinfo_record(self.finalname)
    self.logger.debug('warcinfo.headers=%s', warcinfo.headers)
    warcinfo.write_to(self.f, gzip=self.gzip)
def write_records(self, recorded_url):
    '''
    Build warc records for `recorded_url` and append them to the current
    warc file, opening a new one first if necessary.

    Returns tuple of records written, which are instances of
    `hanzo.warctools.warc.WarcRecord`, decorated with `warc_filename` and
    `offset` attributes.
    '''
    records = self.record_builder.build_warc_records(recorded_url)
    self.ensure_open()
    for record in records:
        # remember where this record starts so we can report offset/length
        start = self.f.tell()
        record.write_to(self.f, gzip=self.gzip)
        record.offset = start
        record.length = self.f.tell() - start
        record.warc_filename = self.finalname
        self.logger.trace(
                'wrote warc record: warc_type=%s content_length=%s '
                'digest=%s offset=%d warc=%s url=%s', record.type,
                record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
                record.get_header(b'WARC-Payload-Digest'), record.offset,
                self.path, record.get_header(warctools.WarcRecord.URL))
    self.f.flush()
    return records
def close(self):
'''
Closes out the active warc.
The next call to `write_records()` will write to a a new warc file with
the serial number incremented.
'''
if self.path:
self.logger.trace('closing %s', self.finalname)
if self.open_suffix == '':
@ -136,112 +191,16 @@ class _OneWritableWarc:
self.finalname, os.path.getsize(self.path))
self.close()
class WarcWriter:
logger = logging.getLogger('warcprox.writer.WarcWriter')
def __init__(self, options=warcprox.Options()):
self.options = options
self.gzip = options.gzip or False
self.record_builder = warcprox.warc.WarcRecordBuilder(
digest_algorithm=options.digest_algorithm or 'sha1',
base32=options.base32)
self._available_warcs = queue.Queue()
self._warc_count = 0
self._warc_count_lock = threading.Lock()
self._serial = 0
self._serial_lock = threading.Lock()
self._randomtoken = ''.join(
random.sample('abcdefghijklmnopqrstuvwxyz0123456789', 8))
def _bespeak_warc(self):
try:
return self._available_warcs.get(block=False)
except queue.Empty:
with self._warc_count_lock:
if self._warc_count < self.options.writer_threads:
self._warc_count += 1
return _OneWritableWarc(self.options, self._randomtoken)
# else we're maxed out, wait for one to free up
return self._available_warcs.get(block=True)
@contextlib.contextmanager
def _warc(self):
warc = self._bespeak_warc()
warc.maybe_size_rollover()
# lazy file open
if warc.f == None:
with self._serial_lock:
serial = self._serial
self._serial += 1
warc.open(serial)
warcinfo = self.record_builder.build_warcinfo_record(warc.finalname)
self.logger.debug('warcinfo.headers=%s', warcinfo.headers)
warcinfo.write_to(warc.f, gzip=self.gzip)
yield warc
# __exit__()
warc.f.flush()
warc.last_activity = time.time()
self._available_warcs.put(warc)
def write_records(self, recorded_url):
"""Returns tuple of records written, which are instances of
hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and
"offset" attributes."""
records = self.record_builder.build_warc_records(recorded_url)
with self._warc() as warc:
for record in records:
offset = warc.f.tell()
record.write_to(warc.f, gzip=self.gzip)
record.offset = offset
record.length = warc.f.tell() - offset
record.warc_filename = warc.finalname
self.logger.trace(
'wrote warc record: warc_type=%s content_length=%s '
'digest=%s offset=%d warc=%s url=%s',
record.type,
record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
record.get_header(b'WARC-Payload-Digest'),
record.offset, warc.path,
record.get_header(warctools.WarcRecord.URL))
return records
def maybe_idle_rollover(self):
warcs = []
while True:
try:
warc = self._available_warcs.get(block=False)
warcs.append(warc)
except queue.Empty:
break
for warc in warcs:
warc.maybe_idle_rollover()
self._available_warcs.put(warc)
def close_writer(self):
while self._warc_count > 0:
with self._warc_count_lock:
warc = self._available_warcs.get()
warc.close()
self._warc_count -= 1
class WarcWriterPool:
'''
A `WarcWriter` per warc prefix. Should only be used from one thread.
'''
logger = logging.getLogger("warcprox.writer.WarcWriterPool")
def __init__(self, options=warcprox.Options()):
self.default_warc_writer = WarcWriter(options)
self.warc_writers = {} # {prefix:WarcWriter}
self.options = options
self._lock = threading.RLock()
self._last_maybe = time.time()
# chooses writer for filename specified by warcprox_meta["warc-prefix"] if set
@ -251,16 +210,17 @@ class WarcWriterPool:
# self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url))
options = warcprox.Options(**vars(self.options))
options.prefix = recorded_url.warcprox_meta["warc-prefix"]
with self._lock:
if not options.prefix in self.warc_writers:
self.warc_writers[options.prefix] = WarcWriter(options)
w = self.warc_writers[options.prefix]
if not options.prefix in self.warc_writers:
self.warc_writers[options.prefix] = WarcWriter(options)
w = self.warc_writers[options.prefix]
return w
def write_records(self, recorded_url):
"""Returns tuple of records written, which are instances of
hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and
"offset" attributes."""
'''
Returns tuple of records written, which are instances of
`hanzo.warctools.warc.WarcRecord`, decorated with `warc_filename` and
`offset` attributes.
'''
return self._writer(recorded_url).write_records(recorded_url)
def maybe_idle_rollover(self):
@ -271,7 +231,20 @@ class WarcWriterPool:
self._last_maybe = time.time()
def close_writers(self):
self.default_warc_writer.close_writer()
for w in self.warc_writers.values():
w.close_writer()
self.default_warc_writer.close()
for prefix, writer in list(self.warc_writers.items()):
del self.warc_writers[prefix]
writer.close()
def close_for_prefix(self, prefix=None):
    '''
    Close warc writer for the given warc prefix, or the default prefix if
    `prefix` is `None`.

    A writer for a non-default prefix is removed from the pool entirely;
    closing an unknown prefix falls back to closing the default writer.
    '''
    # pop() removes the writer from the pool and returns None for an
    # unknown prefix, matching the membership-test-then-del behavior
    writer = self.warc_writers.pop(prefix, None) if prefix else None
    if writer is not None:
        writer.close()
    else:
        self.default_warc_writer.close()

View File

@ -2,7 +2,7 @@
warcprox/writerthread.py - warc writer thread, reads from the recorded url
queue, writes warc records, runs final tasks after warc records are written
Copyright (C) 2013-2018 Internet Archive
Copyright (C) 2013-2019 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -33,6 +33,10 @@ import warcprox
from concurrent import futures
from datetime import datetime
import threading
try:
import queue
except ImportError:
import Queue as queue
class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
logger = logging.getLogger("warcprox.writerthread.WarcWriterProcessor")
@ -43,45 +47,28 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
self.writer_pool = warcprox.writer.WarcWriterPool(options)
self.method_filter = set(method.upper() for method in self.options.method_filter or [])
# set max_queued small, because self.inq is already handling queueing
self.thread_local = threading.local()
self.thread_profilers = {}
# for us; but give it a little breathing room to make sure it can keep
# worker threads busy
self.pool = warcprox.ThreadPoolExecutor(
max_workers=options.writer_threads or 1,
max_queued=10 * (options.writer_threads or 1))
self.batch = set()
self.blackout_period = options.blackout_period or 0
def _startup(self):
self.logger.info('%s warc writer threads', self.pool._max_workers)
warcprox.BaseStandardPostfetchProcessor._startup(self)
self.close_prefix_reqs = queue.Queue()
def _get_process_put(self):
try:
recorded_url = self.inq.get(block=True, timeout=0.5)
self.batch.add(recorded_url)
self.pool.submit(self._wrap_process_url, recorded_url)
finally:
self.writer_pool.maybe_idle_rollover()
while True:
try:
prefix = self.close_prefix_reqs.get_nowait()
self.writer_pool.close_for_prefix(prefix)
except queue.Empty:
break
super()._get_process_put()
def _wrap_process_url(self, recorded_url):
if not getattr(self.thread_local, 'name_set', False):
threading.current_thread().name = 'WarcWriterThread(tid=%s)' % warcprox.gettid()
self.thread_local.name_set = True
if self.options.profile:
import cProfile
if not hasattr(self.thread_local, 'profiler'):
self.thread_local.profiler = cProfile.Profile()
tid = threading.current_thread().ident
self.thread_profilers[tid] = self.thread_local.profiler
self.thread_local.profiler.enable()
self._process_url(recorded_url)
self.thread_local.profiler.disable()
else:
self._process_url(recorded_url)
def close_for_prefix(self, prefix=None):
    '''
    Request close of warc writer for the given warc prefix, or the default
    prefix if `prefix` is `None`.

    This API exists so that some code from outside of warcprox proper (in a
    third-party plugin for example) can close open warcs promptly when it
    knows they are finished.
    '''
    # the request is only queued here; the writer thread drains
    # close_prefix_reqs in _get_process_put() and performs the actual close
    self.close_prefix_reqs.put(prefix)
def _process_url(self, recorded_url):
try:
@ -97,10 +84,6 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
logging.error(
'caught exception processing %s', recorded_url.url,
exc_info=True)
finally:
self.batch.remove(recorded_url)
if self.outq:
self.outq.put(recorded_url)
def _filter_accepts(self, recorded_url):
if not self.method_filter: