mirror of
https://github.com/webrecorder/pywb.git
synced 2025-03-15 00:03:28 +01:00
cdx: move perms related handling to pywb.perms package, support
custom processing ops, of which perms is a specific type add lazy_ops test to ensure all cdx processing ops are lazy perms: set up a 'perms policy' factory and perms policy implementation perms policy setting results in a custom processing op update tests to work with new config IndexReader handles both cdx server + perms policy
This commit is contained in:
parent
e0d5846484
commit
577c74be49
@ -19,12 +19,6 @@ class CaptureNotFoundException(CDXException):
|
||||
return '404 Not Found'
|
||||
|
||||
|
||||
#=================================================================
|
||||
class AccessException(CDXException):
|
||||
def status(self):
|
||||
return '403 Access Denied'
|
||||
|
||||
|
||||
#=================================================================
|
||||
class CDXObject(OrderedDict):
|
||||
CDX_FORMATS = [
|
||||
|
@ -1,4 +1,4 @@
|
||||
from cdxobject import CDXObject, IDXObject, AccessException
|
||||
from cdxobject import CDXObject, IDXObject
|
||||
from query import CDXQuery
|
||||
from pywb.utils.timeutils import timestamp_to_sec
|
||||
|
||||
@ -11,15 +11,12 @@ from collections import deque
|
||||
|
||||
|
||||
#=================================================================
|
||||
def cdx_load(sources, query, perms_checker=None, process=True):
|
||||
def cdx_load(sources, query, process=True):
|
||||
"""
|
||||
merge text CDX lines from sources, return an iterator for
|
||||
filtered and access-checked sequence of CDX objects.
|
||||
|
||||
:param sources: iterable for text CDX sources.
|
||||
:param perms_checker: access check filter object implementing
|
||||
allow_url_lookup(key), allow_capture(cdxobj) and
|
||||
filter_fields(cdxobj) methods.
|
||||
:param process: bool, perform processing sorting/filtering/grouping ops
|
||||
"""
|
||||
cdx_iter = create_merged_cdx_gen(sources, query)
|
||||
@ -28,8 +25,9 @@ def cdx_load(sources, query, perms_checker=None, process=True):
|
||||
if process and not query.secondary_index_only:
|
||||
cdx_iter = process_cdx(cdx_iter, query)
|
||||
|
||||
if perms_checker:
|
||||
cdx_iter = restrict_cdx(cdx_iter, query, perms_checker)
|
||||
custom_ops = query.custom_ops
|
||||
for op in custom_ops:
|
||||
cdx_iter = op(cdx_iter, query)
|
||||
|
||||
if query.output == 'text':
|
||||
cdx_iter = cdx_to_text(cdx_iter, query.fields)
|
||||
@ -43,30 +41,6 @@ def cdx_to_text(cdx_iter, fields):
|
||||
yield cdx.to_text(fields)
|
||||
|
||||
|
||||
#=================================================================
|
||||
def restrict_cdx(cdx_iter, query, perms_checker):
|
||||
"""
|
||||
filter out those cdx records that user doesn't have access to,
|
||||
by consulting :param perms_checker:.
|
||||
:param cdx_iter: cdx record source iterable
|
||||
:param query: request parameters (CDXQuery)
|
||||
:param perms_checker: object implementing permission checker
|
||||
"""
|
||||
if not perms_checker.allow_url_lookup(query.key):
|
||||
if query.is_exact:
|
||||
raise AccessException('Excluded')
|
||||
|
||||
for cdx in cdx_iter:
|
||||
# TODO: we could let filter_fields handle this case by accepting
|
||||
# None as a return value.
|
||||
if not perms_checker.allow_capture(cdx):
|
||||
continue
|
||||
|
||||
cdx = perms_checker.filter_fields(cdx)
|
||||
|
||||
yield cdx
|
||||
|
||||
|
||||
#=================================================================
|
||||
def process_cdx(cdx_iter, query):
|
||||
if query.resolve_revisits:
|
||||
@ -81,15 +55,16 @@ def process_cdx(cdx_iter, query):
|
||||
cdx_iter = cdx_collapse_time_status(cdx_iter, collapse_time)
|
||||
|
||||
limit = query.limit
|
||||
reverse = query.reverse
|
||||
|
||||
if query.reverse:
|
||||
if reverse:
|
||||
cdx_iter = cdx_reverse(cdx_iter, limit)
|
||||
|
||||
closest = query.closest
|
||||
if closest:
|
||||
cdx_iter = cdx_sort_closest(closest, cdx_iter, limit)
|
||||
|
||||
if limit:
|
||||
if limit and not reverse:
|
||||
cdx_iter = cdx_limit(cdx_iter, limit)
|
||||
|
||||
return cdx_iter
|
||||
@ -127,8 +102,9 @@ def make_obj_iter(text_iter, query):
|
||||
#=================================================================
|
||||
# limit cdx to at most limit
|
||||
def cdx_limit(cdx_iter, limit):
|
||||
for cdx, _ in itertools.izip(cdx_iter, xrange(limit)):
|
||||
yield cdx
|
||||
# for cdx, _ in itertools.izip(cdx_iter, xrange(limit)):
|
||||
# yield cdx
|
||||
return (cdx for cdx, _ in itertools.izip(cdx_iter, xrange(limit)))
|
||||
|
||||
|
||||
#=================================================================
|
||||
|
@ -35,7 +35,7 @@ class BaseCDXServer(object):
|
||||
self.url_canon = UrlCanonicalizer(surt_ordered)
|
||||
|
||||
# set perms checker, if any
|
||||
self.perms_checker = kwargs.get('perms_checker')
|
||||
#self.perms_checker = kwargs.get('perms_checker')
|
||||
|
||||
def _check_cdx_iter(self, cdx_iter, query):
|
||||
""" Check cdx iter semantics
|
||||
@ -100,8 +100,8 @@ class CDXServer(BaseCDXServer):
|
||||
query.set_key(key, end_key)
|
||||
|
||||
cdx_iter = cdx_load(self.sources,
|
||||
query,
|
||||
perms_checker=self.perms_checker)
|
||||
query)
|
||||
#perms_checker=self.perms_checker)
|
||||
|
||||
return self._check_cdx_iter(cdx_iter, query)
|
||||
|
||||
@ -194,7 +194,7 @@ class RemoteCDXServer(BaseCDXServer):
|
||||
|
||||
|
||||
#=================================================================
|
||||
def create_cdx_server(config, ds_rules_file=None, perms_checker=None):
|
||||
def create_cdx_server(config, ds_rules_file=None):
|
||||
if hasattr(config, 'get'):
|
||||
paths = config.get('index_paths')
|
||||
surt_ordered = config.get('surt_ordered', True)
|
||||
@ -214,5 +214,4 @@ def create_cdx_server(config, ds_rules_file=None, perms_checker=None):
|
||||
return server_cls(paths,
|
||||
config=pass_config,
|
||||
surt_ordered=surt_ordered,
|
||||
ds_rules_file=ds_rules_file,
|
||||
perms_checker=perms_checker)
|
||||
ds_rules_file=ds_rules_file)
|
||||
|
@ -1,7 +1,7 @@
|
||||
from pywb.utils.binsearch import iter_range
|
||||
from pywb.utils.loaders import SeekableTextFileReader
|
||||
|
||||
from cdxobject import AccessException
|
||||
from pywb.utils.wbexception import AccessException
|
||||
from query import CDXQuery
|
||||
|
||||
import urllib
|
||||
|
@ -1,30 +0,0 @@
|
||||
|
||||
|
||||
#=================================================================
|
||||
class AllowAllPerms(object):
|
||||
"""
|
||||
Sample Perm Checker which allows all
|
||||
"""
|
||||
def allow_url_lookup(self, urlkey):
|
||||
"""
|
||||
Return true/false if url or urlkey (canonicalized url)
|
||||
should be allowed
|
||||
"""
|
||||
return True
|
||||
|
||||
def allow_capture(self, cdx):
|
||||
"""
|
||||
Return true/false is specified capture (cdx) should be
|
||||
allowed
|
||||
"""
|
||||
return True
|
||||
|
||||
def filter_fields(self, cdx):
|
||||
"""
|
||||
Filter out any forbidden cdx fields from cdx dictionary
|
||||
"""
|
||||
return cdx
|
||||
|
||||
|
||||
#=================================================================
|
||||
#TODO: other types of perm handlers
|
@ -1,5 +1,4 @@
|
||||
from urllib import urlencode
|
||||
from urlparse import parse_qs
|
||||
from cdxobject import CDXException
|
||||
|
||||
|
||||
@ -79,6 +78,10 @@ class CDXQuery(object):
|
||||
return (self._get_bool('reverse') or
|
||||
self.params.get('sort') == 'reverse')
|
||||
|
||||
@property
|
||||
def custom_ops(self):
|
||||
return self.params.get('custom_ops', [])
|
||||
|
||||
@property
|
||||
def secondary_index_only(self):
|
||||
return self._get_bool('showPagedIndex')
|
||||
@ -97,28 +100,3 @@ class CDXQuery(object):
|
||||
|
||||
def urlencode(self):
|
||||
return urlencode(self.params, True)
|
||||
|
||||
@staticmethod
|
||||
def from_wsgi_env(env):
|
||||
return CDXQuery(**CDXQuery.extract_params_from_wsgi_env(env))
|
||||
|
||||
@staticmethod
|
||||
def extract_params_from_wsgi_env(env):
|
||||
""" utility function to extract params and create a CDXQuery
|
||||
from a WSGI environment dictionary
|
||||
"""
|
||||
params = parse_qs(env['QUERY_STRING'])
|
||||
|
||||
# parse_qs produces arrays for single values
|
||||
# cdx processing expects singleton params for all params,
|
||||
# except filters, so convert here
|
||||
# use first value of the list
|
||||
for name, val in params.iteritems():
|
||||
if name != 'filter':
|
||||
params[name] = val[0]
|
||||
|
||||
if not 'output' in params:
|
||||
params['output'] = 'text'
|
||||
|
||||
|
||||
return params
|
||||
|
@ -177,13 +177,9 @@ import sys
|
||||
import pprint
|
||||
|
||||
from pywb import get_test_dir
|
||||
#test_cdx_dir = os.path.dirname(os.path.realpath(__file__)) + '/../sample_data/'
|
||||
|
||||
test_cdx_dir = get_test_dir() + 'cdx/'
|
||||
from pywb.cdx.cdxobject import AccessException
|
||||
|
||||
from tests.fixture import testconfig, TestExclusionPerms
|
||||
|
||||
import pytest
|
||||
|
||||
def cdx_ops_test(url, sources = [test_cdx_dir + 'iana.cdx'], **kwparams):
|
||||
kwparams['url'] = url
|
||||
@ -199,22 +195,6 @@ def cdx_ops_test(url, sources = [test_cdx_dir + 'iana.cdx'], **kwparams):
|
||||
l = x.to_text(fields).replace('\t', ' ')
|
||||
sys.stdout.write(l)
|
||||
|
||||
#================================================================
|
||||
|
||||
def test_excluded(testconfig):
|
||||
testconfig['perms_checker'] = TestExclusionPerms()
|
||||
sources = testconfig.get('index_paths')
|
||||
print sources
|
||||
server = CDXServer(sources, perms_checker=testconfig['perms_checker'])
|
||||
assert isinstance(server, CDXServer)
|
||||
assert server.perms_checker
|
||||
|
||||
url = 'http://www.iana.org/_img/bookmark_icon.ico'
|
||||
key = 'org,iana)/_img/bookmark_icon.ico'
|
||||
with pytest.raises(AccessException):
|
||||
cdxobjs = list(server.load_cdx(url=url))
|
||||
print cdxobjs
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import doctest
|
||||
|
60
pywb/cdx/test/test_lazy_ops.py
Normal file
60
pywb/cdx/test/test_lazy_ops.py
Normal file
@ -0,0 +1,60 @@
|
||||
from pywb.utils.wbexception import AccessException
|
||||
from pywb.cdx.cdxops import cdx_load
|
||||
from pywb.cdx.query import CDXQuery
|
||||
|
||||
from pytest import raises
|
||||
|
||||
KEY = 'com,example)/'
|
||||
|
||||
#================================================================
|
||||
def raise_access_exception(cdx_iter, query):
|
||||
if query.key == KEY:
|
||||
raise AccessException
|
||||
|
||||
for cdx in cdx_iter:
|
||||
yield
|
||||
|
||||
#================================================================
|
||||
def lazy_cdx_load(**params):
|
||||
"""
|
||||
# Verify that an op 'short-circuits' further evaluation.. eg, a bad cdx source is not even loaded
|
||||
# as soon as exception is thrown
|
||||
|
||||
Exception is thrown on first .next() access, not on the cdx_load
|
||||
"""
|
||||
params['custom_ops'] = [raise_access_exception]
|
||||
|
||||
cdx_iter = cdx_load(['bogus ignored'],
|
||||
CDXQuery(**params),
|
||||
process=True)
|
||||
|
||||
# exception happens on first access attempt
|
||||
with raises(AccessException):
|
||||
cdx_iter.next()
|
||||
|
||||
|
||||
def test_no_process():
|
||||
lazy_cdx_load(key=KEY)
|
||||
|
||||
def test_reverse():
|
||||
lazy_cdx_load(key=KEY, reverse=True)
|
||||
|
||||
def test_closest():
|
||||
lazy_cdx_load(key=KEY, closest='2013')
|
||||
|
||||
def test_limit():
|
||||
lazy_cdx_load(key=KEY, limit=10)
|
||||
|
||||
def test_multi_ops():
|
||||
lazy_cdx_load(key=KEY,
|
||||
resolveRevisits=True,
|
||||
filters=['=filename:A'],
|
||||
collapseTime=10,
|
||||
reverse=True,
|
||||
closest='2013',
|
||||
limit=5,
|
||||
fields='timestamp,filename',
|
||||
output='text')
|
||||
|
||||
|
||||
|
@ -1,28 +0,0 @@
|
||||
from pywb.cdx.cdxops import cdx_load
|
||||
from pywb.cdx.perms import AllowAllPerms
|
||||
from pywb.cdx.query import CDXQuery
|
||||
from pywb.cdx.cdxobject import AccessException
|
||||
|
||||
from pytest import raises
|
||||
|
||||
class BlockAllPerms(AllowAllPerms):
|
||||
def allow_url_lookup(self, urlkey):
|
||||
return False
|
||||
|
||||
|
||||
def test_exclusion_short_circuit():
|
||||
"""
|
||||
# Verify that exclusion check 'short-circuits' further evaluation.. eg, a bad cdx source is not even loaded
|
||||
# if exclusion check does not pass
|
||||
"""
|
||||
cdx_iter = cdx_load(['bogus ignored'], CDXQuery(url='example.com', key='com,example)/'),
|
||||
perms_checker=BlockAllPerms(), process=True)
|
||||
|
||||
# exception happens on first access attempt
|
||||
with raises(AccessException):
|
||||
cdx_iter.next()
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -1,11 +1,13 @@
|
||||
from pywb.cdx.query import CDXQuery
|
||||
from pywb.cdx.cdxserver import create_cdx_server
|
||||
|
||||
from pywb.framework.archivalrouter import ArchivalRouter, Route
|
||||
from pywb.framework.basehandlers import BaseHandler
|
||||
|
||||
from indexreader import IndexReader
|
||||
from views import TextCapturesView
|
||||
|
||||
from urlparse import parse_qs
|
||||
|
||||
|
||||
#=================================================================
|
||||
class CDXHandler(BaseHandler):
|
||||
@ -18,18 +20,40 @@ class CDXHandler(BaseHandler):
|
||||
self.view = view if view else TextCapturesView()
|
||||
|
||||
def __call__(self, wbrequest):
|
||||
params = CDXQuery.extract_params_from_wsgi_env(wbrequest.env)
|
||||
cdx_lines = self.index_reader.load_cdx(**params)
|
||||
params = self.extract_params_from_wsgi_env(wbrequest.env)
|
||||
|
||||
return self.view.render_response(wbrequest, cdx_lines)
|
||||
cdx_iter = self.index_reader.load_cdx(wbrequest, params)
|
||||
|
||||
return self.view.render_response(wbrequest, cdx_iter)
|
||||
|
||||
def __str__(self):
|
||||
return 'CDX Handler: ' + str(self.index_reader)
|
||||
|
||||
@staticmethod
|
||||
def extract_params_from_wsgi_env(env):
|
||||
""" utility function to extract params and create a CDXQuery
|
||||
from a WSGI environment dictionary
|
||||
"""
|
||||
params = parse_qs(env['QUERY_STRING'])
|
||||
|
||||
# parse_qs produces arrays for single values
|
||||
# cdx processing expects singleton params for all params,
|
||||
# except filters, so convert here
|
||||
# use first value of the list
|
||||
for name, val in params.iteritems():
|
||||
if name != 'filter':
|
||||
params[name] = val[0]
|
||||
|
||||
if not 'output' in params:
|
||||
params['output'] = 'text'
|
||||
|
||||
return params
|
||||
|
||||
|
||||
#=================================================================
|
||||
DEFAULT_RULES = 'pywb/rules.yaml'
|
||||
|
||||
|
||||
#=================================================================
|
||||
def create_cdx_server_app(config):
|
||||
"""
|
||||
@ -38,6 +62,9 @@ def create_cdx_server_app(config):
|
||||
TODO: more complex example with multiple collections?
|
||||
"""
|
||||
cdx_server = create_cdx_server(config, DEFAULT_RULES)
|
||||
perms_policy = config.get('perms_policy')
|
||||
cdx_server = IndexReader(cdx_server, perms_policy)
|
||||
|
||||
port = config.get('port')
|
||||
routes = [Route('cdx', CDXHandler(cdx_server))]
|
||||
return ArchivalRouter(routes, port=port)
|
||||
|
@ -6,13 +6,14 @@ import urllib2
|
||||
class IndexReader(object):
|
||||
"""
|
||||
Main interface for reading index (currently only CDX) from a
|
||||
source server (currenlt a cdx server)
|
||||
source server (currently a cdx server)
|
||||
|
||||
Creates an appropriate query based on wbrequest type info
|
||||
"""
|
||||
|
||||
def __init__(self, cdx_server):
|
||||
def __init__(self, cdx_server, perms_policy):
|
||||
self.cdx_server = cdx_server
|
||||
self.perms_policy = perms_policy
|
||||
|
||||
def load_for_request(self, wbrequest):
|
||||
wburl = wbrequest.wb_url
|
||||
@ -29,12 +30,18 @@ class IndexReader(object):
|
||||
|
||||
params['allowFuzzy'] = True
|
||||
params['output'] = 'cdxobject'
|
||||
params['url'] = wburl.url
|
||||
|
||||
cdxlines = self.load_cdx(url=wburl.url, **params)
|
||||
cdxlines = self.load_cdx(wbrequest, params)
|
||||
|
||||
return cdxlines
|
||||
|
||||
def load_cdx(self, **params):
|
||||
def load_cdx(self, wbrequest, params):
|
||||
if self.perms_policy:
|
||||
perms_op = self.perms_policy.create_perms_filter_op(wbrequest)
|
||||
if perms_op:
|
||||
params['custom_ops'] = [perms_op]
|
||||
|
||||
return self.cdx_server.load_cdx(**params)
|
||||
|
||||
def get_query_params(self, wburl, limit = 150000, collapse_time = None, replay_closest = 100):
|
||||
@ -53,9 +60,6 @@ class IndexReader(object):
|
||||
wburl.REPLAY:
|
||||
{'sort': 'closest', 'filter': ['!statuscode:(500|502|504)'], 'limit': replay_closest, 'closest': wburl.timestamp, 'resolveRevisits': True},
|
||||
|
||||
# BUG: resolveRevisits currently doesn't work for this type of query
|
||||
# This is not an issue in archival mode, as there is a redirect to the actual timestamp query
|
||||
# but may be an issue in proxy mode
|
||||
wburl.LATEST_REPLAY:
|
||||
{'sort': 'reverse', 'filter': ['statuscode:[23]..'], 'limit': '1', 'resolveRevisits': True}
|
||||
|
||||
|
@ -131,13 +131,12 @@ def create_wb_router(passed_config = {}):
|
||||
|
||||
ds_rules_file = route_config.get('domain_specific_rules', None)
|
||||
|
||||
perms_checker = route_config.get('perms_checker', None)
|
||||
perms_policy = route_config.get('perms_policy', None)
|
||||
|
||||
cdx_server = create_cdx_server(route_config,
|
||||
ds_rules_file,
|
||||
perms_checker)
|
||||
ds_rules_file)
|
||||
|
||||
cdx_server = IndexReader(cdx_server)
|
||||
cdx_server = IndexReader(cdx_server, perms_policy)
|
||||
|
||||
wb_handler = create_wb_handler(
|
||||
cdx_server=cdx_server,
|
||||
|
0
pywb/perms/__init__.py
Normal file
0
pywb/perms/__init__.py
Normal file
60
pywb/perms/perms_filter.py
Normal file
60
pywb/perms/perms_filter.py
Normal file
@ -0,0 +1,60 @@
|
||||
from pywb.utils.wbexception import AccessException
|
||||
|
||||
|
||||
#=================================================================
|
||||
def create_filter_op(perms_checker):
|
||||
|
||||
def perms_filter_op(cdx_iter, query):
|
||||
"""
|
||||
filter out those cdx records that user doesn't have access to,
|
||||
by consulting :param perms_checker:.
|
||||
:param cdx_iter: cdx record source iterable
|
||||
:param query: request parameters (CDXQuery)
|
||||
:param perms_checker: object implementing permission checker
|
||||
"""
|
||||
if not perms_checker.allow_url_lookup(query.key):
|
||||
if query.is_exact:
|
||||
raise AccessException('Excluded')
|
||||
|
||||
for cdx in cdx_iter:
|
||||
cdx = perms_checker.access_check_capture(cdx)
|
||||
if cdx:
|
||||
yield cdx
|
||||
|
||||
return perms_filter_op
|
||||
|
||||
|
||||
#================================================================
|
||||
class AllowAllPermsPolicy(object):
|
||||
def create_perms_filter_op(self, wbrequest):
|
||||
return create_filter_op(self.create_perms_checker(wbrequest))
|
||||
|
||||
def create_perms_checker(self, wbrequest):
|
||||
return AllowAllPerms()
|
||||
|
||||
|
||||
#=================================================================
|
||||
class AllowAllPerms(object):
|
||||
"""
|
||||
Sample Perm Checker which allows all
|
||||
"""
|
||||
|
||||
def allow_url_lookup(self, key):
|
||||
"""
|
||||
Return true/false if urlkey (canonicalized url)
|
||||
should be allowed.
|
||||
|
||||
Default: allow all
|
||||
"""
|
||||
return True
|
||||
|
||||
def access_check_capture(self, cdx):
|
||||
"""
|
||||
Allow/deny specified cdx capture (dict) to be included
|
||||
in the result.
|
||||
Return None to reject, or modify the cdx to exclude
|
||||
any fields that need to be restricted.
|
||||
|
||||
Default: allow cdx line without modifications
|
||||
"""
|
||||
return cdx
|
28
pywb/perms/test/test_perms.py
Normal file
28
pywb/perms/test/test_perms.py
Normal file
@ -0,0 +1,28 @@
|
||||
from pywb.cdx.cdxops import cdx_load
|
||||
from pywb.cdx.query import CDXQuery
|
||||
from pywb.cdx.cdxserver import CDXServer
|
||||
from pywb.utils.wbexception import AccessException
|
||||
from pywb.core.indexreader import IndexReader
|
||||
|
||||
#from pywb.perms.perms_filter import AllowAllPerms
|
||||
|
||||
from pytest import raises
|
||||
|
||||
from tests.fixture import TestExclusionPermsPolicy, testconfig
|
||||
|
||||
|
||||
#================================================================
|
||||
def test_excluded(testconfig):
|
||||
sources = testconfig.get('index_paths')
|
||||
perms_policy = testconfig.get('perms_policy')
|
||||
|
||||
cdx_server = CDXServer(sources)
|
||||
index_reader = IndexReader(cdx_server, perms_policy)
|
||||
|
||||
url = 'http://www.iana.org/_img/bookmark_icon.ico'
|
||||
|
||||
params = dict(url=url)
|
||||
|
||||
with raises(AccessException):
|
||||
cdxobjs = list(index_reader.load_cdx(None, params))
|
||||
print cdxobjs
|
@ -1,3 +1,10 @@
|
||||
#=================================================================
|
||||
class WbException(Exception):
|
||||
def status(self):
|
||||
return '500 Internal Server Error'
|
||||
|
||||
|
||||
#=================================================================
|
||||
class AccessException(WbException):
|
||||
def status(self):
|
||||
return '403 Access Denied'
|
||||
|
1
setup.py
1
setup.py
@ -20,6 +20,7 @@ setup(
|
||||
'pywb.warc',
|
||||
'pywb.rewrite',
|
||||
'pywb.framework'
|
||||
'pywb.perms',
|
||||
'pywb.core',
|
||||
'pywb.apps'
|
||||
],
|
||||
|
@ -101,4 +101,4 @@ reporter: !!python/object/new:tests.fixture.PrintReporter []
|
||||
#domain_specific_rules: rules.yaml
|
||||
|
||||
#perms_checker: !!python/object/new:pywb.cdx.perms.AllowAllPerms []
|
||||
perms_checker: !!python/object/new:tests.fixture.TestExclusionPerms []
|
||||
perms_policy: !!python/object/new:tests.fixture.TestExclusionPermsPolicy []
|
||||
|
@ -3,7 +3,7 @@ import pytest
|
||||
|
||||
import yaml
|
||||
|
||||
from pywb.cdx.perms import AllowAllPerms
|
||||
from pywb.perms.perms_filter import AllowAllPerms, AllowAllPermsPolicy
|
||||
|
||||
@pytest.fixture
|
||||
def testconfig():
|
||||
@ -29,17 +29,23 @@ class PrintReporter:
|
||||
#================================================================
|
||||
class TestExclusionPerms(AllowAllPerms):
|
||||
"""
|
||||
Perm Checker fixture which can block one URL.
|
||||
Perm Checker fixture to block a single url for testing
|
||||
"""
|
||||
# sample_archive has captures for this URLKEY
|
||||
URLKEY_EXCLUDED = 'org,iana)/_img/bookmark_icon.ico'
|
||||
|
||||
def allow_url_lookup(self, urlkey):
|
||||
"""
|
||||
Return true/false if url or urlkey (canonicalized url)
|
||||
Return true/false if url (canonicalized url)
|
||||
should be allowed
|
||||
"""
|
||||
if urlkey == self.URLKEY_EXCLUDED:
|
||||
return False
|
||||
|
||||
return super(TestExclusionPerms, self).allow_url_lookup(urlkey)
|
||||
|
||||
|
||||
#================================================================
|
||||
class TestExclusionPermsPolicy(AllowAllPermsPolicy):
|
||||
def create_perms_checker(self, wbrequest):
|
||||
return TestExclusionPerms()
|
||||
|
Loading…
x
Reference in New Issue
Block a user