1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 00:03:28 +01:00

rewrite system refactor:

- rewriter interface accepts RewriteInfo instance
- add StreamingRewriter adapter wraps html, regex rewriters to support rewriting streaming text from general rewriter interface
- add RewriteDASH, RewriteHLS as (non-streaming) rewriters. Need to read contents into buffer (for now)
- add RewriteAMF experimental AMF rewriter
- general rewriting system in BaseContentRewriter, default rewriters configured in DefaultRewriter
- tests: disable banner-only test, as banner-only mode is not currently supported (for now)
This commit is contained in:
Ilya Kreymer 2017-05-10 19:05:55 -07:00
parent db9d0ae41a
commit c1be7d4da5
10 changed files with 423 additions and 381 deletions

View File

@ -0,0 +1,324 @@
from io import BytesIO
from contextlib import closing
from warcio.bufferedreaders import BufferedReader
from warcio.utils import to_native_str
import re
import webencodings
from pywb.webagg.utils import StreamIter, BUFF_SIZE
from pywb.rewrite.cookie_rewriter import ExactPathCookieRewriter
from pywb.utils.loaders import load_yaml_config
# ============================================================================
class BaseContentRewriter(object):
    """Base class for the content rewriting system.

    Loads url-prefix-keyed rewrite rules from a yaml config, selects the
    appropriate rewriter class for a record's text type, and produces
    rewritten http headers plus a streaming generator of rewritten content.
    Subclasses supply the rewriter registry (``all_rewriters``) and implement
    ``get_rewrite_types()`` / ``init_js_regex()``.
    """

    # extracts a charset declaration from raw html bytes, e.g. <meta charset="utf-8">
    CHARSET_REGEX = re.compile(rb'<meta[^>]*?[\s;"\']charset\s*=[\s"\']*([^\s"\'/>]*)')

    def __init__(self, rules_file, replay_mod=''):
        """
        :param rules_file: path/uri of a yaml file containing rewrite rules
        :param replay_mod: default wburl modifier, passed to the html rewriter
        """
        self.rules = []
        self.load_rules(rules_file)
        self.replay_mod = replay_mod

        #for rw in self.known_rewriters:
        #    self.all_rewriters[rw.name] = rw

    def add_rewriter(self, rw):
        # register an additional rewriter class under its declared name
        self.all_rewriters[rw.name] = rw

    def get_rewriter(self, url, text_type):
        # url is currently unused; lookup is by text type only
        return self.all_rewriters.get(text_type)

    def load_rules(self, filename):
        """Load and parse all rewrite rules from the given yaml file."""
        config = load_yaml_config(filename)
        # default to [] so a config with no 'rules' key doesn't raise TypeError
        for rule in config.get('rules', []):
            rule = self.parse_rewrite_rule(rule)
            if rule:
                self.rules.append(rule)

    def parse_rewrite_rule(self, config):
        """Normalize one raw rule config; return None if it has no 'rewrite' section."""
        rw_config = config.get('rewrite')
        if not rw_config:
            return

        rule = rw_config
        url_prefix = config.get('url_prefix')
        if not isinstance(url_prefix, list):
            url_prefix = [url_prefix]

        rule['url_prefix'] = url_prefix

        regexs = rule.get('js_regexs')
        if regexs:
            # compile custom js regexs once at rule-load time
            parse_rules_func = self.init_js_regex(regexs)
            rule['js_regex_func'] = parse_rules_func

        return rule

    def get_rule(self, cdx):
        """Return the first rule whose url_prefix matches the cdx urlkey, or {}."""
        urlkey = to_native_str(cdx['urlkey'])

        for rule in self.rules:
            if any(urlkey.startswith(prefix) for prefix in rule['url_prefix']):
                return rule

        return {}

    def get_rw_class(self, rule, text_type, rwinfo):
        """Resolve (rw_type, rw_class) for text_type, applying any rule override."""
        if text_type == 'js' and not rwinfo.is_url_rw():
            # proxy mode: use the reduced js rewriter
            text_type = 'js-proxy'

        rw_type = rule.get(text_type, text_type)
        rw_class = self.all_rewriters.get(rw_type)

        return rw_type, rw_class

    def create_rewriter(self, text_type, rule, rwinfo, cdx, head_insert_func=None):
        """Instantiate the content rewriter for the given text type."""
        rw_type, rw_class = self.get_rw_class(rule, text_type, rwinfo)

        # fix: was ('js', 'js_proxy'), but get_rw_class() produces 'js-proxy',
        # so proxy-mode js rewriters never received their extra rules
        if rw_type in ('js', 'js-proxy'):
            extra_rules = []
            if 'js_regex_func' in rule:
                extra_rules = rule['js_regex_func'](rwinfo.url_rewriter)

            return rw_class(rwinfo.url_rewriter, extra_rules)

        elif rw_type != 'html':
            return rw_class(rwinfo.url_rewriter)

        # HTML Rewriter: also needs nested js/css rewriters and the head insert
        head_insert_str = self.get_head_insert(rwinfo, rule, head_insert_func, cdx)

        js_rewriter = self.create_rewriter('js', rule, rwinfo, cdx)
        css_rewriter = self.create_rewriter('css', rule, rwinfo, cdx)

        rw = rw_class(rwinfo.url_rewriter,
                      js_rewriter=js_rewriter,
                      css_rewriter=css_rewriter,
                      head_insert=head_insert_str,
                      url=cdx['url'],
                      defmod=self.replay_mod,
                      parse_comments=rule.get('parse_comments', False))

        return rw

    def get_head_insert(self, rwinfo, rule, head_insert_func, cdx):
        """Render the head insert string, encoded to match the record's charset."""
        head_insert_str = ''
        charset = rwinfo.charset

        # if no charset set, attempt to extract from first 1024 bytes of content
        if not charset:
            first_buff = rwinfo.read_and_keep(1024)
            charset = self.extract_html_charset(first_buff)

        if head_insert_func:
            head_insert_orig = head_insert_func(rule, cdx)

            if charset:
                try:
                    head_insert_str = webencodings.encode(head_insert_orig, charset)
                except:
                    # unknown/invalid charset: fall through to utf-8 below
                    pass

            if not head_insert_str:
                charset = 'utf-8'
                head_insert_str = head_insert_orig.encode(charset)

            # decode as latin-1: 1:1 byte<->char mapping, so the bytes pass
            # through the text-level rewriter unchanged
            head_insert_str = head_insert_str.decode('iso-8859-1')

        return head_insert_str

    def extract_html_charset(self, buff):
        """Return the charset declared in a <meta> tag within buff, or None."""
        charset = None
        m = self.CHARSET_REGEX.search(buff)
        if m:
            charset = m.group(1)
            charset = to_native_str(charset)

        return charset

    def rewrite_headers(self, rwinfo):
        # choose header rewriter based on url-rewrite vs proxy mode
        if rwinfo.is_url_rw():
            header_rw_name = 'header'
        else:
            header_rw_name = 'header-proxy'

        header_rw_class = self.all_rewriters.get(header_rw_name)
        rwinfo.rw_http_headers = header_rw_class(rwinfo)()

    def __call__(self, record, url_rewriter, cookie_rewriter,
                 head_insert_func=None,
                 cdx=None):
        """Rewrite a record; return (http headers, content generator, is_rewritten)."""
        rwinfo = RewriteInfo(record, self.get_rewrite_types(), url_rewriter, cookie_rewriter)
        self.rewrite_headers(rwinfo)

        content_rewriter = None

        if rwinfo.is_content_rw():
            rule = self.get_rule(cdx)
            content_rewriter = self.create_rewriter(rwinfo.text_type, rule, rwinfo, cdx, head_insert_func)

        if content_rewriter:
            gen = content_rewriter(rwinfo)
        else:
            # no rewriting needed: stream original content as-is
            gen = StreamIter(rwinfo.content_stream)

        return rwinfo.rw_http_headers, gen, (content_rewriter is not None)

    def init_js_regex(self, regexs):
        # fix: was misnamed 'init_js_regexs'; load_rules() and subclasses
        # (e.g. DefaultRewriter) use 'init_js_regex'
        raise NotImplementedError()

    def get_rewrite_types(self):
        raise NotImplementedError()
# ============================================================================
class StreamingRewriter(object):
    """Adapter exposing a text rewriter through the streaming rewriter interface.

    Subclasses override rewrite() (applied per decoded buffer) and close()
    (final tail text). Calling the instance with a RewriteInfo returns a
    generator yielding rewritten bytes.
    """

    def __init__(self):
        # when True, each read is extended to the next newline so regex-based
        # rewriters never see a match split across two buffers
        self.align_to_line = True

    def __call__(self, rwinfo):
        gen = self.rewrite_text_stream_to_gen(rwinfo.content_stream,
                                              rewrite_func=self.rewrite,
                                              final_read_func=self.close,
                                              align_to_line=self.align_to_line)

        return gen

    def rewrite(self, string):
        """Rewrite one decoded chunk of text; identity by default."""
        return string

    def close(self):
        """Return any final text to append once the stream is exhausted."""
        return ''

    # fix: first parameter was misleadingly named 'cls' on this instance
    # method (it binds the instance, not the class)
    def rewrite_text_stream_to_gen(self, stream,
                                   rewrite_func,
                                   final_read_func,
                                   align_to_line):
        """
        Convert stream to a generator by applying the rewriting func
        to each portion of the stream.
        Align to line boundaries if needed.
        """
        try:
            buff = ''

            while True:
                buff = stream.read(BUFF_SIZE)
                if not buff:
                    break

                if align_to_line:
                    buff += stream.readline()

                # latin-1 decode/encode round-trips any byte sequence unchanged
                buff = rewrite_func(buff.decode('iso-8859-1'))
                yield buff.encode('iso-8859-1')

            # For adding a tail/handling final buffer
            buff = final_read_func()
            if buff:
                yield buff.encode('iso-8859-1')

        finally:
            stream.close()
# ============================================================================
class RewriteInfo(object):
    """Per-record context for the rewriting pipeline.

    Wraps the warc record, its (re-wrappable) content stream, and the
    url/cookie rewriters, and determines the record's text type and charset
    from the Content-Type header (plus a content sniff when ambiguous).
    """

    # matches content that begins with a tag (after optional whitespace);
    # fix: raw bytes literal -- b'^\s*\<' contained invalid escape sequences
    TAG_REGEX = re.compile(rb'^\s*<')

    def __init__(self, record, rewrite_types, url_rewriter, cookie_rewriter):
        """
        :param record: warc record whose content is to be rewritten
        :param rewrite_types: mapping of mime type -> text type name
        :param url_rewriter: url rewriter for this request
        :param cookie_rewriter: cookie rewriter; falsy selects ExactPathCookieRewriter
        """
        self.record = record

        self.rw_http_headers = record.http_headers
        self.content_stream = record.content_stream()

        self.rewrite_types = rewrite_types

        self.text_type = None
        self.charset = None

        self.url_rewriter = url_rewriter

        if not cookie_rewriter:
            cookie_rewriter = ExactPathCookieRewriter(url_rewriter)

        self.cookie_rewriter = cookie_rewriter

        self._fill_text_type_and_charset()
        self._resolve_text_type()

    def _fill_text_type_and_charset(self):
        """Set text_type and charset from the Content-Type header, if present."""
        content_type = self.record.http_headers.get_header('Content-Type')
        if not content_type:
            return

        parts = content_type.split(';', 1)
        mime = parts[0]

        self.text_type = self.rewrite_types.get(mime)
        if not self.text_type:
            return

        if len(parts) == 2:
            parts = parts[1].lower().split('charset=', 1)
            if len(parts) == 2:
                self.charset = parts[1].strip()

    def _resolve_text_type(self):
        """Adjust text_type based on the requested wburl modifier."""
        mod = self.url_rewriter.wburl.mod

        # NOTE(review): this branch is a no-op (assigns 'css' to 'css');
        # possibly a different target type was intended -- confirm intent
        if self.text_type == 'css' and mod == 'js_':
            self.text_type = 'css'

        # only attempt to resolve between html and other text types
        if self.text_type != 'html':
            return

        if mod != 'js_' and mod != 'cs_':
            return

        buff = self.read_and_keep(128)

        # check if starts with a tag, then likely html
        if self.TAG_REGEX.match(buff):
            self.text_type = 'html'

    def read_and_keep(self, size):
        """Read up to size bytes, rewrapping the stream so they can be re-read."""
        buff = self.content_stream.read(size)
        self.content_stream = BufferedReader(self.content_stream, starting_data=buff)
        return buff

    def is_content_rw(self):
        """Return True if this record's content should be rewritten."""
        if not self.url_rewriter.prefix:
            return False

        if self.url_rewriter.wburl.mod == 'id_':
            # identity mode: never rewrite content
            return False

        if self.text_type == 'html':
            if self.url_rewriter.rewrite_opts.get('is_ajax'):
                return False
        elif self.text_type == 'plain':
            # plain text is only rewritten when requested as js or css
            if self.url_rewriter.wburl.mod not in ('js_', 'cs_'):
                return False
        elif not self.text_type:
            return False

        return True

    def is_url_rw(self):
        """Return True if urls should be rewritten (not proxy/identity mode)."""
        if not self.url_rewriter:
            return False

        if self.url_rewriter.wburl.mod == 'id_':
            return False

        return True

View File

@ -11,6 +11,8 @@ from six.moves.urllib.parse import urljoin, urlsplit, urlunsplit
from pywb.rewrite.url_rewriter import UrlRewriter
from pywb.rewrite.regex_rewriters import JSRewriter, CSSRewriter
from pywb.rewrite.content_rewriter import StreamingRewriter
import six.moves.html_parser
six.moves.html_parser.unescape = lambda x: x
from six import text_type
@ -441,7 +443,7 @@ class HTMLRewriterMixin(object):
#=================================================================
class HTMLRewriter(HTMLRewriterMixin, HTMLParser):
class HTMLRewriter(HTMLRewriterMixin, StreamingRewriter, HTMLParser):
PARSETAG = re.compile('[<]')
def __init__(self, *args, **kwargs):
@ -451,6 +453,8 @@ class HTMLRewriter(HTMLRewriterMixin, HTMLParser):
HTMLParser.__init__(self)
super(HTMLRewriter, self).__init__(*args, **kwargs)
# for StreamingRewriter
self.align_to_line = False
def reset(self):
HTMLParser.reset(self)

View File

@ -1,10 +1,14 @@
import re
from pywb.rewrite.content_rewriter import StreamingRewriter
class JSONPRewriter(object):
# ============================================================================
class JSONPRewriter(StreamingRewriter):
JSONP = re.compile(r'^(\w+)\(\{')
CALLBACK = re.compile(r'[?].*callback=([^&]+)')
def __init__(self, urlrewriter):
super(JSONPRewriter, self).__init__()
self.urlrewriter = urlrewriter
def rewrite(self, string):

View File

@ -1,6 +1,7 @@
import re
from pywb.rewrite.url_rewriter import UrlRewriter
from pywb.rewrite.content_rewriter import StreamingRewriter
#=================================================================
@ -13,7 +14,7 @@ def load_function(string):
#=================================================================
class RegexRewriter(object):
class RegexRewriter(StreamingRewriter):
#@staticmethod
#def comment_out(string):
# return '/*' + string + '*/'
@ -43,6 +44,7 @@ class RegexRewriter(object):
#DEFAULT_OP = add_prefix
def __init__(self, rewriter, rules):
super(RegexRewriter, self).__init__()
#rules = self.create_rules(http_prefix)
# Build regexstr, concatenating regex list

View File

@ -3,16 +3,9 @@ from six.moves import zip
# ============================================================================
# Experimental: not fully tested
class RewriteAMFMixin(object): #pragma: no cover
def handle_custom_rewrite(self, rewritten_headers, stream, urlrewriter, mod, env):
if rewritten_headers.status_headers.get_header('Content-Type') == 'application/x-amf':
stream = self.rewrite_amf(stream, env)
return (super(RewriteAMFMixin, self).
handle_custom_rewrite(rewritten_headers, stream, urlrewriter, mod, env))
def rewrite_amf(self, stream, env):
# Experimental: not fully tested
class RewriteAMF(object): #pragma: no cover
def __call__(self, rwinfo):
try:
from pyamf import remoting
@ -26,9 +19,10 @@ class RewriteAMFMixin(object): #pragma: no cover
iobuff.seek(0)
res = remoting.decode(iobuff)
if env and env.get('pywb.inputdata'):
inputdata = env.get('pywb.inputdata')
# TODO: revisit this
inputdata = rwinfo.url_rewriter.rewrite_opts.get('pywb.inputdata')
if inputdata:
new_list = []
for src, target in zip(inputdata.bodies, res.bodies):

View File

@ -1,39 +1,17 @@
import xml.etree.ElementTree as ET
from contextlib import closing
from io import BytesIO, StringIO
import json
from pywb.webagg.utils import StreamIter
import re
import xml.etree.ElementTree as ET
EXT_INF = re.compile('#EXT-X-STREAM-INF:(?:.*[,])?BANDWIDTH=([\d]+)')
from pywb.webagg.utils import StreamIter
# ============================================================================
class RewriteDASHMixin(object):
def handle_custom_rewrite(self, rewritten_headers, stream, urlrewriter, mod, env):
if rewritten_headers.status_headers.get_header('Content-Type') == 'application/dash+xml':
stream = self._decoding_stream(rewritten_headers, stream)
stream, _ = self.rewrite_dash(stream)
rewritten_headers.status_headers.remove_header('content-length')
return (rewritten_headers.status_headers, StreamIter(stream), True)
elif rewritten_headers.status_headers.get_header('Content-Type') == 'application/x-mpegURL':
stream = self._decoding_stream(rewritten_headers, stream)
stream = self.rewrite_m3u8(stream)
rewritten_headers.status_headers.remove_header('content-length')
return (rewritten_headers.status_headers, StreamIter(stream), True)
return (super(RewriteDASHMixin, self).
handle_custom_rewrite(rewritten_headers, stream, urlrewriter, mod, env))
@classmethod
def rewrite_dash(cls, stream):
ET.register_namespace('', 'urn:mpeg:dash:schema:mpd:2011')
namespaces = {'mpd': 'urn:mpeg:dash:schema:mpd:2011'}
class RewriteDASH(object):
def __call__(self, rwinfo):
buff_io = BytesIO()
with closing(stream) as fh:
with closing(rwinfo.content_stream) as fh:
while True:
buff = fh.read()
if not buff:
@ -42,8 +20,15 @@ class RewriteDASHMixin(object):
buff_io.write(buff)
buff_io.seek(0)
res_buff, best_ids = self.rewrite_dash(buff_io)
return StreamIter(res_buff)
def rewrite_dash(self, stream):
ET.register_namespace('', 'urn:mpeg:dash:schema:mpd:2011')
namespaces = {'mpd': 'urn:mpeg:dash:schema:mpd:2011'}
tree = ET.ElementTree()
tree.parse(buff_io)
tree.parse(stream)
root = tree.getroot()
@ -72,40 +57,8 @@ class RewriteDASHMixin(object):
buff_io.seek(0)
return buff_io, best_ids
@classmethod
def rewrite_m3u8(cls, stream):
buff = stream.read()
lines = buff.decode('utf-8').split('\n')
best = None
indexes = []
count = 0
best_index = None
for line in lines:
m = EXT_INF.match(line)
if m:
indexes.append(count)
bandwidth = int(m.group(1))
if not best or bandwidth > best:
best = bandwidth
best_index = count
count = count + 1
if indexes and best_index is not None:
indexes.remove(best_index)
for index in reversed(indexes):
del lines[index + 1]
del lines[index]
buff_io = BytesIO()
buff_io.write('\n'.join(lines).encode('utf-8'))
buff_io.seek(0)
return buff_io
# ============================================================================
def rewrite_fb_dash(string):
DASH_SPLIT = r'\n",dash_prefetched_representation_ids:'
inx = string.find(DASH_SPLIT)
@ -117,7 +70,7 @@ def rewrite_fb_dash(string):
buff = string.encode('utf-8').decode('unicode-escape')
buff = buff.encode('utf-8')
io = BytesIO(buff)
io, best_ids = RewriteDASHMixin.rewrite_dash(io)
io, best_ids = RewriteDASHMixin().rewrite_dash(io)
string = json.dumps(io.read().decode('utf-8'))
string = string[1:-1].replace('<', r'\x3C')

View File

@ -0,0 +1,44 @@
import re
from io import BytesIO
from pywb.webagg.utils import StreamIter
# ============================================================================
class RewriteHLS(object):
    """Rewrite an HLS (m3u8) master playlist, keeping only the
    highest-bandwidth variant stream.

    Non-streaming: reads the full playlist into memory before rewriting.
    """

    # matches a variant declaration and captures its BANDWIDTH value;
    # fix: raw string literal (pattern previously contained '\d' escapes
    # in a plain string)
    EXT_INF = re.compile(r'#EXT-X-STREAM-INF:(?:.*[,])?BANDWIDTH=([\d]+)')

    def __call__(self, rwinfo):
        return StreamIter(self.rewrite_m3u8(rwinfo.content_stream))

    def rewrite_m3u8(self, stream):
        """Filter the playlist down to its best-bandwidth variant.

        :param stream: file-like object yielding utf-8 m3u8 bytes
        :return: BytesIO containing the filtered playlist
        """
        buff = stream.read()

        lines = buff.decode('utf-8').split('\n')
        best = None
        indexes = []
        best_index = None

        # each variant is an #EXT-X-STREAM-INF line followed by its uri line
        for count, line in enumerate(lines):
            m = self.EXT_INF.match(line)
            if m:
                indexes.append(count)
                bandwidth = int(m.group(1))
                if not best or bandwidth > best:
                    best = bandwidth
                    best_index = count

        if indexes and best_index is not None:
            # drop all variants except the best one; delete in reverse so
            # earlier indexes stay valid, removing the uri line first
            indexes.remove(best_index)
            for index in reversed(indexes):
                del lines[index + 1]
                del lines[index]

        buff_io = BytesIO()
        buff_io.write('\n'.join(lines).encode('utf-8'))
        buff_io.seek(0)
        return buff_io

View File

@ -1,10 +1,4 @@
from warcio.utils import to_native_str
from warcio.bufferedreaders import BufferedReader
import webencodings
import re
from pywb.utils.loaders import load_yaml_config
from pywb.rewrite.content_rewriter import BaseContentRewriter
from pywb.rewrite.html_rewriter import HTMLRewriter
@ -12,19 +6,17 @@ from pywb.rewrite.regex_rewriters import RegexRewriter, CSSRewriter, XMLRewriter
from pywb.rewrite.regex_rewriters import JSLinkAndLocationRewriter, JSLinkOnlyRewriter
from pywb.rewrite.regex_rewriters import JSLocationOnlyRewriter, JSNoneRewriter
from pywb.rewrite.cookie_rewriter import ExactPathCookieRewriter
from pywb.urlrewrite.header_rewriter import PrefixHeaderRewriter, ProxyHeaderRewriter
from pywb.rewrite.jsonp_rewriter import JSONPRewriter
from pywb.webagg.utils import StreamIter, BUFF_SIZE
from pywb.rewrite.rewrite_dash import RewriteDASH
from pywb.rewrite.rewrite_hls import RewriteHLS
from pywb.rewrite.rewrite_amf import RewriteAMF
# ============================================================================
class Rewriter(object):
CHARSET_REGEX = re.compile(b'<meta[^>]*?[\s;"\']charset\s*=[\s"\']*([^\s"\'/>]*)')
class DefaultRewriter(BaseContentRewriter):
all_rewriters = {
'header': PrefixHeaderRewriter,
'header-proxy': ProxyHeaderRewriter,
@ -39,6 +31,12 @@ class Rewriter(object):
'json': JSONPRewriter,
'xml': XMLRewriter,
'dash': RewriteDASH,
'hls': RewriteHLS,
'amf': RewriteAMF,
}
rewrite_types = {
@ -64,6 +62,9 @@ class Rewriter(object):
# DASH
'application/dash+xml': 'dash',
# AMF
'application/x-amf': 'amf',
# XML
'text/xml': 'xml',
'application/xml': 'xml',
@ -73,291 +74,8 @@ class Rewriter(object):
'text/plain': 'plain',
}
def __init__(self, rules_file, replay_mod=''):
self.rules = []
self.load_rules(rules_file)
self.replay_mod = replay_mod
#for rw in self.known_rewriters:
# self.all_rewriters[rw.name] = rw
def add_rewriter(self, rw):
self.all_rewriters[rw.name] = rw
def get_rewriter(self, url, text_type):
return self.all_rewriters.get(text_type)
def load_rules(self, filename):
config = load_yaml_config(filename)
for rule in config.get('rules'):
rule = self.parse_rewrite_rule(rule)
if rule:
self.rules.append(rule)
def parse_rewrite_rule(self, config):
rw_config = config.get('rewrite')
if not rw_config:
return
rule = rw_config
url_prefix = config.get('url_prefix')
if not isinstance(url_prefix, list):
url_prefix = [url_prefix]
rule['url_prefix'] = url_prefix
regexs = rule.get('js_regexs')
if regexs:
parse_rules_func = RegexRewriter.parse_rules_from_config(regexs)
rule['js_regex_func'] = parse_rules_func
return rule
def get_rule(self, cdx):
urlkey = to_native_str(cdx['urlkey'])
for rule in self.rules:
if any((urlkey.startswith(prefix) for prefix in rule['url_prefix'])):
return rule
return {}
def get_rw_class(self, rule, text_type, rwinfo):
if text_type == 'js' and not rwinfo.is_url_rw():
text_type = 'js-proxy'
rw_type = rule.get(text_type, text_type)
rw_class = self.all_rewriters.get(rw_type)
return rw_type, rw_class
def create_rewriter(self, text_type, rule, rwinfo, cdx, head_insert_func=None):
rw_type, rw_class = self.get_rw_class(rule, text_type, rwinfo)
if rw_type in ('js', 'js_proxy'):
extra_rules = []
if 'js_regex_func' in rule:
extra_rules = rule['js_regex_func'](rwinfo.url_rewriter)
return rw_class(rwinfo.url_rewriter, extra_rules)
elif rw_type != 'html':
return rw_class(rwinfo.url_rewriter)
# HTML Rewriter
head_insert_str = self.get_head_insert(rwinfo, rule, head_insert_func, cdx)
js_rewriter = self.create_rewriter('js', rule, rwinfo, cdx)
css_rewriter = self.create_rewriter('css', rule, rwinfo, cdx)
rw = rw_class(rwinfo.url_rewriter,
js_rewriter=js_rewriter,
css_rewriter=css_rewriter,
head_insert=head_insert_str,
url=cdx['url'],
defmod=self.replay_mod,
parse_comments=rule.get('parse_comments', False))
return rw
def get_head_insert(self, rwinfo, rule, head_insert_func, cdx):
head_insert_str = ''
charset = rwinfo.charset
# if no charset set, attempt to extract from first 1024
if not charset:
first_buff = rwinfo.read_and_keep(1024)
charset = self.extract_html_charset(first_buff)
if head_insert_func:
head_insert_orig = head_insert_func(rule, cdx)
if charset:
try:
head_insert_str = webencodings.encode(head_insert_orig, charset)
except:
pass
if not head_insert_str:
charset = 'utf-8'
head_insert_str = head_insert_orig.encode(charset)
head_insert_str = head_insert_str.decode('iso-8859-1')
return head_insert_str
def extract_html_charset(self, buff):
charset = None
m = self.CHARSET_REGEX.search(buff)
if m:
charset = m.group(1)
charset = to_native_str(charset)
return charset
def rewrite_headers(self, rwinfo):
if rwinfo.is_url_rw():
header_rw_name = 'header'
else:
header_rw_name = 'header-proxy'
header_rw_class = self.all_rewriters.get(header_rw_name)
rwinfo.rw_http_headers = header_rw_class(rwinfo)()
def __call__(self, record, url_rewriter, cookie_rewriter,
head_insert_func=None,
cdx=None):
rwinfo = RewriteInfo(record, self, url_rewriter, cookie_rewriter)
self.rewrite_headers(rwinfo)
content_rewriter = None
if rwinfo.is_content_rw():
rule = self.get_rule(cdx)
content_rewriter = self.create_rewriter(rwinfo.text_type, rule, rwinfo, cdx, head_insert_func)
if not content_rewriter:
return rwinfo.rw_http_headers, StreamIter(rwinfo.content_stream), False
#rwinfo.rw_http_headers.status_headers.remove_header('content-length')
# align to line end for all non-html rewriting
align = (rwinfo.text_type != 'html')
# Create rewriting generator
gen = self.rewrite_text_stream_to_gen(rwinfo.content_stream,
rewrite_func=content_rewriter.rewrite,
final_read_func=content_rewriter.close,
align_to_line=align)
return rwinfo.rw_http_headers, gen, True
@staticmethod
def rewrite_text_stream_to_gen(stream,
rewrite_func,
final_read_func,
align_to_line):
"""
Convert stream to generator using applying rewriting func
to each portion of the stream.
Align to line boundaries if needed.
"""
try:
buff = ''
while True:
buff = stream.read(BUFF_SIZE)
if not buff:
break
if align_to_line:
buff += stream.readline()
buff = rewrite_func(buff.decode('iso-8859-1'))
yield buff.encode('iso-8859-1')
# For adding a tail/handling final buffer
buff = final_read_func()
if buff:
yield buff.encode('iso-8859-1')
finally:
stream.close()
# ============================================================================
class RewriteInfo(object):
TAG_REGEX = re.compile(b'^\s*\<')
def __init__(self, record, rewriter, url_rewriter, cookie_rewriter):
self.record = record
self.rw_http_headers = record.http_headers
self.content_stream = record.content_stream()
self.rewriter = rewriter
self.text_type = None
self.charset = None
self.url_rewriter = url_rewriter
if not cookie_rewriter:
cookie_rewriter = ExactPathCookieRewriter(url_rewriter)
self.cookie_rewriter = cookie_rewriter
self._fill_text_type_and_charset()
self._resolve_text_type()
def _fill_text_type_and_charset(self):
content_type = self.record.http_headers.get_header('Content-Type')
if not content_type:
return
parts = content_type.split(';', 1)
mime = parts[0]
self.text_type = self.rewriter.rewrite_types.get(mime)
if not self.text_type:
return
if len(parts) == 2:
parts = parts[1].lower().split('charset=', 1)
if len(parts) == 2:
self.charset = parts[1].strip()
def _resolve_text_type(self):
mod = self.url_rewriter.wburl.mod
if self.text_type == 'css' and mod == 'js_':
self.text_type = 'css'
# only attempt to resolve between html and other text types
if self.text_type != 'html':
return
if mod != 'js_' and mod != 'cs_':
return
buff = self.read_and_keep(128)
# check if starts with a tag, then likely html
if self.TAG_REGEX.match(buff):
self.text_type = 'html'
def read_and_keep(self, size):
buff = self.content_stream.read(size)
self.content_stream = BufferedReader(self.content_stream, starting_data=buff)
return buff
def is_content_rw(self):
if not self.url_rewriter.prefix:
return False
if self.url_rewriter.wburl.mod == 'id_':
return False
if self.text_type == 'html':
if self.url_rewriter.rewrite_opts.get('is_ajax'):
return False
elif self.text_type == 'plain':
if self.url_rewriter.wburl.mod not in ('js_', 'cs_'):
return False
elif not self.text_type:
return False
return True
def is_url_rw(self):
if not self.url_rewriter:
return False
if self.url_rewriter.wburl.mod == 'id_':
return False
return True
def init_js_regex(self, regexs):
return RegexRewriter.parse_rules_from_config(regexs)
def get_rewrite_types(self):
return self.rewrite_types

View File

@ -3,7 +3,7 @@ import requests
#from pywb.rewrite.rewrite_amf import RewriteAMFMixin
#from pywb.rewrite.rewrite_dash import RewriteDASHMixin
#from pywb.rewrite.rewrite_content import RewriteContent
from pywb.urlrewrite.rewriter import Rewriter
from pywb.urlrewrite.rewriter import DefaultRewriter
from pywb.rewrite.wburl import WbUrl
from pywb.rewrite.url_rewriter import UrlRewriter, SchemeOnlyUrlRewriter
@ -71,7 +71,7 @@ class RewriterApp(object):
#frame_type = 'inverse' if framed_replay else False
#self.content_rewriter = Rewriter(is_framed_replay=frame_type)
self.content_rw = Rewriter('pkg://pywb/rules.yaml', self.replay_mod)
self.content_rw = DefaultRewriter('pkg://pywb/rules.yaml', self.replay_mod)
if not jinja_env:
jinja_env = JinjaEnv(globals={'static_path': 'static'})

View File

@ -152,8 +152,7 @@ class TestWbIntegration(BaseConfigTest):
assert len(lines) == 17
assert lines[0].startswith('org,iana)/_css/2013.1/print.css 20140127171239')
def test_replay_banner_only(self):
def _test_replay_banner_only(self):
resp = self.testapp.get('/pywb/20140126201054bn_/http://www.iana.org/domains/reserved')
# wb.js header insertion