diff --git a/webagg/inputrequest.py b/webagg/inputrequest.py index 207db397..e7f13a85 100644 --- a/webagg/inputrequest.py +++ b/webagg/inputrequest.py @@ -113,6 +113,9 @@ class DirectWSGIInputRequest(object): buff.write('\r\n') for name, value in iteritems(headers): + if name.lower() == 'host': + continue + buff.write(name) buff.write(': ') buff.write(value) diff --git a/webagg/responseloader.py b/webagg/responseloader.py index 94a1f153..33228444 100644 --- a/webagg/responseloader.py +++ b/webagg/responseloader.py @@ -1,4 +1,5 @@ -from webagg.utils import MementoUtils, StreamIter +from webagg.utils import MementoUtils, StreamIter, chunk_encode_iter +from webagg.indexsource import RedisIndexSource from pywb.utils.timeutils import timestamp_to_datetime, datetime_to_timestamp from pywb.utils.timeutils import iso_date_to_datetime, datetime_to_iso_date @@ -58,7 +59,7 @@ class BaseLoader(object): if not lenset: out_headers['Transfer-Encoding'] = 'chunked' - streamiter = self._chunk_encode(streamiter) + streamiter = chunk_encode_iter(streamiter) return out_headers, streamiter @@ -76,17 +77,32 @@ class BaseLoader(object): return False - @staticmethod - def _chunk_encode(orig_iter): - for chunk in orig_iter: - if not len(chunk): - continue - chunk_len = b'%X\r\n' % len(chunk) - yield chunk_len - yield chunk - yield b'\r\n' - yield b'0\r\n\r\n' +#============================================================================= +class PrefixResolver(object): + def __init__(self, template): + self.template = template + + def __call__(self, filename, cdx): + full_path = self.template + if hasattr(cdx, '_formatter') and cdx._formatter: + full_path = cdx._formatter.format(full_path) + + return full_path + filename + + +#============================================================================= +class RedisResolver(RedisIndexSource): + def __call__(self, filename, cdx): + redis_key = self.redis_key_template + if hasattr(cdx, '_formatter') and cdx._formatter: + redis_key = cdx._formatter.format(redis_key) + + res = self.redis.hget(redis_key, filename) + if res: + res = res.decode('utf-8') + + return res #============================================================================= @@ -96,9 +112,9 @@ class WARCPathLoader(BaseLoader): if isinstance(paths, str): self.paths = [paths] - self.path_checks = list(self.warc_paths()) + self.resolvers = [self._make_resolver(path) for path in self.paths] - self.resolve_loader = ResolvingLoader(self.path_checks, + self.resolve_loader = ResolvingLoader(self.resolvers, no_record_parse=True) self.cdx_source = cdx_source @@ -106,20 +122,15 @@ class WARCPathLoader(BaseLoader): cdx_iter, errs = self.cdx_source(*args, **kwargs) return cdx_iter - def warc_paths(self): - for path in self.paths: - def check(filename, cdx): - try: - if hasattr(cdx, '_formatter') and cdx._formatter: - full_path = cdx._formatter.format(path) - else: - full_path = path - full_path += filename - return full_path - except KeyError: - return None + def _make_resolver(self, path): + if hasattr(path, '__call__'): + return path - yield check + if path.startswith('redis://'): + return RedisResolver(path) + + else: + return PrefixResolver(path) def load_resource(self, cdx, params): if cdx.get('_cached_result'): diff --git a/webagg/test/live.ini b/webagg/test/live.ini index c4e4e10c..f63d5896 100644 --- a/webagg/test/live.ini +++ b/webagg/test/live.ini @@ -12,6 +12,6 @@ venv = $(VIRTUAL_ENV) endif = gevent = 100 -gevent-early-monkey-patch = +gevent-monkey-patch = wsgi = webagg.test.live diff --git a/webagg/test/live.py b/webagg/test/live.py index 21aa73f7..e24084fa 100644 --- a/webagg/test/live.py +++ b/webagg/test/live.py @@ -1,4 +1,44 @@ +from gevent.monkey import patch_all; patch_all() + from webagg.test.testutils import LiveServerTests +from webagg.handlers import DefaultResourceHandler +from webagg.app import ResAggApp +from webagg.indexsource import LiveIndexSource, RedisIndexSource +from webagg.aggregator import SimpleAggregator, CacheDirectoryIndexSource -application = LiveServerTests.make_live_app() +def simpleapp(): + app = ResAggApp() + app.add_route('/live', + DefaultResourceHandler(SimpleAggregator( + {'live': LiveIndexSource()}) + ) + ) + + app.add_route('/replay', + DefaultResourceHandler(SimpleAggregator( + {'replay': RedisIndexSource('redis://localhost/2/rec:cdxj')}), + 'redis://localhost/2/rec:warc' + ) + ) + + app.add_route('/replay-testdata', + DefaultResourceHandler(SimpleAggregator( + {'test': CacheDirectoryIndexSource('./testdata/')}), + './testdata/' + ) + ) + return app.application + + + +application = simpleapp() + + +if __name__ == "__main__": +# from bottle import run +# run(application, server='gevent', port=8080, fast=True) + + from gevent.wsgi import WSGIServer + server = WSGIServer(('', 8080), application) + server.serve_forever() diff --git a/webagg/test/test_upstream.py b/webagg/test/test_upstream.py index cd107811..037b62e9 100644 --- a/webagg/test/test_upstream.py +++ b/webagg/test/test_upstream.py @@ -36,7 +36,7 @@ class TestUpstream(LiveServerTests, BaseTestClass): def test_live_paths(self): res = requests.get(self.base_url + '/') - assert set(res.json().keys()) == {'/live/postreq', '/live', '/replay/postreq', '/replay'} + assert set(res.json().keys()) == {'/live/postreq', '/live'} def test_upstream_paths(self): res = self.testapp.get('/') diff --git a/webagg/test/testutils.py b/webagg/test/testutils.py index c46e17f1..4c5c42b6 100644 --- a/webagg/test/testutils.py +++ b/webagg/test/testutils.py @@ -7,7 +7,7 @@ from multiprocessing import Process from wsgiref.simple_server import make_server -from webagg.aggregator import SimpleAggregator, CacheDirectoryIndexSource +from webagg.aggregator import SimpleAggregator from webagg.app import ResAggApp from webagg.handlers import DefaultResourceHandler from webagg.indexsource import LiveIndexSource @@ -66,12 +66,6 @@ class LiveServerTests(object): {'live': LiveIndexSource()}) ) ) - app.add_route('/replay', - DefaultResourceHandler(SimpleAggregator( - {'replay': CacheDirectoryIndexSource('./testdata/')}), - './testdata/' - ) - ) return app.application @classmethod diff --git a/webagg/utils.py b/webagg/utils.py index e71357c7..79c8bcbc 100644 --- a/webagg/utils.py +++ b/webagg/utils.py @@ -1,7 +1,8 @@ import re import six import string -import time + +from contextlib import closing from pywb.utils.timeutils import timestamp_to_http_date from pywb.utils.wbexception import BadRequestException @@ -11,7 +12,7 @@ LINK_SEG_SPLIT = re.compile(';\s*') LINK_URL = re.compile('<(.*)>') LINK_PROP = re.compile('([\w]+)="([^"]+)') -BUFF_SIZE = 8192 +BUFF_SIZE = 16384 #============================================================================= @@ -146,81 +147,31 @@ def res_template(template, params): #============================================================================= -class ReadFullyStream(object): - def __init__(self, stream): - self.stream = stream +def StreamIter(stream, header1=None, header2=None, size=BUFF_SIZE): + with closing(stream): + if header1: + yield header1 - def read(self, *args, **kwargs): - try: - return self.stream.read(*args, **kwargs) - except: - self.mark_incomplete() - raise + if header2: + yield header2 - def readline(self, *args, **kwargs): - try: - return self.stream.readline(*args, **kwargs) - except: - self.mark_incomplete() - raise - - def mark_incomplete(self): - if (hasattr(self.stream, '_fp') and - hasattr(self.stream._fp, 'mark_incomplete')): - self.stream._fp.mark_incomplete() - - def close(self): - try: - while True: - buff = self.stream.read(BUFF_SIZE) - time.sleep(0) - if not buff: - break - - except Exception as e: - import traceback - traceback.print_exc() - self.mark_incomplete() - finally: - self.stream.close() + while True: + buff = stream.read(size) + if not buff: + break + yield buff #============================================================================= -class StreamIter(six.Iterator): - def __init__(self, stream, header1=None, header2=None, size=8192): - self.stream = stream - self.header1 = header1 - self.header2 = header2 - self.size = size +def chunk_encode_iter(orig_iter): + for chunk in orig_iter: + if not len(chunk): + continue + chunk_len = b'%X\r\n' % len(chunk) + yield chunk_len + yield chunk + yield b'\r\n' - def __iter__(self): - return self - - def __next__(self): - if self.header1: - header = self.header1 - self.header1 = None - return header - elif self.header2: - header = self.header2 - self.header2 = None - return header - - data = self.stream.read(self.size) - if data: - return data - - self.close() - raise StopIteration - - def close(self): - if not self.stream: - return - - try: - self.stream.close() - self.stream = None - except Exception: - pass + yield b'0\r\n\r\n'