1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 00:03:28 +01:00

documented and cleaned up the aclmanager.py

This commit is contained in:
John Berlin 2019-04-03 18:02:25 -04:00
parent ce10d9af7c
commit 1a7fdd0d70
No known key found for this signature in database
GPG Key ID: 6EF5E4B442011B02

View File

@ -1,16 +1,11 @@
import os
import sys
import json
import re
from argparse import ArgumentParser, RawTextHelpFormatter
from collections import OrderedDict
import sys
from pywb.manager.manager import CollectionsManager
from pywb.warcserver.index.cdxobject import CDXObject
from pywb.utils.canonicalize import canonicalize
from pywb.warcserver.access_checker import AccessChecker
from pywb.warcserver.index.cdxobject import CDXObject
# ============================================================================
@ -23,7 +18,8 @@ class ACLManager(CollectionsManager):
def __init__(self, r):
"""
:param r: Parsed result from ArgumentParser
:param argparse.Namespace r: Parsed result from ArgumentParser
:rtype: None
"""
self.rules = []
@ -35,11 +31,14 @@ class ACLManager(CollectionsManager):
super(ACLManager, self).__init__(coll_name, must_exist=False)
self.acl_file = None
def process(self, r):
"""
Process acl command
:param r: Parsed result from ArgumentParser
:return:
:param argparse.Namespace r: Parsed result from ArgumentParser
:rtype: None
"""
# if target exists as a file, use that
@ -71,6 +70,13 @@ class ACLManager(CollectionsManager):
r.acl_func(self, r)
def is_valid_auto_coll(self, coll_name):
"""Returns T/F indicating if the supplied collection name
is a valid collection
:param coll_name: The collection name to check
:return: T/F indicating a valid collection
:rtype: bool
"""
if not self.COLL_RX.match(coll_name):
return False
@ -80,6 +86,12 @@ class ACLManager(CollectionsManager):
return True
def load_acl(self, must_exist=True):
"""Loads the access control list
:param bool must_exist: Does the acl file have to exist
:return: T/F indicating load success
:rtype: bool
"""
try:
with open(self.acl_file, 'rb') as fh:
for line in fh:
@ -98,6 +110,12 @@ class ACLManager(CollectionsManager):
return False
def save_acl(self, r=None):
"""Save the contents of the rules as cdxj entries to
the access control list file
:param argparse.Namespace|None r: Not used
:rtype: None
"""
try:
os.makedirs(os.path.dirname(self.acl_file))
except OSError:
@ -114,6 +132,10 @@ class ACLManager(CollectionsManager):
def to_key(self, url_or_surt, exact_match=False):
""" If 'url_or_surt' already a SURT, use as is
If exact match, add the exact match suffix
:param str url_or_surt: The url or surt to be converted to an acl key
:param bool exact_match: Should the exact match suffix be added to key
:rtype: str
"""
if self.SURT_RX.search(url_or_surt):
result = url_or_surt
@ -126,17 +148,35 @@ class ACLManager(CollectionsManager):
return result
def validate_access(self, access):
"""Returns true if the supplied access value is valid
otherwise the terminates the process
:param str access: The access value to be validated
:return: True if valid
:rtype: bool
"""
if access not in self.VALID_ACCESS:
print('Valid access values are: ' + ', '.join(self.VALID_ACCESS))
sys.exit(1)
return False
return True
def add_rule(self, r):
"""Adds a rule the ACL manager
:param argparse.Namespace r: The argparse namespace representing the rule to be added
:rtype: None
"""
return self._add_rule(r.url, r.access, r.exact_match)
def _add_rule(self, url, access, exact_match=False):
"""Adds an rule to the acl file
:param str url: The URL for the rule
:param str access: The access value for the rule
:param bool exact_match: Is the rule an absolute value
:rtype: None
"""
if not self.validate_access(access):
return
@ -172,11 +212,22 @@ class ACLManager(CollectionsManager):
self.save_acl()
def validate_save(self, r=None):
if self.validate(True):
self.save_acl()
def validate_save(self, r=None, log=False):
"""Validates the acl rules and saves the file
def validate(self, log=False):
:param argparse.Namespace|None r: Not used
:param bool log: Should a report be printed to stdout
:rtype: None
"""
self.validate(log=log, correct=True)
def validate(self, log=False, correct=False):
"""Validates the acl rules returning T/F if the list should be saved
:param bool log: Should the results of validating be logged to stdout
:param bool correct: Should invalid results be corrected and saved
:rtype: None
"""
last_rule = None
out_of_order = False
for rule in self.rules:
@ -189,19 +240,22 @@ class ACLManager(CollectionsManager):
if out_of_order:
if log:
print('Rules out of order, resorting')
self.rules.sort(reverse=True)
return True
else:
if log:
print('Rules in order')
return False
if correct:
self.rules.sort(reverse=True)
self.save_acl()
elif log:
print('Rules in order')
def remove_rule(self, r):
"""Removes a rule from the acl file
:param argparse.Namespace r: Parsed result from ArgumentParser
:rtype: None
"""
i = 0
urlkey = self.to_key(r.url, r.exact_match)
for rule in self.rules:
if urlkey == rule['urlkey']:# and r.timestamp == rule['timestamp']:
if urlkey == rule['urlkey']:
acl = self.rules.pop(i)
print('Removed Rule:')
self.print_rule(acl)
@ -213,6 +267,11 @@ class ACLManager(CollectionsManager):
print('Rule to remove not found!')
def list_rules(self, r):
"""Print the acl rules to the stdout
:param argparse.Namespace|None r: Not used
:rtype: None
"""
print('Rules for {0} from {1}:'.format(self.target, self.acl_file))
print('')
for rule in self.rules:
@ -220,6 +279,11 @@ class ACLManager(CollectionsManager):
print('')
def find_match(self, r):
"""Finds a matching acl rule
:param argparse.Namespace r: Parsed result from ArgumentParser
:rtype: None
"""
access_checker = AccessChecker(self.acl_file, '<default>')
rule = access_checker.find_access_rule(r.url)
@ -234,6 +298,8 @@ class ACLManager(CollectionsManager):
def add_excludes(self, r):
"""
Import old-style excludes, in url-per-line format
:param argparse.Namespace r: Parsed result from ArgumentParser
"""
if not self.validate_access(r.access):
return
@ -253,10 +319,20 @@ class ACLManager(CollectionsManager):
sys.exit(1)
def print_rule(self, rule):
"""Prints the supplied rule to the std out
:param CDXObject rule: The rule to be printed
:rtype: None
"""
print(' ' + rule.to_cdxj())
@classmethod
def init_parser(cls, parser):
"""Initializes an argument parser for acl commands
:param argparse.ArgumentParser parser: The parser to be initialized
:rtype: None
"""
subparsers = parser.add_subparsers(dest='op')
subparsers.required = True