1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-24 06:59:52 +01:00

loaders: return full WARC record in response, no need for upstream response handler

add UpstreamAggIndexSource to simplify upstream aggregator config, add test for upstream config
bottle app: wrap in a ResAggApp, allow multiple bottle apps
py2: non-gevent concurrency not supported
This commit is contained in:
Ilya Kreymer 2016-03-06 23:10:30 -08:00
parent 0823ff4bd0
commit c1895ae70f
6 changed files with 241 additions and 213 deletions

View File

@ -8,9 +8,12 @@ from webagg.indexsource import MementoIndexSource, FileIndexSource, LiveIndexSou
from webagg.aggregator import GeventTimeoutAggregator, SimpleAggregator from webagg.aggregator import GeventTimeoutAggregator, SimpleAggregator
from webagg.aggregator import DirectoryIndexSource from webagg.aggregator import DirectoryIndexSource
from webagg.app import add_route, application from webagg.app import ResAggApp
from webagg.utils import MementoUtils from webagg.utils import MementoUtils
from pywb.utils.statusandheaders import StatusAndHeadersParser
from io import BytesIO
import webtest import webtest
import bottle import bottle
@ -30,32 +33,32 @@ testapp = None
def setup_module(self): def setup_module(self):
live_source = SimpleAggregator({'live': LiveIndexSource()}) live_source = SimpleAggregator({'live': LiveIndexSource()})
live_handler = DefaultResourceHandler(live_source) live_handler = DefaultResourceHandler(live_source)
add_route('/live', live_handler) app = ResAggApp()
app.add_route('/live', live_handler)
source1 = GeventTimeoutAggregator(sources) source1 = GeventTimeoutAggregator(sources)
handler1 = DefaultResourceHandler(source1, to_path('testdata/')) handler1 = DefaultResourceHandler(source1, to_path('testdata/'))
add_route('/many', handler1) app.add_route('/many', handler1)
source2 = SimpleAggregator({'post': FileIndexSource(to_path('testdata/post-test.cdxj'))}) source2 = SimpleAggregator({'post': FileIndexSource(to_path('testdata/post-test.cdxj'))})
handler2 = DefaultResourceHandler(source2, to_path('testdata/')) handler2 = DefaultResourceHandler(source2, to_path('testdata/'))
add_route('/posttest', handler2) app.add_route('/posttest', handler2)
source3 = SimpleAggregator({'example': FileIndexSource(to_path('testdata/example.cdxj'))}) source3 = SimpleAggregator({'example': FileIndexSource(to_path('testdata/example.cdxj'))})
handler3 = DefaultResourceHandler(source3, to_path('testdata/')) handler3 = DefaultResourceHandler(source3, to_path('testdata/'))
add_route('/fallback', HandlerSeq([handler3, app.add_route('/fallback', HandlerSeq([handler3,
handler2, handler2,
live_handler])) live_handler]))
add_route('/seq', HandlerSeq([handler3, app.add_route('/seq', HandlerSeq([handler3,
handler2])) handler2]))
add_route('/empty', HandlerSeq([])) app.add_route('/empty', HandlerSeq([]))
add_route('/invalid', DefaultResourceHandler([SimpleAggregator({'invalid': 'should not be a callable'})])) app.add_route('/invalid', DefaultResourceHandler([SimpleAggregator({'invalid': 'should not be a callable'})]))
application.debug = True
global testapp global testapp
testapp = webtest.TestApp(application) testapp = webtest.TestApp(app.application)
def to_json_list(text): def to_json_list(text):
@ -66,6 +69,15 @@ class TestResAgg(object):
def setup(self): def setup(self):
self.testapp = testapp self.testapp = testapp
def _check_uri_date(self, resp, uri, dt):
    """Parse the WARC headers at the start of the response body and check them.

    :param resp: response object whose ``body`` bytes begin with a
        ``WARC/1.0`` header block
    :param uri: expected value of the ``WARC-Target-URI`` header
    :param dt: expected ``WARC-Date`` value, or the literal ``True`` to
        require only that some date is present
    """
    buff = BytesIO(resp.body)
    status_headers = StatusAndHeadersParser(['WARC/1.0']).parse(buff)
    assert status_headers.get_header('WARC-Target-URI') == uri

    warc_date = status_headers.get_header('WARC-Date')
    # Identity check: only the literal True selects "any date" mode, so an
    # expected value that merely compares equal to True (e.g. 1) does not.
    if dt is True:
        # Truthiness rejects an empty string as before, and also rejects a
        # missing header.
        # NOTE(review): presumably get_header returns None when the header
        # is absent, which the previous `!= ''` check let through -- confirm.
        assert warc_date
    else:
        assert warc_date == dt
def test_list_routes(self): def test_list_routes(self):
resp = self.testapp.get('/') resp = self.testapp.get('/')
res = resp.json res = resp.json
@ -120,9 +132,9 @@ class TestResAgg(object):
headers = {'foo': 'bar'} headers = {'foo': 'bar'}
resp = self.testapp.get('/live/resource?url=http://httpbin.org/get?foo=bar', headers=headers) resp = self.testapp.get('/live/resource?url=http://httpbin.org/get?foo=bar', headers=headers)
assert resp.headers['WARC-Coll'] == 'live' assert resp.headers['Source-Coll'] == 'live'
assert resp.headers['WARC-Target-URI'] == 'http://httpbin.org/get?foo=bar'
assert resp.headers['WARC-Date'] != '' self._check_uri_date(resp, 'http://httpbin.org/get?foo=bar', True)
assert resp.headers['Link'] == MementoUtils.make_link('http://httpbin.org/get?foo=bar', 'original') assert resp.headers['Link'] == MementoUtils.make_link('http://httpbin.org/get?foo=bar', 'original')
assert resp.headers['Memento-Datetime'] != '' assert resp.headers['Memento-Datetime'] != ''
@ -136,9 +148,9 @@ class TestResAgg(object):
resp = self.testapp.post('/live/resource?url=http://httpbin.org/post', resp = self.testapp.post('/live/resource?url=http://httpbin.org/post',
OrderedDict([('foo', 'bar')])) OrderedDict([('foo', 'bar')]))
assert resp.headers['WARC-Coll'] == 'live' assert resp.headers['Source-Coll'] == 'live'
assert resp.headers['WARC-Target-URI'] == 'http://httpbin.org/post'
assert resp.headers['WARC-Date'] != '' self._check_uri_date(resp, 'http://httpbin.org/post', True)
assert resp.headers['Link'] == MementoUtils.make_link('http://httpbin.org/post', 'original') assert resp.headers['Link'] == MementoUtils.make_link('http://httpbin.org/post', 'original')
assert resp.headers['Memento-Datetime'] != '' assert resp.headers['Memento-Datetime'] != ''
@ -151,9 +163,10 @@ class TestResAgg(object):
def test_agg_select_mem_1(self): def test_agg_select_mem_1(self):
resp = self.testapp.get('/many/resource?url=http://vvork.com/&closest=20141001') resp = self.testapp.get('/many/resource?url=http://vvork.com/&closest=20141001')
assert resp.headers['WARC-Coll'] == 'rhiz' assert resp.headers['Source-Coll'] == 'rhiz'
assert resp.headers['WARC-Target-URI'] == 'http://www.vvork.com/'
assert resp.headers['WARC-Date'] == '2014-10-06T18:43:57Z' self._check_uri_date(resp, 'http://www.vvork.com/', '2014-10-06T18:43:57Z')
assert b'HTTP/1.1 200 OK' in resp.body assert b'HTTP/1.1 200 OK' in resp.body
assert resp.headers['Link'] == MementoUtils.make_link('http://www.vvork.com/', 'original') assert resp.headers['Link'] == MementoUtils.make_link('http://www.vvork.com/', 'original')
@ -164,9 +177,10 @@ class TestResAgg(object):
def test_agg_select_mem_2(self): def test_agg_select_mem_2(self):
resp = self.testapp.get('/many/resource?url=http://vvork.com/&closest=20151231') resp = self.testapp.get('/many/resource?url=http://vvork.com/&closest=20151231')
assert resp.headers['WARC-Coll'] == 'ia' assert resp.headers['Source-Coll'] == 'ia'
assert resp.headers['WARC-Target-URI'] == 'http://vvork.com/'
assert resp.headers['WARC-Date'] == '2016-01-10T13:48:55Z' self._check_uri_date(resp, 'http://vvork.com/', '2016-01-10T13:48:55Z')
assert b'HTTP/1.1 200 OK' in resp.body assert b'HTTP/1.1 200 OK' in resp.body
assert resp.headers['Link'] == MementoUtils.make_link('http://vvork.com/', 'original') assert resp.headers['Link'] == MementoUtils.make_link('http://vvork.com/', 'original')
@ -177,9 +191,9 @@ class TestResAgg(object):
def test_agg_select_live(self): def test_agg_select_live(self):
resp = self.testapp.get('/many/resource?url=http://vvork.com/&closest=2016') resp = self.testapp.get('/many/resource?url=http://vvork.com/&closest=2016')
assert resp.headers['WARC-Coll'] == 'live' assert resp.headers['Source-Coll'] == 'live'
assert resp.headers['WARC-Target-URI'] == 'http://vvork.com/'
assert resp.headers['WARC-Date'] != '' self._check_uri_date(resp, 'http://vvork.com/', True)
assert resp.headers['Link'] == MementoUtils.make_link('http://vvork.com/', 'original') assert resp.headers['Link'] == MementoUtils.make_link('http://vvork.com/', 'original')
assert resp.headers['Memento-Datetime'] != '' assert resp.headers['Memento-Datetime'] != ''
@ -189,9 +203,9 @@ class TestResAgg(object):
def test_agg_select_local(self): def test_agg_select_local(self):
resp = self.testapp.get('/many/resource?url=http://iana.org/&closest=20140126200624') resp = self.testapp.get('/many/resource?url=http://iana.org/&closest=20140126200624')
assert resp.headers['WARC-Coll'] == 'local' assert resp.headers['Source-Coll'] == 'local'
assert resp.headers['WARC-Target-URI'] == 'http://www.iana.org/'
assert resp.headers['WARC-Date'] == '2014-01-26T20:06:24Z' self._check_uri_date(resp, 'http://www.iana.org/', '2014-01-26T20:06:24Z')
assert resp.headers['Link'] == MementoUtils.make_link('http://www.iana.org/', 'original') assert resp.headers['Link'] == MementoUtils.make_link('http://www.iana.org/', 'original')
assert resp.headers['Memento-Datetime'] == 'Sun, 26 Jan 2014 20:06:24 GMT' assert resp.headers['Memento-Datetime'] == 'Sun, 26 Jan 2014 20:06:24 GMT'
@ -208,9 +222,9 @@ Host: iana.org
resp = self.testapp.post('/many/resource/postreq?url=http://iana.org/&closest=20140126200624', req_data) resp = self.testapp.post('/many/resource/postreq?url=http://iana.org/&closest=20140126200624', req_data)
assert resp.headers['WARC-Coll'] == 'local' assert resp.headers['Source-Coll'] == 'local'
assert resp.headers['WARC-Target-URI'] == 'http://www.iana.org/'
assert resp.headers['WARC-Date'] == '2014-01-26T20:06:24Z' self._check_uri_date(resp, 'http://www.iana.org/', '2014-01-26T20:06:24Z')
assert resp.headers['Link'] == MementoUtils.make_link('http://www.iana.org/', 'original') assert resp.headers['Link'] == MementoUtils.make_link('http://www.iana.org/', 'original')
assert resp.headers['Memento-Datetime'] == 'Sun, 26 Jan 2014 20:06:24 GMT' assert resp.headers['Memento-Datetime'] == 'Sun, 26 Jan 2014 20:06:24 GMT'
@ -227,9 +241,9 @@ Host: httpbin.org
resp = self.testapp.post('/many/resource/postreq?url=http://httpbin.org/get?foo=bar&closest=now', req_data) resp = self.testapp.post('/many/resource/postreq?url=http://httpbin.org/get?foo=bar&closest=now', req_data)
assert resp.headers['WARC-Coll'] == 'live' assert resp.headers['Source-Coll'] == 'live'
assert resp.headers['WARC-Target-URI'] == 'http://httpbin.org/get?foo=bar'
assert resp.headers['WARC-Date'] != '' self._check_uri_date(resp, 'http://httpbin.org/get?foo=bar', True)
assert resp.headers['Link'] == MementoUtils.make_link('http://httpbin.org/get?foo=bar', 'original') assert resp.headers['Link'] == MementoUtils.make_link('http://httpbin.org/get?foo=bar', 'original')
assert resp.headers['Memento-Datetime'] != '' assert resp.headers['Memento-Datetime'] != ''
@ -252,9 +266,9 @@ foo=bar&test=abc"""
resp = self.testapp.post('/posttest/resource/postreq?url=http://httpbin.org/post', req_data) resp = self.testapp.post('/posttest/resource/postreq?url=http://httpbin.org/post', req_data)
assert resp.headers['WARC-Coll'] == 'post' assert resp.headers['Source-Coll'] == 'post'
assert resp.headers['WARC-Target-URI'] == 'http://httpbin.org/post'
assert resp.headers['WARC-Date'] != '' self._check_uri_date(resp, 'http://httpbin.org/post', True)
assert resp.headers['Link'] == MementoUtils.make_link('http://httpbin.org/post', 'original') assert resp.headers['Link'] == MementoUtils.make_link('http://httpbin.org/post', 'original')
assert resp.headers['Memento-Datetime'] != '' assert resp.headers['Memento-Datetime'] != ''
@ -271,8 +285,10 @@ foo=bar&test=abc"""
resp = self.testapp.post('/fallback/resource?url=http://httpbin.org/post', req_data) resp = self.testapp.post('/fallback/resource?url=http://httpbin.org/post', req_data)
assert resp.headers['WARC-Coll'] == 'post' assert resp.headers['Source-Coll'] == 'post'
assert resp.headers['WARC-Target-URI'] == 'http://httpbin.org/post'
self._check_uri_date(resp, 'http://httpbin.org/post', True)
assert resp.headers['Link'] == MementoUtils.make_link('http://httpbin.org/post', 'original') assert resp.headers['Link'] == MementoUtils.make_link('http://httpbin.org/post', 'original')
assert b'HTTP/1.1 200 OK' in resp.body assert b'HTTP/1.1 200 OK' in resp.body
@ -285,8 +301,10 @@ foo=bar&test=abc"""
def test_agg_seq_fallback_1(self): def test_agg_seq_fallback_1(self):
resp = self.testapp.get('/fallback/resource?url=http://www.iana.org/') resp = self.testapp.get('/fallback/resource?url=http://www.iana.org/')
assert resp.headers['WARC-Coll'] == 'live' assert resp.headers['Source-Coll'] == 'live'
assert resp.headers['WARC-Target-URI'] == 'http://www.iana.org/'
self._check_uri_date(resp, 'http://www.iana.org/', True)
assert resp.headers['Link'] == MementoUtils.make_link('http://www.iana.org/', 'original') assert resp.headers['Link'] == MementoUtils.make_link('http://www.iana.org/', 'original')
assert b'HTTP/1.1 200 OK' in resp.body assert b'HTTP/1.1 200 OK' in resp.body
@ -296,9 +314,9 @@ foo=bar&test=abc"""
def test_agg_seq_fallback_2(self): def test_agg_seq_fallback_2(self):
resp = self.testapp.get('/fallback/resource?url=http://www.example.com/') resp = self.testapp.get('/fallback/resource?url=http://www.example.com/')
assert resp.headers['WARC-Coll'] == 'example' assert resp.headers['Source-Coll'] == 'example'
assert resp.headers['WARC-Date'] == '2016-02-25T04:23:29Z'
assert resp.headers['WARC-Target-URI'] == 'http://example.com/' self._check_uri_date(resp, 'http://example.com/', '2016-02-25T04:23:29Z')
assert resp.headers['Link'] == MementoUtils.make_link('http://example.com/', 'original') assert resp.headers['Link'] == MementoUtils.make_link('http://example.com/', 'original')
assert resp.headers['Memento-Datetime'] == 'Thu, 25 Feb 2016 04:23:29 GMT' assert resp.headers['Memento-Datetime'] == 'Thu, 25 Feb 2016 04:23:29 GMT'
@ -318,11 +336,14 @@ foo=bar&test=abc"""
def test_agg_local_revisit(self): def test_agg_local_revisit(self):
resp = self.testapp.get('/many/resource?url=http://www.example.com/&closest=20140127171251&sources=local') resp = self.testapp.get('/many/resource?url=http://www.example.com/&closest=20140127171251&sources=local')
assert resp.headers['WARC-Coll'] == 'local' assert resp.headers['Source-Coll'] == 'local'
assert resp.headers['WARC-Target-URI'] == 'http://example.com'
assert resp.headers['WARC-Date'] == '2014-01-27T17:12:51Z' buff = BytesIO(resp.body)
assert resp.headers['WARC-Refers-To-Target-URI'] == 'http://example.com' status_headers = StatusAndHeadersParser(['WARC/1.0']).parse(buff)
assert resp.headers['WARC-Refers-To-Date'] == '2014-01-27T17:12:00Z' assert status_headers.get_header('WARC-Target-URI') == 'http://example.com'
assert status_headers.get_header('WARC-Date') == '2014-01-27T17:12:51Z'
assert status_headers.get_header('WARC-Refers-To-Target-URI') == 'http://example.com'
assert status_headers.get_header('WARC-Refers-To-Date') == '2014-01-27T17:12:00Z'
assert resp.headers['Link'] == MementoUtils.make_link('http://example.com', 'original') assert resp.headers['Link'] == MementoUtils.make_link('http://example.com', 'original')
assert resp.headers['Memento-Datetime'] == 'Mon, 27 Jan 2014 17:12:51 GMT' assert resp.headers['Memento-Datetime'] == 'Mon, 27 Jan 2014 17:12:51 GMT'

View File

@ -9,6 +9,7 @@ from .testutils import json_list, to_path
import json import json
import pytest import pytest
import time import time
import six
from webagg.handlers import IndexHandler from webagg.handlers import IndexHandler
@ -39,8 +40,13 @@ agg_nf = {'simple': SimpleAggregator(nf),
'processes': ThreadedTimeoutAggregator(nf, timeout=5.0, use_processes=True), 'processes': ThreadedTimeoutAggregator(nf, timeout=5.0, use_processes=True),
} }
#def pytest_generate_tests(metafunc): if six.PY2:
# metafunc.parametrize("agg", list(aggs.values()), ids=list(aggs.keys())) del aggs['threaded']
del aggs['processes']
del agg_tm['threaded']
del agg_tm['processes']
del agg_nf['threaded']
del agg_nf['processes']
@pytest.mark.parametrize("agg", list(aggs.values()), ids=list(aggs.keys())) @pytest.mark.parametrize("agg", list(aggs.values()), ids=list(aggs.keys()))

View File

@ -1,7 +1,7 @@
from webagg.liverec import request as remote_request from webagg.liverec import request as remote_request
from webagg.inputrequest import DirectWSGIInputRequest, POSTInputRequest from webagg.inputrequest import DirectWSGIInputRequest, POSTInputRequest
from bottle import route, request, response, default_app, abort from bottle import route, request, response, abort, Bottle
import bottle import bottle
import traceback import traceback
@ -11,49 +11,46 @@ JSON_CT = 'application/json; charset=utf-8'
#============================================================================= #=============================================================================
route_dict = {} class ResAggApp(object):
def __init__(self, *args, **kwargs):
self.application = Bottle()
self.application.default_error_handler = self.err_handler
self.route_dict = {}
@self.application.route('/')
def list_routes():
return self.route_dict
#============================================================================= def add_route(self, path, handler):
def add_route(path, handler): @self.application.route([path, path + '/<mode:path>'], 'ANY')
@route([path, path + '/<mode:path>'], 'ANY') @wrap_error
@wrap_error def direct_input_request(mode=''):
def direct_input_request(mode=''): params = dict(request.query)
params = dict(request.query) params['mode'] = mode
params['mode'] = mode params['_input_req'] = DirectWSGIInputRequest(request.environ)
params['_input_req'] = DirectWSGIInputRequest(request.environ) return handler(params)
return handler(params)
@route([path + '/postreq', path + '/<mode:path>/postreq'], 'POST') @self.application.route([path + '/postreq', path + '/<mode:path>/postreq'], 'POST')
@wrap_error @wrap_error
def post_fullrequest(mode=''): def post_fullrequest(mode=''):
params = dict(request.query) params = dict(request.query)
params['mode'] = mode params['mode'] = mode
params['_input_req'] = POSTInputRequest(request.environ) params['_input_req'] = POSTInputRequest(request.environ)
return handler(params) return handler(params)
global route_dict handler_dict = handler.get_supported_modes()
handler_dict = handler.get_supported_modes() self.route_dict[path] = handler_dict
route_dict[path] = handler_dict self.route_dict[path + '/postreq'] = handler_dict
route_dict[path + '/postreq'] = handler_dict
def err_handler(self, exc):
#============================================================================= if bottle.debug:
@route('/') print(exc)
def list_routes(): traceback.print_exc()
return route_dict response.status = exc.status_code
response.content_type = JSON_CT
err_msg = json.dumps({'message': exc.body})
#============================================================================= response.headers['ResErrors'] = err_msg
def err_handler(exc): return err_msg
if bottle.debug:
print(exc)
traceback.print_exc()
response.status = exc.status_code
response.content_type = JSON_CT
err_msg = json.dumps({'message': exc.body})
response.headers['ResErrors'] = err_msg
return err_msg
#============================================================================= #=============================================================================
@ -99,8 +96,3 @@ def wrap_error(func):
return wrap_func return wrap_func
#=============================================================================
application = default_app()
application.default_error_handler = err_handler

View File

@ -1,4 +1,4 @@
from webagg.responseloader import WARCPathLoader, LiveWebLoader, UpstreamProxyLoader from webagg.responseloader import WARCPathLoader, LiveWebLoader
from webagg.utils import MementoUtils from webagg.utils import MementoUtils
from pywb.utils.wbexception import BadRequestException, WbException from pywb.utils.wbexception import BadRequestException, WbException
from pywb.utils.wbexception import NotFoundException from pywb.utils.wbexception import NotFoundException
@ -118,7 +118,7 @@ class ResourceHandler(IndexHandler):
class DefaultResourceHandler(ResourceHandler): class DefaultResourceHandler(ResourceHandler):
def __init__(self, index_source, warc_paths=''): def __init__(self, index_source, warc_paths=''):
loaders = [WARCPathLoader(warc_paths, index_source), loaders = [WARCPathLoader(warc_paths, index_source),
UpstreamProxyLoader(), # UpstreamProxyLoader(),
LiveWebLoader(), LiveWebLoader(),
] ]
super(DefaultResourceHandler, self).__init__(index_source, loaders) super(DefaultResourceHandler, self).__init__(index_source, loaders)

View File

@ -66,23 +66,33 @@ class RemoteIndexSource(BaseIndexSource):
def do_load(lines): def do_load(lines):
for line in lines: for line in lines:
cdx = CDXObject(line) cdx = CDXObject(line)
cdx[self.url_field] = self.replay_url.format( self._set_load_url(cdx)
timestamp=cdx['timestamp'],
url=cdx['url'])
yield cdx yield cdx
return do_load(lines) return do_load(lines)
@staticmethod def _set_load_url(self, cdx):
def upstream_webagg(base_url): cdx[self.url_field] = self.replay_url.format(
api_url = base_url + '/index?url={url}' timestamp=cdx['timestamp'],
proxy_url = base_url + '/resource?url={url}&closest={timestamp}' url=cdx['url'])
return RemoteIndexSource(api_url, proxy_url, 'upstream_url')
def __str__(self): def __str__(self):
return 'remote' return 'remote'
#=============================================================================
class UpstreamAggIndexSource(RemoteIndexSource):
    """Remote index source preconfigured for another aggregator instance.

    The index query and resource URL templates are both derived from a
    single ``base_url``.
    """

    def __init__(self, base_url):
        # Third argument names the cdx field that receives the formatted
        # resource URL (presumably the parent's ``url_field`` -- confirm).
        super(UpstreamAggIndexSource, self).__init__(
            base_url + '/index?url={url}',
            base_url + '/resource?url={url}&closest={timestamp}',
            'filename')

    def _set_load_url(self, cdx):
        """Let the parent set the load URL, then force offset '0' and drop 'load_url'."""
        parent = super(UpstreamAggIndexSource, self)
        parent._set_load_url(cdx)

        cdx['offset'] = '0'
        # Default value keeps pop() from raising when the key is absent.
        cdx.pop('load_url', '')
#============================================================================= #=============================================================================
class LiveIndexSource(BaseIndexSource): class LiveIndexSource(BaseIndexSource):
def load_index(self, params): def load_index(self, params):

View File

@ -1,34 +1,44 @@
from webagg.liverec import BaseRecorder from webagg.liverec import BaseRecorder
from webagg.liverec import request as remote_request from webagg.liverec import request as remote_request
from requests import request
from webagg.utils import MementoUtils from webagg.utils import MementoUtils
from requests import session
from pywb.utils.timeutils import timestamp_to_datetime, datetime_to_http_date from pywb.utils.timeutils import timestamp_to_datetime, datetime_to_http_date
from pywb.utils.timeutils import iso_date_to_datetime from pywb.utils.timeutils import iso_date_to_datetime
from pywb.utils.wbexception import LiveResourceException from pywb.utils.wbexception import LiveResourceException
from pywb.utils.statusandheaders import StatusAndHeaders
from pywb.warc.resolvingloader import ResolvingLoader from pywb.warc.resolvingloader import ResolvingLoader
from io import BytesIO from io import BytesIO
import uuid import uuid
import six import six
import itertools
#============================================================================= #=============================================================================
class StreamIter(six.Iterator): class StreamIter(six.Iterator):
def __init__(self, stream, header=None, size=8192): def __init__(self, stream, header1=None, header2=None, size=8192):
self.stream = stream self.stream = stream
self.header = header self.header1 = header1
self.header2 = header2
self.size = size self.size = size
def __iter__(self): def __iter__(self):
return self return self
def __next__(self): def __next__(self):
if self.header: if self.header1:
header = self.header header = self.header1
self.header = None self.header1 = None
return header
elif self.header2:
header = self.header2
self.header2 = None
return header return header
data = self.stream.read(self.size) data = self.stream.read(self.size)
@ -52,22 +62,44 @@ class StreamIter(six.Iterator):
#============================================================================= #=============================================================================
class BaseLoader(object): class BaseLoader(object):
def __call__(self, cdx, params): def __call__(self, cdx, params):
out_headers, res = self._load_resource(cdx, params) entry = self._load_resource(cdx, params)
if not res: if not entry:
return None, None return None, None
out_headers['WARC-Coll'] = cdx.get('source', '') warc_headers, other_headers_buff, stream = entry
out_headers = {}
out_headers['Source-Coll'] = cdx.get('source', '')
out_headers['Link'] = MementoUtils.make_link( out_headers['Link'] = MementoUtils.make_link(
out_headers['WARC-Target-URI'], warc_headers.get_header('WARC-Target-URI'),
'original') 'original')
memento_dt = iso_date_to_datetime(out_headers['WARC-Date']) out_headers['Content-Type'] = 'application/warc-record'
memento_dt = iso_date_to_datetime(warc_headers.get_header('WARC-Date'))
out_headers['Memento-Datetime'] = datetime_to_http_date(memento_dt) out_headers['Memento-Datetime'] = datetime_to_http_date(memento_dt)
return out_headers, res
def _load_resource(self, cdx, params): #pragma: no cover warc_headers_buff = warc_headers.to_bytes()
raise NotImplemented()
self._set_content_len(warc_headers.get_header('Content-Length'),
out_headers,
len(warc_headers_buff))
return out_headers, StreamIter(stream,
header1=warc_headers_buff,
header2=other_headers_buff)
def _set_content_len(self, content_len_str, headers, existing_len):
    """Set ``headers['Content-Length']`` when a valid length is available.

    :param content_len_str: candidate length of the payload; may be a
        numeric string, an int, or ``None`` when no length is known
    :param headers: mapping updated in place; ``'Content-Length'`` is set
        to the string form of ``content length + existing_len``
    :param existing_len: number of bytes already accounted for ahead of
        the payload (e.g. a serialized header block), added to the length

    Nothing is written when the value is missing, malformed or negative.
    """
    try:
        content_len = int(content_len_str)
    except (TypeError, ValueError):
        # TypeError: value is None (no length known).
        # ValueError: value is a non-numeric string. This was previously
        # uncaught and propagated instead of being treated as "unknown".
        content_len = -1

    if content_len >= 0:
        content_len += existing_len
        headers['Content-Length'] = str(content_len)
#============================================================================= #=============================================================================
@ -104,7 +136,7 @@ class WARCPathLoader(BaseLoader):
def _load_resource(self, cdx, params): def _load_resource(self, cdx, params):
if not cdx.get('filename') or cdx.get('offset') is None: if not cdx.get('filename') or cdx.get('offset') is None:
return None, None return None
cdx._formatter = params.get('_formatter') cdx._formatter = params.get('_formatter')
failed_files = [] failed_files = []
@ -112,88 +144,29 @@ class WARCPathLoader(BaseLoader):
load_headers_and_payload(cdx, load_headers_and_payload(cdx,
failed_files, failed_files,
self.cdx_index_source)) self.cdx_index_source))
warc_headers = payload.rec_headers
record = payload
out_headers = {}
for n, v in record.rec_headers.headers:
out_headers[n] = v
if headers != payload: if headers != payload:
out_headers['WARC-Target-URI'] = headers.rec_headers.get_header('WARC-Target-URI') warc_headers.replace_header('WARC-Refers-To-Target-URI',
out_headers['WARC-Date'] = headers.rec_headers.get_header('WARC-Date') payload.rec_headers.get_header('WARC-Target-URI'))
out_headers['WARC-Refers-To-Target-URI'] = payload.rec_headers.get_header('WARC-Target-URI')
out_headers['WARC-Refers-To-Date'] = payload.rec_headers.get_header('WARC-Date') warc_headers.replace_header('WARC-Refers-To-Date',
payload.rec_headers.get_header('WARC-Date'))
warc_headers.replace_header('WARC-Target-URI',
headers.rec_headers.get_header('WARC-Target-URI'))
warc_headers.replace_header('WARC-Date',
headers.rec_headers.get_header('WARC-Date'))
headers.stream.close() headers.stream.close()
return out_headers, StreamIter(record.stream) return (warc_headers, None, payload.stream)
def __str__(self): def __str__(self):
return 'WARCPathLoader' return 'WARCPathLoader'
#=============================================================================
class HeaderRecorder(BaseRecorder):
def __init__(self, skip_list=None):
self.buff = BytesIO()
self.skip_list = skip_list
self.skipped = []
self.target_ip = None
def write_response_header_line(self, line):
if self.accept_header(line):
self.buff.write(line)
def get_header(self):
return self.buff.getvalue()
def accept_header(self, line):
if self.skip_list and line.lower().startswith(self.skip_list):
self.skipped.append(line)
return False
return True
def finish_request(self, socket):
ip = socket.getpeername()
if ip:
self.target_ip = ip[0]
#=============================================================================
class UpstreamProxyLoader(BaseLoader):
def _load_resource(self, cdx, params):
load_url = cdx.get('upstream_url')
if not load_url:
return None, None
input_req = params['_input_req']
method = input_req.get_req_method()
data = input_req.get_req_body()
req_headers = input_req.get_req_headers()
try:
upstream_res = request(url=load_url,
method=method,
stream=True,
allow_redirects=False,
headers=req_headers,
data=data,
timeout=params.get('_timeout'))
except Exception as e:
import traceback
traceback.print_exc()
raise LiveResourceException(load_url)
out_headers = upstream_res.headers
return out_headers, StreamIter(upstream_res.raw)
def __str__(self):
return 'UpstreamProxyLoader'
#============================================================================= #=============================================================================
class LiveWebLoader(BaseLoader): class LiveWebLoader(BaseLoader):
SKIP_HEADERS = (b'link', SKIP_HEADERS = (b'link',
@ -204,7 +177,7 @@ class LiveWebLoader(BaseLoader):
def _load_resource(self, cdx, params): def _load_resource(self, cdx, params):
load_url = cdx.get('load_url') load_url = cdx.get('load_url')
if not load_url: if not load_url:
return None, None return None
recorder = HeaderRecorder(self.SKIP_HEADERS) recorder = HeaderRecorder(self.SKIP_HEADERS)
@ -236,31 +209,28 @@ class LiveWebLoader(BaseLoader):
headers=req_headers, headers=req_headers,
data=data, data=data,
timeout=params.get('_timeout')) timeout=params.get('_timeout'))
except Exception: except Exception as e:
raise LiveResourceException(load_url) raise LiveResourceException(load_url)
resp_headers = recorder.get_header() http_headers_buff = recorder.get_headers_buff()
out_headers = {} warc_headers = {}
out_headers['Content-Type'] = 'application/http; msgtype=response'
out_headers['WARC-Type'] = 'response' warc_headers['WARC-Type'] = 'response'
out_headers['WARC-Record-ID'] = self._make_warc_id() warc_headers['WARC-Record-ID'] = self._make_warc_id()
out_headers['WARC-Target-URI'] = cdx['url'] warc_headers['WARC-Target-URI'] = cdx['url']
out_headers['WARC-Date'] = self._make_date(dt) warc_headers['WARC-Date'] = self._make_date(dt)
if recorder.target_ip: if recorder.target_ip:
out_headers['WARC-IP-Address'] = recorder.target_ip warc_headers['WARC-IP-Address'] = recorder.target_ip
# Try to set content-length, if it is available and valid warc_headers['Content-Type'] = 'application/http; msgtype=response'
try:
content_len = int(upstream_res.headers.get('content-length', 0))
if content_len > 0:
content_len += len(resp_headers)
out_headers['Content-Length'] = content_len
except (KeyError, TypeError):
pass
return out_headers, StreamIter(upstream_res.raw, header=resp_headers) self._set_content_len(upstream_res.headers.get('Content-Length', -1),
warc_headers,
len(http_headers_buff))
warc_headers = StatusAndHeaders('WARC/1.0', warc_headers.items())
return (warc_headers, http_headers_buff, upstream_res.raw)
@staticmethod @staticmethod
def _make_date(dt): def _make_date(dt):
@ -275,3 +245,32 @@ class LiveWebLoader(BaseLoader):
def __str__(self): def __str__(self):
return 'LiveWebLoader' return 'LiveWebLoader'
#=============================================================================
class HeaderRecorder(BaseRecorder):
    """Recorder that buffers response header lines, minus a skip list.

    Lines whose lowercased form starts with one of the ``skip_list``
    prefixes are kept in ``skipped`` instead of being written to the
    buffer. The peer IP of the connection is stored in ``target_ip``.
    """

    def __init__(self, skip_list=None):
        # skip_list is passed straight to ``startswith`` on each line, so
        # it must be a prefix or tuple of prefixes of the same type as the
        # lines (a tuple of bytes in the visible caller).
        self.target_ip = None
        self.skipped = []
        self.skip_list = skip_list
        self.buff = BytesIO()

    def write_response_header_line(self, line):
        """Append ``line`` to the buffer unless it is filtered out."""
        if not self.accept_header(line):
            return
        self.buff.write(line)

    def get_headers_buff(self):
        """Return everything buffered so far as a single bytes value."""
        return self.buff.getvalue()

    def accept_header(self, line):
        """Return True to keep ``line``; record it in ``skipped`` and return False otherwise."""
        prefixes = self.skip_list
        if not prefixes:
            return True
        if not line.lower().startswith(prefixes):
            return True

        self.skipped.append(line)
        return False

    def finish_request(self, socket):
        """Store the first element of the socket's peer address, if any."""
        peer = socket.getpeername()
        if not peer:
            return
        self.target_ip = peer[0]
self.target_ip = ip[0]