From 685804919af5d70f6aa72d2c0432d09a68b689ed Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sat, 20 May 2017 02:11:04 -0700 Subject: [PATCH] aggregator improvements: - support for 'WARC-Provenance' header added to response - aggregator supports source collection: if 'name:coll', coll parsed out and stored in 'param.<name>.src_coll' field, available for use in remote index, included in provenance - remoteindexsource: support interpolating '{src_coll}' in api_url and replay_url to allow handling src_coll - recorder: CollectionFilter supports dict of prefixes to filter regexes, and catch-all '*' prefix - recorder: provenance written to paired request record - rename: ProxyIndexSource -> UpstreamIndexSource to avoid confusion with actual proxy - autoapp: register_source() supports adding source classes at beginning of list --- pywb/recorder/filters.py | 27 ++++++++++++++----- pywb/recorder/multifilewarcwriter.py | 4 +++ pywb/recorder/recorderapp.py | 12 ++++++--- pywb/webagg/aggregator.py | 22 ++++++++++++--- pywb/webagg/autoapp.py | 7 +++-- pywb/webagg/indexsource.py | 14 ++++++---- pywb/webagg/responseloader.py | 12 ++++++++- pywb/webagg/test/test_upstream.py | 6 +++-- ...yindexsource.py => upstreamindexsource.py} | 10 +++---- 9 files changed, 86 insertions(+), 28 deletions(-) rename pywb/webagg/{proxyindexsource.py => upstreamindexsource.py} (85%) diff --git a/pywb/recorder/filters.py b/pywb/recorder/filters.py index b9ccd540..aaedc596 100644 --- a/pywb/recorder/filters.py +++ b/pywb/recorder/filters.py @@ -60,26 +60,39 @@ class WriteDupePolicy(object): # Skip Record Filters # ============================================================================ class SkipNothingFilter(object): - def skip_request(self, req_headers): + def skip_request(self, path, req_headers): return False - def skip_response(self, req_headers, resp_headers): + def skip_response(self, path, req_headers, resp_headers): return False # 
============================================================================ class CollectionFilter(SkipNothingFilter): def __init__(self, accept_colls): - self.rx_accept_colls = re.compile(accept_colls) + self.rx_accept_map = {} - def skip_request(self, req_headers): + if isinstance(accept_colls, str): + self.rx_accept_map = {'*': re.compile(accept_colls)} + + elif isinstance(accept_colls, dict): + for name in accept_colls: + self.rx_accept_map[name] = re.compile(accept_colls[name]) + + def skip_request(self, path, req_headers): if req_headers.get('Recorder-Skip') == '1': return True return False - def skip_response(self, req_headers, resp_headers): - if not self.rx_accept_colls.match(resp_headers.get('WebAgg-Source-Coll', '')): + def skip_response(self, path, req_headers, resp_headers): + path = path[1:].split('/', 1)[0] + + rx = self.rx_accept_map.get(path) + if not rx: + rx = self.rx_accept_map.get('*') + + if rx and not rx.match(resp_headers.get('WebAgg-Source-Coll', '')): return True return False @@ -87,7 +100,7 @@ class CollectionFilter(SkipNothingFilter): # ============================================================================ class SkipRangeRequestFilter(SkipNothingFilter): - def skip_request(self, req_headers): + def skip_request(self, path, req_headers): range_ = req_headers.get('Range') if range_ and not range_.lower().startswith('bytes=0-'): return True diff --git a/pywb/recorder/multifilewarcwriter.py b/pywb/recorder/multifilewarcwriter.py index 06f980bb..806cc009 100644 --- a/pywb/recorder/multifilewarcwriter.py +++ b/pywb/recorder/multifilewarcwriter.py @@ -133,6 +133,10 @@ class MultiFileWARCWriter(BaseWARCWriter): self._do_write_req_resp(None, record, params) def _do_write_req_resp(self, req, resp, params): + prov = resp.rec_headers.get_header('WARC-Provenance') + if prov: + req.rec_headers.add_header('WARC-Provenance', prov) + resp = self._check_revisit(resp, params) if not resp: print('Skipping due to dedup') diff --git 
a/pywb/recorder/recorderapp.py b/pywb/recorder/recorderapp.py index 56d897ef..b1fdecb5 100644 --- a/pywb/recorder/recorderapp.py +++ b/pywb/recorder/recorderapp.py @@ -186,6 +186,8 @@ class RecorderApp(object): method = input_req.get_req_method() + path = environ['PATH_INFO'] + # write request body as metadata/resource put_record = params.get('put_record') if put_record and method in ('PUT', 'POST'): @@ -196,7 +198,7 @@ class RecorderApp(object): params, start_response) - skipping = any(x.skip_request(headers) for x in self.skip_filters) + skipping = any(x.skip_request(path, headers) for x in self.skip_filters) if not skipping: req_stream = ReqWrapper(input_buff, @@ -232,6 +234,7 @@ class RecorderApp(object): params, self.write_queue, self.skip_filters, + path, self.create_buff_func) else: resp_stream = res.raw @@ -264,13 +267,14 @@ class Wrapper(object): #============================================================================== class RespWrapper(Wrapper): def __init__(self, stream, headers, req, - params, queue, skip_filters, create_func): + params, queue, skip_filters, path, create_func): super(RespWrapper, self).__init__(stream, params, create_func) self.headers = headers self.req = req self.queue = queue self.skip_filters = skip_filters + self.path = path def close(self): try: @@ -296,7 +300,9 @@ class RespWrapper(Wrapper): if self.interrupted: skipping = True else: - skipping = any(x.skip_response(self.req.headers, self.headers) + skipping = any(x.skip_response(self.path, + self.req.headers, + self.headers) for x in self.skip_filters) if not skipping: diff --git a/pywb/webagg/aggregator.py b/pywb/webagg/aggregator.py index c5fe9739..656ac4cf 100644 --- a/pywb/webagg/aggregator.py +++ b/pywb/webagg/aggregator.py @@ -42,6 +42,7 @@ class BaseAggregator(object): def load_child_source(self, name, source, params): try: + params['_name'] = name params['_formatter'] = ParamFormatter(params, name) res = source.load_index(params) if isinstance(res, tuple): @@ -62,6 
+63,10 @@ class BaseAggregator(object): return cdx if params.get('nosource') != 'true': + src_coll = params.get('param.' + name + '.src_coll') + if src_coll: + name += ':' + src_coll + cdx_iter = (add_name(cdx, name) for cdx in cdx_iter) return cdx_iter, err_list @@ -107,12 +112,23 @@ class BaseSourceListAggregator(BaseAggregator): def _iter_sources(self, params): sources = self.get_all_sources(params) srcs_list = params.get('sources') - if not srcs_list: + if not srcs_list or srcs_list == '*': return sources.items() sel_sources = tuple(srcs_list.split(',')) - return [(name, sources[name]) for name in sources.keys() if name in sel_sources] + def yield_sources(sources, sel_sources, params): + for name in sel_sources: + if name in sources: + yield (name, sources[name]) + + elif ':' in name: + name, param = name.split(':', 1) + if name in sources: + params['param.' + name + '.src_coll'] = param + yield (name, sources[name]) + + return yield_sources(sources, sel_sources, params) #============================================================================= @@ -320,7 +336,7 @@ class BaseRedisMultiKeyIndexSource(BaseAggregator, RedisIndexSource): return RedisIndexSource(None, self.redis, key) def __str__(self): - return 'redis' + return 'redis-multikey' #============================================================================= diff --git a/pywb/webagg/autoapp.py b/pywb/webagg/autoapp.py index 9388bd0a..b9106469 100644 --- a/pywb/webagg/autoapp.py +++ b/pywb/webagg/autoapp.py @@ -215,8 +215,11 @@ def init_index_source(value, source_list=None): # ============================================================================ -def register_source(source_cls): - SOURCE_LIST.append(source_cls) +def register_source(source_cls, end=False): + if not end: + SOURCE_LIST.insert(0, source_cls) + else: + SOURCE_LIST.append(source_cls) # ============================================================================ diff --git a/pywb/webagg/indexsource.py b/pywb/webagg/indexsource.py 
index 2ddb63bc..c2a7f673 100644 --- a/pywb/webagg/indexsource.py +++ b/pywb/webagg/indexsource.py @@ -117,16 +117,20 @@ class RemoteIndexSource(BaseIndexSource): continue cdx = CDXObject(line) - self._set_load_url(cdx) + self._set_load_url(cdx, params) yield cdx return do_load(lines) - def _set_load_url(self, cdx): - cdx[self.url_field] = self.replay_url.format( - timestamp=cdx['timestamp'], - url=cdx['url']) + def _set_load_url(self, cdx, params): + source_coll = '' + name = params.get('_name') + if name: + source_coll = params.get('param.' + name + '.src_coll', '') + cdx[self.url_field] = self.replay_url.format(url=cdx['url'], + timestamp=cdx['timestamp'], + src_coll=source_coll) def __repr__(self): return '{0}({1}, {2})'.format(self.__class__.__name__, self.api_url, diff --git a/pywb/webagg/responseloader.py b/pywb/webagg/responseloader.py index 4a41392c..c07de57f 100644 --- a/pywb/webagg/responseloader.py +++ b/pywb/webagg/responseloader.py @@ -41,12 +41,14 @@ class BaseLoader(object): warc_headers, other_headers, stream = entry + source = self._get_provenance(cdx) + out_headers = {} out_headers['WebAgg-Type'] = 'warc' - out_headers['WebAgg-Source-Coll'] = quote(cdx.get('source', ''), safe=':/') out_headers['Content-Type'] = 'application/warc-record' out_headers['WebAgg-Cdx'] = to_native_str(cdx.to_cdxj().rstrip()) + out_headers['WebAgg-Source-Coll'] = source if not warc_headers: if other_headers: @@ -60,6 +62,7 @@ class BaseLoader(object): target_uri = warc_headers.get_header('WARC-Target-URI') out_headers['WARC-Target-URI'] = target_uri + out_headers['Link'] = MementoUtils.make_link(target_uri, 'original') memento_dt = iso_date_to_datetime(warc_headers.get_header('WARC-Date')) @@ -88,6 +91,9 @@ class BaseLoader(object): return out_headers, streamiter + def _get_provenance(self, cdx): + return quote(cdx.get('source', ''), safe=':/') + def _set_content_len(self, content_len_str, headers, existing_len): # Try to set content-length, if it is available and valid 
try: @@ -424,6 +430,10 @@ class LiveWebLoader(BaseLoader): warc_headers['WARC-Record-ID'] = self._make_warc_id() warc_headers['WARC-Target-URI'] = cdx['url'] warc_headers['WARC-Date'] = datetime_to_iso_date(dt) + + if not cdx.get('is_live'): + warc_headers['WARC-Provenance'] = self._get_provenance(cdx) + if remote_ip: warc_headers['WARC-IP-Address'] = remote_ip diff --git a/pywb/webagg/test/test_upstream.py b/pywb/webagg/test/test_upstream.py index 037a9ea8..30367ba9 100644 --- a/pywb/webagg/test/test_upstream.py +++ b/pywb/webagg/test/test_upstream.py @@ -1,3 +1,5 @@ +from gevent import monkey; monkey.patch_all(thread=False) + import webtest from io import BytesIO @@ -6,7 +8,7 @@ import requests from pywb.webagg.handlers import DefaultResourceHandler from pywb.webagg.aggregator import SimpleAggregator -from pywb.webagg.proxyindexsource import ProxyMementoIndexSource, UpstreamAggIndexSource +from pywb.webagg.upstreamindexsource import UpstreamMementoIndexSource, UpstreamAggIndexSource from warcio.recordloader import ArcWarcRecordLoader @@ -26,7 +28,7 @@ class TestUpstream(LiveServerTests, BaseTestClass): app.add_route('/upstream_opt', DefaultResourceHandler(SimpleAggregator( - {'upstream_opt': ProxyMementoIndexSource.upstream_resource(base_url + '/live')}) + {'upstream_opt': UpstreamMementoIndexSource.upstream_resource(base_url + '/live')}) ) ) diff --git a/pywb/webagg/proxyindexsource.py b/pywb/webagg/upstreamindexsource.py similarity index 85% rename from pywb/webagg/proxyindexsource.py rename to pywb/webagg/upstreamindexsource.py index 60b0f195..9b0ba05d 100644 --- a/pywb/webagg/proxyindexsource.py +++ b/pywb/webagg/upstreamindexsource.py @@ -13,14 +13,14 @@ class UpstreamAggIndexSource(RemoteIndexSource): proxy_url = base_url + '/resource?url={url}&closest={timestamp}' super(UpstreamAggIndexSource, self).__init__(api_url, proxy_url, 'filename') - def _set_load_url(self, cdx): - super(UpstreamAggIndexSource, self)._set_load_url(cdx) + def _set_load_url(self, 
cdx, params): + super(UpstreamAggIndexSource, self)._set_load_url(cdx, params) cdx['offset'] = '0' cdx.pop('load_url', '') #============================================================================= -class ProxyMementoIndexSource(BaseIndexSource): +class UpstreamMementoIndexSource(BaseIndexSource): def __init__(self, proxy_url='{url}'): self.proxy_url = proxy_url self.loader = LiveWebLoader() @@ -45,10 +45,10 @@ class ProxyMementoIndexSource(BaseIndexSource): yield cdx def __str__(self): - return 'proxy' + return 'upstream' @staticmethod def upstream_resource(base_url): - return ProxyMementoIndexSource(base_url + '/resource?url={url}&closest={closest}') + return UpstreamMementoIndexSource(base_url + '/resource?url={url}&closest={closest}')