mirror of
https://github.com/webrecorder/pywb.git
synced 2025-03-15 00:03:28 +01:00
aclmanager: add unit tests for 'wb-manager acl' commands (ukwa/ukwa-pywb#7)
- add, importtxt will create an access file if it doesn't exist - return status code 1 on errors, including if file doesn't exist (for other commands)
This commit is contained in:
parent
bfa3aa7264
commit
0c1dfba1da
@ -40,11 +40,14 @@ class ACLManager(CollectionsManager):
|
||||
else:
|
||||
self.acl_file = r.coll_name
|
||||
|
||||
if r.op == 'add':
|
||||
# for add/import, file doesn't have to exist
|
||||
if r.op in ('add', 'importtxt'):
|
||||
self.load_acl(False)
|
||||
|
||||
# otherwise, ensure file loaded successfully and log errors
|
||||
else:
|
||||
if not self.load_acl(True):
|
||||
sys.exit(2)
|
||||
return
|
||||
|
||||
# if 'validate', the command itself is validation
|
||||
@ -83,7 +86,7 @@ class ACLManager(CollectionsManager):
|
||||
def save_acl(self, r=None):
|
||||
try:
|
||||
os.makedirs(os.path.dirname(self.acl_file))
|
||||
except IOError:
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
try:
|
||||
@ -105,6 +108,7 @@ class ACLManager(CollectionsManager):
|
||||
def validate_access(self, access):
|
||||
if access not in self.VALID_ACCESS:
|
||||
print('Valid access values are: ' + ', '.join(self.VALID_ACCESS))
|
||||
sys.exit(1)
|
||||
return False
|
||||
|
||||
return True
|
||||
@ -226,6 +230,7 @@ class ACLManager(CollectionsManager):
|
||||
|
||||
except Exception as e:
|
||||
print('Error Importing: ' + str(e))
|
||||
sys.exit(1)
|
||||
|
||||
def print_rule(self, rule):
|
||||
print(' ' + rule.to_cdxj())
|
||||
@ -235,14 +240,14 @@ class ACLManager(CollectionsManager):
|
||||
subparsers = parser.add_subparsers(dest='op')
|
||||
subparsers.required = True
|
||||
|
||||
def command(name, *args, func=None):
|
||||
def command(name, *args, **kwargs):
|
||||
op = subparsers.add_parser(name)
|
||||
for arg in args:
|
||||
if arg == 'default_access':
|
||||
op.add_argument(arg, nargs='?', default='allow')
|
||||
else:
|
||||
op.add_argument(arg)
|
||||
op.set_defaults(acl_func=func)
|
||||
op.set_defaults(acl_func=kwargs['func'])
|
||||
|
||||
command('add', 'coll_name', 'url', 'access', func=cls.add_rule)
|
||||
command('remove', 'coll_name', 'url', func=cls.remove_rule)
|
||||
|
136
tests/test_acl_manager.py
Normal file
136
tests/test_acl_manager.py
Normal file
@ -0,0 +1,136 @@
|
||||
import os
|
||||
|
||||
from .base_config_test import BaseConfigTest, CollsDirMixin, fmod
|
||||
from pywb.manager.manager import main as wb_manager
|
||||
from pytest import raises
|
||||
|
||||
|
||||
# ============================================================================
|
||||
class TestACLManager(CollsDirMixin, BaseConfigTest):
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
super(TestACLManager, cls).setup_class('config_test_access.yaml')
|
||||
|
||||
cls.acl_filename = os.path.join(cls.root_dir, 'acl', 'test.aclj')
|
||||
|
||||
@classmethod
|
||||
def teardown_class(cls):
|
||||
super(TestACLManager, cls).teardown_class()
|
||||
try:
|
||||
os.remove(cls.acl_filename)
|
||||
except:
|
||||
pass
|
||||
|
||||
def test_acl_add_err_wrong_access(self):
|
||||
with raises(SystemExit):
|
||||
wb_manager(['acl', 'add', self.acl_filename, 'http://example.com/', 'access'])
|
||||
|
||||
def test_acl_add(self):
|
||||
wb_manager(['acl', 'add', self.acl_filename, 'http://example.com/', 'allow'])
|
||||
|
||||
with open(self.acl_filename, 'rt') as fh:
|
||||
assert fh.read() == """\
|
||||
com,example)/ - {"access": "allow", "url": "http://example.com/"}
|
||||
"""
|
||||
|
||||
def test_acl_add_surt(self):
|
||||
wb_manager(['acl', 'add', self.acl_filename, 'com,example,', 'exclude'])
|
||||
|
||||
with open(self.acl_filename, 'rt') as fh:
|
||||
assert fh.read() == """\
|
||||
com,example, - {"access": "exclude", "url": "com,example,"}
|
||||
com,example)/ - {"access": "allow", "url": "http://example.com/"}
|
||||
"""
|
||||
|
||||
def test_acl_list(self, capsys):
|
||||
wb_manager(['acl', 'list', self.acl_filename])
|
||||
|
||||
out, err = capsys.readouterr()
|
||||
|
||||
assert out == """\
|
||||
Rules for %s from %s:
|
||||
|
||||
com,example, - {"access": "exclude", "url": "com,example,"}
|
||||
com,example)/ - {"access": "allow", "url": "http://example.com/"}
|
||||
|
||||
""" % (self.acl_filename, self.acl_filename)
|
||||
|
||||
|
||||
def test_acl_list_err_no_such_file(self):
|
||||
with raises(SystemExit):
|
||||
wb_manager(['acl', 'list', self.acl_filename + '2'])
|
||||
|
||||
|
||||
def test_acl_match(self, capsys):
|
||||
wb_manager(['acl', 'match', self.acl_filename, 'http://abc.example.com/foo'])
|
||||
|
||||
out, err = capsys.readouterr()
|
||||
|
||||
assert out == """\
|
||||
Matched rule:
|
||||
|
||||
com,example, - {"access": "exclude", "url": "com,example,", "source": "%s", "source-coll": "%s"}
|
||||
|
||||
""" % (self.acl_filename, self.acl_filename)
|
||||
|
||||
def test_remove_acl(self):
|
||||
wb_manager(['acl', 'remove', self.acl_filename, 'com,example,'])
|
||||
|
||||
with open(self.acl_filename, 'rt') as fh:
|
||||
assert fh.read() == """\
|
||||
com,example)/ - {"access": "allow", "url": "http://example.com/"}
|
||||
"""
|
||||
|
||||
def test_validate_and_sort_acl(self):
|
||||
with open(self.acl_filename, 'at') as fh:
|
||||
fh.write('com,example)/subpath - {"access": "block", "url": "http://example.com/subpath"}\n')
|
||||
|
||||
with open(self.acl_filename, 'rt') as fh:
|
||||
assert fh.read() == """\
|
||||
com,example)/ - {"access": "allow", "url": "http://example.com/"}
|
||||
com,example)/subpath - {"access": "block", "url": "http://example.com/subpath"}
|
||||
"""
|
||||
|
||||
wb_manager(['acl', 'validate', self.acl_filename])
|
||||
|
||||
with open(self.acl_filename, 'rt') as fh:
|
||||
assert fh.read() == """\
|
||||
com,example)/subpath - {"access": "block", "url": "http://example.com/subpath"}
|
||||
com,example)/ - {"access": "allow", "url": "http://example.com/"}
|
||||
"""
|
||||
|
||||
def test_importtxt_acl(self, capsys):
|
||||
name = os.path.join(self.root_dir, 'excludes.txt')
|
||||
with open(name, 'wt') as exc:
|
||||
exc.write('http://iana.org/\n')
|
||||
exc.write('http://example.com/subpath/another\n')
|
||||
exc.write('http://example.co/foo/\n')
|
||||
exc.write('http://example.com/\n')
|
||||
|
||||
wb_manager(['acl', 'importtxt', self.acl_filename, name, 'exclude'])
|
||||
|
||||
with open(self.acl_filename, 'rt') as fh:
|
||||
assert fh.read() == """\
|
||||
org,iana)/ - {"access": "exclude", "url": "http://iana.org/"}
|
||||
com,example)/subpath/another - {"access": "exclude", "url": "http://example.com/subpath/another"}
|
||||
com,example)/subpath - {"access": "block", "url": "http://example.com/subpath"}
|
||||
com,example)/ - {"access": "exclude", "url": "http://example.com/"}
|
||||
co,example)/foo - {"access": "exclude", "url": "http://example.co/foo/"}
|
||||
"""
|
||||
|
||||
out, err = capsys.readouterr()
|
||||
|
||||
assert 'Added or replaced 4 rules from {0}'.format(name) in out, out
|
||||
|
||||
os.remove(name)
|
||||
|
||||
def test_import_errors(self):
|
||||
# missing access mode
|
||||
with raises(SystemExit):
|
||||
wb_manager(['acl', 'importtxt', self.acl_filename, 'foo'])
|
||||
|
||||
# no such file
|
||||
with raises(SystemExit):
|
||||
wb_manager(['acl', 'importtxt', self.acl_filename, 'foo', 'exclude'])
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user