1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 00:03:28 +01:00

make pywb.rewrite package pep8-compatible

move doctests to test subdir
This commit is contained in:
Ilya Kreymer 2014-03-14 16:34:51 -07:00
parent bfffac45b0
commit a69d565af5
9 changed files with 302 additions and 225 deletions

View File

@ -1,8 +1,11 @@
from pywb.utils.statusandheaders import StatusAndHeaders
#=================================================================
class RewrittenStatusAndHeaders:
def __init__(self, statusline, headers, removed_header_dict, text_type, charset):
def __init__(self, statusline, headers,
removed_header_dict, text_type, charset):
self.status_headers = StatusAndHeaders(statusline, headers)
self.removed_header_dict = removed_header_dict
self.text_type = text_type
@ -16,12 +19,16 @@ class RewrittenStatusAndHeaders:
class HeaderRewriter:
REWRITE_TYPES = {
'html': ['text/html', 'application/xhtml'],
'css': ['text/css'],
'js': ['text/javascript', 'application/javascript', 'application/x-javascript'],
'js': ['text/javascript',
'application/javascript',
'application/x-javascript'],
'xml': ['/xml', '+xml', '.xml', '.rss'],
}
PROXY_HEADERS = ['content-type', 'content-disposition']
URL_REWRITE_HEADERS = ['location', 'content-location', 'content-base']
@ -32,7 +39,7 @@ class HeaderRewriter:
PROXY_NO_REWRITE_HEADERS = ['content-length']
def __init__(self, header_prefix = 'X-Archive-Orig-'):
def __init__(self, header_prefix='X-Archive-Orig-'):
self.header_prefix = header_prefix
def rewrite(self, status_headers, urlrewriter):
@ -47,14 +54,22 @@ class HeaderRewriter:
charset = self._extract_char_set(content_type)
strip_encoding = True
(new_headers, removed_header_dict) = self._rewrite_headers(status_headers.headers, urlrewriter, strip_encoding)
result = self._rewrite_headers(status_headers.headers,
urlrewriter,
strip_encoding)
return RewrittenStatusAndHeaders(status_headers.statusline, new_headers, removed_header_dict, text_type, charset)
new_headers = result[0]
removed_header_dict = result[1]
return RewrittenStatusAndHeaders(status_headers.statusline,
new_headers,
removed_header_dict,
text_type,
charset)
def _extract_text_type(self, content_type):
for ctype, mimelist in self.REWRITE_TYPES.iteritems():
if any ((mime in content_type) for mime in mimelist):
if any((mime in content_type) for mime in mimelist):
return ctype
return None
@ -67,27 +82,34 @@ class HeaderRewriter:
return content_type[idx + len(CHARSET_TOKEN):].lower()
def _rewrite_headers(self, headers, urlrewriter, content_rewritten = False):
def _rewrite_headers(self, headers, urlrewriter, content_rewritten=False):
new_headers = []
removed_header_dict = {}
for (name, value) in headers:
lowername = name.lower()
if lowername in self.PROXY_HEADERS:
new_headers.append((name, value))
elif lowername in self.URL_REWRITE_HEADERS:
new_headers.append((name, urlrewriter.rewrite(value)))
elif lowername in self.ENCODING_HEADERS:
if content_rewritten:
removed_header_dict[lowername] = value
else:
new_headers.append((name, value))
elif lowername in self.REMOVE_HEADERS:
removed_header_dict[lowername] = value
elif lowername in self.PROXY_NO_REWRITE_HEADERS and not content_rewritten:
elif (lowername in self.PROXY_NO_REWRITE_HEADERS and
not content_rewritten):
new_headers.append((name, value))
else:
new_headers.append((self.header_prefix + name, value))
return (new_headers, removed_header_dict)

View File

@ -9,12 +9,12 @@ from HTMLParser import HTMLParser, HTMLParseError
from url_rewriter import UrlRewriter
from regex_rewriters import JSRewriter, CSSRewriter
#=================================================================
# HTMLRewriter -- html parser for custom rewriting, also handlers for script and css
#=================================================================
class HTMLRewriter(HTMLParser):
"""
HTML-Parsing Rewriter
HTML-Parsing Rewriter for custom rewriting, also delegates
to rewriters for script and css
"""
REWRITE_TAGS = {
@ -27,7 +27,7 @@ class HTMLRewriter(HTMLParser):
'body': {'background': 'im_'},
'del': {'cite': ''},
'embed': {'src': 'oe_'},
'head': {'': ''}, # for head rewriting
'head': {'': ''}, # for head rewriting
'iframe': {'src': 'if_'},
'img': {'src': 'im_'},
'ins': {'cite': ''},
@ -41,16 +41,19 @@ class HTMLRewriter(HTMLParser):
'q': {'cite': ''},
'ref': {'href': 'oe_'},
'script': {'src': 'js_'},
'div': {'data-src' : '',
'data-uri' : ''},
'li': {'data-src' : '',
'data-uri' : ''},
'div': {'data-src': '',
'data-uri': ''},
'li': {'data-src': '',
'data-uri': ''},
}
STATE_TAGS = ['script', 'style']
HEAD_TAGS = ['html', 'head', 'base', 'link', 'meta', 'title', 'style', 'script', 'object', 'bgsound']
# tags allowed in the <head> of an html document
HEAD_TAGS = ['html', 'head', 'base', 'link', 'meta',
'title', 'style', 'script', 'object', 'bgsound']
# ===========================
class AccumBuff:
def __init__(self):
self.buff = ''
@ -58,22 +61,27 @@ class HTMLRewriter(HTMLParser):
def write(self, string):
self.buff += string
# ===========================
def __init__(self, url_rewriter,
head_insert=None,
js_rewriter_class=JSRewriter,
css_rewriter_class=CSSRewriter):
def __init__(self, url_rewriter, outstream = None, head_insert = None, js_rewriter_class = JSRewriter, css_rewriter_class = CSSRewriter):
HTMLParser.__init__(self)
self.url_rewriter = url_rewriter
self._wb_parse_context = None
self.out = outstream if outstream else self.AccumBuff()
#self.out = outstream if outstream else self.AccumBuff()
self.out = self.AccumBuff()
self.js_rewriter = js_rewriter_class(url_rewriter)
self.css_rewriter = css_rewriter_class(url_rewriter)
self.head_insert = head_insert
# ===========================
META_REFRESH_REGEX = re.compile('^[\\d.]+\\s*;\\s*url\\s*=\\s*(.+?)\\s*$', re.IGNORECASE | re.MULTILINE)
META_REFRESH_REGEX = re.compile('^[\\d.]+\\s*;\\s*url\\s*=\\s*(.+?)\\s*$',
re.IGNORECASE | re.MULTILINE)
def _rewrite_meta_refresh(self, meta_refresh):
if not meta_refresh:
@ -84,22 +92,32 @@ class HTMLRewriter(HTMLParser):
return meta_refresh
try:
meta_refresh = meta_refresh[:m.start(1)] + self._rewrite_url(m.group(1)) + meta_refresh[m.end(1):]
meta_refresh = (meta_refresh[:m.start(1)] +
self._rewrite_url(m.group(1)) +
meta_refresh[m.end(1):])
except Exception:
pass
return meta_refresh
# ===========================
def _rewrite_url(self, value, mod = None):
return self.url_rewriter.rewrite(value, mod) if value else None
def _rewrite_url(self, value, mod=None):
if value:
return self.url_rewriter.rewrite(value, mod)
else:
return None
def _rewrite_css(self, css_content):
return self.css_rewriter.rewrite(css_content) if css_content else None
if css_content:
return self.css_rewriter.rewrite(css_content)
else:
return None
def _rewrite_script(self, script_content):
return self.js_rewriter.rewrite(script_content) if script_content else None
if script_content:
return self.js_rewriter.rewrite(script_content)
else:
return None
def has_attr(self, tag_attrs, attr):
name, value = attr
@ -110,11 +128,13 @@ class HTMLRewriter(HTMLParser):
def rewrite_tag_attrs(self, tag, tag_attrs, is_start_end):
# special case: script or style parse context
if (tag in self.STATE_TAGS) and (self._wb_parse_context == None):
if ((tag in self.STATE_TAGS) and not self._wb_parse_context):
self._wb_parse_context = tag
# special case: head insertion, non-head tags
elif (self.head_insert and (self._wb_parse_context == None) and (tag not in self.HEAD_TAGS)):
elif (self.head_insert and
not self._wb_parse_context
and (tag not in self.HEAD_TAGS)):
self.out.write(self.head_insert)
self.head_insert = None
@ -132,7 +152,8 @@ class HTMLRewriter(HTMLParser):
attr_name, attr_value = attr
# special case: inline JS/event handler
if (attr_value and attr_value.startswith('javascript:')) or attr_name.startswith('on'):
if ((attr_value and attr_value.startswith('javascript:'))
or attr_name.startswith('on')):
attr_value = self._rewrite_script(attr_value)
# special case: inline CSS/style attribute
@ -163,13 +184,14 @@ class HTMLRewriter(HTMLParser):
self.out.write('/>' if is_start_end else '>')
# special case: head tag
if (self.head_insert) and (self._wb_parse_context == None) and (tag == 'head'):
if (self.head_insert and
not self._wb_parse_context and
(tag == 'head')):
self.out.write(self.head_insert)
self.head_insert = None
return True
def parse_data(self, data):
if self._wb_parse_context == 'script':
data = self._rewrite_script(data)

View File

@ -8,7 +8,8 @@ from rewriterules import RewriteRules
from pywb.utils.dsrules import RuleSet
from pywb.utils.statusandheaders import StatusAndHeaders
from pywb.utils.bufferedreaders import DecompressingBufferedReader, ChunkedDataReader
from pywb.utils.bufferedreaders import DecompressingBufferedReader
from pywb.utils.bufferedreaders import ChunkedDataReader
#=================================================================
@ -19,29 +20,39 @@ class RewriteContent:
ds_rules_file=ds_rules_file)
def rewrite_headers(self, urlrewriter, status_headers, stream, urlkey=''):
header_rewriter_class = self.ruleset.get_first_match(urlkey).rewriters['header']
rewritten_headers = header_rewriter_class().rewrite(status_headers, urlrewriter)
header_rewriter_class = (self.ruleset.get_first_match(urlkey).
rewriters['header'])
# note: since chunking may be broken, approach taken here is to *always* attempt
# to dechunk if transfer-encoding: chunked is present
rewritten_headers = (header_rewriter_class().
rewrite(status_headers, urlrewriter))
# note: since chunk encoding may/may not be valid,
# the approach taken here is to *always* attempt
# to dechunk if 'transfer-encoding: chunked' is present
#
# an alternative may be to serve chunked unless content rewriting is needed
# an alternative may be to serve chunked unless
# content rewriting is needed
# todo: possible revisit this approach
if (rewritten_headers.contains_removed_header('transfer-encoding', 'chunked')):
if (rewritten_headers.
contains_removed_header('transfer-encoding', 'chunked')):
stream = ChunkedDataReader(stream)
return (rewritten_headers, stream)
def rewrite_content(self, urlrewriter, headers, stream, head_insert_func=None, urlkey=''):
def rewrite_content(self, urlrewriter, headers, stream,
head_insert_func=None, urlkey=''):
# see if we've already rewritten headers
if isinstance(headers, RewrittenStatusAndHeaders):
rewritten_headers = headers
elif isinstance(headers, StatusAndHeaders):
# otherwise, need to determine if rewriting is even necessary
(rewritten_headers, stream) = self.rewrite_headers(urlrewriter, headers, stream)
(rewritten_headers, stream) = self.rewrite_headers(urlrewriter,
headers,
stream)
# no rewriting needed here
if rewritten_headers.text_type is None:
gen = self.stream_to_gen(stream)
@ -50,10 +61,11 @@ class RewriteContent:
status_headers = rewritten_headers.status_headers
# Handle text content rewriting
# =========================================================================
# ====================================================================
# special case -- need to ungzip the body
if (rewritten_headers.contains_removed_header('content-encoding', 'gzip')):
if (rewritten_headers.
contains_removed_header('content-encoding', 'gzip')):
stream = DecompressingBufferedReader(stream, decomp_type='gzip')
if rewritten_headers.charset:
@ -85,7 +97,6 @@ class RewriteContent:
head_insert_str = head_insert_func(rule)
rewriter = rewriter_class(urlrewriter,
outstream=None,
js_rewriter_class=rule.rewriters['js'],
css_rewriter_class=rule.rewriters['css'],
head_insert=head_insert_str)
@ -93,12 +104,13 @@ class RewriteContent:
rewriter = rewriter_class(urlrewriter)
# Create rewriting generator
gen = self._rewriting_stream_gen(rewriter, encoding, stream, first_buff)
gen = self._rewriting_stream_gen(rewriter, encoding,
stream, first_buff)
return (status_headers, gen)
# Create rewrite stream, may even be chunked by front-end
def _rewriting_stream_gen(self, rewriter, encoding, stream, first_buff = None):
def _rewriting_stream_gen(self, rewriter, encoding,
stream, first_buff=None):
def do_rewrite(buff):
if encoding:
buff = self._decode_buff(buff, stream, encoding)
@ -113,8 +125,10 @@ class RewriteContent:
def do_finish():
return rewriter.close()
return self.stream_to_gen(stream, rewrite_func = do_rewrite, final_read_func = do_finish, first_buff = first_buff)
return self.stream_to_gen(stream,
rewrite_func=do_rewrite,
final_read_func=do_finish,
first_buff=first_buff)
def _decode_buff(self, buff, stream, encoding):
try:
@ -133,17 +147,17 @@ class RewriteContent:
return buff
def _detect_charset(self, stream):
buff = stream.read(8192)
result = chardet.detect(buff)
print "chardet result: " + str(result)
return (result['encoding'], buff)
# Create a generator reading from a stream, with optional rewriting and final read call
# Create a generator reading from a stream,
# with optional rewriting and final read call
@staticmethod
def stream_to_gen(stream, rewrite_func = None, final_read_func = None, first_buff = None):
def stream_to_gen(stream, rewrite_func=None,
final_read_func=None, first_buff=None):
try:
buff = first_buff if first_buff else stream.read()
while buff:
@ -160,5 +174,3 @@ class RewriteContent:
finally:
stream.close()

View File

@ -1,3 +1,7 @@
"""
Fetch a url from live web and apply rewriting rules
"""
import urllib2
import os
import sys
@ -13,10 +17,6 @@ from pywb.rewrite.url_rewriter import UrlRewriter
from pywb.rewrite.rewrite_content import RewriteContent
"""
Fetch a url from live web and apply rewriting rules
"""
#=================================================================
def get_status_and_stream(url):
resp = urllib2.urlopen(url)
@ -30,6 +30,7 @@ def get_status_and_stream(url):
return (status_headers, stream)
#=================================================================
def get_local_file(uri):
fh = open(uri)
@ -37,11 +38,13 @@ def get_local_file(uri):
content_type, _ = mimetypes.guess_type(uri)
# create fake headers for local file
status_headers = StatusAndHeaders('200 OK', [('Content-Type', content_type)])
status_headers = StatusAndHeaders('200 OK',
[('Content-Type', content_type)])
stream = fh
return (status_headers, stream)
#=================================================================
def get_rewritten(url, urlrewriter, urlkey=None, head_insert_func=None):
if is_http(url):
@ -69,10 +72,12 @@ def get_rewritten(url, urlrewriter, urlkey=None, head_insert_func=None):
return (status_headers, buff)
#=================================================================
def main(): # pragma: no cover
if len(sys.argv) < 2:
print 'Usage: {0} url-to-fetch [wb-url-target] [extra-prefix]'.format(sys.argv[0])
msg = 'Usage: {0} url-to-fetch [wb-url-target] [extra-prefix]'
print msg.format(sys.argv[0])
return 1
else:
url = sys.argv[1]
@ -85,7 +90,8 @@ def main(): # pragma: no cover
prefix, wburl_str = wburl_str.split('/', 1)
prefix = '/' + prefix + '/'
else:
wburl_str = datetime_to_timestamp(datetime.datetime.now()) + '/http://example.com/path/sample.html'
wburl_str = (datetime_to_timestamp(datetime.datetime.now()) +
'/http://example.com/path/sample.html')
prefix = '/pywb_rewrite/'
urlrewriter = UrlRewriter(wburl_str, prefix)

View File

@ -7,6 +7,8 @@ from header_rewriter import HeaderRewriter
import itertools
#=================================================================
class RewriteRules(BaseRule):
def __init__(self, url_prefix, config={}):
super(RewriteRules, self).__init__(url_prefix, config)

View File

@ -0,0 +1,72 @@
"""
# UrlRewriter tests
>>> do_rewrite('other.html', '20131010/http://example.com/path/page.html', 'https://web.archive.org/web/')
'https://web.archive.org/web/20131010/http://example.com/path/other.html'
>>> do_rewrite('file.js', '20131010/http://example.com/path/page.html', 'https://web.archive.org/web/', 'js_')
'https://web.archive.org/web/20131010js_/http://example.com/path/file.js'
>>> do_rewrite('/other.html', '20130907*/http://example.com/path/page.html', '/coll/')
'/coll/20130907*/http://example.com/other.html'
>>> do_rewrite('./other.html', '20130907*/http://example.com/path/page.html', '/coll/')
'/coll/20130907*/http://example.com/path/other.html'
>>> do_rewrite('../other.html', '20131112im_/http://example.com/path/page.html', '/coll/')
'/coll/20131112im_/http://example.com/other.html'
>>> do_rewrite('../../other.html', '*/http://example.com/index.html', 'localhost:8080/')
'localhost:8080/*/http://example.com/other.html'
>>> do_rewrite('path/../../other.html', '*/http://example.com/index.html', 'localhost:8080/')
'localhost:8080/*/http://example.com/other.html'
>>> do_rewrite('http://some-other-site.com', '20101226101112/http://example.com/index.html', 'localhost:8080/')
'localhost:8080/20101226101112/http://some-other-site.com'
>>> do_rewrite(r'http:\/\/some-other-site.com', '20101226101112/http://example.com/index.html', 'localhost:8080/')
'localhost:8080/20101226101112/http:\\\\/\\\\/some-other-site.com'
>>> do_rewrite('../../other.html', '2020/http://example.com/index.html', '/')
'/2020/http://example.com/other.html'
>>> do_rewrite('../../other.html', '2020/http://example.com/index.html', '')
'2020/http://example.com/other.html'
>>> do_rewrite('', '20131010010203/http://example.com/file.html', '/web/')
'/web/20131010010203/http://example.com/file.html'
>>> do_rewrite('#anchor', '20131010/http://example.com/path/page.html', 'https://web.archive.org/web/')
'#anchor'
>>> do_rewrite('mailto:example@example.com', '20131010/http://example.com/path/page.html', 'https://web.archive.org/web/')
'mailto:example@example.com'
>>> UrlRewriter('19960708im_/http://domain.example.com/path.txt', '/abc/').get_abs_url()
'/abc/19960708im_/'
>>> UrlRewriter('2013id_/example.com/file/path/blah.html', '/123/').get_timestamp_url('20131024')
'/123/20131024id_/http://example.com/file/path/blah.html'
# HttpsUrlRewriter tests
>>> HttpsUrlRewriter(None, None).rewrite('https://example.com/abc')
'http://example.com/abc'
>>> HttpsUrlRewriter(None, None).rewrite('http://example.com/abc')
'http://example.com/abc'
"""
from pywb.rewrite.url_rewriter import UrlRewriter, HttpsUrlRewriter
def do_rewrite(rel_url, base_url, prefix, mod = None):
rewriter = UrlRewriter(base_url, prefix)
return rewriter.rewrite(rel_url, mod)
if __name__ == "__main__":
import doctest
doctest.testmod()

View File

@ -0,0 +1,82 @@
"""
# Replay Urls
# ======================
>>> repr(WbUrl('20131010000506/example.com'))
"('replay', '20131010000506', '', 'http://example.com', '20131010000506/http://example.com')"
>>> repr(WbUrl('20130102im_/https://example.com'))
"('replay', '20130102', 'im_', 'https://example.com', '20130102im_/https://example.com')"
>>> repr(WbUrl('20130102im_/https:/example.com'))
"('replay', '20130102', 'im_', 'https://example.com', '20130102im_/https://example.com')"
# Protocol agnostic convert to http
>>> repr(WbUrl('20130102im_///example.com'))
"('replay', '20130102', 'im_', 'http://example.com', '20130102im_/http://example.com')"
>>> repr(WbUrl('cs_/example.com'))
"('latest_replay', '', 'cs_', 'http://example.com', 'cs_/http://example.com')"
>>> repr(WbUrl('https://example.com/xyz'))
"('latest_replay', '', '', 'https://example.com/xyz', 'https://example.com/xyz')"
>>> repr(WbUrl('https:/example.com/xyz'))
"('latest_replay', '', '', 'https://example.com/xyz', 'https://example.com/xyz')"
>>> repr(WbUrl('https://example.com/xyz?a=%2f&b=%2E'))
"('latest_replay', '', '', 'https://example.com/xyz?a=%2f&b=%2E', 'https://example.com/xyz?a=%2f&b=%2E')"
# Query Urls
# ======================
>>> repr(WbUrl('*/http://example.com/abc?def=a'))
"('query', '', '', 'http://example.com/abc?def=a', '*/http://example.com/abc?def=a')"
>>> repr(WbUrl('*/http://example.com/abc?def=a*'))
"('url_query', '', '', 'http://example.com/abc?def=a', '*/http://example.com/abc?def=a*')"
>>> repr(WbUrl('2010*/http://example.com/abc?def=a'))
"('query', '2010', '', 'http://example.com/abc?def=a', '2010*/http://example.com/abc?def=a')"
# timestamp range query
>>> repr(WbUrl('2009-2015*/http://example.com/abc?def=a'))
"('query', '2009', '', 'http://example.com/abc?def=a', '2009-2015*/http://example.com/abc?def=a')"
>>> repr(WbUrl('json/*/http://example.com/abc?def=a'))
"('query', '', 'json', 'http://example.com/abc?def=a', 'json/*/http://example.com/abc?def=a')"
>>> repr(WbUrl('timemap-link/2011*/http://example.com/abc?def=a'))
"('query', '2011', 'timemap-link', 'http://example.com/abc?def=a', 'timemap-link/2011*/http://example.com/abc?def=a')"
# strip off repeated, likely scheme-agnostic, slashes altogether
>>> repr(WbUrl('///example.com'))
"('latest_replay', '', '', 'http://example.com', 'http://example.com')"
>>> repr(WbUrl('//example.com/'))
"('latest_replay', '', '', 'http://example.com/', 'http://example.com/')"
>>> repr(WbUrl('/example.com/'))
"('latest_replay', '', '', 'http://example.com/', 'http://example.com/')"
# Error Urls
# ======================
>>> x = WbUrl('/#$%#/')
Traceback (most recent call last):
Exception: Bad Request Url: http://#$%#/
>>> x = WbUrl('/http://example.com:abc/')
Traceback (most recent call last):
Exception: Bad Request Url: http://example.com:abc/
# considered blank
>>> x = WbUrl('https:/')
>>> x = WbUrl('https:///')
>>> x = WbUrl('http://')
"""
from pywb.rewrite.wburl import WbUrl
if __name__ == "__main__":
import doctest
doctest.testmod()

View File

@ -5,55 +5,11 @@ from wburl import WbUrl
#=================================================================
class UrlRewriter:
class UrlRewriter(object):
"""
>>> do_rewrite('other.html', '20131010/http://example.com/path/page.html', 'https://web.archive.org/web/')
'https://web.archive.org/web/20131010/http://example.com/path/other.html'
>>> do_rewrite('file.js', '20131010/http://example.com/path/page.html', 'https://web.archive.org/web/', 'js_')
'https://web.archive.org/web/20131010js_/http://example.com/path/file.js'
>>> do_rewrite('/other.html', '20130907*/http://example.com/path/page.html', '/coll/')
'/coll/20130907*/http://example.com/other.html'
>>> do_rewrite('./other.html', '20130907*/http://example.com/path/page.html', '/coll/')
'/coll/20130907*/http://example.com/path/other.html'
>>> do_rewrite('../other.html', '20131112im_/http://example.com/path/page.html', '/coll/')
'/coll/20131112im_/http://example.com/other.html'
>>> do_rewrite('../../other.html', '*/http://example.com/index.html', 'localhost:8080/')
'localhost:8080/*/http://example.com/other.html'
>>> do_rewrite('path/../../other.html', '*/http://example.com/index.html', 'localhost:8080/')
'localhost:8080/*/http://example.com/other.html'
>>> do_rewrite('http://some-other-site.com', '20101226101112/http://example.com/index.html', 'localhost:8080/')
'localhost:8080/20101226101112/http://some-other-site.com'
>>> do_rewrite(r'http:\/\/some-other-site.com', '20101226101112/http://example.com/index.html', 'localhost:8080/')
'localhost:8080/20101226101112/http:\\\\/\\\\/some-other-site.com'
>>> do_rewrite('../../other.html', '2020/http://example.com/index.html', '/')
'/2020/http://example.com/other.html'
>>> do_rewrite('../../other.html', '2020/http://example.com/index.html', '')
'2020/http://example.com/other.html'
>>> do_rewrite('', '20131010010203/http://example.com/file.html', '/web/')
'/web/20131010010203/http://example.com/file.html'
>>> do_rewrite('#anchor', '20131010/http://example.com/path/page.html', 'https://web.archive.org/web/')
'#anchor'
>>> do_rewrite('mailto:example@example.com', '20131010/http://example.com/path/page.html', 'https://web.archive.org/web/')
'mailto:example@example.com'
>>> UrlRewriter('19960708im_/http://domain.example.com/path.txt', '/abc/').get_abs_url()
'/abc/19960708im_/'
>>> UrlRewriter('2013id_/example.com/file/path/blah.html', '/123/').get_timestamp_url('20131024')
'/123/20131024id_/http://example.com/file/path/blah.html'
Main pywb UrlRewriter which rewrites absolute and relative urls
to be relative to the current page, as specified via a WbUrl
instance and an optional full path prefix
"""
NO_REWRITE_URI_PREFIX = ['#', 'javascript:', 'data:', 'mailto:', 'about:']
@ -67,9 +23,9 @@ class UrlRewriter:
#if self.prefix.endswith('/'):
# self.prefix = self.prefix[:-1]
def rewrite(self, url, mod = None):
def rewrite(self, url, mod=None):
# if special protocol, no rewriting at all
if any (url.startswith(x) for x in self.NO_REWRITE_URI_PREFIX):
if any(url.startswith(x) for x in self.NO_REWRITE_URI_PREFIX):
return url
wburl = self.wburl
@ -77,7 +33,8 @@ class UrlRewriter:
isAbs = any(url.startswith(x) for x in self.PROTOCOLS)
# Optimized rewriter for
# -rel urls that don't start with / and don't contain ../ and no special mod
# -rel urls that don't start with / and
# do not contain ../ and no special mod
if not (isAbs or mod or url.startswith('/') or ('../' in url)):
finalUrl = urlparse.urljoin(self.prefix + wburl.original_url, url)
@ -95,10 +52,10 @@ class UrlRewriter:
return finalUrl
def get_abs_url(self, url = ''):
def get_abs_url(self, url=''):
return self.prefix + self.wburl.to_str(url=url)
def get_timestamp_url(self, timestamp, url = None):
def get_timestamp_url(self, timestamp, url=None):
if url is None:
url = self.wburl.url
@ -111,23 +68,13 @@ class UrlRewriter:
return "UrlRewriter('{0}', '{1}')".format(self.wburl, self.prefix)
def do_rewrite(rel_url, base_url, prefix, mod = None):
rewriter = UrlRewriter(base_url, prefix)
return rewriter.rewrite(rel_url, mod)
#=================================================================
class HttpsUrlRewriter:
class HttpsUrlRewriter(object):
"""
A url rewriter which urls that start with https:// to http://
Other urls/input is unchanged.
>>> HttpsUrlRewriter(None, None).rewrite('https://example.com/abc')
'http://example.com/abc'
>>> HttpsUrlRewriter(None, None).rewrite('http://example.com/abc')
'http://example.com/abc'
"""
HTTP = 'http://'
HTTPS = 'https://'
@ -149,9 +96,3 @@ class HttpsUrlRewriter:
def set_base_url(self, newUrl):
pass
if __name__ == "__main__":
import doctest
doctest.testmod()

View File

@ -49,7 +49,6 @@ class BaseWbUrl(object):
REPLAY = 'replay'
LATEST_REPLAY = 'latest_replay'
def __init__(self, url='', mod='',
timestamp='', end_timestamp='', type=None):
@ -62,82 +61,6 @@ class BaseWbUrl(object):
#=================================================================
class WbUrl(BaseWbUrl):
"""
# Replay Urls
# ======================
>>> repr(WbUrl('20131010000506/example.com'))
"('replay', '20131010000506', '', 'http://example.com', '20131010000506/http://example.com')"
>>> repr(WbUrl('20130102im_/https://example.com'))
"('replay', '20130102', 'im_', 'https://example.com', '20130102im_/https://example.com')"
>>> repr(WbUrl('20130102im_/https:/example.com'))
"('replay', '20130102', 'im_', 'https://example.com', '20130102im_/https://example.com')"
# Protocol agnostic convert to http
>>> repr(WbUrl('20130102im_///example.com'))
"('replay', '20130102', 'im_', 'http://example.com', '20130102im_/http://example.com')"
>>> repr(WbUrl('cs_/example.com'))
"('latest_replay', '', 'cs_', 'http://example.com', 'cs_/http://example.com')"
>>> repr(WbUrl('https://example.com/xyz'))
"('latest_replay', '', '', 'https://example.com/xyz', 'https://example.com/xyz')"
>>> repr(WbUrl('https:/example.com/xyz'))
"('latest_replay', '', '', 'https://example.com/xyz', 'https://example.com/xyz')"
>>> repr(WbUrl('https://example.com/xyz?a=%2f&b=%2E'))
"('latest_replay', '', '', 'https://example.com/xyz?a=%2f&b=%2E', 'https://example.com/xyz?a=%2f&b=%2E')"
# Query Urls
# ======================
>>> repr(WbUrl('*/http://example.com/abc?def=a'))
"('query', '', '', 'http://example.com/abc?def=a', '*/http://example.com/abc?def=a')"
>>> repr(WbUrl('*/http://example.com/abc?def=a*'))
"('url_query', '', '', 'http://example.com/abc?def=a', '*/http://example.com/abc?def=a*')"
>>> repr(WbUrl('2010*/http://example.com/abc?def=a'))
"('query', '2010', '', 'http://example.com/abc?def=a', '2010*/http://example.com/abc?def=a')"
# timestamp range query
>>> repr(WbUrl('2009-2015*/http://example.com/abc?def=a'))
"('query', '2009', '', 'http://example.com/abc?def=a', '2009-2015*/http://example.com/abc?def=a')"
>>> repr(WbUrl('json/*/http://example.com/abc?def=a'))
"('query', '', 'json', 'http://example.com/abc?def=a', 'json/*/http://example.com/abc?def=a')"
>>> repr(WbUrl('timemap-link/2011*/http://example.com/abc?def=a'))
"('query', '2011', 'timemap-link', 'http://example.com/abc?def=a', 'timemap-link/2011*/http://example.com/abc?def=a')"
# strip off repeated, likely scheme-agnostic, slashes altogether
>>> repr(WbUrl('///example.com'))
"('latest_replay', '', '', 'http://example.com', 'http://example.com')"
>>> repr(WbUrl('//example.com/'))
"('latest_replay', '', '', 'http://example.com/', 'http://example.com/')"
>>> repr(WbUrl('/example.com/'))
"('latest_replay', '', '', 'http://example.com/', 'http://example.com/')"
# Error Urls
# ======================
>>> x = WbUrl('/#$%#/')
Traceback (most recent call last):
Exception: Bad Request Url: http://#$%#/
>>> x = WbUrl('/http://example.com:abc/')
Traceback (most recent call last):
Exception: Bad Request Url: http://example.com:abc/
# considered blank
>>> x = WbUrl('https:/')
>>> x = WbUrl('https:///')
>>> x = WbUrl('http://')
"""
# Regexs
# ======================
QUERY_REGEX = re.compile('^(?:([\w\-:]+)/)?(\d*)(?:-(\d+))?\*/?(.*)$')
@ -146,13 +69,12 @@ class WbUrl(BaseWbUrl):
DEFAULT_SCHEME = 'http://'
# ======================
def __init__(self, url):
super(WbUrl, self).__init__()
self.original_url = url
if not any (f(url) for f in [self._init_query, self._init_replay]):
if not any(f(url) for f in [self._init_query, self._init_replay]):
raise Exception('Invalid WbUrl: ', url)
if len(self.url) == 0:
@ -168,7 +90,8 @@ class WbUrl(BaseWbUrl):
if inx < len(self.url) and self.url[inx] != '/':
self.url = self.url[:inx] + '/' + self.url[inx:]
# BUG?: adding upper() because rfc3987 lib rejects lower case %-encoding
# BUG?: adding upper() because rfc3987 lib
# rejects lower case %-encoding
# %2F is fine, but %2f -- standard supports either
matcher = rfc3987.match(self.url.upper(), 'IRI')
@ -218,15 +141,14 @@ class WbUrl(BaseWbUrl):
self.timestamp = timestamp
self.type = self.REPLAY
# Str Representation
# ====================
def to_str(self, **overrides):
atype = overrides['type'] if 'type' in overrides else self.type
mod = overrides['mod'] if 'mod' in overrides else self.mod
timestamp = overrides['timestamp'] if 'timestamp' in overrides else self.timestamp
end_timestamp = overrides['end_timestamp'] if 'end_timestamp' in overrides else self.end_timestamp
url = overrides['url'] if 'url' in overrides else self.url
atype = overrides.get('type', self.type)
mod = overrides.get('mod', self.mod)
timestamp = overrides.get('timestamp', self.timestamp)
end_timestamp = overrides.get('end_timestamp', self.end_timestamp)
url = overrides.get('url', self.url)
if atype == self.QUERY or atype == self.URL_QUERY:
tsmod = ''
@ -253,7 +175,3 @@ class WbUrl(BaseWbUrl):
def __repr__(self):
return str((self.type, self.timestamp, self.mod, self.url, str(self)))
if __name__ == "__main__":
import doctest
doctest.testmod()