1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-31 03:04:12 +02:00
pywb/pywb/webapp/handlers.py
Ilya Kreymer 695245d9e8 wburl idn: more complete support for idn urls (#66)
add distinct to_iri() and to_uri() functions in WbUrl
internal representation is always as ascii uri
for rewriting, defaults to iri representation unless
'rewrite_ascii_only_urls' is set to true per collection
add wbrequest.get_url() to get url as either iri or uri to be passed
to templates
2015-01-26 11:07:59 -08:00

234 lines
8.3 KiB
Python

import pkgutil
import mimetypes
import time
from datetime import datetime
from pywb.utils.wbexception import NotFoundException
from pywb.utils.loaders import BlockLoader
from pywb.utils.statusandheaders import StatusAndHeaders
from pywb.framework.basehandlers import BaseHandler, WbUrlHandler
from pywb.framework.wbrequestresponse import WbResponse
from pywb.warc.recordloader import ArcWarcRecordLoader
from pywb.warc.resolvingloader import ResolvingLoader
from views import J2TemplateView
from replay_views import ReplayView
from pywb.framework.memento import MementoResponse
from pywb.utils.timeutils import datetime_to_timestamp
#=================================================================
class SearchPageWbUrlHandler(WbUrlHandler):
"""
Loads a default search page html template to be shown when
the wb_url is empty
"""
def __init__(self, config):
self.search_view = (J2TemplateView.
create_template(config.get('search_html'),
'Search Page'))
self.is_frame_mode = config.get('framed_replay', False)
self.response_class = WbResponse
if self.is_frame_mode:
html = config.get('frame_insert_html', 'ui/frame_insert.html')
self.frame_insert_view = (J2TemplateView.
create_template(html, 'Frame Insert'))
self.banner_html = config.get('banner_html', 'banner.html')
if config.get('enable_memento', False):
self.response_class = MementoResponse
else:
self.frame_insert_view = None
self.banner_html = None
def render_search_page(self, wbrequest, **kwargs):
return self.search_view.render_response(wbrequest=wbrequest,
prefix=wbrequest.wb_prefix,
**kwargs)
def __call__(self, wbrequest):
# root search page
if wbrequest.wb_url_str == '/':
return self.render_search_page(wbrequest)
# render top level frame if in frame mode
# (not supported in proxy mode)
if (self.is_frame_mode and wbrequest.wb_url and
not wbrequest.wb_url.is_query() and
not wbrequest.options['is_proxy']):
if wbrequest.wb_url.is_top_frame:
return self.get_top_frame_response(wbrequest)
else:
wbrequest.final_mod = 'tf_'
return self.handle_request(wbrequest)
def get_top_frame_params(self, wbrequest, mod=''):
embed_url = wbrequest.wb_url.to_str(mod=mod, url='')
embed_url += wbrequest.get_url()
if wbrequest.wb_url.timestamp:
timestamp = wbrequest.wb_url.timestamp
else:
timestamp = datetime_to_timestamp(datetime.utcnow())
params = dict(embed_url=embed_url,
wbrequest=wbrequest,
timestamp=timestamp,
url=wbrequest.get_url(),
banner_html=self.banner_html)
return params
def get_top_frame_response(self, wbrequest):
params = self.get_top_frame_params(wbrequest)
headers = [('Content-Type', 'text/html; charset=utf-8')]
status_headers = StatusAndHeaders('200 OK', headers)
template_result = self.frame_insert_view.render_to_string(**params)
body = template_result.encode('utf-8')
return self.response_class(status_headers, [body], wbrequest=wbrequest)
#=================================================================
# Standard WB Handler
#=================================================================
class WBHandler(SearchPageWbUrlHandler):
def __init__(self, query_handler, config=None):
super(WBHandler, self).__init__(config)
self.index_reader = query_handler
self.not_found_view = (J2TemplateView.
create_template(config.get('not_found_html'),
'Not Found Error'))
cookie_maker = config.get('cookie_maker')
record_loader = ArcWarcRecordLoader(cookie_maker=cookie_maker)
paths = config.get('archive_paths')
resolving_loader = ResolvingLoader(paths=paths,
record_loader=record_loader)
self.replay = ReplayView(resolving_loader, config)
self.fallback_handler = None
self.fallback_name = config.get('fallback')
def resolve_refs(self, handler_dict):
if self.fallback_name:
self.fallback_handler = handler_dict.get(self.fallback_name)
def handle_request(self, wbrequest):
try:
cdx_lines, output = self.index_reader.load_for_request(wbrequest)
except NotFoundException as nfe:
return self.handle_not_found(wbrequest, nfe)
if output != 'text' and wbrequest.wb_url.is_replay():
return self.handle_replay(wbrequest, cdx_lines)
else:
return self.handle_query(wbrequest, cdx_lines, output)
def handle_query(self, wbrequest, cdx_lines, output):
return self.index_reader.make_cdx_response(wbrequest,
cdx_lines,
output)
def handle_replay(self, wbrequest, cdx_lines):
cdx_callback = self.index_reader.cdx_load_callback(wbrequest)
return self.replay.render_content(wbrequest,
cdx_lines,
cdx_callback)
def handle_not_found(self, wbrequest, nfe):
# check fallback: only for replay queries and not for identity
if (self.fallback_handler and
not wbrequest.wb_url.is_query() and
not wbrequest.wb_url.is_identity):
return self.fallback_handler(wbrequest)
# if capture query, just return capture page
if wbrequest.wb_url.is_query():
return self.index_reader.make_cdx_response(wbrequest, [], 'html')
else:
return self.not_found_view.render_response(status='404 Not Found',
env=wbrequest.env,
url=wbrequest.wb_url.url)
def __str__(self):
return 'Web Archive Replay Handler'
#=================================================================
# Static Content Handler
#=================================================================
class StaticHandler(BaseHandler):
def __init__(self, static_path):
mimetypes.init()
self.static_path = static_path
self.block_loader = BlockLoader()
def __call__(self, wbrequest):
url = wbrequest.wb_url_str.split('?')[0]
full_path = self.static_path + url
try:
data = self.block_loader.load(full_path)
try:
data.seek(0, 2)
size = data.tell()
data.seek(0)
headers = [('Content-Length', str(size))]
except IOError:
headers = None
if 'wsgi.file_wrapper' in wbrequest.env:
reader = wbrequest.env['wsgi.file_wrapper'](data)
else:
reader = iter(lambda: data.read(), '')
content_type = 'application/octet-stream'
guessed = mimetypes.guess_type(full_path)
if guessed[0]:
content_type = guessed[0]
return WbResponse.text_stream(reader,
content_type=content_type,
headers=headers)
except IOError:
raise NotFoundException('Static File Not Found: ' +
wbrequest.wb_url_str)
def __str__(self): # pragma: no cover
return 'Static files from ' + self.static_path
#=================================================================
# Debug Handlers
#=================================================================
class DebugEchoEnvHandler(BaseHandler): # pragma: no cover
def __call__(self, wbrequest):
return WbResponse.text_response(str(wbrequest.env))
#=================================================================
class DebugEchoHandler(BaseHandler): # pragma: no cover
def __call__(self, wbrequest):
return WbResponse.text_response(str(wbrequest))