diff --git a/pywb/manager/aclmanager.py b/pywb/manager/aclmanager.py index 7319af89..d533f16a 100644 --- a/pywb/manager/aclmanager.py +++ b/pywb/manager/aclmanager.py @@ -40,11 +40,14 @@ class ACLManager(CollectionsManager): else: self.acl_file = r.coll_name - if r.op == 'add': + # for add/import, file doesn't have to exist + if r.op in ('add', 'importtxt'): self.load_acl(False) + # otherwise, ensure file loaded successfully and log errors else: if not self.load_acl(True): + sys.exit(2) return # if 'validate', the command itself is validation @@ -83,7 +86,7 @@ class ACLManager(CollectionsManager): def save_acl(self, r=None): try: os.makedirs(os.path.dirname(self.acl_file)) - except IOError: + except OSError: pass try: @@ -105,6 +108,7 @@ class ACLManager(CollectionsManager): def validate_access(self, access): if access not in self.VALID_ACCESS: print('Valid access values are: ' + ', '.join(self.VALID_ACCESS)) + sys.exit(1) return False return True @@ -226,6 +230,7 @@ class ACLManager(CollectionsManager): except Exception as e: print('Error Importing: ' + str(e)) + sys.exit(1) def print_rule(self, rule): print(' ' + rule.to_cdxj()) @@ -235,14 +240,14 @@ class ACLManager(CollectionsManager): subparsers = parser.add_subparsers(dest='op') subparsers.required = True - def command(name, *args, func=None): + def command(name, *args, **kwargs): op = subparsers.add_parser(name) for arg in args: if arg == 'default_access': op.add_argument(arg, nargs='?', default='allow') else: op.add_argument(arg) - op.set_defaults(acl_func=func) + op.set_defaults(acl_func=kwargs['func']) command('add', 'coll_name', 'url', 'access', func=cls.add_rule) command('remove', 'coll_name', 'url', func=cls.remove_rule) diff --git a/tests/test_acl_manager.py b/tests/test_acl_manager.py new file mode 100644 index 00000000..dc54b76f --- /dev/null +++ b/tests/test_acl_manager.py @@ -0,0 +1,136 @@ +import os + +from .base_config_test import BaseConfigTest, CollsDirMixin, fmod +from pywb.manager.manager import main as wb_manager +from pytest import raises + + +# ============================================================================ +class TestACLManager(CollsDirMixin, BaseConfigTest): + @classmethod + def setup_class(cls): + super(TestACLManager, cls).setup_class('config_test_access.yaml') + + cls.acl_filename = os.path.join(cls.root_dir, 'acl', 'test.aclj') + + @classmethod + def teardown_class(cls): + super(TestACLManager, cls).teardown_class() + try: + os.remove(cls.acl_filename) + except: + pass + + def test_acl_add_err_wrong_access(self): + with raises(SystemExit): + wb_manager(['acl', 'add', self.acl_filename, 'http://example.com/', 'access']) + + def test_acl_add(self): + wb_manager(['acl', 'add', self.acl_filename, 'http://example.com/', 'allow']) + + with open(self.acl_filename, 'rt') as fh: + assert fh.read() == """\ +com,example)/ - {"access": "allow", "url": "http://example.com/"} +""" + + def test_acl_add_surt(self): + wb_manager(['acl', 'add', self.acl_filename, 'com,example,', 'exclude']) + + with open(self.acl_filename, 'rt') as fh: + assert fh.read() == """\ +com,example, - {"access": "exclude", "url": "com,example,"} +com,example)/ - {"access": "allow", "url": "http://example.com/"} +""" + + def test_acl_list(self, capsys): + wb_manager(['acl', 'list', self.acl_filename]) + + out, err = capsys.readouterr() + + assert out == """\ +Rules for %s from %s: + +com,example, - {"access": "exclude", "url": "com,example,"} +com,example)/ - {"access": "allow", "url": "http://example.com/"} + +""" % (self.acl_filename, self.acl_filename) + + + def test_acl_list_err_no_such_file(self): + with raises(SystemExit): + wb_manager(['acl', 'list', self.acl_filename + '2']) + + + def test_acl_match(self, capsys): + wb_manager(['acl', 'match', self.acl_filename, 'http://abc.example.com/foo']) + + out, err = capsys.readouterr() + + assert out == """\ +Matched rule: + + com,example, - {"access": "exclude", "url": "com,example,", "source": "%s", "source-coll": "%s"} + +""" % (self.acl_filename, self.acl_filename) + + def test_remove_acl(self): + wb_manager(['acl', 'remove', self.acl_filename, 'com,example,']) + + with open(self.acl_filename, 'rt') as fh: + assert fh.read() == """\ +com,example)/ - {"access": "allow", "url": "http://example.com/"} +""" + + def test_validate_and_sort_acl(self): + with open(self.acl_filename, 'at') as fh: + fh.write('com,example)/subpath - {"access": "block", "url": "http://example.com/subpath"}\n') + + with open(self.acl_filename, 'rt') as fh: + assert fh.read() == """\ +com,example)/ - {"access": "allow", "url": "http://example.com/"} +com,example)/subpath - {"access": "block", "url": "http://example.com/subpath"} +""" + + wb_manager(['acl', 'validate', self.acl_filename]) + + with open(self.acl_filename, 'rt') as fh: + assert fh.read() == """\ +com,example)/subpath - {"access": "block", "url": "http://example.com/subpath"} +com,example)/ - {"access": "allow", "url": "http://example.com/"} +""" + + def test_importtxt_acl(self, capsys): + name = os.path.join(self.root_dir, 'excludes.txt') + with open(name, 'wt') as exc: + exc.write('http://iana.org/\n') + exc.write('http://example.com/subpath/another\n') + exc.write('http://example.co/foo/\n') + exc.write('http://example.com/\n') + + wb_manager(['acl', 'importtxt', self.acl_filename, name, 'exclude']) + + with open(self.acl_filename, 'rt') as fh: + assert fh.read() == """\ +org,iana)/ - {"access": "exclude", "url": "http://iana.org/"} +com,example)/subpath/another - {"access": "exclude", "url": "http://example.com/subpath/another"} +com,example)/subpath - {"access": "block", "url": "http://example.com/subpath"} +com,example)/ - {"access": "exclude", "url": "http://example.com/"} +co,example)/foo - {"access": "exclude", "url": "http://example.co/foo/"} +""" + + out, err = capsys.readouterr() + + assert 'Added or replaced 4 rules from {0}'.format(name) in out, out + + os.remove(name) + + def test_import_errors(self): + # missing access mode + with raises(SystemExit): + wb_manager(['acl', 'importtxt', self.acl_filename, 'foo']) + + # no such file + with raises(SystemExit): + wb_manager(['acl', 'importtxt', self.acl_filename, 'foo', 'exclude']) + +