1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 00:03:28 +01:00

add wburlrewriter, ReferRedirect uses the rewriter

more refactoring, ReferRedirect moved into archivalrouter module
wbrequest: parses from uri directly, keeps track of wburl and prefix
This commit is contained in:
Ilya Kreymer 2013-12-20 14:54:41 -08:00
parent 0a2b16407d
commit 4cf4bf3bbb
7 changed files with 289 additions and 160 deletions

View File

@ -1,22 +1,122 @@
from refer_redirect import ReferRedirect import urlparse
from wbrequestresponse import WbRequest, WbResponse
from wbrequestresponse import WbRequest, WbResponse
from wburlrewriter import ArchivalUrlRewriter
#=================================================================
# ArchivalRequestRouter -- route WB requests in archival mode
#=================================================================
class ArchivalRequestRouter: class ArchivalRequestRouter:
def __init__(self, mappings, hostpaths=None): def __init__(self, mappings, hostpaths = None, abs_path = True):
self.mappings = mappings self.mappings = mappings
self.fallback = ReferRedirect(hostpaths) self.fallback = ReferRedirect(hostpaths)
self.abs_path = abs_path
def parse_request(self, env): def _parseRequest(self, env):
request_uri = env['REQUEST_URI'] request_uri = env['REQUEST_URI']
for key, value in self.mappings.iteritems(): for coll, handler in self.mappings.iteritems():
if request_uri.startswith(key): rel_prefix = '/' + coll + '/'
return value, WbRequest.prefix_request(env, key, request_uri) if request_uri.startswith(rel_prefix):
#return value, ArchivalRequestRouter._prefix_request(env, key, request_uri)
req = WbRequest(env,
request_uri = request_uri,
coll = coll,
wb_url = request_uri[len(coll) + 1:],
wb_prefix = self.getPrefix(env, rel_prefix))
return handler, req
return self.fallback, WbRequest(env) return self.fallback, WbRequest(env)
def handle_request(self, env): def handleRequest(self, env):
handler, wbrequest = self.parse_request(env) handler, wbrequest = self._parseRequest(env)
return handler.run(wbrequest) return handler.run(wbrequest)
def getPrefix(self, env, rel_prefix):
if self.abs_path:
try:
return env['wsgi.url_scheme'] + '://' + env['HTTP_HOST'] + rel_prefix
except KeyError:
return rel_prefix
else:
return rel_prefix
#=================================================================
# ReferRedirect -- redirect urls that have 'fallen through' based on the referrer settings
#=================================================================
class ReferRedirect:
    """
    Handler that redirects a request to an archival url derived from the
    request's referrer, provided the referrer starts with one of the
    configured prefixes.

    >>> ReferRedirect('http://localhost:8080/').matchPrefixs
    ['http://localhost:8080/']

    >>> ReferRedirect(['http://example:9090/']).matchPrefixs
    ['http://example:9090/']

    >>> test_redir('http://localhost:8080/', '/other.html', 'http://localhost:8080/coll/20131010/http://example.com/path/page.html')
    'http://localhost:8080/coll/20131010/http://example.com/path/other.html'

    >>> test_redir('http://localhost:8080/', '/../other.html', 'http://localhost:8080/coll/20131010/http://example.com/path/page.html')
    'http://localhost:8080/coll/20131010/http://example.com/other.html'

    >>> test_redir('http://localhost:8080/', '/../../other.html', 'http://localhost:8080/coll/20131010/http://example.com/index.html')
    'http://localhost:8080/coll/20131010/http://example.com/other.html'

    >>> test_redir('http://example:8080/', '/other.html', 'http://localhost:8080/coll/20131010/http://example.com/path/page.html')
    False
    """

    def __init__(self, matchPrefixs):
        """
        matchPrefixs -- a single prefix string, a list of prefixes, or None.

        None means "no prefixes": storing [None] would make every
        startswith() check in run() raise TypeError.
        """
        if matchPrefixs is None:
            self.matchPrefixs = []
        elif isinstance(matchPrefixs, list):
            self.matchPrefixs = matchPrefixs
        else:
            self.matchPrefixs = [matchPrefixs]

    def run(self, wbrequest):
        """
        Return a redirect response for wbrequest, or None when the request
        has no referrer, the referrer matches no configured prefix, or the
        referrer path has no wburl part after the collection.

        Errors raised while rewriting the url propagate to the caller.
        """
        referrer = wbrequest.referrer
        if referrer is None:
            return None

        if not any(referrer.startswith(prefix) for prefix in self.matchPrefixs):
            return None

        ref_split = urlparse.urlsplit(referrer)

        # Referrer path looks like '/<coll>/<wburl>': split it into [coll, wburl]
        ref_path = ref_split.path[1:].split('/', 1)

        # No wburl after the collection -- nothing to resolve against
        if len(ref_path) < 2:
            return None

        rewriter = ArchivalUrlRewriter('/' + ref_path[1], '/' + ref_path[0])

        # Drop the leading '/' so the uri is resolved relative to the referring page
        rel_request_uri = wbrequest.request_uri[1:]

        final_url = urlparse.urlunsplit((ref_split.scheme,
                                         ref_split.netloc,
                                         rewriter.rewrite(rel_request_uri),
                                         '',
                                         ''))

        return WbResponse.redir_response(final_url)
if __name__ == "__main__":
    import doctest

    def test_redir(matchHost, request_uri, referrer):
        # Doctest helper: run a ReferRedirect against a minimal environ and
        # return the redirect Location, or False if no response was produced
        environ = {'REQUEST_URI': request_uri, 'HTTP_REFERER': referrer}
        redirector = ReferRedirect(matchHost)
        response = redirector.run(WbRequest.parse(environ))
        return response.get_header('Location') if response else False

    doctest.testmod()

View File

@ -2,6 +2,8 @@ import urllib
import urllib2 import urllib2
import wbexceptions import wbexceptions
from wbarchivalurl import ArchivalUrl
class RemoteCDXServer: class RemoteCDXServer:
""" """
>>> x = cdxserver.load('example.com', parse_cdx = True, limit = '2') >>> x = cdxserver.load('example.com', parse_cdx = True, limit = '2')
@ -45,6 +47,27 @@ class RemoteCDXServer:
else: else:
return response return response
@staticmethod
def getQueryParams(wburl):
    """
    Return the dict of cdx query params for the type of the given wburl.

    Only the REPLAY params depend on the wburl itself (its timestamp).
    A wburl.type outside the four listed types raises KeyError.
    """
    error_filter = '!statuscode:(500|502|504)'

    params_by_type = {
        ArchivalUrl.QUERY: {
            'collapseTime': '10',
            'filter': error_filter,
            'limit': '150000',
        },

        ArchivalUrl.URL_QUERY: {
            'collapse': 'urlkey',
            'matchType': 'prefix',
            'showGroupCount': True,
            'showUniqCount': True,
            'lastSkipTimestamp': True,
            'limit': '100',
            'fl': 'urlkey,original,timestamp,endtimestamp,groupcount,uniqcount',
        },

        ArchivalUrl.REPLAY: {
            'sort': 'closest',
            'filter': error_filter,
            'limit': '10',
            'closest': wburl.timestamp,
            'resolveRevisits': True,
        },

        ArchivalUrl.LATEST_REPLAY: {
            'sort': 'reverse',
            'filter': 'statuscode:[23]..',
            'limit': '1',
            'resolveRevisits': True,
        },
    }

    return params_by_type[wburl.type]
class CDXCaptureResult: class CDXCaptureResult:
CDX_FORMATS = [["urlkey","timestamp","original","mimetype","statuscode","digest","redirect","robotflags","length","offset","filename"], CDX_FORMATS = [["urlkey","timestamp","original","mimetype","statuscode","digest","redirect","robotflags","length","offset","filename"],
["urlkey","timestamp","original","mimetype","statuscode","digest","redirect","offset","filename"]] ["urlkey","timestamp","original","mimetype","statuscode","digest","redirect","offset","filename"]]

View File

@ -1,76 +0,0 @@
import urlparse
from wbrequestresponse import WbRequest, WbResponse
from archiveurl import archiveurl
# Redirect urls that have 'fallen through' based on the referrer
# settings
class ReferRedirect:
    """
    Redirect a request to an archival url resolved against its referrer,
    when the referrer starts with one of the configured prefixes.

    >>> ReferRedirect('http://localhost:8080/').matchPrefixs
    ['http://localhost:8080/']

    >>> ReferRedirect(['http://example:9090/']).matchPrefixs
    ['http://example:9090/']

    >>> test_redir('http://localhost:8080/', '/other.html', 'http://localhost:8080/coll/20131010/http://example.com/path/page.html')
    'http://localhost:8080/coll/20131010/http://example.com/path/other.html'

    >>> test_redir('http://localhost:8080/', '/../other.html', 'http://localhost:8080/coll/20131010/http://example.com/path/page.html')
    'http://localhost:8080/coll/20131010/http://example.com/other.html'

    >>> test_redir('http://localhost:8080/', '/../../other.html', 'http://localhost:8080/coll/20131010/http://example.com/index.html')
    'http://localhost:8080/coll/20131010/http://example.com/other.html'

    >>> test_redir('http://example:8080/', '/other.html', 'http://localhost:8080/coll/20131010/http://example.com/path/page.html')
    False
    """

    def __init__(self, matchPrefixs):
        # A lone prefix is wrapped so run() can always iterate over a list
        self.matchPrefixs = matchPrefixs if isinstance(matchPrefixs, list) else [matchPrefixs]

    def run(self, wbrequest):
        """
        Return a redirect response, or None if there is no referrer, the
        referrer matches no prefix, or building the target url fails.
        """
        referrer = wbrequest.referrer
        if referrer is None:
            return None

        for prefix in self.matchPrefixs:
            if referrer.startswith(prefix):
                break
        else:
            return None

        try:
            pieces = urlparse.urlsplit(referrer)
            # '<coll>/<wburl>' -> [coll, wburl]
            coll_and_url = pieces.path[1:].split('/', 1)

            ref_wb_url = archiveurl('/' + coll_and_url[1])
            joined = urlparse.urljoin(ref_wb_url.url, wbrequest.request_uri[1:])
            ref_wb_url.url = joined.replace('../', '')

            new_path = coll_and_url[0] + str(ref_wb_url)
            final_url = urlparse.urlunsplit((pieces.scheme, pieces.netloc, new_path, '', ''))
        except Exception:
            # Any failure while building the url means no redirect
            return None

        return WbResponse.redir_response(final_url)
if __name__ == "__main__":
    import doctest

    def test_redir(matchHost, request_uri, referrer):
        # Doctest helper: run a ReferRedirect against a minimal environ and
        # return the redirect Location, or False if no response was produced
        environ = {'REQUEST_URI': request_uri, 'HTTP_REFERER': referrer}
        redirector = ReferRedirect(matchHost)
        response = redirector.run(WbRequest(environ))
        return response.get_header('Location') if response else False

    doctest.testmod()

View File

@ -1,47 +1,28 @@
from wbrequestresponse import WbResponse
from archiveurl import archiveurl
from archivalrouter import ArchivalRequestRouter
import indexreader import indexreader
import json import json
import wbexceptions import wbexceptions
import utils import utils
from wbrequestresponse import WbResponse
from archivalrouter import ArchivalRequestRouter
class EchoEnv:
    """Handler that responds with the string form of the request's environ."""

    def run(self, wbrequest):
        env_text = str(wbrequest.env)
        return WbResponse.text_response(env_text)
class WBHandler: class WBHandler:
def run(self, wbrequest): def run(self, wbrequest):
wburl = archiveurl(wbrequest.wb_url) return WbResponse.text_response(str(wbrequest))
wbrequest.parsed_url = wburl
return WbResponse.text_stream(str(vars(wburl)))
class QueryHandler: class QueryHandler:
def __init__(self): def __init__(self):
self.cdxserver = indexreader.RemoteCDXServer('http://web.archive.org/cdx/search/cdx') self.cdxserver = indexreader.RemoteCDXServer('http://web.archive.org/cdx/search/cdx')
@staticmethod
def get_query_params(wburl):
return {
archiveurl.QUERY:
{'collapseTime': '10', 'filter': '!statuscode:(500|502|504)', 'limit': '150000'},
archiveurl.URL_QUERY:
{'collapse': 'urlkey', 'matchType': 'prefix', 'showGroupCount': True, 'showUniqCount': True, 'lastSkipTimestamp': True, 'limit': '100',
'fl': 'urlkey,original,timestamp,endtimestamp,groupcount,uniqcount',
},
archiveurl.REPLAY:
{'sort': 'closest', 'filter': '!statuscode:(500|502|504)', 'limit': '10', 'closest': wburl.timestamp, 'resolveRevisits': True},
archiveurl.LATEST_REPLAY:
{'sort': 'reverse', 'filter': 'statuscode:[23]..', 'limit': '1', 'resolveRevisits': True}
}[wburl.type]
def run(self, wbrequest): def run(self, wbrequest):
wburl = archiveurl(wbrequest.wb_url) wburl = wbrequest.wb_url
#wburl = wbresponse.body.parsed_url
params = QueryHandler.get_query_params(wburl) params = self.cdxserver.getQueryParams(wburl)
cdxlines = self.cdxserver.load(wburl.url, params) cdxlines = self.cdxserver.load(wburl.url, params)
@ -56,8 +37,10 @@ class QueryHandler:
## =========== ## ===========
parser = ArchivalRequestRouter( parser = ArchivalRequestRouter(
{'/t1/' : WBHandler(), {
'/t2/' : QueryHandler() 't0' : EchoEnv(),
't1' : WBHandler(),
't2' : QueryHandler()
}, },
hostpaths = ['http://localhost:9090/']) hostpaths = ['http://localhost:9090/'])
## =========== ## ===========
@ -67,7 +50,7 @@ def application(env, start_response):
response = None response = None
try: try:
response = parser.handle_request(env) response = parser.handleRequest(env)
if not response: if not response:
raise wbexceptions.NotFoundException(env['REQUEST_URI'] + ' was not found') raise wbexceptions.NotFoundException(env['REQUEST_URI'] + ' was not found')
@ -76,11 +59,11 @@ def application(env, start_response):
last_exc = e last_exc = e
import traceback import traceback
traceback.print_exc() traceback.print_exc()
response = handle_exception(env, e) response = handleException(env, e)
return response(env, start_response) return response(env, start_response)
def handle_exception(env, exc): def handleException(env, exc):
if hasattr(exc, 'status'): if hasattr(exc, 'status'):
status = exc.status() status = exc.status()
else: else:

View File

@ -5,51 +5,51 @@ import rfc3987
import wbexceptions import wbexceptions
# archiveurl : archivalurl representation for WB # ArchivalUrl : archivalurl representation for WB
class archiveurl: class ArchivalUrl:
""" """
# Replay Urls # Replay Urls
# ====================== # ======================
>>> repr(archiveurl('/20131010000506/example.com')) >>> repr(ArchivalUrl('/20131010000506/example.com'))
"('replay', '20131010000506', '', 'http://example.com', '/20131010000506/http://example.com')" "('replay', '20131010000506', '', 'http://example.com', '/20131010000506/http://example.com')"
>>> repr(archiveurl('/20130102im_/https://example.com')) >>> repr(ArchivalUrl('/20130102im_/https://example.com'))
"('replay', '20130102', 'im_', 'https://example.com', '/20130102im_/https://example.com')" "('replay', '20130102', 'im_', 'https://example.com', '/20130102im_/https://example.com')"
>>> repr(archiveurl('/cs_/example.com')) >>> repr(ArchivalUrl('/cs_/example.com'))
"('latest_replay', '', 'cs_', 'http://example.com', '/cs_/http://example.com')" "('latest_replay', '', 'cs_', 'http://example.com', '/cs_/http://example.com')"
>>> repr(archiveurl('/https://example.com/xyz')) >>> repr(ArchivalUrl('/https://example.com/xyz'))
"('latest_replay', '', '', 'https://example.com/xyz', '/https://example.com/xyz')" "('latest_replay', '', '', 'https://example.com/xyz', '/https://example.com/xyz')"
# Query Urls # Query Urls
# ====================== # ======================
>>> repr(archiveurl('/*/http://example.com/abc?def=a')) >>> repr(ArchivalUrl('/*/http://example.com/abc?def=a'))
"('query', '', '', 'http://example.com/abc?def=a', '/*/http://example.com/abc?def=a')" "('query', '', '', 'http://example.com/abc?def=a', '/*/http://example.com/abc?def=a')"
>>> repr(archiveurl('/*/http://example.com/abc?def=a*')) >>> repr(ArchivalUrl('/*/http://example.com/abc?def=a*'))
"('url_query', '', '', 'http://example.com/abc?def=a', '/*/http://example.com/abc?def=a*')" "('url_query', '', '', 'http://example.com/abc?def=a', '/*/http://example.com/abc?def=a*')"
>>> repr(archiveurl('/json/*/http://example.com/abc?def=a')) >>> repr(ArchivalUrl('/json/*/http://example.com/abc?def=a'))
"('query', '', 'json', 'http://example.com/abc?def=a', '/json/*/http://example.com/abc?def=a')" "('query', '', 'json', 'http://example.com/abc?def=a', '/json/*/http://example.com/abc?def=a')"
>>> repr(archiveurl('/timemap-link/2011*/http://example.com/abc?def=a')) >>> repr(ArchivalUrl('/timemap-link/2011*/http://example.com/abc?def=a'))
"('query', '2011', 'timemap-link', 'http://example.com/abc?def=a', '/timemap-link/2011*/http://example.com/abc?def=a')" "('query', '2011', 'timemap-link', 'http://example.com/abc?def=a', '/timemap-link/2011*/http://example.com/abc?def=a')"
# Error Urls # Error Urls
# ====================== # ======================
>>> x = archiveurl('abc') >>> x = ArchivalUrl('abc')
Traceback (most recent call last): Traceback (most recent call last):
RequestParseException: Invalid WB Request Url: abc RequestParseException: Invalid WB Request Url: abc
>>> x = archiveurl('/#$%#/') >>> x = ArchivalUrl('/#$%#/')
Traceback (most recent call last): Traceback (most recent call last):
BadUrlException: Bad Request Url: http://#$%#/ BadUrlException: Bad Request Url: http://#$%#/
>>> x = archiveurl('/http://example.com:abc/') >>> x = ArchivalUrl('/http://example.com:abc/')
Traceback (most recent call last): Traceback (most recent call last):
BadUrlException: Bad Request Url: http://example.com:abc/ BadUrlException: Bad Request Url: http://example.com:abc/
""" """
@ -75,14 +75,14 @@ class archiveurl:
self.timestamp = '' self.timestamp = ''
self.mod = '' self.mod = ''
if not any (f(self, url) for f in [archiveurl._init_query, archiveurl._init_replay]): if not any (f(self, url) for f in [ArchivalUrl._init_query, ArchivalUrl._init_replay]):
raise wbexceptions.RequestParseException('Invalid WB Request Url: ' + url) raise wbexceptions.RequestParseException('Invalid WB Request Url: ' + url)
if len(self.url) == 0: if len(self.url) == 0:
raise wbexceptions.RequestParseException('Invalid WB Request Url: ' + url) raise wbexceptions.RequestParseException('Invalid WB Request Url: ' + url)
if not self.url.startswith('//') and not '://' in self.url: if not self.url.startswith('//') and not '://' in self.url:
self.url = archiveurl.DEFAULT_SCHEME + self.url self.url = ArchivalUrl.DEFAULT_SCHEME + self.url
matcher = rfc3987.match(self.url, 'IRI') matcher = rfc3987.match(self.url, 'IRI')
@ -92,7 +92,7 @@ class archiveurl:
# Match query regex # Match query regex
# ====================== # ======================
def _init_query(self, url): def _init_query(self, url):
query = archiveurl.QUERY_REGEX.match(url) query = ArchivalUrl.QUERY_REGEX.match(url)
if not query: if not query:
return None return None
@ -102,16 +102,16 @@ class archiveurl:
self.timestamp = res[1] self.timestamp = res[1]
self.url = res[2] self.url = res[2]
if self.url.endswith('*'): if self.url.endswith('*'):
self.type = archiveurl.URL_QUERY self.type = ArchivalUrl.URL_QUERY
self.url = self.url[:-1] self.url = self.url[:-1]
else: else:
self.type = archiveurl.QUERY self.type = ArchivalUrl.QUERY
return True return True
# Match replay regex # Match replay regex
# ====================== # ======================
def _init_replay(self, url): def _init_replay(self, url):
replay = archiveurl.REPLAY_REGEX.match(url) replay = ArchivalUrl.REPLAY_REGEX.match(url)
if not replay: if not replay:
return None return None
@ -121,16 +121,16 @@ class archiveurl:
self.mod = res[1] self.mod = res[1]
self.url = res[2] self.url = res[2]
if self.timestamp: if self.timestamp:
self.type = archiveurl.REPLAY self.type = ArchivalUrl.REPLAY
else: else:
self.type = archiveurl.LATEST_REPLAY self.type = ArchivalUrl.LATEST_REPLAY
return True return True
# Str Representation # Str Representation
# ==================== # ====================
def __str__(self): def __str__(self):
if self.type == archiveurl.QUERY or self.type == archiveurl.URL_QUERY: if self.type == ArchivalUrl.QUERY or self.type == ArchivalUrl.URL_QUERY:
tsmod = "/" tsmod = "/"
if self.mod: if self.mod:
tsmod += self.mod + "/" tsmod += self.mod + "/"
@ -138,7 +138,7 @@ class archiveurl:
tsmod += self.timestamp tsmod += self.timestamp
tsmod += "*/" + self.url tsmod += "*/" + self.url
if self.type == archiveurl.URL_QUERY: if self.type == ArchivalUrl.URL_QUERY:
tsmod += "*" tsmod += "*"
return tsmod return tsmod
else: else:

View File

@ -1,32 +1,79 @@
from wbarchivalurl import ArchivalUrl
#WB Request and Response #WB Request and Response
class WbRequest: class WbRequest:
""" """
>>> WbRequest.prefix_request({'REQUEST_URI': '/save/_embed/example.com/?a=b'}, '/save/') >>> WbRequest.parse({'REQUEST_URI': '/save/_embed/example.com/?a=b'})
WbRequest(env, '/_embed/example.com/?a=b', 'save') {'wb_url': ('latest_replay', '', '', 'http://_embed/example.com/?a=b', '/http://_embed/example.com/?a=b'), 'coll': 'save', 'wb_prefix': '/save/', 'request_uri': '/save/_embed/example.com/?a=b'}
>>> WbRequest.parse({'REQUEST_URI': '/2345/20101024101112im_/example.com/?b=c'})
{'wb_url': ('replay', '20101024101112', 'im_', 'http://example.com/?b=c', '/20101024101112im_/http://example.com/?b=c'), 'coll': '2345', 'wb_prefix': '/2345/', 'request_uri': '/2345/20101024101112im_/example.com/?b=c'}
>>> WbRequest.parse({'REQUEST_URI': '/2010/example.com'})
{'wb_url': ('latest_replay', '', '', 'http://example.com', '/http://example.com'), 'coll': '2010', 'wb_prefix': '/2010/', 'request_uri': '/2010/example.com'}
>>> WbRequest.parse({'REQUEST_URI': '../example.com'})
{'wb_url': ('latest_replay', '', '', 'http://example.com', '/http://example.com'), 'coll': '', 'wb_prefix': '/', 'request_uri': '../example.com'}
""" """
def __init__(self, env, request_uri = '', wb_url = '', coll = ''):
self.env = env
# if len(wb_url) == 0:
# wb_url = request_uri
setattr(self, 'wb_url', wb_url)
setattr(self, 'coll', coll)
setattr(self, 'request_uri', request_uri)
setattr(self, 'referrer', env.get('HTTP_REFERER'))
@staticmethod @staticmethod
def prefix_request(env, prefix, request_uri = ''): def parse(env, request_uri = ''):
if not request_uri: if not request_uri:
request_uri = env.get('REQUEST_URI') request_uri = env.get('REQUEST_URI')
return WbRequest(env, request_uri, request_uri[len(prefix)-1:], coll = prefix[1:-1])
parts = request_uri.split('/', 2)
# Has coll prefix
if len(parts) == 3:
wb_prefix = '/' + parts[1] + '/'
wb_url = '/' + parts[2]
coll = parts[1]
# No Coll Prefix
elif len(parts) == 2:
wb_prefix = '/'
wb_url = '/' + parts[1]
coll = ''
else:
wb_prefix = '/'
wb_url = parts[0]
coll = ''
return WbRequest(env, request_uri, wb_prefix, wb_url, coll)
def __init__(self, env, request_uri, wb_prefix, wb_url, coll):
    """
    env         -- the request environ (a mapping with .get())
    request_uri -- full request uri; when empty, env's REQUEST_URI is used
    wb_prefix   -- prefix of the matched collection
    wb_url      -- archival url string, parsed into an ArchivalUrl
    coll        -- collection name
    """
    self.env = env

    self.request_uri = request_uri or env.get('REQUEST_URI')

    self.wb_prefix = wb_prefix

    # Kept as a parsed ArchivalUrl, not the raw string
    self.wb_url = ArchivalUrl(wb_url)

    self.coll = coll

    # Must be set before _is_ajax(), which reads self.referrer
    self.referrer = env.get('HTTP_REFERER')

    self.is_ajax = self._is_ajax()
def _is_ajax(self):
value = self.env.get('HTTP_X_REQUESTED_WITH')
if not value:
return False
if value.lower() == 'xmlhttprequest':
return True
if self.referrer and ('ajaxpipe' in self.env.get('QUERY_STRING')):
return True
return False
def __repr__(self): def __repr__(self):
return "WbRequest(env, '" + (self.wb_url) + "', '" + (self.coll) + "')" #return "WbRequest(env, '" + (self.wb_url) + "', '" + (self.coll) + "')"
#return str(vars(self))
varlist = vars(self)
return str({k: varlist[k] for k in ('request_uri', 'wb_prefix', 'wb_url', 'coll')})
class WbResponse: class WbResponse:

52
pywb/wburlrewriter.py Normal file
View File

@ -0,0 +1,52 @@
import copy
import urlparse
from wbarchivalurl import ArchivalUrl
class ArchivalUrlRewriter:
    """
    Rewrite urls found in an archived page so that they point back into the
    archive, relative to the page's archival url and a collection prefix.

    >>> test_rewrite('other.html', '/20131010/http://example.com/path/page.html', 'https://web.archive.org/web/')
    'https://web.archive.org/web/20131010/http://example.com/path/other.html'

    >>> test_rewrite('./other.html', '/20130907*/http://example.com/path/page.html', '/coll/')
    '/coll/20130907*/http://example.com/path/other.html'

    >>> test_rewrite('../other.html', '/20131112im_/http://example.com/path/page.html', '/coll/')
    '/coll/20131112im_/http://example.com/other.html'

    >>> test_rewrite('../../other.html', '/*/http://example.com/index.html', 'localhost:8080/')
    'localhost:8080/*/http://example.com/other.html'

    >>> test_rewrite('../../other.html', '/2020/http://example.com/index.html', '/')
    '/2020/http://example.com/other.html'

    >>> test_rewrite('../../other.html', '/2020/http://example.com/index.html', '')
    '/2020/http://example.com/other.html'
    """

    def __init__(self, wburl_str, prefix):
        self.wburl_str = wburl_str
        # Stored without a trailing '/': in the examples above wburl_str
        # itself begins with '/', so the two concatenate cleanly
        self.prefix = prefix[:-1] if prefix.endswith('/') else prefix

    def rewrite(self, rel_url):
        """Return rel_url rewritten against this rewriter's archival url."""
        if '../' not in rel_url:
            # No parent-directory segments: join against the full archival path
            return urlparse.urljoin(self.prefix + self.wburl_str, rel_url)

        # Resolve against the original url only, so '../' cannot climb out
        # into the timestamp or prefix part of the archival url
        wburl = ArchivalUrl(self.wburl_str)
        resolved = urlparse.urljoin(wburl.url, rel_url)
        # Strip any '../' segments still present after the join
        wburl.url = resolved.replace('../', '')
        return self.prefix + str(wburl)
if __name__ == "__main__":
    import doctest

    def test_rewrite(rel_url, base_url, prefix):
        # Doctest helper: rewrite rel_url against base_url under prefix
        return ArchivalUrlRewriter(base_url, prefix).rewrite(rel_url)

    doctest.testmod()