diff --git a/test/test_handlers.py b/test/test_handlers.py index f5c96e0f..c5577c5a 100644 --- a/test/test_handlers.py +++ b/test/test_handlers.py @@ -132,7 +132,7 @@ class TestResAgg(object): headers = {'foo': 'bar'} resp = self.testapp.get('/live/resource?url=http://httpbin.org/get?foo=bar', headers=headers) - assert resp.headers['Source-Coll'] == 'live' + assert resp.headers['WebAgg-Source-Coll'] == 'live' self._check_uri_date(resp, 'http://httpbin.org/get?foo=bar', True) @@ -148,7 +148,7 @@ class TestResAgg(object): resp = self.testapp.post('/live/resource?url=http://httpbin.org/post', OrderedDict([('foo', 'bar')])) - assert resp.headers['Source-Coll'] == 'live' + assert resp.headers['WebAgg-Source-Coll'] == 'live' self._check_uri_date(resp, 'http://httpbin.org/post', True) @@ -163,7 +163,7 @@ class TestResAgg(object): def test_agg_select_mem_1(self): resp = self.testapp.get('/many/resource?url=http://vvork.com/&closest=20141001') - assert resp.headers['Source-Coll'] == 'rhiz' + assert resp.headers['WebAgg-Source-Coll'] == 'rhiz' self._check_uri_date(resp, 'http://www.vvork.com/', '2014-10-06T18:43:57Z') @@ -177,7 +177,7 @@ class TestResAgg(object): def test_agg_select_mem_2(self): resp = self.testapp.get('/many/resource?url=http://vvork.com/&closest=20151231') - assert resp.headers['Source-Coll'] == 'ia' + assert resp.headers['WebAgg-Source-Coll'] == 'ia' self._check_uri_date(resp, 'http://vvork.com/', '2016-01-10T13:48:55Z') @@ -191,7 +191,7 @@ class TestResAgg(object): def test_agg_select_live(self): resp = self.testapp.get('/many/resource?url=http://vvork.com/&closest=2016') - assert resp.headers['Source-Coll'] == 'live' + assert resp.headers['WebAgg-Source-Coll'] == 'live' self._check_uri_date(resp, 'http://vvork.com/', True) @@ -203,7 +203,7 @@ class TestResAgg(object): def test_agg_select_local(self): resp = self.testapp.get('/many/resource?url=http://iana.org/&closest=20140126200624') - assert resp.headers['Source-Coll'] == 'local' + assert resp.headers['WebAgg-Source-Coll'] == 'local' self._check_uri_date(resp, 'http://www.iana.org/', '2014-01-26T20:06:24Z') @@ -222,7 +222,7 @@ Host: iana.org resp = self.testapp.post('/many/resource/postreq?url=http://iana.org/&closest=20140126200624', req_data) - assert resp.headers['Source-Coll'] == 'local' + assert resp.headers['WebAgg-Source-Coll'] == 'local' self._check_uri_date(resp, 'http://www.iana.org/', '2014-01-26T20:06:24Z') @@ -241,7 +241,7 @@ Host: httpbin.org resp = self.testapp.post('/many/resource/postreq?url=http://httpbin.org/get?foo=bar&closest=now', req_data) - assert resp.headers['Source-Coll'] == 'live' + assert resp.headers['WebAgg-Source-Coll'] == 'live' self._check_uri_date(resp, 'http://httpbin.org/get?foo=bar', True) @@ -266,7 +266,7 @@ foo=bar&test=abc""" resp = self.testapp.post('/posttest/resource/postreq?url=http://httpbin.org/post', req_data) - assert resp.headers['Source-Coll'] == 'post' + assert resp.headers['WebAgg-Source-Coll'] == 'post' self._check_uri_date(resp, 'http://httpbin.org/post', True) @@ -285,7 +285,7 @@ foo=bar&test=abc""" resp = self.testapp.post('/fallback/resource?url=http://httpbin.org/post', req_data) - assert resp.headers['Source-Coll'] == 'post' + assert resp.headers['WebAgg-Source-Coll'] == 'post' self._check_uri_date(resp, 'http://httpbin.org/post', True) @@ -301,7 +301,7 @@ foo=bar&test=abc""" def test_agg_seq_fallback_1(self): resp = self.testapp.get('/fallback/resource?url=http://www.iana.org/') - assert resp.headers['Source-Coll'] == 'live' + assert resp.headers['WebAgg-Source-Coll'] == 'live' self._check_uri_date(resp, 'http://www.iana.org/', True) @@ -314,7 +314,7 @@ foo=bar&test=abc""" def test_agg_seq_fallback_2(self): resp = self.testapp.get('/fallback/resource?url=http://www.example.com/') - assert resp.headers['Source-Coll'] == 'example' + assert resp.headers['WebAgg-Source-Coll'] == 'example' self._check_uri_date(resp, 'http://example.com/', '2016-02-25T04:23:29Z') @@ -336,7 +336,7 @@ foo=bar&test=abc""" def test_agg_local_revisit(self): resp = self.testapp.get('/many/resource?url=http://www.example.com/&closest=20140127171251&sources=local') - assert resp.headers['Source-Coll'] == 'local' + assert resp.headers['WebAgg-Source-Coll'] == 'local' buff = BytesIO(resp.body) status_headers = StatusAndHeadersParser(['WARC/1.0']).parse(buff) diff --git a/test/test_memento_agg.py b/test/test_memento_agg.py index 934a9474..52dc79da 100644 --- a/test/test_memento_agg.py +++ b/test/test_memento_agg.py @@ -1,7 +1,7 @@ from gevent import monkey; monkey.patch_all(thread=False) from webagg.aggregator import SimpleAggregator, GeventTimeoutAggregator -from webagg.aggregator import ThreadedTimeoutAggregator, BaseAggregator +from webagg.aggregator import BaseAggregator from webagg.indexsource import FileIndexSource, RemoteIndexSource, MementoIndexSource from .testutils import json_list, to_path @@ -25,29 +25,15 @@ sources = { aggs = {'simple': SimpleAggregator(sources), 'gevent': GeventTimeoutAggregator(sources, timeout=5.0), - 'threaded': ThreadedTimeoutAggregator(sources, timeout=5.0), - 'processes': ThreadedTimeoutAggregator(sources, timeout=5.0, use_processes=True), } -agg_tm = {'gevent': GeventTimeoutAggregator(sources, timeout=0.0), - 'threaded': ThreadedTimeoutAggregator(sources, timeout=0.0), - 'processes': ThreadedTimeoutAggregator(sources, timeout=0.0, use_processes=True)} +agg_tm = {'gevent': GeventTimeoutAggregator(sources, timeout=0.0)} nf = {'notfound': FileIndexSource(to_path('testdata/not-found-x'))} agg_nf = {'simple': SimpleAggregator(nf), 'gevent': GeventTimeoutAggregator(nf, timeout=5.0), - 'threaded': ThreadedTimeoutAggregator(nf, timeout=5.0), - 'processes': ThreadedTimeoutAggregator(nf, timeout=5.0, use_processes=True), } -if six.PY2: - del aggs['threaded'] - del aggs['processes'] - del agg_tm['threaded'] - del agg_tm['processes'] - del agg_nf['threaded'] - del agg_nf['processes'] - @pytest.mark.parametrize("agg", list(aggs.values()), ids=list(aggs.keys())) def test_mem_agg_index_1(agg): diff --git a/test/test_upstream.py b/test/test_upstream.py index 6ca6bb61..505b8edb 100644 --- a/test/test_upstream.py +++ b/test/test_upstream.py @@ -83,7 +83,7 @@ class TestUpstream(object): def test_live_1(self): resp = requests.get(self.base_url + '/live/resource?url=http://httpbin.org/get', stream=True) - assert resp.headers['Source-Coll'] == 'live' + assert resp.headers['WebAgg-Source-Coll'] == 'live' record = ArcWarcRecordLoader().parse_record_stream(resp.raw, no_record_parse=False) assert record.rec_headers.get_header('WARC-Target-URI') == 'http://httpbin.org/get' @@ -91,7 +91,7 @@ class TestUpstream(object): def test_upstream_1(self): resp = self.testapp.get('/upstream/resource?url=http://httpbin.org/get') - assert resp.headers['Source-Coll'] == 'upstream:live' + assert resp.headers['WebAgg-Source-Coll'] == 'upstream:live' raw = BytesIO(resp.body) @@ -101,7 +101,7 @@ class TestUpstream(object): def test_upstream_2(self): resp = self.testapp.get('/upstream_opt/resource?url=http://httpbin.org/get') - assert resp.headers['Source-Coll'] == 'upstream_opt:live', resp.headers + assert resp.headers['WebAgg-Source-Coll'] == 'upstream_opt:live', resp.headers raw = BytesIO(resp.body) diff --git a/webagg/aggregator.py b/webagg/aggregator.py index 2810d3d0..8a810a63 100644 --- a/webagg/aggregator.py +++ b/webagg/aggregator.py @@ -37,10 +37,6 @@ class BaseAggregator(object): cdx_iter = process_cdx(cdx_iter, query) return cdx_iter, dict(errs) - def load_child_source_list(self, name, source, params): - res = self.load_child_source(name, source, params) - return list(res[0]), res[1] - def load_child_source(self, name, source, params): try: params['_formatter'] = ParamFormatter(params, name) @@ -205,48 +201,6 @@ class GeventTimeoutAggregator(TimeoutMixin, GeventMixin, BaseSourceListAggregato pass -#============================================================================= -class ConcurrentMixin(object): - def __init__(self, *args, **kwargs): - super(ConcurrentMixin, self).__init__(*args, **kwargs) - if kwargs.get('use_processes'): - self.pool_class = futures.ThreadPoolExecutor - else: - self.pool_class = futures.ProcessPoolExecutor - self.timeout = kwargs.get('timeout', 5.0) - self.size = kwargs.get('size') - - def _load_all(self, params): - params['_timeout'] = self.timeout - - sources = list(self._iter_sources(params)) - - with self.pool_class(max_workers=self.size) as executor: - def do_spawn(name, source): - return executor.submit(self.load_child_source_list, - name, source, params), name - - jobs = dict([do_spawn(name, source) for name, source in sources]) - - res_done, res_not_done = futures.wait(jobs.keys(), timeout=self.timeout) - - results = [] - for job in res_done: - results.append(job.result()) - - for job in res_not_done: - name = jobs[job] - results.append((iter([]), [(name, 'timeout')])) - self._on_source_error(name) - - return results - - -#============================================================================= -class ThreadedTimeoutAggregator(TimeoutMixin, ConcurrentMixin, BaseSourceListAggregator): - pass - - #============================================================================= class BaseDirectoryIndexSource(BaseAggregator): CDX_EXT = ('.cdx', '.cdxj') diff --git a/webagg/responseloader.py b/webagg/responseloader.py index 82d98e41..31a5298e 100644 --- a/webagg/responseloader.py +++ b/webagg/responseloader.py @@ -70,7 +70,7 @@ class BaseLoader(object): out_headers = {} out_headers['WebAgg-Type'] = 'warc' - out_headers['Source-Coll'] = cdx.get('source', '') + out_headers['WebAgg-Source-Coll'] = cdx.get('source', '') out_headers['Content-Type'] = 'application/warc-record' if not warc_headers: @@ -237,7 +237,7 @@ class LiveWebLoader(BaseLoader): agg_type = upstream_res.headers.get('WebAgg-Type') if agg_type == 'warc': - cdx['source'] = upstream_res.headers.get('Source-Coll') + cdx['source'] = upstream_res.headers.get('WebAgg-Source-Coll') return None, upstream_res.headers, upstream_res.raw http_headers_buff = recorder.get_headers_buff()