multiple warc writer threads (hacked in with little thought to code organization)

This commit is contained in:
Noah Levitt 2017-05-19 16:10:44 -07:00
parent 515dd84aed
commit ef5dd2e4ae
7 changed files with 195 additions and 141 deletions

View File

@ -51,7 +51,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.1b1.dev83',
version='2.1b1.dev84',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',

View File

@ -375,15 +375,18 @@ def warcprox_(request, captures_db, dedup_db, stats_db, service_registry):
options.method_filter = ['GET','POST']
writer_pool = warcprox.writer.WarcWriterPool(options)
warc_writer_thread = warcprox.writerthread.WarcWriterThread(
recorded_url_q=recorded_url_q, writer_pool=writer_pool,
dedup_db=dedup_db, listeners=[
captures_db or dedup_db, playback_index_db, stats_db],
options=options)
warc_writer_threads = [
warcprox.writerthread.WarcWriterThread(
recorded_url_q=recorded_url_q, writer_pool=writer_pool,
dedup_db=dedup_db, listeners=[
captures_db or dedup_db, playback_index_db, stats_db],
options=options)
for i in range(int(proxy.max_threads ** 0.5))]
warcprox_ = warcprox.controller.WarcproxController(proxy=proxy,
warc_writer_thread=warc_writer_thread, playback_proxy=playback_proxy,
service_registry=service_registry, options=options)
warcprox_ = warcprox.controller.WarcproxController(
proxy=proxy, warc_writer_threads=warc_writer_threads,
playback_proxy=playback_proxy, service_registry=service_registry,
options=options)
logging.info('starting warcprox')
warcprox_thread = threading.Thread(name='WarcproxThread',
target=warcprox_.run_until_shutdown)
@ -503,7 +506,8 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
assert response.content == b'404 Not in Archive\n'
# check not in dedup db
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
assert dedup_lookup is None
# archive
@ -520,13 +524,14 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# check in dedup db
# {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
assert dedup_lookup['url'] == url.encode('ascii')
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
@ -545,12 +550,13 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# check in dedup db (no change from prev)
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
assert dedup_lookup['url'] == url.encode('ascii')
assert dedup_lookup['id'] == record_id
assert dedup_lookup['date'] == dedup_date
@ -573,7 +579,8 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
assert response.content == b'404 Not in Archive\n'
# check not in dedup db
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
assert dedup_lookup is None
# archive
@ -590,13 +597,14 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# check in dedup db
# {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
assert dedup_lookup['url'] == url.encode('ascii')
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
@ -615,12 +623,13 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# check in dedup db (no change from prev)
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
assert dedup_lookup['url'] == url.encode('ascii')
assert dedup_lookup['id'] == record_id
assert dedup_lookup['date'] == dedup_date
@ -645,7 +654,7 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
@ -657,7 +666,7 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(2.5)
@ -682,12 +691,13 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# check url1 in dedup db bucket_a
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a")
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a")
assert dedup_lookup['url'] == url1.encode('ascii')
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
@ -695,7 +705,8 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
dedup_date = dedup_lookup['date']
# check url1 not in dedup db bucket_b
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b")
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b")
assert dedup_lookup is None
# archive url2 bucket_b
@ -707,12 +718,13 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# check url2 in dedup db bucket_b
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b")
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b")
assert dedup_lookup['url'] == url2.encode('ascii')
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
@ -735,15 +747,15 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# close the warc
assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"]
writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"]
assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"]
writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"]
warc_path = os.path.join(writer.directory, writer._f_finalname)
warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"].close_writer()
warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"].close_writer()
assert os.path.exists(warc_path)
# read the warc
@ -928,7 +940,7 @@ def test_domain_doc_soft_limit(
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
@ -943,7 +955,7 @@ def test_domain_doc_soft_limit(
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
# rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
time.sleep(2.0)
@ -970,7 +982,7 @@ def test_domain_doc_soft_limit(
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
# rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
time.sleep(2.0)
@ -985,7 +997,7 @@ def test_domain_doc_soft_limit(
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
# rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
time.sleep(2.0)
@ -1053,7 +1065,7 @@ def test_domain_data_soft_limit(
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
# rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
time.sleep(2.0)
@ -1069,7 +1081,7 @@ def test_domain_data_soft_limit(
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
# rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
time.sleep(2.0)
@ -1085,7 +1097,7 @@ def test_domain_data_soft_limit(
# wait for writer thread to process
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
# rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
time.sleep(2.0)
@ -1219,7 +1231,7 @@ def test_dedup_ok_flag(
url = 'http://localhost:{}/z/b'.format(http_daemon.server_port)
# check not in dedup db
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079',
bucket='test_dedup_ok_flag')
assert dedup_lookup is None
@ -1234,12 +1246,12 @@ def test_dedup_ok_flag(
assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n'
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# check that dedup db doesn't give us anything for this
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079',
bucket='test_dedup_ok_flag')
assert dedup_lookup is None
@ -1255,18 +1267,18 @@ def test_dedup_ok_flag(
assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n'
time.sleep(0.5)
while not warcprox_.warc_writer_thread.idle:
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# check that dedup db gives us something for this
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(
dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079',
bucket='test_dedup_ok_flag')
assert dedup_lookup
# inspect what's in rethinkdb more closely
rethink_captures = warcprox_.warc_writer_thread.dedup_db.captures_db
rethink_captures = warcprox_.warc_writer_threads[0].dedup_db.captures_db
results_iter = rethink_captures.rr.table(rethink_captures.table).get_all(
['FV7RGGA3SCRFNTS6L275N2OJQJXM5EDZ', 'response',
'test_dedup_ok_flag'], index='sha1_warc_type').order_by(
@ -1336,7 +1348,6 @@ def test_controller_with_defaults():
assert not controller.proxy_thread
assert not controller.playback_proxy
assert not controller.playback_proxy_thread
assert controller.warc_writer_thread
assert controller.proxy.RequestHandlerClass == warcprox.warcproxy.WarcProxyHandler
assert controller.proxy.ca
assert controller.proxy.digest_algorithm == 'sha1'
@ -1344,18 +1355,20 @@ def test_controller_with_defaults():
assert controller.proxy.recorded_url_q
assert controller.proxy.server_address == ('127.0.0.1', 8000)
assert controller.proxy.server_port == 8000
assert controller.warc_writer_thread.recorded_url_q
assert controller.warc_writer_thread.recorded_url_q is controller.proxy.recorded_url_q
assert controller.warc_writer_thread.writer_pool
assert controller.warc_writer_thread.writer_pool.default_warc_writer
assert controller.warc_writer_thread.writer_pool.default_warc_writer.directory == './warcs'
assert controller.warc_writer_thread.writer_pool.default_warc_writer.rollover_idle_time is None
assert controller.warc_writer_thread.writer_pool.default_warc_writer.rollover_size == 1000000000
assert controller.warc_writer_thread.writer_pool.default_warc_writer.prefix == 'warcprox'
assert controller.warc_writer_thread.writer_pool.default_warc_writer.gzip is False
assert controller.warc_writer_thread.writer_pool.default_warc_writer.record_builder
assert not controller.warc_writer_thread.writer_pool.default_warc_writer.record_builder.base32
assert controller.warc_writer_thread.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1'
for wwt in controller.warc_writer_threads:
assert wwt
assert wwt.recorded_url_q
assert wwt.recorded_url_q is controller.proxy.recorded_url_q
assert wwt.writer_pool
assert wwt.writer_pool.default_warc_writer
assert wwt.writer_pool.default_warc_writer.directory == './warcs'
assert wwt.writer_pool.default_warc_writer.rollover_idle_time is None
assert wwt.writer_pool.default_warc_writer.rollover_size == 1000000000
assert wwt.writer_pool.default_warc_writer.prefix == 'warcprox'
assert wwt.writer_pool.default_warc_writer.gzip is False
assert wwt.writer_pool.default_warc_writer.record_builder
assert not wwt.writer_pool.default_warc_writer.record_builder.base32
assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1'
def test_choose_a_port_for_me(service_registry):
options = warcprox.Options()
@ -1402,7 +1415,7 @@ def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback
assert response.status_code == 200
assert not 'via' in playback_response
warc = warcprox_.warc_writer_thread.writer_pool.default_warc_writer._fpath
warc = warcprox_.warc_writer_threads[0].writer_pool.default_warc_writer._fpath
with open(warc, 'rb') as f:
for record in warcio.archiveiterator.ArchiveIterator(f):
if record.rec_headers.get_header('warc-target-uri') == url:

View File

@ -37,15 +37,15 @@ class WarcproxController(object):
HEARTBEAT_INTERVAL = 20.0
def __init__(self, proxy=None, warc_writer_thread=None,
playback_proxy=None, service_registry=None,
options=warcprox.Options()):
def __init__(
self, proxy=None, warc_writer_threads=None, playback_proxy=None,
service_registry=None, options=warcprox.Options()):
"""
Create warcprox controller.
If supplied, proxy should be an instance of WarcProxy, and
warc_writer_thread should be an instance of WarcWriterThread. If not
supplied, they are created with default values.
If supplied, `proxy` should be an instance of WarcProxy, and
`warc_writer_threads` should be an list of WarcWriterThread instances.
If not supplied, they are created with default values.
If supplied, playback_proxy should be an instance of PlaybackProxy. If
not supplied, no playback proxy will run.
@ -55,11 +55,15 @@ class WarcproxController(object):
else:
self.proxy = warcprox.warcproxy.WarcProxy(options=options)
if warc_writer_thread is not None:
self.warc_writer_thread = warc_writer_thread
if warc_writer_threads is not None:
self.warc_writer_threads = warc_writer_threads
else:
self.warc_writer_thread = warcprox.writerthread.WarcWriterThread(
recorded_url_q=self.proxy.recorded_url_q)
self.warc_writer_threads = [
warcprox.writerthread.WarcWriterThread(
name='WarcWriterThread%03d' % i,
recorded_url_q=self.proxy.recorded_url_q,
options=options)
for i in range(int(self.proxy.max_threads ** 0.5))]
self.proxy_thread = None
self.playback_proxy_thread = None
@ -169,9 +173,14 @@ class WarcproxController(object):
target=self.proxy.serve_forever, name='ProxyThread')
self.proxy_thread.start()
if self.warc_writer_thread.dedup_db:
self.warc_writer_thread.dedup_db.start()
self.warc_writer_thread.start()
assert(all(
wwt.dedup_db is self.warc_writer_threads[0].dedup_db
for wwt in self.warc_writer_threads))
if self.warc_writer_threads[0].dedup_db:
self.warc_writer_threads[0].dedup_db.start()
for wwt in self.warc_writer_threads:
wwt.start()
if self.playback_proxy is not None:
self.playback_proxy_thread = threading.Thread(
@ -185,7 +194,8 @@ class WarcproxController(object):
self.logger.info('warcprox is not running')
return
self.warc_writer_thread.stop.set()
for wwt in self.warc_writer_threads:
wwt.stop.set()
self.proxy.shutdown()
self.proxy.server_close()
@ -196,12 +206,13 @@ class WarcproxController(object):
self.playback_proxy.playback_index_db.close()
# wait for threads to finish
self.warc_writer_thread.join()
for wwt in self.warc_writer_threads:
wwt.join()
if self.proxy.stats_db:
self.proxy.stats_db.stop()
if self.warc_writer_thread.dedup_db:
self.warc_writer_thread.dedup_db.close()
if self.warc_writer_threads[0].dedup_db:
self.warc_writer_threads[0].dedup_db.close()
self.proxy_thread.join()
if self.playback_proxy is not None:

View File

@ -25,6 +25,7 @@ import datetime
import json
import logging
from hanzo import warctools
import threading
class CaptureFeed:
logger = logging.getLogger('warcprox.kafkafeed.CaptureFeed')
@ -34,20 +35,22 @@ class CaptureFeed:
self.topic = topic
self.__producer = None
self._connection_exception = None
self._lock = threading.RLock()
def _producer(self):
if not self.__producer:
try:
# acks=0 to avoid ever blocking
self.__producer = kafka.KafkaProducer(
bootstrap_servers=self.broker_list, acks=0)
if self._connection_exception:
logging.info('connected to kafka successfully!')
self._connection_exception = None
except Exception as e:
if not self._connection_exception:
self._connection_exception = e
logging.error('problem connecting to kafka', exc_info=True)
with self._lock:
if not self.__producer:
try:
# acks=0 to avoid ever blocking
self.__producer = kafka.KafkaProducer(
bootstrap_servers=self.broker_list, acks=0)
if self._connection_exception:
logging.info('connected to kafka successfully!')
self._connection_exception = None
except Exception as e:
if not self._connection_exception:
self._connection_exception = e
logging.error('problem connecting to kafka', exc_info=1)
return self.__producer

View File

@ -50,7 +50,7 @@ class BetterArgumentDefaultsHelpFormatter(
HelpFormatter with these properties:
- formats option help like argparse.ArgumentDefaultsHelpFormatter except
that it - omits the default value for arguments with action='store_const'
that it omits the default value for arguments with action='store_const'
- like argparse.RawDescriptionHelpFormatter, does not reformat description
string
'''
@ -219,18 +219,24 @@ def init_controller(args):
playback_proxy = None
writer_pool = warcprox.writer.WarcWriterPool(options=options)
warc_writer_thread = warcprox.writerthread.WarcWriterThread(
recorded_url_q=recorded_url_q, writer_pool=writer_pool,
dedup_db=dedup_db, listeners=listeners, options=options)
# number of warc writer threads = sqrt(proxy.max_threads)
# I came up with this out of thin air because it strikes me as reasonable
# 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45
warc_writer_threads = [
warcprox.writerthread.WarcWriterThread(
name='WarcWriterThread%03d' % i, recorded_url_q=recorded_url_q,
writer_pool=writer_pool, dedup_db=dedup_db,
listeners=listeners, options=options)
for i in range(int(proxy.max_threads ** 0.5))]
if args.rethinkdb_servers:
svcreg = doublethink.ServiceRegistry(rr)
else:
svcreg = None
controller = warcprox.controller.WarcproxController(proxy,
warc_writer_thread, playback_proxy, service_registry=svcreg,
options=options)
controller = warcprox.controller.WarcproxController(
proxy, warc_writer_threads, playback_proxy,
service_registry=svcreg, options=options)
return controller

View File

@ -30,6 +30,7 @@ import os
import socket
import string
import random
import threading
class WarcWriter:
logger = logging.getLogger('warcprox.writer.WarcWriter')
@ -43,7 +44,8 @@ class WarcWriter:
self.gzip = options.gzip or False
digest_algorithm = options.digest_algorithm or 'sha1'
base32 = options.base32
self.record_builder = warcprox.warc.WarcRecordBuilder(digest_algorithm=digest_algorithm, base32=base32)
self.record_builder = warcprox.warc.WarcRecordBuilder(
digest_algorithm=digest_algorithm, base32=base32)
# warc path and filename stuff
self.directory = options.directory or './warcs'
@ -53,6 +55,7 @@ class WarcWriter:
self._fpath = None
self._f_finalname = None
self._serial = 0
self._lock = threading.RLock()
self._randomtoken = "".join(random.Random().sample(string.digits + string.ascii_lowercase, 8))
@ -65,33 +68,41 @@ class WarcWriter:
return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000)
def close_writer(self):
if self._fpath:
self.logger.info('closing {0}'.format(self._f_finalname))
self._f.close()
finalpath = os.path.sep.join([self.directory, self._f_finalname])
os.rename(self._fpath, finalpath)
with self._lock:
if self._fpath:
self.logger.info('closing %s', self._f_finalname)
self._f.close()
finalpath = os.path.sep.join(
[self.directory, self._f_finalname])
os.rename(self._fpath, finalpath)
self._fpath = None
self._f = None
self._fpath = None
self._f = None
# h3 default <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
# ${prefix}-${timestamp17}-${randomtoken}-${serialno}.warc.gz"
def _writer(self):
if self._fpath and os.path.getsize(self._fpath) > self.rollover_size:
self.close_writer()
with self._lock:
if self._fpath and os.path.getsize(
self._fpath) > self.rollover_size:
self.close_writer()
if self._f == None:
self._f_finalname = '{}-{}-{:05d}-{}.warc{}'.format(
self.prefix, self.timestamp17(), self._serial, self._randomtoken, '.gz' if self.gzip else '')
self._fpath = os.path.sep.join([self.directory, self._f_finalname + '.open'])
if self._f == None:
self._f_finalname = '{}-{}-{:05d}-{}.warc{}'.format(
self.prefix, self.timestamp17(), self._serial,
self._randomtoken, '.gz' if self.gzip else '')
self._fpath = os.path.sep.join([
self.directory, self._f_finalname + '.open'])
self._f = open(self._fpath, 'wb')
self._f = open(self._fpath, 'wb')
warcinfo_record = self.record_builder.build_warcinfo_record(self._f_finalname)
self.logger.debug('warcinfo_record.headers={}'.format(warcinfo_record.headers))
warcinfo_record.write_to(self._f, gzip=self.gzip)
warcinfo_record = self.record_builder.build_warcinfo_record(
self._f_finalname)
self.logger.debug(
'warcinfo_record.headers=%s', warcinfo_record.headers)
warcinfo_record.write_to(self._f, gzip=self.gzip)
self._serial += 1
self._serial += 1
return self._f
@ -101,33 +112,39 @@ class WarcWriter:
"offset" attributes."""
records = self.record_builder.build_warc_records(recorded_url)
writer = self._writer()
recordset_offset = writer.tell()
with self._lock:
writer = self._writer()
recordset_offset = writer.tell()
for record in records:
offset = writer.tell()
record.write_to(writer, gzip=self.gzip)
record.offset = offset
record.length = writer.tell() - offset
record.warc_filename = self._f_finalname
self.logger.debug('wrote warc record: warc_type=%s content_length=%s url=%s warc=%s offset=%d',
record.get_header(warctools.WarcRecord.TYPE),
record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
record.get_header(warctools.WarcRecord.URL),
self._fpath, record.offset)
for record in records:
offset = writer.tell()
record.write_to(writer, gzip=self.gzip)
record.offset = offset
record.length = writer.tell() - offset
record.warc_filename = self._f_finalname
self.logger.debug(
'wrote warc record: warc_type=%s content_length=%s '
'url=%s warc=%s offset=%d',
record.get_header(warctools.WarcRecord.TYPE),
record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
record.get_header(warctools.WarcRecord.URL),
self._fpath, record.offset)
self._f.flush()
self._last_activity = time.time()
self._f.flush()
self._last_activity = time.time()
return records
def maybe_idle_rollover(self):
if (self._fpath is not None
and self.rollover_idle_time is not None
and self.rollover_idle_time > 0
and time.time() - self._last_activity > self.rollover_idle_time):
self.logger.debug('rolling over {} after {} seconds idle'.format(self._f_finalname, time.time() - self._last_activity))
self.close_writer()
with self._lock:
if (self._fpath is not None
and self.rollover_idle_time is not None
and self.rollover_idle_time > 0
and time.time() - self._last_activity > self.rollover_idle_time):
self.logger.info(
'rolling over %s after %s seconds idle',
self._f_finalname, time.time() - self._last_activity)
self.close_writer()
class WarcWriterPool:
logger = logging.getLogger("warcprox.writer.WarcWriterPool")
@ -137,6 +154,7 @@ class WarcWriterPool:
self.warc_writers = {} # {prefix:WarcWriter}
self._last_sync = time.time()
self.options = options
self._lock = threading.RLock()
# chooses writer for filename specified by warcprox_meta["warc-prefix"] if set
def _writer(self, recorded_url):
@ -145,9 +163,11 @@ class WarcWriterPool:
# self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url))
options = warcprox.Options(**vars(self.options))
options.prefix = recorded_url.warcprox_meta["warc-prefix"]
if not options.prefix in self.warc_writers:
self.warc_writers[options.prefix] = WarcWriter(options=options)
w = self.warc_writers[options.prefix]
with self._lock:
if not options.prefix in self.warc_writers:
self.warc_writers[options.prefix] = WarcWriter(
options=options)
w = self.warc_writers[options.prefix]
return w
def write_records(self, recorded_url):

View File

@ -45,10 +45,11 @@ class WarcWriterThread(threading.Thread):
logger = logging.getLogger("warcprox.warcproxwriter.WarcWriterThread")
def __init__(
self, recorded_url_q=None, writer_pool=None, dedup_db=None,
listeners=None, options=warcprox.Options()):
self, name='WarcWriterThread', recorded_url_q=None,
writer_pool=None, dedup_db=None, listeners=None,
options=warcprox.Options()):
"""recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl."""
threading.Thread.__init__(self, name='WarcWriterThread')
threading.Thread.__init__(self, name=name)
self.recorded_url_q = recorded_url_q
self.stop = threading.Event()
if writer_pool:
@ -75,9 +76,9 @@ class WarcWriterThread(threading.Thread):
return meth in self._ALWAYS_ACCEPT or meth in self.method_filter
def _run(self):
self.name = '%s(tid=%s)'% (self.name, warcprox.gettid())
while not self.stop.is_set():
try:
self.name = 'WarcWriterThread(tid={})'.format(warcprox.gettid())
while True:
try:
if self.stop.is_set():
@ -105,7 +106,7 @@ class WarcWriterThread(threading.Thread):
self.logger.info('WarcWriterThread shutting down')
self.writer_pool.close_writers()
except BaseException as e:
except Exception as e:
if isinstance(e, OSError) and e.errno == 28:
# OSError: [Errno 28] No space left on device
self.logger.critical(