diff --git a/rezag/aggindexsource.py b/rezag/aggindexsource.py index 292622c9..738c781d 100644 --- a/rezag/aggindexsource.py +++ b/rezag/aggindexsource.py @@ -59,11 +59,14 @@ class BaseAggregator(object): params['_all_src_params'] = src_params - def load_child_source(self, name, source, all_params): + def load_child_source_list(self, name, source, params): + return list(self.load_child_source(name, source, params)) + + def load_child_source(self, name, source, params): try: - _src_params = all_params['_all_src_params'].get(name) - all_params['_src_params'] = _src_params - cdx_iter = source.load_index(all_params) + _src_params = params['_all_src_params'].get(name) + params['_src_params'] = _src_params + cdx_iter = source.load_index(params) except NotFoundException as nf: print('Not found in ' + name) cdx_iter = iter([]) @@ -75,10 +78,10 @@ class BaseAggregator(object): cdx['source'] = name return cdx - return [add_name(cdx) for cdx in cdx_iter] + return (add_name(cdx) for cdx in cdx_iter) def load_index(self, params): - iter_list = list(self._load_all(params)) + iter_list = self._load_all(params) #optimization: if only a single entry (or empty) just load directly if len(iter_list) <= 1: @@ -130,9 +133,9 @@ class SeqAggMixin(object): def _load_all(self, params): - sources = list(self._iter_sources(params)) - return list([self.load_child_source(name, source, params) - for name, source in sources]) + sources = self._iter_sources(params) + return [self.load_child_source(name, source, params) + for name, source in sources] #============================================================================= @@ -232,7 +235,7 @@ class ConcurrentMixin(object): with self.pool_class(max_workers=self.size) as executor: def do_spawn(name, source): - return executor.submit(self.load_child_source, + return executor.submit(self.load_child_source_list, name, source, params), name jobs = dict([do_spawn(name, source) for name, source in sources]) @@ -255,10 +258,10 @@ class ThreadedTimeoutAggregator(TimeoutMixin, ConcurrentMixin, BaseSourceListAgg #============================================================================= -class BaseDirectoryIndexAggregator(BaseAggregator): +class BaseDirectoryIndexSource(BaseAggregator): CDX_EXT = ('.cdx', '.cdxj') - def __init__(self, base_prefix, base_dir): + def __init__(self, base_prefix, base_dir=''): self.base_prefix = base_prefix self.base_dir = base_dir @@ -299,7 +302,7 @@ class BaseDirectoryIndexAggregator(BaseAggregator): return 'file_dir' -class DirectoryIndexAggregator(SeqAggMixin, BaseDirectoryIndexAggregator): +class DirectoryIndexSource(SeqAggMixin, BaseDirectoryIndexSource): pass diff --git a/rezag/app.py b/rezag/app.py index 90275d21..bb4b4892 100644 --- a/rezag/app.py +++ b/rezag/app.py @@ -1,5 +1,6 @@ from rezag.inputrequest import DirectWSGIInputRequest, POSTInputRequest from bottle import route, request, response, default_app, abort +import bottle from pywb.utils.wbexception import WbException @@ -11,37 +12,53 @@ def err_handler(exc): response.content_type = 'application/json' return json.dumps({'message': exc.body}) + def wrap_error(func): - def do_d(*args, **kwargs): + def wrap_func(*args, **kwargs): try: return func(*args, **kwargs) except WbException as exc: - if application.debug: + if bottle.debug: traceback.print_exc() abort(exc.status(), exc.msg) except Exception as e: - if application.debug: + if bottle.debug: traceback.print_exc() abort(500, 'Internal Error: ' + str(e)) - return do_d + return wrap_func +route_dict = {} + def add_route(path, handler): + @route(path, 'ANY') @wrap_error - def direct_input_request(mode=''): + def direct_input_request(): params = dict(request.query) params['_input_req'] = DirectWSGIInputRequest(request.environ) return handler(params) + @route(path + '/postreq', 'POST') @wrap_error - def post_fullrequest(mode=''): + def post_fullrequest(): params = dict(request.query) params['_input_req'] = POSTInputRequest(request.environ) return handler(params) - route(path + '/postreq', method=['POST'], callback=post_fullrequest) - route(path, method=['ANY'], callback=direct_input_request) + global route_dict + handler_dict = {'handler': handler.get_supported_modes()} + route_dict[path] = handler_dict + route_dict[path + '/postreq'] = handler_dict + +@route('/') +def list_routes(): + return route_dict + + + + + application = default_app() diff --git a/rezag/handlers.py b/rezag/handlers.py index ff19c725..a2a7fcd7 100644 --- a/rezag/handlers.py +++ b/rezag/handlers.py @@ -7,7 +7,7 @@ from bottle import response #============================================================================= def to_cdxj(cdx_iter, fields): - response.headers['Content-Type'] = 'application/x-cdxj' + response.headers['Content-Type'] = 'text/x-cdxj' return [cdx.to_cdxj(fields) for cdx in cdx_iter] def to_json(cdx_iter, fields): @@ -120,6 +120,10 @@ class HandlerSeq(object): def __init__(self, handlers): self.handlers = handlers + def get_supported_modes(self): + return [] + # return zip([self.handlers.get_supported_modes()] + def __call__(self, params): last_exc = None for handler in self.handlers: diff --git a/rezag/indexsource.py b/rezag/indexsource.py index ed4a26a6..06822150 100644 --- a/rezag/indexsource.py +++ b/rezag/indexsource.py @@ -45,10 +45,13 @@ class FileIndexSource(BaseIndexSource): except IOError: raise NotFoundException(filename) - with fh: - gen = iter_range(fh, params['key'], params['end_key']) - for line in gen: - yield CDXObject(line) + def do_load(fh): + with fh: + gen = iter_range(fh, params['key'], params['end_key']) + for line in gen: + yield CDXObject(line) + + return do_load(fh) def __str__(self): return 'file' @@ -62,7 +65,6 @@ class RemoteIndexSource(BaseIndexSource): def load_index(self, params): api_url = self.res_template(self.api_url_template, params) - print('API URL', api_url) r = requests.get(api_url, timeout=params.get('_timeout')) if r.status_code >= 400: raise NotFoundException(api_url) diff --git a/rezag/responseloader.py b/rezag/responseloader.py index ee835c80..ed4cc6aa 100644 --- a/rezag/responseloader.py +++ b/rezag/responseloader.py @@ -12,21 +12,37 @@ import uuid #============================================================================= -def incr_reader(stream, header=None, size=8192): - if header: - yield header +class StreamIter(object): + def __init__(self, stream, header=None, size=8192): + self.stream = stream + self.header = header + self.size = size - while True: - data = stream.read(size) + def __iter__(self): + return self + + def __next__(self): + if self.header: + header = self.header + self.header = None + return header + + data = self.stream.read(self.size) if data: - yield data - else: - break + return data - try: - stream.close() - except: - pass + self.close() + raise StopIteration + + def close(self): + if not self.stream: + return + + try: + self.stream.close() + self.stream = None + except Exception: + pass #============================================================================= @@ -83,7 +99,8 @@ class WARCPathLoader(object): response.headers['WARC-Refers-To-Date'] = payload.rec_headers.get_header('WARC-Date') headers.stream.close() - return incr_reader(record.stream) + res = StreamIter(record.stream) + return res #============================================================================= @@ -172,7 +189,7 @@ class LiveWebLoader(object): except: raise - return incr_reader(upstream_res.raw, header=resp_headers) + return StreamIter(upstream_res.raw, header=resp_headers) @staticmethod def _make_date(dt): diff --git a/rezag/utils.py b/rezag/utils.py index b10eeef8..ab44aa4e 100644 --- a/rezag/utils.py +++ b/rezag/utils.py @@ -59,7 +59,7 @@ class MementoUtils(object): def make_timemap_memento_link(cdx, datetime=None, rel='memento', end=',\n'): url = cdx.get('load_url') if not url: - url = 'filename://' + cdx.get('filename') + url = 'file://{0}:{1}:{2}'.format(cdx.get('filename'), cdx.get('offset'), cdx.get('length')) memento = '<{0}>; rel="{1}"; datetime="{2}"; src="{3}"' + end diff --git a/test/test_dir_agg.py b/test/test_dir_agg.py index 42f6387f..6ec1c6a4 100644 --- a/test/test_dir_agg.py +++ b/test/test_dir_agg.py @@ -5,7 +5,7 @@ import json from .testutils import to_path -from rezag.aggindexsource import DirectoryIndexAggregator, SimpleAggregator +from rezag.aggindexsource import DirectoryIndexSource, SimpleAggregator from rezag.indexsource import MementoIndexSource @@ -37,7 +37,7 @@ def setup_module(): fh.write('foo') global dir_loader - dir_loader = DirectoryIndexAggregator(dir_prefix, dir_path) + dir_loader = DirectoryIndexSource(dir_prefix, dir_path) global orig_cwd orig_cwd = os.getcwd() @@ -147,7 +147,7 @@ def test_agg_no_dir_1(): def test_agg_no_dir_2(): - loader = DirectoryIndexAggregator(root_dir, '') + loader = DirectoryIndexSource(root_dir, '') res = loader({'url': 'example.com/', 'param.coll': 'X'}) exp = [] @@ -175,7 +175,7 @@ def test_agg_dir_sources_2(): def test_agg_dir_sources_single_dir(): - loader = DirectoryIndexAggregator('testdata/', '') + loader = DirectoryIndexSource('testdata/', '') res = loader.get_source_list({'url': 'example.com/'}) exp = {'sources': {}} diff --git a/test/test_handlers.py b/test/test_handlers.py index f5ac05a2..55d63d62 100644 --- a/test/test_handlers.py +++ b/test/test_handlers.py @@ -6,7 +6,7 @@ from rezag.handlers import DefaultResourceHandler, HandlerSeq from rezag.indexsource import MementoIndexSource, FileIndexSource, LiveIndexSource from rezag.aggindexsource import GeventTimeoutAggregator, SimpleAggregator -from rezag.aggindexsource import DirectoryIndexAggregator +from rezag.aggindexsource import DirectoryIndexSource from rezag.app import add_route, application @@ -18,7 +18,7 @@ from .testutils import to_path import json sources = { - 'local': DirectoryIndexAggregator(to_path('testdata/'), ''), + 'local': DirectoryIndexSource(to_path('testdata/'), ''), 'ia': MementoIndexSource.from_timegate_url('http://web.archive.org/web/'), 'rhiz': MementoIndexSource.from_timegate_url('http://webenact.rhizome.org/vvork/', path='*'), 'live': LiveIndexSource(), diff --git a/test/test_indexsource.py b/test/test_indexsource.py index c935a5fd..5853f02c 100644 --- a/test/test_indexsource.py +++ b/test/test_indexsource.py @@ -162,7 +162,6 @@ def test_all_not_found(source): assert(key_ts_res(res) == expected) - # ============================================================================ def test_another_remote_not_found(): source = MementoIndexSource.from_timegate_url('http://www.webarchive.org.uk/wayback/archive/') @@ -180,12 +179,11 @@ def test_file_not_found(): url = 'http://x-not-found-x.notfound/' res = query_single_source(source, dict(url=url, limit=3)) - expected = '' assert(key_ts_res(res) == expected) - +# ============================================================================ def test_ait_filters(): ait_source = RemoteIndexSource('http://wayback.archive-it.org/cdx/search/cdx?url={url}&filter=filename:ARCHIVEIT-({colls})-.*', 'http://wayback.archive-it.org/all/{timestamp}id_/{url}') diff --git a/test/test_memento_agg.py b/test/test_memento_agg.py index 59040670..9a1b9209 100644 --- a/test/test_memento_agg.py +++ b/test/test_memento_agg.py @@ -27,6 +27,13 @@ aggs = {'simple': SimpleAggregator(sources), 'processes': ThreadedTimeoutAggregator(sources, timeout=5.0, use_processes=True), } +nf = {'notfound': FileIndexSource(to_path('testdata/not-found-x'))} +agg_nf = {'simple': SimpleAggregator(nf), + 'gevent': GeventTimeoutAggregator(nf, timeout=5.0), + 'threaded': ThreadedTimeoutAggregator(nf, timeout=5.0), + 'processes': ThreadedTimeoutAggregator(nf, timeout=5.0, use_processes=True), + } + #def pytest_generate_tests(metafunc): # metafunc.parametrize("agg", list(aggs.values()), ids=list(aggs.keys())) @@ -87,6 +94,14 @@ def test_mem_agg_index_4(agg): assert(json_list(res) == exp) +@pytest.mark.parametrize("agg", list(agg_nf.values()), ids=list(agg_nf.keys())) +def test_mem_agg_not_found(agg): + url = 'http://vvork.com/' + res = agg(dict(url=url, closest='20141001', limit=2)) + + assert(json_list(res) == []) + + def test_handler_output_cdxj(): agg = GeventTimeoutAggregator(sources, timeout=5.0) handler = IndexHandler(agg) @@ -136,7 +151,7 @@ def test_handler_output_link_2(): exp = """\ ; rel="memento"; datetime="Sun, 26 Jan 2014 09:37:43 GMT"; src="ia", -; rel="memento"; datetime="Sun, 26 Jan 2014 20:06:24 GMT"; src="local", +; rel="memento"; datetime="Sun, 26 Jan 2014 20:06:24 GMT"; src="local", ; rel="memento"; datetime="Thu, 23 Jan 2014 03:47:55 GMT"; src="ia", ; rel="memento"; datetime="Wed, 29 Jan 2014 17:52:03 GMT"; src="ia", ; rel="memento"; datetime="Tue, 07 Jan 2014 04:05:52 GMT"; src="ait"