1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 00:03:28 +01:00

new-pywb refactor!

frontendapp compatibility
- add support for separate not found page for 404s (not_found.html)
- support for exception handling with error template (error.html)
- support for home page (index.html)
- add memento headers for replay
- add referrer fallback check
- tests: port integration tests for front-end replay, cdx server
- not included: proxy mode, exact redirect mode, non-framed replay
- move unused tests to tests_disabled
- cli: add optional werkzeug profiler with --profile flag
This commit is contained in:
Ilya Kreymer 2017-02-27 19:07:51 -08:00
parent 0dbc803422
commit a4b770d34e
44 changed files with 603 additions and 598 deletions

View File

@ -41,6 +41,7 @@ class BaseCli(object):
parser.add_argument('-t', '--threads', type=int, default=4)
parser.add_argument('-s', '--server', default='gevent')
parser.add_argument('--debug', action='store_true')
parser.add_argument('--profile', action='store_true')
self.desc = desc
@ -59,11 +60,12 @@ class BaseCli(object):
logging.debug('No Gevent')
self.r.server = 'wsgiref'
from pywb.framework.wsgi_wrappers import init_app
self.init_app = init_app
self.application = self.load()
if self.r.profile:
from werkzeug.contrib.profiler import ProfilerMiddleware
self.application = ProfilerMiddleware(self.application)
def _extend_parser(self, parser): #pragma: no cover
pass
@ -109,7 +111,9 @@ class LiveCli(BaseCli):
collections={'live': '$liveweb'})
from pywb.webapp.pywb_init import create_wb_router
return self.init_app(create_wb_router, load_yaml=False, config=config)
from pywb.framework.wsgi_wrappers import init_app
return init_app(create_wb_router, load_yaml=False, config=config)
#=============================================================================
@ -149,18 +153,20 @@ class ReplayCli(BaseCli):
class CdxCli(ReplayCli): #pragma: no cover
def load(self):
from pywb.webapp.pywb_init import create_cdx_server_app
from pywb.framework.wsgi_wrappers import init_app
super(CdxCli, self).load()
return self.init_app(create_cdx_server_app,
load_yaml=True)
return init_app(create_cdx_server_app,
load_yaml=True)
#=============================================================================
class WaybackCli(ReplayCli):
def load(self):
from pywb.webapp.pywb_init import create_wb_router
from pywb.framework.wsgi_wrappers import init_app
super(WaybackCli, self).load()
return self.init_app(create_wb_router,
load_yaml=True)
return init_app(create_wb_router,
load_yaml=True)
#=============================================================================

View File

@ -149,7 +149,7 @@ class HeaderRewriter(object):
new_headers.append((name, urlrewriter.rewrite(value)))
elif lowername in self.KEEP_NO_REWRITE_HEADERS:
if content_modified:
if content_modified and value != '0':
removed_header_dict[lowername] = value
add_prefixed_header(name, value)
else:

View File

@ -205,7 +205,7 @@ class RewriteContent(object):
except Exception:
content_len = None
if content_len and content_len >= 0:
if content_len is not None and content_len >= 0:
content_len = str(content_len + len(head_insert_str))
status_headers.replace_header('Content-Length',
content_len)

View File

@ -0,0 +1,16 @@
<!DOCTYPE html>
<html>
<body>
<h2>pywb Wayback Machine (new)</h2>
This archive contains the following collections:
<ul>
{% for route in routes %}
<li>
<a href="{{ '/' + route }}">{{ '/' + route }}</a>
</li>
{% endfor %}
</ul>
</body>
</html>

View File

@ -2,9 +2,9 @@
The url <b>{{ url }}</b> could not be found in this collection.
{% if wbrequest.env.pywb_proxy_magic and url %}
{% if wbrequest and wbrequest.env.pywb_proxy_magic and url %}
<p>
<a href="//select.{{ wbrequest.env.pywb_proxy_magic }}/{{ url }}">Try Different Collection</a>
<a href="//select.{{ wbrequest and wbrequest.env.pywb_proxy_magic }}/{{ url }}">Try Different Collection</a>
</p>
{% endif %}

View File

@ -1,3 +1,5 @@
{% if wbrequest.user_metadata %}
<h2>{{ wbrequest.user_metadata.title if wbrequest.user_metadata.title else wbrequest.coll }} Search Page</h2>
<div>
@ -8,6 +10,8 @@
</table>
</div>
{% endif %}
<p>
Search this collection by url:
<form onsubmit="url = document.getElementById('search').value; if (url != '') { document.location.href = '{{ wbrequest.wb_prefix }}' + '*/' + url; } return false;">

View File

@ -2,8 +2,9 @@ from gevent.monkey import patch_all; patch_all()
#from bottle import run, Bottle, request, response, debug
from werkzeug.routing import Map, Rule
from werkzeug.exceptions import HTTPException
from werkzeug.exceptions import HTTPException, NotFound
from werkzeug.wsgi import pop_path_info
from six.moves.urllib.parse import urljoin
from pywb.webagg.autoapp import AutoConfigApp
from pywb.webapp.handlers import StaticHandler
@ -23,7 +24,6 @@ class NewWbRequest(object):
self.env = env
self.wb_url_str = wb_url_str
self.full_prefix = full_prefix
self.user_metadata = {}
# ============================================================================
@ -43,7 +43,8 @@ class FrontEndApp(RewriterApp):
self.url_map.add(Rule('/static/__pywb/<path:filepath>', endpoint=self.serve_static))
self.url_map.add(Rule('/<coll>/', endpoint=self.serve_coll_page))
self.url_map.add(Rule('/<coll>/<path:url>', endpoint=self.serve_content))
self.url_map.add(Rule('/_coll_info.json', endpoint=self.serve_listing))
self.url_map.add(Rule('/collinfo.json', endpoint=self.serve_listing))
self.url_map.add(Rule('/', endpoint=self.serve_home))
self.paths = self.get_upstream_paths(self.webagg_server.port)
@ -52,14 +53,28 @@ class FrontEndApp(RewriterApp):
'replay-fixed': 'http://localhost:%s/{coll}/resource/postreq' % port
}
def serve_home(self, environ):
    """Render the home page listing every collection served by this app.

    The fixed routes are listed first, followed by the dynamic routes; the
    combined list is passed to the ``new_index.html`` template as ``routes``.

    :param environ: WSGI environ of the current request
    :return: html response produced by ``WbResponse.text_response``
    """
    index_view = BaseInsertView(self.jinja_env, 'new_index.html')

    fixed = self.webagg.list_fixed_routes()
    dynamic = self.webagg.list_dynamic_routes()

    html = index_view.render_to_string(environ, routes=fixed + dynamic)
    return WbResponse.text_response(
        html, content_type='text/html; charset="utf-8"')
def serve_static(self, environ, filepath=''):
return self.static_handler(NewWbRequest(environ, filepath, ''))
try:
return self.static_handler(NewWbRequest(environ, filepath, ''))
except:
raise NotFound(response=self._error_response(environ, 'Static File Not Found: {0}'.format(filepath)))
def serve_coll_page(self, environ, coll):
view = BaseInsertView(self.jinja_env, 'search.html')
if not self.is_valid_coll(coll):
raise NotFound(response=self._error_response(environ, 'No handler for "/{0}"'.format(coll)))
wbrequest = NewWbRequest(environ, '', '/')
return WbResponse.text_response(view.render_to_string(environ, wbrequest=wbrequest),
content_type='text/html; charset="utf-8"')
view = BaseInsertView(self.jinja_env, 'search.html')
content = view.render_to_string(environ, wbrequest=wbrequest)
return WbResponse.text_response(content, content_type='text/html; charset="utf-8"')
def serve_listing(self, environ):
result = {'fixed': self.webagg.list_fixed_routes(),
@ -68,7 +83,14 @@ class FrontEndApp(RewriterApp):
return WbResponse.json_response(result)
def is_valid_coll(self, coll):
    """Return True if ``coll`` names a fixed or a dynamic route.

    The fixed routes are consulted first; the dynamic routes are only
    listed when the name is not found among the fixed ones.
    """
    if coll in self.webagg.list_fixed_routes():
        return True

    return coll in self.webagg.list_dynamic_routes()
def serve_content(self, environ, coll='', url=''):
if not self.is_valid_coll(coll):
raise NotFound(response=self._error_response(environ, 'No handler for "/{0}"'.format(coll)))
pop_path_info(environ)
wb_url = self.get_wburl(environ)
@ -83,30 +105,59 @@ class FrontEndApp(RewriterApp):
response = self.render_content(wb_url, kwargs, environ)
except UpstreamException as ue:
response = self.handle_error(environ, ue)
raise HTTPException(response=response)
return response
def _check_refer_redirect(self, environ):
    """Build a redirect for a request that lost its replay prefix.

    If the ``Referer`` header points at this host and embeds an archived
    url (either absolute, ``.../http://example.com/``, or scheme-relative,
    ``...///example.com/``), the requested path and query are resolved
    against that archived url and re-attached to the referrer's prefix.

    :param environ: WSGI environ of the request
    :return: a ``307 Redirect`` response from ``WbResponse.redir_response``,
        or ``None`` when no redirect target can be derived
    """
    referer = environ.get('HTTP_REFERER')
    if not referer:
        return None

    # Without a Host header nothing can be matched; ``None in referer``
    # would raise TypeError, so bail out explicitly.
    host = environ.get('HTTP_HOST')
    if not host or host not in referer:
        return None

    # Skip the first character so the referrer's own scheme is not matched.
    tail = referer[1:]

    inx = tail.find('http')
    if inx < 0:
        # str.find() signals "not found" with -1 (not 0): fall back to a
        # scheme-relative archived url.
        inx = tail.find('///')
        if inx > 0:
            # keep the first '/' as the end of the prefix
            inx += 1

    if inx < 0:
        return None

    # ``inx`` indexes into ``tail``, hence the +1 when slicing ``referer``.
    url = referer[inx + 1:]
    prefix = referer[:inx + 1]

    orig_url = environ['PATH_INFO']
    if environ.get('QUERY_STRING'):
        orig_url += '?' + environ['QUERY_STRING']

    full_url = prefix + urljoin(url, orig_url)
    return WbResponse.redir_response(full_url, '307 Redirect')
def __call__(self, environ, start_response):
    """WSGI entry point: match the request to an endpoint and run it.

    Routing failures are answered with the ``HTTPException`` itself.  If an
    endpoint raises an ``HTTPException``, a referrer-based redirect is tried
    first (see ``_check_refer_redirect``) before the exception is sent.
    Any other exception is turned into a ``500 Server Error`` response via
    ``_error_response``.

    :param environ: WSGI environ of the request
    :param start_response: WSGI ``start_response`` callable
    :return: whatever the selected response object returns when it is
        called with ``(environ, start_response)``
    """
    urls = self.url_map.bind_to_environ(environ)
    try:
        endpoint, args = urls.match()
    except HTTPException as e:
        return e(environ, start_response)

    try:
        response = endpoint(environ, **args)
        return response(environ, start_response)

    except HTTPException as e:
        redir = self._check_refer_redirect(environ)
        if redir:
            return redir(environ, start_response)

        return e(environ, start_response)

    except Exception as e:
        if self.debug:
            traceback.print_exc()

        # ``status`` must be passed by keyword: the third positional
        # parameter of ``_error_response`` is ``details``.
        response = self._error_response(environ,
                                        'Internal Error: ' + str(e),
                                        status='500 Server Error')

        # Like every other branch, the response object has to be invoked
        # to actually start the WSGI response.
        return response(environ, start_response)
@classmethod
def create_app(cls, port):

View File

@ -16,6 +16,9 @@ from pywb.cdx.cdxobject import CDXObject
from pywb.warc.recordloader import ArcWarcRecordLoader
from pywb.framework.wbrequestresponse import WbResponse
from pywb.webagg.utils import MementoUtils, buffer_iter
from werkzeug.http import HTTP_STATUS_CODES
from six.moves.urllib.parse import urlencode
from pywb.urlrewrite.rewriteinputreq import RewriteInputRequest
@ -62,6 +65,7 @@ class RewriterApp(object):
self.head_insert_view = HeadInsertView(self.jinja_env, 'head_insert.html', 'banner.html')
self.frame_insert_view = TopFrameView(self.jinja_env, 'frame_insert.html', 'banner.html')
self.error_view = BaseInsertView(self.jinja_env, 'error.html')
self.not_found_view = BaseInsertView(self.jinja_env, 'not_found.html')
self.query_view = BaseInsertView(self.jinja_env, config.get('query_html', 'query.html'))
self.cookie_tracker = None
@ -185,10 +189,13 @@ class RewriterApp(object):
stream = BufferedReader(r.raw, block_size=BUFF_SIZE)
record = self.loader.parse_record_stream(stream)
memento_dt = r.headers.get('Memento-Datetime')
target_uri = r.headers.get('WARC-Target-URI')
cdx = CDXObject()
cdx['urlkey'] = urlkey
cdx['timestamp'] = http_date_to_timestamp(r.headers.get('Memento-Datetime'))
cdx['url'] = wb_url.url
cdx['timestamp'] = http_date_to_timestamp(memento_dt)
cdx['url'] = target_uri
self._add_custom_params(cdx, r.headers, kwargs)
@ -237,8 +244,30 @@ class RewriterApp(object):
if ' ' not in status_headers.statusline:
status_headers.statusline += ' None'
self._add_memento_links(urlrewriter, full_prefix, memento_dt, status_headers)
#if cdx['timestamp'] != wb_url.timestamp:
status_headers.headers.append(('Content-Location', urlrewriter.get_new_url(timestamp=cdx['timestamp'],
url=cdx['url'])))
#gen = buffer_iter(status_headers, gen)
return WbResponse(status_headers, gen)
def _add_memento_links(self, urlrewriter, full_prefix, memento_dt, status_headers):
    """Append the ``Memento-Datetime`` and ``Link`` headers to a response.

    The ``Link`` header carries two comma-separated entries: a ``timegate``
    link (the rewriter's url with an empty timestamp) and a ``memento``
    link (``full_prefix`` plus the original wb url string) dated with
    ``memento_dt``.

    :param urlrewriter: rewriter providing ``wburl`` and ``get_new_url``
    :param full_prefix: prefix prepended to the original wb url string
    :param memento_dt: datetime value used for both headers
    :param status_headers: object whose ``headers`` list is extended in place
    """
    wburl = urlrewriter.wburl
    status_headers.headers.append(('Memento-Datetime', memento_dt))

    memento_target = full_prefix + wburl._original_url
    timegate_target = urlrewriter.get_new_url(timestamp='')

    links = [
        MementoUtils.make_link(timegate_target, 'timegate'),
        MementoUtils.make_memento_link(memento_target, 'memento', memento_dt),
    ]

    status_headers.headers.append(('Link', ', '.join(links)))
def get_top_url(self, full_prefix, wb_url, cdx, kwargs):
top_url = full_prefix
top_url += wb_url.to_str(mod='')
@ -264,11 +293,26 @@ class RewriterApp(object):
pass
def handle_error(self, environ, ue):
error_html = self.error_view.render_to_string(environ,
err_msg=ue.url,
err_details=ue.msg)
if ue.status_code == 404:
return self._not_found_response(environ, ue.url)
else:
status = str(ue.status_code) + ' ' + HTTP_STATUS_CODES.get(ue.status_code, 'Unknown Error')
return self._error_response(environ, ue.url, ue.msg,
status=status)
def _not_found_response(self, environ, url):
    """Render the not-found template for ``url`` as a ``404 Not Found`` html response."""
    body = self.not_found_view.render_to_string(environ, url=url)

    return WbResponse.text_response(body,
                                    status='404 Not Found',
                                    content_type='text/html')
def _error_response(self, environ, msg='', details='', status='404 Not Found'):
    """Render the error template as an html response.

    :param environ: WSGI environ of the request
    :param msg: passed to the template as ``err_msg``
    :param details: passed to the template as ``err_details``
    :param status: HTTP status line of the response
    """
    template_args = dict(err_msg=msg, err_details=details)
    body = self.error_view.render_to_string(environ, **template_args)

    return WbResponse.text_response(body,
                                    status=status,
                                    content_type='text/html')
return WbResponse.text_response(error_html, content_type='text/html')
def _do_req(self, inputreq, wb_url, kwargs, skip):
req_data = inputreq.reconstruct_request(wb_url.url)

View File

@ -94,11 +94,8 @@ class AutoConfigApp(ResAggApp):
indexes_templ = self.AUTO_DIR_INDEX_PATH.replace('/', os.path.sep)
dir_source = CacheDirectoryIndexSource(self.root_dir, indexes_templ)
archive_templ = self.config.get('archive_paths')
if not archive_templ:
archive_templ = self.AUTO_DIR_ARCHIVE_PATH.replace('/', os.path.sep)
archive_templ = os.path.join(self.root_dir, archive_templ)
#archive_templ = os.path.join('.', root_dir, '{coll}', 'archive') + os.path.sep
archive_templ = self.AUTO_DIR_ARCHIVE_PATH.replace('/', os.path.sep)
archive_templ = os.path.join(self.root_dir, archive_templ)
handler = DefaultResourceHandler(dir_source, archive_templ)
@ -123,8 +120,15 @@ class AutoConfigApp(ResAggApp):
if not colls:
return routes
self.default_archive_paths = self.config.get('archive_paths')
for name, coll_config in iteritems(colls):
handler = self.load_coll(name, coll_config)
try:
handler = self.load_coll(name, coll_config)
except:
print('Invalid Collection: ' + name)
continue
routes[name] = handler
return routes
@ -132,10 +136,15 @@ class AutoConfigApp(ResAggApp):
def load_coll(self, name, coll_config):
if isinstance(coll_config, str):
index = coll_config
resource = None
resource = None
elif isinstance(coll_config, dict):
index = coll_config.get('index')
if not index:
index = coll_config.get('index_paths')
resource = coll_config.get('resource')
if not resource:
resource = coll_config.get('archive_paths')
else:
raise Exception('collection config must be string or dict')
@ -154,10 +163,12 @@ class AutoConfigApp(ResAggApp):
if not index_group:
raise Exception('no index, index_group or sequence found')
timeout = int(coll_config.get('timeout', 0))
agg = init_index_agg(index_group, True, timeout)
if not resource:
resource = self.default_archive_paths
return DefaultResourceHandler(agg, resource)
def init_sequence(self, coll_name, seq_config):
@ -170,7 +181,7 @@ class AutoConfigApp(ResAggApp):
if not isinstance(entry, dict):
raise Exception('"sequence" entry must be a dict')
name = entry.get('name')
name = entry.get('name', '')
handler = self.load_coll(name, entry)
handlers.append(handler)

View File

@ -100,7 +100,10 @@ class IndexHandler(object):
output = params.get('output', self.DEF_OUTPUT)
fields = params.get('fields')
handler = self.OUTPUTS.get(output)
if fields and isinstance(fields, str):
fields = fields.split(',')
handler = self.OUTPUTS.get(output, fields)
if not handler:
errs = dict(last_exc=BadRequestException('output={0} not supported'.format(output)))
return None, None, errs

View File

@ -53,9 +53,10 @@ class BaseLoader(object):
return out_headers, StreamIter(stream)
out_headers['Link'] = MementoUtils.make_link(
warc_headers.get_header('WARC-Target-URI'),
'original')
target_uri = warc_headers.get_header('WARC-Target-URI')
out_headers['WARC-Target-URI'] = target_uri
out_headers['Link'] = MementoUtils.make_link(target_uri, 'original')
memento_dt = iso_date_to_datetime(warc_headers.get_header('WARC-Date'))
out_headers['Memento-Datetime'] = datetime_to_http_date(memento_dt)
@ -315,7 +316,10 @@ class LiveWebLoader(BaseLoader):
data = input_req.get_req_body()
p = PreparedRequest()
p.prepare_url(load_url, None)
try:
p.prepare_url(load_url, None)
except:
raise LiveResourceException(load_url)
p.prepare_headers(None)
p.prepare_auth(None, load_url)

View File

@ -86,7 +86,6 @@ class MementoUtils(object):
return memento.format(url, rel, datetime, cdx.get('source', ''))
@staticmethod
def make_timemap(cdx_iter):
# get first memento as it'll be used for 'from' field
@ -116,6 +115,10 @@ class MementoUtils(object):
def make_link(url, type):
return '<{0}>; rel="{1}"'.format(url, type)
@staticmethod
def make_memento_link(url, type, dt):
return '<{0}>; rel="{1}"; datetime="{2}"'.format(url, type, dt)
#=============================================================================
class ParamFormatter(string.Formatter):

19
tests/base_config_test.py Normal file
View File

@ -0,0 +1,19 @@
from gevent import monkey; monkey.patch_all(thread=False)
from webtest import TestApp
from pywb.webagg.test.testutils import BaseTestClass
from pywb.urlrewrite.frontendapp import FrontEndApp
import os
# ============================================================================
class BaseConfigTest(BaseTestClass):
    """Base class for tests that run against a ``FrontEndApp`` built from a config file."""

    @classmethod
    def setup_class(cls, config_file):
        """Create ``cls.testapp`` from ``config_file``.

        :param config_file: config file name, joined onto the directory
            that contains this module
        """
        super(BaseConfigTest, cls).setup_class()

        tests_dir = os.path.dirname(os.path.realpath(__file__))
        config_file = os.path.join(tests_dir, config_file)

        cls.testapp = TestApp(FrontEndApp(config_file=config_file))

33
tests/config_test.yaml Normal file
View File

@ -0,0 +1,33 @@
# pywb config file
debug: true
collections:
pywb: ./sample_archive/cdx/
# live collection
live: $live
# coll with fallback
pywb-fallback:
sequence:
-
index: ./sample_archive/cdx/
name: local
-
index: $live
#pywb-norange:
# index_paths: ./sample_archive/cdx/
# enable_ranges: false
pywb-cdxj:
index_paths: ./sample_archive/cdxj/
archive_paths:
- ./invalid/path/to/ignore/
- ./sample_archive/warcs/

View File

@ -1,247 +1,240 @@
from gevent import monkey; monkey.patch_all(thread=False)
import re
import webtest
import json
import os
from webtest import TestApp
from six.moves.urllib.parse import urlencode
from pywb.cdx.cdxobject import CDXObject
from pywb.apps.cdx_server import application
import pytest
import json
from pywb.webagg.test.testutils import BaseTestClass
from pywb.webagg.autoapp import AutoConfigApp
#================================================================
@pytest.fixture
def client():
return webtest.TestApp(application)
# ============================================================================
class TestCDXApp(BaseTestClass):
@classmethod
def setup_class(cls):
super(TestCDXApp, cls).setup_class()
config_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config_test.yaml')
cls.testapp = TestApp(AutoConfigApp(config_file=config_file))
def query(self, url, is_error=False, **params):
params['url'] = url
return self.testapp.get('/pywb-cdx?' + urlencode(params, doseq=1), expect_errors=is_error)
def test_exact_url(self):
"""
basic exact match, no filters, etc.
"""
resp = self.query('http://www.iana.org/')
assert resp.status_code == 200
assert len(resp.text.splitlines()) == 3, resp.text
def test_exact_url_json(self):
"""
basic exact match, no filters, etc.
"""
resp = self.query('http://www.iana.org/', output='json')
assert resp.status_code == 200
lines = resp.text.splitlines()
assert len(lines) == 3, resp.text
assert len(list(map(json.loads, lines))) == 3
def test_prefix_match(self):
"""
prefix match test
"""
resp = self.query('http://www.iana.org/', matchType='prefix')
assert resp.status_code == 200
suburls = 0
for l in resp.text.splitlines():
fields = l.split(' ')
if len(fields[0]) > len('org,iana)/'):
suburls += 1
assert suburls > 0
def test_filters(self):
"""
filter cdxes by mimetype and filename field, exact match.
"""
resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
filter=('mime:warc/revisit', 'filename:dupes.warc.gz'))
assert resp.status_code == 200
assert resp.content_type == 'text/x-cdxj'
for l in resp.text.splitlines():
cdx = CDXObject(l.encode('utf-8'))
assert cdx['urlkey'] == 'org,iana)/_css/2013.1/screen.css'
assert cdx['mime'] == 'warc/revisit'
assert cdx['filename'] == 'dupes.warc.gz'
def test_limit(self):
resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
limit='1')
assert resp.status_code == 200
assert resp.content_type == 'text/x-cdxj'
cdxes = resp.text.splitlines()
assert len(cdxes) == 1
cdx = CDXObject(cdxes[0].encode('utf-8'))
assert cdx['urlkey'] == 'org,iana)/_css/2013.1/screen.css'
assert cdx['timestamp'] == '20140126200625'
assert cdx['mime'] == 'text/css'
resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
limit='1', reverse='1')
assert resp.status_code == 200
assert resp.content_type == 'text/x-cdxj'
cdxes = resp.text.splitlines()
assert len(cdxes) == 1
cdx = CDXObject(cdxes[0].encode('utf-8'))
assert cdx['urlkey'] == 'org,iana)/_css/2013.1/screen.css'
assert cdx['timestamp'] == '20140127171239'
assert cdx['mime'] == 'warc/revisit'
def test_fields(self):
"""
retrieve subset of fields with ``fields`` parameter.
"""
resp = self.query('http://www.iana.org/_css/2013.1/print.css',
fields='urlkey,timestamp,status')
assert resp.status_code == 200
cdxes = resp.text.splitlines()
for cdx in cdxes:
cdx = CDXObject(cdx.encode('utf-8'))
assert cdx['urlkey'] == 'org,iana)/_css/2013.1/print.css'
assert re.match(r'\d{14}$', cdx['timestamp'])
assert re.match(r'\d{3}|-', cdx['status'])
def test_fields_json(self):
"""
retrieve subset of fields with ``fields`` parameter, in json
"""
resp = self.query('http://www.iana.org/_css/2013.1/print.css',
fields='urlkey,timestamp,status',
output='json')
assert resp.status_code == 200
cdxes = resp.text.splitlines()
for cdx in cdxes:
print(cdx)
fields = json.loads(cdx)
assert len(fields) == 3
assert fields['urlkey'] == 'org,iana)/_css/2013.1/print.css'
assert re.match(r'\d{14}$', fields['timestamp'])
assert re.match(r'\d{3}|-', fields['status'])
def test_fields_undefined(self):
"""
server shall respond with Bad Request and name of undefined
when ``fields`` parameter contains undefined name(s).
"""
resp = self.query('http://www.iana.org/_css/2013.1/print.css',
is_error=True,
fields='urlkey,nosuchfield')
resp.status_code == 400
def test_fields_undefined_json(self):
"""
server shall respond with Bad Request and name of undefined
when ``fields`` parameter contains undefined name(s).
"""
resp = self.query('http://www.iana.org/_css/2013.1/print.css',
is_error=True,
fields='urlkey,nosuchfield',
output='json')
resp.status_code == 400
def test_resolveRevisits(self):
"""
with ``resolveRevisits=true``, server adds three fields pointing to
the *original* capture.
"""
resp = self.query('http://www.iana.org/_css/2013.1/print.css',
resolveRevisits='true'
)
assert resp.status_code == 200
assert resp.content_type == 'text/x-cdxj'
cdxes = resp.text.splitlines()
originals = {}
for cdx in cdxes:
cdx = CDXObject(cdx.encode('utf-8'))
assert len(cdx) == 15
# orig.* fields are either all '-' or (int, int, filename)
# check if orig.* fields are equals to corresponding fields
# for the original capture.
sha = cdx['digest']
if cdx['orig.length'] == '-':
assert cdx['orig.offset'] == '-' and cdx['orig.filename'] == '-'
originals[sha] = (int(cdx['length']), int(cdx['offset']), cdx['filename'])
else:
orig = originals.get(sha)
assert orig == (int(cdx['orig.length']), int(cdx['orig.offset']), cdx['orig.filename'])
def test_resolveRevisits_orig_fields(self):
"""
when resolveRevisits=true, extra three fields are named
``orig.length``, ``orig.offset`` and ``orig.filename``, respectively.
it is possible to filter fields by these names.
"""
resp = self.query('http://www.iana.org/_css/2013.1/print.css',
resolveRevisits='1',
fields='urlkey,orig.length,orig.offset,orig.filename'
)
assert resp.status_code == 200
assert resp.content_type == 'text/x-cdxj'
cdxes = resp.text.splitlines()
cdx = cdxes[0]
cdx = CDXObject(cdx.encode('utf-8'))
assert cdx['orig.offset'] == '-'
assert cdx['orig.length'] == '-'
assert cdx['orig.filename'] == '-'
for cdx in cdxes[1:]:
cdx = CDXObject(cdx.encode('utf-8'))
assert cdx['orig.offset'] != '-'
assert cdx['orig.length'] != '-'
assert cdx['orig.filename'] == 'iana.warc.gz'
def test_collapseTime_resolveRevisits_reverse(self):
resp = self.query('http://www.iana.org/_css/2013.1/print.css',
collapseTime='11',
resolveRevisits='true',
reverse='true'
)
cdxes = [CDXObject(l) for l in resp.body.splitlines()]
assert len(cdxes) == 3
# timestamp is in descending order
for i in range(len(cdxes) - 1):
assert cdxes[i]['timestamp'] >= cdxes[i + 1]['timestamp']
#================================================================
def query(client, url, is_error=False, **params):
params['url'] = url
return client.get('/pywb-cdx?' + urlencode(params, doseq=1), expect_errors=is_error)
#================================================================
def test_exact_url(client):
"""
basic exact match, no filters, etc.
"""
resp = query(client, 'http://www.iana.org/')
assert resp.status_code == 200
assert len(resp.text.splitlines()) == 3, resp.text
#================================================================
def test_exact_url_json(client):
"""
basic exact match, no filters, etc.
"""
resp = query(client, 'http://www.iana.org/', output='json')
assert resp.status_code == 200
lines = resp.text.splitlines()
assert len(lines) == 3, resp.text
assert len(list(map(json.loads, lines))) == 3
#================================================================
def test_prefix_match(client):
"""
prefix match test
"""
resp = query(client, 'http://www.iana.org/', matchType='prefix')
print(resp.text.splitlines())
assert resp.status_code == 200
suburls = 0
for l in resp.text.splitlines():
fields = l.split(' ')
if len(fields[0]) > len('org,iana)/'):
suburls += 1
assert suburls > 0
#================================================================
def test_filters(client):
"""
filter cdxes by mimetype and filename field, exact match.
"""
resp = query(client, 'http://www.iana.org/_css/2013.1/screen.css',
filter=('mime:warc/revisit', 'filename:dupes.warc.gz'))
assert resp.status_code == 200
assert resp.content_type == 'text/plain'
for l in resp.text.splitlines():
fields = l.split(' ')
assert fields[0] == 'org,iana)/_css/2013.1/screen.css'
assert fields[3] == 'warc/revisit'
assert fields[10] == 'dupes.warc.gz'
#================================================================
def test_limit(client):
resp = query(client, 'http://www.iana.org/_css/2013.1/screen.css',
limit='1')
assert resp.status_code == 200
assert resp.content_type == 'text/plain'
cdxes = resp.text.splitlines()
assert len(cdxes) == 1
fields = cdxes[0].split(' ')
assert fields[0] == 'org,iana)/_css/2013.1/screen.css'
assert fields[1] == '20140126200625'
assert fields[3] == 'text/css'
resp = query(client, 'http://www.iana.org/_css/2013.1/screen.css',
limit='1', reverse='1')
assert resp.status_code == 200
assert resp.content_type == 'text/plain'
cdxes = resp.text.splitlines()
assert len(cdxes) == 1
fields = cdxes[0].split(' ')
assert fields[0] == 'org,iana)/_css/2013.1/screen.css'
assert fields[1] == '20140127171239'
assert fields[3] == 'warc/revisit'
#================================================================
def test_fields(client):
"""
retrieve subset of fields with ``fields`` parameter.
"""
resp = query(client, 'http://www.iana.org/_css/2013.1/print.css',
fields='urlkey,timestamp,status')
assert resp.status_code == 200
cdxes = resp.text.splitlines()
for cdx in cdxes:
fields = cdx.split(' ')
assert len(fields) == 3
assert fields[0] == 'org,iana)/_css/2013.1/print.css'
assert re.match(r'\d{14}$', fields[1])
assert re.match(r'\d{3}|-', fields[2])
#================================================================
def test_fields_json(client):
"""
retrieve subset of fields with ``fields`` parameter, in json
"""
resp = query(client, 'http://www.iana.org/_css/2013.1/print.css',
fields='urlkey,timestamp,status',
output='json')
assert resp.status_code == 200
cdxes = resp.text.splitlines()
for cdx in cdxes:
fields = json.loads(cdx)
assert len(fields) == 3
assert fields['urlkey'] == 'org,iana)/_css/2013.1/print.css'
assert re.match(r'\d{14}$', fields['timestamp'])
assert re.match(r'\d{3}|-', fields['status'])
#================================================================
def test_fields_undefined(client):
"""
server shall respond with Bad Request and name of undefined
when ``fields`` parameter contains undefined name(s).
"""
resp = query(client, 'http://www.iana.org/_css/2013.1/print.css',
is_error=True,
fields='urlkey,nosuchfield')
resp.status_code == 400
#================================================================
def test_fields_undefined_json(client):
"""
server shall respond with Bad Request and name of undefined
when ``fields`` parameter contains undefined name(s).
"""
resp = query(client, 'http://www.iana.org/_css/2013.1/print.css',
is_error=True,
fields='urlkey,nosuchfield',
output='json')
resp.status_code == 400
#================================================================
def test_resolveRevisits(client):
"""
with ``resolveRevisits=true``, server adds three fields pointing to
the *original* capture.
"""
resp = query(client, 'http://www.iana.org/_css/2013.1/print.css',
resolveRevisits='true'
)
assert resp.status_code == 200
assert resp.content_type == 'text/plain'
cdxes = resp.text.splitlines()
originals = {}
for cdx in cdxes:
fields = cdx.split(' ')
assert len(fields) == 14
(key, ts, url, mt, st, sha, _, _, size, offset, fn,
orig_size, orig_offset, orig_fn) = fields
# orig_* fields are either all '-' or (int, int, filename)
# check if orig_* fields are equals to corresponding fields
# for the original capture.
if orig_size == '-':
assert orig_offset == '-' and orig_fn == '-'
originals[sha] = (int(size), int(offset), fn)
else:
orig = originals.get(sha)
assert orig == (int(orig_size), int(orig_offset), orig_fn)
#================================================================
def test_resolveRevisits_orig_fields(client):
"""
when resolveRevisits=true, extra three fields are named
``orig.length``, ``orig.offset`` and ``orig.filename``, respectively.
it is possible to filter fields by these names.
"""
resp = query(client, 'http://www.iana.org/_css/2013.1/print.css',
resolveRevisits='1',
fields='urlkey,orig.length,orig.offset,orig.filename'
)
assert resp.status_code == 200
assert resp.content_type == 'text/plain'
cdxes = resp.text.splitlines()
for cdx in cdxes:
fields = cdx.split(' ')
assert len(fields) == 4
key, orig_len, orig_offset, orig_fn = fields
assert (orig_len == '-' and orig_offset == '-' and orig_fn == '-' or
(int(orig_len), int(orig_offset), orig_fn))
#================================================================
def test_collapseTime_resolveRevisits_reverse(client):
resp = query(client, 'http://www.iana.org/_css/2013.1/print.css',
collapseTime='11',
resolveRevisits='true',
reverse='true'
)
cdxes = [CDXObject(l) for l in resp.body.splitlines()]
assert len(cdxes) == 3
# timestamp is in descending order
for i in range(len(cdxes) - 1):
assert cdxes[i]['timestamp'] >= cdxes[i + 1]['timestamp']

View File

@ -1,162 +0,0 @@
# pywb config file
# ========================================
#
# Settings for each collection
collections:
# <name>: <cdx_path>
# collection will be accessed via /<name>
# <cdx_path> is a string or list of:
# - string or list of one or more local .cdx file
# - string or list of one or more local dirs with .cdx files
# - a string value indicating remote http cdx server
pywb: ./sample_archive/cdx/
# ex with filtering: filter CDX lines by filename starting with 'dupe'
pywb-filt:
index_paths: './sample_archive/cdx/'
filters: ['filename:dupe*']
pywb-filt-2:
index_paths: './sample_archive/cdx/'
filters: ['!filename:dupe*']
pywb-nonframe:
index_paths: './sample_archive/cdx/'
framed_replay: false
# collection of non-surt CDX
pywb-nosurt:
index_paths: './sample_archive/non-surt-cdx/'
surt_ordered: false
# live collection
live: $liveweb
# coll with fallback
pywb-fallback:
index_paths: ./sample_archive/cdx/
fallback: live
pywb-norange:
index_paths: ./sample_archive/cdx/
enable_ranges: false
pywb-non-exact:
index_paths: ./sample_archive/cdx/
redir_to_exact: false
pywb-cdxj:
index_paths: ./sample_archive/cdxj/
# indicate if cdx files are sorted by SURT keys -- eg: com,example)/
# SURT keys are recommended for future indices, but non-SURT cdxs
# are also supported
#
# * Set to true if cdxs start with surts: com,example)/
# * Set to false if cdx start with urls: example.com)/
surt_ordered: true
# list of paths prefixes for pywb look to 'resolve' WARC and ARC filenames
# in the cdx to their absolute path
#
# if path is:
# * local dir, use path as prefix
# * local file, lookup prefix in tab-delimited sorted index
# * http:// path, use path as remote prefix
# * redis:// path, use redis to lookup full path for w:<warc> as key
archive_paths: ['./invalid/path/to/ignore/', './sample_archive/warcs/']
# ==== Optional UI: HTML/Jinja2 Templates ====
# template for <head> insert into replayed html content
head_insert_html: templates/head_insert.html
# template for 'calendar' query,
# eg, a listing of captures in response to a ../*/<url>
#
# may be a simple listing or a more complex 'calendar' UI
# if omitted, will list raw cdx in plain text
query_html: templates/query.html
# template for search page, which is displayed when no search url is entered
# in a collection
search_html: templates/search.html
# template for home page.
# if no other route is set, this will be rendered at /, /index.htm and /index.html
home_html: templates/index.html
# error page template; may format the error message and details
# if omitted, a text response is returned
error_html: templates/error.html
# template for 404 not found error, may be customized per collection
not_found_html: templates/not_found.html
# ==== Other Paths ====
# Rewrite urls with absolute paths instead of relative
absoulte_paths: true
# List of route names:
# <route>: <package or file path>
static_routes:
static/test/route: pywb/static/
static/__pywb: pywb/static/
# Enable simple http proxy mode
enable_http_proxy: true
# Additional proxy options (defaults)
proxy_options:
use_default_coll: pywb
cookie_resolver: false
use_client_rewrite: true
use_wombat: true
#enable coll info JSON
enable_coll_info: true
# enable cdx server api for querying cdx directly (experimental)
#enable_cdx_api: True
# or specify suffix
enable_cdx_api: -cdx
# test different port
port: 9000
# optional reporter callback func
# if set, called with request and cdx object
reporter: !!python/object/new:tests.fixture.PrintReporter []
# custom rules for domain specific matching
#domain_specific_rules: rules.yaml
# Use lxml parser, if available
# use_lxml_parser: true
# Replay content in an iframe
framed_replay: true
# ==== New / Experimental Settings ====
# Not yet production ready -- used primarily for testing
#perms_checker: !!python/object/new:pywb.cdx.perms.AllowAllPerms []
perms_policy: !!python/name:tests.perms_fixture.perms_policy
# not testing memento here
enable_memento: False
# Debug Handlers
debug_echo_env: True
debug_echo_req: True

View File

@ -1,16 +1,14 @@
import webtest
from pywb.webapp.pywb_init import create_wb_router
from pywb.framework.wsgi_wrappers import init_app
from .base_config_test import BaseConfigTest
from .memento_fixture import *
from .server_mock import make_setup_module, BaseIntegration
# ============================================================================
class TestMementoFrame(MementoMixin, BaseConfigTest):
@classmethod
def setup_class(cls):
super(TestMementoFrame, cls).setup_class('config_test_frames.yaml')
setup_module = make_setup_module('tests/test_config_frames.yaml')
class TestMementoFrameInverse(MementoMixin, BaseIntegration):
def test_top_frame_replay(self):
def _test_top_frame_replay(self):
resp = self.testapp.get('/pywb/20140127171238/http://www.iana.org/')
# Memento Headers

View File

@ -1,15 +1,13 @@
from pytest import raises
from .base_config_test import BaseConfigTest
from pywb.cdx.cdxobject import CDXObject
from pywb.utils.timeutils import timestamp_now
from .server_mock import make_setup_module, BaseIntegration
setup_module = make_setup_module('tests/test_config.yaml')
class TestWbIntegration(BaseIntegration):
#def setup(self):
# self.app = app
# self.testapp = testapp
# ============================================================================
class TestWbIntegration(BaseConfigTest):
@classmethod
def setup_class(cls):
super(TestWbIntegration, cls).setup_class('config_test.yaml')
def _assert_basic_html(self, resp):
assert resp.status_int == 200
@ -47,7 +45,7 @@ class TestWbIntegration(BaseIntegration):
# 3 Captures + header
assert len(resp.html.find_all('tr')) == 4
def test_calendar_query_filtered(self):
def test_calendar_query_2(self):
# unfiltered collection
resp = self.testapp.get('/pywb/*/http://www.iana.org/_css/2013.1/screen.css')
self._assert_basic_html(resp)
@ -55,10 +53,10 @@ class TestWbIntegration(BaseIntegration):
assert len(resp.html.find_all('tr')) == 18
# filtered collection
resp = self.testapp.get('/pywb-filt/*/http://www.iana.org/_css/2013.1/screen.css')
self._assert_basic_html(resp)
#resp = self.testapp.get('/pywb-filt/*/http://www.iana.org/_css/2013.1/screen.css')
#self._assert_basic_html(resp)
# 1 Capture (filtered) + header
assert len(resp.html.find_all('tr')) == 2
#assert len(resp.html.find_all('tr')) == 2
def test_calendar_query_fuzzy_match(self):
# fuzzy match removing _= according to standard rules.yaml
@ -74,7 +72,7 @@ class TestWbIntegration(BaseIntegration):
assert 'No captures found' in resp.text, resp.text
assert len(resp.html.find_all('tr')) == 0
def test_cdx_query(self):
def _test_cdx_query(self):
resp = self.testapp.get('/pywb/cdx_/*/http://www.iana.org/')
self._assert_basic_text(resp)
@ -84,74 +82,74 @@ class TestWbIntegration(BaseIntegration):
assert actual_len == 3, actual_len
def test_replay_top_frame(self):
resp = self.testapp.get('/pywb/20140127171238tf_/http://www.iana.org/')
resp = self.testapp.get('/pywb/20140127171238/http://www.iana.org/')
assert '<iframe ' in resp.text
assert '/pywb/20140127171238/http://www.iana.org/' in resp.text, resp.text
assert '/pywb/20140127171238mp_/http://www.iana.org/' in resp.text, resp.text
def test_replay_content(self):
resp = self.testapp.get('/pywb/20140127171238/http://www.iana.org/')
resp = self.testapp.get('/pywb/20140127171238mp_/http://www.iana.org/')
self._assert_basic_html(resp)
assert '"20140127171238"' in resp.text
assert '"20140127171238"' in resp.text, resp.text
assert 'wb.js' in resp.text
assert 'new _WBWombat' in resp.text, resp.text
assert '/pywb/20140127171238/http://www.iana.org/time-zones"' in resp.text
assert '/pywb/20140127171238mp_/http://www.iana.org/time-zones"' in resp.text
def test_replay_non_frame_content(self):
resp = self.testapp.get('/pywb-nonframe/20140127171238/http://www.iana.org/')
self._assert_basic_html(resp)
#def test_replay_non_frame_content(self):
# resp = self.testapp.get('/pywb-nonframe/20140127171238/http://www.iana.org/')
# self._assert_basic_html(resp)
assert '"20140127171238"' in resp.text
assert 'wb.js' in resp.text
assert '/pywb-nonframe/20140127171238/http://www.iana.org/time-zones"' in resp.text
# assert '"20140127171238"' in resp.text
# assert 'wb.js' in resp.text
# assert '/pywb-nonframe/20140127171238/http://www.iana.org/time-zones"' in resp.text
def test_replay_non_surt(self):
resp = self.testapp.get('/pywb-nosurt/20140103030321/http://example.com?example=1')
self._assert_basic_html(resp)
#def test_replay_non_surt(self):
# resp = self.testapp.get('/pywb-nosurt/20140103030321/http://example.com?example=1')
# self._assert_basic_html(resp)
assert '"20140103030321"' in resp.text
assert 'wb.js' in resp.text
assert '/pywb-nosurt/20140103030321/http://www.iana.org/domains/example' in resp.text
# assert '"20140103030321"' in resp.text
# assert 'wb.js' in resp.text
# assert '/pywb-nosurt/20140103030321/http://www.iana.org/domains/example' in resp.text
def test_replay_cdxj(self):
resp = self.testapp.get('/pywb-cdxj/20140103030321/http://example.com?example=1')
resp = self.testapp.get('/pywb-cdxj/20140103030321mp_/http://example.com?example=1')
self._assert_basic_html(resp)
assert '"20140103030321"' in resp.text
assert 'wb.js' in resp.text
assert '/pywb-cdxj/20140103030321/http://www.iana.org/domains/example' in resp.text
assert '/pywb-cdxj/20140103030321mp_/http://www.iana.org/domains/example' in resp.text
def test_replay_cdxj_revisit(self):
resp = self.testapp.get('/pywb-cdxj/20140103030341/http://example.com?example=1')
resp = self.testapp.get('/pywb-cdxj/20140103030341mp_/http://example.com?example=1')
self._assert_basic_html(resp)
assert '"20140103030341"' in resp.text
assert 'wb.js' in resp.text
assert '/pywb-cdxj/20140103030341/http://www.iana.org/domains/example' in resp.text
assert '/pywb-cdxj/20140103030341mp_/http://www.iana.org/domains/example' in resp.text
def test_zero_len_revisit(self):
resp = self.testapp.get('/pywb/20140603030341/http://example.com?example=2')
resp = self.testapp.get('/pywb/20140603030341mp_/http://example.com?example=2')
self._assert_basic_html(resp)
assert '"20140603030341"' in resp.text
assert 'wb.js' in resp.text
assert '/pywb/20140603030341/http://www.iana.org/domains/example' in resp.text
assert '/pywb/20140603030341mp_/http://www.iana.org/domains/example' in resp.text
def test_replay_url_agnostic_revisit(self):
resp = self.testapp.get('/pywb/20130729195151/http://www.example.com/')
resp = self.testapp.get('/pywb/20130729195151mp_/http://www.example.com/')
self._assert_basic_html(resp)
assert '"20130729195151"' in resp.text
assert 'wb.js' in resp.text
assert '/pywb/20130729195151/http://www.iana.org/domains/example"' in resp.text
assert '/pywb/20130729195151mp_/http://www.iana.org/domains/example"' in resp.text
def test_video_info_not_found(self):
# not actually archived, but ensure video info path is tested
resp = self.testapp.get('/pywb/vi_/https://www.youtube.com/watch?v=DjFZyFWSt1M', status=404)
assert resp.status_int == 404
def test_replay_cdx_mod(self):
def _test_replay_cdx_mod(self):
resp = self.testapp.get('/pywb/20140127171239cdx_/http://www.iana.org/_css/2013.1/print.css')
self._assert_basic_text(resp)
@ -184,7 +182,7 @@ class TestWbIntegration(BaseIntegration):
# original unrewritten url present
assert '"http://www.iana.org/domains/example"' in resp.text
def test_replay_range_cache_content(self):
def _test_replay_range_cache_content(self):
headers = [('Range', 'bytes=0-200')]
resp = self.testapp.get('/pywb/20140127171250id_/http://example.com', headers=headers)
@ -195,7 +193,7 @@ class TestWbIntegration(BaseIntegration):
assert 'wb.js' not in resp.text
def test_replay_content_ignore_range(self):
def _test_replay_content_ignore_range(self):
headers = [('Range', 'bytes=0-200')]
resp = self.testapp.get('/pywb-norange/20140127171251id_/http://example.com', headers=headers)
@ -208,7 +206,7 @@ class TestWbIntegration(BaseIntegration):
# identity, no header insertion
assert 'wb.js' not in resp.text
def test_replay_range_cache_content_bound_end(self):
def _test_replay_range_cache_content_bound_end(self):
headers = [('Range', 'bytes=10-10000')]
resp = self.testapp.get('/pywb/20140127171251id_/http://example.com', headers=headers)
@ -220,12 +218,12 @@ class TestWbIntegration(BaseIntegration):
assert 'wb.js' not in resp.text
def test_replay_redir_no_cache(self):
def _test_replay_redir_no_cache(self):
headers = [('Range', 'bytes=10-10000')]
# Range ignored
resp = self.testapp.get('/pywb/20140126200927/http://www.iana.org/domains/root/db/', headers=headers)
assert resp.content_length == 0
assert resp.status_int == 302
assert resp.content_length == 0
def test_replay_identity_2_arcgz(self):
resp = self.testapp.get('/pywb/20140216050221id_/http://arc.gz.test.example.com')
@ -247,7 +245,7 @@ class TestWbIntegration(BaseIntegration):
def test_replay_content_length_1(self):
# test larger file, rewritten file (svg!)
resp = self.testapp.get('/pywb/20140126200654/http://www.iana.org/_img/2013.1/rir-map.svg')
resp = self.testapp.get('/pywb/20140126200654mp_/http://www.iana.org/_img/2013.1/rir-map.svg')
assert resp.headers['Content-Length'] == str(len(resp.text))
def test_replay_css_mod(self):
@ -262,84 +260,72 @@ class TestWbIntegration(BaseIntegration):
assert resp.content_length == 0
assert resp.content_type == 'application/x-javascript'
def test_redirect_exact(self):
resp = self.testapp.get('/pywb/20140127171237/http://www.iana.org/')
assert resp.status_int == 302
#def test_redirect_exact(self):
# resp = self.testapp.get('/pywb/20140127171237/http://www.iana.org/')
# assert resp.status_int == 302
assert resp.headers['Location'].endswith('/pywb/20140127171238/http://iana.org')
# assert resp.headers['Location'].endswith('/pywb/20140127171238/http://iana.org')
def test_no_redirect_non_exact(self):
def test_replay_non_exact(self):
# non-exact mode, don't redirect to exact capture
resp = self.testapp.get('/pywb-non-exact/20140127171237/http://www.iana.org/')
resp = self.testapp.get('/pywb/20140127171237mp_/http://www.iana.org/')
assert resp.status_int == 200
self._assert_basic_html(resp)
assert '"20140127171237"' in resp.text
# actual timestamp set in JS
assert 'timestamp = "20140127171238"' in resp.text
assert '/pywb-non-exact/20140127171237/http://www.iana.org/about/' in resp.text
assert '/pywb/20140127171237mp_/http://www.iana.org/about/' in resp.text
def test_redirect_latest_replay(self):
resp = self.testapp.get('/pywb/http://example.com/')
assert resp.status_int == 302
assert resp.headers['Location'].endswith('/20140127171251/http://example.com')
resp = resp.follow()
#check resp
def test_latest_replay(self):
resp = self.testapp.get('/pywb/mp_/http://example.com/')
self._assert_basic_html(resp)
assert '"20140127171251"' in resp.text
assert '/pywb/20140127171251/http://www.iana.org/domains/example' in resp.text
def test_redirect_non_exact_latest_replay_ts(self):
resp = self.testapp.get('/pywb-non-exact/http://example.com/')
assert resp.headers['Content-Location'].endswith('/20140127171251mp_/http://example.com')
assert '"20140127171251"' in resp.text
assert '/pywb/mp_/http://www.iana.org/domains/example' in resp.text
def test_replay_non_latest_content_location_ts(self):
resp = self.testapp.get('/pywb/mp_/http://example.com/')
assert resp.status_int == 200
assert resp.headers['Content-Location'].endswith('/http://example.com')
# extract ts, which should be current time
ts = resp.headers['Content-Location'].rsplit('/http://')[0].rsplit('/', 1)[-1]
assert ts == '20140127171251'
assert ts == '20140127171251mp_'
ts = ts[:-3]
#resp = resp.follow()
#self._assert_basic_html(resp)
# ensure the current ts is present in the links
assert '"{0}"'.format(ts) in resp.text
assert '/pywb-non-exact/http://www.iana.org/domains/example' in resp.text
assert '/pywb/mp_/http://www.iana.org/domains/example' in resp.text
# ensure ts is current ts
#assert timestamp_now() >= ts, ts
def test_redirect_relative_3(self):
def test_refer_redirect(self):
# webtest uses Host: localhost:80 by default
# first two requests should result in same redirect
target = 'http://localhost:80/pywb/2014/http://iana.org/_css/2013.1/screen.css'
target = 'http://localhost:80/pywb/2014mp_/http://iana.org/_css/2013.1/screen.css'
# without timestamp
resp = self.testapp.get('/_css/2013.1/screen.css', headers = [('Referer', 'http://localhost:80/pywb/2014/http://iana.org/')])
assert resp.status_int == 302
resp = self.testapp.get('/_css/2013.1/screen.css', headers = [('Referer', 'http://localhost:80/pywb/2014mp_/http://iana.org/')])
assert resp.status_int == 307
assert resp.headers['Location'] == target, resp.headers['Location']
# with timestamp
resp = self.testapp.get('/2014/_css/2013.1/screen.css', headers = [('Referer', 'http://localhost:80/pywb/2014/http://iana.org/')])
assert resp.status_int == 302
assert resp.headers['Location'] == target, resp.headers['Location']
resp = resp.follow()
assert resp.status_int == 302
assert resp.headers['Location'].endswith('/pywb/20140127171239/http://www.iana.org/_css/2013.1/screen.css')
resp = resp.follow()
assert resp.status_int == 200
assert resp.headers['Content-Location'].endswith('/pywb/20140127171239mp_/http://www.iana.org/_css/2013.1/screen.css')
assert resp.content_type == 'text/css'
def test_rel_self_redirect(self):
uri = '/pywb/20140126200927/http://www.iana.org/domains/root/db'
resp = self.testapp.get(uri, status=302)
assert resp.status_int == 302
assert resp.headers['Location'].endswith('/pywb/20140126200928/http://www.iana.org/domains/root/db')
def test_non_exact_replay_skip_self_redir(self):
uri = '/pywb/20140126200927mp_/http://www.iana.org/domains/root/db'
resp = self.testapp.get(uri)
assert resp.status_int == 200
assert resp.headers['Content-Location'].endswith('/pywb/20140126200928mp_/http://www.iana.org/domains/root/db')
#def test_referrer_self_redirect(self):
# uri = '/pywb/20140127171239/http://www.iana.org/_css/2013.1/screen.css'
@ -355,43 +341,43 @@ class TestWbIntegration(BaseIntegration):
# assert resp.status_int == 302
def test_not_existant_warc_other_capture(self):
resp = self.testapp.get('/pywb/20140703030321/http://example.com?example=2')
assert resp.status_int == 302
assert resp.headers['Location'].endswith('/pywb/20140603030341/http://example.com?example=2')
resp = self.testapp.get('/pywb/20140703030321mp_/http://example.com?example=2')
assert resp.status_int == 200
assert resp.headers['Content-Location'].endswith('/pywb/20140603030341mp_/http://example.com?example=2')
def test_missing_revisit_other_capture(self):
resp = self.testapp.get('/pywb/20140603030351/http://example.com?example=2')
assert resp.status_int == 302
assert resp.headers['Location'].endswith('/pywb/20140603030341/http://example.com?example=2')
resp = self.testapp.get('/pywb/20140603030351mp_/http://example.com?example=2')
assert resp.status_int == 200
assert resp.headers['Content-Location'].endswith('/pywb/20140603030341mp_/http://example.com?example=2')
def test_not_existant_warc_no_other(self):
resp = self.testapp.get('/pywb/20140703030321/http://example.com?example=3', status = 503)
resp = self.testapp.get('/pywb/20140703030321mp_/http://example.com?example=3', status=503)
assert resp.status_int == 503
def test_missing_revisit_no_other(self):
resp = self.testapp.get('/pywb/20140603030351/http://example.com?example=3', status = 503)
resp = self.testapp.get('/pywb/20140603030351mp_/http://example.com?example=3', status=503)
assert resp.status_int == 503
def test_live_frame(self):
resp = self.testapp.get('/live/http://example.com/?test=test')
assert resp.status_int == 200
def test_live_redir_1(self):
def _test_live_redir_1(self):
resp = self.testapp.get('/live/*/http://example.com/?test=test')
assert resp.status_int == 302
assert resp.headers['Location'].endswith('/live/http://example.com/?test=test')
def test_live_redir_2(self):
def _test_live_redir_2(self):
resp = self.testapp.get('/live/2010-2011/http://example.com/?test=test')
assert resp.status_int == 302
assert resp.headers['Location'].endswith('/live/http://example.com/?test=test')
def test_live_fallback(self):
resp = self.testapp.get('/pywb-fallback//http://example.com/?test=test')
resp = self.testapp.get('/pywb-fallback/mp_/http://example.com/?test=test')
assert resp.status_int == 200
def test_post_1(self):
resp = self.testapp.post('/pywb/httpbin.org/post', {'foo': 'bar', 'test': 'abc'})
resp = self.testapp.post('/pywb/mp_/httpbin.org/post', {'foo': 'bar', 'test': 'abc'})
# no redirects for POST, as some browsers (FF) show modal confirmation dialog!
#assert resp.status_int == 307
@ -406,56 +392,55 @@ class TestWbIntegration(BaseIntegration):
assert '"test": "abc"' in resp.text
def test_post_2(self):
resp = self.testapp.post('/pywb/20140610001255/http://httpbin.org/post?foo=bar', {'data': '^'})
resp = self.testapp.post('/pywb/20140610001255mp_/http://httpbin.org/post?foo=bar', {'data': '^'})
assert resp.status_int == 200
assert '"data": "^"' in resp.text
def test_post_invalid(self):
# not json
resp = self.testapp.post_json('/pywb/20140610001255/http://httpbin.org/post?foo=bar', {'data': '^'}, status=404)
resp = self.testapp.post_json('/pywb/20140610001255mp_/http://httpbin.org/post?foo=bar', {'data': '^'}, status=404)
assert resp.status_int == 404
def test_post_redirect(self):
# post handled without redirect (since 307 not allowed)
resp = self.testapp.post('/post', {'foo': 'bar', 'test': 'abc'}, headers=[('Referer', 'http://localhost:80/pywb/2014/http://httpbin.org/post')])
assert resp.status_int == 200
assert '"foo": "bar"' in resp.text
assert '"test": "abc"' in resp.text
def test_post_referer_redirect(self):
# allowing 307 redirects
resp = self.testapp.post('/post', {'foo': 'bar', 'test': 'abc'}, headers=[('Referer', 'http://localhost:80/pywb/2014mp_/http://httpbin.org/foo')])
assert resp.status_int == 307
assert resp.headers['Location'].endswith('/pywb/2014mp_/http://httpbin.org/post')
def test_excluded_content(self):
resp = self.testapp.get('/pywb/http://www.iana.org/_img/bookmark_icon.ico', status=403)
def _test_excluded_content(self):
resp = self.testapp.get('/pywb/mp_/http://www.iana.org/_img/bookmark_icon.ico', status=403)
assert resp.status_int == 403
assert 'Excluded' in resp.text
def test_replay_not_found(self):
resp = self.testapp.head('/pywb/http://not-exist.example.com', status=404)
resp = self.testapp.head('/pywb/mp_/http://not-exist.example.com', status=404)
assert resp.content_type == 'text/html'
assert resp.status_int == 404
def test_static_content(self):
resp = self.testapp.get('/static/test/route/wb.css')
resp = self.testapp.get('/static/__pywb/wb.css')
assert resp.status_int == 200
assert resp.content_type == 'text/css'
assert resp.content_length > 0
def test_static_content_filewrapper(self):
from wsgiref.util import FileWrapper
resp = self.testapp.get('/static/test/route/wb.css', extra_environ = {'wsgi.file_wrapper': FileWrapper})
resp = self.testapp.get('/static/__pywb/wb.css', extra_environ = {'wsgi.file_wrapper': FileWrapper})
assert resp.status_int == 200
assert resp.content_type == 'text/css'
assert resp.content_length > 0
def test_static_not_found(self):
resp = self.testapp.get('/static/test/route/notfound.css', status = 404)
resp = self.testapp.get('/static/__pywb/notfound.css', status = 404)
assert resp.status_int == 404
def test_cdx_server_filters(self):
def _test_cdx_server_filters(self):
resp = self.testapp.get('/pywb-cdx?url=http://www.iana.org/_css/2013.1/screen.css&filter=mime:warc/revisit&filter=filename:dupes.warc.gz')
self._assert_basic_text(resp)
actual_len = len(resp.text.rstrip().split('\n'))
assert actual_len == 1, actual_len
def test_cdx_server_advanced(self):
def _test_cdx_server_advanced(self):
# combine collapsing, reversing and revisit resolving
resp = self.testapp.get('/pywb-cdx?url=http://www.iana.org/_css/2013.1/print.css&collapseTime=11&resolveRevisits=true&reverse=true')
@ -482,7 +467,9 @@ class TestWbIntegration(BaseIntegration):
def test_coll_info_json(self):
resp = self.testapp.get('/collinfo.json')
assert resp.content_type == 'application/json'
assert len(resp.json) == 9
value = resp.json
assert len(value['fixed']) == 4
assert len(value['dynamic']) == 0
#def test_invalid_config(self):
# with raises(IOError):

View File

@ -1,30 +1,11 @@
from pywb.webapp.live_rewrite_handler import RewriteHandler
from pywb.apps.cli import LiveCli
from pywb.framework.wsgi_wrappers import init_app
import webtest
import pywb.rewrite.rewrite_live
#=================================================================
class MockYTDWrapper(object):
def extract_info(self, url):
return {'mock': 'youtube_dl_data'}
from .base_config_test import BaseConfigTest
pywb.rewrite.rewrite_live.youtubedl = MockYTDWrapper()
def setup_module():
global app
global testapp
app = LiveCli(['-f']).application
testapp = webtest.TestApp(app)
#=================================================================
class TestLiveRewriter:
def setup(self):
self.app = app
self.testapp = testapp
# ============================================================================
class TestLiveRewriter(BaseConfigTest):
@classmethod
def setup_class(cls):
super(TestLiveRewriter, cls).setup_class('config_test.yaml')
def test_live_live_1(self):
headers = [('User-Agent', 'python'), ('Referer', 'http://localhost:80/live/other.example.com')]
@ -61,7 +42,7 @@ class TestLiveRewriter:
def test_live_video_info(self):
resp = self.testapp.get('/live/vi_/https://www.youtube.com/watch?v=DjFZyFWSt1M')
assert resp.status_int == 200
assert resp.content_type == RewriteHandler.YT_DL_TYPE, resp.content_type
assert resp.content_type == 'application/vnd.youtube-dl_formats+json', resp.content_type
def test_deflate(self):
resp = self.testapp.get('/live/mp_/http://httpbin.org/deflate')

View File

View File

@ -0,0 +1,14 @@
collections:
# <name>: <cdx_path>
# collection will be accessed via /<name>
# <cdx_path> is a string or list of:
# - string or list of one or more local .cdx files
# - string or list of one or more local dirs with .cdx files
# - a string value indicating remote http cdx server
pywb: ./sample_archive/cdx/
archive_paths: ./sample_archive/warcs/
enable_memento: true
framed_replay: inverse