diff --git a/test/test_memento_agg.py b/test/test_memento_agg.py index c323da13..017d1871 100644 --- a/test/test_memento_agg.py +++ b/test/test_memento_agg.py @@ -134,13 +134,14 @@ def test_handler_output_cdxj(): agg = GeventTimeoutAggregator(sources, timeout=5.0) handler = IndexHandler(agg) url = 'http://vvork.com/' - res, errs = handler(dict(url=url, closest='20141001', limit=2, sources='rhiz,ait')) + headers, res, errs = handler(dict(url=url, closest='20141001', limit=2, sources='rhiz,ait')) exp = """\ com,vvork)/ 20141006184357 {"url": "http://www.vvork.com/", "mem_rel": "memento", "memento_url": "http://webenact.rhizome.org/vvork/20141006184357/http://www.vvork.com/", "load_url": "http://webenact.rhizome.org/vvork/20141006184357id_/http://www.vvork.com/", "source": "rhiz"} com,vvork)/ 20131004231540 {"url": "http://vvork.com/", "mem_rel": "last memento", "memento_url": "http://wayback.archive-it.org/all/20131004231540/http://vvork.com/", "load_url": "http://wayback.archive-it.org/all/20131004231540id_/http://vvork.com/", "source": "ait"} """ + assert(headers['Content-Type'] == 'text/x-cdxj') assert(''.join(res) == exp) assert(errs == {}) @@ -149,13 +150,14 @@ def test_handler_output_json(): agg = GeventTimeoutAggregator(sources, timeout=5.0) handler = IndexHandler(agg) url = 'http://vvork.com/' - res, errs = handler(dict(url=url, closest='20141001', limit=2, sources='rhiz,ait', output='json')) + headers, res, errs = handler(dict(url=url, closest='20141001', limit=2, sources='rhiz,ait', output='json')) exp = """\ {"urlkey": "com,vvork)/", "timestamp": "20141006184357", "url": "http://www.vvork.com/", "mem_rel": "memento", "memento_url": "http://webenact.rhizome.org/vvork/20141006184357/http://www.vvork.com/", "load_url": "http://webenact.rhizome.org/vvork/20141006184357id_/http://www.vvork.com/", "source": "rhiz"} {"urlkey": "com,vvork)/", "timestamp": "20131004231540", "url": "http://vvork.com/", "mem_rel": "last memento", "memento_url": "http://wayback.archive-it.org/all/20131004231540/http://vvork.com/", "load_url": "http://wayback.archive-it.org/all/20131004231540id_/http://vvork.com/", "source": "ait"} """ + assert(headers['Content-Type'] == 'application/x-ndjson') assert(''.join(res) == exp) assert(errs == {}) @@ -163,12 +165,13 @@ def test_handler_output_link(): agg = GeventTimeoutAggregator(sources, timeout=5.0) handler = IndexHandler(agg) url = 'http://vvork.com/' - res, errs = handler(dict(url=url, closest='20141001', limit=2, sources='rhiz,ait', output='link')) + headers, res, errs = handler(dict(url=url, closest='20141001', limit=2, sources='rhiz,ait', output='link')) exp = """\ ; rel="memento"; datetime="Mon, 06 Oct 2014 18:43:57 GMT"; src="rhiz", ; rel="memento"; datetime="Fri, 04 Oct 2013 23:15:40 GMT"; src="ait" """ + assert(headers['Content-Type'] == 'application/link') assert(''.join(res) == exp) assert(errs == {}) @@ -177,7 +180,7 @@ def test_handler_output_link_2(): agg = GeventTimeoutAggregator(sources, timeout=5.0) handler = IndexHandler(agg) url = 'http://iana.org/' - res, errs = handler(dict(url=url, closest='20140126000000', limit=5, output='link')) + headers, res, errs = handler(dict(url=url, closest='20140126000000', limit=5, output='link')) exp = """\ ; rel="memento"; datetime="Sun, 26 Jan 2014 09:37:43 GMT"; src="ia", @@ -186,6 +189,7 @@ def test_handler_output_link_2(): ; rel="memento"; datetime="Wed, 29 Jan 2014 17:52:03 GMT"; src="ia", ; rel="memento"; datetime="Tue, 07 Jan 2014 04:05:52 GMT"; src="ait" """ + assert(headers['Content-Type'] == 'application/link') assert(''.join(res) == exp) exp_errs = {'bl': "NotFoundException('http://www.webarchive.org.uk/wayback/archive/http://iana.org/',)", @@ -199,10 +203,11 @@ def test_handler_output_link_3(): agg = GeventTimeoutAggregator(sources, timeout=5.0) handler = IndexHandler(agg) url = 'http://foo.bar.non-existent' - res, errs = handler(dict(url=url, closest='20140126000000', limit=5, output='link')) + headers, res, errs = handler(dict(url=url, closest='20140126000000', limit=5, output='link')) exp = '' + assert(headers['Content-Type'] == 'application/link') assert(''.join(res) == exp) exp_errs = {'ait': "NotFoundException('http://wayback.archive-it.org/all/http://foo.bar.non-existent',)", @@ -216,12 +221,13 @@ def test_handler_output_text(): agg = GeventTimeoutAggregator(sources, timeout=5.0) handler = IndexHandler(agg) url = 'http://vvork.com/' - res, errs = handler(dict(url=url, closest='20141001', limit=2, sources='rhiz,ait', output='text')) + headers, res, errs = handler(dict(url=url, closest='20141001', limit=2, sources='rhiz,ait', output='text')) exp = """\ com,vvork)/ 20141006184357 http://www.vvork.com/ memento http://webenact.rhizome.org/vvork/20141006184357/http://www.vvork.com/ http://webenact.rhizome.org/vvork/20141006184357id_/http://www.vvork.com/ rhiz com,vvork)/ 20131004231540 http://vvork.com/ last memento http://wayback.archive-it.org/all/20131004231540/http://vvork.com/ http://wayback.archive-it.org/all/20131004231540id_/http://vvork.com/ ait """ + assert(headers['Content-Type'] == 'text/plain') assert(''.join(res) == exp) assert(errs == {}) @@ -229,8 +235,9 @@ com,vvork)/ 20131004231540 http://vvork.com/ last memento http://wayback.archive def test_handler_list_sources(): agg = GeventTimeoutAggregator(sources, timeout=5.0) handler = IndexHandler(agg) - res, errs = handler(dict(mode='list_sources')) + headers, res, errs = handler(dict(mode='list_sources')) + assert(headers == {}) assert(res == {'sources': {'bl': 'memento', 'ait': 'memento', 'ia': 'memento', diff --git a/webagg/aggregator.py b/webagg/aggregator.py index 544ffd55..2810d3d0 100644 --- a/webagg/aggregator.py +++ b/webagg/aggregator.py @@ -17,6 +17,9 @@ from itertools import chain from webagg.indexsource import FileIndexSource from pywb.utils.wbexception import NotFoundException, WbException + +from webagg.utils import ParamFormatter, res_template + import six import glob @@ -28,43 +31,19 @@ class BaseAggregator(object): params['closest'] = timestamp_now() query = CDXQuery(params) - self._set_src_params(params) cdx_iter, errs = self.load_index(query.params) cdx_iter = process_cdx(cdx_iter, query) return cdx_iter, dict(errs) - def _set_src_params(self, params): - src_params = {} - for param, value in six.iteritems(params): - if not param.startswith('param.'): - continue - - parts = param.split('.', 3)[1:] - - if len(parts) == 2: - src = parts[0] - name = parts[1] - else: - src = '' - name = parts[0] - - if not src in src_params: - src_params[src] = {} - - src_params[src][name] = value - - params['_all_src_params'] = src_params - def load_child_source_list(self, name, source, params): res = self.load_child_source(name, source, params) return list(res[0]), res[1] def load_child_source(self, name, source, params): try: - _src_params = params['_all_src_params'].get(name) - params['_src_params'] = _src_params + params['_formatter'] = ParamFormatter(params, name) res = source.load_index(params) if isinstance(res, tuple): cdx_iter, err_list = res @@ -277,18 +256,7 @@ class BaseDirectoryIndexSource(BaseAggregator): self.base_dir = base_dir def _iter_sources(self, params): - self._set_src_params(params) - # see if specific params (when part of another agg) - src_params = params.get('_src_params') - if not src_params: - # try default param. settings - src_params = params.get('_all_src_params', {}).get('') - - if src_params: - the_dir = self.base_dir.format(**src_params) - else: - the_dir = self.base_dir - + the_dir = res_template(self.base_dir, params) the_dir = os.path.join(self.base_prefix, the_dir) try: sources = list(self._load_files(the_dir)) diff --git a/webagg/app.py b/webagg/app.py index baa61a35..5a9bae15 100644 --- a/webagg/app.py +++ b/webagg/app.py @@ -1,3 +1,5 @@ +from webagg.liverec import request as remote_request + from webagg.inputrequest import DirectWSGIInputRequest, POSTInputRequest from bottle import route, request, response, default_app, abort import bottle @@ -7,6 +9,42 @@ import json JSON_CT = 'application/json; charset=utf-8' + +#============================================================================= +route_dict = {} + + +#============================================================================= +def add_route(path, handler): + @route([path, path + '/'], 'ANY') + @wrap_error + def direct_input_request(mode=''): + params = dict(request.query) + params['mode'] = mode + params['_input_req'] = DirectWSGIInputRequest(request.environ) + return handler(params) + + @route([path + '/postreq', path + '//postreq'], 'POST') + @wrap_error + def post_fullrequest(mode=''): + params = dict(request.query) + params['mode'] = mode + params['_input_req'] = POSTInputRequest(request.environ) + return handler(params) + + global route_dict + handler_dict = handler.get_supported_modes() + route_dict[path] = handler_dict + route_dict[path + '/postreq'] = handler_dict + + +#============================================================================= +@route('/') +def list_routes(): + return route_dict + + +#============================================================================= def err_handler(exc): response.status = exc.status_code response.content_type = JSON_CT @@ -15,10 +53,15 @@ def err_handler(exc): return err_msg +#============================================================================= def wrap_error(func): def wrap_func(*args, **kwargs): try: - res, errs = func(*args, **kwargs) + out_headers, res, errs = func(*args, **kwargs) + + if out_headers: + for n, v in out_headers.items(): + response.headers[n] = v if res: if errs: @@ -53,36 +96,7 @@ def wrap_error(func): return wrap_func -route_dict = {} - -def add_route(path, handler): - @route([path, path + '/'], 'ANY') - @wrap_error - def direct_input_request(mode=''): - params = dict(request.query) - params['mode'] = mode - params['_input_req'] = DirectWSGIInputRequest(request.environ) - return handler(params) - - @route([path + '/postreq', path + '//postreq'], 'POST') - @wrap_error - def post_fullrequest(mode=''): - params = dict(request.query) - params['mode'] = mode - params['_input_req'] = POSTInputRequest(request.environ) - return handler(params) - - global route_dict - handler_dict = handler.get_supported_modes() - route_dict[path] = handler_dict - route_dict[path + '/postreq'] = handler_dict - - -@route('/') -def list_routes(): - return route_dict - - +#============================================================================= application = default_app() application.default_error_handler = err_handler diff --git a/webagg/handlers.py b/webagg/handlers.py index 6f05405a..da2ed837 100644 --- a/webagg/handlers.py +++ b/webagg/handlers.py @@ -2,25 +2,24 @@ from webagg.responseloader import WARCPathLoader, LiveWebLoader from webagg.utils import MementoUtils from pywb.utils.wbexception import BadRequestException, WbException from pywb.utils.wbexception import NotFoundException -from bottle import response #============================================================================= def to_cdxj(cdx_iter, fields): - response.headers['Content-Type'] = 'text/x-cdxj' - return [cdx.to_cdxj(fields) for cdx in cdx_iter] + content_type = 'text/x-cdxj' + return content_type, (cdx.to_cdxj(fields) for cdx in cdx_iter) def to_json(cdx_iter, fields): - response.headers['Content-Type'] = 'application/x-ndjson' - return [cdx.to_json(fields) for cdx in cdx_iter] + content_type = 'application/x-ndjson' + return content_type, (cdx.to_json(fields) for cdx in cdx_iter) def to_text(cdx_iter, fields): - response.headers['Content-Type'] = 'text/plain' - return [cdx.to_text(fields) for cdx in cdx_iter] + content_type = 'text/plain' + return content_type, (cdx.to_text(fields) for cdx in cdx_iter) def to_link(cdx_iter, fields): - response.headers['Content-Type'] = 'application/link' - return MementoUtils.make_timemap(cdx_iter) + content_type = 'application/link' + return content_type, MementoUtils.make_timemap(cdx_iter) #============================================================================= @@ -56,10 +55,10 @@ class IndexHandler(object): def __call__(self, params): mode = params.get('mode', 'index') if mode == 'list_sources': - return self.index_source.get_source_list(params), {} + return {}, self.index_source.get_source_list(params), {} if mode != 'index': - return self.get_supported_modes(), {} + return {}, self.get_supported_modes(), {} output = params.get('output', self.DEF_OUTPUT) fields = params.get('fields') @@ -67,14 +66,15 @@ class IndexHandler(object): handler = self.OUTPUTS.get(output) if not handler: errs = dict(last_exc=BadRequestException('output={0} not supported'.format(output))) - return None, errs + return None, None, errs cdx_iter, errs = self._load_index_source(params) if not cdx_iter: - return None, errs + return None, None, errs - res = handler(cdx_iter, fields) - return res, errs + content_type, res = handler(cdx_iter, fields) + out_headers = {'Content-Type': content_type} + return out_headers, res, errs #============================================================================= @@ -94,16 +94,16 @@ class ResourceHandler(IndexHandler): cdx_iter, errs = self._load_index_source(params) if not cdx_iter: - return None, errs + return None, None, errs last_exc = None for cdx in cdx_iter: for loader in self.resource_loaders: try: - resp = loader(cdx, params) + out_headers, resp = loader(cdx, params) if resp is not None: - return resp, errs + return out_headers, resp, errs except WbException as e: last_exc = e errs[str(loader)] = repr(e) @@ -111,7 +111,7 @@ class ResourceHandler(IndexHandler): if last_exc: errs['last_exc'] = last_exc - return None, errs + return None, None, errs #============================================================================= @@ -137,11 +137,11 @@ class HandlerSeq(object): def __call__(self, params): all_errs = {} for handler in self.handlers: - res, errs = handler(params) + out_headers, res, errs = handler(params) all_errs.update(errs) if res is not None: - return res, all_errs + return out_headers, res, all_errs - return None, all_errs + return None, None, all_errs diff --git a/webagg/indexsource.py b/webagg/indexsource.py index 269df379..1637bc81 100644 --- a/webagg/indexsource.py +++ b/webagg/indexsource.py @@ -11,6 +11,7 @@ from pywb.cdx.query import CDXQuery from webagg.liverec import patched_requests as requests +from webagg.utils import ParamFormatter, res_template from webagg.utils import MementoUtils @@ -22,15 +23,6 @@ class BaseIndexSource(object): def load_index(self, params): #pragma: no cover raise NotImplemented() - @staticmethod - def res_template(template, params): - src_params = params.get('_src_params') - if not src_params: - res = template.format(url=params['url']) - else: - res = template.format(url=params['url'], **src_params) - return res - #============================================================================= class FileIndexSource(BaseIndexSource): @@ -38,7 +30,7 @@ class FileIndexSource(BaseIndexSource): self.filename_template = filename def load_index(self, params): - filename = self.res_template(self.filename_template, params) + filename = res_template(self.filename_template, params) try: fh = open(filename, 'rb') @@ -64,7 +56,7 @@ class RemoteIndexSource(BaseIndexSource): self.replay_url = replay_url def load_index(self, params): - api_url = self.res_template(self.api_url_template, params) + api_url = res_template(self.api_url_template, params) r = requests.get(api_url, timeout=params.get('_timeout')) if r.status_code >= 400: raise NotFoundException(api_url) @@ -73,7 +65,9 @@ class RemoteIndexSource(BaseIndexSource): def do_load(lines): for line in lines: cdx = CDXObject(line) - cdx['load_url'] = self.replay_url.format(timestamp=cdx['timestamp'], url=cdx['url']) + cdx['load_url'] = self.replay_url.format( + timestamp=cdx['timestamp'], + url=cdx['url']) yield cdx return do_load(lines) @@ -114,7 +108,7 @@ class RedisIndexSource(BaseIndexSource): self.redis = redis.StrictRedis.from_url(redis_url) def load_index(self, params): - z_key = self.res_template(self.redis_key_template, params) + z_key = res_template(self.redis_key_template, params) index_list = self.redis.zrangebylex(z_key, b'[' + params['key'], b'(' + params['end_key']) @@ -173,7 +167,7 @@ class MementoIndexSource(BaseIndexSource): yield cdx def get_timegate_links(self, params, closest): - url = self.res_template(self.timegate_url, params) + url = res_template(self.timegate_url, params) accept_dt = timestamp_to_http_date(closest) res = requests.head(url, headers={'Accept-Datetime': accept_dt}) if res.status_code >= 400: @@ -182,7 +176,7 @@ class MementoIndexSource(BaseIndexSource): return res.headers.get('Link') def get_timemap_links(self, params): - url = self.res_template(self.timemap_url, params) + url = res_template(self.timemap_url, params) res = requests.get(url, timeout=params.get('_timeout')) if res.status_code >= 400: raise NotFoundException(url) diff --git a/webagg/responseloader.py b/webagg/responseloader.py index 96e64067..b0f2ba5b 100644 --- a/webagg/responseloader.py +++ b/webagg/responseloader.py @@ -9,7 +9,6 @@ from pywb.utils.wbexception import LiveResourceException from pywb.warc.resolvingloader import ResolvingLoader from io import BytesIO -from bottle import response import uuid import six @@ -52,19 +51,19 @@ class StreamIter(six.Iterator): #============================================================================= class BaseLoader(object): def __call__(self, cdx, params): - res = self._load_resource(cdx, params) + out_headers, res = self._load_resource(cdx, params) if not res: - return res + return None, None - response.headers['WARC-Coll'] = cdx.get('source', '') + out_headers['WARC-Coll'] = cdx.get('source', '') - response.headers['Link'] = MementoUtils.make_link( - response.headers['WARC-Target-URI'], + out_headers['Link'] = MementoUtils.make_link( + out_headers['WARC-Target-URI'], 'original') - memento_dt = iso_date_to_datetime(response.headers['WARC-Date']) - response.headers['Memento-Datetime'] = datetime_to_http_date(memento_dt) - return res + memento_dt = iso_date_to_datetime(out_headers['WARC-Date']) + out_headers['Memento-Datetime'] = datetime_to_http_date(memento_dt) + return out_headers, res def _load_resource(self, cdx, params): #pragma: no cover raise NotImplemented() @@ -91,8 +90,8 @@ class WARCPathLoader(BaseLoader): for path in self.paths: def check(filename, cdx): try: - if hasattr(cdx, '_src_params') and cdx._src_params: - full_path = path.format(**cdx._src_params) + if hasattr(cdx, '_formatter') and cdx._formatter: + full_path = cdx._formatter.format(path) else: full_path = path full_path += filename @@ -104,9 +103,9 @@ class WARCPathLoader(BaseLoader): def _load_resource(self, cdx, params): if not cdx.get('filename') or cdx.get('offset') is None: - return None + return None, None - cdx._src_params = params.get('_src_params') + cdx._formatter = params.get('_formatter') failed_files = [] headers, payload = (self.resolve_loader. load_headers_and_payload(cdx, @@ -114,18 +113,19 @@ class WARCPathLoader(BaseLoader): self.cdx_index_source)) record = payload + out_headers = {} for n, v in record.rec_headers.headers: - response.headers[n] = v + out_headers[n] = v if headers != payload: - response.headers['WARC-Target-URI'] = headers.rec_headers.get_header('WARC-Target-URI') - response.headers['WARC-Date'] = headers.rec_headers.get_header('WARC-Date') - response.headers['WARC-Refers-To-Target-URI'] = payload.rec_headers.get_header('WARC-Target-URI') - response.headers['WARC-Refers-To-Date'] = payload.rec_headers.get_header('WARC-Date') + out_headers['WARC-Target-URI'] = headers.rec_headers.get_header('WARC-Target-URI') + out_headers['WARC-Date'] = headers.rec_headers.get_header('WARC-Date') + out_headers['WARC-Refers-To-Target-URI'] = payload.rec_headers.get_header('WARC-Target-URI') + out_headers['WARC-Refers-To-Date'] = payload.rec_headers.get_header('WARC-Date') headers.stream.close() - return StreamIter(record.stream) + return out_headers, StreamIter(record.stream) def __str__(self): return 'WARCPathLoader' @@ -137,6 +137,7 @@ class HeaderRecorder(BaseRecorder): self.buff = BytesIO() self.skip_list = skip_list self.skipped = [] + self.target_ip = None def write_response_header_line(self, line): if self.accept_header(line): @@ -152,6 +153,11 @@ class HeaderRecorder(BaseRecorder): return True + def finish_request(self, socket): + ip = socket.getpeername() + if ip: + self.target_ip = ip[0] + #============================================================================= class LiveWebLoader(BaseLoader): @@ -163,7 +169,7 @@ class LiveWebLoader(BaseLoader): def _load_resource(self, cdx, params): load_url = cdx.get('load_url') if not load_url: - return None + return None, None recorder = HeaderRecorder(self.SKIP_HEADERS) @@ -200,30 +206,33 @@ class LiveWebLoader(BaseLoader): resp_headers = recorder.get_header() - response.headers['Content-Type'] = 'application/http; msgtype=response' + out_headers = {} + out_headers['Content-Type'] = 'application/http; msgtype=response' - #response.headers['WARC-Type'] = 'response' - #response.headers['WARC-Record-ID'] = self._make_warc_id() - response.headers['WARC-Target-URI'] = cdx['url'] - response.headers['WARC-Date'] = self._make_date(dt) + out_headers['WARC-Type'] = 'response' + out_headers['WARC-Record-ID'] = self._make_warc_id() + out_headers['WARC-Target-URI'] = cdx['url'] + out_headers['WARC-Date'] = self._make_date(dt) + if recorder.target_ip: + out_headers['WARC-IP-Address'] = recorder.target_ip # Try to set content-length, if it is available and valid try: content_len = int(upstream_res.headers.get('content-length', 0)) if content_len > 0: content_len += len(resp_headers) - response.headers['Content-Length'] = content_len + out_headers['Content-Length'] = content_len except (KeyError, TypeError): pass - return StreamIter(upstream_res.raw, header=resp_headers) + return out_headers, StreamIter(upstream_res.raw, header=resp_headers) @staticmethod def _make_date(dt): return dt.strftime('%Y-%m-%dT%H:%M:%SZ') @staticmethod - def _make_warc_id(id_=None): #pragma: no cover + def _make_warc_id(id_=None): if not id_: id_ = uuid.uuid1() return ''.format(id_) diff --git a/webagg/utils.py b/webagg/utils.py index 126c0f40..ea4cec10 100644 --- a/webagg/utils.py +++ b/webagg/utils.py @@ -1,5 +1,6 @@ import re import six +import string from pywb.utils.timeutils import timestamp_to_http_date from pywb.utils.wbexception import BadRequestException @@ -10,12 +11,12 @@ LINK_URL = re.compile('<(.*)>') LINK_PROP = re.compile('([\w]+)="([^"]+)') -#================================================================= +#============================================================================= class MementoException(BadRequestException): pass -#================================================================= +#============================================================================= class MementoUtils(object): @staticmethod def parse_links(link_header, def_name='timemap'): @@ -102,3 +103,42 @@ class MementoUtils(object): @staticmethod def make_link(url, type): return '<{0}>; rel="{1}"'.format(url, type) + + +#============================================================================= +class ParamFormatter(string.Formatter): + def __init__(self, params, name='', prefix='param.'): + self.params = params + self.prefix = prefix + self.name = name + + def get_value(self, key, args, kwargs): + # First, try the named param 'param.{name}.{key}' + if self.name: + named_key = self.prefix + self.name + '.' + key + value = self.params.get(named_key) + if value is not None: + return value + + # Then, try 'param.{key}' + named_key = self.prefix + key + value = self.params.get(named_key) + if value is not None: + return value + + # default to just '{key}' + value = kwargs.get(key, '') + return value + + +#============================================================================= +def res_template(template, params): + formatter = params.get('_formatter') + if not formatter: + formatter = ParamFormatter(params) + + res = formatter.format(template, url=params['url']) + + return res + +