1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 00:03:28 +01:00

acl: support for exact acl rules via '###' suffix

- ex: rule 'com,example)/###' matches http://example.com/ only
- wb-manager acl add/remove --exact-match adds/removes exact match rules
- tests: add tests for exact match queries, acl
This commit is contained in:
Ilya Kreymer 2019-03-08 10:10:02 -08:00 committed by John Berlin
parent 3589240431
commit 7ac9a37bb4
No known key found for this signature in database
GPG Key ID: 6EF5E4B442011B02
6 changed files with 80 additions and 13 deletions

View File

@ -101,13 +101,19 @@ class ACLManager(CollectionsManager):
except Exception as e:
print('Error Saving ACL Rules: ' + str(e))
def to_key(self, url_or_surt):
def to_key(self, url_or_surt, exact_match=False):
""" If 'url_or_surt' already a SURT, use as is
If exact match, add the exact match suffix
"""
if self.SURT_RX.search(url_or_surt):
return url_or_surt
result = url_or_surt
else:
return canonicalize(url_or_surt)
result = canonicalize(url_or_surt)
if exact_match:
result += AccessChecker.EXACT_SUFFIX
return result
def validate_access(self, access):
if access not in self.VALID_ACCESS:
@ -118,14 +124,14 @@ class ACLManager(CollectionsManager):
return True
def add_rule(self, r):
return self._add_rule(r.url, r.access)
return self._add_rule(r.url, r.access, r.exact_match)
def _add_rule(self, url, access):
def _add_rule(self, url, access, exact_match=False):
if not self.validate_access(access):
return
acl = CDXObject()
acl['urlkey'] = self.to_key(url)
acl['urlkey'] = self.to_key(url, exact_match)
acl['timestamp'] = '-'
acl['access'] = access
acl['url'] = url
@ -183,7 +189,7 @@ class ACLManager(CollectionsManager):
def remove_rule(self, r):
i = 0
urlkey = self.to_key(r.url)
urlkey = self.to_key(r.url, r.exact_match)
for rule in self.rules:
if urlkey == rule['urlkey']:# and r.timestamp == rule['timestamp']:
acl = self.rules.pop(i)
@ -251,10 +257,14 @@ class ACLManager(CollectionsManager):
op.add_argument(arg, nargs='?', default='allow')
else:
op.add_argument(arg)
if kwargs.get('exact_opt'):
op.add_argument('-e', '--exact-match', action='store_true', default=False)
op.set_defaults(acl_func=kwargs['func'])
command('add', 'coll_name', 'url', 'access', func=cls.add_rule)
command('remove', 'coll_name', 'url', func=cls.remove_rule)
command('add', 'coll_name', 'url', 'access', func=cls.add_rule, exact_opt=True)
command('remove', 'coll_name', 'url', func=cls.remove_rule, exact_opt=True)
command('list', 'coll_name', func=cls.list_rules)
command('validate', 'coll_name', func=cls.validate_save)
command('match', 'coll_name', 'url', 'default_access', func=cls.find_match)

View File

@ -16,7 +16,12 @@ class FileAccessIndexSource(FileIndexSource):
return (a < b) - (a > b)
def _do_iter(self, fh, params):
for line in search(fh, params['key'], prev_size=1, compare_func=self.rev_cmp):
exact_suffix = params.get('exact_match_suffix')
key = params['key']
if exact_suffix:
key += exact_suffix
for line in search(fh, key, prev_size=1, compare_func=self.rev_cmp):
yield line
@ -43,6 +48,9 @@ class CacheDirectoryAccessSource(CacheDirectoryMixin, DirectoryAccessSource):
# ============================================================================
class AccessChecker(object):
EXACT_SUFFIX = '###'
EXACT_SUFFIX_B = b'###'
def __init__(self, access_source, default_access='allow'):
if isinstance(access_source, str):
self.aggregator = self.create_access_aggregator([access_source])
@ -76,22 +84,32 @@ class AccessChecker(object):
raise Exception('Invalid Access Source: ' + filename)
def find_access_rule(self, url, ts=None, urlkey=None):
params = {'url': url, 'urlkey': urlkey, 'nosource': 'true'}
params = {'url': url,
'urlkey': urlkey,
'nosource': 'true',
'exact_match_suffix': self.EXACT_SUFFIX_B
}
acl_iter, errs = self.aggregator(params)
if errs:
print(errs)
key = params['key']
key_exact = key + self.EXACT_SUFFIX_B
tld = key.split(b',')[0]
for acl in acl_iter:
# skip empty/invalid lines
if not acl:
continue
acl_key = acl.split(b' ')[0]
if key_exact == acl_key:
return CDXObject(acl)
if key.startswith(acl_key):
return CDXObject(acl)

View File

@ -114,3 +114,17 @@ class TestAccess(TempDirTests, BaseTestClass):
assert edx['urlkey'] == 'net,example)/abc/path'
assert edx['access'] == 'block'
# exact-only match
edx = access.find_access_rule('https://www.iana.org/')
assert edx['urlkey'] == 'org,iana)/###'
assert edx['access'] == 'allow'
edx = access.find_access_rule('https://www.iana.org/any/other')
assert edx['urlkey'] == 'org,iana)/'
assert edx['access'] == 'exclude'
edx = access.find_access_rule('https://www.iana.org/x')
assert edx['urlkey'] == 'org,iana)/'
assert edx['access'] == 'exclude'

View File

@ -1,5 +1,6 @@
org,iana)/about - {"access": "block"}
org,iana)/_css/2013.1/fonts/opensans-semibold.ttf - {"access": "allow"}
org,iana)/_css - {"access": "exclude"}
org,iana)/### - {"access": "allow"}
org,iana)/ - {"access": "exclude"}
org,example)/?example=1 - {"access": "block"}

View File

@ -18,11 +18,18 @@ class TestACLApp(BaseConfigTest):
return self.testapp.get('/{coll}/cdx?'.format(coll=coll) + urlencode(params, doseq=1))
def test_excluded_url(self):
resp = self.query('http://www.iana.org/')
resp = self.query('http://www.iana.org/domains/root')
assert len(resp.text.splitlines()) == 0
self.testapp.get('/pywb/mp_/http://www.iana.org/', status=404)
self.testapp.get('/pywb/mp_/http://www.iana.org/domains/root', status=404)
def test_allowed_exact_url(self):
resp = self.query('http://www.iana.org/')
assert len(resp.text.splitlines()) == 3
self.testapp.get('/pywb/mp_/http://www.iana.org/', status=200)
def test_blocked_url(self):
resp = self.query('http://www.iana.org/about/')

View File

@ -79,6 +79,23 @@ Matched rule:
with open(self.acl_filename, 'rt') as fh:
assert fh.read() == """\
com,example)/ - {"access": "allow", "url": "http://example.com/"}
"""
def test_acl_add_exact(self):
wb_manager(['acl', 'add', '--exact-match', self.acl_filename, 'example.com', 'block'])
with open(self.acl_filename, 'rt') as fh:
assert fh.read() == """\
com,example)/### - {"access": "block", "url": "example.com"}
com,example)/ - {"access": "allow", "url": "http://example.com/"}
"""
def test_remove_acl_exact(self):
wb_manager(['acl', 'remove', '-e', self.acl_filename, 'https://example.com/'])
with open(self.acl_filename, 'rt') as fh:
assert fh.read() == """\
com,example)/ - {"access": "allow", "url": "http://example.com/"}
"""
def test_validate_and_sort_acl(self):