tests are passing

Noah Levitt 2018-01-15 14:37:27 -08:00
parent bd25991a0d
commit c9a39958db
11 changed files with 169 additions and 166 deletions

View File

@@ -357,7 +357,9 @@ def warcprox_(request):
         argv.append('--rethinkdb-trough-db-url=%s' % request.config.getoption('--rethinkdb-trough-db-url'))

     args = warcprox.main.parse_args(argv)
-    warcprox_ = warcprox.main.init_controller(args)
+    options = warcprox.Options(**vars(args))
+    warcprox_ = warcprox.controller.WarcproxController(options)

     logging.info('starting warcprox')
     warcprox_thread = threading.Thread(
@@ -490,8 +492,8 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
     assert response.content == b'404 Not in Archive\n'

     # check not in dedup db
-    dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
-            b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
+    dedup_lookup = warcprox_.dedup_db.lookup(
+            b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
     assert dedup_lookup is None

     # archive
@@ -508,13 +510,13 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     time.sleep(0.5)

     # check in dedup db
     # {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
-    dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+    dedup_lookup = warcprox_.dedup_db.lookup(
             b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
     assert dedup_lookup
     assert dedup_lookup['url'] == url.encode('ascii')
@@ -535,12 +537,12 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     time.sleep(0.5)

     # check in dedup db (no change from prev)
-    dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+    dedup_lookup = warcprox_.dedup_db.lookup(
             b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
     assert dedup_lookup['url'] == url.encode('ascii')
     assert dedup_lookup['id'] == record_id
@@ -564,7 +566,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
     assert response.content == b'404 Not in Archive\n'

     # check not in dedup db
-    dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+    dedup_lookup = warcprox_.dedup_db.lookup(
             b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
     assert dedup_lookup is None
@@ -582,13 +584,13 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     time.sleep(0.5)

     # check in dedup db
     # {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
-    dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+    dedup_lookup = warcprox_.dedup_db.lookup(
             b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
     assert dedup_lookup
     assert dedup_lookup['url'] == url.encode('ascii')
@@ -609,12 +611,12 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     time.sleep(0.5)

     # check in dedup db (no change from prev)
-    dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+    dedup_lookup = warcprox_.dedup_db.lookup(
             b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
     assert dedup_lookup['url'] == url.encode('ascii')
     assert dedup_lookup['id'] == record_id
@@ -640,7 +642,7 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     time.sleep(0.5)
@@ -652,7 +654,7 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     time.sleep(2.5)
@@ -693,12 +695,12 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     time.sleep(0.5)

     # check url1 in dedup db bucket_a
-    dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+    dedup_lookup = warcprox_.dedup_db.lookup(
             b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a")
     assert dedup_lookup['url'] == url1.encode('ascii')
     assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
@@ -707,7 +709,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     dedup_date = dedup_lookup['date']

     # check url1 not in dedup db bucket_b
-    dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+    dedup_lookup = warcprox_.dedup_db.lookup(
             b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b")
     assert dedup_lookup is None
@@ -720,12 +722,12 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     time.sleep(0.5)

     # check url2 in dedup db bucket_b
-    dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+    dedup_lookup = warcprox_.dedup_db.lookup(
             b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b")
     assert dedup_lookup['url'] == url2.encode('ascii')
     assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
@@ -742,7 +744,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     time.sleep(0.5)
@@ -755,15 +757,15 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     time.sleep(0.5)

     # close the warc
-    assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"]
-    writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"]
+    assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"]
+    writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"]
     warc_path = os.path.join(writer.directory, writer._f_finalname)
-    warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"].close_writer()
+    warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"].close_writer()
     assert os.path.exists(warc_path)

     # read the warc
@@ -948,7 +950,7 @@ def test_domain_doc_soft_limit(
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     time.sleep(0.5)
@@ -963,7 +965,7 @@ def test_domain_doc_soft_limit(
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
     time.sleep(2.0)
@@ -990,7 +992,7 @@ def test_domain_doc_soft_limit(
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
     time.sleep(2.0)
@@ -1005,7 +1007,7 @@ def test_domain_doc_soft_limit(
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
     time.sleep(2.0)
@@ -1073,7 +1075,7 @@ def test_domain_data_soft_limit(
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
     time.sleep(2.0)
@@ -1089,7 +1091,7 @@ def test_domain_data_soft_limit(
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
     time.sleep(2.0)
@@ -1105,7 +1107,7 @@ def test_domain_data_soft_limit(
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
     time.sleep(2.0)
@@ -1238,7 +1240,7 @@ def test_dedup_ok_flag(
     url = 'http://localhost:{}/z/b'.format(http_daemon.server_port)

     # check not in dedup db
-    dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+    dedup_lookup = warcprox_.dedup_db.lookup(
             b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079',
             bucket='test_dedup_ok_flag')
     assert dedup_lookup is None
@@ -1253,12 +1255,12 @@ def test_dedup_ok_flag(
     assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n'

     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     time.sleep(0.5)

     # check that dedup db doesn't give us anything for this
-    dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+    dedup_lookup = warcprox_.dedup_db.lookup(
             b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079',
             bucket='test_dedup_ok_flag')
     assert dedup_lookup is None
@@ -1274,18 +1276,18 @@ def test_dedup_ok_flag(
     assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n'

     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     time.sleep(0.5)

     # check that dedup db gives us something for this
-    dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+    dedup_lookup = warcprox_.dedup_db.lookup(
             b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079',
             bucket='test_dedup_ok_flag')
     assert dedup_lookup

     # inspect what's in rethinkdb more closely
-    rethink_captures = warcprox_.warc_writer_threads[0].dedup_db.captures_db
+    rethink_captures = warcprox_.dedup_db.captures_db
     results_iter = rethink_captures.rr.table(rethink_captures.table).get_all(
             ['FV7RGGA3SCRFNTS6L275N2OJQJXM5EDZ', 'response',
             'test_dedup_ok_flag'], index='sha1_warc_type').order_by(
@@ -1366,26 +1368,28 @@ def test_controller_with_defaults():
     assert controller.proxy.server_address == ('127.0.0.1', 8000)
     assert controller.proxy.server_port == 8000
     assert controller.proxy.running_stats
-    for wwt in controller.warc_writer_threads:
-        assert wwt
-        assert wwt.recorded_url_q
-        assert wwt.recorded_url_q is controller.proxy.recorded_url_q
-        assert wwt.writer_pool
-        assert wwt.writer_pool.default_warc_writer
-        assert wwt.writer_pool.default_warc_writer.directory == './warcs'
-        assert wwt.writer_pool.default_warc_writer.rollover_idle_time is None
-        assert wwt.writer_pool.default_warc_writer.rollover_size == 1000000000
-        assert wwt.writer_pool.default_warc_writer.prefix == 'warcprox'
-        assert wwt.writer_pool.default_warc_writer.gzip is False
-        assert wwt.writer_pool.default_warc_writer.record_builder
-        assert not wwt.writer_pool.default_warc_writer.record_builder.base32
-        assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1'
-    assert not controller.proxy.stats_db
+    wwt = controller.warc_writer_thread
+    assert wwt
+    assert wwt.inq
+    assert not wwt.outq
+    assert wwt.writer_pool
+    assert wwt.writer_pool.default_warc_writer
+    assert wwt.writer_pool.default_warc_writer.directory == './warcs'
+    assert wwt.writer_pool.default_warc_writer.rollover_idle_time is None
+    assert wwt.writer_pool.default_warc_writer.rollover_size == 1000000000
+    assert wwt.writer_pool.default_warc_writer.prefix == 'warcprox'
+    assert wwt.writer_pool.default_warc_writer.gzip is False
+    assert wwt.writer_pool.default_warc_writer.record_builder
+    assert not wwt.writer_pool.default_warc_writer.record_builder.base32
+    assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1'

 def test_choose_a_port_for_me(warcprox_):
     options = warcprox.Options()
     options.port = 0
-    controller = warcprox.controller.WarcproxController(
-            service_registry=warcprox_.service_registry, options=options)
+    if warcprox_.service_registry:
+        options.rethinkdb_services_url = 'rethinkdb://localhost/test0/services'
+    controller = warcprox.controller.WarcproxController(options)
     assert controller.proxy.server_port != 0
     assert controller.proxy.server_port != 8000
     assert controller.proxy.server_address == (
@@ -1426,7 +1430,7 @@ def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback
     assert response.status_code == 200
     assert not 'via' in playback_response

-    warc = warcprox_.warc_writer_threads[0].writer_pool.default_warc_writer._fpath
+    warc = warcprox_.warc_writer_thread.writer_pool.default_warc_writer._fpath
     with open(warc, 'rb') as f:
         for record in warcio.archiveiterator.ArchiveIterator(f):
             if record.rec_headers.get_header('warc-target-uri') == url:
@@ -1644,15 +1648,15 @@ def test_long_warcprox_meta(
     # wait for writer thread to process
     time.sleep(0.5)
-    while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
+    while warcprox_.postfetch_chain_busy():
         time.sleep(0.5)
     time.sleep(0.5)

     # check that warcprox-meta was parsed and honored ("warc-prefix" param)
-    assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"]
-    writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"]
+    assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"]
+    writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"]
     warc_path = os.path.join(writer.directory, writer._f_finalname)
-    warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"].close_writer()
+    warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"].close_writer()
     assert os.path.exists(warc_path)

     # read the warc
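
Every test edit in this file follows the same synchronization pattern: sleep briefly, poll warcprox_.postfetch_chain_busy() until the whole postfetch chain has drained, then sleep once more. A minimal helper capturing that pattern (hypothetical; the tests inline it rather than defining a helper):

    import time

    def wait_for_postfetch_chain(controller, poll=0.5):
        # initial sleep gives the proxy time to queue the recorded url
        time.sleep(poll)
        # postfetch_chain_busy() (added in warcprox/controller.py below)
        # reports whether any processor's input queue is non-empty
        while controller.postfetch_chain_busy():
            time.sleep(poll)
        # a url already dequeued but still in flight is not counted, so one
        # more sleep lets the final processor finish its current url
        time.sleep(poll)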

View File

@@ -88,27 +88,20 @@ def wait(callback, timeout):
     raise Exception('timed out waiting for %s to return truthy' % callback)

 def test_special_dont_write_prefix():
-    class NotifyMe:
-        def __init__(self):
-            self.the_list = []
-        def notify(self, recorded_url, records):
-            self.the_list.append((recorded_url, records))
     with tempfile.TemporaryDirectory() as tmpdir:
         logging.debug('cd %s', tmpdir)
         os.chdir(tmpdir)
-        q = warcprox.TimestampedQueue(maxsize=1)
-        listener = NotifyMe()
+        inq = warcprox.TimestampedQueue(maxsize=1)
+        outq = warcprox.TimestampedQueue(maxsize=1)
         wwt = warcprox.writerthread.WarcWriterThread(
-                recorded_url_q=q, options=Options(prefix='-'),
-                listeners=[listener])
+                inq, outq, Options(prefix='-'))
         try:
             wwt.start()
             # not to be written due to default prefix
             recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
             recorder.read()
-            q.put(RecordedUrl(
+            inq.put(RecordedUrl(
                 url='http://example.com/no', content_type='text/plain',
                 status=200, client_ip='127.0.0.2', request_data=b'abc',
                 response_recorder=recorder, remote_ip='127.0.0.3',
@@ -117,30 +110,31 @@ def test_special_dont_write_prefix():
             # to be written due to warcprox-meta prefix
             recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
             recorder.read()
-            q.put(RecordedUrl(
+            inq.put(RecordedUrl(
                 url='http://example.com/yes', content_type='text/plain',
                 status=200, client_ip='127.0.0.2', request_data=b'abc',
                 response_recorder=recorder, remote_ip='127.0.0.3',
                 timestamp=datetime.utcnow(),
                 payload_digest=recorder.block_digest,
                 warcprox_meta={'warc-prefix': 'normal-warc-prefix'}))
-            wait(lambda: len(listener.the_list) == 2, 10.0)
-            assert not listener.the_list[0][1]
-            assert listener.the_list[1][1]
+            recorded_url = outq.get(timeout=10)
+            assert not recorded_url.warc_records
+            recorded_url = outq.get(timeout=10)
+            assert recorded_url.warc_records
+            assert outq.empty()
         finally:
             wwt.stop.set()
             wwt.join()

-        q = warcprox.TimestampedQueue(maxsize=1)
-        listener = NotifyMe()
-        wwt = warcprox.writerthread.WarcWriterThread(
-                recorded_url_q=q, listeners=[listener])
+        inq = warcprox.TimestampedQueue(maxsize=1)
+        outq = warcprox.TimestampedQueue(maxsize=1)
+        wwt = warcprox.writerthread.WarcWriterThread(inq, outq)
         try:
             wwt.start()
             # to be written due to default prefix
             recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
             recorder.read()
-            q.put(RecordedUrl(
+            inq.put(RecordedUrl(
                 url='http://example.com/yes', content_type='text/plain',
                 status=200, client_ip='127.0.0.2', request_data=b'abc',
                 response_recorder=recorder, remote_ip='127.0.0.3',
@@ -149,16 +143,18 @@ def test_special_dont_write_prefix():
             # not to be written due to warcprox-meta prefix
             recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
             recorder.read()
-            q.put(RecordedUrl(
+            inq.put(RecordedUrl(
                 url='http://example.com/no', content_type='text/plain',
                 status=200, client_ip='127.0.0.2', request_data=b'abc',
                 response_recorder=recorder, remote_ip='127.0.0.3',
                 timestamp=datetime.utcnow(),
                 payload_digest=recorder.block_digest,
                 warcprox_meta={'warc-prefix': '-'}))
-            wait(lambda: len(listener.the_list) == 2, 10.0)
-            assert listener.the_list[0][1]
-            assert not listener.the_list[1][1]
+            recorded_url = outq.get(timeout=10)
+            assert recorded_url.warc_records
+            recorded_url = outq.get(timeout=10)
+            assert not recorded_url.warc_records
+            assert outq.empty()
         finally:
             wwt.stop.set()
             wwt.join()
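
Condensed, the rewritten test drives the writer thread purely through queues: urls go in via inq, and come out of outq with warc_records attached. A sketch of that flow under the post-refactor signatures (imports assumed to match tests/test_writer.py):

    import io
    from datetime import datetime
    import warcprox
    import warcprox.writerthread
    from warcprox.mitmproxy import ProxyingRecorder
    from warcprox.warcproxy import RecordedUrl

    inq = warcprox.TimestampedQueue(maxsize=1)
    outq = warcprox.TimestampedQueue(maxsize=1)
    wwt = warcprox.writerthread.WarcWriterThread(inq, outq)  # default options
    wwt.start()
    try:
        recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
        recorder.read()
        inq.put(RecordedUrl(
            url='http://example.com/', content_type='text/plain',
            status=200, client_ip='127.0.0.2', request_data=b'abc',
            response_recorder=recorder, remote_ip='127.0.0.3',
            timestamp=datetime.utcnow(),
            payload_digest=recorder.block_digest))
        # the writer thread sets warc_records and forwards the url to outq;
        # an empty list means nothing was written (e.g. warc-prefix '-')
        recorded_url = outq.get(timeout=10)
        assert recorded_url.warc_records
    finally:
        wwt.stop.set()
        wwt.join()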

View File

@@ -100,7 +100,7 @@ class BasePostfetchProcessor(threading.Thread):
     logger = logging.getLogger("warcprox.BasePostfetchProcessor")

     def __init__(self, inq, outq, options=Options()):
-        threading.Thread.__init__(self, name='???')
+        threading.Thread.__init__(self, name=self.__class__.__name__)
         self.inq = inq
         self.outq = outq
         self.options = options
@@ -120,7 +120,8 @@ class BasePostfetchProcessor(threading.Thread):
         '''
         Get url(s) from `self.inq`, process url(s), queue to `self.outq`.

-        Subclasses must implement this.
+        Subclasses must implement this. Implementations may operate on
+        individual urls, or on batches.

         May raise queue.Empty.
         '''
@@ -188,16 +189,16 @@ class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor):
     def __init__(self, listener, inq, outq, profile=False):
         BaseStandardPostfetchProcessor.__init__(self, inq, outq, profile)
         self.listener = listener
         self.name = listener.__class__.__name__
-        logging.info('self.name=%s', self.name)

     def _process_url(self, recorded_url):
         return self.listener.notify(recorded_url, recorded_url.warc_records)

-    # @classmethod
-    # def wrap(cls, listener, inq, outq, profile=False):
-    #     if listener:
-    #         return cls(listener, inq, outq, profile)
-    #     else:
-    #         return None
+    def start(self):
+        if hasattr(self.listener, 'start'):
+            self.listener.start()
+        BaseStandardPostfetchProcessor.start(self)

 # monkey-patch log levels TRACE and NOTICE
 TRACE = 5
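
The expanded docstring says subclasses may process urls one at a time or in batches. A hypothetical batching subclass, assuming the abstract method carrying that docstring is named _get_process_put() (its name falls outside this hunk):

    import queue
    import warcprox

    class BatchPostfetchProcessor(warcprox.BasePostfetchProcessor):
        def _get_process_put(self):
            # block for one url, then greedily drain whatever else is queued;
            # the initial get may raise queue.Empty, per the docstring
            batch = [self.inq.get(block=True, timeout=0.5)]
            try:
                while True:
                    batch.append(self.inq.get(block=False))
            except queue.Empty:
                pass
            self._process_batch(batch)
            # pass urls along to the next processor in the chain, if any
            if self.outq:
                for recorded_url in batch:
                    self.outq.put(recorded_url)

        def _process_batch(self, batch):
            self.logger.info('processing batch of %s urls', len(batch))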

View File

@@ -215,7 +215,7 @@ class RethinkCaptures:
         if self._timer:
             self._timer.join()

-class RethinkCapturesDedup:
+class RethinkCapturesDedup(warcprox.dedup.DedupDb):
     logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")

     def __init__(self, options=warcprox.Options()):

View File

@@ -33,6 +33,7 @@ import datetime
 import warcprox
 import certauth
 import functools
+import doublethink

 class Factory:
     @staticmethod
@@ -65,22 +66,15 @@ class Factory:
                 options.stats_db_file, options=options)
         return stats_db

-    # @staticmethod
-    # def certauth(options):
-    #     ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
-    #     ca = certauth.certauth.CertificateAuthority(
-    #             options.cacert, args.certs_dir, ca_name=ca_name)
-    #     return ca
-
     @staticmethod
     def warc_writer(inq, outq, options):
         return warcprox.writerthread.WarcWriterThread(inq, outq, options)

     @staticmethod
-    def playback_proxy(options):
+    def playback_proxy(ca, options):
         if options.playback_port is not None:
             playback_index_db = warcprox.playback.PlaybackIndexDb(
-                    options.playback_index_db_file, options=options)
+                    options=options)
             playback_proxy = warcprox.playback.PlaybackProxy(
                     ca=ca, playback_index_db=playback_index_db, options=options)
         else:
@@ -136,12 +130,22 @@ class WarcproxController(object):
         self.stop = threading.Event()
         self._start_stop_lock = threading.Lock()

-        self.proxy = warcprox.warcproxy.WarcProxy(options=options)
+        self.stats_db = Factory.stats_db(self.options)
+
+        self.proxy = warcprox.warcproxy.WarcProxy(self.stats_db, options)
+        self.playback_proxy = Factory.playback_proxy(
+                self.proxy.ca, self.options)
+
+        self.build_postfetch_chain(self.proxy.recorded_url_q)

         self.service_registry = Factory.service_registry(options)

+    def postfetch_chain_busy(self):
+        for processor in self._postfetch_chain:
+            if processor.inq.qsize() > 0:
+                return True
+        return False
+
     def build_postfetch_chain(self, inq):
         constructors = []
@@ -155,12 +159,10 @@ class WarcproxController(object):
         if self.dedup_db:
             constructors.append(self.dedup_db.storer)

-        stats_db = Factory.stats_db(self.options)
-        if stats_db:
+        if self.stats_db:
             constructors.append(functools.partial(
-                warcprox.ListenerPostfetchProcessor, stats_db))
+                warcprox.ListenerPostfetchProcessor, self.stats_db))

-        self.playback_proxy = Factory.playback_proxy(self.options)
         if self.playback_proxy:
             constructors.append(functools.partial(
                     warcprox.ListenerPostfetchProcessor,
@@ -175,7 +177,7 @@ class WarcproxController(object):
                 plugin = Factory.plugin(qualname)
                 constructors.append(functools.partial(
                     warcprox.ListenerPostfetchProcessor, plugin))

         self._postfetch_chain = []
         for i, constructor in enumerate(constructors):
             if i != len(constructors) - 1:
@@ -184,6 +186,8 @@ class WarcproxController(object):
             else:
                 outq = None
             processor = constructor(inq, outq, self.options)
+            if isinstance(processor, warcprox.writerthread.WarcWriterThread):
+                self.warc_writer_thread = processor  # ugly
             self._postfetch_chain.append(processor)
             inq = outq
@@ -277,6 +281,12 @@ class WarcproxController(object):
                 target=self.proxy.serve_forever, name='ProxyThread')
         self.proxy_thread.start()

+        if self.playback_proxy:
+            self.playback_proxy_thread = threading.Thread(
+                    target=self.playback_proxy.serve_forever,
+                    name='PlaybackProxyThread')
+            self.playback_proxy_thread.start()
+
         for processor in self._postfetch_chain:
             # logging.info('starting postfetch processor %r', processor)
             processor.start()
@@ -288,34 +298,29 @@ class WarcproxController(object):
             self.logger.info('warcprox is not running')
             return

-        # for wwt in self.warc_writer_threads:
-        #     wwt.stop.set()
         for processor in self._postfetch_chain:
             processor.stop.set()
         self.proxy.shutdown()
         self.proxy.server_close()

+        if self.playback_proxy is not None:
+            self.playback_proxy.shutdown()
+            self.playback_proxy.server_close()
+            if self.playback_proxy.playback_index_db is not None:
+                self.playback_proxy.playback_index_db.close()
+
         for processor in self._postfetch_chain:
             processor.join()

-        # if self.playback_proxy is not None:
-        #     self.playback_proxy.shutdown()
-        #     self.playback_proxy.server_close()
-        #     if self.playback_proxy.playback_index_db is not None:
-        #         self.playback_proxy.playback_index_db.close()
-
-        # # wait for threads to finish
-        # for wwt in self.warc_writer_threads:
-        #     wwt.join()
         if self.stats_db:
             self.stats_db.stop()
-        # if self.proxy.stats_db:
-        #     self.proxy.stats_db.stop()

+        self.proxy_thread.join()
+        if self.playback_proxy is not None:
+            self.playback_proxy_thread.join()

-        self.proxy_thread.join()
-        # self.proxy_thread.join()
-        # if self.playback_proxy is not None:
-        #     self.playback_proxy_thread.join()
-        # if self.service_registry and hasattr(self, "status_info"):
-        #     self.service_registry.unregister(self.status_info["id"])
+        if self.service_registry and hasattr(self, "status_info"):
+            self.service_registry.unregister(self.status_info["id"])

     def run_until_shutdown(self):
         """

View File

@@ -37,7 +37,8 @@ urllib3.disable_warnings()

 class DedupLoader(warcprox.BaseStandardPostfetchProcessor):
     def __init__(self, dedup_db, inq, outq, base32=False, profile=False):
-        warcprox.BaseStandardPostfetchProcessor.__init__(self, inq, outq, profile)
+        warcprox.BaseStandardPostfetchProcessor.__init__(
+                self, inq, outq, profile)
         self.dedup_db = dedup_db
         self.base32 = base32

     def _process_url(self, recorded_url):

View File

@@ -116,9 +116,9 @@ def _build_arg_parser(prog):
     arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
             type=int, default=None, help='port to listen on for instant playback')
-    arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
-            default='./warcprox-playback-index.db',
-            help='playback index database file (only used if --playback-port is specified)')
+    # arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
+    #         default='./warcprox-playback-index.db',
+    #         help='playback index database file (only used if --playback-port is specified)')
     group = arg_parser.add_mutually_exclusive_group()
     group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
             default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')

View File

@@ -562,7 +562,12 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
     request_queue_size = 4096

-    def __init__(self, max_threads, options=warcprox.Options()):
-        PooledMixIn.__init__(self, max_threads)
+    def __init__(self, options=warcprox.Options()):
+        if options.max_threads:
+            self.logger.info(
+                    "max_threads=%s set by command line option",
+                    options.max_threads)
+        PooledMixIn.__init__(self, options.max_threads)
         self.profilers = {}

         if options.profile:

View File

@@ -121,9 +121,6 @@ class PlaybackProxyHandler(MitmProxyHandler):
     def _send_headers_and_refd_payload(
             self, headers, refers_to_target_uri, refers_to_date, payload_digest):
-        """Parameters:
-        """
         location = self.server.playback_index_db.lookup_exact(
                 refers_to_target_uri, refers_to_date, payload_digest)
         self.logger.debug('loading http payload from {}'.format(location))
@@ -133,11 +130,13 @@ class PlaybackProxyHandler(MitmProxyHandler):
         for (offset, record, errors) in fh.read_records(limit=1, offsets=True):
             pass

+        if not record:
+            raise Exception('failed to read record at offset %s from %s' % (offset, warcfilename))
+
         if errors:
             raise Exception('warc errors at {}:{} -- {}'.format(location['f'], offset, errors))

-        warc_type = record.get_header(warctools.WarcRecord.TYPE)
-        if warc_type != warctools.WarcRecord.RESPONSE:
+        if record.type != warctools.WarcRecord.RESPONSE:
             raise Exception('invalid attempt to retrieve http payload of "{}" record'.format(warc_type))

         # find end of headers
@@ -158,12 +157,13 @@ class PlaybackProxyHandler(MitmProxyHandler):
         for (offset, record, errors) in fh.read_records(limit=1, offsets=True):
             pass

+        if not record:
+            raise Exception('failed to read record at offset %s from %s' % (offset, warcfilename))
+
         if errors:
             raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors))

-        warc_type = record.get_header(warctools.WarcRecord.TYPE)
-        if warc_type == warctools.WarcRecord.RESPONSE:
+        if record.type == warctools.WarcRecord.RESPONSE:
             headers_buf = bytearray()
             while True:
                 line = record.content_file.readline()
@@ -173,7 +173,7 @@ class PlaybackProxyHandler(MitmProxyHandler):
             return self._send_response(headers_buf, record.content_file)

-        elif warc_type == warctools.WarcRecord.REVISIT:
+        elif record.type == warctools.WarcRecord.REVISIT:
             # response consists of http headers from revisit record and
             # payload from the referenced record
             warc_profile = record.get_header(warctools.WarcRecord.PROFILE)

View File

@@ -92,6 +92,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
                     self.url, rule))

     def _enforce_limit(self, limit_key, limit_value, soft=False):
+        if not self.server.stats_db:
+            return
         bucket0, bucket1, bucket2 = limit_key.rsplit("/", 2)
         _limit_key = limit_key
@@ -328,7 +330,7 @@ class RecordedUrl:
             warcprox_meta=None, content_type=None, custom_type=None,
             status=None, size=None, client_ip=None, method=None,
             timestamp=None, host=None, duration=None, referer=None,
-            payload_digest=None):
+            payload_digest=None, warc_records=None):
         # XXX should test what happens with non-ascii url (when does
         # url-encoding happen?)
         if type(url) is not bytes:
@@ -367,6 +369,7 @@ class RecordedUrl:
         self.duration = duration
         self.referer = referer
         self.payload_digest = payload_digest
+        self.warc_records = warc_records

 # inherit from object so that multiple inheritance from this class works
 # properly in python 2
@@ -374,9 +377,9 @@ class RecordedUrl:
 class SingleThreadedWarcProxy(http_server.HTTPServer, object):
     logger = logging.getLogger("warcprox.warcproxy.WarcProxy")

-    def __init__(
-            self, ca=None, recorded_url_q=None, stats_db=None,
-            options=warcprox.Options()):
+    def __init__(self, stats_db=None, options=warcprox.Options()):
+        self.options = options
+
         server_address = (
                 options.address or 'localhost',
                 options.port if options.port is not None else 8000)
@@ -395,22 +398,15 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
         self.digest_algorithm = options.digest_algorithm or 'sha1'

-        if ca is not None:
-            self.ca = ca
-        else:
-            ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
-            self.ca = CertificateAuthority(ca_file='warcprox-ca.pem',
-                    certs_dir='./warcprox-ca',
-                    ca_name=ca_name)
+        ca_name = ('Warcprox CA on %s' % socket.gethostname())[:64]
+        self.ca = CertificateAuthority(
+                ca_file='warcprox-ca.pem', certs_dir='./warcprox-ca',
+                ca_name=ca_name)

-        if recorded_url_q is not None:
-            self.recorded_url_q = recorded_url_q
-        else:
-            self.recorded_url_q = warcprox.TimestampedQueue(
-                    maxsize=options.queue_size or 1000)
+        self.recorded_url_q = warcprox.TimestampedQueue(
+                maxsize=options.queue_size or 1000)

         self.stats_db = stats_db
-        self.options = options

         self.running_stats = warcprox.stats.RunningStats()
@@ -449,17 +445,9 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):

 class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy):
     logger = logging.getLogger("warcprox.warcproxy.WarcProxy")

-    def __init__(
-            self, ca=None, recorded_url_q=None, stats_db=None,
-            running_stats=None, options=warcprox.Options()):
-        if options.max_threads:
-            self.logger.info(
-                    "max_threads=%s set by command line option",
-                    options.max_threads)
-        warcprox.mitmproxy.PooledMitmProxy.__init__(
-            self, options.max_threads, options)
-        SingleThreadedWarcProxy.__init__(
-            self, ca, recorded_url_q, stats_db, options)
+    def __init__(self, stats_db=None, options=warcprox.Options()):
+        warcprox.mitmproxy.PooledMitmProxy.__init__(self, options)
+        SingleThreadedWarcProxy.__init__(self, stats_db, options)

     def server_activate(self):
         http_server.HTTPServer.server_activate(self)

View File

@@ -34,6 +34,8 @@ import warcprox

 class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor):
     logger = logging.getLogger("warcprox.writerthread.WarcWriterThread")

+    _ALWAYS_ACCEPT = {'WARCPROX_WRITE_RECORD'}
+
     def __init__(self, inq, outq, options=warcprox.Options()):
         warcprox.BaseStandardPostfetchProcessor.__init__(
                 self, inq, outq, options=options)
@@ -48,6 +50,7 @@ class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor):
             self.writer_pool.maybe_idle_rollover()

     def _process_url(self, recorded_url):
+        records = []
         if self._should_archive(recorded_url):
             records = self.writer_pool.write_records(recorded_url)
         recorded_url.warc_records = records
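
Taken together with RecordedUrl.warc_records (warcprox/warcproxy.py above), this is the new contract: the writer thread always sets warc_records (an empty list when nothing is archived) before passing the url downstream, and listeners receive it via ListenerPostfetchProcessor. A hypothetical listener wired the way build_postfetch_chain wires stats_db and plugins:

    import functools
    import warcprox

    class CrawlLogger:
        # ListenerPostfetchProcessor calls notify(recorded_url,
        # recorded_url.warc_records) from its _process_url()
        def notify(self, recorded_url, records):
            if records:
                print('wrote %s record(s) for %s' % (len(records), recorded_url.url))
            else:
                print('wrote nothing for %s' % recorded_url.url)

    # the controller later calls constructor(inq, outq, options)
    constructor = functools.partial(
            warcprox.ListenerPostfetchProcessor, CrawlLogger())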