1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-24 06:59:52 +01:00

webagg: convert StreamIter to generate, remove unused ReadFullyStream

loaders: add support for RedisResolver as well as PathPrefixResolver
inputreq: reconstruct_request() skips host header if already present
improve test app to include replay
This commit is contained in:
Ilya Kreymer 2016-03-21 11:04:52 -07:00
parent 4cf935abd1
commit 22ead52604
7 changed files with 108 additions and 109 deletions

View File

@@ -113,6 +113,9 @@ class DirectWSGIInputRequest(object):
buff.write('\r\n') buff.write('\r\n')
for name, value in iteritems(headers): for name, value in iteritems(headers):
if name.lower() == 'host':
continue
buff.write(name) buff.write(name)
buff.write(': ') buff.write(': ')
buff.write(value) buff.write(value)

View File

@@ -1,4 +1,5 @@
from webagg.utils import MementoUtils, StreamIter from webagg.utils import MementoUtils, StreamIter, chunk_encode_iter
from webagg.indexsource import RedisIndexSource
from pywb.utils.timeutils import timestamp_to_datetime, datetime_to_timestamp from pywb.utils.timeutils import timestamp_to_datetime, datetime_to_timestamp
from pywb.utils.timeutils import iso_date_to_datetime, datetime_to_iso_date from pywb.utils.timeutils import iso_date_to_datetime, datetime_to_iso_date
@@ -58,7 +59,7 @@ class BaseLoader(object):
if not lenset: if not lenset:
out_headers['Transfer-Encoding'] = 'chunked' out_headers['Transfer-Encoding'] = 'chunked'
streamiter = self._chunk_encode(streamiter) streamiter = chunk_encode_iter(streamiter)
return out_headers, streamiter return out_headers, streamiter
@@ -76,17 +77,32 @@ class BaseLoader(object):
return False return False
@staticmethod
def _chunk_encode(orig_iter):
for chunk in orig_iter:
if not len(chunk):
continue
chunk_len = b'%X\r\n' % len(chunk)
yield chunk_len
yield chunk
yield b'\r\n'
yield b'0\r\n\r\n' #=============================================================================
class PrefixResolver(object):
def __init__(self, template):
self.template = template
def __call__(self, filename, cdx):
full_path = self.template
if hasattr(cdx, '_formatter') and cdx._formatter:
full_path = cdx._formatter.format(full_path)
return full_path + filename
#=============================================================================
class RedisResolver(RedisIndexSource):
def __call__(self, filename, cdx):
redis_key = self.redis_key_template
if hasattr(cdx, '_formatter') and cdx._formatter:
redis_key = cdx._formatter.format(redis_key)
res = self.redis.hget(redis_key, filename)
if res:
res = res.decode('utf-8')
return res
#============================================================================= #=============================================================================
@@ -96,9 +112,9 @@ class WARCPathLoader(BaseLoader):
if isinstance(paths, str): if isinstance(paths, str):
self.paths = [paths] self.paths = [paths]
self.path_checks = list(self.warc_paths()) self.resolvers = [self._make_resolver(path) for path in self.paths]
self.resolve_loader = ResolvingLoader(self.path_checks, self.resolve_loader = ResolvingLoader(self.resolvers,
no_record_parse=True) no_record_parse=True)
self.cdx_source = cdx_source self.cdx_source = cdx_source
@@ -106,20 +122,15 @@ class WARCPathLoader(BaseLoader):
cdx_iter, errs = self.cdx_source(*args, **kwargs) cdx_iter, errs = self.cdx_source(*args, **kwargs)
return cdx_iter return cdx_iter
def warc_paths(self): def _make_resolver(self, path):
for path in self.paths: if hasattr(path, '__call__'):
def check(filename, cdx): return path
try:
if hasattr(cdx, '_formatter') and cdx._formatter:
full_path = cdx._formatter.format(path)
else:
full_path = path
full_path += filename
return full_path
except KeyError:
return None
yield check if path.startswith('redis://'):
return RedisResolver(path)
else:
return PrefixResolver(path)
def load_resource(self, cdx, params): def load_resource(self, cdx, params):
if cdx.get('_cached_result'): if cdx.get('_cached_result'):

View File

@@ -12,6 +12,6 @@ venv = $(VIRTUAL_ENV)
endif = endif =
gevent = 100 gevent = 100
gevent-early-monkey-patch = gevent-monkey-patch =
wsgi = webagg.test.live wsgi = webagg.test.live

View File

@@ -1,4 +1,44 @@
from gevent.monkey import patch_all; patch_all()
from webagg.test.testutils import LiveServerTests from webagg.test.testutils import LiveServerTests
from webagg.handlers import DefaultResourceHandler
from webagg.app import ResAggApp
from webagg.indexsource import LiveIndexSource, RedisIndexSource
from webagg.aggregator import SimpleAggregator, CacheDirectoryIndexSource
application = LiveServerTests.make_live_app() def simpleapp():
app = ResAggApp()
app.add_route('/live',
DefaultResourceHandler(SimpleAggregator(
{'live': LiveIndexSource()})
)
)
app.add_route('/replay',
DefaultResourceHandler(SimpleAggregator(
{'replay': RedisIndexSource('redis://localhost/2/rec:cdxj')}),
'redis://localhost/2/rec:warc'
)
)
app.add_route('/replay-testdata',
DefaultResourceHandler(SimpleAggregator(
{'test': CacheDirectoryIndexSource('./testdata/')}),
'./testdata/'
)
)
return app.application
application = simpleapp()
if __name__ == "__main__":
# from bottle import run
# run(application, server='gevent', port=8080, fast=True)
from gevent.wsgi import WSGIServer
server = WSGIServer(('', 8080), application)
server.serve_forever()

View File

@@ -36,7 +36,7 @@ class TestUpstream(LiveServerTests, BaseTestClass):
def test_live_paths(self): def test_live_paths(self):
res = requests.get(self.base_url + '/') res = requests.get(self.base_url + '/')
assert set(res.json().keys()) == {'/live/postreq', '/live', '/replay/postreq', '/replay'} assert set(res.json().keys()) == {'/live/postreq', '/live'}
def test_upstream_paths(self): def test_upstream_paths(self):
res = self.testapp.get('/') res = self.testapp.get('/')

View File

@@ -7,7 +7,7 @@ from multiprocessing import Process
from wsgiref.simple_server import make_server from wsgiref.simple_server import make_server
from webagg.aggregator import SimpleAggregator, CacheDirectoryIndexSource from webagg.aggregator import SimpleAggregator
from webagg.app import ResAggApp from webagg.app import ResAggApp
from webagg.handlers import DefaultResourceHandler from webagg.handlers import DefaultResourceHandler
from webagg.indexsource import LiveIndexSource from webagg.indexsource import LiveIndexSource
@@ -66,12 +66,6 @@ class LiveServerTests(object):
{'live': LiveIndexSource()}) {'live': LiveIndexSource()})
) )
) )
app.add_route('/replay',
DefaultResourceHandler(SimpleAggregator(
{'replay': CacheDirectoryIndexSource('./testdata/')}),
'./testdata/'
)
)
return app.application return app.application
@classmethod @classmethod

View File

@@ -1,7 +1,8 @@
import re import re
import six import six
import string import string
import time
from contextlib import closing
from pywb.utils.timeutils import timestamp_to_http_date from pywb.utils.timeutils import timestamp_to_http_date
from pywb.utils.wbexception import BadRequestException from pywb.utils.wbexception import BadRequestException
@@ -11,7 +12,7 @@ LINK_SEG_SPLIT = re.compile(';\s*')
LINK_URL = re.compile('<(.*)>') LINK_URL = re.compile('<(.*)>')
LINK_PROP = re.compile('([\w]+)="([^"]+)') LINK_PROP = re.compile('([\w]+)="([^"]+)')
BUFF_SIZE = 8192 BUFF_SIZE = 16384
#============================================================================= #=============================================================================
@@ -146,81 +147,31 @@ #=============================================================================
#============================================================================= #=============================================================================
class ReadFullyStream(object): def StreamIter(stream, header1=None, header2=None, size=BUFF_SIZE):
def __init__(self, stream): with closing(stream):
self.stream = stream if header1:
yield header1
def read(self, *args, **kwargs): if header2:
try: yield header2
return self.stream.read(*args, **kwargs)
except:
self.mark_incomplete()
raise
def readline(self, *args, **kwargs): while True:
try: buff = stream.read(size)
return self.stream.readline(*args, **kwargs) if not buff:
except: break
self.mark_incomplete() yield buff
raise
def mark_incomplete(self):
if (hasattr(self.stream, '_fp') and
hasattr(self.stream._fp, 'mark_incomplete')):
self.stream._fp.mark_incomplete()
def close(self):
try:
while True:
buff = self.stream.read(BUFF_SIZE)
time.sleep(0)
if not buff:
break
except Exception as e:
import traceback
traceback.print_exc()
self.mark_incomplete()
finally:
self.stream.close()
#============================================================================= #=============================================================================
class StreamIter(six.Iterator): def chunk_encode_iter(orig_iter):
def __init__(self, stream, header1=None, header2=None, size=8192): for chunk in orig_iter:
self.stream = stream if not len(chunk):
self.header1 = header1 continue
self.header2 = header2 chunk_len = b'%X\r\n' % len(chunk)
self.size = size yield chunk_len
yield chunk
yield b'\r\n'
def __iter__(self): yield b'0\r\n\r\n'
return self
def __next__(self):
if self.header1:
header = self.header1
self.header1 = None
return header
elif self.header2:
header = self.header2
self.header2 = None
return header
data = self.stream.read(self.size)
if data:
return data
self.close()
raise StopIteration
def close(self):
if not self.stream:
return
try:
self.stream.close()
self.stream = None
except Exception:
pass