1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 00:03:28 +01:00
pywb/tests/test_wsgi_cdxserver.py

190 lines
5.8 KiB
Python
Raw Normal View History

import os
import re
import pytest
from urllib import urlencode
from werkzeug.test import Client
from werkzeug.wrappers import BaseResponse, Response
import yaml
from pywb.cdx.cdxobject import CDXObject
from pywb.cdx.wsgi_cdxserver import create_app
from tests.fixture import testconfig
@pytest.fixture
def client(testconfig):
    """Provide a werkzeug test ``Client`` for the CDX server application.

    The WSGI application is created by ``create_app`` from the
    ``testconfig`` fixture, and ``Response`` is passed to the client as
    its second argument.
    """
    return Client(create_app(testconfig), Response)
# ================================================================
def query(client, url, **params):
    """Send a GET request to the ``/cdx`` endpoint and return the response.

    ``url`` is sent as the ``url`` query parameter alongside any extra
    keyword arguments.  Sequence values (for example several ``filter``
    entries) are expanded into repeated query parameters.

    Returns whatever ``client.get`` returns.
    """
    query_args = dict(params, url=url)
    query_string = urlencode(query_args, doseq=True)
    return client.get('/cdx?' + query_string)
# ================================================================
def test_exact_url(client):
    """
    plain exact-match lookup without filters or any other option.
    """
    response = query(client, 'http://www.iana.org/')
    assert response.status_code == 200
    # dump the body; pytest only shows captured output when a test fails
    print(response.data)
def test_prefix_match(client):
    """
    prefix match: the result must contain at least one line whose first
    field is longer than the key of the queried root URL.
    """
    response = query(client, 'http://www.iana.org/', matchType='prefix')
    lines = response.data.splitlines()
    print(lines)
    assert response.status_code == 200

    root_key_length = len('org,iana)/')
    # a first field longer than the root key denotes a URL below the root
    suburls = sum(1 for line in lines
                  if len(line.split(' ')[0]) > root_key_length)
    assert suburls > 0
def test_filters(client):
    """
    filter CDX lines by exact match on the mimetype and filename fields.
    """
    response = query(
        client, 'http://www.iana.org/_css/2013.1/screen.css',
        filter=('mimetype:warc/revisit', 'filename:dupes.warc.gz'))

    assert response.status_code == 200
    assert response.mimetype == 'text/plain'

    # (column index, required value) for every returned line
    expected_columns = (
        (0, 'org,iana)/_css/2013.1/screen.css'),
        (3, 'warc/revisit'),
        (10, 'dupes.warc.gz'),
    )
    for line in response.data.splitlines():
        columns = line.split(' ')
        for index, value in expected_columns:
            assert columns[index] == value
def test_limit(client):
    """
    ``limit=1`` yields exactly one capture; adding ``reverse=1`` yields
    the capture from the other end of the result list.
    """
    target = 'http://www.iana.org/_css/2013.1/screen.css'

    def only_capture(**params):
        # run the query, check the response envelope, and return the
        # space-separated fields of the single CDX line
        response = query(client, target, **params)
        assert response.status_code == 200
        assert response.mimetype == 'text/plain'
        lines = response.data.splitlines()
        assert len(lines) == 1
        return lines[0].split(' ')

    forward = only_capture(limit='1')
    assert forward[0] == 'org,iana)/_css/2013.1/screen.css'
    assert forward[1] == '20140126200625'
    assert forward[3] == 'text/css'

    backward = only_capture(limit='1', reverse='1')
    assert backward[0] == 'org,iana)/_css/2013.1/screen.css'
    assert backward[1] == '20140127171239'
    assert backward[3] == 'warc/revisit'
def test_fields(client):
    """
    retrieve subset of fields with ``fields`` parameter.

    every returned line must consist of exactly the three requested
    fields: the urlkey, a 14-digit timestamp and a status code that is
    either three digits or ``-``.
    """
    resp = query(client, 'http://www.iana.org/_css/2013.1/print.css',
                 fields='urlkey,timestamp,statuscode')

    assert resp.status_code == 200

    for cdx in resp.data.splitlines():
        fields = cdx.split(' ')
        assert len(fields) == 3
        urlkey, timestamp, statuscode = fields
        assert urlkey == 'org,iana)/_css/2013.1/print.css'
        assert re.match(r'\d{14}$', timestamp)
        # the alternation is grouped and anchored at the end so values
        # such as '2000' or '-abc' are rejected rather than prefix-matched
        assert re.match(r'(?:\d{3}|-)$', statuscode)
def test_fields_undefined(client):
    """
    server shall respond with Bad Request (TODO: with proper explanation),
    when ``fields`` parameter contains undefined name(s).
    """
    resp = query(client, 'http://www.iana.org/_css/2013.1/print.css',
                 fields='urlkey,nosuchfield')

    # this must be an ``assert``: a bare comparison is evaluated and thrown
    # away, so the test could never fail
    assert resp.status_code == 400
def test_resolveRevisits(client):
    """
    ``resolveRevisits=true`` makes the server append three fields that
    point to the *original* capture.
    """
    response = query(client, 'http://www.iana.org/_css/2013.1/print.css',
                     resolveRevisits='true')
    assert response.status_code == 200
    assert response.mimetype == 'text/plain'

    # sha -> (size, offset, filename) of lines whose orig_* fields are '-'
    seen = {}
    for line in response.data.splitlines():
        columns = line.split(' ')
        assert len(columns) == 14
        sha = columns[5]
        size, offset, fn = columns[8:11]
        orig_size, orig_offset, orig_fn = columns[11:]

        # orig_* fields are either all '-' or (int, int, filename)
        if orig_size != '-':
            # they must equal the corresponding fields of the capture
            # recorded earlier under the same sha
            expected = (int(orig_size), int(orig_offset), orig_fn)
            assert seen.get(sha) == expected
            continue

        assert orig_offset == '-' and orig_fn == '-'
        seen[sha] = (int(size), int(offset), fn)
def test_resolveRevisits_orig_fields(client):
    """
    when resolveRevisits=true, extra three fields are named
    ``orig.length``, ``orig.offset`` and ``orig.filename``, respectively.
    it is possible to filter fields by these names.
    """
    resp = query(client, 'http://www.iana.org/_css/2013.1/print.css',
                 resolveRevisits='1',
                 fields='urlkey,orig.length,orig.offset,orig.filename')
    assert resp.status_code == 200
    assert resp.mimetype == 'text/plain'

    for cdx in resp.data.splitlines():
        fields = cdx.split(' ')
        assert len(fields) == 4
        key, orig_len, orig_offset, orig_fn = fields
        # orig.* fields are either all '-' or (number, number, filename).
        # each case is asserted explicitly instead of relying on the
        # truthiness of a tuple, which is always true when non-empty.
        if orig_len == '-':
            assert orig_offset == '-' and orig_fn == '-'
        else:
            assert orig_len.isdigit()
            assert orig_offset.isdigit()
def test_collapseTime_resolveRevisits_reverse(client):
    """
    combine ``collapseTime``, ``resolveRevisits`` and ``reverse``: three
    captures are expected, with timestamps in descending order.
    """
    response = query(client, 'http://www.iana.org/_css/2013.1/print.css',
                     collapseTime='11',
                     resolveRevisits='true',
                     reverse='true')

    captures = [CDXObject(line) for line in response.data.splitlines()]
    assert len(captures) == 3

    # compare each capture with its successor: timestamps never increase
    for current, following in zip(captures, captures[1:]):
        assert current['timestamp'] >= following['timestamp']