Merge pull request #51 from nlevitt/wip-postfetch-chain

WIP postfetch chain
jkafader 2018-01-18 13:01:55 -08:00 committed by GitHub
commit 5a9c9e8e15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 926 additions and 745 deletions
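For orientation, the pattern this change introduces on the test side can be summarized in a rough, illustrative sketch (not part of the diff). It only uses names that appear in the changes below -- warcprox.Options, warcprox.controller.WarcproxController, controller.start(), run_until_shutdown(), proxy.running_stats.urls and the wait() helper -- and elides the actual fetches:

# illustrative sketch only; assumes the API shown in the diff below
import threading
import time
import warcprox
import warcprox.controller

def wait(callback, timeout=10):
    # same polling helper the tests add below
    start = time.time()
    while time.time() - start < timeout:
        if callback():
            return
        time.sleep(0.1)
    raise Exception('timed out waiting for %s to return truthy' % callback)

options = warcprox.Options(port=0)
controller = warcprox.controller.WarcproxController(options)
controller.start()
threading.Thread(target=controller.run_until_shutdown).start()

urls_before = controller.proxy.running_stats.urls
# ... fetch one url through the proxy here ...
# block until the postfetch chain (dedup, warc writing, stats) has finished
# with that url before asserting on warcs, dedup db entries, or crawl logs
wait(lambda: controller.proxy.running_stats.urls - urls_before == 1)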

View File

@@ -215,9 +215,9 @@ if __name__ == '__main__':
    args.cacert = os.path.join(tmpdir, 'benchmark-warcprox-ca.pem')
    args.certs_dir = os.path.join(tmpdir, 'benchmark-warcprox-ca')
    args.directory = os.path.join(tmpdir, 'warcs')
-   if args.rethinkdb_servers:
-       args.rethinkdb_db = 'benchmarks_{:%Y%m%d%H%M%S}' % (
-               datetime.datetime.utcnow())
+   # if args.rethinkdb_servers:
+   #     args.rethinkdb_db = 'benchmarks_{:%Y%m%d%H%M%S}' % (
+   #             datetime.datetime.utcnow())
    start_servers()
    logging.info(
@@ -247,7 +249,9 @@ if __name__ == '__main__':
        logging.info('SKIPPED')
    logging.info('===== baseline benchmark finished =====')
-   warcprox_controller = warcprox.main.init_controller(args)
+   options = warcprox.Options(**vars(args))
+   warcprox_controller = warcprox.controller.WarcproxController(options)
    warcprox_controller_thread = threading.Thread(
            target=warcprox_controller.run_until_shutdown)
    warcprox_controller_thread.start()

View File

@@ -96,6 +96,14 @@ logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN)
warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecureRequestWarning)
warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecurePlatformWarning)
+def wait(callback, timeout=10):
+    start = time.time()
+    while time.time() - start < timeout:
+        if callback():
+            return
+        time.sleep(0.1)
+    raise Exception('timed out waiting for %s to return truthy' % callback)
# monkey patch dns lookup so we can test domain inheritance on localhost
orig_getaddrinfo = socket.getaddrinfo
orig_gethostbyname = socket.gethostbyname
@@ -339,6 +347,9 @@ def warcprox_(request):
    logging.info('changing to working directory %r', work_dir)
    os.chdir(work_dir)
+   # we can't wait around all day in the tests
+   warcprox.BaseBatchPostfetchProcessor.MAX_BATCH_SEC = 0.5
    argv = ['warcprox',
            '--method-filter=GET',
            '--method-filter=POST',
@@ -357,9 +368,12 @@ def warcprox_(request):
        argv.append('--rethinkdb-trough-db-url=%s' % request.config.getoption('--rethinkdb-trough-db-url'))
    args = warcprox.main.parse_args(argv)
-   warcprox_ = warcprox.main.init_controller(args)
+   options = warcprox.Options(**vars(args))
+   warcprox_ = warcprox.controller.WarcproxController(options)
    logging.info('starting warcprox')
+   warcprox_.start()
    warcprox_thread = threading.Thread(
            name='WarcproxThread', target=warcprox_.run_until_shutdown)
    warcprox_thread.start()
@@ -431,17 +445,9 @@ def test_httpds_no_proxy(http_daemon, https_daemon):
    assert response.headers['warcprox-test-header'] == 'c!'
    assert response.content == b'I am the warcprox test payload! dddddddddd!\n'
-def _poll_playback_until(playback_proxies, url, status, timeout_sec):
-    start = time.time()
-    # check playback (warc writing is asynchronous, give it up to 10 sec)
-    while time.time() - start < timeout_sec:
-        response = requests.get(url, proxies=playback_proxies, verify=False)
-        if response.status_code == status:
-            break
-        time.sleep(0.5)
-    return response
-def test_archive_and_playback_http_url(http_daemon, archiving_proxies, playback_proxies):
+def test_archive_and_playback_http_url(http_daemon, archiving_proxies, playback_proxies, warcprox_):
+    urls_before = warcprox_.proxy.running_stats.urls
    url = 'http://localhost:{}/a/b'.format(http_daemon.server_port)
    # ensure playback fails before archiving
@@ -455,12 +461,17 @@ def test_archive_and_playback_http_url(http_daemon, archiving_proxies, playback_
    assert response.headers['warcprox-test-header'] == 'a!'
    assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n'
-   response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
+   response = requests.get(url, proxies=playback_proxies, verify=False)
    assert response.status_code == 200
    assert response.headers['warcprox-test-header'] == 'a!'
    assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n'
-def test_archive_and_playback_https_url(https_daemon, archiving_proxies, playback_proxies):
+def test_archive_and_playback_https_url(https_daemon, archiving_proxies, playback_proxies, warcprox_):
+    urls_before = warcprox_.proxy.running_stats.urls
    url = 'https://localhost:{}/c/d'.format(https_daemon.server_port)
    # ensure playback fails before archiving
@@ -474,14 +485,19 @@ def test_archive_and_playback_https_url(https_daemon, archiving_proxies, playbac
    assert response.headers['warcprox-test-header'] == 'c!'
    assert response.content == b'I am the warcprox test payload! dddddddddd!\n'
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
    # test playback
-   response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
+   response = requests.get(url, proxies=playback_proxies, verify=False)
    assert response.status_code == 200
    assert response.headers['warcprox-test-header'] == 'c!'
    assert response.content == b'I am the warcprox test payload! dddddddddd!\n'
# test dedup of same http url with same payload
def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies):
+    urls_before = warcprox_.proxy.running_stats.urls
    url = 'http://localhost:{}/e/f'.format(http_daemon.server_port)
    # ensure playback fails before archiving
@@ -490,8 +506,8 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
    assert response.content == b'404 Not in Archive\n'
    # check not in dedup db
-   dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+   dedup_lookup = warcprox_.dedup_db.lookup(
            b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
    assert dedup_lookup is None
    # archive
@@ -500,21 +516,17 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
    assert response.headers['warcprox-test-header'] == 'e!'
    assert response.content == b'I am the warcprox test payload! ffffffffff!\n'
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
    # test playback
-   response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
+   response = requests.get(url, proxies=playback_proxies, verify=False)
    assert response.status_code == 200
    assert response.headers['warcprox-test-header'] == 'e!'
    assert response.content == b'I am the warcprox test payload! ffffffffff!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   time.sleep(0.5)
    # check in dedup db
    # {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
-   dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+   dedup_lookup = warcprox_.dedup_db.lookup(
            b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
    assert dedup_lookup
    assert dedup_lookup['url'] == url.encode('ascii')
@@ -525,7 +537,7 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
    # need revisit to have a later timestamp than original, else playing
    # back the latest record might not hit the revisit
-   time.sleep(1.5)
+   time.sleep(1.1)
    # fetch & archive revisit
    response = requests.get(url, proxies=archiving_proxies, verify=False)
@@ -533,14 +545,11 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
    assert response.headers['warcprox-test-header'] == 'e!'
    assert response.content == b'I am the warcprox test payload! ffffffffff!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   time.sleep(0.5)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
    # check in dedup db (no change from prev)
-   dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+   dedup_lookup = warcprox_.dedup_db.lookup(
            b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
    assert dedup_lookup['url'] == url.encode('ascii')
    assert dedup_lookup['id'] == record_id
@@ -548,7 +557,7 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
    # test playback
    logging.debug('testing playback of revisit of {}'.format(url))
-   response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
+   response = requests.get(url, proxies=playback_proxies, verify=False)
    assert response.status_code == 200
    assert response.headers['warcprox-test-header'] == 'e!'
    assert response.content == b'I am the warcprox test payload! ffffffffff!\n'
@@ -556,6 +565,8 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
# test dedup of same https url with same payload
def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxies):
+    urls_before = warcprox_.proxy.running_stats.urls
    url = 'https://localhost:{}/g/h'.format(https_daemon.server_port)
    # ensure playback fails before archiving
@@ -564,7 +575,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
    assert response.content == b'404 Not in Archive\n'
    # check not in dedup db
-   dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+   dedup_lookup = warcprox_.dedup_db.lookup(
            b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
    assert dedup_lookup is None
@@ -574,21 +585,18 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
    assert response.headers['warcprox-test-header'] == 'g!'
    assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n'
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
    # test playback
-   response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
+   response = requests.get(url, proxies=playback_proxies, verify=False)
    assert response.status_code == 200
    assert response.headers['warcprox-test-header'] == 'g!'
    assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   time.sleep(0.5)
    # check in dedup db
    # {u'id': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'}
-   dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+   dedup_lookup = warcprox_.dedup_db.lookup(
            b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
    assert dedup_lookup
    assert dedup_lookup['url'] == url.encode('ascii')
@@ -599,7 +607,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
    # need revisit to have a later timestamp than original, else playing
    # back the latest record might not hit the revisit
-   time.sleep(1.5)
+   time.sleep(1.1)
    # fetch & archive revisit
    response = requests.get(url, proxies=archiving_proxies, verify=False)
@@ -607,14 +615,11 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
    assert response.headers['warcprox-test-header'] == 'g!'
    assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   time.sleep(0.5)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
    # check in dedup db (no change from prev)
-   dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+   dedup_lookup = warcprox_.dedup_db.lookup(
            b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
    assert dedup_lookup['url'] == url.encode('ascii')
    assert dedup_lookup['id'] == record_id
@@ -622,13 +627,15 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
    # test playback
    logging.debug('testing playback of revisit of {}'.format(url))
-   response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
+   response = requests.get(url, proxies=playback_proxies, verify=False)
    assert response.status_code == 200
    assert response.headers['warcprox-test-header'] == 'g!'
    assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n'
    # XXX how to check dedup was used?
def test_limits(http_daemon, warcprox_, archiving_proxies):
+    urls_before = warcprox_.proxy.running_stats.urls
    url = 'http://localhost:{}/i/j'.format(http_daemon.server_port)
    request_meta = {"stats":{"buckets":["test_limits_bucket"]},"limits":{"test_limits_bucket/total/urls":10}}
    headers = {"Warcprox-Meta": json.dumps(request_meta)}
@@ -638,11 +645,8 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
    assert response.headers['warcprox-test-header'] == 'i!'
    assert response.content == b'I am the warcprox test payload! jjjjjjjjjj!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   time.sleep(0.5)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
    for i in range(9):
        response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
@@ -650,11 +654,8 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
        assert response.headers['warcprox-test-header'] == 'i!'
        assert response.content == b'I am the warcprox test payload! jjjjjjjjjj!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   time.sleep(2.5)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 10)
    response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
    assert response.status_code == 420
@@ -665,6 +666,8 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
    assert response.raw.data == b"request rejected by warcprox: reached limit test_limits_bucket/total/urls=10\n"
def test_return_capture_timestamp(http_daemon, warcprox_, archiving_proxies):
+    urls_before = warcprox_.proxy.running_stats.urls
    url = 'http://localhost:{}/i/j'.format(http_daemon.server_port)
    request_meta = {"accept": ["capture-metadata"]}
    headers = {"Warcprox-Meta": json.dumps(request_meta)}
@@ -680,7 +683,12 @@ def test_return_capture_timestamp(http_daemon, warcprox_, archiving_proxies):
    except ValueError:
        pytest.fail('Invalid capture-timestamp format %s', data['capture-timestamp'])
+   # wait for postfetch chain (or subsequent test could fail)
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
+    urls_before = warcprox_.proxy.running_stats.urls
    url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port)
    url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port)
@@ -691,15 +699,14 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
    assert response.headers['warcprox-test-header'] == 'k!'
    assert response.content == b'I am the warcprox test payload! llllllllll!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   time.sleep(0.5)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
    # check url1 in dedup db bucket_a
-   dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+   # logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_a')
+   dedup_lookup = warcprox_.dedup_db.lookup(
            b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a")
+   assert dedup_lookup
    assert dedup_lookup['url'] == url1.encode('ascii')
    assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
    assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
@@ -707,7 +714,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
    dedup_date = dedup_lookup['date']
    # check url1 not in dedup db bucket_b
-   dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+   dedup_lookup = warcprox_.dedup_db.lookup(
            b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b")
    assert dedup_lookup is None
@@ -718,14 +725,11 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
    assert response.headers['warcprox-test-header'] == 'k!'
    assert response.content == b'I am the warcprox test payload! llllllllll!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   time.sleep(0.5)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
    # check url2 in dedup db bucket_b
-   dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+   dedup_lookup = warcprox_.dedup_db.lookup(
            b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b")
    assert dedup_lookup['url'] == url2.encode('ascii')
    assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
@@ -740,11 +744,8 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
    assert response.headers['warcprox-test-header'] == 'k!'
    assert response.content == b'I am the warcprox test payload! llllllllll!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   time.sleep(0.5)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3)
    # archive url1 bucket_b
    headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_b"})}
@@ -753,17 +754,14 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
    assert response.headers['warcprox-test-header'] == 'k!'
    assert response.content == b'I am the warcprox test payload! llllllllll!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   time.sleep(0.5)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4)
    # close the warc
-   assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"]
-   writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"]
+   assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"]
+   writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"]
    warc_path = os.path.join(writer.directory, writer._f_finalname)
-   warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_dedup_buckets"].close_writer()
+   warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"].close_writer()
    assert os.path.exists(warc_path)
    # read the warc
@@ -821,6 +819,8 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
    fh.close()
def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies):
+    urls_before = warcprox_.proxy.running_stats.urls
    rules = [
        {
            "domain": "localhost",
@@ -857,6 +857,9 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies):
            url, proxies=archiving_proxies, headers=headers, stream=True)
    assert response.status_code == 200
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
    # blocked by SURT_MATCH
    url = 'http://localhost:{}/fuh/guh'.format(http_daemon.server_port)
    response = requests.get(
@@ -872,6 +875,9 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies):
    # 404 because server set up at the top of this file doesn't handle this url
    assert response.status_code == 404
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
    # not blocked because surt scheme does not match (differs from heritrix
    # behavior where https urls are coerced to http surt form)
    url = 'https://localhost:{}/fuh/guh'.format(https_daemon.server_port)
@@ -880,6 +886,9 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies):
            verify=False)
    assert response.status_code == 200
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3)
    # blocked by blanket domain block
    url = 'http://bad.domain.com/'
    response = requests.get(
@@ -932,6 +941,8 @@ def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies):
def test_domain_doc_soft_limit(
        http_daemon, https_daemon, warcprox_, archiving_proxies):
+    urls_before = warcprox_.proxy.running_stats.urls
    request_meta = {
        "stats": {"buckets": [{"bucket":"test_domain_doc_limit_bucket","tally-domains":["foo.localhost"]}]},
        "soft-limits": {"test_domain_doc_limit_bucket:foo.localhost/total/urls":10},
@@ -946,11 +957,8 @@ def test_domain_doc_soft_limit(
    assert response.headers['warcprox-test-header'] == 'o!'
    assert response.content == b'I am the warcprox test payload! pppppppppp!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   time.sleep(0.5)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
    # make sure stats from different domain don't count
    url = 'http://bar.localhost:{}/o/p'.format(http_daemon.server_port)
@@ -961,15 +969,10 @@ def test_domain_doc_soft_limit(
        assert response.headers['warcprox-test-header'] == 'o!'
        assert response.content == b'I am the warcprox test payload! pppppppppp!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
-   time.sleep(2.0)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 11)
    # (2) same host but different scheme and port: domain limit applies
-   #
    url = 'https://foo.localhost:{}/o/p'.format(https_daemon.server_port)
    response = requests.get(
            url, proxies=archiving_proxies, headers=headers, stream=True,
@@ -988,12 +991,12 @@ def test_domain_doc_soft_limit(
        assert response.headers['warcprox-test-header'] == 'o!'
        assert response.content == b'I am the warcprox test payload! pppppppppp!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
-   time.sleep(2.0)
+   # wait for postfetch chain
+   time.sleep(3)
+   logging.info(
+           'warcprox_.proxy.running_stats.urls - urls_before = %s',
+           warcprox_.proxy.running_stats.urls - urls_before)
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 19)
    # (10)
    response = requests.get(
@@ -1003,12 +1006,8 @@ def test_domain_doc_soft_limit(
    assert response.headers['warcprox-test-header'] == 'o!'
    assert response.content == b'I am the warcprox test payload! pppppppppp!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
-   time.sleep(2.0)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 20)
    # (11) back to http, and this is the 11th request
    url = 'http://zuh.foo.localhost:{}/o/p'.format(http_daemon.server_port)
@@ -1030,6 +1029,9 @@ def test_domain_doc_soft_limit(
    assert response.headers['warcprox-test-header'] == 'o!'
    assert response.content == b'I am the warcprox test payload! pppppppppp!\n'
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 21)
    # https also blocked
    url = 'https://zuh.foo.localhost:{}/o/p'.format(https_daemon.server_port)
    response = requests.get(
@@ -1056,6 +1058,8 @@ def test_domain_doc_soft_limit(
def test_domain_data_soft_limit(
        http_daemon, https_daemon, warcprox_, archiving_proxies):
+    urls_before = warcprox_.proxy.running_stats.urls
    # using idn
    request_meta = {
        "stats": {"buckets": [{"bucket":"test_domain_data_limit_bucket","tally-domains":['ÞzZ.LOCALhost']}]},
@@ -1071,12 +1075,8 @@ def test_domain_data_soft_limit(
    assert response.headers['warcprox-test-header'] == 'y!'
    assert response.content == b'I am the warcprox test payload! zzzzzzzzzz!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
-   time.sleep(2.0)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
    # duplicate, does not count toward limit
    url = 'https://baz.Þzz.localhost:{}/y/z'.format(https_daemon.server_port)
@@ -1087,12 +1087,8 @@ def test_domain_data_soft_limit(
    assert response.headers['warcprox-test-header'] == 'y!'
    assert response.content == b'I am the warcprox test payload! zzzzzzzzzz!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
-   time.sleep(2.0)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
    # novel, pushes stats over the limit
    url = 'https://muh.XN--Zz-2Ka.locALHOst:{}/z/~'.format(https_daemon.server_port)
@@ -1103,12 +1099,8 @@ def test_domain_data_soft_limit(
    assert response.headers['warcprox-test-header'] == 'z!'
    assert response.content == b'I am the warcprox test payload! ~~~~~~~~~~!\n'
-   # wait for writer thread to process
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   # rethinkdb stats db update cycle is 2 seconds (at the moment anyway)
-   time.sleep(2.0)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3)
    # make sure limit doesn't get applied to a different host
    url = 'http://baz.localhost:{}/z/~'.format(http_daemon.server_port)
@@ -1118,6 +1110,9 @@ def test_domain_data_soft_limit(
    assert response.headers['warcprox-test-header'] == 'z!'
    assert response.content == b'I am the warcprox test payload! ~~~~~~~~~~!\n'
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4)
    # blocked because we're over the limit now
    url = 'http://lOl.wHut.ÞZZ.lOcALHOst:{}/y/z'.format(http_daemon.server_port)
    response = requests.get(
@@ -1149,7 +1144,9 @@ def test_domain_data_soft_limit(
# connection to the internet, and relies on a third party site (facebook) being
# up and behaving a certain way
@pytest.mark.xfail
-def test_tor_onion(archiving_proxies):
+def test_tor_onion(archiving_proxies, warcprox_):
+    urls_before = warcprox_.proxy.running_stats.urls
    response = requests.get('http://www.facebookcorewwwi.onion/',
            proxies=archiving_proxies, verify=False, allow_redirects=False)
    assert response.status_code == 302
@@ -1158,7 +1155,12 @@ def test_tor_onion(archiving_proxies):
            proxies=archiving_proxies, verify=False, allow_redirects=False)
    assert response.status_code == 200
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
-def test_missing_content_length(archiving_proxies, http_daemon, https_daemon):
+def test_missing_content_length(archiving_proxies, http_daemon, https_daemon, warcprox_):
+    urls_before = warcprox_.proxy.running_stats.urls
    # double-check that our test http server is responding as expected
    url = 'http://localhost:%s/missing-content-length' % (
            http_daemon.server_port)
@@ -1195,8 +1197,14 @@ def test_missing_content_length(archiving_proxies, http_daemon, https_daemon):
            b'This response is missing a Content-Length http header.')
    assert not 'content-length' in response.headers
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
def test_method_filter(
-       https_daemon, http_daemon, archiving_proxies, playback_proxies):
+       warcprox_, https_daemon, http_daemon, archiving_proxies,
+       playback_proxies):
+    urls_before = warcprox_.proxy.running_stats.urls
    # we've configured warcprox with method_filters=['GET','POST'] so HEAD
    # requests should not be archived
@@ -1207,7 +1215,10 @@ def test_method_filter(
    assert response.headers['warcprox-test-header'] == 'z!'
    assert response.content == b''
-   response = _poll_playback_until(playback_proxies, url, status=200, timeout_sec=10)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
+   response = requests.get(url, proxies=playback_proxies, verify=False)
    assert response.status_code == 404
    assert response.content == b'404 Not in Archive\n'
@@ -1224,13 +1235,17 @@ def test_method_filter(
            headers=headers, proxies=archiving_proxies)
    assert response.status_code == 204
-   response = _poll_playback_until(
-           playback_proxies, url, status=200, timeout_sec=10)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
+   response = requests.get(url, proxies=playback_proxies, verify=False)
    assert response.status_code == 200
    assert response.content == payload
def test_dedup_ok_flag(
        https_daemon, http_daemon, warcprox_, archiving_proxies):
+    urls_before = warcprox_.proxy.running_stats.urls
    if not warcprox_.options.rethinkdb_big_table:
        # this feature is n/a unless using rethinkdb big table
        return
@@ -1238,7 +1253,7 @@ def test_dedup_ok_flag(
    url = 'http://localhost:{}/z/b'.format(http_daemon.server_port)
    # check not in dedup db
-   dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+   dedup_lookup = warcprox_.dedup_db.lookup(
            b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079',
            bucket='test_dedup_ok_flag')
    assert dedup_lookup is None
@@ -1252,13 +1267,11 @@ def test_dedup_ok_flag(
    assert response.headers['warcprox-test-header'] == 'z!'
    assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n'
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   time.sleep(0.5)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
    # check that dedup db doesn't give us anything for this
-   dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+   dedup_lookup = warcprox_.dedup_db.lookup(
            b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079',
            bucket='test_dedup_ok_flag')
    assert dedup_lookup is None
@@ -1273,19 +1286,17 @@ def test_dedup_ok_flag(
    assert response.headers['warcprox-test-header'] == 'z!'
    assert response.content == b'I am the warcprox test payload! bbbbbbbbbb!\n'
-   time.sleep(0.5)
-   while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
-       time.sleep(0.5)
-   time.sleep(0.5)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
    # check that dedup db gives us something for this
-   dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup(
+   dedup_lookup = warcprox_.dedup_db.lookup(
            b'sha1:2d7f13181b90a256ce5e5ebfd6e9c9826ece9079',
            bucket='test_dedup_ok_flag')
    assert dedup_lookup
    # inspect what's in rethinkdb more closely
-   rethink_captures = warcprox_.warc_writer_threads[0].dedup_db.captures_db
+   rethink_captures = warcprox_.dedup_db.captures_db
    results_iter = rethink_captures.rr.table(rethink_captures.table).get_all(
            ['FV7RGGA3SCRFNTS6L275N2OJQJXM5EDZ', 'response',
            'test_dedup_ok_flag'], index='sha1_warc_type').order_by(
@@ -1310,7 +1321,8 @@ def test_status_api(warcprox_):
            'role', 'version', 'host', 'address', 'port', 'pid', 'load',
            'queued_urls', 'queue_max_size', 'seconds_behind', 'threads',
            'rates_5min', 'rates_1min', 'unaccepted_requests', 'rates_15min',
-           'active_requests',}
+           'active_requests','start_time','urls_processed',
+           'warc_bytes_written','postfetch_chain',}
    assert status['role'] == 'warcprox'
    assert status['version'] == warcprox.__version__
    assert status['port'] == warcprox_.proxy.server_port
@@ -1331,7 +1343,8 @@ def test_svcreg_status(warcprox_):
            'queued_urls', 'queue_max_size', 'seconds_behind',
            'first_heartbeat', 'ttl', 'last_heartbeat', 'threads',
            'rates_5min', 'rates_1min', 'unaccepted_requests',
-           'rates_15min', 'active_requests',}
+           'rates_15min', 'active_requests','start_time','urls_processed',
+           'warc_bytes_written','postfetch_chain',}
    assert status['role'] == 'warcprox'
    assert status['version'] == warcprox.__version__
    assert status['port'] == warcprox_.proxy.server_port
@@ -1366,32 +1379,51 @@ def test_controller_with_defaults():
    assert controller.proxy.server_address == ('127.0.0.1', 8000)
    assert controller.proxy.server_port == 8000
    assert controller.proxy.running_stats
-   for wwt in controller.warc_writer_threads:
-       assert wwt
-       assert wwt.recorded_url_q
-       assert wwt.recorded_url_q is controller.proxy.recorded_url_q
-       assert wwt.writer_pool
-       assert wwt.writer_pool.default_warc_writer
-       assert wwt.writer_pool.default_warc_writer.directory == './warcs'
-       assert wwt.writer_pool.default_warc_writer.rollover_idle_time is None
-       assert wwt.writer_pool.default_warc_writer.rollover_size == 1000000000
-       assert wwt.writer_pool.default_warc_writer.prefix == 'warcprox'
-       assert wwt.writer_pool.default_warc_writer.gzip is False
-       assert wwt.writer_pool.default_warc_writer.record_builder
-       assert not wwt.writer_pool.default_warc_writer.record_builder.base32
-       assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1'
+   assert not controller.proxy.stats_db
+   wwt = controller.warc_writer_thread
+   assert wwt
+   assert wwt.inq
+   assert wwt.outq
+   assert wwt.writer_pool
+   assert wwt.writer_pool.default_warc_writer
+   assert wwt.writer_pool.default_warc_writer.directory == './warcs'
+   assert wwt.writer_pool.default_warc_writer.rollover_idle_time is None
+   assert wwt.writer_pool.default_warc_writer.rollover_size == 1000000000
+   assert wwt.writer_pool.default_warc_writer.prefix == 'warcprox'
+   assert wwt.writer_pool.default_warc_writer.gzip is False
+   assert wwt.writer_pool.default_warc_writer.record_builder
+   assert not wwt.writer_pool.default_warc_writer.record_builder.base32
+   assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1'
+def test_load_plugin():
+   options = warcprox.Options(port=0, plugins=['warcprox.stats.RunningStats'])
+   controller = warcprox.controller.WarcproxController(options)
+   assert isinstance(
+           controller._postfetch_chain[-1],
+           warcprox.ListenerPostfetchProcessor)
+   assert isinstance(
+           controller._postfetch_chain[-1].listener,
+           warcprox.stats.RunningStats)
+   assert isinstance(
+           controller._postfetch_chain[-2],
+           warcprox.ListenerPostfetchProcessor)
+   assert isinstance(
+           controller._postfetch_chain[-2].listener,
+           warcprox.stats.RunningStats)
def test_choose_a_port_for_me(warcprox_):
    options = warcprox.Options()
    options.port = 0
-   controller = warcprox.controller.WarcproxController(
-           service_registry=warcprox_.service_registry, options=options)
+   if warcprox_.service_registry:
+       options.rethinkdb_services_url = 'rethinkdb://localhost/test0/services'
+   controller = warcprox.controller.WarcproxController(options)
    assert controller.proxy.server_port != 0
    assert controller.proxy.server_port != 8000
    assert controller.proxy.server_address == (
            '127.0.0.1', controller.proxy.server_port)
    th = threading.Thread(target=controller.run_until_shutdown)
+   controller.start()
    th.start()
    try:
        # check that the status api lists the correct port
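As a conceptual aside (not warcprox code), the structure the assertions above imply -- processors joined by inq/outq queues, with a final stage that bumps the running url count the tests poll -- can be illustrated generically like this; every name below other than inq/outq is made up for the illustration:

# generic illustration of a queue-chained "postfetch" pipeline; not the
# actual warcprox implementation
import queue
import threading

class Stage(threading.Thread):
    def __init__(self, inq, outq=None):
        threading.Thread.__init__(self, daemon=True)
        self.inq = inq
        self.outq = outq
    def process(self, item):
        return item  # e.g. dedup lookup, warc writing, stats tallying
    def run(self):
        while True:
            item = self.inq.get()
            item = self.process(item)
            if self.outq is not None:
                self.outq.put(item)

class CountingStage(Stage):
    # plays the role of running_stats.urls at the end of the chain
    def __init__(self, inq):
        Stage.__init__(self, inq)
        self.urls = 0
    def process(self, item):
        self.urls += 1
        return item

q1, q2 = queue.Queue(), queue.Queue()
chain = [Stage(q1, q2), CountingStage(q2)]
for stage in chain:
    stage.start()
q1.put('http://example.com/')  # a fetched url enters the chain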
@@ -1417,16 +1449,21 @@ def test_choose_a_port_for_me(warcprox_):
    th.join()
def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback_proxies):
+    urls_before = warcprox_.proxy.running_stats.urls
    url = 'http://localhost:%s/a/z' % http_daemon.server_port
    response = requests.get(url, proxies=archiving_proxies)
    assert response.headers['via'] == '1.1 warcprox'
-   playback_response = _poll_playback_until(
-           playback_proxies, url, status=200, timeout_sec=10)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
+   playback_response = requests.get(
+           url, proxies=playback_proxies, verify=False)
    assert response.status_code == 200
    assert not 'via' in playback_response
-   warc = warcprox_.warc_writer_threads[0].writer_pool.default_warc_writer._fpath
+   warc = warcprox_.warc_writer_thread.writer_pool.default_warc_writer._fpath
    with open(warc, 'rb') as f:
        for record in warcio.archiveiterator.ArchiveIterator(f):
            if record.rec_headers.get_header('warc-target-uri') == url:
@@ -1449,15 +1486,19 @@ def test_slash_in_warc_prefix(warcprox_, http_daemon, archiving_proxies):
    assert response.reason == 'request rejected by warcprox: slash and backslash are not permitted in warc-prefix'
def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
+    urls_before = warcprox_.proxy.running_stats.urls
    try:
        os.unlink(os.path.join(warcprox_.options.crawl_log_dir, 'crawl.log'))
    except:
        pass
+   # should go to default crawl log
    url = 'http://localhost:%s/b/aa' % http_daemon.server_port
    response = requests.get(url, proxies=archiving_proxies)
    assert response.status_code == 200
+   # should go to test_crawl_log_1.log
    url = 'http://localhost:%s/b/bb' % http_daemon.server_port
    headers = {
        "Warcprox-Meta": json.dumps({"warc-prefix":"test_crawl_log_1"}),
@@ -1466,13 +1507,12 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
    response = requests.get(url, proxies=archiving_proxies, headers=headers)
    assert response.status_code == 200
-   start = time.time()
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
    file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_1.log')
-   while time.time() - start < 10:
-       if os.path.exists(file) and os.stat(file).st_size > 0:
-           break
-       time.sleep(0.5)
    assert os.path.exists(file)
+   assert os.stat(file).st_size > 0
    assert os.path.exists(os.path.join(
            warcprox_.options.crawl_log_dir, 'crawl.log'))
@@ -1527,13 +1567,12 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
    response = requests.get(url, proxies=archiving_proxies, headers=headers)
    assert response.status_code == 200
-   start = time.time()
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 3)
    file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_2.log')
-   while time.time() - start < 10:
-       if os.path.exists(file) and os.stat(file).st_size > 0:
-           break
-       time.sleep(0.5)
    assert os.path.exists(file)
+   assert os.stat(file).st_size > 0
    crawl_log_2 = open(file, 'rb').read()
@@ -1557,17 +1596,14 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
    assert extra_info['contentSize'] == 145
    # a request that is not saved to a warc (because of --method-filter)
-   # currently not logged at all (XXX maybe it should be)
    url = 'http://localhost:%s/b/cc' % http_daemon.server_port
    headers = {'Warcprox-Meta': json.dumps({'warc-prefix': 'test_crawl_log_3'})}
    response = requests.head(url, proxies=archiving_proxies, headers=headers)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4)
    file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_3.log')
-   start = time.time()
-   while time.time() - start < 10:
-       if os.path.exists(file) and os.stat(file).st_size > 0:
-           break
-       time.sleep(0.5)
    assert os.path.exists(file)
    crawl_log_3 = open(file, 'rb').read()
@@ -1602,13 +1638,10 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
            headers=headers, proxies=archiving_proxies)
    assert response.status_code == 204
-   start = time.time()
-   file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log')
-   while time.time() - start < 10:
-       if os.path.exists(file) and os.stat(file).st_size > 0:
-           break
-       time.sleep(0.5)
+   # wait for postfetch chain
+   wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 5)
+   file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log')
    assert os.path.exists(file)
    crawl_log_4 = open(file, 'rb').read()
@ -1633,6 +1666,8 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
def test_long_warcprox_meta( def test_long_warcprox_meta(
warcprox_, http_daemon, archiving_proxies, playback_proxies): warcprox_, http_daemon, archiving_proxies, playback_proxies):
urls_before = warcprox_.proxy.running_stats.urls
url = 'http://localhost:%s/b/g' % http_daemon.server_port url = 'http://localhost:%s/b/g' % http_daemon.server_port
# create a very long warcprox-meta header # create a very long warcprox-meta header
@ -1642,17 +1677,14 @@ def test_long_warcprox_meta(
url, proxies=archiving_proxies, headers=headers, verify=False) url, proxies=archiving_proxies, headers=headers, verify=False)
assert response.status_code == 200 assert response.status_code == 200
# wait for writer thread to process # wait for postfetch chain
time.sleep(0.5) wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
while not all(wwt.idle for wwt in warcprox_.warc_writer_threads):
time.sleep(0.5)
time.sleep(0.5)
# check that warcprox-meta was parsed and honored ("warc-prefix" param) # check that warcprox-meta was parsed and honored ("warc-prefix" param)
assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"] assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"]
writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"] writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"]
warc_path = os.path.join(writer.directory, writer._f_finalname) warc_path = os.path.join(writer.directory, writer._f_finalname)
warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"].close_writer() warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"].close_writer()
assert os.path.exists(warc_path) assert os.path.exists(warc_path)
# read the warc # read the warc
@ -1672,7 +1704,6 @@ def test_long_warcprox_meta(
def test_empty_response( def test_empty_response(
warcprox_, http_daemon, https_daemon, archiving_proxies, warcprox_, http_daemon, https_daemon, archiving_proxies,
playback_proxies): playback_proxies):
url = 'http://localhost:%s/empty-response' % http_daemon.server_port url = 'http://localhost:%s/empty-response' % http_daemon.server_port
response = requests.get(url, proxies=archiving_proxies, verify=False) response = requests.get(url, proxies=archiving_proxies, verify=False)
assert response.status_code == 502 assert response.status_code == 502
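The tests above now follow a single idiom: snapshot `warcprox_.proxy.running_stats.urls` before issuing requests, then block until the postfetch chain has processed the expected number of urls, rather than polling warc or log files. A condensed sketch of that idiom, assuming the module-level `wait()` helper and the pytest fixtures used throughout this test module (the path and prefix below are illustrative):

import json
import requests

def fetch_and_wait_for_postfetch_chain(warcprox_, http_daemon, archiving_proxies):
    urls_before = warcprox_.proxy.running_stats.urls
    url = 'http://localhost:%s/b/zz' % http_daemon.server_port  # illustrative path
    headers = {'Warcprox-Meta': json.dumps({'warc-prefix': 'example_prefix'})}
    response = requests.get(url, proxies=archiving_proxies, headers=headers)
    assert response.status_code == 200
    # wait for every processor in the postfetch chain (dedup, warc writer,
    # stats, crawl log, ...) to see the url before making further assertions
    wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)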
View File
@ -89,27 +89,19 @@ def wait(callback, timeout):
raise Exception('timed out waiting for %s to return truthy' % callback) raise Exception('timed out waiting for %s to return truthy' % callback)
def test_special_dont_write_prefix(): def test_special_dont_write_prefix():
class NotifyMe:
def __init__(self):
self.the_list = []
def notify(self, recorded_url, records):
self.the_list.append((recorded_url, records))
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
logging.debug('cd %s', tmpdir) logging.debug('cd %s', tmpdir)
os.chdir(tmpdir) os.chdir(tmpdir)
q = warcprox.TimestampedQueue(maxsize=1) wwt = warcprox.writerthread.WarcWriterThread(Options(prefix='-'))
listener = NotifyMe() wwt.inq = warcprox.TimestampedQueue(maxsize=1)
wwt = warcprox.writerthread.WarcWriterThread( wwt.outq = warcprox.TimestampedQueue(maxsize=1)
recorded_url_q=q, options=Options(prefix='-'),
listeners=[listener])
try: try:
wwt.start() wwt.start()
# not to be written due to default prefix # not to be written due to default prefix
recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
recorder.read() recorder.read()
q.put(RecordedUrl( wwt.inq.put(RecordedUrl(
url='http://example.com/no', content_type='text/plain', url='http://example.com/no', content_type='text/plain',
status=200, client_ip='127.0.0.2', request_data=b'abc', status=200, client_ip='127.0.0.2', request_data=b'abc',
response_recorder=recorder, remote_ip='127.0.0.3', response_recorder=recorder, remote_ip='127.0.0.3',
@ -118,30 +110,31 @@ def test_special_dont_write_prefix():
# to be written due to warcprox-meta prefix # to be written due to warcprox-meta prefix
recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
recorder.read() recorder.read()
q.put(RecordedUrl( wwt.inq.put(RecordedUrl(
url='http://example.com/yes', content_type='text/plain', url='http://example.com/yes', content_type='text/plain',
status=200, client_ip='127.0.0.2', request_data=b'abc', status=200, client_ip='127.0.0.2', request_data=b'abc',
response_recorder=recorder, remote_ip='127.0.0.3', response_recorder=recorder, remote_ip='127.0.0.3',
timestamp=datetime.utcnow(), timestamp=datetime.utcnow(),
payload_digest=recorder.block_digest, payload_digest=recorder.block_digest,
warcprox_meta={'warc-prefix': 'normal-warc-prefix'})) warcprox_meta={'warc-prefix': 'normal-warc-prefix'}))
wait(lambda: len(listener.the_list) == 2, 10.0) recorded_url = wwt.outq.get(timeout=10)
assert not listener.the_list[0][1] assert not recorded_url.warc_records
assert listener.the_list[1][1] recorded_url = wwt.outq.get(timeout=10)
assert recorded_url.warc_records
assert wwt.outq.empty()
finally: finally:
wwt.stop.set() wwt.stop.set()
wwt.join() wwt.join()
q = warcprox.TimestampedQueue(maxsize=1) wwt = warcprox.writerthread.WarcWriterThread()
listener = NotifyMe() wwt.inq = warcprox.TimestampedQueue(maxsize=1)
wwt = warcprox.writerthread.WarcWriterThread( wwt.outq = warcprox.TimestampedQueue(maxsize=1)
recorded_url_q=q, listeners=[listener])
try: try:
wwt.start() wwt.start()
# to be written due to default prefix # to be written due to default prefix
recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
recorder.read() recorder.read()
q.put(RecordedUrl( wwt.inq.put(RecordedUrl(
url='http://example.com/yes', content_type='text/plain', url='http://example.com/yes', content_type='text/plain',
status=200, client_ip='127.0.0.2', request_data=b'abc', status=200, client_ip='127.0.0.2', request_data=b'abc',
response_recorder=recorder, remote_ip='127.0.0.3', response_recorder=recorder, remote_ip='127.0.0.3',
@ -150,16 +143,18 @@ def test_special_dont_write_prefix():
# not to be written due to warcprox-meta prefix # not to be written due to warcprox-meta prefix
recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None) recorder = ProxyingRecorder(io.BytesIO(b'some payload'), None)
recorder.read() recorder.read()
q.put(RecordedUrl( wwt.inq.put(RecordedUrl(
url='http://example.com/no', content_type='text/plain', url='http://example.com/no', content_type='text/plain',
status=200, client_ip='127.0.0.2', request_data=b'abc', status=200, client_ip='127.0.0.2', request_data=b'abc',
response_recorder=recorder, remote_ip='127.0.0.3', response_recorder=recorder, remote_ip='127.0.0.3',
timestamp=datetime.utcnow(), timestamp=datetime.utcnow(),
payload_digest=recorder.block_digest, payload_digest=recorder.block_digest,
warcprox_meta={'warc-prefix': '-'})) warcprox_meta={'warc-prefix': '-'}))
wait(lambda: len(listener.the_list) == 2, 10.0) recorded_url = wwt.outq.get(timeout=10)
assert listener.the_list[0][1] assert recorded_url.warc_records
assert not listener.the_list[1][1] recorded_url = wwt.outq.get(timeout=10)
assert not recorded_url.warc_records
assert wwt.outq.empty()
finally: finally:
wwt.stop.set() wwt.stop.set()
wwt.join() wwt.join()
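With the listener plumbing removed, a writer thread under unit test is driven directly through its `inq` and `outq`, and the outcome of processing travels on `recorded_url.warc_records` instead of being reported to a listener. A condensed sketch of that wiring, under the same assumptions as the test above:

import warcprox
import warcprox.writerthread
from warcprox import Options

wwt = warcprox.writerthread.WarcWriterThread(Options(prefix='-'))
wwt.inq = warcprox.TimestampedQueue(maxsize=1)
wwt.outq = warcprox.TimestampedQueue(maxsize=1)
try:
    wwt.start()
    # put RecordedUrl instances on wwt.inq here; the thread writes (or skips)
    # warc records and places each RecordedUrl on wwt.outq, with the result
    # available as recorded_url.warc_records
finally:
    wwt.stop.set()
    wwt.join()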
View File
@ -1,7 +1,7 @@
""" """
warcprox/__init__.py - warcprox package main file, contains some utility code warcprox/__init__.py - warcprox package main file, contains some utility code
Copyright (C) 2013-2017 Internet Archive Copyright (C) 2013-2018 Internet Archive
This program is free software; you can redistribute it and/or This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License modify it under the terms of the GNU General Public License
@ -19,6 +19,11 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA. USA.
""" """
import sys
import datetime
import threading
import time
import logging
from argparse import Namespace as _Namespace from argparse import Namespace as _Namespace
from pkg_resources import get_distribution as _get_distribution from pkg_resources import get_distribution as _get_distribution
__version__ = _get_distribution('warcprox').version __version__ = _get_distribution('warcprox').version
@ -26,7 +31,6 @@ try:
import queue import queue
except ImportError: except ImportError:
import Queue as queue import Queue as queue
import datetime
def digest_str(hash_obj, base32=False): def digest_str(hash_obj, base32=False):
import base64 import base64
@ -92,9 +96,149 @@ class RequestBlockedByRule(Exception):
def __str__(self): def __str__(self):
return "%s: %s" % (self.__class__.__name__, self.msg) return "%s: %s" % (self.__class__.__name__, self.msg)
class BasePostfetchProcessor(threading.Thread):
logger = logging.getLogger("warcprox.BasePostfetchProcessor")
def __init__(self, options=Options()):
threading.Thread.__init__(self, name=self.__class__.__name__)
self.options = options
self.stop = threading.Event()
# these should be set before thread is started
self.inq = None
self.outq = None
self.profiler = None
def run(self):
if self.options.profile:
import cProfile
self.profiler = cProfile.Profile()
self.profiler.enable()
self._run()
self.profiler.disable()
else:
self._run()
def _get_process_put(self):
'''
Get url(s) from `self.inq`, process url(s), queue to `self.outq`.
Subclasses must implement this. Implementations may operate on
individual urls, or on batches.
May raise queue.Empty.
'''
raise Exception('not implemented')
def _run(self):
logging.info('%s starting up', self)
self._startup()
while not self.stop.is_set():
try:
while True:
try:
self._get_process_put()
except queue.Empty:
if self.stop.is_set():
break
logging.info('%s shutting down', self)
self._shutdown()
except Exception as e:
if isinstance(e, OSError) and e.errno == 28:
# OSError: [Errno 28] No space left on device
self.logger.critical(
'shutting down due to fatal problem: %s: %s',
e.__class__.__name__, e)
self._shutdown()
sys.exit(1)
self.logger.critical(
'%s will try to continue after unexpected error',
self.name, exc_info=True)
time.sleep(0.5)
def _startup(self):
pass
def _shutdown(self):
pass
class BaseStandardPostfetchProcessor(BasePostfetchProcessor):
def _get_process_put(self):
recorded_url = self.inq.get(block=True, timeout=0.5)
self._process_url(recorded_url)
if self.outq:
self.outq.put(recorded_url)
def _process_url(self, recorded_url):
raise Exception('not implemented')
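As a concrete illustration of the new abstraction, a single-url processor only has to implement `_process_url()`; the base class handles pulling from `inq`, forwarding to `outq`, shutdown, and optional profiling. A hypothetical example (this class is not part of warcprox):

import warcprox

class UrlLoggingProcessor(warcprox.BaseStandardPostfetchProcessor):
    '''Hypothetical processor: logs each url as it passes through the chain.'''
    def _process_url(self, recorded_url):
        # the recorded_url is forwarded to self.outq by the base class
        self.logger.info('postfetch chain saw %s', recorded_url.url)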
class BaseBatchPostfetchProcessor(BasePostfetchProcessor):
MAX_BATCH_SIZE = 500
MAX_BATCH_SEC = 10
MIN_BATCH_SEC = 2.0
def _get_process_put(self):
batch = []
start = time.time()
while True:
try:
batch.append(self.inq.get(block=True, timeout=0.5))
except queue.Empty:
if self.stop.is_set():
break
# else maybe keep adding to the batch
if len(batch) >= self.MAX_BATCH_SIZE:
break # full batch
elapsed = time.time() - start
if elapsed >= self.MAX_BATCH_SEC:
break # been batching for a while
if (elapsed >= self.MIN_BATCH_SEC and self.outq
and len(self.outq.queue) == 0):
break # next processor is waiting on us
if not batch:
raise queue.Empty
self.logger.info(
'gathered batch of %s in %0.2f sec',
len(batch), time.time() - start)
self._process_batch(batch)
if self.outq:
for recorded_url in batch:
self.outq.put(recorded_url)
def _process_batch(self, batch):
raise Exception('not implemented')
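A batch processor instead implements `_process_batch()` and receives up to MAX_BATCH_SIZE urls at a time, gathered under the timing rules above (flush after MAX_BATCH_SEC, or after MIN_BATCH_SEC when the downstream queue is empty). A hypothetical sketch:

import warcprox

class BatchCountingProcessor(warcprox.BaseBatchPostfetchProcessor):
    '''Hypothetical processor: counts urls a batch at a time.'''
    def __init__(self, options=warcprox.Options()):
        warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
        self.url_count = 0
    def _process_batch(self, batch):
        # after this returns, the base class requeues every url in `batch` to self.outq
        self.url_count += len(batch)
        self.logger.info('%s urls seen so far', self.url_count)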
class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor):
def __init__(self, listener, options=Options()):
BaseStandardPostfetchProcessor.__init__(self, options)
self.listener = listener
self.name = listener.__class__.__name__
def _process_url(self, recorded_url):
return self.listener.notify(recorded_url, recorded_url.warc_records)
def start(self):
if hasattr(self.listener, 'start'):
self.listener.start()
BaseStandardPostfetchProcessor.start(self)
def _shutdown(self):
if hasattr(self.listener, 'stop'):
try:
self.listener.stop()
except:
self.logger.error(
'%s raised exception', self.listener.stop, exc_info=True)
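ListenerPostfetchProcessor keeps the old listener interface usable inside the chain: anything exposing `notify(recorded_url, records)` (plus optional `start()` and `stop()`) can be wrapped unchanged. A hypothetical example:

import warcprox

class PrintingListener:
    '''Hypothetical old-style listener, unchanged from the pre-chain interface.'''
    def notify(self, recorded_url, records):
        print('archived %s in %s warc record(s)' % (recorded_url.url, len(records or [])))

# wrapped, it runs as a thread in the postfetch chain like any other processor
processor = warcprox.ListenerPostfetchProcessor(PrintingListener(), warcprox.Options())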
# monkey-patch log levels TRACE and NOTICE # monkey-patch log levels TRACE and NOTICE
TRACE = 5 TRACE = 5
import logging
def _logger_trace(self, msg, *args, **kwargs): def _logger_trace(self, msg, *args, **kwargs):
if self.isEnabledFor(TRACE): if self.isEnabledFor(TRACE):
self._log(TRACE, msg, args, **kwargs) self._log(TRACE, msg, args, **kwargs)
@ -103,7 +247,6 @@ logging.trace = logging.root.trace
logging.addLevelName(TRACE, 'TRACE') logging.addLevelName(TRACE, 'TRACE')
NOTICE = (logging.INFO + logging.WARN) // 2 NOTICE = (logging.INFO + logging.WARN) // 2
import logging
def _logger_notice(self, msg, *args, **kwargs): def _logger_notice(self, msg, *args, **kwargs):
if self.isEnabledFor(NOTICE): if self.isEnabledFor(NOTICE):
self._log(NOTICE, msg, args, **kwargs) self._log(NOTICE, msg, args, **kwargs)
View File
@ -215,7 +215,7 @@ class RethinkCaptures:
if self._timer: if self._timer:
self._timer.join() self._timer.join()
class RethinkCapturesDedup: class RethinkCapturesDedup(warcprox.dedup.DedupDb):
logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup") logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")
def __init__(self, options=warcprox.Options()): def __init__(self, options=warcprox.Options()):
View File
@ -4,7 +4,7 @@ starting up and shutting down the various components of warcprox, and for
sending heartbeats to the service registry if configured to do so; also has sending heartbeats to the service registry if configured to do so; also has
some memory profiling capabilities some memory profiling capabilities
Copyright (C) 2013-2017 Internet Archive Copyright (C) 2013-2018 Internet Archive
This program is free software; you can redistribute it and/or This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License modify it under the terms of the GNU General Public License
@ -27,55 +27,190 @@ from __future__ import absolute_import
import logging import logging
import threading import threading
import time import time
import warcprox
import sys import sys
import gc import gc
import datetime import datetime
import warcprox
import certauth
import functools
import doublethink
import importlib
class Factory:
@staticmethod
def dedup_db(options):
if options.rethinkdb_dedup_url:
dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
elif options.rethinkdb_big_table_url:
dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options)
elif options.rethinkdb_trough_db_url:
dedup_db = warcprox.dedup.TroughDedupDb(options)
elif options.cdxserver_dedup:
dedup_db = warcprox.dedup.CdxServerDedup(
cdx_url=options.cdxserver_dedup)
elif options.dedup_db_file in (None, '', '/dev/null'):
logging.info('deduplication disabled')
dedup_db = None
else:
dedup_db = warcprox.dedup.DedupDb(options.dedup_db_file, options=options)
return dedup_db
@staticmethod
def stats_processor(options):
# return warcprox.stats.StatsProcessor(options)
if options.rethinkdb_stats_url:
stats_processor = warcprox.stats.RethinkStatsProcessor(options)
elif options.stats_db_file in (None, '', '/dev/null'):
logging.info('statistics tracking disabled')
stats_processor = None
else:
stats_processor = warcprox.stats.StatsProcessor(options)
return stats_processor
@staticmethod
def warc_writer(options):
return warcprox.writerthread.WarcWriterThread(options)
@staticmethod
def playback_proxy(ca, options):
if options.playback_port is not None:
playback_index_db = warcprox.playback.PlaybackIndexDb(
options=options)
playback_proxy = warcprox.playback.PlaybackProxy(
ca=ca, playback_index_db=playback_index_db, options=options)
else:
playback_index_db = None
playback_proxy = None
return playback_proxy
@staticmethod
def crawl_logger(options):
if options.crawl_log_dir:
return warcprox.crawl_log.CrawlLogger(
options.crawl_log_dir, options=options)
else:
return None
@staticmethod
def plugin(qualname):
try:
(module_name, class_name) = qualname.rsplit('.', 1)
module_ = importlib.import_module(module_name)
class_ = getattr(module_, class_name)
plugin = class_()
plugin.notify # make sure it has this method
return plugin
except Exception as e:
logging.fatal('problem with plugin class %r: %s', qualname, e)
sys.exit(1)
@staticmethod
def service_registry(options):
if options.rethinkdb_services_url:
parsed = doublethink.parse_rethinkdb_url(
options.rethinkdb_services_url)
rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database)
return doublethink.ServiceRegistry(rr, table=parsed.table)
else:
return None
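Factory.plugin() resolves the dotted qualname given with --plugin, instantiates the class, and checks for a `notify` method before the controller wraps it in a ListenerPostfetchProcessor. A hypothetical plugin module showing what such a qualname points at (module and class names made up):

# hypothetical module myplugins.py, loaded via --plugin=myplugins.CountingPlugin
class CountingPlugin:
    '''Hypothetical warcprox plugin using the listener interface.'''
    def __init__(self):
        self.count = 0
    def notify(self, recorded_url, records):
        # called once per url, after warc records (if any) have been written
        self.count += 1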
class WarcproxController(object): class WarcproxController(object):
logger = logging.getLogger("warcprox.controller.WarcproxController") logger = logging.getLogger("warcprox.controller.WarcproxController")
HEARTBEAT_INTERVAL = 20.0 HEARTBEAT_INTERVAL = 20.0
def __init__( def __init__(self, options=warcprox.Options()):
self, proxy=None, warc_writer_threads=None, playback_proxy=None,
service_registry=None, options=warcprox.Options()):
""" """
Create warcprox controller. Create warcprox controller based on `options`.
If supplied, `proxy` should be an instance of WarcProxy, and
`warc_writer_threads` should be a list of WarcWriterThread instances.
If not supplied, they are created with default values.
If supplied, playback_proxy should be an instance of PlaybackProxy. If
not supplied, no playback proxy will run.
""" """
if proxy is not None: self.options = options
self.proxy = proxy
else:
self.proxy = warcprox.warcproxy.WarcProxy(options=options)
if warc_writer_threads is not None:
self.warc_writer_threads = warc_writer_threads
else:
self.warc_writer_threads = [
warcprox.writerthread.WarcWriterThread(
name='WarcWriterThread%03d' % i,
recorded_url_q=self.proxy.recorded_url_q,
listeners=[self.proxy.running_stats], options=options)
for i in range(int(self.proxy.max_threads ** 0.5))]
self.proxy_thread = None self.proxy_thread = None
self.playback_proxy_thread = None self.playback_proxy_thread = None
self.playback_proxy = playback_proxy
self.service_registry = service_registry
self.options = options
self._last_rss = None self._last_rss = None
self.stop = threading.Event() self.stop = threading.Event()
self._start_stop_lock = threading.Lock() self._start_stop_lock = threading.Lock()
self.stats_processor = Factory.stats_processor(self.options)
self.proxy = warcprox.warcproxy.WarcProxy(
self.stats_processor, self.postfetch_status, options)
self.playback_proxy = Factory.playback_proxy(
self.proxy.ca, self.options)
self.build_postfetch_chain(self.proxy.recorded_url_q)
self.service_registry = Factory.service_registry(options)
def postfetch_status(self):
result = {'postfetch_chain': []}
for processor in self._postfetch_chain:
if processor.__class__ == warcprox.ListenerPostfetchProcessor:
name = processor.listener.__class__.__name__
else:
name = processor.__class__.__name__
queued = len(processor.inq.queue)
if hasattr(processor, 'batch'):
queued += len(processor.batch)
result['postfetch_chain'].append({
'processor': name,
'queued_urls': queued})
return result
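The status callback reports, for each processor in the chain, how many urls are sitting in its input queue (plus any in-flight batch); the result looks roughly like this (processor names depend on configuration, counts illustrative):

status = {
    'postfetch_chain': [
        {'processor': 'DedupLoader', 'queued_urls': 0},
        {'processor': 'WarcWriterThread', 'queued_urls': 12},
        {'processor': 'DedupDb', 'queued_urls': 0},
        {'processor': 'StatsProcessor', 'queued_urls': 3},
        {'processor': 'CrawlLogger', 'queued_urls': 0},
        {'processor': 'RunningStats', 'queued_urls': 0},
    ]
}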
def chain(self, processor0, processor1):
'''
Sets `processor0.outq` = `processor1.inq` = a new `warcprox.TimestampedQueue`
'''
assert not processor0.outq
assert not processor1.inq
q = warcprox.TimestampedQueue(maxsize=self.options.queue_size)
processor0.outq = q
processor1.inq = q
def build_postfetch_chain(self, inq):
self._postfetch_chain = []
self.dedup_db = Factory.dedup_db(self.options)
if self.dedup_db:
self._postfetch_chain.append(self.dedup_db.loader())
self.warc_writer_thread = Factory.warc_writer(self.options)
self._postfetch_chain.append(self.warc_writer_thread)
if self.dedup_db:
self._postfetch_chain.append(self.dedup_db.storer())
if self.stats_processor:
self._postfetch_chain.append(self.stats_processor)
if self.playback_proxy:
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(
self.playback_proxy.playback_index_db, self.options))
crawl_logger = Factory.crawl_logger(self.options)
if crawl_logger:
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(
crawl_logger, self.options))
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(
self.proxy.running_stats, self.options))
for qualname in self.options.plugins or []:
plugin = Factory.plugin(qualname)
self._postfetch_chain.append(
warcprox.ListenerPostfetchProcessor(plugin, self.options))
# chain them all up
self._postfetch_chain[0].inq = inq
for i in range(1, len(self._postfetch_chain)):
self.chain(self._postfetch_chain[i-1], self._postfetch_chain[i])
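chain() is just queue plumbing: each hop of the pipeline is a TimestampedQueue shared between one processor's `outq` and the next one's `inq`. A sketch of what it amounts to, with the default ordering that build_postfetch_chain() produces noted in comments:

import warcprox

def chain(p0, p1, queue_size=500):
    # sketch of WarcproxController.chain(); the real method asserts the queues
    # are unset and sizes the queue from options.queue_size
    q = warcprox.TimestampedQueue(maxsize=queue_size)
    p0.outq = q
    p1.inq = q

# with sqlite dedup and stats and no playback/crawl-log/plugins, the chain is roughly:
#   proxy.recorded_url_q -> DedupLoader -> WarcWriterThread -> dedup storer
#     -> StatsProcessor -> RunningStats listener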
def debug_mem(self): def debug_mem(self):
self.logger.info("self.proxy.recorded_url_q.qsize()=%s", self.proxy.recorded_url_q.qsize()) self.logger.info("self.proxy.recorded_url_q.qsize()=%s", self.proxy.recorded_url_q.qsize())
with open("/proc/self/status") as f: with open("/proc/self/status") as f:
@ -162,35 +297,27 @@ class WarcproxController(object):
self.logger.info('warcprox is already running') self.logger.info('warcprox is already running')
return return
if self.proxy.stats_db:
self.proxy.stats_db.start()
self.proxy_thread = threading.Thread( self.proxy_thread = threading.Thread(
target=self.proxy.serve_forever, name='ProxyThread') target=self.proxy.serve_forever, name='ProxyThread')
self.proxy_thread.start() self.proxy_thread.start()
assert(all( if self.playback_proxy:
wwt.dedup_db is self.warc_writer_threads[0].dedup_db
for wwt in self.warc_writer_threads))
if any((t.dedup_db for t in self.warc_writer_threads)):
self.warc_writer_threads[0].dedup_db.start()
for wwt in self.warc_writer_threads:
wwt.start()
if self.playback_proxy is not None:
self.playback_proxy_thread = threading.Thread( self.playback_proxy_thread = threading.Thread(
target=self.playback_proxy.serve_forever, target=self.playback_proxy.serve_forever,
name='PlaybackProxyThread') name='PlaybackProxyThread')
self.playback_proxy_thread.start() self.playback_proxy_thread.start()
for processor in self._postfetch_chain:
processor.start()
def shutdown(self): def shutdown(self):
with self._start_stop_lock: with self._start_stop_lock:
if not self.proxy_thread or not self.proxy_thread.is_alive(): if not self.proxy_thread or not self.proxy_thread.is_alive():
self.logger.info('warcprox is not running') self.logger.info('warcprox is not running')
return return
for wwt in self.warc_writer_threads: for processor in self._postfetch_chain:
wwt.stop.set() processor.stop.set()
self.proxy.shutdown() self.proxy.shutdown()
self.proxy.server_close() self.proxy.server_close()
@ -200,12 +327,8 @@ class WarcproxController(object):
if self.playback_proxy.playback_index_db is not None: if self.playback_proxy.playback_index_db is not None:
self.playback_proxy.playback_index_db.close() self.playback_proxy.playback_index_db.close()
# wait for threads to finish for processor in self._postfetch_chain:
for wwt in self.warc_writer_threads: processor.join()
wwt.join()
if self.proxy.stats_db:
self.proxy.stats_db.stop()
self.proxy_thread.join() self.proxy_thread.join()
if self.playback_proxy is not None: if self.playback_proxy is not None:
@ -288,18 +411,17 @@ class WarcproxController(object):
'aggregate performance profile of %s proxy threads:\n%s', 'aggregate performance profile of %s proxy threads:\n%s',
len(files), buf.getvalue()) len(files), buf.getvalue())
# warc writer threads # postfetch processors
files = [] for processor in self._postfetch_chain:
for wwt in self.warc_writer_threads: if not processor.profiler:
file = os.path.join(tmpdir, '%s.dat' % wwt.ident) self.logger.notice('%s has no profiling data', processor)
wwt.profiler.dump_stats(file) continue
files.append(file) file = os.path.join(tmpdir, '%s.dat' % processor.ident)
processor.profiler.dump_stats(file)
buf = io.StringIO() buf = io.StringIO()
stats = pstats.Stats(*files, stream=buf) stats = pstats.Stats(file, stream=buf)
stats.sort_stats('cumulative') stats.sort_stats('cumulative')
stats.print_stats(0.1) stats.print_stats(0.1)
self.logger.notice( self.logger.notice(
'aggregate performance profile of %s warc writer threads:\n%s', 'performance profile of %s:\n%s', processor,
len(self.warc_writer_threads), buf.getvalue()) buf.getvalue())
View File
@ -1,7 +1,7 @@
''' '''
warcprox/dedup.py - identical payload digest deduplication using sqlite db warcprox/dedup.py - identical payload digest deduplication using sqlite db
Copyright (C) 2013-2017 Internet Archive Copyright (C) 2013-2018 Internet Archive
This program is free software; you can redistribute it and/or This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License modify it under the terms of the GNU General Public License
@ -32,9 +32,19 @@ import doublethink
import datetime import datetime
import urllib3 import urllib3
from urllib3.exceptions import HTTPError from urllib3.exceptions import HTTPError
import collections
urllib3.disable_warnings() urllib3.disable_warnings()
class DedupLoader(warcprox.BaseStandardPostfetchProcessor):
def __init__(self, dedup_db, options=warcprox.Options()):
warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
self.dedup_db = dedup_db
def _process_url(self, recorded_url):
decorate_with_dedup_info(
self.dedup_db, recorded_url, self.options.base32)
class DedupDb(object): class DedupDb(object):
logger = logging.getLogger("warcprox.dedup.DedupDb") logger = logging.getLogger("warcprox.dedup.DedupDb")
@ -61,6 +71,12 @@ class DedupDb(object):
conn.commit() conn.commit()
conn.close() conn.close()
def loader(self, *args, **kwargs):
return DedupLoader(self, self.options)
def storer(self, *args, **kwargs):
return warcprox.ListenerPostfetchProcessor(self, self.options)
def save(self, digest_key, response_record, bucket=""): def save(self, digest_key, response_record, bucket=""):
record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
@ -106,20 +122,20 @@ class DedupDb(object):
else: else:
self.save(digest_key, records[0]) self.save(digest_key, records[0])
def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
if (recorded_url.response_recorder if (recorded_url.response_recorder
and recorded_url.payload_digest and recorded_url.payload_digest
and recorded_url.response_recorder.payload_size() > 0): and recorded_url.response_recorder.payload_size() > 0):
digest_key = warcprox.digest_str(recorded_url.payload_digest, base32) digest_key = warcprox.digest_str(recorded_url.payload_digest, base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"], recorded_url.dedup_info = dedup_db.lookup(
recorded_url.url) digest_key, recorded_url.warcprox_meta["captures-bucket"],
recorded_url.url)
else: else:
recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.dedup_info = dedup_db.lookup(
url=recorded_url.url) digest_key, url=recorded_url.url)
class RethinkDedupDb: class RethinkDedupDb(DedupDb):
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
def __init__(self, options=warcprox.Options()): def __init__(self, options=warcprox.Options()):
@ -181,7 +197,7 @@ class RethinkDedupDb:
else: else:
self.save(digest_key, records[0]) self.save(digest_key, records[0])
class CdxServerDedup(object): class CdxServerDedup(DedupDb):
"""Query a CDX server to perform deduplication. """Query a CDX server to perform deduplication.
""" """
logger = logging.getLogger("warcprox.dedup.CdxServerDedup") logger = logging.getLogger("warcprox.dedup.CdxServerDedup")
@ -244,7 +260,82 @@ class CdxServerDedup(object):
""" """
pass pass
class TroughDedupDb(object): class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
def __init__(self, trough_dedup_db, options=warcprox.Options()):
warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
self.trough_dedup_db = trough_dedup_db
def _filter_and_bucketize(self, batch):
'''
Returns `{bucket: [recorded_url, ...]}`, excluding urls that should
not have dedup info stored.
'''
buckets = collections.defaultdict(list)
for recorded_url in batch:
if (recorded_url.warc_records
and recorded_url.warc_records[0].type == b'response'
and recorded_url.response_recorder.payload_size() > 0):
if (recorded_url.warcprox_meta
and 'captures-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['captures-bucket']
else:
bucket = '__unspecified__'
buckets[bucket].append(recorded_url)
return buckets
def _process_batch(self, batch):
buckets = self._filter_and_bucketize(batch)
for bucket in buckets:
self.trough_dedup_db.batch_save(buckets[bucket], bucket)
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
def __init__(self, trough_dedup_db, options=warcprox.Options()):
warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
self.trough_dedup_db = trough_dedup_db
def _startup(self):
self.trough_dedup_db.start()
def _filter_and_bucketize(self, batch):
'''
Returns `{bucket: [recorded_url, ...]}`, excluding urls that should not
be looked up.
'''
buckets = collections.defaultdict(list)
for recorded_url in batch:
if (recorded_url.response_recorder
and recorded_url.payload_digest
and recorded_url.response_recorder.payload_size() > 0):
if (recorded_url.warcprox_meta
and 'captures-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['captures-bucket']
else:
bucket = '__unspecified__'
buckets[bucket].append(recorded_url)
return buckets
def _build_key_index(self, batch):
'''
Returns `{digest_key: [recorded_url, ...]}`.
'''
key_index = collections.defaultdict(list)
for recorded_url in batch:
digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32)
key_index[digest_key].append(recorded_url)
return key_index
def _process_batch(self, batch):
buckets = self._filter_and_bucketize(batch)
for bucket in buckets:
key_index = self._build_key_index(buckets[bucket])
results = self.trough_dedup_db.batch_lookup(
key_index.keys(), bucket)
for result in results:
for recorded_url in key_index[result['digest_key']]:
recorded_url.dedup_info = result
class TroughDedupDb(DedupDb):
''' '''
https://github.com/internetarchive/trough https://github.com/internetarchive/trough
''' '''
@ -256,7 +347,8 @@ class TroughDedupDb(object):
' url varchar(2100) not null,\n' ' url varchar(2100) not null,\n'
' date datetime not null,\n' ' date datetime not null,\n'
' id varchar(100));\n') # warc record id ' id varchar(100));\n') # warc record id
WRITE_SQL_TMPL = ('insert into dedup (digest_key, url, date, id) ' WRITE_SQL_TMPL = ('insert or ignore into dedup\n'
'(digest_key, url, date, id)\n'
'values (%s, %s, %s, %s);') 'values (%s, %s, %s, %s);')
def __init__(self, options=warcprox.Options()): def __init__(self, options=warcprox.Options()):
@ -264,6 +356,12 @@ class TroughDedupDb(object):
self._trough_cli = warcprox.trough.TroughClient( self._trough_cli = warcprox.trough.TroughClient(
options.rethinkdb_trough_db_url, promotion_interval=60*60) options.rethinkdb_trough_db_url, promotion_interval=60*60)
def loader(self, *args, **kwargs):
return BatchTroughLoader(self, self.options)
def storer(self, *args, **kwargs):
return BatchTroughStorer(self, self.options)
def start(self): def start(self):
self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL) self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL)
@ -275,6 +373,21 @@ class TroughDedupDb(object):
bucket, self.WRITE_SQL_TMPL, bucket, self.WRITE_SQL_TMPL,
(digest_key, url, warc_date, record_id), self.SCHEMA_ID) (digest_key, url, warc_date, record_id), self.SCHEMA_ID)
def batch_save(self, batch, bucket='__unspecified__'):
sql_tmpl = ('insert or ignore into dedup\n'
'(digest_key, url, date, id)\n'
'values %s;' % ','.join(
'(%s,%s,%s,%s)' for i in range(len(batch))))
values = []
for recorded_url in batch:
values.extend([
warcprox.digest_str(
recorded_url.payload_digest, self.options.base32),
recorded_url.url,
recorded_url.warc_records[0].date,
recorded_url.warc_records[0].id,])
self._trough_cli.write(bucket, sql_tmpl, values, self.SCHEMA_ID)
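For a batch of two urls, the dynamically built statement above expands as follows; trough then substitutes the flattened values list positionally (a sketch, keys and urls omitted):

# what batch_save() builds for a batch of 2 recorded urls
sql_tmpl = ('insert or ignore into dedup\n'
            '(digest_key, url, date, id)\n'
            'values (%s,%s,%s,%s),(%s,%s,%s,%s);')
# values == [digest_key1, url1, date1, record_id1, digest_key2, url2, date2, record_id2]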
def lookup(self, digest_key, bucket='__unspecified__', url=None): def lookup(self, digest_key, bucket='__unspecified__', url=None):
results = self._trough_cli.read( results = self._trough_cli.read(
bucket, 'select * from dedup where digest_key=%s;', bucket, 'select * from dedup where digest_key=%s;',
@ -291,6 +404,23 @@ class TroughDedupDb(object):
else: else:
return None return None
def batch_lookup(self, digest_keys, bucket='__unspecified__'):
sql_tmpl = 'select * from dedup where digest_key in (%s)' % (
','.join('%s' for i in range(len(digest_keys))))
results = self._trough_cli.read(bucket, sql_tmpl, digest_keys)
if results is None:
return []
self.logger.debug(
'trough batch lookup of %s keys returned %s results',
len(digest_keys), len(results))
assert len(results) >= 0 and len(results) <= len(digest_keys)
for result in results:
result['id'] = result['id'].encode('ascii')
result['url'] = result['url'].encode('ascii')
result['date'] = result['date'].encode('ascii')
result['digest_key'] = result['digest_key'].encode('ascii')
return results
def notify(self, recorded_url, records): def notify(self, recorded_url, records):
if (records and records[0].type == b'response' if (records and records[0].type == b'response'
and recorded_url.response_recorder.payload_size() > 0): and recorded_url.response_recorder.payload_size() > 0):
View File
@ -42,7 +42,6 @@ import certauth.certauth
import warcprox import warcprox
import doublethink import doublethink
import cryptography.hazmat.backends.openssl import cryptography.hazmat.backends.openssl
import importlib
class BetterArgumentDefaultsHelpFormatter( class BetterArgumentDefaultsHelpFormatter(
argparse.ArgumentDefaultsHelpFormatter, argparse.ArgumentDefaultsHelpFormatter,
@ -61,7 +60,7 @@ class BetterArgumentDefaultsHelpFormatter(
else: else:
return argparse.ArgumentDefaultsHelpFormatter._get_help_string(self, action) return argparse.ArgumentDefaultsHelpFormatter._get_help_string(self, action)
def _build_arg_parser(prog): def _build_arg_parser(prog='warcprox'):
arg_parser = argparse.ArgumentParser(prog=prog, arg_parser = argparse.ArgumentParser(prog=prog,
description='warcprox - WARC writing MITM HTTP/S proxy', description='warcprox - WARC writing MITM HTTP/S proxy',
formatter_class=BetterArgumentDefaultsHelpFormatter) formatter_class=BetterArgumentDefaultsHelpFormatter)
@ -119,9 +118,9 @@ def _build_arg_parser(prog):
arg_parser.add_argument('-P', '--playback-port', dest='playback_port', arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
type=int, default=None, help='port to listen on for instant playback') type=int, default=None, help='port to listen on for instant playback')
arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', # arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
default='./warcprox-playback-index.db', # default='./warcprox-playback-index.db',
help='playback index database file (only used if --playback-port is specified)') # help='playback index database file (only used if --playback-port is specified)')
group = arg_parser.add_mutually_exclusive_group() group = arg_parser.add_mutually_exclusive_group()
group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
@ -201,12 +200,12 @@ def dump_state(signum=None, frame=None):
'dumping state (caught signal %s)\n%s', 'dumping state (caught signal %s)\n%s',
signum, '\n'.join(state_strs)) signum, '\n'.join(state_strs))
def init_controller(args): def parse_args(argv):
''' '''
Creates a warcprox.controller.WarcproxController configured according to Parses command line arguments with argparse.
the supplied arguments (normally the result of parse_args(sys.argv)).
''' '''
options = warcprox.Options(**vars(args)) arg_parser = _build_arg_parser(prog=os.path.basename(argv[0]))
args = arg_parser.parse_args(args=argv[1:])
try: try:
hashlib.new(args.digest_algorithm) hashlib.new(args.digest_algorithm)
@ -214,106 +213,6 @@ def init_controller(args):
logging.fatal(e) logging.fatal(e)
exit(1) exit(1)
listeners = []
if args.rethinkdb_dedup_url:
dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
elif args.rethinkdb_big_table_url:
dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options)
elif args.rethinkdb_trough_db_url:
dedup_db = warcprox.dedup.TroughDedupDb(options)
elif args.cdxserver_dedup:
cdxserver_maxsize = args.writer_threads or 200
dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup,
maxsize=cdxserver_maxsize)
elif args.dedup_db_file in (None, '', '/dev/null'):
logging.info('deduplication disabled')
dedup_db = None
else:
dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file, options=options)
if dedup_db:
listeners.append(dedup_db)
if args.rethinkdb_stats_url:
stats_db = warcprox.stats.RethinkStatsDb(options=options)
listeners.append(stats_db)
elif args.stats_db_file in (None, '', '/dev/null'):
logging.info('statistics tracking disabled')
stats_db = None
else:
stats_db = warcprox.stats.StatsDb(args.stats_db_file, options=options)
listeners.append(stats_db)
recorded_url_q = warcprox.TimestampedQueue(maxsize=args.queue_size)
ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
ca = certauth.certauth.CertificateAuthority(args.cacert, args.certs_dir,
ca_name=ca_name)
proxy = warcprox.warcproxy.WarcProxy(
ca=ca, recorded_url_q=recorded_url_q, stats_db=stats_db,
options=options)
listeners.append(proxy.running_stats)
if args.playback_port is not None:
playback_index_db = warcprox.playback.PlaybackIndexDb(
args.playback_index_db_file, options=options)
playback_proxy = warcprox.playback.PlaybackProxy(
ca=ca, playback_index_db=playback_index_db, options=options)
listeners.append(playback_index_db)
else:
playback_index_db = None
playback_proxy = None
if args.crawl_log_dir:
listeners.append(warcprox.crawl_log.CrawlLogger(
args.crawl_log_dir, options=options))
for qualname in args.plugins or []:
try:
(module_name, class_name) = qualname.rsplit('.', 1)
module_ = importlib.import_module(module_name)
class_ = getattr(module_, class_name)
listener = class_()
listener.notify # make sure it has this method
listeners.append(listener)
except Exception as e:
logging.fatal('problem with plugin class %r: %s', qualname, e)
sys.exit(1)
writer_pool = warcprox.writer.WarcWriterPool(options=options)
# number of warc writer threads = sqrt(proxy.max_threads)
# I came up with this out of thin air because it strikes me as reasonable
# 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45
num_writer_threads = args.writer_threads or int(proxy.max_threads ** 0.5)
logging.debug('initializing %d warc writer threads', num_writer_threads)
warc_writer_threads = [
warcprox.writerthread.WarcWriterThread(
name='WarcWriterThread%03d' % i, recorded_url_q=recorded_url_q,
writer_pool=writer_pool, dedup_db=dedup_db,
listeners=listeners, options=options)
for i in range(num_writer_threads)]
if args.rethinkdb_services_url:
parsed = doublethink.parse_rethinkdb_url(
options.rethinkdb_services_url)
rr = doublethink.Rethinker(servers=parsed.hosts, db=parsed.database)
svcreg = doublethink.ServiceRegistry(rr, table=parsed.table)
else:
svcreg = None
controller = warcprox.controller.WarcproxController(
proxy, warc_writer_threads, playback_proxy,
service_registry=svcreg, options=options)
return controller
def parse_args(argv):
'''
Parses command line arguments with argparse.
'''
arg_parser = _build_arg_parser(prog=os.path.basename(argv[0]))
args = arg_parser.parse_args(args=argv[1:])
return args return args
def main(argv=None): def main(argv=None):
@ -339,7 +238,8 @@ def main(argv=None):
# see https://github.com/pyca/cryptography/issues/2911 # see https://github.com/pyca/cryptography/issues/2911
cryptography.hazmat.backends.openssl.backend.activate_builtin_random() cryptography.hazmat.backends.openssl.backend.activate_builtin_random()
controller = init_controller(args) options = warcprox.Options(**vars(args))
controller = warcprox.controller.WarcproxController(options)
signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set()) signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set())
signal.signal(signal.SIGINT, lambda a,b: controller.stop.set()) signal.signal(signal.SIGINT, lambda a,b: controller.stop.set())
@ -410,7 +310,8 @@ def ensure_rethinkdb_tables(argv=None):
svcreg = doublethink.ServiceRegistry(rr, table=parsed.table) svcreg = doublethink.ServiceRegistry(rr, table=parsed.table)
did_something = True did_something = True
if args.rethinkdb_stats_url: if args.rethinkdb_stats_url:
stats_db = warcprox.stats.RethinkStatsDb(options=options) stats_db = warcprox.stats.RethinkStatsProcessor(options=options)
stats_db._ensure_db_table()
did_something = True did_something = True
if args.rethinkdb_dedup_url: if args.rethinkdb_dedup_url:
dedup_db = warcprox.dedup.RethinkDedupDb(options=options) dedup_db = warcprox.dedup.RethinkDedupDb(options=options)
@ -421,7 +322,7 @@ def ensure_rethinkdb_tables(argv=None):
if args.rethinkdb_trough_db_url: if args.rethinkdb_trough_db_url:
dedup_db = warcprox.dedup.TroughDedupDb(options) dedup_db = warcprox.dedup.TroughDedupDb(options)
logging.warn( logging.warn(
'trough it responsible for creating most of the rethinkdb ' 'trough is responsible for creating most of the rethinkdb '
'tables that it uses') 'tables that it uses')
did_something = True did_something = True
View File
@ -62,6 +62,8 @@ except ImportError:
import concurrent.futures import concurrent.futures
import urlcanon import urlcanon
import time import time
import collections
import cProfile
class ProxyingRecorder(object): class ProxyingRecorder(object):
""" """
@ -562,9 +564,14 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
# See also https://blog.dubbelboer.com/2012/04/09/syn-cookies.html # See also https://blog.dubbelboer.com/2012/04/09/syn-cookies.html
request_queue_size = 4096 request_queue_size = 4096
def __init__(self, max_threads, options=warcprox.Options()): def __init__(self, options=warcprox.Options()):
PooledMixIn.__init__(self, max_threads) if options.max_threads:
self.profilers = {} self.logger.info(
'max_threads=%s set by command line option',
options.max_threads)
PooledMixIn.__init__(self, options.max_threads)
self.profilers = collections.defaultdict(cProfile.Profile)
if options.profile: if options.profile:
self.process_request_thread = self._profile_process_request_thread self.process_request_thread = self._profile_process_request_thread
@ -572,9 +579,6 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
self.process_request_thread = self._process_request_thread self.process_request_thread = self._process_request_thread
def _profile_process_request_thread(self, request, client_address): def _profile_process_request_thread(self, request, client_address):
if not threading.current_thread().ident in self.profilers:
import cProfile
self.profilers[threading.current_thread().ident] = cProfile.Profile()
profiler = self.profilers[threading.current_thread().ident] profiler = self.profilers[threading.current_thread().ident]
profiler.enable() profiler.enable()
self._process_request_thread(request, client_address) self._process_request_thread(request, client_address)
View File
@ -121,9 +121,6 @@ class PlaybackProxyHandler(MitmProxyHandler):
def _send_headers_and_refd_payload( def _send_headers_and_refd_payload(
self, headers, refers_to_target_uri, refers_to_date, payload_digest): self, headers, refers_to_target_uri, refers_to_date, payload_digest):
"""Parameters:
"""
location = self.server.playback_index_db.lookup_exact( location = self.server.playback_index_db.lookup_exact(
refers_to_target_uri, refers_to_date, payload_digest) refers_to_target_uri, refers_to_date, payload_digest)
self.logger.debug('loading http payload from {}'.format(location)) self.logger.debug('loading http payload from {}'.format(location))
@ -133,11 +130,13 @@ class PlaybackProxyHandler(MitmProxyHandler):
for (offset, record, errors) in fh.read_records(limit=1, offsets=True): for (offset, record, errors) in fh.read_records(limit=1, offsets=True):
pass pass
if not record:
raise Exception('failed to read record at offset %s from %s' % (offset, warcfilename))
if errors: if errors:
raise Exception('warc errors at {}:{} -- {}'.format(location['f'], offset, errors)) raise Exception('warc errors at {}:{} -- {}'.format(location['f'], offset, errors))
warc_type = record.get_header(warctools.WarcRecord.TYPE) if record.type != warctools.WarcRecord.RESPONSE:
if warc_type != warctools.WarcRecord.RESPONSE:
raise Exception('invalid attempt to retrieve http payload of "{}" record'.format(warc_type)) raise Exception('invalid attempt to retrieve http payload of "{}" record'.format(warc_type))
# find end of headers # find end of headers
@ -158,12 +157,13 @@ class PlaybackProxyHandler(MitmProxyHandler):
for (offset, record, errors) in fh.read_records(limit=1, offsets=True): for (offset, record, errors) in fh.read_records(limit=1, offsets=True):
pass pass
if not record:
raise Exception('failed to read record at offset %s from %s' % (offset, warcfilename))
if errors: if errors:
raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors)) raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors))
warc_type = record.get_header(warctools.WarcRecord.TYPE) if record.type == warctools.WarcRecord.RESPONSE:
if warc_type == warctools.WarcRecord.RESPONSE:
headers_buf = bytearray() headers_buf = bytearray()
while True: while True:
line = record.content_file.readline() line = record.content_file.readline()
@ -173,7 +173,7 @@ class PlaybackProxyHandler(MitmProxyHandler):
return self._send_response(headers_buf, record.content_file) return self._send_response(headers_buf, record.content_file)
elif warc_type == warctools.WarcRecord.REVISIT: elif record.type == warctools.WarcRecord.REVISIT:
# response consists of http headers from revisit record and # response consists of http headers from revisit record and
# payload from the referenced record # payload from the referenced record
warc_profile = record.get_header(warctools.WarcRecord.PROFILE) warc_profile = record.get_header(warctools.WarcRecord.PROFILE)
View File
@ -53,45 +53,88 @@ def _empty_bucket(bucket):
}, },
} }
class StatsDb: class StatsProcessor(warcprox.BaseBatchPostfetchProcessor):
logger = logging.getLogger("warcprox.stats.StatsDb") logger = logging.getLogger("warcprox.stats.StatsProcessor")
def __init__(self, file='./warcprox.sqlite', options=warcprox.Options()): def _startup(self):
self.file = file if os.path.exists(self.options.stats_db_file):
self.options = options self.logger.info(
self._lock = threading.RLock() 'opening existing stats database %s',
self.options.stats_db_file)
else:
self.logger.info(
'creating new stats database %s',
self.options.stats_db_file)
def start(self): conn = sqlite3.connect(self.options.stats_db_file)
with self._lock: conn.execute(
if os.path.exists(self.file): 'create table if not exists buckets_of_stats ('
self.logger.info( ' bucket varchar(300) primary key,'
'opening existing stats database %s', self.file) ' stats varchar(4000)'
else: ');')
self.logger.info( conn.commit()
'creating new stats database %s', self.file) conn.close()
conn = sqlite3.connect(self.file) self.logger.info(
'created table buckets_of_stats in %s',
self.options.stats_db_file)
def _process_batch(self, batch):
batch_buckets = self._tally_batch(batch)
self._update_db(batch_buckets)
logging.trace('updated stats from batch of %s', len(batch))
def _update_db(self, batch_buckets):
conn = sqlite3.connect(self.options.stats_db_file)
for bucket in batch_buckets:
bucket_stats = batch_buckets[bucket]
cursor = conn.execute(
'select stats from buckets_of_stats where bucket=?',
(bucket,))
result_tuple = cursor.fetchone()
cursor.close()
if result_tuple:
old_bucket_stats = json.loads(result_tuple[0])
bucket_stats['total']['urls'] += old_bucket_stats['total']['urls']
bucket_stats['total']['wire_bytes'] += old_bucket_stats['total']['wire_bytes']
bucket_stats['revisit']['urls'] += old_bucket_stats['revisit']['urls']
bucket_stats['revisit']['wire_bytes'] += old_bucket_stats['revisit']['wire_bytes']
bucket_stats['new']['urls'] += old_bucket_stats['new']['urls']
bucket_stats['new']['wire_bytes'] += old_bucket_stats['new']['wire_bytes']
json_value = json.dumps(bucket_stats, separators=(',',':'))
conn.execute( conn.execute(
'create table if not exists buckets_of_stats (' 'insert or replace into buckets_of_stats '
' bucket varchar(300) primary key,' '(bucket, stats) values (?, ?)', (bucket, json_value))
' stats varchar(4000)'
');')
conn.commit() conn.commit()
conn.close() conn.close()
self.logger.info('created table buckets_of_stats in %s', self.file) def _tally_batch(self, batch):
batch_buckets = {}
for recorded_url in batch:
for bucket in self.buckets(recorded_url):
bucket_stats = batch_buckets.get(bucket)
if not bucket_stats:
bucket_stats = _empty_bucket(bucket)
batch_buckets[bucket] = bucket_stats
def stop(self): bucket_stats["total"]["urls"] += 1
pass bucket_stats["total"]["wire_bytes"] += recorded_url.size
def close(self): if recorded_url.warc_records:
pass if recorded_url.warc_records[0].type == b'revisit':
bucket_stats["revisit"]["urls"] += 1
def sync(self): bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
pass else:
bucket_stats["new"]["urls"] += 1
bucket_stats["new"]["wire_bytes"] += recorded_url.size
return batch_buckets
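Each bucket's counters are stored as a JSON blob in the `stats` column of buckets_of_stats; after a batch is tallied and merged with the existing row, the stored value looks roughly like this (numbers illustrative):

# illustrative contents of the stats column for one bucket
bucket_stats = {
    'total':   {'urls': 120, 'wire_bytes': 5214033},
    'new':     {'urls': 95,  'wire_bytes': 4800011},
    'revisit': {'urls': 25,  'wire_bytes': 414022},
}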
def value(self, bucket0="__all__", bucket1=None, bucket2=None): def value(self, bucket0="__all__", bucket1=None, bucket2=None):
conn = sqlite3.connect(self.file) conn = sqlite3.connect(self.options.stats_db_file)
cursor = conn.execute( cursor = conn.execute(
'select stats from buckets_of_stats where bucket = ?', 'select stats from buckets_of_stats where bucket = ?',
(bucket0,)) (bucket0,))
@ -109,9 +152,6 @@ class StatsDb:
else: else:
return None return None
def notify(self, recorded_url, records):
self.tally(recorded_url, records)
def buckets(self, recorded_url): def buckets(self, recorded_url):
''' '''
Unravels bucket definitions in Warcprox-Meta header. Each bucket Unravels bucket definitions in Warcprox-Meta header. Each bucket
@ -154,117 +194,20 @@ class StatsDb:
return buckets return buckets
def tally(self, recorded_url, records): class RethinkStatsProcessor(StatsProcessor):
with self._lock: logger = logging.getLogger("warcprox.stats.RethinkStatsProcessor")
conn = sqlite3.connect(self.file)
for bucket in self.buckets(recorded_url):
cursor = conn.execute(
'select stats from buckets_of_stats where bucket=?',
(bucket,))
result_tuple = cursor.fetchone()
cursor.close()
if result_tuple:
bucket_stats = json.loads(result_tuple[0])
else:
bucket_stats = _empty_bucket(bucket)
bucket_stats["total"]["urls"] += 1
bucket_stats["total"]["wire_bytes"] += recorded_url.size
if records:
if records[0].type == b'revisit':
bucket_stats["revisit"]["urls"] += 1
bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
else:
bucket_stats["new"]["urls"] += 1
bucket_stats["new"]["wire_bytes"] += recorded_url.size
json_value = json.dumps(bucket_stats, separators=(',',':'))
conn.execute(
'insert or replace into buckets_of_stats '
'(bucket, stats) values (?, ?)', (bucket, json_value))
conn.commit()
conn.close()
class RethinkStatsDb(StatsDb):
"""Updates database in batch every 2.0 seconds"""
logger = logging.getLogger("warcprox.stats.RethinkStatsDb")
def __init__(self, options=warcprox.Options()): def __init__(self, options=warcprox.Options()):
StatsProcessor.__init__(self, options)
parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_stats_url) parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_stats_url)
self.rr = doublethink.Rethinker( self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database) servers=parsed.hosts, db=parsed.database)
self.table = parsed.table self.table = parsed.table
self.replicas = min(3, len(self.rr.servers)) self.replicas = min(3, len(self.rr.servers))
def _startup(self):
self._ensure_db_table() self._ensure_db_table()
self.options = options
self._stop = threading.Event()
self._batch_lock = threading.RLock()
with self._batch_lock:
self._batch = {}
self._timer = None
def start(self):
"""Starts batch update repeating timer."""
self._update_batch() # starts repeating timer
def _bucket_batch_update_reql(self, bucket, batch):
return self.rr.table(self.table).get(bucket).replace(
lambda old: r.branch(
old.eq(None), batch[bucket], old.merge({
"total": {
"urls": old["total"]["urls"].add(
batch[bucket]["total"]["urls"]),
"wire_bytes": old["total"]["wire_bytes"].add(
batch[bucket]["total"]["wire_bytes"]),
},
"new": {
"urls": old["new"]["urls"].add(
batch[bucket]["new"]["urls"]),
"wire_bytes": old["new"]["wire_bytes"].add(
batch[bucket]["new"]["wire_bytes"]),
},
"revisit": {
"urls": old["revisit"]["urls"].add(
batch[bucket]["revisit"]["urls"]),
"wire_bytes": old["revisit"]["wire_bytes"].add(
batch[bucket]["revisit"]["wire_bytes"]),
},
})))
def _update_batch(self):
with self._batch_lock:
batch_copy = copy.deepcopy(self._batch)
self._batch = {}
try:
if len(batch_copy) > 0:
# XXX can all the buckets be done in one query?
for bucket in batch_copy:
result = self._bucket_batch_update_reql(
bucket, batch_copy).run()
if (not result["inserted"] and not result["replaced"]
or sorted(result.values()) != [0,0,0,0,0,1]):
raise Exception(
"unexpected result %s updating stats %s" % (
result, batch_copy[bucket]))
except Exception as e:
self.logger.error("problem updating stats", exc_info=True)
# now we need to restore the stats that didn't get saved to the
# batch so that they are saved in the next call to _update_batch()
with self._batch_lock:
self._add_to_batch(batch_copy)
finally:
if not self._stop.is_set():
self._timer = threading.Timer(2.0, self._update_batch)
self._timer.name = "RethinkStats-batch-update-timer-%s" % (
datetime.datetime.utcnow().isoformat())
self._timer.start()
else:
self.logger.info("finished")
def _ensure_db_table(self): def _ensure_db_table(self):
dbs = self.rr.db_list().run() dbs = self.rr.db_list().run()
@ -282,17 +225,38 @@ class RethinkStatsDb(StatsDb):
self.table, primary_key="bucket", shards=1, self.table, primary_key="bucket", shards=1,
replicas=self.replicas).run() replicas=self.replicas).run()
def close(self): def _update_db(self, batch_buckets):
self.stop() # XXX can all the buckets be done in one query?
for bucket in batch_buckets:
result = self._bucket_batch_update_reql(
bucket, batch_buckets[bucket]).run()
if (not result['inserted'] and not result['replaced']
or sorted(result.values()) != [0,0,0,0,0,1]):
self.logger.error(
'unexpected result %s updating stats %s' % (
result, batch_buckets[bucket]))
def stop(self): def _bucket_batch_update_reql(self, bucket, new):
self.logger.info("stopping rethinkdb stats table batch updates") return self.rr.table(self.table).get(bucket).replace(
self._stop.set() lambda old: r.branch(
if self._timer: old.eq(None), new, old.merge({
self._timer.join() 'total': {
'urls': old['total']['urls'].add(new['total']['urls']),
def sync(self): 'wire_bytes': old['total']['wire_bytes'].add(
pass new['total']['wire_bytes']),
},
'new': {
'urls': old['new']['urls'].add(new['new']['urls']),
'wire_bytes': old['new']['wire_bytes'].add(
new['new']['wire_bytes']),
},
'revisit': {
'urls': old['revisit']['urls'].add(
new['revisit']['urls']),
'wire_bytes': old['revisit']['wire_bytes'].add(
new['revisit']['wire_bytes']),
},
})))
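
The new _bucket_batch_update_reql() folds each bucket's batch into RethinkDB with a single atomic get().replace(): the r.branch() inserts the document wholesale when the bucket does not exist yet (old is None), otherwise it merges by adding the batch counters to the stored totals server-side. A rough pure-Python equivalent of what that ReQL expression computes, shown only to clarify the merge semantics (merge_bucket is a hypothetical helper, not part of warcprox):

    def merge_bucket(old, new):
        # mirrors r.branch(old.eq(None), new, old.merge({...})) above
        if old is None:
            return new
        merged = dict(old)
        for section in ('total', 'new', 'revisit'):
            merged[section] = {
                'urls': old[section]['urls'] + new[section]['urls'],
                'wire_bytes': old[section]['wire_bytes']
                              + new[section]['wire_bytes'],
            }
        return merged
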
def value(self, bucket0="__all__", bucket1=None, bucket2=None): def value(self, bucket0="__all__", bucket1=None, bucket2=None):
bucket0_stats = self.rr.table(self.table).get(bucket0).run() bucket0_stats = self.rr.table(self.table).get(bucket0).run()
@ -307,39 +271,6 @@ class RethinkStatsDb(StatsDb):
return bucket0_stats[bucket1] return bucket0_stats[bucket1]
return bucket0_stats return bucket0_stats
def tally(self, recorded_url, records):
buckets = self.buckets(recorded_url)
with self._batch_lock:
for bucket in buckets:
bucket_stats = self._batch.setdefault(
bucket, _empty_bucket(bucket))
bucket_stats["total"]["urls"] += 1
bucket_stats["total"]["wire_bytes"] += recorded_url.size
if records:
if records[0].type == b'revisit':
bucket_stats["revisit"]["urls"] += 1
bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
else:
bucket_stats["new"]["urls"] += 1
bucket_stats["new"]["wire_bytes"] += recorded_url.size
def _add_to_batch(self, add_me):
with self._batch_lock:
for bucket in add_me:
bucket_stats = self._batch.setdefault(
bucket, _empty_bucket(bucket))
bucket_stats["total"]["urls"] += add_me[bucket]["total"]["urls"]
bucket_stats["total"]["wire_bytes"] += add_me[bucket]["total"]["wire_bytes"]
bucket_stats["revisit"]["urls"] += add_me[bucket]["revisit"]["urls"]
bucket_stats["revisit"]["wire_bytes"] += add_me[bucket]["revisit"]["wire_bytes"]
bucket_stats["new"]["urls"] += add_me[bucket]["new"]["urls"]
bucket_stats["new"]["wire_bytes"] += add_me[bucket]["new"]["wire_bytes"]
def notify(self, recorded_url, records):
self.tally(recorded_url, records)
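
The tally()/_add_to_batch()/notify() trio removed here is the in-memory side of the batching: per-bucket counters accumulate in a dict guarded by _batch_lock, and a capture only counts toward "new" or "revisit" when WARC records were actually written, keyed off the type of the first record. A condensed illustration of that bookkeeping (empty_bucket and tally_into are hypothetical stand-ins, not warcprox functions):

    def empty_bucket(name):
        return {
            'bucket': name,
            'total': {'urls': 0, 'wire_bytes': 0},
            'new': {'urls': 0, 'wire_bytes': 0},
            'revisit': {'urls': 0, 'wire_bytes': 0},
        }

    def tally_into(batch, bucket, size, records):
        stats = batch.setdefault(bucket, empty_bucket(bucket))
        stats['total']['urls'] += 1
        stats['total']['wire_bytes'] += size
        if records:  # nothing written means only "total" is counted
            key = 'revisit' if records[0].type == b'revisit' else 'new'
            stats[key]['urls'] += 1
            stats[key]['wire_bytes'] += size
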
class RunningStats: class RunningStats:
''' '''
In-memory stats for measuring overall warcprox performance. In-memory stats for measuring overall warcprox performance.

View File

@ -2,7 +2,7 @@
warcprox/warcproxy.py - recording proxy, extends mitmproxy to record traffic, warcprox/warcproxy.py - recording proxy, extends mitmproxy to record traffic,
enqueue info on the recorded url queue enqueue info on the recorded url queue
Copyright (C) 2013-2016 Internet Archive Copyright (C) 2013-2018 Internet Archive
This program is free software; you can redistribute it and/or This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License modify it under the terms of the GNU General Public License
@ -92,6 +92,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
self.url, rule)) self.url, rule))
def _enforce_limit(self, limit_key, limit_value, soft=False): def _enforce_limit(self, limit_key, limit_value, soft=False):
if not self.server.stats_db:
return
bucket0, bucket1, bucket2 = limit_key.rsplit("/", 2) bucket0, bucket1, bucket2 = limit_key.rsplit("/", 2)
_limit_key = limit_key _limit_key = limit_key
@ -328,7 +330,7 @@ class RecordedUrl:
warcprox_meta=None, content_type=None, custom_type=None, warcprox_meta=None, content_type=None, custom_type=None,
status=None, size=None, client_ip=None, method=None, status=None, size=None, client_ip=None, method=None,
timestamp=None, host=None, duration=None, referer=None, timestamp=None, host=None, duration=None, referer=None,
payload_digest=None): payload_digest=None, warc_records=None):
# XXX should test what happens with non-ascii url (when does # XXX should test what happens with non-ascii url (when does
# url-encoding happen?) # url-encoding happen?)
if type(url) is not bytes: if type(url) is not bytes:
@ -367,6 +369,7 @@ class RecordedUrl:
self.duration = duration self.duration = duration
self.referer = referer self.referer = referer
self.payload_digest = payload_digest self.payload_digest = payload_digest
self.warc_records = warc_records
# inherit from object so that multiple inheritance from this class works # inherit from object so that multiple inheritance from this class works
# properly in python 2 # properly in python 2
@ -375,8 +378,12 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
logger = logging.getLogger("warcprox.warcproxy.WarcProxy") logger = logging.getLogger("warcprox.warcproxy.WarcProxy")
def __init__( def __init__(
self, ca=None, recorded_url_q=None, stats_db=None, self, stats_db=None, status_callback=None,
options=warcprox.Options()): options=warcprox.Options()):
self.status_callback = status_callback
self.stats_db = stats_db
self.options = options
server_address = ( server_address = (
options.address or 'localhost', options.address or 'localhost',
options.port if options.port is not None else 8000) options.port if options.port is not None else 8000)
@ -395,22 +402,13 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
self.digest_algorithm = options.digest_algorithm or 'sha1' self.digest_algorithm = options.digest_algorithm or 'sha1'
if ca is not None: ca_name = ('Warcprox CA on %s' % socket.gethostname())[:64]
self.ca = ca self.ca = CertificateAuthority(
else: ca_file='warcprox-ca.pem', certs_dir='./warcprox-ca',
ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64] ca_name=ca_name)
self.ca = CertificateAuthority(ca_file='warcprox-ca.pem',
certs_dir='./warcprox-ca',
ca_name=ca_name)
if recorded_url_q is not None: self.recorded_url_q = warcprox.TimestampedQueue(
self.recorded_url_q = recorded_url_q maxsize=options.queue_size or 1000)
else:
self.recorded_url_q = warcprox.TimestampedQueue(
maxsize=options.queue_size or 1000)
self.stats_db = stats_db
self.options = options
self.running_stats = warcprox.stats.RunningStats() self.running_stats = warcprox.stats.RunningStats()
@ -425,6 +423,9 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
'queued_urls': self.recorded_url_q.qsize(), 'queued_urls': self.recorded_url_q.qsize(),
'queue_max_size': self.recorded_url_q.maxsize, 'queue_max_size': self.recorded_url_q.maxsize,
'seconds_behind': self.recorded_url_q.seconds_behind(), 'seconds_behind': self.recorded_url_q.seconds_behind(),
'urls_processed': self.running_stats.urls,
'warc_bytes_written': self.running_stats.warc_bytes,
'start_time': self.running_stats.first_snap_time,
}) })
elapsed, urls_per_sec, warc_bytes_per_sec = self.running_stats.current_rates(1) elapsed, urls_per_sec, warc_bytes_per_sec = self.running_stats.current_rates(1)
result['rates_1min'] = { result['rates_1min'] = {
@ -444,22 +445,20 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
'urls_per_sec': urls_per_sec, 'urls_per_sec': urls_per_sec,
'warc_bytes_per_sec': warc_bytes_per_sec, 'warc_bytes_per_sec': warc_bytes_per_sec,
} }
# gets postfetch chain status from the controller
if self.status_callback:
result.update(self.status_callback())
return result return result
class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy): class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy):
logger = logging.getLogger("warcprox.warcproxy.WarcProxy") logger = logging.getLogger("warcprox.warcproxy.WarcProxy")
def __init__( def __init__(
self, ca=None, recorded_url_q=None, stats_db=None, self, stats_db=None, status_callback=None,
options=warcprox.Options()): options=warcprox.Options()):
if options.max_threads: warcprox.mitmproxy.PooledMitmProxy.__init__(self, options)
self.logger.info(
"max_threads=%s set by command line option",
options.max_threads)
warcprox.mitmproxy.PooledMitmProxy.__init__(
self, options.max_threads, options)
SingleThreadedWarcProxy.__init__( SingleThreadedWarcProxy.__init__(
self, ca, recorded_url_q, stats_db, options) self, stats_db, status_callback, options)
def server_activate(self): def server_activate(self):
http_server.HTTPServer.server_activate(self) http_server.HTTPServer.server_activate(self)
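
Two related changes land in warcproxy.py here: the proxy now always builds its own CertificateAuthority and recorded_url_q, and it accepts a status_callback so the controller can fold postfetch-chain state into the status dict assembled above. A rough wiring sketch, assuming only the constructor signature shown in this diff; the status() method name and the contents returned by postfetch_status() are illustrative, and constructing the proxy binds the listen port and writes warcprox-ca.pem in the working directory:

    import warcprox
    import warcprox.warcproxy

    def postfetch_status():
        # whatever the controller knows about its postfetch processors
        return {'postfetch_chain': [{'processor': 'WarcWriterThread',
                                     'queued_urls': 0}]}

    options = warcprox.Options(address='localhost', port=8000)
    proxy = warcprox.warcproxy.WarcProxy(
            stats_db=None, status_callback=postfetch_status, options=options)
    print(proxy.status())  # includes the keys contributed by postfetch_status()
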

View File

@ -2,7 +2,7 @@
warcprox/writerthread.py - warc writer thread, reads from the recorded url warcprox/writerthread.py - warc writer thread, reads from the recorded url
queue, writes warc records, runs final tasks after warc records are written queue, writes warc records, runs final tasks after warc records are written
Copyright (C) 2013-2017 Internet Archive Copyright (C) 2013-2018 Internet Archive
This program is free software; you can redistribute it and/or This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License modify it under the terms of the GNU General Public License
@ -28,44 +28,36 @@ except ImportError:
import Queue as queue import Queue as queue
import logging import logging
import threading
import time import time
from datetime import datetime
from hanzo import warctools
import warcprox import warcprox
import sys
class WarcWriterThread(threading.Thread): class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor):
logger = logging.getLogger("warcprox.warcproxwriter.WarcWriterThread") logger = logging.getLogger("warcprox.writerthread.WarcWriterThread")
def __init__(
self, recorded_url_q, name='WarcWriterThread', writer_pool=None,
dedup_db=None, listeners=[], options=warcprox.Options()):
"""recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl."""
threading.Thread.__init__(self, name=name)
self.recorded_url_q = recorded_url_q
self.stop = threading.Event()
if writer_pool:
self.writer_pool = writer_pool
else:
self.writer_pool = warcprox.writer.WarcWriterPool()
self.dedup_db = dedup_db
self.listeners = listeners
self.options = options
self.idle = None
self.method_filter = set(method.upper() for method in self.options.method_filter or [])
def run(self):
if self.options.profile:
import cProfile
self.profiler = cProfile.Profile()
self.profiler.enable()
self._run()
self.profiler.disable()
else:
self._run()
_ALWAYS_ACCEPT = {'WARCPROX_WRITE_RECORD'} _ALWAYS_ACCEPT = {'WARCPROX_WRITE_RECORD'}
def __init__(self, options=warcprox.Options()):
warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
self.options = options
self.writer_pool = warcprox.writer.WarcWriterPool(options)
self.method_filter = set(method.upper() for method in self.options.method_filter or [])
def _get_process_put(self):
try:
warcprox.BaseStandardPostfetchProcessor._get_process_put(self)
finally:
self.writer_pool.maybe_idle_rollover()
def _process_url(self, recorded_url):
records = []
if self._should_archive(recorded_url):
records = self.writer_pool.write_records(recorded_url)
recorded_url.warc_records = records
self._log(recorded_url, records)
# try to release resources in a timely fashion
if recorded_url.response_recorder and recorded_url.response_recorder.tempfile:
recorded_url.response_recorder.tempfile.close()
def _filter_accepts(self, recorded_url): def _filter_accepts(self, recorded_url):
if not self.method_filter: if not self.method_filter:
return True return True
@ -81,68 +73,9 @@ class WarcWriterThread(threading.Thread):
# special warc name prefix '-' means "don't archive" # special warc name prefix '-' means "don't archive"
return prefix != '-' and self._filter_accepts(recorded_url) return prefix != '-' and self._filter_accepts(recorded_url)
def _run(self):
self.name = '%s(tid=%s)'% (self.name, warcprox.gettid())
while not self.stop.is_set():
try:
while True:
try:
if self.stop.is_set():
qsize = self.recorded_url_q.qsize()
if qsize % 50 == 0:
self.logger.info("%s urls left to write", qsize)
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
records = []
self.idle = None
if self._should_archive(recorded_url):
if self.dedup_db:
warcprox.dedup.decorate_with_dedup_info(self.dedup_db,
recorded_url, base32=self.options.base32)
records = self.writer_pool.write_records(recorded_url)
self._final_tasks(recorded_url, records)
# try to release resources in a timely fashion
if recorded_url.response_recorder and recorded_url.response_recorder.tempfile:
recorded_url.response_recorder.tempfile.close()
except queue.Empty:
if self.stop.is_set():
break
self.idle = time.time()
finally:
self.writer_pool.maybe_idle_rollover()
self.logger.info('WarcWriterThread shutting down')
self._shutdown()
except Exception as e:
if isinstance(e, OSError) and e.errno == 28:
# OSError: [Errno 28] No space left on device
self.logger.critical(
'shutting down due to fatal problem: %s: %s',
e.__class__.__name__, e)
self._shutdown()
sys.exit(1)
self.logger.critical(
'WarcWriterThread will try to continue after unexpected '
'error', exc_info=True)
time.sleep(0.5)
def _shutdown(self):
self.writer_pool.close_writers()
for listener in self.listeners:
if hasattr(listener, 'stop'):
try:
listener.stop()
except:
self.logger.error(
'%s raised exception', listener.stop, exc_info=True)
# closest thing we have to heritrix crawl log at the moment
def _log(self, recorded_url, records): def _log(self, recorded_url, records):
try: try:
payload_digest = records[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode("utf-8") payload_digest = records[0].get_header('WARC-Payload-Digest').decode("utf-8")
except: except:
payload_digest = "-" payload_digest = "-"
@ -156,13 +89,3 @@ class WarcWriterThread(threading.Thread):
recorded_url.method, recorded_url.url.decode("utf-8"), recorded_url.method, recorded_url.url.decode("utf-8"),
recorded_url.mimetype, recorded_url.size, payload_digest, recorded_url.mimetype, recorded_url.size, payload_digest,
type_, filename, offset) type_, filename, offset)
def _final_tasks(self, recorded_url, records):
if self.listeners:
for listener in self.listeners:
try:
listener.notify(recorded_url, records)
except:
self.logger.error('%s raised exception',
listener.notify, exc_info=True)
self._log(recorded_url, records)
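
After this rewrite WarcWriterThread is just one processor in the postfetch chain: _process_url() writes WARC records when the URL passes the prefix and method filters, stores them on recorded_url.warc_records, logs the capture, and closes the response recorder's tempfile. A processor further down the chain could then consume those records; a minimal sketch, based only on the BaseStandardPostfetchProcessor interface visible in this diff (the CrawlLogger class and its log format are hypothetical):

    import logging
    import warcprox

    class CrawlLogger(warcprox.BaseStandardPostfetchProcessor):
        logger = logging.getLogger('CrawlLogger')

        def _process_url(self, recorded_url):
            # warc_records is populated by WarcWriterThread earlier in the chain;
            # it is empty when the url was filtered out or not archived
            self.logger.info(
                    '%s %s -> %s warc record(s)', recorded_url.method,
                    recorded_url.url, len(recorded_url.warc_records or []))
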