mirror of
https://github.com/webrecorder/pywb.git
synced 2025-03-24 06:59:52 +01:00
Full Memento (Pattern 2.2) Support (#228)
Memento fixes to fully support the Memento Pattern 2.2 API spec:
- add timemap endpoints at /timemap/link/<url>, as well as /timemap/cdxj/<url> and /timemap/json/<url>
- include original and timemap links in the Link header
- send correct Memento headers for timegate, timemap, and memento responses
- support the Accept-Datetime header for timegate requests
- Link rel="memento" includes the canonical URL and matches the Content-Location URL
- tests: update memento tests
This commit is contained in:
parent
6db2a1161d
commit
39b5630f7b
@ -43,6 +43,7 @@ class FrontEndApp(object):
|
|||||||
self.url_map.add(Rule('/static/_/<coll>/<path:filepath>', endpoint=self.serve_static))
|
self.url_map.add(Rule('/static/_/<coll>/<path:filepath>', endpoint=self.serve_static))
|
||||||
self.url_map.add(Rule('/static/<path:filepath>', endpoint=self.serve_static))
|
self.url_map.add(Rule('/static/<path:filepath>', endpoint=self.serve_static))
|
||||||
self.url_map.add(Rule('/<coll>/', endpoint=self.serve_coll_page))
|
self.url_map.add(Rule('/<coll>/', endpoint=self.serve_coll_page))
|
||||||
|
self.url_map.add(Rule('/<coll>/timemap/<timemap_output>/<path:url>', endpoint=self.serve_content))
|
||||||
self.url_map.add(Rule('/<coll>/<path:url>', endpoint=self.serve_content))
|
self.url_map.add(Rule('/<coll>/<path:url>', endpoint=self.serve_content))
|
||||||
self.url_map.add(Rule('/collinfo.json', endpoint=self.serve_listing))
|
self.url_map.add(Rule('/collinfo.json', endpoint=self.serve_listing))
|
||||||
self.url_map.add(Rule('/', endpoint=self.serve_home))
|
self.url_map.add(Rule('/', endpoint=self.serve_home))
|
||||||
@ -116,7 +117,7 @@ class FrontEndApp(object):
|
|||||||
|
|
||||||
return WbResponse.text_response(content, content_type='text/html; charset="utf-8"')
|
return WbResponse.text_response(content, content_type='text/html; charset="utf-8"')
|
||||||
|
|
||||||
def serve_content(self, environ, coll='', url=''):
|
def serve_content(self, environ, coll='', url='', timemap_output=''):
|
||||||
if not self.is_valid_coll(coll):
|
if not self.is_valid_coll(coll):
|
||||||
self.raise_not_found(environ, 'No handler for "/{0}"'.format(coll))
|
self.raise_not_found(environ, 'No handler for "/{0}"'.format(coll))
|
||||||
|
|
||||||
@ -128,6 +129,8 @@ class FrontEndApp(object):
|
|||||||
wb_url_str += '?' + environ.get('QUERY_STRING')
|
wb_url_str += '?' + environ.get('QUERY_STRING')
|
||||||
|
|
||||||
metadata = self.get_metadata(coll)
|
metadata = self.get_metadata(coll)
|
||||||
|
if timemap_output:
|
||||||
|
metadata['output'] = timemap_output
|
||||||
|
|
||||||
try:
|
try:
|
||||||
response = self.rewriterapp.render_content(wb_url_str, metadata, environ)
|
response = self.rewriterapp.render_content(wb_url_str, metadata, environ)
|
||||||
|
@ -14,7 +14,7 @@ from pywb.utils.loaders import extract_client_cookie
|
|||||||
from pywb.utils.io import BUFF_SIZE
|
from pywb.utils.io import BUFF_SIZE
|
||||||
from pywb.utils.memento import MementoUtils
|
from pywb.utils.memento import MementoUtils
|
||||||
|
|
||||||
from warcio.timeutils import http_date_to_timestamp
|
from warcio.timeutils import http_date_to_timestamp, timestamp_to_http_date
|
||||||
from warcio.bufferedreaders import BufferedReader
|
from warcio.bufferedreaders import BufferedReader
|
||||||
from warcio.recordloader import ArcWarcRecordLoader
|
from warcio.recordloader import ArcWarcRecordLoader
|
||||||
|
|
||||||
@ -98,23 +98,36 @@ class RewriterApp(object):
|
|||||||
wb_url.mod == self.frame_mod and
|
wb_url.mod == self.frame_mod and
|
||||||
wb_url.is_replay())
|
wb_url.is_replay())
|
||||||
|
|
||||||
|
def _check_accept_dt(self, wb_url, environ):
    """Detect a TimeGate request and apply its ``Accept-Datetime`` header.

    A request counts as a TimeGate request when ``wb_url.is_latest_replay()``
    is true. If such a request carries an ``Accept-Datetime`` header (WSGI
    key ``HTTP_ACCEPT_DATETIME``), the header is converted with
    ``http_date_to_timestamp``, stored on ``wb_url.timestamp``, and the url
    type is switched to ``wb_url.REPLAY``.

    :param wb_url: parsed url object; mutated in place as described above
    :param environ: WSGI environ dict
    :return: True if this is a TimeGate (latest replay) request, else False
    :raises UpstreamException: status 400 if the header value cannot be
        converted
    """
    if not wb_url.is_latest_replay():
        return False

    accept_dt = environ.get('HTTP_ACCEPT_DATETIME')
    if accept_dt:
        try:
            wb_url.timestamp = http_date_to_timestamp(accept_dt)
        except Exception:
            # Report any conversion failure as a client error, without
            # also trapping SystemExit / KeyboardInterrupt.
            raise UpstreamException(400, url=wb_url.url,
                                    details='Invalid Accept-Datetime')

        # A concrete timestamp was negotiated, so this is now a regular
        # replay request rather than a "latest" one.
        wb_url.type = wb_url.REPLAY

    return True
|
||||||
|
|
||||||
def render_content(self, wb_url, kwargs, environ):
|
def render_content(self, wb_url, kwargs, environ):
|
||||||
wb_url = WbUrl(wb_url)
|
wb_url = WbUrl(wb_url)
|
||||||
|
is_timegate = self._check_accept_dt(wb_url, environ)
|
||||||
|
|
||||||
host_prefix = self.get_host_prefix(environ)
|
host_prefix = self.get_host_prefix(environ)
|
||||||
rel_prefix = self.get_rel_prefix(environ)
|
rel_prefix = self.get_rel_prefix(environ)
|
||||||
full_prefix = host_prefix + rel_prefix
|
full_prefix = host_prefix + rel_prefix
|
||||||
|
|
||||||
resp = self.handle_custom_response(environ, wb_url,
|
response = self.handle_custom_response(environ, wb_url,
|
||||||
full_prefix, host_prefix, kwargs)
|
full_prefix, host_prefix,
|
||||||
if resp is not None:
|
kwargs)
|
||||||
content_type = 'text/html'
|
|
||||||
|
|
||||||
# if not replay outer frame, specify utf-8 charset
|
if response:
|
||||||
if not self.is_framed_replay(wb_url):
|
return self.format_response(response, wb_url, full_prefix, is_timegate)
|
||||||
content_type += '; charset=utf-8'
|
|
||||||
|
|
||||||
return WbResponse.text_response(resp, content_type=content_type)
|
|
||||||
|
|
||||||
is_proxy = ('wsgiprox.proxy_host' in environ)
|
is_proxy = ('wsgiprox.proxy_host' in environ)
|
||||||
|
|
||||||
@ -278,7 +291,9 @@ class RewriterApp(object):
|
|||||||
status_headers.statusline += ' None'
|
status_headers.statusline += ' None'
|
||||||
|
|
||||||
if not is_ajax and self.enable_memento:
|
if not is_ajax and self.enable_memento:
|
||||||
self._add_memento_links(urlrewriter, full_prefix, memento_dt, status_headers)
|
self._add_memento_links(cdx['url'], full_prefix,
|
||||||
|
memento_dt, cdx['timestamp'], status_headers,
|
||||||
|
is_timegate)
|
||||||
|
|
||||||
set_content_loc = True
|
set_content_loc = True
|
||||||
|
|
||||||
@ -295,20 +310,67 @@ class RewriterApp(object):
|
|||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def _add_memento_links(self, urlrewriter, full_prefix, memento_dt, status_headers):
|
def format_response(self, response, wb_url, full_prefix, is_timegate):
|
||||||
wb_url = urlrewriter.wburl
|
memento_ts = None
|
||||||
status_headers.headers.append(('Memento-Datetime', memento_dt))
|
if not isinstance(response, WbResponse):
|
||||||
|
content_type = 'text/html'
|
||||||
|
|
||||||
memento_url = full_prefix + str(wb_url)
|
# if not replay outer frame, specify utf-8 charset
|
||||||
timegate_url = urlrewriter.get_new_url(timestamp='')
|
if not self.is_framed_replay(wb_url):
|
||||||
|
content_type += '; charset=utf-8'
|
||||||
|
else:
|
||||||
|
memento_ts = wb_url.timestamp
|
||||||
|
|
||||||
|
response = WbResponse.text_response(response, content_type=content_type)
|
||||||
|
|
||||||
|
self._add_memento_links(wb_url.url, full_prefix, None, memento_ts,
|
||||||
|
response.status_headers, is_timegate)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def _add_memento_links(self, url, full_prefix, memento_dt, memento_ts,
|
||||||
|
status_headers, is_timegate):
|
||||||
|
|
||||||
|
# memento url + header
|
||||||
|
if not memento_dt and memento_ts:
|
||||||
|
memento_dt = timestamp_to_http_date(memento_ts)
|
||||||
|
|
||||||
|
if memento_dt:
|
||||||
|
status_headers.headers.append(('Memento-Datetime', memento_dt))
|
||||||
|
|
||||||
|
memento_url = full_prefix + memento_ts + self.replay_mod
|
||||||
|
memento_url += '/' + url
|
||||||
|
else:
|
||||||
|
memento_url = None
|
||||||
|
|
||||||
|
timegate_url, timemap_url = self._get_timegate_timemap(url, full_prefix)
|
||||||
|
|
||||||
link = []
|
link = []
|
||||||
|
link.append(MementoUtils.make_link(url, 'original'))
|
||||||
link.append(MementoUtils.make_link(timegate_url, 'timegate'))
|
link.append(MementoUtils.make_link(timegate_url, 'timegate'))
|
||||||
link.append(MementoUtils.make_memento_link(memento_url, 'memento', memento_dt))
|
link.append(MementoUtils.make_link(timemap_url, 'timemap'))
|
||||||
|
|
||||||
|
if memento_dt:
|
||||||
|
link.append(MementoUtils.make_memento_link(memento_url, 'memento', memento_dt))
|
||||||
|
|
||||||
link_str = ', '.join(link)
|
link_str = ', '.join(link)
|
||||||
|
|
||||||
status_headers.headers.append(('Link', link_str))
|
status_headers.headers.append(('Link', link_str))
|
||||||
|
|
||||||
|
if is_timegate:
|
||||||
|
status_headers.headers.append(('Vary', 'accept-datetime'))
|
||||||
|
|
||||||
|
def _get_timegate_timemap(self, url, full_prefix):
    """Build the TimeGate and TimeMap urls for ``url``.

    The TimeGate url is ``full_prefix`` followed by ``self.replay_mod`` and
    a slash (only when ``self.replay_mod`` is non-empty) and then ``url``.
    The TimeMap url is ``full_prefix + 'timemap/link/' + url``.

    :param url: the original url
    :param full_prefix: prefix string that both urls start with
    :return: ``(timegate_url, timemap_url)`` tuple of strings
    """
    # The modifier segment is omitted entirely when no replay modifier is set.
    mod_segment = self.replay_mod + '/' if self.replay_mod else ''

    timegate_url = full_prefix + mod_segment + url
    timemap_url = full_prefix + 'timemap/link/' + url

    return timegate_url, timemap_url
|
||||||
|
|
||||||
def get_top_url(self, full_prefix, wb_url, cdx, kwargs):
|
def get_top_url(self, full_prefix, wb_url, cdx, kwargs):
|
||||||
top_url = full_prefix
|
top_url = full_prefix
|
||||||
top_url += wb_url.to_str(mod='')
|
top_url += wb_url.to_str(mod='')
|
||||||
@ -389,7 +451,7 @@ class RewriterApp(object):
|
|||||||
def do_query(self, wb_url, kwargs):
|
def do_query(self, wb_url, kwargs):
|
||||||
params = {}
|
params = {}
|
||||||
params['url'] = wb_url.url
|
params['url'] = wb_url.url
|
||||||
params['output'] = 'json'
|
params['output'] = kwargs.get('output', 'json')
|
||||||
params['from'] = wb_url.timestamp
|
params['from'] = wb_url.timestamp
|
||||||
params['to'] = wb_url.end_timestamp
|
params['to'] = wb_url.end_timestamp
|
||||||
|
|
||||||
@ -398,11 +460,37 @@ class RewriterApp(object):
|
|||||||
|
|
||||||
r = requests.get(upstream_url)
|
r = requests.get(upstream_url)
|
||||||
|
|
||||||
return r.text
|
return r
|
||||||
|
|
||||||
def handle_query(self, environ, wb_url, kwargs):
|
def make_timemap(self, wb_url, res, full_prefix):
    """Turn an upstream query response into a timemap ``WbResponse``.

    ``wb_url.type`` is set to ``wb_url.QUERY``. The response keeps the
    upstream ``Content-Type`` header. An empty upstream body yields a
    ``404 Not Found`` status with the empty body; otherwise the upstream
    status code and reason are passed through, and a 200 body is prefixed
    via ``MementoUtils.wrap_timemap_header``.

    :param wb_url: parsed url object; its ``type`` is mutated
    :param res: upstream response exposing ``headers``, ``text``,
        ``status_code`` and ``reason``
    :param full_prefix: prefix used to build the timegate/timemap urls
    :return: result of ``WbResponse.text_response``
    """
    wb_url.type = wb_url.QUERY

    content_type = res.headers.get('Content-Type')
    text = res.text

    if not text:
        status = '404 Not Found'

    elif res.status_code:
        status = str(res.status_code) + ' ' + res.reason

        # Only a successful, non-empty upstream result is wrapped with the
        # self/timegate/original header links.
        if res.status_code == 200:
            timegate, timemap = self._get_timegate_timemap(wb_url.url,
                                                           full_prefix)

            text = MementoUtils.wrap_timemap_header(wb_url.url,
                                                    timegate,
                                                    timemap,
                                                    text)

    return WbResponse.text_response(text,
                                    content_type=content_type,
                                    status=status)
|
||||||
|
|
||||||
|
def handle_query(self, environ, wb_url, kwargs, full_prefix):
|
||||||
res = self.do_query(wb_url, kwargs)
|
res = self.do_query(wb_url, kwargs)
|
||||||
|
|
||||||
|
if kwargs.get('output'):
|
||||||
|
return self.make_timemap(wb_url, res, full_prefix)
|
||||||
|
|
||||||
def format_cdx(text):
|
def format_cdx(text):
|
||||||
cdx_lines = text.rstrip().split('\n')
|
cdx_lines = text.rstrip().split('\n')
|
||||||
for cdx in cdx_lines:
|
for cdx in cdx_lines:
|
||||||
@ -417,7 +505,7 @@ class RewriterApp(object):
|
|||||||
|
|
||||||
params = dict(url=wb_url.url,
|
params = dict(url=wb_url.url,
|
||||||
prefix=prefix,
|
prefix=prefix,
|
||||||
cdx_lines=list(format_cdx(res)))
|
cdx_lines=list(format_cdx(res.text)))
|
||||||
|
|
||||||
extra_params = self.get_query_params(wb_url, kwargs)
|
extra_params = self.get_query_params(wb_url, kwargs)
|
||||||
if extra_params:
|
if extra_params:
|
||||||
@ -506,8 +594,8 @@ class RewriterApp(object):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def handle_custom_response(self, environ, wb_url, full_prefix, host_prefix, kwargs):
|
def handle_custom_response(self, environ, wb_url, full_prefix, host_prefix, kwargs):
|
||||||
if wb_url.is_query():
|
if wb_url.is_query() or kwargs.get('output'):
|
||||||
return self.handle_query(environ, wb_url, kwargs)
|
return self.handle_query(environ, wb_url, kwargs, full_prefix)
|
||||||
|
|
||||||
if self.is_framed_replay(wb_url):
|
if self.is_framed_replay(wb_url):
|
||||||
extra_params = self.get_top_frame_params(wb_url, kwargs)
|
extra_params = self.get_top_frame_params(wb_url, kwargs)
|
||||||
|
@ -11,6 +11,9 @@ LINK_SEG_SPLIT = re.compile(';\s*')
|
|||||||
LINK_URL = re.compile('<(.*)>')
|
LINK_URL = re.compile('<(.*)>')
|
||||||
LINK_PROP = re.compile('([\w]+)="([^"]+)')
|
LINK_PROP = re.compile('([\w]+)="([^"]+)')
|
||||||
|
|
||||||
|
FIND_DT = re.compile('datetime=\"([^\"]+)\"')
|
||||||
|
|
||||||
|
|
||||||
#=============================================================================
|
#=============================================================================
|
||||||
class MementoException(BadRequestException):
|
class MementoException(BadRequestException):
|
||||||
pass
|
pass
|
||||||
@ -18,8 +21,8 @@ class MementoException(BadRequestException):
|
|||||||
|
|
||||||
#=============================================================================
|
#=============================================================================
|
||||||
class MementoUtils(object):
|
class MementoUtils(object):
|
||||||
@staticmethod
|
@classmethod
|
||||||
def parse_links(link_header, def_name='timemap'):
|
def parse_links(cls, link_header, def_name='timemap'):
|
||||||
links = LINK_SPLIT.split(link_header)
|
links = LINK_SPLIT.split(link_header)
|
||||||
results = {}
|
results = {}
|
||||||
mementos = []
|
mementos = []
|
||||||
@ -61,9 +64,9 @@ class MementoUtils(object):
|
|||||||
results['mementos'] = mementos
|
results['mementos'] = mementos
|
||||||
return results
|
return results
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def make_timemap_memento_link(cdx, datetime=None, rel='memento', end=',\n'):
|
def make_timemap_memento_link(cls, cdx, datetime=None, rel='memento', end=',\n'):
|
||||||
url = cdx.get('load_url')
|
url = cdx.get('url')
|
||||||
if not url:
|
if not url:
|
||||||
url = 'file://{0}:{1}:{2}'.format(cdx.get('filename'), cdx.get('offset'), cdx.get('length'))
|
url = 'file://{0}:{1}:{2}'.format(cdx.get('filename'), cdx.get('offset'), cdx.get('length'))
|
||||||
|
|
||||||
@ -74,37 +77,43 @@ class MementoUtils(object):
|
|||||||
|
|
||||||
return memento.format(url, rel, datetime, cdx.get('source', ''))
|
return memento.format(url, rel, datetime, cdx.get('source', ''))
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def make_timemap(cdx_iter):
|
def make_timemap(cls, cdx_iter):
|
||||||
# get first memento as it'll be used for 'from' field
|
|
||||||
try:
|
|
||||||
first_cdx = six.next(cdx_iter)
|
|
||||||
from_date = timestamp_to_http_date(first_cdx['timestamp'])
|
|
||||||
except StopIteration:
|
|
||||||
first_cdx = None
|
|
||||||
return
|
|
||||||
|
|
||||||
# first memento link
|
|
||||||
yield MementoUtils.make_timemap_memento_link(first_cdx, datetime=from_date)
|
|
||||||
|
|
||||||
prev_cdx = None
|
prev_cdx = None
|
||||||
|
|
||||||
for cdx in cdx_iter:
|
for cdx in cdx_iter:
|
||||||
if prev_cdx:
|
if prev_cdx:
|
||||||
yield MementoUtils.make_timemap_memento_link(prev_cdx)
|
yield cls.make_timemap_memento_link(prev_cdx)
|
||||||
|
|
||||||
prev_cdx = cdx
|
prev_cdx = cdx
|
||||||
|
|
||||||
# last memento link, if any
|
# last memento link, if any
|
||||||
if prev_cdx:
|
if prev_cdx:
|
||||||
yield MementoUtils.make_timemap_memento_link(prev_cdx, end='\n')
|
yield cls.make_timemap_memento_link(prev_cdx, end='\n')
|
||||||
|
|
||||||
|
@classmethod
def wrap_timemap_header(cls, url, timegate_url, timemap_url, timemap):
    """Prefix ``timemap`` text with self, timegate and original links.

    The ``self`` link gets a ``from="..."`` attribute taken from the first
    ``datetime="..."`` value that ``FIND_DT`` matches in ``timemap``, when
    there is one. Each header link ends with ``',\\n'`` and the ``timemap``
    text follows unchanged.

    :param url: the original url
    :param timegate_url: url used for the ``timegate`` link
    :param timemap_url: url used for the ``self`` link
    :param timemap: timemap body text to append after the header links
    :return: the combined text
    """
    self_link = cls.make_link(timemap_url, "self")

    first_dt = FIND_DT.search(timemap)
    if first_dt:
        self_link += '; from="{0}"'.format(first_dt.group(1))

    header_links = [
        self_link,
        cls.make_link(timegate_url, "timegate"),
        cls.make_link(url, "original"),
    ]

    return ',\n'.join(header_links) + ',\n' + timemap
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def make_link(cls, url, type):
|
||||||
|
if type in ('timemap', 'self'):
|
||||||
|
return '<{0}>; rel="{1}"; type="application/link-format"'.format(url, type)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def make_link(url, type):
|
|
||||||
return '<{0}>; rel="{1}"'.format(url, type)
|
return '<{0}>; rel="{1}"'.format(url, type)
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def make_memento_link(url, type, dt):
|
def make_memento_link(cls, url, type, dt):
|
||||||
return '<{0}>; rel="{1}"; datetime="{2}"'.format(url, type, dt)
|
return '<{0}>; rel="{1}"; datetime="{2}"'.format(url, type, dt)
|
||||||
|
|
||||||
|
|
||||||
|
@ -24,7 +24,7 @@ def to_text(cdx_iter, fields):
|
|||||||
return content_type, (cdx.to_text(fields) for cdx in cdx_iter)
|
return content_type, (cdx.to_text(fields) for cdx in cdx_iter)
|
||||||
|
|
||||||
def to_link(cdx_iter, fields):
|
def to_link(cdx_iter, fields):
|
||||||
content_type = 'application/link'
|
content_type = 'application/link-format'
|
||||||
return content_type, MementoUtils.make_timemap(cdx_iter)
|
return content_type, MementoUtils.make_timemap(cdx_iter)
|
||||||
|
|
||||||
|
|
||||||
|
@ -11,11 +11,20 @@ class MementoMixin(object):
|
|||||||
return list(map(lambda x: x.strip(), re.split(', (?![0-9])', resp.headers[LINK])))
|
return list(map(lambda x: x.strip(), re.split(', (?![0-9])', resp.headers[LINK])))
|
||||||
|
|
||||||
def make_timemap_link(self, url, coll='pywb'):
|
def make_timemap_link(self, url, coll='pywb'):
|
||||||
format_ = '<http://localhost:80/{2}/timemap/*/{0}>; rel="timemap"; type="{1}"'
|
format_ = '<http://localhost:80/{2}/timemap/link/{0}>; rel="timemap"; type="{1}"'
|
||||||
return format_.format(url, LINK_FORMAT, coll)
|
return format_.format(url, LINK_FORMAT, coll)
|
||||||
|
|
||||||
def make_memento_link(self, url, ts, dt, coll='pywb'):
|
def make_original_link(self, url):
|
||||||
format_ = '<http://localhost:80/{3}/{1}/{0}>; rel="memento"; datetime="{2}"'
|
format_ = '<{0}>; rel="original"'
|
||||||
return format_.format(url, ts, dt, coll)
|
return format_.format(url)
|
||||||
|
|
||||||
|
def make_timegate_link(self, url, fmod='', coll='pywb'):
    """Return the expected ``rel="timegate"`` Link entry for ``url``.

    :param url: the original url
    :param fmod: replay modifier; when non-empty it is inserted, followed
        by a slash, between the collection and ``url``
    :param coll: collection name used in the path
    :return: the formatted link string
    """
    fmod_slash = fmod + '/' if fmod else ''
    format_ = '<http://localhost:80/{2}/{1}{0}>; rel="timegate"'
    return format_.format(url, fmod_slash, coll)
|
||||||
|
|
||||||
|
def make_memento_link(self, url, ts, dt, fmod='', coll='pywb'):
    """Return the expected ``rel="memento"`` Link entry for a capture.

    :param url: the original url
    :param ts: capture timestamp placed in the path
    :param dt: value of the ``datetime`` attribute
    :param fmod: replay modifier appended directly after ``ts``
    :param coll: collection name used in the path
    :return: the formatted link string
    """
    format_ = '<http://localhost:80/{4}/{1}{3}/{0}>; rel="memento"; datetime="{2}"'
    return format_.format(url, ts, dt, fmod, coll)
|
||||||
|
|
||||||
|
|
||||||
|
@ -2,13 +2,45 @@ from .base_config_test import BaseConfigTest, fmod
|
|||||||
|
|
||||||
from .memento_fixture import *
|
from .memento_fixture import *
|
||||||
|
|
||||||
|
from warcio.timeutils import timestamp_to_http_date
|
||||||
|
|
||||||
|
|
||||||
# ============================================================================
|
# ============================================================================
|
||||||
class TestMemento(MementoMixin, BaseConfigTest):
|
class TestMemento(MementoMixin, BaseConfigTest):
|
||||||
@classmethod
|
@classmethod
|
||||||
def setup_class(cls):
|
def setup_class(cls):
|
||||||
super(TestMemento, cls).setup_class('config_test.yaml')
|
super(TestMemento, cls).setup_class('config_test.yaml')
|
||||||
|
|
||||||
def _test_top_frame_replay(self):
|
def _assert_memento(self, resp, url, ts, fmod, dt=''):
|
||||||
|
dt = dt or timestamp_to_http_date(ts)
|
||||||
|
|
||||||
|
links = self.get_links(resp)
|
||||||
|
|
||||||
|
assert MEMENTO_DATETIME in resp.headers
|
||||||
|
assert resp.headers[MEMENTO_DATETIME] == dt
|
||||||
|
|
||||||
|
# memento link
|
||||||
|
memento_link = self.make_memento_link(url, ts, dt, fmod)
|
||||||
|
assert memento_link in links
|
||||||
|
|
||||||
|
# content location
|
||||||
|
assert '/pywb/{1}{0}/{2}'.format(fmod, ts, url) in resp.headers['Content-Location']
|
||||||
|
|
||||||
|
# content location part of memento link
|
||||||
|
assert resp.headers['Content-Location'] in memento_link
|
||||||
|
|
||||||
|
# timegate link
|
||||||
|
assert self.make_timegate_link(url, fmod) in links
|
||||||
|
|
||||||
|
# timemap link
|
||||||
|
assert self.make_timemap_link(url) in links
|
||||||
|
|
||||||
|
# original
|
||||||
|
assert self.make_original_link(url) in links
|
||||||
|
|
||||||
|
|
||||||
|
# Memento Pattern 2.2 (no redirect, 200 negotiation)
|
||||||
|
def test_memento_top_frame(self):
|
||||||
resp = self.testapp.get('/pywb/20140127171238/http://www.iana.org/')
|
resp = self.testapp.get('/pywb/20140127171238/http://www.iana.org/')
|
||||||
|
|
||||||
# Memento Headers
|
# Memento Headers
|
||||||
@ -18,37 +50,119 @@ class TestMemento(MementoMixin, BaseConfigTest):
|
|||||||
|
|
||||||
# memento link
|
# memento link
|
||||||
dt = 'Mon, 27 Jan 2014 17:12:38 GMT'
|
dt = 'Mon, 27 Jan 2014 17:12:38 GMT'
|
||||||
|
url = 'http://www.iana.org/'
|
||||||
|
|
||||||
links = self.get_links(resp)
|
links = self.get_links(resp)
|
||||||
assert self.make_memento_link('http://www.iana.org/', '20140127171238mp_', dt) in links
|
|
||||||
|
assert self.make_memento_link(url, '20140127171238', dt, 'mp_') in links
|
||||||
|
|
||||||
#timegate link
|
#timegate link
|
||||||
assert '<http://localhost:80/pywb/mp_/http://www.iana.org/>; rel="timegate"' in links
|
assert self.make_timegate_link(url, 'mp_') in links
|
||||||
|
|
||||||
# Body
|
# Body
|
||||||
assert '<iframe ' in resp.text
|
assert '<iframe ' in resp.text
|
||||||
assert '/pywb/20140127171238mp_/http://www.iana.org/' in resp.text, resp.text
|
assert '/pywb/20140127171238mp_/http://www.iana.org/' in resp.text, resp.text
|
||||||
|
|
||||||
def test_memento_content_replay(self, fmod):
|
def test_memento_content_replay_exact(self, fmod):
|
||||||
fmod_slash = fmod + '/' if fmod else ''
|
|
||||||
resp = self.get('/pywb/20140127171238{0}/http://www.iana.org/', fmod)
|
resp = self.get('/pywb/20140127171238{0}/http://www.iana.org/', fmod)
|
||||||
|
|
||||||
# Memento Headers
|
self._assert_memento(resp, 'http://www.iana.org/', '20140127171238', fmod)
|
||||||
# no vary header
|
|
||||||
assert VARY not in resp.headers
|
assert VARY not in resp.headers
|
||||||
assert MEMENTO_DATETIME in resp.headers
|
|
||||||
|
|
||||||
# memento link
|
|
||||||
dt = 'Mon, 27 Jan 2014 17:12:38 GMT'
|
|
||||||
|
|
||||||
links = self.get_links(resp)
|
|
||||||
assert self.make_memento_link('http://www.iana.org/', '20140127171238{0}'.format(fmod), dt) in links
|
|
||||||
|
|
||||||
# timegate link
|
|
||||||
assert '<http://localhost:80/pywb/{0}http://www.iana.org/>; rel="timegate"'.format(fmod_slash) in links
|
|
||||||
|
|
||||||
# Body
|
# Body
|
||||||
assert '"20140127171238"' in resp.text
|
assert '"20140127171238"' in resp.text
|
||||||
assert 'wb.js' in resp.text
|
assert 'wb.js' in resp.text
|
||||||
assert 'new _WBWombat' in resp.text, resp.text
|
assert 'new _WBWombat' in resp.text, resp.text
|
||||||
assert '/pywb/20140127171238{0}/http://www.iana.org/time-zones"'.format(fmod) in resp.text
|
assert '/pywb/20140127171238{0}/http://www.iana.org/time-zones"'.format(fmod) in resp.text
|
||||||
|
|
||||||
|
def test_memento_at_timegate_latest(self, fmod):
|
||||||
|
"""
|
||||||
|
TimeGate with no Accept-Datetime header
|
||||||
|
"""
|
||||||
|
|
||||||
|
fmod_slash = fmod + '/' if fmod else ''
|
||||||
|
resp = self.get('/pywb/{0}http://www.iana.org/_css/2013.1/screen.css', fmod_slash)
|
||||||
|
|
||||||
|
assert resp.headers[VARY] == 'accept-datetime'
|
||||||
|
|
||||||
|
self._assert_memento(resp, 'http://www.iana.org/_css/2013.1/screen.css', '20140127171239', fmod)
|
||||||
|
|
||||||
|
def test_memento_at_timegate(self, fmod):
|
||||||
|
"""
|
||||||
|
TimeGate with Accept-Datetime header, not matching a memento exactly, no redirect
|
||||||
|
"""
|
||||||
|
dt = 'Sun, 26 Jan 2014 20:08:04 GMT'
|
||||||
|
|
||||||
|
request_dt = 'Sun, 26 Jan 2014 20:08:00 GMT'
|
||||||
|
headers = {ACCEPT_DATETIME: request_dt}
|
||||||
|
|
||||||
|
fmod_slash = fmod + '/' if fmod else ''
|
||||||
|
resp = self.get('/pywb/{0}http://www.iana.org/_css/2013.1/screen.css', fmod_slash, headers=headers)
|
||||||
|
|
||||||
|
assert resp.headers[VARY] == 'accept-datetime'
|
||||||
|
|
||||||
|
self._assert_memento(resp, 'http://www.iana.org/_css/2013.1/screen.css', '20140126200804', fmod, dt)
|
||||||
|
|
||||||
|
def test_302_memento(self, fmod):
|
||||||
|
"""
|
||||||
|
Memento (capture) of a 302 response
|
||||||
|
"""
|
||||||
|
resp = self.get('/pywb/20140128051539{0}/http://www.iana.org/domains/example', fmod)
|
||||||
|
|
||||||
|
assert resp.status_int == 302
|
||||||
|
|
||||||
|
assert VARY not in resp.headers
|
||||||
|
|
||||||
|
self._assert_memento(resp, 'http://www.iana.org/domains/example', '20140128051539', fmod)
|
||||||
|
|
||||||
|
def test_timemap(self):
|
||||||
|
"""
|
||||||
|
Test application/link-format timemap
|
||||||
|
"""
|
||||||
|
|
||||||
|
resp = self.testapp.get('/pywb/timemap/link/http://example.com?example=1')
|
||||||
|
assert resp.status_int == 200
|
||||||
|
assert resp.content_type == LINK_FORMAT
|
||||||
|
|
||||||
|
resp.charset = 'utf-8'
|
||||||
|
|
||||||
|
exp = """\
|
||||||
|
<http://localhost:80/pywb/timemap/link/http://example.com?example=1>; rel="self"; type="application/link-format"; from="Fri, 03 Jan 2014 03:03:21 GMT",
|
||||||
|
<http://localhost:80/pywb/mp_/http://example.com?example=1>; rel="timegate",
|
||||||
|
<http://example.com?example=1>; rel="original",
|
||||||
|
<http://example.com?example=1>; rel="memento"; datetime="Fri, 03 Jan 2014 03:03:21 GMT"; src="pywb:example.cdx",
|
||||||
|
<http://example.com?example=1>; rel="memento"; datetime="Fri, 03 Jan 2014 03:03:41 GMT"; src="pywb:example.cdx"
|
||||||
|
"""
|
||||||
|
assert exp == resp.text
|
||||||
|
|
||||||
|
def test_timemap_2(self):
|
||||||
|
"""
|
||||||
|
Test application/link-format timemap total count
|
||||||
|
"""
|
||||||
|
|
||||||
|
resp = self.testapp.get('/pywb/timemap/link/http://example.com')
|
||||||
|
assert resp.status_int == 200
|
||||||
|
assert resp.content_type == LINK_FORMAT
|
||||||
|
|
||||||
|
lines = resp.text.split('\n')
|
||||||
|
|
||||||
|
assert len(lines) == 7
|
||||||
|
|
||||||
|
def test_timemap_error_not_found(self):
|
||||||
|
resp = self.testapp.get('/pywb/timemap/link/http://example.com/x-not-found', status=404)
|
||||||
|
assert resp.body == b''
|
||||||
|
|
||||||
|
def test_timemap_error_invalid_format(self):
|
||||||
|
resp = self.testapp.get('/pywb/timemap/foo/http://example.com', status=400)
|
||||||
|
assert resp.json == {'message': 'output=foo not supported'}
|
||||||
|
|
||||||
|
def test_error_bad_accept_datetime(self):
|
||||||
|
"""
|
||||||
|
400 response for bad accept_datetime
|
||||||
|
"""
|
||||||
|
headers = {ACCEPT_DATETIME: 'Sun'}
|
||||||
|
resp = self.testapp.get('/pywb/http://www.iana.org/_css/2013.1/screen.css', headers=headers, status=400)
|
||||||
|
assert resp.status_int == 400
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user