1
0
# Mirror of https://github.com/webrecorder/pywb.git (synced 2025-03-14 15:53:28 +01:00)
# File: pywb/tests/test_cdx_server_app.py (308 lines, 11 KiB, Python)

from gevent import monkey; monkey.patch_all(thread=False)
import re
import json
import os
import webtest
from six.moves.urllib.parse import urlencode
from pywb.warcserver.index.cdxobject import CDXObject
from pywb.warcserver.test.testutils import BaseTestClass
from pywb.warcserver.warcserver import WarcServer
# ============================================================================
class TestCDXApp(BaseTestClass):
    """Functional tests for the CDX index endpoint of ``WarcServer``.

    Every test issues a GET request against ``/pywb/index`` through a
    ``webtest.TestApp`` shared by the whole class and checks the status,
    content type and/or the index lines found in the response.
    """

    @classmethod
    def setup_class(cls):
        """Create the shared test app from ``config_test.yaml`` next to this file."""
        super(TestCDXApp, cls).setup_class()
        test_dir = os.path.dirname(os.path.realpath(__file__))
        config_file = os.path.join(test_dir, 'config_test.yaml')
        cls.testapp = webtest.TestApp(WarcServer(config_file=config_file))

    def query(self, url, is_error=False, **params):
        """Send an index query for ``url`` and return the webtest response.

        :param url: value of the ``url`` query parameter.
        :param is_error: passed to webtest as ``expect_errors``; set it when
            a non-2xx/3xx status is the expected outcome.
        :param params: any further query parameters (``filter``, ``output``,
            ``fields``, ...).
        """
        params['url'] = url
        # doseq expands sequence values (e.g. several ``filter`` entries)
        # into repeated query parameters instead of stringifying the tuple.
        query_string = urlencode(params, doseq=1)
        return self.testapp.get('/pywb/index?' + query_string,
                                expect_errors=is_error)

    def test_exact_url(self):
        """
        basic exact match, no filters, etc.
        """
        resp = self.query('http://www.iana.org/')

        assert resp.status_code == 200
        assert len(resp.text.splitlines()) == 3, resp.text

    def test_exact_url_json(self):
        """
        basic exact match with ``output=json``: every line parses as JSON.
        """
        resp = self.query('http://www.iana.org/', output='json')

        assert resp.status_code == 200
        lines = resp.text.splitlines()
        assert len(lines) == 3, resp.text
        # json.loads raises on any line that is not valid JSON.
        parsed = [json.loads(line) for line in lines]
        assert len(parsed) == 3

    def test_exact_url_plain_text(self):
        """
        basic exact match with ``output=text``: plain text, no JSON blocks.
        """
        resp = self.query('http://www.iana.org/', output='text')

        assert resp.status_code == 200
        assert resp.content_type == 'text/plain'
        assert '{' not in resp.text
        lines = resp.text.splitlines()
        assert len(lines) == 3, resp.text

    def test_prefix_match(self):
        """
        prefix match test: at least one result has a key longer than the
        key of the queried url itself.
        """
        resp = self.query('http://www.iana.org/', matchType='prefix')

        assert resp.status_code == 200

        root_key_len = len('org,iana)/')
        suburls = 0
        for line in resp.text.splitlines():
            fields = line.split(' ')
            if len(fields[0]) > root_key_len:
                suburls += 1

        assert suburls > 0

    def test_filters_1(self):
        """
        filter cdxes by mimetype and filename field, exact match.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
                          filter=('mime:warc/revisit', 'filename:dupes.warc.gz'))

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        lines = resp.text.splitlines()
        assert len(lines) > 0

        for line in lines:
            cdx = CDXObject(line.encode('utf-8'))
            assert cdx['urlkey'] == 'org,iana)/_css/2013.1/screen.css'
            assert cdx['timestamp'] == '20140127171239'
            assert cdx['mime'] == 'warc/revisit'
            assert cdx['filename'] == 'dupes.warc.gz'

    def test_filters_2_no_fuzzy_no_match(self):
        """
        two filters, disable fuzzy matching
        """
        resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
                          filter=('!mime:warc/revisit', 'filename:dupes.warc.gz'),
                          allowFuzzy='false')

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        lines = resp.text.splitlines()
        assert len(lines) == 0

    def test_filters_3(self):
        """
        two negated (``!``-prefixed) filters on mimetype and filename;
        exactly one capture is expected to remain.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
                          filter=('!mime:warc/revisit', '!filename:dupes.warc.gz'))

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        lines = resp.text.splitlines()
        assert len(lines) == 1

        for line in lines:
            cdx = CDXObject(line.encode('utf-8'))
            assert cdx['urlkey'] == 'org,iana)/_css/2013.1/screen.css'
            assert cdx['timestamp'] == '20140126200625'
            assert cdx['mime'] == 'text/css'
            assert cdx['filename'] == 'iana.warc.gz'

    def test_limit(self):
        """
        ``limit=1`` yields a single line; adding ``reverse=1`` yields a
        single line for a different (later-timestamped) capture.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
                          limit='1')

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        cdxes = resp.text.splitlines()
        assert len(cdxes) == 1

        cdx = CDXObject(cdxes[0].encode('utf-8'))
        assert cdx['urlkey'] == 'org,iana)/_css/2013.1/screen.css'
        assert cdx['timestamp'] == '20140126200625'
        assert cdx['mime'] == 'text/css'

        resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
                          limit='1', reverse='1')

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        cdxes = resp.text.splitlines()
        assert len(cdxes) == 1

        cdx = CDXObject(cdxes[0].encode('utf-8'))
        assert cdx['urlkey'] == 'org,iana)/_css/2013.1/screen.css'
        assert cdx['timestamp'] == '20140127171239'
        assert cdx['mime'] == 'warc/revisit'

    def test_fields(self):
        """
        retrieve subset of fields with ``fields`` parameter.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          fields='urlkey,timestamp,status')

        assert resp.status_code == 200

        cdxes = resp.text.splitlines()
        # Guard against the loop below passing vacuously on an empty body.
        assert cdxes, resp.text

        for line in cdxes:
            cdx = CDXObject(line.encode('utf-8'))
            assert cdx['urlkey'] == 'org,iana)/_css/2013.1/print.css'
            assert re.match(r'\d{14}$', cdx['timestamp'])
            # Anchored at the end: without ``$`` any value merely *starting*
            # with three digits or '-' would be accepted.
            assert re.match(r'(\d{3}|-)$', cdx['status'])

    def test_fields_json(self):
        """
        retrieve subset of fields with ``fields`` parameter, in json
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          fields='urlkey,timestamp,status',
                          output='json')

        assert resp.status_code == 200

        cdxes = resp.text.splitlines()
        # Guard against the loop below passing vacuously on an empty body.
        assert cdxes, resp.text

        for line in cdxes:
            fields = json.loads(line)
            assert len(fields) == 3
            assert fields['urlkey'] == 'org,iana)/_css/2013.1/print.css'
            assert re.match(r'\d{14}$', fields['timestamp'])
            # Anchored at the end, see test_fields.
            assert re.match(r'(\d{3}|-)$', fields['status'])

    def test_fields_undefined(self):
        """
        server shall respond with Bad Request and name of undefined
        when ``fields`` parameter contains undefined name(s).
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          is_error=True,
                          fields='urlkey,nosuchfield')

        # This used to be a bare comparison, which is a no-op statement, so
        # the test could never fail.
        # NOTE(review): if this assertion fails, the server is not enforcing
        # the contract stated in the docstring -- confirm intended behaviour.
        assert resp.status_code == 400

    def test_fields_undefined_json(self):
        """
        server shall respond with Bad Request and name of undefined
        when ``fields`` parameter contains undefined name(s).
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          is_error=True,
                          fields='urlkey,nosuchfield',
                          output='json')

        # This used to be a bare comparison, which is a no-op statement, so
        # the test could never fail.
        # NOTE(review): if this assertion fails, the server is not enforcing
        # the contract stated in the docstring -- confirm intended behaviour.
        assert resp.status_code == 400

    def test_resolveRevisits(self):
        """
        with ``resolveRevisits=true``, server adds three fields pointing to
        the *original* capture.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          resolveRevisits='true')

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        cdxes = resp.text.splitlines()
        # Guard against the loop below passing vacuously on an empty body.
        assert cdxes, resp.text

        # digest -> (length, offset, filename) of the capture seen without
        # orig.* values, i.e. the one later revisits are compared against.
        originals = {}
        for line in cdxes:
            cdx = CDXObject(line.encode('utf-8'))
            assert len(cdx) == 16

            # orig.* fields are either all '-' or (int, int, filename)
            # check if orig.* fields are equals to corresponding fields
            # for the original capture.
            sha = cdx['digest']
            if cdx['orig.length'] == '-':
                assert cdx['orig.offset'] == '-' and cdx['orig.filename'] == '-'
                originals[sha] = (int(cdx['length']),
                                  int(cdx['offset']),
                                  cdx['filename'])
            else:
                orig = originals.get(sha)
                assert orig == (int(cdx['orig.length']),
                                int(cdx['orig.offset']),
                                cdx['orig.filename'])

    def test_resolveRevisits_orig_fields(self):
        """
        when resolveRevisits=true, extra three fields are named
        ``orig.length``, ``orig.offset`` and ``orig.filename``, respectively.
        it is possible to filter fields by these names.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          resolveRevisits='1',
                          fields='urlkey,orig.length,orig.offset,orig.filename')

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        cdxes = resp.text.splitlines()

        # The first line is expected to carry no orig.* values ...
        first = CDXObject(cdxes[0].encode('utf-8'))
        assert first['orig.offset'] == '-'
        assert first['orig.length'] == '-'
        assert first['orig.filename'] == '-'

        # ... while every following line must point into iana.warc.gz.
        for line in cdxes[1:]:
            cdx = CDXObject(line.encode('utf-8'))
            assert cdx['orig.offset'] != '-'
            assert cdx['orig.length'] != '-'
            assert cdx['orig.filename'] == 'iana.warc.gz'

    def test_collapseTime_resolveRevisits_reverse(self):
        """
        ``collapseTime`` combined with ``resolveRevisits`` and ``reverse``:
        three lines are returned, timestamps in descending order.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          collapseTime='11',
                          resolveRevisits='true',
                          reverse='true')

        assert resp.status_code == 200

        cdxes = [CDXObject(line) for line in resp.body.splitlines()]
        assert len(cdxes) == 3

        # timestamp is in descending order
        for current, following in zip(cdxes, cdxes[1:]):
            assert current['timestamp'] >= following['timestamp']

    def test_error_unknown_output_format(self):
        """test unknown output format in combination with a list of output fields"""
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          is_error=True,
                          fields='urlkey,timestamp,status',
                          output='foo')

        assert resp.status_code == 400
        assert resp.json == {'message': 'output=foo not supported'}

    def test_error_unknown_match_type(self):
        """test unknown/unsupported matchType"""
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          is_error=True,
                          fields='urlkey,timestamp,status',
                          matchType='foo')

        assert resp.status_code == 400
        assert resp.json == {'message': 'Invalid match_type: foo'}