From ef5dd2e4ae69085d1c862df6a5416cd900de238f Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 19 May 2017 16:10:44 -0700 Subject: [PATCH] multiple warc writer threads (hacked in with little thought to code organization) --- setup.py | 2 +- tests/test_warcprox.py | 125 +++++++++++++++++++++------------------ warcprox/controller.py | 45 ++++++++------ warcprox/kafkafeed.py | 27 +++++---- warcprox/main.py | 20 ++++--- warcprox/writer.py | 106 +++++++++++++++++++-------------- warcprox/writerthread.py | 11 ++-- 7 files changed, 195 insertions(+), 141 deletions(-) diff --git a/setup.py b/setup.py index 9f0c6d7..5ce3203 100755 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ except: setuptools.setup( name='warcprox', - version='2.1b1.dev83', + version='2.1b1.dev84', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 77cdaaf..696cfaa 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -375,15 +375,18 @@ def warcprox_(request, captures_db, dedup_db, stats_db, service_registry): options.method_filter = ['GET','POST'] writer_pool = warcprox.writer.WarcWriterPool(options) - warc_writer_thread = warcprox.writerthread.WarcWriterThread( - recorded_url_q=recorded_url_q, writer_pool=writer_pool, - dedup_db=dedup_db, listeners=[ - captures_db or dedup_db, playback_index_db, stats_db], - options=options) + warc_writer_threads = [ + warcprox.writerthread.WarcWriterThread( + recorded_url_q=recorded_url_q, writer_pool=writer_pool, + dedup_db=dedup_db, listeners=[ + captures_db or dedup_db, playback_index_db, stats_db], + options=options) + for i in range(int(proxy.max_threads ** 0.5))] - warcprox_ = warcprox.controller.WarcproxController(proxy=proxy, - warc_writer_thread=warc_writer_thread, playback_proxy=playback_proxy, - service_registry=service_registry, options=options) + warcprox_ = warcprox.controller.WarcproxController( + proxy=proxy, warc_writer_threads=warc_writer_threads, + playback_proxy=playback_proxy, service_registry=service_registry, + options=options) logging.info('starting warcprox') warcprox_thread = threading.Thread(name='WarcproxThread', target=warcprox_.run_until_shutdown) @@ -503,7 +506,8 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) assert response.content == b'404 Not in Archive\n' # check not in dedup db - dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') + dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup is None # archive @@ -520,13 +524,14 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) time.sleep(0.5) # check in dedup db # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} - dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') + dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup['url'] == url.encode('ascii') assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) @@ -545,12 +550,13 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) time.sleep(0.5) # check in dedup db (no change from prev) - dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') + dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup['url'] == url.encode('ascii') assert dedup_lookup['id'] == record_id assert dedup_lookup['date'] == dedup_date @@ -573,7 +579,8 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie assert response.content == b'404 Not in Archive\n' # check not in dedup db - dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') + dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup is None # archive @@ -590,13 +597,14 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) time.sleep(0.5) # check in dedup db # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} - dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') + dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup['url'] == url.encode('ascii') assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) @@ -615,12 +623,13 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) time.sleep(0.5) # check in dedup db (no change from prev) - dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') + dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup['url'] == url.encode('ascii') assert dedup_lookup['id'] == record_id assert dedup_lookup['date'] == dedup_date @@ -645,7 +654,7 @@ def test_limits(http_daemon, warcprox_, archiving_proxies): # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) time.sleep(0.5) @@ -657,7 +666,7 @@ def test_limits(http_daemon, warcprox_, archiving_proxies): # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) time.sleep(2.5) @@ -682,12 +691,13 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) time.sleep(0.5) # check url1 in dedup db bucket_a - dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a") + dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a") assert dedup_lookup['url'] == url1.encode('ascii') assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) @@ -695,7 +705,8 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, dedup_date = dedup_lookup['date'] # check url1 not in dedup db bucket_b - dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b") + dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b") assert dedup_lookup is None # archive url2 bucket_b @@ -707,12 +718,13 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) time.sleep(0.5) # check url2 in dedup db bucket_b - dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b") + dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b") assert dedup_lookup['url'] == url2.encode('ascii') assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) @@ -735,15 +747,15 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) time.sleep(0.5) # close the warc - assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"] - writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"] + assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"] + writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"] warc_path = os.path.join(writer.directory, writer._f_finalname) - warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"].close_writer() + warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"].close_writer() assert os.path.exists(warc_path) # read the warc @@ -928,7 +940,7 @@ def test_domain_doc_soft_limit( # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) time.sleep(0.5) @@ -943,7 +955,7 @@ def test_domain_doc_soft_limit( # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) time.sleep(2.0) @@ -970,7 +982,7 @@ def test_domain_doc_soft_limit( # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) time.sleep(2.0) @@ -985,7 +997,7 @@ def test_domain_doc_soft_limit( # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) time.sleep(2.0) @@ -1053,7 +1065,7 @@ def test_domain_data_soft_limit( # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) time.sleep(2.0) @@ -1069,7 +1081,7 @@ def test_domain_data_soft_limit( # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) time.sleep(2.0) @@ -1085,7 +1097,7 @@ def test_domain_data_soft_limit( # wait for writer thread to process time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) time.sleep(2.0) @@ -1219,7 +1231,7 @@ def test_dedup_ok_flag( url = 'http://localhost:{}/z/b'.format(http_daemon.server_port) # check not in dedup db - dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup( + dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079', bucket='test_dedup_ok_flag') assert dedup_lookup is None @@ -1234,12 +1246,12 @@ def test_dedup_ok_flag( assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n' time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) time.sleep(0.5) # check that dedup db doesn't give us anything for this - dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup( + dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079', bucket='test_dedup_ok_flag') assert dedup_lookup is None @@ -1255,18 +1267,18 @@ def test_dedup_ok_flag( assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n' time.sleep(0.5) - while not warcprox_.warc_writer_thread.idle: + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): time.sleep(0.5) time.sleep(0.5) # check that dedup db gives us something for this - dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup( + dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079', bucket='test_dedup_ok_flag') assert dedup_lookup # inspect what's in rethinkdb more closely - rethink_captures = warcprox_.warc_writer_thread.dedup_db.captures_db + rethink_captures = warcprox_.warc_writer_threads[0].dedup_db.captures_db results_iter = rethink_captures.rr.table(rethink_captures.table).get_all( ['FV7RGGA3SCRFNTS6L275N2OJQJXM5EDZ', 'response', 'test_dedup_ok_flag'], index='sha1_warc_type').order_by( @@ -1336,7 +1348,6 @@ def test_controller_with_defaults(): assert not controller.proxy_thread assert not controller.playback_proxy assert not controller.playback_proxy_thread - assert controller.warc_writer_thread assert controller.proxy.RequestHandlerClass == warcprox.warcproxy.WarcProxyHandler assert controller.proxy.ca assert controller.proxy.digest_algorithm == 'sha1' @@ -1344,18 +1355,20 @@ def test_controller_with_defaults(): assert controller.proxy.recorded_url_q assert controller.proxy.server_address == ('127.0.0.1', 8000) assert controller.proxy.server_port == 8000 - assert controller.warc_writer_thread.recorded_url_q - assert controller.warc_writer_thread.recorded_url_q is controller.proxy.recorded_url_q - assert controller.warc_writer_thread.writer_pool - assert controller.warc_writer_thread.writer_pool.default_warc_writer - assert controller.warc_writer_thread.writer_pool.default_warc_writer.directory == './warcs' - assert controller.warc_writer_thread.writer_pool.default_warc_writer.rollover_idle_time is None - assert controller.warc_writer_thread.writer_pool.default_warc_writer.rollover_size == 1000000000 - assert controller.warc_writer_thread.writer_pool.default_warc_writer.prefix == 'warcprox' - assert controller.warc_writer_thread.writer_pool.default_warc_writer.gzip is False - assert controller.warc_writer_thread.writer_pool.default_warc_writer.record_builder - assert not controller.warc_writer_thread.writer_pool.default_warc_writer.record_builder.base32 - assert controller.warc_writer_thread.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' + for wwt in controller.warc_writer_threads: + assert wwt + assert wwt.recorded_url_q + assert wwt.recorded_url_q is controller.proxy.recorded_url_q + assert wwt.writer_pool + assert wwt.writer_pool.default_warc_writer + assert wwt.writer_pool.default_warc_writer.directory == './warcs' + assert wwt.writer_pool.default_warc_writer.rollover_idle_time is None + assert wwt.writer_pool.default_warc_writer.rollover_size == 1000000000 + assert wwt.writer_pool.default_warc_writer.prefix == 'warcprox' + assert wwt.writer_pool.default_warc_writer.gzip is False + assert wwt.writer_pool.default_warc_writer.record_builder + assert not wwt.writer_pool.default_warc_writer.record_builder.base32 + assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' def test_choose_a_port_for_me(service_registry): options = warcprox.Options() @@ -1402,7 +1415,7 @@ def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback assert response.status_code == 200 assert not 'via' in playback_response - warc = warcprox_.warc_writer_thread.writer_pool.default_warc_writer._fpath + warc = warcprox_.warc_writer_threads[0].writer_pool.default_warc_writer._fpath with open(warc, 'rb') as f: for record in warcio.archiveiterator.ArchiveIterator(f): if record.rec_headers.get_header('warc-target-uri') == url: diff --git a/warcprox/controller.py b/warcprox/controller.py index a11f0a6..b0882e3 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -37,15 +37,15 @@ class WarcproxController(object): HEARTBEAT_INTERVAL = 20.0 - def __init__(self, proxy=None, warc_writer_thread=None, - playback_proxy=None, service_registry=None, - options=warcprox.Options()): + def __init__( + self, proxy=None, warc_writer_threads=None, playback_proxy=None, + service_registry=None, options=warcprox.Options()): """ Create warcprox controller. - If supplied, proxy should be an instance of WarcProxy, and - warc_writer_thread should be an instance of WarcWriterThread. If not - supplied, they are created with default values. + If supplied, `proxy` should be an instance of WarcProxy, and + `warc_writer_threads` should be an list of WarcWriterThread instances. + If not supplied, they are created with default values. If supplied, playback_proxy should be an instance of PlaybackProxy. If not supplied, no playback proxy will run. @@ -55,11 +55,15 @@ class WarcproxController(object): else: self.proxy = warcprox.warcproxy.WarcProxy(options=options) - if warc_writer_thread is not None: - self.warc_writer_thread = warc_writer_thread + if warc_writer_threads is not None: + self.warc_writer_threads = warc_writer_threads else: - self.warc_writer_thread = warcprox.writerthread.WarcWriterThread( - recorded_url_q=self.proxy.recorded_url_q) + self.warc_writer_threads = [ + warcprox.writerthread.WarcWriterThread( + name='WarcWriterThread%03d' % i, + recorded_url_q=self.proxy.recorded_url_q, + options=options) + for i in range(int(self.proxy.max_threads ** 0.5))] self.proxy_thread = None self.playback_proxy_thread = None @@ -169,9 +173,14 @@ class WarcproxController(object): target=self.proxy.serve_forever, name='ProxyThread') self.proxy_thread.start() - if self.warc_writer_thread.dedup_db: - self.warc_writer_thread.dedup_db.start() - self.warc_writer_thread.start() + assert(all( + wwt.dedup_db is self.warc_writer_threads[0].dedup_db + for wwt in self.warc_writer_threads)) + if self.warc_writer_threads[0].dedup_db: + self.warc_writer_threads[0].dedup_db.start() + + for wwt in self.warc_writer_threads: + wwt.start() if self.playback_proxy is not None: self.playback_proxy_thread = threading.Thread( @@ -185,7 +194,8 @@ class WarcproxController(object): self.logger.info('warcprox is not running') return - self.warc_writer_thread.stop.set() + for wwt in self.warc_writer_threads: + wwt.stop.set() self.proxy.shutdown() self.proxy.server_close() @@ -196,12 +206,13 @@ class WarcproxController(object): self.playback_proxy.playback_index_db.close() # wait for threads to finish - self.warc_writer_thread.join() + for wwt in self.warc_writer_threads: + wwt.join() if self.proxy.stats_db: self.proxy.stats_db.stop() - if self.warc_writer_thread.dedup_db: - self.warc_writer_thread.dedup_db.close() + if self.warc_writer_threads[0].dedup_db: + self.warc_writer_threads[0].dedup_db.close() self.proxy_thread.join() if self.playback_proxy is not None: diff --git a/warcprox/kafkafeed.py b/warcprox/kafkafeed.py index e17d2c4..612101b 100644 --- a/warcprox/kafkafeed.py +++ b/warcprox/kafkafeed.py @@ -25,6 +25,7 @@ import datetime import json import logging from hanzo import warctools +import threading class CaptureFeed: logger = logging.getLogger('warcprox.kafkafeed.CaptureFeed') @@ -34,20 +35,22 @@ class CaptureFeed: self.topic = topic self.__producer = None self._connection_exception = None + self._lock = threading.RLock() def _producer(self): - if not self.__producer: - try: - # acks=0 to avoid ever blocking - self.__producer = kafka.KafkaProducer( - bootstrap_servers=self.broker_list, acks=0) - if self._connection_exception: - logging.info('connected to kafka successfully!') - self._connection_exception = None - except Exception as e: - if not self._connection_exception: - self._connection_exception = e - logging.error('problem connecting to kafka', exc_info=True) + with self._lock: + if not self.__producer: + try: + # acks=0 to avoid ever blocking + self.__producer = kafka.KafkaProducer( + bootstrap_servers=self.broker_list, acks=0) + if self._connection_exception: + logging.info('connected to kafka successfully!') + self._connection_exception = None + except Exception as e: + if not self._connection_exception: + self._connection_exception = e + logging.error('problem connecting to kafka', exc_info=1) return self.__producer diff --git a/warcprox/main.py b/warcprox/main.py index 60bf407..efba06d 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -50,7 +50,7 @@ class BetterArgumentDefaultsHelpFormatter( HelpFormatter with these properties: - formats option help like argparse.ArgumentDefaultsHelpFormatter except - that it - omits the default value for arguments with action='store_const' + that it omits the default value for arguments with action='store_const' - like argparse.RawDescriptionHelpFormatter, does not reformat description string ''' @@ -219,18 +219,24 @@ def init_controller(args): playback_proxy = None writer_pool = warcprox.writer.WarcWriterPool(options=options) - warc_writer_thread = warcprox.writerthread.WarcWriterThread( - recorded_url_q=recorded_url_q, writer_pool=writer_pool, - dedup_db=dedup_db, listeners=listeners, options=options) + # number of warc writer threads = sqrt(proxy.max_threads) + # I came up with this out of thin air because it strikes me as reasonable + # 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45 + warc_writer_threads = [ + warcprox.writerthread.WarcWriterThread( + name='WarcWriterThread%03d' % i, recorded_url_q=recorded_url_q, + writer_pool=writer_pool, dedup_db=dedup_db, + listeners=listeners, options=options) + for i in range(int(proxy.max_threads ** 0.5))] if args.rethinkdb_servers: svcreg = doublethink.ServiceRegistry(rr) else: svcreg = None - controller = warcprox.controller.WarcproxController(proxy, - warc_writer_thread, playback_proxy, service_registry=svcreg, - options=options) + controller = warcprox.controller.WarcproxController( + proxy, warc_writer_threads, playback_proxy, + service_registry=svcreg, options=options) return controller diff --git a/warcprox/writer.py b/warcprox/writer.py index 72c292f..6913560 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -30,6 +30,7 @@ import os import socket import string import random +import threading class WarcWriter: logger = logging.getLogger('warcprox.writer.WarcWriter') @@ -43,7 +44,8 @@ class WarcWriter: self.gzip = options.gzip or False digest_algorithm = options.digest_algorithm or 'sha1' base32 = options.base32 - self.record_builder = warcprox.warc.WarcRecordBuilder(digest_algorithm=digest_algorithm, base32=base32) + self.record_builder = warcprox.warc.WarcRecordBuilder( + digest_algorithm=digest_algorithm, base32=base32) # warc path and filename stuff self.directory = options.directory or './warcs' @@ -53,6 +55,7 @@ class WarcWriter: self._fpath = None self._f_finalname = None self._serial = 0 + self._lock = threading.RLock() self._randomtoken = "".join(random.Random().sample(string.digits + string.ascii_lowercase, 8)) @@ -65,33 +68,41 @@ class WarcWriter: return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000) def close_writer(self): - if self._fpath: - self.logger.info('closing {0}'.format(self._f_finalname)) - self._f.close() - finalpath = os.path.sep.join([self.directory, self._f_finalname]) - os.rename(self._fpath, finalpath) + with self._lock: + if self._fpath: + self.logger.info('closing %s', self._f_finalname) + self._f.close() + finalpath = os.path.sep.join( + [self.directory, self._f_finalname]) + os.rename(self._fpath, finalpath) - self._fpath = None - self._f = None + self._fpath = None + self._f = None # h3 default # ${prefix}-${timestamp17}-${randomtoken}-${serialno}.warc.gz" def _writer(self): - if self._fpath and os.path.getsize(self._fpath) > self.rollover_size: - self.close_writer() + with self._lock: + if self._fpath and os.path.getsize( + self._fpath) > self.rollover_size: + self.close_writer() - if self._f == None: - self._f_finalname = '{}-{}-{:05d}-{}.warc{}'.format( - self.prefix, self.timestamp17(), self._serial, self._randomtoken, '.gz' if self.gzip else '') - self._fpath = os.path.sep.join([self.directory, self._f_finalname + '.open']) + if self._f == None: + self._f_finalname = '{}-{}-{:05d}-{}.warc{}'.format( + self.prefix, self.timestamp17(), self._serial, + self._randomtoken, '.gz' if self.gzip else '') + self._fpath = os.path.sep.join([ + self.directory, self._f_finalname + '.open']) - self._f = open(self._fpath, 'wb') + self._f = open(self._fpath, 'wb') - warcinfo_record = self.record_builder.build_warcinfo_record(self._f_finalname) - self.logger.debug('warcinfo_record.headers={}'.format(warcinfo_record.headers)) - warcinfo_record.write_to(self._f, gzip=self.gzip) + warcinfo_record = self.record_builder.build_warcinfo_record( + self._f_finalname) + self.logger.debug( + 'warcinfo_record.headers=%s', warcinfo_record.headers) + warcinfo_record.write_to(self._f, gzip=self.gzip) - self._serial += 1 + self._serial += 1 return self._f @@ -101,33 +112,39 @@ class WarcWriter: "offset" attributes.""" records = self.record_builder.build_warc_records(recorded_url) - writer = self._writer() - recordset_offset = writer.tell() + with self._lock: + writer = self._writer() + recordset_offset = writer.tell() - for record in records: - offset = writer.tell() - record.write_to(writer, gzip=self.gzip) - record.offset = offset - record.length = writer.tell() - offset - record.warc_filename = self._f_finalname - self.logger.debug('wrote warc record: warc_type=%s content_length=%s url=%s warc=%s offset=%d', - record.get_header(warctools.WarcRecord.TYPE), - record.get_header(warctools.WarcRecord.CONTENT_LENGTH), - record.get_header(warctools.WarcRecord.URL), - self._fpath, record.offset) + for record in records: + offset = writer.tell() + record.write_to(writer, gzip=self.gzip) + record.offset = offset + record.length = writer.tell() - offset + record.warc_filename = self._f_finalname + self.logger.debug( + 'wrote warc record: warc_type=%s content_length=%s ' + 'url=%s warc=%s offset=%d', + record.get_header(warctools.WarcRecord.TYPE), + record.get_header(warctools.WarcRecord.CONTENT_LENGTH), + record.get_header(warctools.WarcRecord.URL), + self._fpath, record.offset) - self._f.flush() - self._last_activity = time.time() + self._f.flush() + self._last_activity = time.time() return records def maybe_idle_rollover(self): - if (self._fpath is not None - and self.rollover_idle_time is not None - and self.rollover_idle_time > 0 - and time.time() - self._last_activity > self.rollover_idle_time): - self.logger.debug('rolling over {} after {} seconds idle'.format(self._f_finalname, time.time() - self._last_activity)) - self.close_writer() + with self._lock: + if (self._fpath is not None + and self.rollover_idle_time is not None + and self.rollover_idle_time > 0 + and time.time() - self._last_activity > self.rollover_idle_time): + self.logger.info( + 'rolling over %s after %s seconds idle', + self._f_finalname, time.time() - self._last_activity) + self.close_writer() class WarcWriterPool: logger = logging.getLogger("warcprox.writer.WarcWriterPool") @@ -137,6 +154,7 @@ class WarcWriterPool: self.warc_writers = {} # {prefix:WarcWriter} self._last_sync = time.time() self.options = options + self._lock = threading.RLock() # chooses writer for filename specified by warcprox_meta["warc-prefix"] if set def _writer(self, recorded_url): @@ -145,9 +163,11 @@ class WarcWriterPool: # self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url)) options = warcprox.Options(**vars(self.options)) options.prefix = recorded_url.warcprox_meta["warc-prefix"] - if not options.prefix in self.warc_writers: - self.warc_writers[options.prefix] = WarcWriter(options=options) - w = self.warc_writers[options.prefix] + with self._lock: + if not options.prefix in self.warc_writers: + self.warc_writers[options.prefix] = WarcWriter( + options=options) + w = self.warc_writers[options.prefix] return w def write_records(self, recorded_url): diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index a845e37..f480251 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -45,10 +45,11 @@ class WarcWriterThread(threading.Thread): logger = logging.getLogger("warcprox.warcproxwriter.WarcWriterThread") def __init__( - self, recorded_url_q=None, writer_pool=None, dedup_db=None, - listeners=None, options=warcprox.Options()): + self, name='WarcWriterThread', recorded_url_q=None, + writer_pool=None, dedup_db=None, listeners=None, + options=warcprox.Options()): """recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl.""" - threading.Thread.__init__(self, name='WarcWriterThread') + threading.Thread.__init__(self, name=name) self.recorded_url_q = recorded_url_q self.stop = threading.Event() if writer_pool: @@ -75,9 +76,9 @@ class WarcWriterThread(threading.Thread): return meth in self._ALWAYS_ACCEPT or meth in self.method_filter def _run(self): + self.name = '%s(tid=%s)'% (self.name, warcprox.gettid()) while not self.stop.is_set(): try: - self.name = 'WarcWriterThread(tid={})'.format(warcprox.gettid()) while True: try: if self.stop.is_set(): @@ -105,7 +106,7 @@ class WarcWriterThread(threading.Thread): self.logger.info('WarcWriterThread shutting down') self.writer_pool.close_writers() - except BaseException as e: + except Exception as e: if isinstance(e, OSError) and e.errno == 28: # OSError: [Errno 28] No space left on device self.logger.critical(