1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-24 06:59:52 +01:00

Merge branch 'aggregator-improvements' into refactor2

This commit is contained in:
Ilya Kreymer 2017-06-02 21:33:23 -07:00
commit dbc56b864b
10 changed files with 110 additions and 44 deletions

View File

@ -59,16 +59,22 @@ class WriteDupePolicy(object):
# ============================================================================ # ============================================================================
# Skip Record Filters # Skip Record Filters
# ============================================================================ # ============================================================================
class SkipNothingFilter(object): class SkipDefaultFilter(object):
def skip_request(self, path, req_headers): def skip_request(self, path, req_headers):
if req_headers.get('Recorder-Skip') == '1':
return True
return False return False
def skip_response(self, path, req_headers, resp_headers): def skip_response(self, path, req_headers, resp_headers, params):
if resp_headers.get('Recorder-Skip') == '1':
return True
return False return False
# ============================================================================ # ============================================================================
class CollectionFilter(SkipNothingFilter): class CollectionFilter(SkipDefaultFilter):
def __init__(self, accept_colls): def __init__(self, accept_colls):
self.rx_accept_map = {} self.rx_accept_map = {}
@ -79,14 +85,9 @@ class CollectionFilter(SkipNothingFilter):
for name in accept_colls: for name in accept_colls:
self.rx_accept_map[name] = re.compile(accept_colls[name]) self.rx_accept_map[name] = re.compile(accept_colls[name])
def skip_request(self, path, req_headers): def skip_response(self, path, req_headers, resp_headers, params):
if req_headers.get('Recorder-Skip') == '1': if super(CollectionFilter, self).skip_response(path, req_headers,
return True resp_headers, params):
return False
def skip_response(self, path, req_headers, resp_headers):
if resp_headers.get('Recorder-Skip') == '1':
return True return True
path = path[1:].split('/', 1)[0] path = path[1:].split('/', 1)[0]
@ -102,8 +103,12 @@ class CollectionFilter(SkipNothingFilter):
# ============================================================================ # ============================================================================
class SkipRangeRequestFilter(SkipNothingFilter): class SkipRangeRequestFilter(SkipDefaultFilter):
def skip_request(self, path, req_headers): def skip_request(self, path, req_headers):
if super(SkipRangeRequestFilter, self).skip_request(path,
req_headers):
return True
range_ = req_headers.get('Range') range_ = req_headers.get('Range')
if range_ and not range_.lower().startswith('bytes=0-'): if range_ and not range_.lower().startswith('bytes=0-'):
return True return True

View File

@ -138,8 +138,8 @@ class MultiFileWARCWriter(BaseWARCWriter):
to_rec.rec_headers.add_header(name, header) to_rec.rec_headers.add_header(name, header)
def _do_write_req_resp(self, req, resp, params): def _do_write_req_resp(self, req, resp, params):
self._copy_header(resp, req, 'WARC-Recorded-From-URI') self._copy_header(resp, req, 'WARC-Source-URI')
self._copy_header(resp, req, 'WARC-Recorded-On-Date') self._copy_header(resp, req, 'WARC-Creation-Date')
resp = self._check_revisit(resp, params) resp = self._check_revisit(resp, params)
if not resp: if not resp:

View File

@ -225,7 +225,13 @@ class RecorderApp(object):
req_stream.out.close() req_stream.out.close()
return self.send_error(e, start_response) return self.send_error(e, start_response)
start_response('200 OK', list(res.headers.items())) if not skipping:
skipping = any(x.skip_response(path,
req_stream.headers,
res.headers,
params)
for x in self.skip_filters)
if not skipping: if not skipping:
resp_stream = RespWrapper(res.raw, resp_stream = RespWrapper(res.raw,
@ -233,14 +239,15 @@ class RecorderApp(object):
req_stream, req_stream,
params, params,
self.write_queue, self.write_queue,
self.skip_filters,
path, path,
self.create_buff_func) self.create_buff_func)
else: else:
resp_stream = res.raw resp_stream = res.raw
resp_iter = StreamIter(resp_stream) resp_iter = StreamIter(resp_stream)
start_response('200 OK', list(res.headers.items()))
return resp_iter return resp_iter
@ -267,13 +274,12 @@ class Wrapper(object):
#============================================================================== #==============================================================================
class RespWrapper(Wrapper): class RespWrapper(Wrapper):
def __init__(self, stream, headers, req, def __init__(self, stream, headers, req,
params, queue, skip_filters, path, create_func): params, queue, path, create_func):
super(RespWrapper, self).__init__(stream, params, create_func) super(RespWrapper, self).__init__(stream, params, create_func)
self.headers = headers self.headers = headers
self.req = req self.req = req
self.queue = queue self.queue = queue
self.skip_filters = skip_filters
self.path = path self.path = path
def close(self): def close(self):
@ -299,11 +305,6 @@ class RespWrapper(Wrapper):
try: try:
if self.interrupted: if self.interrupted:
skipping = True skipping = True
else:
skipping = any(x.skip_response(self.path,
self.req.headers,
self.headers)
for x in self.skip_filters)
if not skipping: if not skipping:
entry = (self.req.headers, self.req.out, entry = (self.req.headers, self.req.out,

View File

@ -94,8 +94,8 @@ class BaseAggregator(object):
raise NotImplemented() raise NotImplemented()
def get_source_list(self, params): def get_source_list(self, params):
srcs = self._iter_sources(params) sources = self._iter_sources(params)
result = [(name, str(value)) for name, value in srcs] result = [(name, str(value)) for name, value in sources]
result = {'sources': dict(result)} result = {'sources': dict(result)}
return result return result
@ -104,30 +104,51 @@ class BaseAggregator(object):
class BaseSourceListAggregator(BaseAggregator): class BaseSourceListAggregator(BaseAggregator):
def __init__(self, sources, **kwargs): def __init__(self, sources, **kwargs):
self.sources = sources self.sources = sources
self.sources_key = kwargs.get('sources_key', 'sources')
self.invert_sources = kwargs.get('invert_sources', False)
def get_all_sources(self, params): def get_all_sources(self, params):
return self.sources return self.sources
def _iter_sources(self, params): def _iter_sources(self, params):
invert_sources = self.invert_sources
sel_sources = params.get(self.sources_key)
if sel_sources and sel_sources[0] == '!':
invert_sources = True
sel_sources = sel_sources[1:]
if not sel_sources or sel_sources == '*':
if not invert_sources:
return six.iteritems(self.get_all_sources(params))
else:
return iter([])
if not invert_sources:
return self.yield_sources(sel_sources, params)
else:
return self.yield_invert_sources(sel_sources, params)
def yield_sources(self, sel_sources, params):
sources = self.get_all_sources(params) sources = self.get_all_sources(params)
srcs_list = params.get('sources') sel_sources = tuple(sel_sources.split(','))
if not srcs_list or srcs_list == '*': for name in sel_sources:
return sources.items() if name in sources:
yield (name, sources[name])
sel_sources = tuple(srcs_list.split(',')) elif ':' in name:
name, param = name.split(':', 1)
def yield_sources(sources, sel_sources, params):
for name in sel_sources:
if name in sources: if name in sources:
params['param.' + name + '.src_coll'] = param
yield (name, sources[name]) yield (name, sources[name])
elif ':' in name: def yield_invert_sources(self, sel_sources, params):
name, param = name.split(':', 1) sources = self.get_all_sources(params)
if name in sources: sel_sources = tuple([src.split(':', 1)[0]
params['param.' + name + '.src_coll'] = param for src in sel_sources.split(',')])
yield (name, sources[name])
return yield_sources(sources, sel_sources, params) for name in six.iterkeys(sources):
if name not in sel_sources:
yield (name, sources[name])
#============================================================================= #=============================================================================

View File

@ -260,6 +260,7 @@ class RedisIndexSource(BaseIndexSource):
key = res_template(member_key, params) key = res_template(member_key, params)
keys = self.redis.smembers(key) keys = self.redis.smembers(key)
params['scan:' + key] = keys
match_templ = match_templ.encode('utf-8') match_templ = match_templ.encode('utf-8')

View File

@ -30,6 +30,10 @@ aggs = {'simple': SimpleAggregator(sources),
'gevent': GeventTimeoutAggregator(sources, timeout=5.0), 'gevent': GeventTimeoutAggregator(sources, timeout=5.0),
} }
aggs_inv = {'simple': SimpleAggregator(sources, invert_sources=True),
'gevent': GeventTimeoutAggregator(sources, invert_sources=True, timeout=5.0),
}
agg_tm = {'gevent': GeventTimeoutAggregator(sources, timeout=0.0)} agg_tm = {'gevent': GeventTimeoutAggregator(sources, timeout=0.0)}
nf = {'notfound': FileIndexSource(to_path('testdata/not-found-x'))} nf = {'notfound': FileIndexSource(to_path('testdata/not-found-x'))}
@ -105,6 +109,30 @@ class TestMemAgg(MementoOverrideTests, BaseTestClass):
assert(errs == {}) assert(errs == {})
@pytest.mark.parametrize("agg", list(aggs.values()), ids=list(aggs.keys()))
@patch('pywb.webagg.indexsource.MementoIndexSource.get_timegate_links', MementoOverrideTests.mock_link_header('agg_test_5'))
def test_mem_agg_index_5(self, agg):
url = 'http://vvork.com/'
res, errs = agg(dict(url=url, closest='20141001', limit=2, sources='!rhiz,ait'))
exp = [{'timestamp': '20141018133107', 'load_url': 'http://web.archive.org/web/20141018133107id_/http://vvork.com/', 'source': 'ia'}]
assert(to_json_list(res) == exp)
assert(errs == {'bl': "NotFoundException('http://www.webarchive.org.uk/wayback/archive/http://vvork.com/',)"})
@pytest.mark.parametrize("agg", list(aggs_inv.values()), ids=list(aggs_inv.keys()))
@patch('pywb.webagg.indexsource.MementoIndexSource.get_timegate_links', MementoOverrideTests.mock_link_header('agg_test_5'))
def test_mem_agg_index_5_inverse_preset(self, agg):
url = 'http://vvork.com/'
res, errs = agg(dict(url=url, closest='20141001', limit=2, sources='rhiz,ait'))
exp = [{'timestamp': '20141018133107', 'load_url': 'http://web.archive.org/web/20141018133107id_/http://vvork.com/', 'source': 'ia'}]
assert(to_json_list(res) == exp)
assert(errs == {'bl': "NotFoundException('http://www.webarchive.org.uk/wayback/archive/http://vvork.com/',)"})
@pytest.mark.parametrize("agg", list(agg_nf.values()), ids=list(agg_nf.keys())) @pytest.mark.parametrize("agg", list(agg_nf.values()), ids=list(agg_nf.keys()))
def test_mem_agg_not_found(self, agg): def test_mem_agg_not_found(self, agg):
url = 'http://vvork.com/' url = 'http://vvork.com/'

View File

@ -48,11 +48,12 @@ class BaseLoader(object):
out_headers['WebAgg-Type'] = 'warc' out_headers['WebAgg-Type'] = 'warc'
out_headers['Content-Type'] = 'application/warc-record' out_headers['Content-Type'] = 'application/warc-record'
out_headers['WebAgg-Cdx'] = to_native_str(cdx.to_cdxj().rstrip())
out_headers['WebAgg-Source-Coll'] = source
if params.get('recorder_skip'): if params.get('recorder_skip'):
out_headers['Recorder-Skip'] = '1' out_headers['Recorder-Skip'] = '1'
cdx['recorder_skip'] = '1'
out_headers['WebAgg-Cdx'] = to_native_str(cdx.to_cdxj().rstrip())
out_headers['WebAgg-Source-Coll'] = source
if not warc_headers: if not warc_headers:
if other_headers: if other_headers:
@ -371,8 +372,8 @@ class LiveWebLoader(BaseLoader):
if not cdx.get('is_live'): if not cdx.get('is_live'):
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
warc_headers['WARC-Recorded-From-URI'] = cdx.get('load_url') warc_headers['WARC-Source-URI'] = cdx.get('load_url')
warc_headers['WARC-Recorded-On-Date'] = datatime_to_iso_date(now) warc_headers['WARC-Creation-Date'] = datetime_to_iso_date(now)
if remote_ip: if remote_ip:
warc_headers['WARC-IP-Address'] = remote_ip warc_headers['WARC-IP-Address'] = remote_ip

View File

@ -5,6 +5,10 @@ import webtest
from io import BytesIO from io import BytesIO
import requests import requests
from pywb.webagg.handlers import DefaultResourceHandler
from pywb.webagg.aggregator import SimpleAggregator
from pywb.webagg.upstreamindexsource import UpstreamMementoIndexSource, UpstreamAggIndexSource
from warcio.recordloader import ArcWarcRecordLoader from warcio.recordloader import ArcWarcRecordLoader
from pywb.warcserver.handlers import DefaultResourceHandler from pywb.warcserver.handlers import DefaultResourceHandler

View File

@ -1,5 +1,5 @@
six six
warcio==1.3 warcio==1.3.3
chardet chardet
requests requests
redis redis

View File

@ -28,6 +28,11 @@ agg_test_4:
'http://webenact.rhizome.org/vvork/{url}': '<http://webenact.rhizome.org/vvork/20141006184357/http://www.vvork.com/>; rel="memento"; datetime="Mon, 06 Oct 2014 18:43:57 GMT", <http://www.vvork.com/>; rel="original", <http://webenact.rhizome.org/vvork/timemap/*/http://www.vvork.com/>; rel="timemap"; type="application/link-format"' 'http://webenact.rhizome.org/vvork/{url}': '<http://webenact.rhizome.org/vvork/20141006184357/http://www.vvork.com/>; rel="memento"; datetime="Mon, 06 Oct 2014 18:43:57 GMT", <http://www.vvork.com/>; rel="original", <http://webenact.rhizome.org/vvork/timemap/*/http://www.vvork.com/>; rel="timemap"; type="application/link-format"'
agg_test_5:
'http://web.archive.org/web/{url}': '<http://vvork.com/>; rel="original", <http://web.archive.org/web/20141018133107/http://www.vvork.com/>; rel="memento"; datetime="Sat, 18 Oct 2014 13:31:07 GMT", <http://web.archive.org/web/timemap/link/http://vvork.com/>; rel="timemap"; type="application/link-format"'
select_mem_1: select_mem_1:
'http://web.archive.org/web/{url}': '<http://vvork.com/>; rel="original", <http://web.archive.org/web/timemap/link/http://vvork.com/>; rel="timemap"; type="application/link-format", <http://web.archive.org/web/20020727091331/http://vvork.com/>; rel="first memento"; datetime="Sat, 27 Jul 2002 09:13:31 GMT", <http://web.archive.org/web/20140806161228/http://vvork.com/>; rel="prev memento"; datetime="Wed, 06 Aug 2014 16:12:28 GMT", <http://web.archive.org/web/20141018133107/http://vvork.com/>; rel="memento"; datetime="Sat, 18 Oct 2014 13:31:07 GMT", <http://web.archive.org/web/20141020161243/http://vvork.com/>; rel="next memento"; datetime="Mon, 20 Oct 2014 16:12:43 GMT", <http://web.archive.org/web/20161027001353/http://vvork.com/>; rel="last memento"; datetime="Thu, 27 Oct 2016 00:13:53 GMT"' 'http://web.archive.org/web/{url}': '<http://vvork.com/>; rel="original", <http://web.archive.org/web/timemap/link/http://vvork.com/>; rel="timemap"; type="application/link-format", <http://web.archive.org/web/20020727091331/http://vvork.com/>; rel="first memento"; datetime="Sat, 27 Jul 2002 09:13:31 GMT", <http://web.archive.org/web/20140806161228/http://vvork.com/>; rel="prev memento"; datetime="Wed, 06 Aug 2014 16:12:28 GMT", <http://web.archive.org/web/20141018133107/http://vvork.com/>; rel="memento"; datetime="Sat, 18 Oct 2014 13:31:07 GMT", <http://web.archive.org/web/20141020161243/http://vvork.com/>; rel="next memento"; datetime="Mon, 20 Oct 2014 16:12:43 GMT", <http://web.archive.org/web/20161027001353/http://vvork.com/>; rel="last memento"; datetime="Thu, 27 Oct 2016 00:13:53 GMT"'