diff --git a/rezag/aggindexsource.py b/rezag/aggindexsource.py index 76e32525..435d0152 100644 --- a/rezag/aggindexsource.py +++ b/rezag/aggindexsource.py @@ -1,9 +1,13 @@ from gevent.pool import Pool import gevent + +from concurrent import futures + import json import time import os +from pywb.utils.timeutils import timestamp_now from pywb.cdx.cdxops import process_cdx from pywb.cdx.query import CDXQuery @@ -19,6 +23,9 @@ import glob #============================================================================= class BaseAggregator(object): def __call__(self, params): + if params.get('closest') == 'now': + params['closest'] = timestamp_now() + query = CDXQuery(params) self._set_src_params(params) @@ -55,32 +62,21 @@ class BaseAggregator(object): def load_child_source(self, name, source, all_params): try: _src_params = all_params['_all_src_params'].get(name) + all_params['_src_params'] = _src_params - #params = dict(url=all_params['url'], - # key=all_params['key'], - # end_key=all_params['end_key'], - # closest=all_params.get('closest'), - # _input_req=all_params.get('_input_req'), - # _timeout=all_params.get('_timeout'), - # _all_src_params=all_params.get('_all_src_params'), - # _src_params=_src_params) - - params = all_params - params['_src_params'] = _src_params - cdx_iter = source.load_index(params) + cdx_iter = source.load_index(all_params) except NotFoundException as nf: print('Not found in ' + name) cdx_iter = iter([]) - def add_name(cdx_iter): - for cdx in cdx_iter: - if 'source' in cdx: - cdx['source'] = name + '.' + cdx['source'] - else: - cdx['source'] = name - yield cdx + def add_name(cdx): + if cdx.get('source'): + cdx['source'] = name + ':' + cdx['source'] + else: + cdx['source'] = name + return cdx - return add_name(cdx_iter) + return [add_name(cdx) for cdx in cdx_iter] def load_index(self, params): iter_list = list(self._load_all(params)) @@ -93,6 +89,9 @@ class BaseAggregator(object): return cdx_iter + def _on_source_error(self, name): + pass + def _load_all(self, params): #pragma: no cover raise NotImplemented() @@ -167,7 +166,7 @@ class TimeoutMixin(object): if not self.is_timed_out(name): yield name, source - def track_source_error(self, name): + def _on_source_error(self, name): the_time = time.time() if name not in self.timeouts: self.timeouts[name] = deque() @@ -177,15 +176,12 @@ class TimeoutMixin(object): #============================================================================= -class GeventAggMixin(object): +class GeventMixin(object): def __init__(self, *args, **kwargs): - super(GeventAggMixin, self).__init__(*args, **kwargs) + super(GeventMixin, self).__init__(*args, **kwargs) self.pool = Pool(size=kwargs.get('size')) self.timeout = kwargs.get('timeout', 5.0) - def track_source_error(self, name): - pass - def _load_all(self, params): params['_timeout'] = self.timeout @@ -198,18 +194,58 @@ class GeventAggMixin(object): gevent.joinall(jobs, timeout=self.timeout) - res = [] - for name, job in zip(sources, jobs): - if job.value: - res.append(job.value) + results = [] + for (name, source), job in zip(sources, jobs): + if job.value is not None: + results.append(job.value) else: - self.track_source_error(name) + self._on_source_error(name) - return res + return results #============================================================================= -class GeventTimeoutAggregator(TimeoutMixin, GeventAggMixin, BaseSourceListAggregator): +class GeventTimeoutAggregator(TimeoutMixin, GeventMixin, BaseSourceListAggregator): + pass + + +#============================================================================= +class ConcurrentMixin(object): + def __init__(self, *args, **kwargs): + super(ConcurrentMixin, self).__init__(*args, **kwargs) + if kwargs.get('use_processes'): + self.pool_class = futures.ThreadPoolExecutor + else: + self.pool_class = futures.ProcessPoolExecutor + self.timeout = kwargs.get('timeout', 5.0) + self.size = kwargs.get('size') + + def _load_all(self, params): + params['_timeout'] = self.timeout + + sources = list(self.get_sources(params)) + + with self.pool_class(max_workers=self.size) as executor: + def do_spawn(name, source): + return executor.submit(self.load_child_source, + name, source, params), name + + jobs = dict([do_spawn(name, source) for name, source in sources]) + + res_done, res_not_done = futures.wait(jobs.keys(), timeout=self.timeout) + + results = [] + for job in res_done: + results.append(job.result()) + + for job in res_not_done: + self._on_source_error(jobs[job]) + + return results + + +#============================================================================= +class ThreadedTimeoutAggregator(TimeoutMixin, ConcurrentMixin, BaseSourceListAggregator): pass @@ -244,13 +280,14 @@ class BaseDirectoryIndexAggregator(BaseAggregator): def _load_files(self, glob_dir): for the_dir in glob.iglob(glob_dir): - print(the_dir) for name in os.listdir(the_dir): filename = os.path.join(the_dir, name) if filename.endswith(self.CDX_EXT): print('Adding ' + filename) rel_path = os.path.relpath(the_dir, self.base_prefix) + if rel_path == '.': + rel_path = '' yield rel_path, FileIndexSource(filename) class DirectoryIndexAggregator(SeqAggMixin, BaseDirectoryIndexAggregator): diff --git a/rezag/app.py b/rezag/app.py new file mode 100644 index 00000000..c25b4ac7 --- /dev/null +++ b/rezag/app.py @@ -0,0 +1,31 @@ +from rezag.inputrequest import WSGIInputRequest, POSTInputRequest +from bottle import route, request, response, default_app + + +def add_route(path, handler): + def debug(func): + def do_d(): + try: + return func() + except Exception: + import traceback + traceback.print_exc() + + return do_d + + def direct_input_request(): + params = dict(request.query) + params['_input_req'] = WSGIInputRequest(request.environ) + return handler(params) + + def post_fullrequest(): + params = dict(request.query) + params['_input_req'] = POSTInputRequest(request.environ) + return handler(params) + + route(path + '/postreq', method=['POST'], callback=debug(post_fullrequest)) + route(path, method=['ANY'], callback=debug(direct_input_request)) + + +application = default_app() + diff --git a/rezag/handlers.py b/rezag/handlers.py index 30e3ce98..1a6e3495 100644 --- a/rezag/handlers.py +++ b/rezag/handlers.py @@ -1,6 +1,6 @@ +from rezag.responseloader import WARCPathHandler, LiveWebHandler from rezag.utils import MementoUtils from pywb.warc.recordloader import ArchiveLoadFailed -from rezag.responseloader import WARCPathHandler, LiveWebHandler from bottle import response @@ -46,7 +46,7 @@ class IndexHandler(object): input_req = params.get('_input_req') if input_req: - params['url'] = input_req.include_post_query() + params['alt_url'] = input_req.include_post_query(params.get('url')) cdx_iter = self.index_source(params) @@ -71,13 +71,16 @@ class ResourceHandler(IndexHandler): if params.get('mode', 'resource') != 'resource': return super(ResourceHandler, self).__call__(params) + input_req = params.get('_input_req') + if input_req: + params['alt_url'] = input_req.include_post_query(params.get('url')) + cdx_iter = self.index_source(params) any_found = False for cdx in cdx_iter: any_found = True - cdx['coll'] = params.get('coll', '') for loader in self.resource_loaders: try: diff --git a/rezag/indexsource.py b/rezag/indexsource.py index 200d136a..a597e0c4 100644 --- a/rezag/indexsource.py +++ b/rezag/indexsource.py @@ -9,7 +9,7 @@ from pywb.utils.wbexception import NotFoundException from pywb.cdx.cdxobject import CDXObject from pywb.cdx.query import CDXQuery -import requests +from rezag.liverec import patched_requests as requests from rezag.utils import MementoUtils @@ -37,7 +37,12 @@ class FileIndexSource(BaseIndexSource): def load_index(self, params): filename = self.res_template(self.filename_template, params) - with open(filename, 'rb') as fh: + try: + fh = open(filename, 'rb') + except IOError: + raise NotFoundException(filename) + + with fh: gen = iter_range(fh, params['key'], params['end_key']) for line in gen: yield CDXObject(line) diff --git a/rezag/inputrequest.py b/rezag/inputrequest.py index 221ede0f..17b6ef6b 100644 --- a/rezag/inputrequest.py +++ b/rezag/inputrequest.py @@ -5,6 +5,7 @@ from pywb.utils.statusandheaders import StatusAndHeadersParser from six.moves.urllib.parse import urlsplit from six import StringIO, iteritems +from io import BytesIO #============================================================================= @@ -15,19 +16,19 @@ class WSGIInputRequest(object): def get_req_method(self): return self.env['REQUEST_METHOD'].upper() - def get_req_headers(self, url): + def get_req_headers(self): headers = {} - splits = urlsplit(url) - - for name, value in six.iteritems(self.env): + for name, value in iteritems(self.env): if name == 'HTTP_HOST': - name = 'Host' - value = splits.netloc + #name = 'Host' + #value = splits.netloc + # will be set automatically + continue - elif name == 'HTTP_ORIGIN': - name = 'Origin' - value = (splits.scheme + '://' + splits.netloc) + #elif name == 'HTTP_ORIGIN': + # name = 'Origin' + # value = (splits.scheme + '://' + splits.netloc) elif name == 'HTTP_X_CSRFTOKEN': name = 'X-CSRFToken' @@ -35,9 +36,9 @@ class WSGIInputRequest(object): if cookie_val: value = cookie_val - elif name == 'HTTP_X_FORWARDED_PROTO': - name = 'X-Forwarded-Proto' - value = splits.scheme + #elif name == 'HTTP_X_FORWARDED_PROTO': + # name = 'X-Forwarded-Proto' + # value = splits.scheme elif name.startswith('HTTP_'): name = name[5:].title().replace('_', '-') @@ -83,7 +84,7 @@ class WSGIInputRequest(object): return self.env.get('HTTP_' + name.upper().replace('-', '_')) def include_post_query(self, url): - if self.get_req_method() != 'POST': + if not url or self.get_req_method() != 'POST': return url mime = self._get_content_type() @@ -91,7 +92,7 @@ class WSGIInputRequest(object): length = self._get_content_length() stream = self.env['wsgi.input'] - buffered_stream = StringIO() + buffered_stream = BytesIO() post_query = extract_post_query('POST', mime, length, stream, buffered_stream=buffered_stream) @@ -115,7 +116,7 @@ class POSTInputRequest(WSGIInputRequest): def get_req_method(self): return self.status_headers.protocol - def get_req_headers(self, url): + def get_req_headers(self): headers = {} for n, v in self.status_headers.headers: headers[n] = v diff --git a/rezag/responseloader.py b/rezag/responseloader.py index f4c4fa04..52bf4760 100644 --- a/rezag/responseloader.py +++ b/rezag/responseloader.py @@ -45,7 +45,11 @@ class WARCPathHandler(object): for path in self.paths: def check(filename, cdx): try: - full_path = path.format(**cdx) + if hasattr(cdx, '_src_params') and cdx._src_params: + full_path = path.format(**cdx._src_params) + else: + full_path = path + full_path += filename return full_path except KeyError: return None @@ -57,15 +61,13 @@ class WARCPathHandler(object): if not cdx.get('filename') or cdx.get('offset') is None: return None + cdx._src_params = params.get('_src_params') failed_files = [] headers, payload = (self.resolve_loader. load_headers_and_payload(cdx, failed_files, self.cdx_source)) - if headers != payload: - headers.stream.close() - record = payload for n, v in record.rec_headers.headers: @@ -73,6 +75,13 @@ class WARCPathHandler(object): response.headers['WARC-Coll'] = cdx.get('source') + if headers != payload: + response.headers['WARC-Target-URI'] = headers.rec_headers.get_header('WARC-Target-URI') + response.headers['WARC-Date'] = headers.rec_headers.get_header('WARC-Date') + response.headers['WARC-Refers-To-Target-URI'] = payload.rec_headers.get_header('WARC-Target-URI') + response.headers['WARC-Refers-To-Date'] = payload.rec_headers.get_header('WARC-Date') + headers.stream.close() + return incr_reader(record.stream) @@ -114,13 +123,20 @@ class LiveWebHandler(object): input_req = params['_input_req'] - req_headers = input_req.get_req_headers(cdx['url']) + req_headers = input_req.get_req_headers() dt = timestamp_to_datetime(cdx['timestamp']) if not cdx.get('is_live'): req_headers['Accept-Datetime'] = datetime_to_http_date(dt) + # if different url, ensure origin is not set + # may need to add other headers + if load_url != cdx['url']: + if 'Origin' in req_headers: + splits = urlsplit(load_url) + req_headers['Origin'] = splits.scheme + '://' + splits.netloc + method = input_req.get_req_method() data = input_req.get_req_body() diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/test/test_dir_agg.py b/test/test_dir_agg.py index 02cd5839..3a9c916f 100644 --- a/test/test_dir_agg.py +++ b/test/test_dir_agg.py @@ -3,13 +3,16 @@ import os import shutil import json +from .testutils import to_path + from rezag.aggindexsource import DirectoryIndexAggregator, SimpleAggregator +from rezag.indexsource import MementoIndexSource #============================================================================= root_dir = None orig_cwd = None -dir_agg = None +dir_loader = None def setup_module(): global root_dir @@ -17,18 +20,21 @@ def setup_module(): coll_A = to_path(root_dir + '/colls/A/indexes') coll_B = to_path(root_dir + '/colls/B/indexes') + coll_C = to_path(root_dir + '/colls/C/indexes') os.makedirs(coll_A) os.makedirs(coll_B) + os.makedirs(coll_C) dir_prefix = to_path(root_dir) dir_path ='colls/{coll}/indexes' shutil.copy(to_path('testdata/example.cdxj'), coll_A) shutil.copy(to_path('testdata/iana.cdxj'), coll_B) + shutil.copy(to_path('testdata/dupes.cdxj'), coll_C) - global dir_agg - dir_agg = DirectoryIndexAggregator(dir_prefix, dir_path) + global dir_loader + dir_loader = DirectoryIndexAggregator(dir_prefix, dir_path) global orig_cwd orig_cwd = os.getcwd() @@ -45,57 +51,103 @@ def teardown_module(): shutil.rmtree(root_dir) -def to_path(path): - if os.path.sep != '/': - path = path.replace('/', os.path.sep) - - return path - - def to_json_list(cdxlist, fields=['timestamp', 'load_url', 'filename', 'source']): return list([json.loads(cdx.to_json(fields)) for cdx in cdxlist]) def test_agg_no_coll_set(): - res = dir_agg(dict(url='example.com/')) + res = dir_loader(dict(url='example.com/')) assert(to_json_list(res) == []) def test_agg_collA_found(): - res = dir_agg({'url': 'example.com/', 'param.coll': 'A'}) + res = dir_loader({'url': 'example.com/', 'param.coll': 'A'}) exp = [{'source': 'colls/A/indexes', 'timestamp': '20160225042329', 'filename': 'example.warc.gz'}] assert(to_json_list(res) == exp) def test_agg_collB(): - res = dir_agg({'url': 'example.com/', 'param.coll': 'B'}) + res = dir_loader({'url': 'example.com/', 'param.coll': 'B'}) exp = [] assert(to_json_list(res) == exp) def test_agg_collB_found(): - res = dir_agg({'url': 'iana.org/', 'param.coll': 'B'}) + res = dir_loader({'url': 'iana.org/', 'param.coll': 'B'}) exp = [{'source': 'colls/B/indexes', 'timestamp': '20140126200624', 'filename': 'iana.warc.gz'}] assert(to_json_list(res) == exp) -def test_agg_all_found(): - res = dir_agg({'url': 'iana.org/', 'param.coll': '*'}) +def test_extra_agg_collB(): + agg_source = SimpleAggregator({'dir': dir_loader}) + res = agg_source({'url': 'iana.org/', 'param.coll': 'B'}) - exp = [{'source': 'colls/B/indexes', 'timestamp': '20140126200624', 'filename': 'iana.warc.gz'}] + exp = [{'source': 'dir:colls/B/indexes', 'timestamp': '20140126200624', 'filename': 'iana.warc.gz'}] assert(to_json_list(res) == exp) -def test_extra_agg_all(): - agg_dir_agg = SimpleAggregator({'dir': dir_agg}) - res = agg_dir_agg({'url': 'iana.org/', 'param.coll': '*'}) +def test_agg_all_found_1(): + res = dir_loader({'url': 'iana.org/', 'param.coll': '*'}) - exp = [{'source': 'dir.colls/B/indexes', 'timestamp': '20140126200624', 'filename': 'iana.warc.gz'}] + exp = [ + {'source': 'colls/B/indexes', 'timestamp': '20140126200624', 'filename': 'iana.warc.gz'}, + {'source': 'colls/C/indexes', 'timestamp': '20140127171238', 'filename': 'dupes.warc.gz'}, + {'source': 'colls/C/indexes', 'timestamp': '20140127171238', 'filename': 'dupes.warc.gz'}, + ] + + assert(to_json_list(res) == exp) + + +def test_agg_all_found_2(): + res = dir_loader({'url': 'example.com/', 'param.coll': '*'}) + + exp = [ + {'source': 'colls/C/indexes', 'timestamp': '20140127171200', 'filename': 'dupes.warc.gz'}, + {'source': 'colls/C/indexes', 'timestamp': '20140127171251', 'filename': 'dupes.warc.gz'}, + {'source': 'colls/A/indexes', 'timestamp': '20160225042329', 'filename': 'example.warc.gz'} + ] + + assert(to_json_list(res) == exp) + + + +def test_agg_dir_and_memento(): + sources = {'ia': MementoIndexSource.from_timegate_url('http://web.archive.org/web/'), + 'local': dir_loader} + agg_source = SimpleAggregator(sources) + + res = agg_source({'url': 'example.com/', 'param.coll': '*', 'closest': '20100512', 'limit': 6}) + + exp = [ + {'source': 'ia', 'timestamp': '20100513052358', 'load_url': 'http://web.archive.org/web/20100513052358id_/http://example.com/'}, + {'source': 'ia', 'timestamp': '20100514231857', 'load_url': 'http://web.archive.org/web/20100514231857id_/http://example.com/'}, + {'source': 'ia', 'timestamp': '20100506013442', 'load_url': 'http://web.archive.org/web/20100506013442id_/http://example.com/'}, + {'source': 'local:colls/C/indexes', 'timestamp': '20140127171200', 'filename': 'dupes.warc.gz'}, + {'source': 'local:colls/C/indexes', 'timestamp': '20140127171251', 'filename': 'dupes.warc.gz'}, + {'source': 'local:colls/A/indexes', 'timestamp': '20160225042329', 'filename': 'example.warc.gz'} + ] + + assert(to_json_list(res) == exp) + + +def test_agg_no_dir_1(): + res = dir_loader({'url': 'example.com/', 'param.coll': 'X'}) + + exp = [] + + assert(to_json_list(res) == exp) + + +def test_agg_no_dir_2(): + loader = DirectoryIndexAggregator(root_dir, 'no_such') + res = loader({'url': 'example.com/', 'param.coll': 'X'}) + + exp = [] assert(to_json_list(res) == exp) diff --git a/test/test_handlers.py b/test/test_handlers.py new file mode 100644 index 00000000..1e2d2822 --- /dev/null +++ b/test/test_handlers.py @@ -0,0 +1,216 @@ +from gevent import monkey; monkey.patch_all(thread=False) + +from collections import OrderedDict + +from rezag.handlers import DefaultResourceHandler, HandlerSeq + +from rezag.indexsource import MementoIndexSource, FileIndexSource, LiveIndexSource +from rezag.aggindexsource import GeventTimeoutAggregator, SimpleAggregator +from rezag.aggindexsource import DirectoryIndexAggregator + +from rezag.app import add_route, application + +import webtest +import bottle + +from .testutils import to_path + +import json + +sources = { + 'local': DirectoryIndexAggregator(to_path('testdata/'), ''), + 'ia': MementoIndexSource.from_timegate_url('http://web.archive.org/web/'), + 'rhiz': MementoIndexSource.from_timegate_url('http://webenact.rhizome.org/vvork/', path='*'), + 'live': LiveIndexSource(), +} + +testapp = None + +def setup_module(self): + live_source = SimpleAggregator({'live': LiveIndexSource()}) + live_handler = DefaultResourceHandler(live_source) + add_route('/live', live_handler) + + source1 = GeventTimeoutAggregator(sources) + handler1 = DefaultResourceHandler(source1, to_path('testdata/')) + add_route('/many', handler1) + + source2 = SimpleAggregator({'post': FileIndexSource(to_path('testdata/post-test.cdxj'))}) + handler2 = DefaultResourceHandler(source2, to_path('testdata/')) + add_route('/posttest', handler2) + + source3 = SimpleAggregator({'example': FileIndexSource(to_path('testdata/example.cdxj'))}) + handler3 = DefaultResourceHandler(source3, to_path('testdata/')) + + + add_route('/fallback', HandlerSeq([handler3, + handler2, + live_handler])) + + + bottle.debug = True + global testapp + testapp = webtest.TestApp(application) + + +def to_json_list(text): + return list([json.loads(cdx) for cdx in text.rstrip().split('\n')]) + + +class TestResAgg(object): + def setup(self): + self.testapp = testapp + + def test_live_index(self): + resp = self.testapp.get('/live?url=http://httpbin.org/get&mode=index&output=json') + resp.charset = 'utf-8' + + res = to_json_list(resp.text) + res[0]['timestamp'] = '2016' + assert(res == [{'url': 'http://httpbin.org/get', 'urlkey': 'org,httpbin)/get', 'is_live': True, + 'load_url': 'http://httpbin.org/get', 'source': 'live', 'timestamp': '2016'}]) + + def test_live_resource(self): + resp = self.testapp.get('/live?url=http://httpbin.org/get?foo=bar&mode=resource') + + assert resp.headers['WARC-Coll'] == 'live' + assert resp.headers['WARC-Target-URI'] == 'http://httpbin.org/get?foo=bar' + assert 'WARC-Date' in resp.headers + + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + + def test_live_post_resource(self): + resp = self.testapp.post('/live?url=http://httpbin.org/post&mode=resource', + OrderedDict([('foo', 'bar')])) + + assert resp.headers['WARC-Coll'] == 'live' + assert resp.headers['WARC-Target-URI'] == 'http://httpbin.org/post' + assert 'WARC-Date' in resp.headers + + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + def test_agg_select_mem_1(self): + resp = self.testapp.get('/many?url=http://vvork.com/&closest=20141001') + + assert resp.headers['WARC-Coll'] == 'rhiz' + assert resp.headers['WARC-Target-URI'] == 'http://www.vvork.com/' + assert resp.headers['WARC-Date'] == '2014-10-06T18:43:57Z' + assert b'HTTP/1.1 200 OK' in resp.body + + + def test_agg_select_mem_2(self): + resp = self.testapp.get('/many?url=http://vvork.com/&closest=20151231') + + assert resp.headers['WARC-Coll'] == 'ia' + assert resp.headers['WARC-Target-URI'] == 'http://vvork.com/' + assert resp.headers['WARC-Date'] == '2016-01-10T13:48:55Z' + assert b'HTTP/1.1 200 OK' in resp.body + + + def test_agg_select_live(self): + resp = self.testapp.get('/many?url=http://vvork.com/&closest=2016') + + assert resp.headers['WARC-Coll'] == 'live' + assert resp.headers['WARC-Target-URI'] == 'http://vvork.com/' + assert resp.headers['WARC-Date'] != '' + + def test_agg_select_local(self): + resp = self.testapp.get('/many?url=http://iana.org/&closest=20140126200624') + + assert resp.headers['WARC-Coll'] == 'local' + assert resp.headers['WARC-Target-URI'] == 'http://www.iana.org/' + assert resp.headers['WARC-Date'] == '2014-01-26T20:06:24Z' + + + def test_agg_select_local_postreq(self): + req_data = """\ +GET / HTTP/1.1 +Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8 +User-agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36 +Host: iana.org +""" + + resp = self.testapp.post('/many/postreq?url=http://iana.org/&closest=20140126200624', req_data) + + assert resp.headers['WARC-Coll'] == 'local' + assert resp.headers['WARC-Target-URI'] == 'http://www.iana.org/' + assert resp.headers['WARC-Date'] == '2014-01-26T20:06:24Z' + + + def test_agg_live_postreq(self): + req_data = """\ +GET /get?foo=bar HTTP/1.1 +Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8 +User-agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36 +Host: httpbin.org +""" + + resp = self.testapp.post('/many/postreq?url=http://httpbin.org/get?foo=bar&closest=now', req_data) + + assert resp.headers['WARC-Coll'] == 'live' + assert resp.headers['WARC-Target-URI'] == 'http://httpbin.org/get?foo=bar' + assert 'WARC-Date' in resp.headers + + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + + def test_agg_post_resolve_postreq(self): + req_data = """\ +POST /post HTTP/1.1 +content-length: 16 +accept-encoding: gzip, deflate +accept: */* +host: httpbin.org +content-type: application/x-www-form-urlencoded + +foo=bar&test=abc""" + + resp = self.testapp.post('/posttest/postreq?url=http://httpbin.org/post', req_data) + + assert resp.headers['WARC-Coll'] == 'post' + assert resp.headers['WARC-Target-URI'] == 'http://httpbin.org/post' + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + assert b'"test": "abc"' in resp.body + assert b'"url": "http://httpbin.org/post"' in resp.body + + def test_agg_post_resolve_fallback(self): + req_data = OrderedDict([('foo', 'bar'), ('test', 'abc')]) + + resp = self.testapp.post('/fallback?url=http://httpbin.org/post', req_data) + + assert resp.headers['WARC-Coll'] == 'post' + assert resp.headers['WARC-Target-URI'] == 'http://httpbin.org/post' + assert b'HTTP/1.1 200 OK' in resp.body + assert b'"foo": "bar"' in resp.body + assert b'"test": "abc"' in resp.body + assert b'"url": "http://httpbin.org/post"' in resp.body + + def test_agg_seq_fallback_1(self): + resp = self.testapp.get('/fallback?url=http://www.iana.org/') + + assert resp.headers['WARC-Coll'] == 'live' + assert resp.headers['WARC-Target-URI'] == 'http://www.iana.org/' + assert b'HTTP/1.1 200 OK' in resp.body + + def test_agg_seq_fallback_2(self): + resp = self.testapp.get('/fallback?url=http://www.example.com/') + + assert resp.headers['WARC-Coll'] == 'example' + assert resp.headers['WARC-Date'] == '2016-02-25T04:23:29Z' + assert resp.headers['WARC-Target-URI'] == 'http://example.com/' + assert b'HTTP/1.1 200 OK' in resp.body + + def test_agg_local_revisit(self): + resp = self.testapp.get('/many?url=http://www.example.com/&closest=20140127171251&sources=local') + + assert resp.headers['WARC-Coll'] == 'local' + assert resp.headers['WARC-Target-URI'] == 'http://example.com' + assert resp.headers['WARC-Date'] == '2014-01-27T17:12:51Z' + assert resp.headers['WARC-Refers-To-Target-URI'] == 'http://example.com' + assert resp.headers['WARC-Refers-To-Date'] == '2014-01-27T17:12:00Z' + assert b'HTTP/1.1 200 OK' in resp.body + assert b'' in resp.body diff --git a/test/test_indexsource.py b/test/test_indexsource.py index b4f81bf1..643bd3e0 100644 --- a/test/test_indexsource.py +++ b/test/test_indexsource.py @@ -5,6 +5,9 @@ from rezag.aggindexsource import SimpleAggregator from pywb.utils.timeutils import timestamp_now +from .testutils import key_ts_res + + import pytest import redis @@ -13,9 +16,6 @@ import fakeredis redis.StrictRedis = fakeredis.FakeStrictRedis redis.Redis = fakeredis.FakeRedis -def key_ts_res(cdxlist, extra='filename'): - return '\n'.join([cdx['urlkey'] + ' ' + cdx['timestamp'] + ' ' + cdx[extra] for cdx in cdxlist]) - def setup_module(): global r r = fakeredis.FakeStrictRedis(db=2) @@ -170,3 +170,16 @@ def test_another_remote_not_found(): assert(key_ts_res(res) == expected) +# ============================================================================ +def test_file_not_found(): + source = FileIndexSource('testdata/not-found-x') + url = 'http://x-not-found-x.notfound/' + res = query_single_source(source, dict(url=url, limit=3)) + + + expected = '' + assert(key_ts_res(res) == expected) + + + + diff --git a/test/test_memento_agg.py b/test/test_memento_agg.py index aff7359f..be49fe9c 100644 --- a/test/test_memento_agg.py +++ b/test/test_memento_agg.py @@ -1,7 +1,11 @@ -from gevent import monkey; monkey.patch_all() +from gevent import monkey; monkey.patch_all(thread=False) + from rezag.aggindexsource import SimpleAggregator, GeventTimeoutAggregator +from rezag.aggindexsource import ThreadedTimeoutAggregator from rezag.indexsource import FileIndexSource, RemoteIndexSource, MementoIndexSource +from .testutils import json_list, to_path + import json import pytest @@ -9,26 +13,23 @@ from rezag.handlers import IndexHandler sources = { - 'local': FileIndexSource('testdata/iana.cdxj'), + 'local': FileIndexSource(to_path('testdata/iana.cdxj')), 'ia': MementoIndexSource.from_timegate_url('http://web.archive.org/web/'), 'ait': MementoIndexSource.from_timegate_url('http://wayback.archive-it.org/all/'), 'bl': MementoIndexSource.from_timegate_url('http://www.webarchive.org.uk/wayback/archive/'), 'rhiz': MementoIndexSource.from_timegate_url('http://webenact.rhizome.org/vvork/', path='*') } + +aggs = {'simple': SimpleAggregator(sources), + 'gevent': GeventTimeoutAggregator(sources, timeout=5.0), + 'threaded': ThreadedTimeoutAggregator(sources, timeout=5.0), + 'processes': ThreadedTimeoutAggregator(sources, timeout=5.0, use_processes=True), + } + #@pytest.mark.parametrize("agg", aggs, ids=["simple", "gevent_timeout"]) def pytest_generate_tests(metafunc): - metafunc.parametrize("agg", aggs, ids=["simple", "gevent_timeout"]) - - -aggs = [SimpleAggregator(sources), - GeventTimeoutAggregator(sources, timeout=5.0) - ] - - -def json_list(cdxlist, fields=['timestamp', 'load_url', 'filename', 'source']): - return list([json.loads(cdx.to_json(fields)) for cdx in cdxlist]) - + metafunc.parametrize("agg", list(aggs.values()), ids=list(aggs.keys())) def test_mem_agg_index_1(agg): url = 'http://iana.org/' diff --git a/test/test_timeouts.py b/test/test_timeouts.py new file mode 100644 index 00000000..6a96d464 --- /dev/null +++ b/test/test_timeouts.py @@ -0,0 +1,105 @@ +from gevent import monkey; monkey.patch_all(thread=False) +import time +from rezag.indexsource import FileIndexSource + +from rezag.aggindexsource import SimpleAggregator, TimeoutMixin +from rezag.aggindexsource import GeventTimeoutAggregator, GeventTimeoutAggregator + +from .testutils import json_list + + +class TimeoutFileSource(FileIndexSource): + def __init__(self, filename, timeout): + super(TimeoutFileSource, self).__init__(filename) + self.timeout = timeout + self.calls = 0 + + def load_index(self, params): + self.calls += 1 + print('Sleeping') + time.sleep(self.timeout) + return super(TimeoutFileSource, self).load_index(params) + +TimeoutAggregator = GeventTimeoutAggregator + + + +def setup_module(): + global sources + sources = {'slow': TimeoutFileSource('testdata/example.cdxj', 0.2), + 'slower': TimeoutFileSource('testdata/dupes.cdxj', 0.5) + } + + + +def test_timeout_long_all_pass(): + agg = TimeoutAggregator(sources, timeout=1.0) + + res = agg(dict(url='http://example.com/')) + + exp = [{'source': 'slower', 'timestamp': '20140127171200'}, + {'source': 'slower', 'timestamp': '20140127171251'}, + {'source': 'slow', 'timestamp': '20160225042329'}] + + assert(json_list(res, fields=['source', 'timestamp']) == exp) + + +def test_timeout_slower_skipped_1(): + agg = GeventTimeoutAggregator(sources, timeout=0.49) + + res = agg(dict(url='http://example.com/')) + + exp = [{'source': 'slow', 'timestamp': '20160225042329'}] + + assert(json_list(res, fields=['source', 'timestamp']) == exp) + + + +def test_timeout_slower_skipped_2(): + agg = GeventTimeoutAggregator(sources, timeout=0.19) + + res = agg(dict(url='http://example.com/')) + + exp = [] + + assert(json_list(res, fields=['source', 'timestamp']) == exp) + + + +def test_timeout_skipping(): + assert(sources['slow'].calls == 3) + assert(sources['slower'].calls == 3) + + agg = GeventTimeoutAggregator(sources, timeout=0.49, + t_count=2, t_duration=2.0) + + exp = [{'source': 'slow', 'timestamp': '20160225042329'}] + + res = agg(dict(url='http://example.com/')) + assert(json_list(res, fields=['source', 'timestamp']) == exp) + assert(sources['slow'].calls == 4) + assert(sources['slower'].calls == 4) + + res = agg(dict(url='http://example.com/')) + assert(json_list(res, fields=['source', 'timestamp']) == exp) + assert(sources['slow'].calls == 5) + assert(sources['slower'].calls == 5) + + res = agg(dict(url='http://example.com/')) + assert(json_list(res, fields=['source', 'timestamp']) == exp) + assert(sources['slow'].calls == 6) + assert(sources['slower'].calls == 5) + + res = agg(dict(url='http://example.com/')) + assert(json_list(res, fields=['source', 'timestamp']) == exp) + assert(sources['slow'].calls == 7) + assert(sources['slower'].calls == 5) + + time.sleep(2.01) + + res = agg(dict(url='http://example.com/')) + assert(json_list(res, fields=['source', 'timestamp']) == exp) + assert(sources['slow'].calls == 8) + assert(sources['slower'].calls == 6) + + diff --git a/test/testutils.py b/test/testutils.py new file mode 100644 index 00000000..b9f8ab98 --- /dev/null +++ b/test/testutils.py @@ -0,0 +1,16 @@ +import json +import os + +def json_list(cdxlist, fields=['timestamp', 'load_url', 'filename', 'source']): + return list([json.loads(cdx.to_json(fields)) for cdx in cdxlist]) + +def key_ts_res(cdxlist, extra='filename'): + return '\n'.join([cdx['urlkey'] + ' ' + cdx['timestamp'] + ' ' + cdx[extra] for cdx in cdxlist]) + +def to_path(path): + if os.path.sep != '/': + path = path.replace('/', os.path.sep) + + return path + + diff --git a/testdata/dupes.cdxj b/testdata/dupes.cdxj new file mode 100644 index 00000000..6d42a7b1 --- /dev/null +++ b/testdata/dupes.cdxj @@ -0,0 +1,12 @@ +com,example)/ 20140127171200 {"url": "http://example.com", "mime": "text/html", "status": "200", "digest": "B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", "length": "1046", "offset": "334", "filename": "dupes.warc.gz"} +com,example)/ 20140127171251 {"url": "http://example.com", "mime": "warc/revisit", "digest": "B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", "length": "553", "offset": "11875", "filename": "dupes.warc.gz"} +org,iana)/ 20140127171238 {"url": "http://iana.org", "mime": "unk", "status": "302", "digest": "3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ", "length": "343", "offset": "1858", "filename": "dupes.warc.gz"} +org,iana)/ 20140127171238 {"url": "http://www.iana.org/", "mime": "warc/revisit", "digest": "OSSAPWJ23L56IYVRW3GFEAR4MCJMGPTB", "length": "536", "offset": "2678", "filename": "dupes.warc.gz"} +org,iana)/_css/2013.1/fonts/opensans-bold.ttf 20140127171240 {"url": "http://www.iana.org/_css/2013.1/fonts/OpenSans-Bold.ttf", "mime": "warc/revisit", "digest": "YFUR5ALIWJMWV6FAAFRLVRQNXZQF5HRW", "length": "556", "offset": "10826", "filename": "dupes.warc.gz"} +org,iana)/_css/2013.1/fonts/opensans-regular.ttf 20140127171240 {"url": "http://www.iana.org/_css/2013.1/fonts/OpenSans-Regular.ttf", "mime": "warc/revisit", "digest": "GVSO2C2TMPPVZ4TXYFXAY27NYWTIEIL7", "length": "540", "offset": "9793", "filename": "dupes.warc.gz"} +org,iana)/_css/2013.1/print.css 20140127171239 {"url": "http://www.iana.org/_css/2013.1/print.css", "mime": "warc/revisit", "digest": "VNBXHMUNWJQC5OWWGZ3X7GM5C7X6ZAB4", "length": "537", "offset": "6684", "filename": "dupes.warc.gz"} +org,iana)/_css/2013.1/screen.css 20140127171239 {"url": "http://www.iana.org/_css/2013.1/screen.css", "mime": "warc/revisit", "digest": "BUAEPXZNN44AIX3NLXON4QDV6OY2H5QD", "length": "541", "offset": "4630", "filename": "dupes.warc.gz"} +org,iana)/_img/2013.1/iana-logo-homepage.png 20140127171240 {"url": "http://www.iana.org/_img/2013.1/iana-logo-homepage.png", "mime": "warc/revisit", "digest": "GCW2GM3SIMHEIQYZX25MLSRYVWUCZ7OK", "length": "549", "offset": "8750", "filename": "dupes.warc.gz"} +org,iana)/_img/2013.1/icann-logo.svg 20140127171239 {"url": "http://www.iana.org/_img/2013.1/icann-logo.svg", "mime": "warc/revisit", "digest": "HGRZHOH73EFQQWBYWBSOIV2UU5JDTSGJ", "length": "549", "offset": "7709", "filename": "dupes.warc.gz"} +org,iana)/_js/2013.1/iana.js 20140127171239 {"url": "http://www.iana.org/_js/2013.1/iana.js", "mime": "application/x-javascript", "status": "200", "digest": "3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ", "length": "457", "offset": "3696", "filename": "dupes.warc.gz"} +org,iana)/_js/2013.1/jquery.js 20140127171239 {"url": "http://www.iana.org/_js/2013.1/jquery.js", "mime": "warc/revisit", "digest": "AAW2RS7JB7HTF666XNZDQYJFA6PDQBPO", "length": "547", "offset": "5658", "filename": "dupes.warc.gz"} diff --git a/testdata/dupes.warc.gz b/testdata/dupes.warc.gz new file mode 100644 index 00000000..48e6b6fd Binary files /dev/null and b/testdata/dupes.warc.gz differ diff --git a/testdata/example.warc.gz b/testdata/example.warc.gz new file mode 100644 index 00000000..143b947d Binary files /dev/null and b/testdata/example.warc.gz differ diff --git a/testdata/iana.warc.gz b/testdata/iana.warc.gz new file mode 100644 index 00000000..3a88a71a Binary files /dev/null and b/testdata/iana.warc.gz differ diff --git a/testdata/post-test.cdxj b/testdata/post-test.cdxj new file mode 100644 index 00000000..5856b8b1 --- /dev/null +++ b/testdata/post-test.cdxj @@ -0,0 +1,3 @@ +org,httpbin)/post?foo=bar&test=abc 20140610000859 {"url": "http://httpbin.org/post", "mime": "application/json", "status": "200", "digest": "M532K5WS4GY2H4OVZO6HRPOP47A7KDWU", "length": "720", "offset": "0", "filename": "post-test.warc.gz"} +org,httpbin)/post?a=1&b=[]&c=3 20140610001151 {"url": "http://httpbin.org/post", "mime": "application/json", "status": "200", "digest": "M7YCTM7HS3YKYQTAWQVMQSQZBNEOXGU2", "length": "723", "offset": "1196", "filename": "post-test.warc.gz"} +org,httpbin)/post?data=^&foo=bar 20140610001255 {"url": "http://httpbin.org/post?foo=bar", "mime": "application/json", "status": "200", "digest": "B6E5P6JUZI6UPDTNO4L2BCHMGLTNCUAJ", "length": "723", "offset": "2395", "filename": "post-test.warc.gz"} diff --git a/testdata/post-test.warc.gz b/testdata/post-test.warc.gz new file mode 100644 index 00000000..b9cc1f48 Binary files /dev/null and b/testdata/post-test.warc.gz differ