From 2c40c9b11286f54443848ad1e6fa62e0ab55292a Mon Sep 17 00:00:00 2001 From: Kenji Nagahashi Date: Thu, 27 Feb 2014 01:58:07 +0000 Subject: [PATCH] refactor cdxserver, add tests focused on wsgi_cdxserver, add docstrings. align cdxops function interfaces - all cdx_iter. move module functions / common ops to class methods support both 0/1 and true/false for boolean parameters move CDXObject to text conversion to wsgi_cdxserver (may have broken embedded cdxserver mode). pass config object as function arg rather than as global var. --- pywb/cdx/cdxobject.py | 7 +- pywb/cdx/cdxops.py | 63 +++++------ pywb/cdx/cdxserver.py | 119 +++++++++++---------- pywb/cdx/wsgi_cdxserver.py | 56 ++++++++-- tests/test_wsgi_cdxserver.py | 197 +++++++++++++++++++++++++++++++++++ 5 files changed, 334 insertions(+), 108 deletions(-) create mode 100644 tests/test_wsgi_cdxserver.py diff --git a/pywb/cdx/cdxobject.py b/pywb/cdx/cdxobject.py index 4eba8025..1b38722a 100644 --- a/pywb/cdx/cdxobject.py +++ b/pywb/cdx/cdxobject.py @@ -71,12 +71,15 @@ class CDXObject(OrderedDict): # force regen on next __str__ call self.cdxline = None + def is_revisit(self): + return (self['mimetype'] == 'warc/revisit' or + self['filename'] == '-') + def __str__(self): if self.cdxline: return self.cdxline - li = itertools.imap(lambda (n, val): val, self.items()) - return ' '.join(li) + return ' '.join(val for n, val in self.iteritems()) #================================================================= diff --git a/pywb/cdx/cdxops.py b/pywb/cdx/cdxops.py index 247f3d18..b24df6bf 100644 --- a/pywb/cdx/cdxops.py +++ b/pywb/cdx/cdxops.py @@ -10,32 +10,38 @@ from collections import deque #================================================================= -def cdx_load(sources, params, perms_checker=None): +def cdx_load(sources, params, filter=True, perms_checker=None): + """ + merge text CDX lines from sources, return an iterator for + filtered and access-checked sequence of CDX objects. + :param sources: iterable for text CDX sources. + :param perms_checker: access check filter object implementing + allow_url_lookup(key, url), allow_capture(cdxobj) and + filter_fields(cdxobj) methods. + """ + cdx_iter = load_cdx_streams(sources, params) + cdx_iter = make_obj_iter(cdx_iter, params) + cdx_iter = filter_cdx(cdx_iter, params) if perms_checker: - cdx_iter = cdx_load_with_perms(sources, params, perms_checker) - else: - cdx_iter = cdx_load_and_filter(sources, params) - - # output raw cdx objects - if params.get('output') == 'raw': - return cdx_iter - - def write_cdx(fields): - for cdx in cdx_iter: - yield cdx_text_out(cdx, fields) + '\n' - - return write_cdx(params.get('fields')) - + cdx_iter = restrict_cdx(cdx_iter, params, perms_checker) + return cdx_iter #================================================================= -def cdx_load_with_perms(sources, params, perms_checker): +def restrict_cdx(cdx_iter, params, perms_checker): + """ + filter out those cdx records that user doesn't have access to, + by consulting :param perms_checker:. + :param cdx_iter: cdx record source iterable + :param params: request parameters (dict) + :param perms_checker: object implementing permission checker + """ if not perms_checker.allow_url_lookup(params['key'], params['url']): if params.get('matchType', 'exact') == 'exact': raise AccessException('Excluded') - cdx_iter = cdx_load_and_filter(sources, params) - for cdx in cdx_iter: + # TODO: we could let filter_fields handle this case by accepting + # None as a return value. if not perms_checker.allow_capture(cdx): continue @@ -43,21 +49,8 @@ def cdx_load_with_perms(sources, params, perms_checker): yield cdx - #================================================================= -def cdx_text_out(cdx, fields): - if not fields: - return str(cdx) - else: - return ' '.join(map(lambda x: cdx[x], fields.split(','))) - - -#================================================================= -def cdx_load_and_filter(sources, params): - cdx_iter = load_cdx_streams(sources, params) - - cdx_iter = make_obj_iter(cdx_iter, params) - +def filter_cdx(cdx_iter, params): if params.get('proxyAll'): return cdx_iter @@ -110,7 +103,7 @@ def make_obj_iter(text_iter, params): else: cls = CDXObject - return itertools.imap(lambda line: cls(line), text_iter) + return (cls(line) for line in text_iter) #================================================================= @@ -242,8 +235,8 @@ def cdx_resolve_revisits(cdx_iter): originals = {} for cdx in cdx_iter: - is_revisit = ((cdx['mimetype'] == 'warc/revisit') or - (cdx['filename'] == '-')) + + is_revisit = cdx.is_revisit() digest = cdx['digest'] diff --git a/pywb/cdx/cdxserver.py b/pywb/cdx/cdxserver.py index 1a68f7e4..b102aff1 100644 --- a/pywb/cdx/cdxserver.py +++ b/pywb/cdx/cdxserver.py @@ -84,7 +84,10 @@ class CDXServer(BaseCDXServer): def __init__(self, paths, **kwargs): super(CDXServer, self).__init__(**kwargs) - self.sources = create_cdx_sources(paths, kwargs.get('config')) + # TODO: we could save config in member, so that other + # methods can use it. it's bad for add_cdx_source to take + # config argument. + self._create_cdx_sources(paths, kwargs.get('config')) def load_cdx(self, **params): # if key not set, assume 'url' is set and needs canonicalization @@ -105,9 +108,62 @@ class CDXServer(BaseCDXServer): params['end_key'] = end_key cdx_iter = cdx_load(self.sources, params, self.perms_checker) - return self._check_cdx_iter(cdx_iter, params) + def _create_cdx_sources(self, paths, config): + """ + build CDXSource instances for each of path in :param paths:. + :param paths: list of sources or single source. + each source may be either string or CDXSource instance. value + of any other types will be silently ignored. + :param config: config object passed to :method:`add_cdx_source`. + """ + self.sources = [] + + if paths is not None: + if not isinstance(paths, (list, tuple)): + paths = [paths] + + for path in paths: + self.add_cdx_source(path, config) + + if len(self.sources) == 0: + logging.warn('No CDX Sources configured from paths=%s', paths) + + def _add_cdx_source(self, source): + if source is None: return + logging.debug('Adding CDX Source: %s', source) + self.sources.append(source) + + def add_cdx_source(self, source, config): + if source is None: return + if isinstance(source, CDXSource): + self._add_cdx_source(source) + elif isinstance(source, str): + if os.path.isdir(source): + for fn in os.listdir(source): + self._add_cdx_source(self._create_cdx_source( + os.path.join(source, fn), config)) + else: + self._add_cdx_source(self._create_cdx_source( + source, config)) + + def _create_cdx_source(self, filename, config): + if is_http(filename): + return RemoteCDXSource(filename) + + if filename.startswith('redis://'): + return RedisCDXSource(filename, config) + + if filename.endswith('.cdx'): + return CDXFile(filename) + + if filename.endswith('.summary'): + return ZipNumCluster(filename, config) + + logging.warn('skipping unrecognized URI:%s', filename) + return None + def __str__(self): return 'CDX server serving from ' + str(self.sources) @@ -131,12 +187,7 @@ class RemoteCDXServer(BaseCDXServer): raise Exception('Invalid remote cdx source: ' + str(source)) def load_cdx(self, **params): - remote_iter = self.source.load_cdx(params) - - # if need raw, convert to raw format here - if params.get('output') == 'raw': - remote_iter = (CDXObject(cdx) for cdx in remote_iter) - + remote_iter = cdx_load((self.sources,), params, filter=False) return self._check_cdx_iter(remote_iter, params) def __str__(self): @@ -169,58 +220,6 @@ def create_cdx_server(config, ds_rules_file=None): ds_rules=ds_rules_file, perms_checker=perms_checker) - -#================================================================= -def create_cdx_sources(paths, config=None): - sources = [] - - if not isinstance(paths, list): - paths = [paths] - - for path in paths: - if isinstance(path, CDXSource): - add_cdx_source(sources, path, config) - elif isinstance(path, str): - if os.path.isdir(path): - for file in os.listdir(path): - add_cdx_source(sources, path + file, config) - else: - add_cdx_source(sources, path, config) - - if len(sources) == 0: - logging.exception('No CDX Sources Found from: ' + str(sources)) - - return sources - - -#================================================================= -def add_cdx_source(sources, source, config): - if not isinstance(source, CDXSource): - source = create_cdx_source(source, config) - if not source: - return - - logging.debug('Adding CDX Source: ' + str(source)) - sources.append(source) - - -#================================================================= -def create_cdx_source(filename, config): - if is_http(filename): - return RemoteCDXSource(filename) - - if filename.startswith('redis://'): - return RedisCDXSource(filename, config) - - if filename.endswith('.cdx'): - return CDXFile(filename) - - if filename.endswith('.summary'): - return ZipNumCluster(filename, config) - - return None - - #================================================================= def extract_params_from_wsgi_env(env): """ utility function to extract params from the query diff --git a/pywb/cdx/wsgi_cdxserver.py b/pywb/cdx/wsgi_cdxserver.py index 609928a0..57945904 100644 --- a/pywb/cdx/wsgi_cdxserver.py +++ b/pywb/cdx/wsgi_cdxserver.py @@ -7,6 +7,8 @@ import os import yaml import pkg_resources +import cdxops + #================================================================= CONFIG_FILE = 'config.yaml' @@ -15,10 +17,19 @@ RULES_FILE = 'rules.yaml' DEFAULT_PORT = 8080 #================================================================= + class CDXQueryRequest(BaseRequest): def __init__(self, environ): super(CDXQueryRequest, self).__init__(environ) + def _get_bool(self, name): + v = self.args.get(name) + if v: + try: + v = int(s) + except ValueError as ex: + v = (s.lower() == 'true') + return bool(v) @property def output(self): return self.args.get('output', 'text') @@ -26,13 +37,22 @@ class CDXQueryRequest(BaseRequest): def filter(self): return self.args.getlist('filter', []) @property + def fields(self): + v = self.args.get('fields') + return v.split(',') if v else None + @property + def reverse(self): + # sort=reverse overrides reverse=0 + return (self._get_bool('reverse') or + self.args.get('sort') == 'reverse') + @property def params(self): return dict(t if t[0] == 'filter' else (t[0], t[1][0]) for t in self.args.iterlists()) class WSGICDXServer(object): - def __init__(self, paths, rules_file): - self.cdxserver = create_cdx_server(paths, rules_file) + def __init__(self, config, rules_file): + self.cdxserver = create_cdx_server(config, rules_file) def __call__(self, environ, start_response): request = CDXQueryRequest(environ) @@ -41,7 +61,7 @@ class WSGICDXServer(object): result = self.cdxserver.load_cdx(**request.params) # TODO: select response type by "output" parameter - response = PlainTextResponse(result) + response = PlainTextResponse(result, request.fields) return response(environ, start_response) except Exception as exc: logging.error('load_cdx failed', exc_info=1) @@ -50,25 +70,38 @@ class WSGICDXServer(object): start_response('400 Error', [('Content-Type', 'text/plain')]) return [str(exc)] +def cdx_text_out(cdx, fields): + if not fields: + return str(cdx) + '\n' + else: + logging.info('cdx fields=%s', cdx.keys()) + # TODO: this will results in an exception if fields contain + # non-existent field name. + return ' '.join(cdx[x] for x in fields) + '\n' + class PlainTextResponse(BaseResponse): - def __init__(self, cdxitr, status=200, content_type='text/plain'): + def __init__(self, cdxitr, fields, status=200, content_type='text/plain'): super(PlainTextResponse, self).__init__( - response=cdxitr, + response=( + cdx_text_out(cdx, fields) + for cdx in cdxitr + ), status=status, content_type=content_type) - + # class JsonResponse(Response): # pass # class MementoResponse(Response): # pass -def create_app(paths=None): +def create_app(config=None): logging.basicConfig(format='%(asctime)s: [%(levelname)s]: %(message)s', level=logging.DEBUG) - if not paths: - paths = config or get_test_dir() + 'cdx/' + if not config: + index_paths = get_test_dir() + 'cdx/' + config = dict(index_paths=index_paths) - return WSGICDXServer(paths, RULES_FILE) + return WSGICDXServer(config, RULES_FILE) if __name__ == "__main__": from optparse import OptionParser @@ -86,7 +119,7 @@ if __name__ == "__main__": if port is None: port = (config and config.get('port')) or DEFAULT_PORT - app = create_app() + app = create_app(config) logging.debug('Starting CDX Server on port %s', port) try: @@ -95,4 +128,5 @@ if __name__ == "__main__": pass logging.debug('Stopping CDX Server') else: + # XXX pass production config application = create_app() diff --git a/tests/test_wsgi_cdxserver.py b/tests/test_wsgi_cdxserver.py new file mode 100644 index 00000000..333b8a8b --- /dev/null +++ b/tests/test_wsgi_cdxserver.py @@ -0,0 +1,197 @@ +import os +import re + +import pytest +from urllib import urlencode + +from werkzeug.test import Client +from werkzeug.wrappers import BaseResponse, Response + +import yaml + +from pywb.cdx.cdxobject import CDXObject +from pywb.cdx.wsgi_cdxserver import create_app + +@pytest.fixture +def testconfig(): + config = yaml.load(open('test_config.yaml')) + assert config + if 'index_paths' not in config: + config['index_paths'] = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + '../sample_archive/cdx') + return config + +@pytest.fixture +def client(testconfig): + app = create_app(testconfig) + return Client(app, Response) + +# ================================================================ + +def query(client, url, **params): + params['url'] = url + return client.get('/cdx?' + urlencode(params, doseq=1)) + +# ================================================================ + +def test_exact_url(client): + """ + basic exact match, no filters, etc. + """ + resp = query(client, 'http://www.iana.org/') + + assert resp.status_code == 200 + print resp.data + +def test_prefix_match(client): + """ + prefix match test + """ + resp = query(client, 'http://www.iana.org/', matchType='prefix') + + print resp.data.splitlines() + assert resp.status_code == 200 + + suburls = 0 + for l in resp.data.splitlines(): + fields = l.split(' ') + if len(fields[0]) > len('org,iana)/'): + suburls += 1 + assert suburls > 0 + +def test_filters(client): + """ + filter cdxes by mimetype and filename field, exact match. + """ + resp = query(client, 'http://www.iana.org/_css/2013.1/screen.css', + filter=('mimetype:warc/revisit', 'filename:dupes.warc.gz')) + + assert resp.status_code == 200 + assert resp.mimetype == 'text/plain' + + for l in resp.data.splitlines(): + fields = l.split(' ') + assert fields[0] == 'org,iana)/_css/2013.1/screen.css' + assert fields[3] == 'warc/revisit' + assert fields[10] == 'dupes.warc.gz' + +def test_limit(client): + resp = query(client, 'http://www.iana.org/_css/2013.1/screen.css', + limit='1') + + assert resp.status_code == 200 + assert resp.mimetype == 'text/plain' + + cdxes = resp.data.splitlines() + assert len(cdxes) == 1 + fields = cdxes[0].split(' ') + assert fields[0] == 'org,iana)/_css/2013.1/screen.css' + assert fields[1] == '20140126200625' + assert fields[3] == 'text/css' + + resp = query(client, 'http://www.iana.org/_css/2013.1/screen.css', + limit='1', reverse='1') + + assert resp.status_code == 200 + assert resp.mimetype == 'text/plain' + + cdxes = resp.data.splitlines() + assert len(cdxes) == 1 + fields = cdxes[0].split(' ') + assert fields[0] == 'org,iana)/_css/2013.1/screen.css' + assert fields[1] == '20140127171239' + assert fields[3] == 'warc/revisit' + +def test_fields(client): + """ + retrieve subset of fields with ``fields`` parameter. + """ + resp = query(client, 'http://www.iana.org/_css/2013.1/print.css', + fields='urlkey,timestamp,statuscode') + + assert resp.status_code == 200 + + cdxes = resp.data.splitlines() + + for cdx in cdxes: + fields = cdx.split(' ') + assert len(fields) == 3 + assert fields[0] == 'org,iana)/_css/2013.1/print.css' + assert re.match(r'\d{14}$', fields[1]) + assert re.match(r'\d{3}|-', fields[2]) + +def test_fields_undefined(client): + """ + server shall respond with Bad Request (TODO: with proper explanation), + when ``fields`` parameter contains undefined name(s). + """ + resp = query(client, 'http://www.iana.org/_css/2013.1/print.css', + fields='urlkey,nosuchfield') + + resp.status_code == 400 + +def test_resolveRevisits(client): + """ + with ``resolveRevisits=true``, server adds three fields pointing to + the *original* capture. + """ + resp = query(client, 'http://www.iana.org/_css/2013.1/print.css', + resolveRevisits='true' + ) + assert resp.status_code == 200 + assert resp.mimetype == 'text/plain' + + cdxes = resp.data.splitlines() + originals = {} + for cdx in cdxes: + fields = cdx.split(' ') + assert len(fields) == 14 + (key, ts, url, mt, st, sha, _, _, size, offset, fn, + orig_size, orig_offset, orig_fn) = fields + # orig_* fields are either all '-' or (int, int, filename) + # check if orig_* fields are equals to corresponding fields + # for the original capture. + if orig_size == '-': + assert orig_offset == '-' and orig_fn == '-' + originals[sha] = (int(size), int(offset), fn) + else: + orig = originals.get(sha) + assert orig == (int(orig_size), int(orig_offset), orig_fn) + +def test_resolveRevisits_orig_fields(client): + """ + when resolveRevisits=true, extra three fields are named + ``orig.length``, ``orig.offset`` and ``orig.filename``, respectively. + it is possible to filter fields by these names. + """ + resp = query(client, 'http://www.iana.org/_css/2013.1/print.css', + resolveRevisits='1', + fields='urlkey,orig.length,orig.offset,orig.filename' + ) + assert resp.status_code == 200 + assert resp.mimetype == 'text/plain' + + cdxes = resp.data.splitlines() + for cdx in cdxes: + fields = cdx.split(' ') + assert len(fields) == 4 + key, orig_len, orig_offset, orig_fn = fields + assert (orig_len == '-' and orig_offset == '-' and orig_fn == '-' or + (int(orig_len), int(orig_offset), orig_fn)) + +def test_collapseTime_resolveRevisits_reverse(client): + resp = query(client, 'http://www.iana.org/_css/2013.1/print.css', + collapseTime='11', + resolveRevisits='true', + reverse='true' + ) + + cdxes = [CDXObject(l) for l in resp.data.splitlines()] + + assert len(cdxes) == 3 + + # timestamp is in descending order + for i in range(len(cdxes) - 1): + assert cdxes[i]['timestamp'] >= cdxes[i + 1]['timestamp'] +