mirror of
https://github.com/webrecorder/pywb.git
synced 2025-03-15 08:04:49 +01:00
drop process/thread mixin support (doesn't work as well on py2) could readd processes only if need arises, but for now focusing on gevent
rename header Source-Coll -> WebAgg-Source-Coll
This commit is contained in:
parent
348fb133e0
commit
3477cb0bb5
@ -132,7 +132,7 @@ class TestResAgg(object):
|
||||
headers = {'foo': 'bar'}
|
||||
resp = self.testapp.get('/live/resource?url=http://httpbin.org/get?foo=bar', headers=headers)
|
||||
|
||||
assert resp.headers['Source-Coll'] == 'live'
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'live'
|
||||
|
||||
self._check_uri_date(resp, 'http://httpbin.org/get?foo=bar', True)
|
||||
|
||||
@ -148,7 +148,7 @@ class TestResAgg(object):
|
||||
resp = self.testapp.post('/live/resource?url=http://httpbin.org/post',
|
||||
OrderedDict([('foo', 'bar')]))
|
||||
|
||||
assert resp.headers['Source-Coll'] == 'live'
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'live'
|
||||
|
||||
self._check_uri_date(resp, 'http://httpbin.org/post', True)
|
||||
|
||||
@ -163,7 +163,7 @@ class TestResAgg(object):
|
||||
def test_agg_select_mem_1(self):
|
||||
resp = self.testapp.get('/many/resource?url=http://vvork.com/&closest=20141001')
|
||||
|
||||
assert resp.headers['Source-Coll'] == 'rhiz'
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'rhiz'
|
||||
|
||||
self._check_uri_date(resp, 'http://www.vvork.com/', '2014-10-06T18:43:57Z')
|
||||
|
||||
@ -177,7 +177,7 @@ class TestResAgg(object):
|
||||
def test_agg_select_mem_2(self):
|
||||
resp = self.testapp.get('/many/resource?url=http://vvork.com/&closest=20151231')
|
||||
|
||||
assert resp.headers['Source-Coll'] == 'ia'
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'ia'
|
||||
|
||||
self._check_uri_date(resp, 'http://vvork.com/', '2016-01-10T13:48:55Z')
|
||||
|
||||
@ -191,7 +191,7 @@ class TestResAgg(object):
|
||||
def test_agg_select_live(self):
|
||||
resp = self.testapp.get('/many/resource?url=http://vvork.com/&closest=2016')
|
||||
|
||||
assert resp.headers['Source-Coll'] == 'live'
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'live'
|
||||
|
||||
self._check_uri_date(resp, 'http://vvork.com/', True)
|
||||
|
||||
@ -203,7 +203,7 @@ class TestResAgg(object):
|
||||
def test_agg_select_local(self):
|
||||
resp = self.testapp.get('/many/resource?url=http://iana.org/&closest=20140126200624')
|
||||
|
||||
assert resp.headers['Source-Coll'] == 'local'
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'local'
|
||||
|
||||
self._check_uri_date(resp, 'http://www.iana.org/', '2014-01-26T20:06:24Z')
|
||||
|
||||
@ -222,7 +222,7 @@ Host: iana.org
|
||||
|
||||
resp = self.testapp.post('/many/resource/postreq?url=http://iana.org/&closest=20140126200624', req_data)
|
||||
|
||||
assert resp.headers['Source-Coll'] == 'local'
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'local'
|
||||
|
||||
self._check_uri_date(resp, 'http://www.iana.org/', '2014-01-26T20:06:24Z')
|
||||
|
||||
@ -241,7 +241,7 @@ Host: httpbin.org
|
||||
|
||||
resp = self.testapp.post('/many/resource/postreq?url=http://httpbin.org/get?foo=bar&closest=now', req_data)
|
||||
|
||||
assert resp.headers['Source-Coll'] == 'live'
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'live'
|
||||
|
||||
self._check_uri_date(resp, 'http://httpbin.org/get?foo=bar', True)
|
||||
|
||||
@ -266,7 +266,7 @@ foo=bar&test=abc"""
|
||||
|
||||
resp = self.testapp.post('/posttest/resource/postreq?url=http://httpbin.org/post', req_data)
|
||||
|
||||
assert resp.headers['Source-Coll'] == 'post'
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'post'
|
||||
|
||||
self._check_uri_date(resp, 'http://httpbin.org/post', True)
|
||||
|
||||
@ -285,7 +285,7 @@ foo=bar&test=abc"""
|
||||
|
||||
resp = self.testapp.post('/fallback/resource?url=http://httpbin.org/post', req_data)
|
||||
|
||||
assert resp.headers['Source-Coll'] == 'post'
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'post'
|
||||
|
||||
self._check_uri_date(resp, 'http://httpbin.org/post', True)
|
||||
|
||||
@ -301,7 +301,7 @@ foo=bar&test=abc"""
|
||||
def test_agg_seq_fallback_1(self):
|
||||
resp = self.testapp.get('/fallback/resource?url=http://www.iana.org/')
|
||||
|
||||
assert resp.headers['Source-Coll'] == 'live'
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'live'
|
||||
|
||||
self._check_uri_date(resp, 'http://www.iana.org/', True)
|
||||
|
||||
@ -314,7 +314,7 @@ foo=bar&test=abc"""
|
||||
def test_agg_seq_fallback_2(self):
|
||||
resp = self.testapp.get('/fallback/resource?url=http://www.example.com/')
|
||||
|
||||
assert resp.headers['Source-Coll'] == 'example'
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'example'
|
||||
|
||||
self._check_uri_date(resp, 'http://example.com/', '2016-02-25T04:23:29Z')
|
||||
|
||||
@ -336,7 +336,7 @@ foo=bar&test=abc"""
|
||||
def test_agg_local_revisit(self):
|
||||
resp = self.testapp.get('/many/resource?url=http://www.example.com/&closest=20140127171251&sources=local')
|
||||
|
||||
assert resp.headers['Source-Coll'] == 'local'
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'local'
|
||||
|
||||
buff = BytesIO(resp.body)
|
||||
status_headers = StatusAndHeadersParser(['WARC/1.0']).parse(buff)
|
||||
|
@ -1,7 +1,7 @@
|
||||
from gevent import monkey; monkey.patch_all(thread=False)
|
||||
|
||||
from webagg.aggregator import SimpleAggregator, GeventTimeoutAggregator
|
||||
from webagg.aggregator import ThreadedTimeoutAggregator, BaseAggregator
|
||||
from webagg.aggregator import BaseAggregator
|
||||
|
||||
from webagg.indexsource import FileIndexSource, RemoteIndexSource, MementoIndexSource
|
||||
from .testutils import json_list, to_path
|
||||
@ -25,29 +25,15 @@ sources = {
|
||||
|
||||
aggs = {'simple': SimpleAggregator(sources),
|
||||
'gevent': GeventTimeoutAggregator(sources, timeout=5.0),
|
||||
'threaded': ThreadedTimeoutAggregator(sources, timeout=5.0),
|
||||
'processes': ThreadedTimeoutAggregator(sources, timeout=5.0, use_processes=True),
|
||||
}
|
||||
|
||||
agg_tm = {'gevent': GeventTimeoutAggregator(sources, timeout=0.0),
|
||||
'threaded': ThreadedTimeoutAggregator(sources, timeout=0.0),
|
||||
'processes': ThreadedTimeoutAggregator(sources, timeout=0.0, use_processes=True)}
|
||||
agg_tm = {'gevent': GeventTimeoutAggregator(sources, timeout=0.0)}
|
||||
|
||||
nf = {'notfound': FileIndexSource(to_path('testdata/not-found-x'))}
|
||||
agg_nf = {'simple': SimpleAggregator(nf),
|
||||
'gevent': GeventTimeoutAggregator(nf, timeout=5.0),
|
||||
'threaded': ThreadedTimeoutAggregator(nf, timeout=5.0),
|
||||
'processes': ThreadedTimeoutAggregator(nf, timeout=5.0, use_processes=True),
|
||||
}
|
||||
|
||||
if six.PY2:
|
||||
del aggs['threaded']
|
||||
del aggs['processes']
|
||||
del agg_tm['threaded']
|
||||
del agg_tm['processes']
|
||||
del agg_nf['threaded']
|
||||
del agg_nf['processes']
|
||||
|
||||
|
||||
@pytest.mark.parametrize("agg", list(aggs.values()), ids=list(aggs.keys()))
|
||||
def test_mem_agg_index_1(agg):
|
||||
|
@ -83,7 +83,7 @@ class TestUpstream(object):
|
||||
|
||||
def test_live_1(self):
|
||||
resp = requests.get(self.base_url + '/live/resource?url=http://httpbin.org/get', stream=True)
|
||||
assert resp.headers['Source-Coll'] == 'live'
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'live'
|
||||
|
||||
record = ArcWarcRecordLoader().parse_record_stream(resp.raw, no_record_parse=False)
|
||||
assert record.rec_headers.get_header('WARC-Target-URI') == 'http://httpbin.org/get'
|
||||
@ -91,7 +91,7 @@ class TestUpstream(object):
|
||||
|
||||
def test_upstream_1(self):
|
||||
resp = self.testapp.get('/upstream/resource?url=http://httpbin.org/get')
|
||||
assert resp.headers['Source-Coll'] == 'upstream:live'
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'upstream:live'
|
||||
|
||||
raw = BytesIO(resp.body)
|
||||
|
||||
@ -101,7 +101,7 @@ class TestUpstream(object):
|
||||
|
||||
def test_upstream_2(self):
|
||||
resp = self.testapp.get('/upstream_opt/resource?url=http://httpbin.org/get')
|
||||
assert resp.headers['Source-Coll'] == 'upstream_opt:live', resp.headers
|
||||
assert resp.headers['WebAgg-Source-Coll'] == 'upstream_opt:live', resp.headers
|
||||
|
||||
raw = BytesIO(resp.body)
|
||||
|
||||
|
@ -37,10 +37,6 @@ class BaseAggregator(object):
|
||||
cdx_iter = process_cdx(cdx_iter, query)
|
||||
return cdx_iter, dict(errs)
|
||||
|
||||
def load_child_source_list(self, name, source, params):
|
||||
res = self.load_child_source(name, source, params)
|
||||
return list(res[0]), res[1]
|
||||
|
||||
def load_child_source(self, name, source, params):
|
||||
try:
|
||||
params['_formatter'] = ParamFormatter(params, name)
|
||||
@ -205,48 +201,6 @@ class GeventTimeoutAggregator(TimeoutMixin, GeventMixin, BaseSourceListAggregato
|
||||
pass
|
||||
|
||||
|
||||
#=============================================================================
|
||||
class ConcurrentMixin(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(ConcurrentMixin, self).__init__(*args, **kwargs)
|
||||
if kwargs.get('use_processes'):
|
||||
self.pool_class = futures.ThreadPoolExecutor
|
||||
else:
|
||||
self.pool_class = futures.ProcessPoolExecutor
|
||||
self.timeout = kwargs.get('timeout', 5.0)
|
||||
self.size = kwargs.get('size')
|
||||
|
||||
def _load_all(self, params):
|
||||
params['_timeout'] = self.timeout
|
||||
|
||||
sources = list(self._iter_sources(params))
|
||||
|
||||
with self.pool_class(max_workers=self.size) as executor:
|
||||
def do_spawn(name, source):
|
||||
return executor.submit(self.load_child_source_list,
|
||||
name, source, params), name
|
||||
|
||||
jobs = dict([do_spawn(name, source) for name, source in sources])
|
||||
|
||||
res_done, res_not_done = futures.wait(jobs.keys(), timeout=self.timeout)
|
||||
|
||||
results = []
|
||||
for job in res_done:
|
||||
results.append(job.result())
|
||||
|
||||
for job in res_not_done:
|
||||
name = jobs[job]
|
||||
results.append((iter([]), [(name, 'timeout')]))
|
||||
self._on_source_error(name)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
#=============================================================================
|
||||
class ThreadedTimeoutAggregator(TimeoutMixin, ConcurrentMixin, BaseSourceListAggregator):
|
||||
pass
|
||||
|
||||
|
||||
#=============================================================================
|
||||
class BaseDirectoryIndexSource(BaseAggregator):
|
||||
CDX_EXT = ('.cdx', '.cdxj')
|
||||
|
@ -70,7 +70,7 @@ class BaseLoader(object):
|
||||
|
||||
out_headers = {}
|
||||
out_headers['WebAgg-Type'] = 'warc'
|
||||
out_headers['Source-Coll'] = cdx.get('source', '')
|
||||
out_headers['WebAgg-Source-Coll'] = cdx.get('source', '')
|
||||
out_headers['Content-Type'] = 'application/warc-record'
|
||||
|
||||
if not warc_headers:
|
||||
@ -237,7 +237,7 @@ class LiveWebLoader(BaseLoader):
|
||||
|
||||
agg_type = upstream_res.headers.get('WebAgg-Type')
|
||||
if agg_type == 'warc':
|
||||
cdx['source'] = upstream_res.headers.get('Source-Coll')
|
||||
cdx['source'] = upstream_res.headers.get('WebAgg-Source-Coll')
|
||||
return None, upstream_res.headers, upstream_res.raw
|
||||
|
||||
http_headers_buff = recorder.get_headers_buff()
|
||||
|
Loading…
x
Reference in New Issue
Block a user