mirror of
https://github.com/webrecorder/pywb.git
synced 2025-03-15 00:03:28 +01:00
308 lines
11 KiB
Python
308 lines
11 KiB
Python
from gevent import monkey; monkey.patch_all(thread=False)
|
|
|
|
import re
|
|
import json
|
|
import os
|
|
|
|
import webtest
|
|
|
|
from six.moves.urllib.parse import urlencode
|
|
|
|
from pywb.warcserver.index.cdxobject import CDXObject
|
|
|
|
from pywb.warcserver.test.testutils import BaseTestClass
|
|
from pywb.warcserver.warcserver import WarcServer
|
|
|
|
|
|
# ============================================================================
|
|
class TestCDXApp(BaseTestClass):
    """
    Tests for the CDX index endpoint (``/pywb/index``) of a ``WarcServer``
    configured from ``config_test.yaml`` and driven through
    ``webtest.TestApp``.
    """

    @classmethod
    def setup_class(cls):
        """
        Create the ``WarcServer`` test application once and store it on the
        class as ``testapp`` so every test method can use it.
        """
        super(TestCDXApp, cls).setup_class()

        # config_test.yaml is looked up in the directory of this test module
        test_dir = os.path.dirname(os.path.realpath(__file__))
        config_file = os.path.join(test_dir, 'config_test.yaml')

        cls.testapp = webtest.TestApp(WarcServer(config_file=config_file))

    def query(self, url, is_error=False, **params):
        """
        Issue a GET request against ``/pywb/index`` and return the response.

        :param url: value sent as the ``url`` query parameter.
        :param is_error: forwarded to webtest as ``expect_errors``; set it
            when the test expects an error status from the server.
        :param params: any further query parameters (``output``, ``filter``,
            ``fields``, ...). Sequence values are expanded into repeated
            ``key=value`` pairs because ``doseq`` is enabled.
        """
        params['url'] = url
        query_string = urlencode(params, doseq=True)
        return self.testapp.get('/pywb/index?' + query_string,
                                expect_errors=is_error)

    def test_exact_url(self):
        """
        basic exact match, no filters, etc.
        """
        resp = self.query('http://www.iana.org/')

        assert resp.status_code == 200
        assert len(resp.text.splitlines()) == 3, resp.text

    def test_exact_url_json(self):
        """
        basic exact match with ``output=json``: every line must parse as JSON.
        """
        resp = self.query('http://www.iana.org/', output='json')

        assert resp.status_code == 200
        lines = resp.text.splitlines()
        assert len(lines) == 3, resp.text
        # json.loads raises on any line that is not valid JSON
        assert len([json.loads(line) for line in lines]) == 3

    def test_exact_url_plain_text(self):
        """
        basic exact match with ``output=text``: plain text, no JSON blocks.
        """
        resp = self.query('http://www.iana.org/', output='text')

        assert resp.status_code == 200
        assert resp.content_type == 'text/plain'
        assert '{' not in resp.text

        lines = resp.text.splitlines()
        assert len(lines) == 3, resp.text

    def test_prefix_match(self):
        """
        prefix match test
        """
        resp = self.query('http://www.iana.org/', matchType='prefix')

        assert resp.status_code == 200

        # count the lines whose first field is longer than the key of the
        # queried url itself, i.e. keys of urls below the prefix
        root_key_len = len('org,iana)/')
        suburls = sum(1 for line in resp.text.splitlines()
                      if len(line.split(' ')[0]) > root_key_len)
        assert suburls > 0

    def test_filters_1(self):
        """
        filter cdxes by mimetype and filename field, exact match.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
                          filter=('mime:warc/revisit', 'filename:dupes.warc.gz'))

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        lines = resp.text.splitlines()
        assert len(lines) > 0

        for line in lines:
            cdx = CDXObject(line.encode('utf-8'))
            assert cdx['urlkey'] == 'org,iana)/_css/2013.1/screen.css'
            assert cdx['timestamp'] == '20140127171239'
            assert cdx['mime'] == 'warc/revisit'
            assert cdx['filename'] == 'dupes.warc.gz'

    def test_filters_2_no_fuzzy_no_match(self):
        """
        two filters, disable fuzzy matching
        """
        resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
                          filter=('!mime:warc/revisit', 'filename:dupes.warc.gz'),
                          allowFuzzy='false')

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        lines = resp.text.splitlines()
        assert len(lines) == 0

    def test_filters_3(self):
        """
        two ``!``-prefixed filters on mimetype and filename: the single
        remaining capture has neither of the filtered values.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
                          filter=('!mime:warc/revisit', '!filename:dupes.warc.gz'))

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        lines = resp.text.splitlines()
        assert len(lines) == 1

        for line in lines:
            cdx = CDXObject(line.encode('utf-8'))
            assert cdx['urlkey'] == 'org,iana)/_css/2013.1/screen.css'
            assert cdx['timestamp'] == '20140126200625'
            assert cdx['mime'] == 'text/css'
            assert cdx['filename'] == 'iana.warc.gz'

    def test_limit(self):
        """
        ``limit=1`` returns a single capture; adding ``reverse=1`` returns
        a different single capture (the later timestamp).
        """
        resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
                          limit='1')

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        cdxes = resp.text.splitlines()
        assert len(cdxes) == 1

        cdx = CDXObject(cdxes[0].encode('utf-8'))
        assert cdx['urlkey'] == 'org,iana)/_css/2013.1/screen.css'
        assert cdx['timestamp'] == '20140126200625'
        assert cdx['mime'] == 'text/css'

        # same query, reversed
        resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
                          limit='1', reverse='1')

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        cdxes = resp.text.splitlines()
        assert len(cdxes) == 1

        cdx = CDXObject(cdxes[0].encode('utf-8'))
        assert cdx['urlkey'] == 'org,iana)/_css/2013.1/screen.css'
        assert cdx['timestamp'] == '20140127171239'
        assert cdx['mime'] == 'warc/revisit'

    def test_fields(self):
        """
        retrieve subset of fields with ``fields`` parameter.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          fields='urlkey,timestamp,status')

        assert resp.status_code == 200

        for line in resp.text.splitlines():
            cdx = CDXObject(line.encode('utf-8'))
            assert cdx['urlkey'] == 'org,iana)/_css/2013.1/print.css'
            assert re.match(r'\d{14}$', cdx['timestamp'])
            assert re.match(r'\d{3}|-', cdx['status'])

    def test_fields_json(self):
        """
        retrieve subset of fields with ``fields`` parameter, in json
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          fields='urlkey,timestamp,status',
                          output='json')

        assert resp.status_code == 200

        for line in resp.text.splitlines():
            # debugging aid: echo the raw line before it is parsed
            print(line)
            fields = json.loads(line)
            assert len(fields) == 3
            assert fields['urlkey'] == 'org,iana)/_css/2013.1/print.css'
            assert re.match(r'\d{14}$', fields['timestamp'])
            assert re.match(r'\d{3}|-', fields['status'])

    def test_fields_undefined(self):
        """
        server shall respond with Bad Request and name of undefined
        when ``fields`` parameter contains undefined name(s).
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          is_error=True,
                          fields='urlkey,nosuchfield')

        # NOTE(review): this used to be a bare ``resp.status_code == 400``
        # expression whose result was discarded, so the test could never
        # fail. If this assertion fails, the server is not rejecting unknown
        # field names for this output format -- confirm intended behaviour.
        assert resp.status_code == 400

    def test_fields_undefined_json(self):
        """
        server shall respond with Bad Request and name of undefined
        when ``fields`` parameter contains undefined name(s), in json
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          is_error=True,
                          fields='urlkey,nosuchfield',
                          output='json')

        # NOTE(review): this used to be a bare ``resp.status_code == 400``
        # expression whose result was discarded, so the test could never
        # fail. If this assertion fails, the server is not rejecting unknown
        # field names for json output -- confirm intended behaviour.
        assert resp.status_code == 400

    def test_resolveRevisits(self):
        """
        with ``resolveRevisits=true``, server adds three fields pointing to
        the *original* capture.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          resolveRevisits='true')

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        # digest -> (length, offset, filename) of the capture whose orig.*
        # fields are '-'
        originals = {}
        for line in resp.text.splitlines():
            cdx = CDXObject(line.encode('utf-8'))
            assert len(cdx) == 16

            # orig.* fields are either all '-' or (int, int, filename)
            # check if orig.* fields are equals to corresponding fields
            # for the original capture.
            sha = cdx['digest']
            if cdx['orig.length'] == '-':
                assert cdx['orig.offset'] == '-' and cdx['orig.filename'] == '-'
                originals[sha] = (int(cdx['length']),
                                  int(cdx['offset']),
                                  cdx['filename'])
            else:
                expected = (int(cdx['orig.length']),
                            int(cdx['orig.offset']),
                            cdx['orig.filename'])
                # .get() yields None when no original was recorded for this
                # digest, which makes the comparison fail
                assert originals.get(sha) == expected

    def test_resolveRevisits_orig_fields(self):
        """
        when resolveRevisits=true, extra three fields are named
        ``orig.length``, ``orig.offset`` and ``orig.filename``, respectively.
        it is possible to filter fields by these names.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          resolveRevisits='1',
                          fields='urlkey,orig.length,orig.offset,orig.filename')

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        lines = resp.text.splitlines()

        # the first capture has no orig.* values
        first = CDXObject(lines[0].encode('utf-8'))
        assert first['orig.offset'] == '-'
        assert first['orig.length'] == '-'
        assert first['orig.filename'] == '-'

        # every following capture has all three orig.* fields set
        for line in lines[1:]:
            cdx = CDXObject(line.encode('utf-8'))
            assert cdx['orig.offset'] != '-'
            assert cdx['orig.length'] != '-'
            assert cdx['orig.filename'] == 'iana.warc.gz'

    def test_collapseTime_resolveRevisits_reverse(self):
        """
        combine ``collapseTime``, ``resolveRevisits`` and ``reverse``:
        three captures are returned, timestamps in descending order.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          collapseTime='11',
                          resolveRevisits='true',
                          reverse='true')

        # resp.body is bytes already, so no encoding step is needed here
        cdxes = [CDXObject(line) for line in resp.body.splitlines()]

        assert len(cdxes) == 3

        # timestamp is in descending order
        for current, following in zip(cdxes, cdxes[1:]):
            assert current['timestamp'] >= following['timestamp']

    def test_error_unknown_output_format(self):
        """test unknown output format in combination with a list of output fields"""
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          is_error=True,
                          fields='urlkey,timestamp,status',
                          output='foo')

        assert resp.status_code == 400
        assert resp.json == {'message': 'output=foo not supported'}

    def test_error_unknown_match_type(self):
        """test unknown/unsupported matchType"""
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          is_error=True,
                          fields='urlkey,timestamp,status',
                          matchType='foo')

        assert resp.status_code == 400
        assert resp.json == {'message': 'Invalid match_type: foo'}
|
|
|