From c9a39958db4e7d5628fbb302230ecbda2104f3e4 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 15 Jan 2018 14:37:27 -0800 Subject: [PATCH] tests are passing --- tests/test_warcprox.py | 120 ++++++++++++++++++++------------------- tests/test_writer.py | 44 +++++++------- warcprox/__init__.py | 17 +++--- warcprox/bigtable.py | 2 +- warcprox/controller.py | 71 ++++++++++++----------- warcprox/dedup.py | 3 +- warcprox/main.py | 6 +- warcprox/mitmproxy.py | 7 ++- warcprox/playback.py | 18 +++--- warcprox/warcproxy.py | 44 ++++++-------- warcprox/writerthread.py | 3 + 11 files changed, 169 insertions(+), 166 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index f5175b9..36b7cea 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -357,7 +357,9 @@ def warcprox_(request): argv.append('--rethinkdb-trough-db-url=%s' % request.config.getoption('--rethinkdb-trough-db-url')) args = warcprox.main.parse_args(argv) - warcprox_ = warcprox.main.init_controller(args) + + options = warcprox.Options(**vars(args)) + warcprox_ = warcprox.controller.WarcproxController(options) logging.info('starting warcprox') warcprox_thread = threading.Thread( @@ -490,8 +492,8 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) assert response.content == b'404 Not in Archive\n' # check not in dedup db - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( - b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') + dedup_lookup = warcprox_.dedup_db.lookup( + b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup is None # archive @@ -508,13 +510,13 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) time.sleep(0.5) # check in dedup db # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup assert dedup_lookup['url'] == url.encode('ascii') @@ -535,12 +537,12 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) time.sleep(0.5) # check in dedup db (no change from prev) - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup['url'] == url.encode('ascii') assert dedup_lookup['id'] == record_id @@ -564,7 +566,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie assert response.content == b'404 Not in Archive\n' # check not in dedup db - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup is None @@ -582,13 +584,13 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) time.sleep(0.5) # check in dedup db # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup assert dedup_lookup['url'] == url.encode('ascii') @@ -609,12 +611,12 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) time.sleep(0.5) # check in dedup db (no change from prev) - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup['url'] == url.encode('ascii') assert dedup_lookup['id'] == record_id @@ -640,7 +642,7 @@ def test_limits(http_daemon, warcprox_, archiving_proxies): # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) time.sleep(0.5) @@ -652,7 +654,7 @@ def test_limits(http_daemon, warcprox_, archiving_proxies): # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) time.sleep(2.5) @@ -693,12 +695,12 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) time.sleep(0.5) # check url1 in dedup db bucket_a - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a") assert dedup_lookup['url'] == url1.encode('ascii') assert re.match(br'^$', dedup_lookup['id']) @@ -707,7 +709,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, dedup_date = dedup_lookup['date'] # check url1 not in dedup db bucket_b - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b") assert dedup_lookup is None @@ -720,12 +722,12 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) time.sleep(0.5) # check url2 in dedup db bucket_b - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b") assert dedup_lookup['url'] == url2.encode('ascii') assert re.match(br'^$', dedup_lookup['id']) @@ -742,7 +744,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) time.sleep(0.5) @@ -755,15 +757,15 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) time.sleep(0.5) # close the warc - assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"] - writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"] + assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"] + writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"] warc_path = os.path.join(writer.directory, writer._f_finalname) - warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"].close_writer() + warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"].close_writer() assert os.path.exists(warc_path) # read the warc @@ -948,7 +950,7 @@ def test_domain_doc_soft_limit( # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) time.sleep(0.5) @@ -963,7 +965,7 @@ def test_domain_doc_soft_limit( # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) time.sleep(2.0) @@ -990,7 +992,7 @@ def test_domain_doc_soft_limit( # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) time.sleep(2.0) @@ -1005,7 +1007,7 @@ def test_domain_doc_soft_limit( # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) time.sleep(2.0) @@ -1073,7 +1075,7 @@ def test_domain_data_soft_limit( # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) time.sleep(2.0) @@ -1089,7 +1091,7 @@ def test_domain_data_soft_limit( # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) time.sleep(2.0) @@ -1105,7 +1107,7 @@ def test_domain_data_soft_limit( # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) # rethinkdb stats db update cycle is 2 seconds (at the moment anyway) time.sleep(2.0) @@ -1238,7 +1240,7 @@ def test_dedup_ok_flag( url = 'http://localhost:{}/z/b'.format(http_daemon.server_port) # check not in dedup db - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079', bucket='test_dedup_ok_flag') assert dedup_lookup is None @@ -1253,12 +1255,12 @@ def test_dedup_ok_flag( assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n' time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) time.sleep(0.5) # check that dedup db doesn't give us anything for this - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079', bucket='test_dedup_ok_flag') assert dedup_lookup is None @@ -1274,18 +1276,18 @@ def test_dedup_ok_flag( assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n' time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) time.sleep(0.5) # check that dedup db gives us something for this - dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( + dedup_lookup = warcprox_.dedup_db.lookup( b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079', bucket='test_dedup_ok_flag') assert dedup_lookup # inspect what's in rethinkdb more closely - rethink_captures = warcprox_.warc_writer_threads[0].dedup_db.captures_db + rethink_captures = warcprox_.dedup_db.captures_db results_iter = rethink_captures.rr.table(rethink_captures.table).get_all( ['FV7RGGA3SCRFNTS6L275N2OJQJXM5EDZ', 'response', 'test_dedup_ok_flag'], index='sha1_warc_type').order_by( @@ -1366,26 +1368,28 @@ def test_controller_with_defaults(): assert controller.proxy.server_address == ('127.0.0.1', 8000) assert controller.proxy.server_port == 8000 assert controller.proxy.running_stats - for wwt in controller.warc_writer_threads: - assert wwt - assert wwt.recorded_url_q - assert wwt.recorded_url_q is controller.proxy.recorded_url_q - assert wwt.writer_pool - assert wwt.writer_pool.default_warc_writer - assert wwt.writer_pool.default_warc_writer.directory == './warcs' - assert wwt.writer_pool.default_warc_writer.rollover_idle_time is None - assert wwt.writer_pool.default_warc_writer.rollover_size == 1000000000 - assert wwt.writer_pool.default_warc_writer.prefix == 'warcprox' - assert wwt.writer_pool.default_warc_writer.gzip is False - assert wwt.writer_pool.default_warc_writer.record_builder - assert not wwt.writer_pool.default_warc_writer.record_builder.base32 - assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' + assert not controller.proxy.stats_db + wwt = controller.warc_writer_thread + assert wwt + assert wwt.inq + assert not wwt.outq + assert wwt.writer_pool + assert wwt.writer_pool.default_warc_writer + assert wwt.writer_pool.default_warc_writer.directory == './warcs' + assert wwt.writer_pool.default_warc_writer.rollover_idle_time is None + assert wwt.writer_pool.default_warc_writer.rollover_size == 1000000000 + assert wwt.writer_pool.default_warc_writer.prefix == 'warcprox' + assert wwt.writer_pool.default_warc_writer.gzip is False + assert wwt.writer_pool.default_warc_writer.record_builder + assert not wwt.writer_pool.default_warc_writer.record_builder.base32 + assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' def test_choose_a_port_for_me(warcprox_): options = warcprox.Options() options.port = 0 - controller = warcprox.controller.WarcproxController( - service_registry=warcprox_.service_registry, options=options) + if warcprox_.service_registry: + options.rethinkdb_services_url = 'rethinkdb://localhost/test0/services' + controller = warcprox.controller.WarcproxController(options) assert controller.proxy.server_port != 0 assert controller.proxy.server_port != 8000 assert controller.proxy.server_address == ( @@ -1426,7 +1430,7 @@ def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback assert response.status_code == 200 assert not 'via' in playback_response - warc = warcprox_.warc_writer_threads[0].writer_pool.default_warc_writer._fpath + warc = warcprox_.warc_writer_thread.writer_pool.default_warc_writer._fpath with open(warc, 'rb') as f: for record in warcio.archiveiterator.ArchiveIterator(f): if record.rec_headers.get_header('warc-target-uri') == url: @@ -1644,15 +1648,15 @@ def test_long_warcprox_meta( # wait for writer thread to process time.sleep(0.5) - while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + while warcprox_.postfetch_chain_busy(): time.sleep(0.5) time.sleep(0.5) # check that warcprox-meta was parsed and honored ("warc-prefix" param) - assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"] - writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"] + assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"] + writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"] warc_path = os.path.join(writer.directory, writer._f_finalname) - warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"].close_writer() + warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"].close_writer() assert os.path.exists(warc_path) # read the warc diff --git a/tests/test_writer.py b/tests/test_writer.py index 4474f82..2d9505f 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -88,27 +88,20 @@ def wait(callback, timeout): raise Exception('timed out waiting for %s to return truthy' % callback) def test_special_dont_write_prefix(): - class NotifyMe: - def __init__(self): - self.the_list = [] - def notify(self, recorded_url, records): - self.the_list.append((recorded_url, records)) - with tempfile.TemporaryDirectory() as tmpdir: logging.debug('cd %s', tmpdir) os.chdir(tmpdir) - q = warcprox.TimestampedQueue(maxsize=1) - listener = NotifyMe() + inq = warcprox.TimestampedQueue(maxsize=1) + outq = warcprox.TimestampedQueue(maxsize=1) wwt = warcprox.writerthread.WarcWriterThread( - recorded_url_q=q, options=Options(prefix='-'), - listeners=[listener]) + inq, outq, Options(prefix='-')) try: wwt.start() # not to be written due to default prefix recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) recorder.read() - q.put(RecordedUrl( + inq.put(RecordedUrl( url='http://example.com/no', content_type='text/plain', status=200, client_ip='127.0.0.2', request_data=b'abc', response_recorder=recorder, remote_ip='127.0.0.3', @@ -117,30 +110,31 @@ def test_special_dont_write_prefix(): # to be written due to warcprox-meta prefix recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) recorder.read() - q.put(RecordedUrl( + inq.put(RecordedUrl( url='http://example.com/yes', content_type='text/plain', status=200, client_ip='127.0.0.2', request_data=b'abc', response_recorder=recorder, remote_ip='127.0.0.3', timestamp=datetime.utcnow(), payload_digest=recorder.block_digest, warcprox_meta={'warc-prefix': 'normal-warc-prefix'})) - wait(lambda: len(listener.the_list) == 2, 10.0) - assert not listener.the_list[0][1] - assert listener.the_list[1][1] + recorded_url = outq.get(timeout=10) + assert not recorded_url.warc_records + recorded_url = outq.get(timeout=10) + assert recorded_url.warc_records + assert outq.empty() finally: wwt.stop.set() wwt.join() - q = warcprox.TimestampedQueue(maxsize=1) - listener = NotifyMe() - wwt = warcprox.writerthread.WarcWriterThread( - recorded_url_q=q, listeners=[listener]) + inq = warcprox.TimestampedQueue(maxsize=1) + outq = warcprox.TimestampedQueue(maxsize=1) + wwt = warcprox.writerthread.WarcWriterThread(inq, outq) try: wwt.start() # to be written due to default prefix recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) recorder.read() - q.put(RecordedUrl( + inq.put(RecordedUrl( url='http://example.com/yes', content_type='text/plain', status=200, client_ip='127.0.0.2', request_data=b'abc', response_recorder=recorder, remote_ip='127.0.0.3', @@ -149,16 +143,18 @@ def test_special_dont_write_prefix(): # not to be written due to warcprox-meta prefix recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) recorder.read() - q.put(RecordedUrl( + inq.put(RecordedUrl( url='http://example.com/no', content_type='text/plain', status=200, client_ip='127.0.0.2', request_data=b'abc', response_recorder=recorder, remote_ip='127.0.0.3', timestamp=datetime.utcnow(), payload_digest=recorder.block_digest, warcprox_meta={'warc-prefix': '-'})) - wait(lambda: len(listener.the_list) == 2, 10.0) - assert listener.the_list[0][1] - assert not listener.the_list[1][1] + recorded_url = outq.get(timeout=10) + assert recorded_url.warc_records + recorded_url = outq.get(timeout=10) + assert not recorded_url.warc_records + assert outq.empty() finally: wwt.stop.set() wwt.join() diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 7c628ad..757f1c1 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -100,7 +100,7 @@ class BasePostfetchProcessor(threading.Thread): logger = logging.getLogger("warcprox.BasePostfetchProcessor") def __init__(self, inq, outq, options=Options()): - threading.Thread.__init__(self, name='???') + threading.Thread.__init__(self, name=self.__class__.__name__) self.inq = inq self.outq = outq self.options = options @@ -120,7 +120,8 @@ class BasePostfetchProcessor(threading.Thread): ''' Get url(s) from `self.inq`, process url(s), queue to `self.outq`. - Subclasses must implement this. + Subclasses must implement this. Implementations may operate on + individual urls, or on batches. May raise queue.Empty. ''' @@ -188,16 +189,16 @@ class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor): def __init__(self, listener, inq, outq, profile=False): BaseStandardPostfetchProcessor.__init__(self, inq, outq, profile) self.listener = listener + self.name = listener.__class__.__name__ + logging.info('self.name=%s', self.name) def _process_url(self, recorded_url): return self.listener.notify(recorded_url, recorded_url.warc_records) - # @classmethod - # def wrap(cls, listener, inq, outq, profile=False): - # if listener: - # return cls(listener, inq, outq, profile) - # else: - # return None + def start(self): + if hasattr(self.listener, 'start'): + self.listener.start() + BaseStandardPostfetchProcessor.start(self) # monkey-patch log levels TRACE and NOTICE TRACE = 5 diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 79d6240..e6674a6 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -215,7 +215,7 @@ class RethinkCaptures: if self._timer: self._timer.join() -class RethinkCapturesDedup: +class RethinkCapturesDedup(warcprox.dedup.DedupDb): logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup") def __init__(self, options=warcprox.Options()): diff --git a/warcprox/controller.py b/warcprox/controller.py index 80b9c50..f6a720f 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -33,6 +33,7 @@ import datetime import warcprox import certauth import functools +import doublethink class Factory: @staticmethod @@ -65,22 +66,15 @@ class Factory: options.stats_db_file, options=options) return stats_db - # @staticmethod - # def certauth(options): - # ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64] - # ca = certauth.certauth.CertificateAuthority( - # options.cacert, args.certs_dir, ca_name=ca_name) - # return ca - @staticmethod def warc_writer(inq, outq, options): return warcprox.writerthread.WarcWriterThread(inq, outq, options) @staticmethod - def playback_proxy(options): + def playback_proxy(ca, options): if options.playback_port is not None: playback_index_db = warcprox.playback.PlaybackIndexDb( - options.playback_index_db_file, options=options) + options=options) playback_proxy = warcprox.playback.PlaybackProxy( ca=ca, playback_index_db=playback_index_db, options=options) else: @@ -136,12 +130,22 @@ class WarcproxController(object): self.stop = threading.Event() self._start_stop_lock = threading.Lock() - self.proxy = warcprox.warcproxy.WarcProxy(options=options) + self.stats_db = Factory.stats_db(self.options) + + self.proxy = warcprox.warcproxy.WarcProxy(self.stats_db, options) + self.playback_proxy = Factory.playback_proxy( + self.proxy.ca, self.options) self.build_postfetch_chain(self.proxy.recorded_url_q) self.service_registry = Factory.service_registry(options) + def postfetch_chain_busy(self): + for processor in self._postfetch_chain: + if processor.inq.qsize() > 0: + return True + return False + def build_postfetch_chain(self, inq): constructors = [] @@ -155,12 +159,10 @@ class WarcproxController(object): if self.dedup_db: constructors.append(self.dedup_db.storer) - stats_db = Factory.stats_db(self.options) - if stats_db: + if self.stats_db: constructors.append(functools.partial( - warcprox.ListenerPostfetchProcessor, stats_db)) + warcprox.ListenerPostfetchProcessor, self.stats_db)) - self.playback_proxy = Factory.playback_proxy(self.options) if self.playback_proxy: constructors.append(functools.partial( warcprox.ListenerPostfetchProcessor, @@ -175,7 +177,7 @@ class WarcproxController(object): plugin = Factory.plugin(qualname) constructors.append(functools.partial( warcprox.ListenerPostfetchProcessor, plugin)) - + self._postfetch_chain = [] for i, constructor in enumerate(constructors): if i != len(constructors) - 1: @@ -184,6 +186,8 @@ class WarcproxController(object): else: outq = None processor = constructor(inq, outq, self.options) + if isinstance(processor, warcprox.writerthread.WarcWriterThread): + self.warc_writer_thread = processor # ugly self._postfetch_chain.append(processor) inq = outq @@ -277,6 +281,12 @@ class WarcproxController(object): target=self.proxy.serve_forever, name='ProxyThread') self.proxy_thread.start() + if self.playback_proxy: + self.playback_proxy_thread = threading.Thread( + target=self.playback_proxy.serve_forever, + name='PlaybackProxyThread') + self.playback_proxy_thread.start() + for processor in self._postfetch_chain: # logging.info('starting postfetch processor %r', processor) processor.start() @@ -288,34 +298,29 @@ class WarcproxController(object): self.logger.info('warcprox is not running') return - # for wwt in self.warc_writer_threads: - # wwt.stop.set() for processor in self._postfetch_chain: processor.stop.set() self.proxy.shutdown() self.proxy.server_close() + if self.playback_proxy is not None: + self.playback_proxy.shutdown() + self.playback_proxy.server_close() + if self.playback_proxy.playback_index_db is not None: + self.playback_proxy.playback_index_db.close() + for processor in self._postfetch_chain: processor.join() - # if self.playback_proxy is not None: - # self.playback_proxy.shutdown() - # self.playback_proxy.server_close() - # if self.playback_proxy.playback_index_db is not None: - # self.playback_proxy.playback_index_db.close() - # # wait for threads to finish - # for wwt in self.warc_writer_threads: - # wwt.join() + if self.stats_db: + self.stats_db.stop() - # if self.proxy.stats_db: - # self.proxy.stats_db.stop() + self.proxy_thread.join() + if self.playback_proxy is not None: + self.playback_proxy_thread.join() - # self.proxy_thread.join() - # if self.playback_proxy is not None: - # self.playback_proxy_thread.join() - - # if self.service_registry and hasattr(self, "status_info"): - # self.service_registry.unregister(self.status_info["id"]) + if self.service_registry and hasattr(self, "status_info"): + self.service_registry.unregister(self.status_info["id"]) def run_until_shutdown(self): """ diff --git a/warcprox/dedup.py b/warcprox/dedup.py index f62eccb..2aab0aa 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -37,7 +37,8 @@ urllib3.disable_warnings() class DedupLoader(warcprox.BaseStandardPostfetchProcessor): def __init__(self, dedup_db, inq, outq, base32=False, profile=False): - warcprox.BaseStandardPostfetchProcessor.__init__(self, inq, outq, profile) + warcprox.BaseStandardPostfetchProcessor.__init__( + self, inq, outq, profile) self.dedup_db = dedup_db self.base32 = base32 def _process_url(self, recorded_url): diff --git a/warcprox/main.py b/warcprox/main.py index 785c029..3c93504 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -116,9 +116,9 @@ def _build_arg_parser(prog): arg_parser.add_argument('-P', '--playback-port', dest='playback_port', type=int, default=None, help='port to listen on for instant playback') - arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', - default='./warcprox-playback-index.db', - help='playback index database file (only used if --playback-port is specified)') + # arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', + # default='./warcprox-playback-index.db', + # help='playback index database file (only used if --playback-port is specified)') group = arg_parser.add_mutually_exclusive_group() group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 130196a..c42fb68 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -562,7 +562,12 @@ class PooledMitmProxy(PooledMixIn, MitmProxy): request_queue_size = 4096 def __init__(self, max_threads, options=warcprox.Options()): - PooledMixIn.__init__(self, max_threads) + if options.max_threads: + self.logger.info( + "max_threads=%s set by command line option", + options.max_threads) + + PooledMixIn.__init__(self, options.max_threads) self.profilers = {} if options.profile: diff --git a/warcprox/playback.py b/warcprox/playback.py index 1a698c0..91f86aa 100644 --- a/warcprox/playback.py +++ b/warcprox/playback.py @@ -121,9 +121,6 @@ class PlaybackProxyHandler(MitmProxyHandler): def _send_headers_and_refd_payload( self, headers, refers_to_target_uri, refers_to_date, payload_digest): - """Parameters: - - """ location = self.server.playback_index_db.lookup_exact( refers_to_target_uri, refers_to_date, payload_digest) self.logger.debug('loading http payload from {}'.format(location)) @@ -133,11 +130,13 @@ class PlaybackProxyHandler(MitmProxyHandler): for (offset, record, errors) in fh.read_records(limit=1, offsets=True): pass + if not record: + raise Exception('failed to read record at offset %s from %s' % (offset, warcfilename)) + if errors: raise Exception('warc errors at {}:{} -- {}'.format(location['f'], offset, errors)) - warc_type = record.get_header(warctools.WarcRecord.TYPE) - if warc_type != warctools.WarcRecord.RESPONSE: + if record.type != warctools.WarcRecord.RESPONSE: raise Exception('invalid attempt to retrieve http payload of "{}" record'.format(warc_type)) # find end of headers @@ -158,12 +157,13 @@ class PlaybackProxyHandler(MitmProxyHandler): for (offset, record, errors) in fh.read_records(limit=1, offsets=True): pass + if not record: + raise Exception('failed to read record at offset %s from %s' % (offset, warcfilename)) + if errors: raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors)) - warc_type = record.get_header(warctools.WarcRecord.TYPE) - - if warc_type == warctools.WarcRecord.RESPONSE: + if record.type == warctools.WarcRecord.RESPONSE: headers_buf = bytearray() while True: line = record.content_file.readline() @@ -173,7 +173,7 @@ class PlaybackProxyHandler(MitmProxyHandler): return self._send_response(headers_buf, record.content_file) - elif warc_type == warctools.WarcRecord.REVISIT: + elif record.type == warctools.WarcRecord.REVISIT: # response consists of http headers from revisit record and # payload from the referenced record warc_profile = record.get_header(warctools.WarcRecord.PROFILE) diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 60f79d3..b2ac69e 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -92,6 +92,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): self.url, rule)) def _enforce_limit(self, limit_key, limit_value, soft=False): + if not self.server.stats_db: + return bucket0, bucket1, bucket2 = limit_key.rsplit("/", 2) _limit_key = limit_key @@ -328,7 +330,7 @@ class RecordedUrl: warcprox_meta=None, content_type=None, custom_type=None, status=None, size=None, client_ip=None, method=None, timestamp=None, host=None, duration=None, referer=None, - payload_digest=None): + payload_digest=None, warc_records=None): # XXX should test what happens with non-ascii url (when does # url-encoding happen?) if type(url) is not bytes: @@ -367,6 +369,7 @@ class RecordedUrl: self.duration = duration self.referer = referer self.payload_digest = payload_digest + self.warc_records = warc_records # inherit from object so that multiple inheritance from this class works # properly in python 2 @@ -374,9 +377,9 @@ class RecordedUrl: class SingleThreadedWarcProxy(http_server.HTTPServer, object): logger = logging.getLogger("warcprox.warcproxy.WarcProxy") - def __init__( - self, ca=None, recorded_url_q=None, stats_db=None, - options=warcprox.Options()): + def __init__(self, stats_db=None, options=warcprox.Options()): + self.options = options + server_address = ( options.address or 'localhost', options.port if options.port is not None else 8000) @@ -395,22 +398,15 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): self.digest_algorithm = options.digest_algorithm or 'sha1' - if ca is not None: - self.ca = ca - else: - ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64] - self.ca = CertificateAuthority(ca_file='warcprox-ca.pem', - certs_dir='./warcprox-ca', - ca_name=ca_name) + ca_name = ('Warcprox CA on %s' % socket.gethostname())[:64] + self.ca = CertificateAuthority( + ca_file='warcprox-ca.pem', certs_dir='./warcprox-ca', + ca_name=ca_name) - if recorded_url_q is not None: - self.recorded_url_q = recorded_url_q - else: - self.recorded_url_q = warcprox.TimestampedQueue( - maxsize=options.queue_size or 1000) + self.recorded_url_q = warcprox.TimestampedQueue( + maxsize=options.queue_size or 1000) self.stats_db = stats_db - self.options = options self.running_stats = warcprox.stats.RunningStats() @@ -449,17 +445,9 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy): logger = logging.getLogger("warcprox.warcproxy.WarcProxy") - def __init__( - self, ca=None, recorded_url_q=None, stats_db=None, - running_stats=None, options=warcprox.Options()): - if options.max_threads: - self.logger.info( - "max_threads=%s set by command line option", - options.max_threads) - warcprox.mitmproxy.PooledMitmProxy.__init__( - self, options.max_threads, options) - SingleThreadedWarcProxy.__init__( - self, ca, recorded_url_q, stats_db, options) + def __init__(self, stats_db=None, options=warcprox.Options()): + warcprox.mitmproxy.PooledMitmProxy.__init__(self, options) + SingleThreadedWarcProxy.__init__(self, stats_db, options) def server_activate(self): http_server.HTTPServer.server_activate(self) diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index ae64484..5747428 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -34,6 +34,8 @@ import warcprox class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor): logger = logging.getLogger("warcprox.writerthread.WarcWriterThread") + _ALWAYS_ACCEPT = {'WARCPROX_WRITE_RECORD'} + def __init__(self, inq, outq, options=warcprox.Options()): warcprox.BaseStandardPostfetchProcessor.__init__( self, inq, outq, options=options) @@ -48,6 +50,7 @@ class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor): self.writer_pool.maybe_idle_rollover() def _process_url(self, recorded_url): + records = [] if self._should_archive(recorded_url): records = self.writer_pool.write_records(recorded_url) recorded_url.warc_records = records