diff --git a/pywb/perms/perms_handler.py b/pywb/perms/perms_handler.py new file mode 100644 index 00000000..9f3715e1 --- /dev/null +++ b/pywb/perms/perms_handler.py @@ -0,0 +1,65 @@ +from pywb.utils.canonicalize import UrlCanonicalizer + +from pywb.framework.basehandlers import WbUrlHandler +from pywb.framework.archivalrouter import ArchivalRouter, Route +from pywb.framework.wbrequestresponse import WbResponse +from pywb.framework.wbexceptions import NotFoundException + +BLOCK = '["block"]' +ALLOW = '["allow"]' +RESPONSE_TYPE = 'application/json' + +NOT_FOUND = 'Please specify a url to check for access' + +#================================================================= +class PermsHandler(WbUrlHandler): + + def __init__(self, perms_policy, url_canon): + self.perms_policy = perms_policy + self.url_canon = url_canon + + def __call__(self, wbrequest): + perms_checker = self.perms_policy.create_perms_checker(wbrequest) + + if wbrequest.wb_url: + return self.check_single_url(wbrequest, perms_checker) + +# elif wbrequest.env['REQUEST_METHOD'] == 'POST': +# return self.check_bulk(wbrequest, perms_checker) + + else: + raise NotFoundException(NOT_FOUND) + + def check_single_url(self, wbrequest, perms_checker): + urlkey = self.url_canon(wbrequest.wb_url.url) + + if not perms_checker.allow_url_lookup(urlkey): + response_text = BLOCK + else: + response_text = ALLOW + + #TODO: other types of checking + return WbResponse.text_response(response_text, + content_type=RESPONSE_TYPE) +#TODO +# def check_bulk_urls(self, wbrequest, perms_checker): +# pass +# + + +#================================================================= +def create_perms_checker_app(config): + """ + Create permissions checker standalone app + Running under the '/check-access' route + """ + port = config.get('port') + + perms_policy = config.get('perms_policy') + + canonicalizer = UrlCanonicalizer(config.get('surt_ordered', True)) + + handler = PermsHandler(perms_policy, canonicalizer) + routes = [Route('check-access', handler)] + + return ArchivalRouter(routes, port=port) diff --git a/tests/test_perms_app.py b/tests/test_perms_app.py new file mode 100644 index 00000000..da7d3840 --- /dev/null +++ b/tests/test_perms_app.py @@ -0,0 +1,53 @@ +import webtest + +from pywb.perms.perms_handler import create_perms_checker_app +from pywb.perms.perms_handler import ALLOW, BLOCK +from pywb.framework.wsgi_wrappers import init_app + +class TestPermsApp: + TEST_CONFIG = 'test_config.yaml' + + def setup(self): + self.app = init_app(create_perms_checker_app, + load_yaml=True, + config_file=self.TEST_CONFIG) + + self.testapp = webtest.TestApp(self.app) + + + def test_allow(self): + resp = self.testapp.get('/check-access/http://example.com') + + assert resp.content_type == 'application/json' + + assert ALLOW in resp.body + + + def test_allow_with_timestamp(self): + resp = self.testapp.get('/check-access/20131024000102/http://example.com') + + assert resp.content_type == 'application/json' + + assert ALLOW in resp.body + + + def test_block_with_timestamp(self): + resp = self.testapp.get('/check-access/20131024000102/http://www.iana.org/_img/bookmark_icon.ico') + + assert resp.content_type == 'application/json' + + assert BLOCK in resp.body + + + def test_bad_url(self): + resp = self.testapp.get('/check-access/@#$', expect_errors=True, status = 400) + + assert resp.status_int == 400 + + assert 'Invalid Url: http://@' in resp.body + + + def test_not_found(self): + resp = self.testapp.get('/check-access/#abc', expect_errors=True, status = 404) + + assert resp.status_int == 404