1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-24 06:59:52 +01:00
pywb/tests/test_wsgi_cdxserver.py
Kenji Nagahashi 2c40c9b112 refactor cdxserver, add tests focused on wsgi_cdxserver, add docstrings.
align cdxops function interfaces - all cdx_iter.
  move module functions / common ops to class methods
  support both 0/1 and true/false for boolean parameters
  move CDXObject to text conversion to wsgi_cdxserver (may have broken
    embedded cdxserver mode).
  pass config object as function arg rather than as global var.
2014-02-27 01:58:07 +00:00

198 lines
6.0 KiB
Python

import os
import re
import pytest
from urllib import urlencode
from werkzeug.test import Client
from werkzeug.wrappers import BaseResponse, Response
import yaml
from pywb.cdx.cdxobject import CDXObject
from pywb.cdx.wsgi_cdxserver import create_app
@pytest.fixture
def testconfig():
    """
    load ``test_config.yaml`` (resolved against the current working
    directory) and return the parsed configuration.

    when the file does not define ``index_paths``, it is set to
    ``../sample_archive/cdx`` relative to the directory of this file.
    """
    # read inside ``with`` so the file handle is closed deterministically
    # instead of being left for the garbage collector.
    with open('test_config.yaml') as f:
        # NOTE(review): yaml.load can construct arbitrary Python objects.
        # acceptable for a config file shipped with the tests; consider
        # yaml.safe_load if the file only holds plain mappings - confirm.
        config = yaml.load(f)
    # an empty or missing document would silently produce a useless app.
    assert config
    if 'index_paths' not in config:
        config['index_paths'] = os.path.join(
            os.path.dirname(os.path.realpath(__file__)),
            '../sample_archive/cdx')
    return config
@pytest.fixture
def client(testconfig):
    """
    werkzeug test ``Client`` around the WSGI application that
    ``create_app`` builds from ``testconfig``; responses are wrapped
    in ``Response``.
    """
    return Client(create_app(testconfig), Response)
# ================================================================
def query(client, url, **params):
    """
    issue ``GET /cdx`` through ``client`` and return its response.

    ``url`` is sent as the ``url`` query parameter, together with every
    keyword argument. sequence values are expanded into repeated
    parameters (``doseq``).
    """
    query_args = dict(params, url=url)
    query_string = urlencode(query_args, doseq=1)
    return client.get('/cdx?' + query_string)
# ================================================================
def test_exact_url(client):
    """
    plain lookup of a single URL, no extra parameters: expects 200.
    """
    response = query(client, 'http://www.iana.org/')
    assert response.status_code == 200
    # body is dumped for debugging; pytest shows it only on failure.
    print(response.data)
def test_prefix_match(client):
    """
    ``matchType=prefix``: at least one returned line must have a urlkey
    longer than the key of the queried URL itself (``org,iana)/``).
    """
    response = query(client, 'http://www.iana.org/', matchType='prefix')
    print(response.data.splitlines())
    assert response.status_code == 200

    root_key_length = len('org,iana)/')
    urlkeys = (line.split(' ')[0] for line in response.data.splitlines())
    longer_keys = [key for key in urlkeys if len(key) > root_key_length]
    assert len(longer_keys) > 0
def test_filters(client):
    """
    two exact-match ``filter`` parameters (mimetype and filename): every
    returned line must carry the queried urlkey and both filtered values.
    """
    response = query(
        client, 'http://www.iana.org/_css/2013.1/screen.css',
        filter=('mimetype:warc/revisit', 'filename:dupes.warc.gz'))

    assert response.status_code == 200
    assert response.mimetype == 'text/plain'

    # (column index, expected value), in the order they are verified.
    expected_columns = (
        (0, 'org,iana)/_css/2013.1/screen.css'),
        (3, 'warc/revisit'),
        (10, 'dupes.warc.gz'),
    )
    for line in response.data.splitlines():
        columns = line.split(' ')
        for index, expected in expected_columns:
            assert columns[index] == expected
def test_limit(client):
    """
    ``limit=1`` returns exactly one line, with and without ``reverse=1``;
    the single line is checked against the expected timestamp and
    mimetype for each direction.
    """
    target = 'http://www.iana.org/_css/2013.1/screen.css'

    def single_line_columns(**params):
        # run the query, verify the envelope, return the only line split
        # into its space-separated columns.
        response = query(client, target, **params)
        assert response.status_code == 200
        assert response.mimetype == 'text/plain'
        lines = response.data.splitlines()
        assert len(lines) == 1
        return lines[0].split(' ')

    columns = single_line_columns(limit='1')
    assert columns[0] == 'org,iana)/_css/2013.1/screen.css'
    assert columns[1] == '20140126200625'
    assert columns[3] == 'text/css'

    columns = single_line_columns(limit='1', reverse='1')
    assert columns[0] == 'org,iana)/_css/2013.1/screen.css'
    assert columns[1] == '20140127171239'
    assert columns[3] == 'warc/revisit'
def test_fields(client):
    """
    retrieve subset of fields with ``fields`` parameter.

    every returned line must consist of exactly the three requested
    fields: the urlkey, a 14-digit timestamp, and a status code that is
    either three digits or ``-``.
    """
    resp = query(client, 'http://www.iana.org/_css/2013.1/print.css',
                 fields='urlkey,timestamp,statuscode')
    assert resp.status_code == 200

    for cdx in resp.data.splitlines():
        fields = cdx.split(' ')
        assert len(fields) == 3
        assert fields[0] == 'org,iana)/_css/2013.1/print.css'
        assert re.match(r'\d{14}$', fields[1])
        # group the alternation and anchor the end: re.match only anchors
        # the start, so the unanchored pattern also accepted values such
        # as '2000' or '-x'.
        assert re.match(r'(?:\d{3}|-)$', fields[2])
def test_fields_undefined(client):
    """
    server shall respond with Bad Request (TODO: with proper explanation),
    when ``fields`` parameter contains undefined name(s).
    """
    resp = query(client, 'http://www.iana.org/_css/2013.1/print.css',
                 fields='urlkey,nosuchfield')
    # the comparison used to be a bare expression statement, so this test
    # could never fail; assert it so the documented contract is enforced.
    assert resp.status_code == 400
def test_resolveRevisits(client):
    """
    ``resolveRevisits=true`` appends three columns to each line, giving
    14 columns in total.

    the extra columns are either all ``-`` or they must equal the
    (size, offset, filename) of an earlier line with the same digest
    whose extra columns were ``-``.
    """
    response = query(client, 'http://www.iana.org/_css/2013.1/print.css',
                     resolveRevisits='true')
    assert response.status_code == 200
    assert response.mimetype == 'text/plain'

    # digest -> (size, offset, filename) of lines without orig.* values
    recorded = {}
    for line in response.data.splitlines():
        columns = line.split(' ')
        assert len(columns) == 14

        digest = columns[5]
        size, offset, filename = columns[8:11]
        orig_size, orig_offset, orig_filename = columns[11:]

        if orig_size != '-':
            # must point at a capture recorded earlier for this digest.
            assert recorded.get(digest) == (
                int(orig_size), int(orig_offset), orig_filename)
            continue

        assert orig_offset == '-' and orig_filename == '-'
        recorded[digest] = (int(size), int(offset), filename)
def test_resolveRevisits_orig_fields(client):
    """
    when resolveRevisits=true, extra three fields are named
    ``orig.length``, ``orig.offset`` and ``orig.filename``, respectively.
    it is possible to filter fields by these names.

    each line must have four fields; the three ``orig.*`` values are
    either all ``-`` or (digits, digits, filename).
    """
    resp = query(client, 'http://www.iana.org/_css/2013.1/print.css',
                 resolveRevisits='1',
                 fields='urlkey,orig.length,orig.offset,orig.filename')
    assert resp.status_code == 200
    assert resp.mimetype == 'text/plain'

    for cdx in resp.data.splitlines():
        fields = cdx.split(' ')
        assert len(fields) == 4
        key, orig_len, orig_offset, orig_fn = fields
        # the former ``A and B and C or (tuple)`` form was always truthy
        # (a non-empty tuple), so a bad line could only surface as a
        # ValueError from int(). state both alternatives explicitly.
        if orig_len == '-':
            assert orig_offset == '-' and orig_fn == '-'
        else:
            assert orig_len.isdigit() and orig_offset.isdigit()
            assert orig_fn != '-'
def test_collapseTime_resolveRevisits_reverse(client):
    """
    ``collapseTime=11`` combined with ``resolveRevisits`` and ``reverse``:
    expects three lines whose timestamps never increase from one line to
    the next.
    """
    response = query(client, 'http://www.iana.org/_css/2013.1/print.css',
                     collapseTime='11',
                     resolveRevisits='true',
                     reverse='true')

    captures = [CDXObject(line) for line in response.data.splitlines()]
    assert len(captures) == 3

    # compare each capture with the one that follows it.
    for current, following in zip(captures, captures[1:]):
        assert current['timestamp'] >= following['timestamp']