
aggregator improvements:

- support for 'WARC-Provenance' header added to response
- aggregator supports source collections: if a source is specified as 'name:coll', 'coll' is parsed out and stored in the 'param.<name>.src_coll' field, available for use in the remote index and included in the provenance
- remoteindexsource: support interpolating '{src_coll}' in api_url and replay_url to allow handling src_coll
- recorder: CollectionFilter supports a dict mapping path prefixes to filter regexes, plus a catch-all '*' prefix (see the sketch below)
- recorder: provenance written to paired request record
- rename: ProxyIndexSource -> UpstreamIndexSource to avoid confusion with actual proxy
- autoapp: register_source() supports adding source classes at beginning of list
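
A rough illustration of the new CollectionFilter usage. The prefix names, regexes, path shape, and header values below are invented for the example; the class itself and its matching logic come from the diff further down.

# hypothetical per-prefix config: each first path segment gets its own regex,
# with '*' as the fallback for any prefix not listed explicitly
accept_colls = {'record': 'live',
                '*': 'live|memento.*'}

coll_filter = CollectionFilter(accept_colls)

# skip_response() now also receives the request path; the first path segment
# picks the regex, falling back to the '*' entry when there is no exact match
skip = coll_filter.skip_response('/record/http://example.com/',
                                 {},
                                 {'WebAgg-Source-Coll': 'memento:ia'})
# skip == True here: 'memento:ia' does not match the 'live' regex bound to 'record'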
Ilya Kreymer 2017-05-20 02:11:04 -07:00
parent d8f035642b
commit 685804919a
9 changed files with 86 additions and 28 deletions

View File

@@ -60,26 +60,39 @@ class WriteDupePolicy(object):
 # Skip Record Filters
 # ============================================================================
 class SkipNothingFilter(object):
-    def skip_request(self, req_headers):
+    def skip_request(self, path, req_headers):
         return False

-    def skip_response(self, req_headers, resp_headers):
+    def skip_response(self, path, req_headers, resp_headers):
         return False


 # ============================================================================
 class CollectionFilter(SkipNothingFilter):
     def __init__(self, accept_colls):
-        self.rx_accept_colls = re.compile(accept_colls)
+        self.rx_accept_map = {}
+
+        if isinstance(accept_colls, str):
+            self.rx_accept_map = {'*': re.compile(accept_colls)}
+
+        elif isinstance(accept_colls, dict):
+            for name in accept_colls:
+                self.rx_accept_map[name] = re.compile(accept_colls[name])

-    def skip_request(self, req_headers):
+    def skip_request(self, path, req_headers):
         if req_headers.get('Recorder-Skip') == '1':
             return True

         return False

-    def skip_response(self, req_headers, resp_headers):
-        if not self.rx_accept_colls.match(resp_headers.get('WebAgg-Source-Coll', '')):
+    def skip_response(self, path, req_headers, resp_headers):
+        path = path[1:].split('/', 1)[0]
+        rx = self.rx_accept_map.get(path)
+        if not rx:
+            rx = self.rx_accept_map.get('*')
+
+        if rx and not rx.match(resp_headers.get('WebAgg-Source-Coll', '')):
             return True

         return False
@@ -87,7 +100,7 @@ class CollectionFilter(SkipNothingFilter):

 # ============================================================================
 class SkipRangeRequestFilter(SkipNothingFilter):
-    def skip_request(self, req_headers):
+    def skip_request(self, path, req_headers):
         range_ = req_headers.get('Range')
         if range_ and not range_.lower().startswith('bytes=0-'):
             return True

View File

@@ -133,6 +133,10 @@ class MultiFileWARCWriter(BaseWARCWriter):
         self._do_write_req_resp(None, record, params)

     def _do_write_req_resp(self, req, resp, params):
+        prov = resp.rec_headers.get_header('WARC-Provenance')
+        if prov:
+            req.rec_headers.add_header('WARC-Provenance', prov)
+
         resp = self._check_revisit(resp, params)
         if not resp:
             print('Skipping due to dedup')

View File

@@ -186,6 +186,8 @@ class RecorderApp(object):
         method = input_req.get_req_method()

+        path = environ['PATH_INFO']
+
         # write request body as metadata/resource
         put_record = params.get('put_record')
         if put_record and method in ('PUT', 'POST'):
@@ -196,7 +198,7 @@ class RecorderApp(object):
                                    params,
                                    start_response)

-        skipping = any(x.skip_request(headers) for x in self.skip_filters)
+        skipping = any(x.skip_request(path, headers) for x in self.skip_filters)

         if not skipping:
             req_stream = ReqWrapper(input_buff,
@@ -232,6 +234,7 @@ class RecorderApp(object):
                                           params,
                                           self.write_queue,
                                           self.skip_filters,
+                                          path,
                                           self.create_buff_func)
             else:
                 resp_stream = res.raw
@@ -264,13 +267,14 @@ class Wrapper(object):

 #==============================================================================
 class RespWrapper(Wrapper):
     def __init__(self, stream, headers, req,
-                 params, queue, skip_filters, create_func):
+                 params, queue, skip_filters, path, create_func):
         super(RespWrapper, self).__init__(stream, params, create_func)
         self.headers = headers
         self.req = req
         self.queue = queue
         self.skip_filters = skip_filters
+        self.path = path

     def close(self):
         try:
@@ -296,7 +300,9 @@ class RespWrapper(Wrapper):
             if self.interrupted:
                 skipping = True
             else:
-                skipping = any(x.skip_response(self.req.headers, self.headers)
+                skipping = any(x.skip_response(self.path,
+                                               self.req.headers,
+                                               self.headers)
                                for x in self.skip_filters)

             if not skipping:

View File

@@ -42,6 +42,7 @@ class BaseAggregator(object):
     def load_child_source(self, name, source, params):
         try:
+            params['_name'] = name
             params['_formatter'] = ParamFormatter(params, name)
             res = source.load_index(params)
             if isinstance(res, tuple):
@@ -62,6 +63,10 @@ class BaseAggregator(object):
             return cdx

         if params.get('nosource') != 'true':
+            src_coll = params.get('param.' + name + '.src_coll')
+            if src_coll:
+                name += ':' + src_coll
+
             cdx_iter = (add_name(cdx, name) for cdx in cdx_iter)

         return cdx_iter, err_list
@@ -107,12 +112,23 @@ class BaseSourceListAggregator(BaseAggregator):
     def _iter_sources(self, params):
         sources = self.get_all_sources(params)
         srcs_list = params.get('sources')
-        if not srcs_list:
+        if not srcs_list or srcs_list == '*':
             return sources.items()

         sel_sources = tuple(srcs_list.split(','))
-        return [(name, sources[name]) for name in sources.keys() if name in sel_sources]
+
+        def yield_sources(sources, sel_sources, params):
+            for name in sel_sources:
+                if name in sources:
+                    yield (name, sources[name])
+
+                elif ':' in name:
+                    name, param = name.split(':', 1)
+                    if name in sources:
+                        params['param.' + name + '.src_coll'] = param
+                        yield (name, sources[name])
+
+        return yield_sources(sources, sel_sources, params)


 #=============================================================================
@@ -320,7 +336,7 @@ class BaseRedisMultiKeyIndexSource(BaseAggregator, RedisIndexSource):
         return RedisIndexSource(None, self.redis, key)

     def __str__(self):
-        return 'redis'
+        return 'redis-multikey'


 #=============================================================================
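
To tie the source-collection pieces above together, a small sketch. The host name and collection name are invented; RemoteIndexSource, SimpleAggregator, the 'sources' parameter, and the 'param.<name>.src_coll' field are the real parts of this change.

# a remote index whose api_url/replay_url templates can interpolate the
# parsed-out collection (the example URLs are hypothetical)
remote = RemoteIndexSource('http://upstream.example.com/{src_coll}/index?url={url}',
                           'http://upstream.example.com/{src_coll}/resource?url={url}&closest={timestamp}')

agg = SimpleAggregator({'upstream': remote})

# with sources='upstream:web-archive', yield_sources() above selects the source
# registered as 'upstream' and stores 'web-archive' in
# params['param.upstream.src_coll']; RemoteIndexSource._set_load_url() (see the
# indexsource diff below) reads it back via params['_name'] to fill in {src_coll},
# and add_name() reports the cdx source as 'upstream:web-archive'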

View File

@@ -215,8 +215,11 @@ def init_index_source(value, source_list=None):

 # ============================================================================
-def register_source(source_cls):
-    SOURCE_LIST.append(source_cls)
+def register_source(source_cls, end=False):
+    if not end:
+        SOURCE_LIST.insert(0, source_cls)
+    else:
+        SOURCE_LIST.append(source_cls)


 # ============================================================================
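
A brief usage note on the register_source() change; the class names below are placeholders.

# custom sources are now inserted at the front of SOURCE_LIST by default,
# so they are tried before the built-in sources when a config value is resolved
register_source(MyCustomIndexSource)

# the previous append-to-end behavior remains available explicitly
register_source(FallbackIndexSource, end=True)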

View File

@@ -117,16 +117,20 @@ class RemoteIndexSource(BaseIndexSource):
                     continue

                 cdx = CDXObject(line)
-                self._set_load_url(cdx)
+                self._set_load_url(cdx, params)
                 yield cdx

         return do_load(lines)

-    def _set_load_url(self, cdx):
-        cdx[self.url_field] = self.replay_url.format(
-            timestamp=cdx['timestamp'],
-            url=cdx['url'])
+    def _set_load_url(self, cdx, params):
+        source_coll = ''
+        name = params.get('_name')
+        if name:
+            source_coll = params.get('param.' + name + '.src_coll', '')
+
+        cdx[self.url_field] = self.replay_url.format(url=cdx['url'],
+                                                     timestamp=cdx['timestamp'],
+                                                     src_coll=source_coll)

     def __repr__(self):
         return '{0}({1}, {2})'.format(self.__class__.__name__,
                                       self.api_url,

View File

@@ -41,12 +41,14 @@ class BaseLoader(object):

         warc_headers, other_headers, stream = entry

+        source = self._get_provenance(cdx)
+
         out_headers = {}
         out_headers['WebAgg-Type'] = 'warc'
-        out_headers['WebAgg-Source-Coll'] = quote(cdx.get('source', ''), safe=':/')
         out_headers['Content-Type'] = 'application/warc-record'
         out_headers['WebAgg-Cdx'] = to_native_str(cdx.to_cdxj().rstrip())
+        out_headers['WebAgg-Source-Coll'] = source

         if not warc_headers:
             if other_headers:
@@ -60,6 +62,7 @@ class BaseLoader(object):
         target_uri = warc_headers.get_header('WARC-Target-URI')

         out_headers['WARC-Target-URI'] = target_uri
+        out_headers['Link'] = MementoUtils.make_link(target_uri, 'original')

         memento_dt = iso_date_to_datetime(warc_headers.get_header('WARC-Date'))
@@ -88,6 +91,9 @@ class BaseLoader(object):

         return out_headers, streamiter

+    def _get_provenance(self, cdx):
+        return quote(cdx.get('source', ''), safe=':/')
+
     def _set_content_len(self, content_len_str, headers, existing_len):
         # Try to set content-length, if it is available and valid
         try:
@@ -424,6 +430,10 @@ class LiveWebLoader(BaseLoader):
         warc_headers['WARC-Record-ID'] = self._make_warc_id()
         warc_headers['WARC-Target-URI'] = cdx['url']
         warc_headers['WARC-Date'] = datetime_to_iso_date(dt)
+
+        if not cdx.get('is_live'):
+            warc_headers['WARC-Provenance'] = self._get_provenance(cdx)
+
         if remote_ip:
             warc_headers['WARC-IP-Address'] = remote_ip

View File

@@ -1,3 +1,5 @@
+from gevent import monkey; monkey.patch_all(thread=False)
+
 import webtest
 from io import BytesIO
@@ -6,7 +8,7 @@ import requests
 from pywb.webagg.handlers import DefaultResourceHandler
 from pywb.webagg.aggregator import SimpleAggregator
-from pywb.webagg.proxyindexsource import ProxyMementoIndexSource, UpstreamAggIndexSource
+from pywb.webagg.upstreamindexsource import UpstreamMementoIndexSource, UpstreamAggIndexSource

 from warcio.recordloader import ArcWarcRecordLoader
@@ -26,7 +28,7 @@ class TestUpstream(LiveServerTests, BaseTestClass):
         app.add_route('/upstream_opt',
                       DefaultResourceHandler(SimpleAggregator(
-                          {'upstream_opt': ProxyMementoIndexSource.upstream_resource(base_url + '/live')})
+                          {'upstream_opt': UpstreamMementoIndexSource.upstream_resource(base_url + '/live')})
                       )
                      )

View File

@@ -13,14 +13,14 @@ class UpstreamAggIndexSource(RemoteIndexSource):
         proxy_url = base_url + '/resource?url={url}&closest={timestamp}'
         super(UpstreamAggIndexSource, self).__init__(api_url, proxy_url, 'filename')

-    def _set_load_url(self, cdx):
-        super(UpstreamAggIndexSource, self)._set_load_url(cdx)
+    def _set_load_url(self, cdx, params):
+        super(UpstreamAggIndexSource, self)._set_load_url(cdx, params)
         cdx['offset'] = '0'
         cdx.pop('load_url', '')


 #=============================================================================
-class ProxyMementoIndexSource(BaseIndexSource):
+class UpstreamMementoIndexSource(BaseIndexSource):
     def __init__(self, proxy_url='{url}'):
         self.proxy_url = proxy_url
         self.loader = LiveWebLoader()
@@ -45,10 +45,10 @@ class ProxyMementoIndexSource(BaseIndexSource):
             yield cdx

     def __str__(self):
-        return 'proxy'
+        return 'upstream'

     @staticmethod
     def upstream_resource(base_url):
-        return ProxyMementoIndexSource(base_url + '/resource?url={url}&closest={closest}')
+        return UpstreamMementoIndexSource(base_url + '/resource?url={url}&closest={closest}')