From 0823ff4bd0bfa8de9350b434e2793cbe073eb838 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sun, 6 Mar 2016 09:10:17 -0800 Subject: [PATCH] added 'upstream' handler for connecting to another webagg when 'upstream_url' is set output 'is_live' as string in live index --- test/test_handlers.py | 2 +- webagg/app.py | 3 +++ webagg/handlers.py | 5 +++-- webagg/indexsource.py | 17 ++++++++++++----- webagg/responseloader.py | 35 +++++++++++++++++++++++++++++++++++ 5 files changed, 54 insertions(+), 8 deletions(-) diff --git a/test/test_handlers.py b/test/test_handlers.py index 043b6aea..b4da3ace 100644 --- a/test/test_handlers.py +++ b/test/test_handlers.py @@ -113,7 +113,7 @@ class TestResAgg(object): res = to_json_list(resp.text) res[0]['timestamp'] = '2016' - assert(res == [{'url': 'http://httpbin.org/get', 'urlkey': 'org,httpbin)/get', 'is_live': True, + assert(res == [{'url': 'http://httpbin.org/get', 'urlkey': 'org,httpbin)/get', 'is_live': 'true', 'load_url': 'http://httpbin.org/get', 'source': 'live', 'timestamp': '2016'}]) def test_live_resource(self): diff --git a/webagg/app.py b/webagg/app.py index 5a9bae15..437c1105 100644 --- a/webagg/app.py +++ b/webagg/app.py @@ -46,6 +46,9 @@ def list_routes(): #============================================================================= def err_handler(exc): + if bottle.debug: + print(exc) + traceback.print_exc() response.status = exc.status_code response.content_type = JSON_CT err_msg = json.dumps({'message': exc.body}) diff --git a/webagg/handlers.py b/webagg/handlers.py index da2ed837..b604bd62 100644 --- a/webagg/handlers.py +++ b/webagg/handlers.py @@ -1,4 +1,4 @@ -from webagg.responseloader import WARCPathLoader, LiveWebLoader +from webagg.responseloader import WARCPathLoader, LiveWebLoader, UpstreamProxyLoader from webagg.utils import MementoUtils from pywb.utils.wbexception import BadRequestException, WbException from pywb.utils.wbexception import NotFoundException @@ -118,7 +118,8 @@ class ResourceHandler(IndexHandler): class DefaultResourceHandler(ResourceHandler): def __init__(self, index_source, warc_paths=''): loaders = [WARCPathLoader(warc_paths, index_source), - LiveWebLoader() + UpstreamProxyLoader(), + LiveWebLoader(), ] super(DefaultResourceHandler, self).__init__(index_source, loaders) diff --git a/webagg/indexsource.py b/webagg/indexsource.py index 1637bc81..32fd3804 100644 --- a/webagg/indexsource.py +++ b/webagg/indexsource.py @@ -51,9 +51,10 @@ class FileIndexSource(BaseIndexSource): #============================================================================= class RemoteIndexSource(BaseIndexSource): - def __init__(self, api_url, replay_url): + def __init__(self, api_url, replay_url, url_field='load_url'): self.api_url_template = api_url self.replay_url = replay_url + self.url_field = url_field def load_index(self, params): api_url = res_template(self.api_url_template, params) @@ -65,13 +66,19 @@ class RemoteIndexSource(BaseIndexSource): def do_load(lines): for line in lines: cdx = CDXObject(line) - cdx['load_url'] = self.replay_url.format( - timestamp=cdx['timestamp'], - url=cdx['url']) + cdx[self.url_field] = self.replay_url.format( + timestamp=cdx['timestamp'], + url=cdx['url']) yield cdx return do_load(lines) + @staticmethod + def upstream_webagg(base_url): + api_url = base_url + '/index?url={url}' + proxy_url = base_url + '/resource?url={url}&closest={timestamp}' + return RemoteIndexSource(api_url, proxy_url, 'upstream_url') + def __str__(self): return 'remote' @@ -84,7 +91,7 @@ class LiveIndexSource(BaseIndexSource): cdx['timestamp'] = timestamp_now() cdx['url'] = params['url'] cdx['load_url'] = params['url'] - cdx['is_live'] = True + cdx['is_live'] = 'true' def live(): yield cdx diff --git a/webagg/responseloader.py b/webagg/responseloader.py index b0f2ba5b..48459345 100644 --- a/webagg/responseloader.py +++ b/webagg/responseloader.py @@ -1,5 +1,6 @@ from webagg.liverec import BaseRecorder from webagg.liverec import request as remote_request +from requests import request from webagg.utils import MementoUtils @@ -159,6 +160,40 @@ class HeaderRecorder(BaseRecorder): self.target_ip = ip[0] +#============================================================================= +class UpstreamProxyLoader(BaseLoader): + def _load_resource(self, cdx, params): + load_url = cdx.get('upstream_url') + if not load_url: + return None, None + + input_req = params['_input_req'] + + method = input_req.get_req_method() + data = input_req.get_req_body() + req_headers = input_req.get_req_headers() + + try: + upstream_res = request(url=load_url, + method=method, + stream=True, + allow_redirects=False, + headers=req_headers, + data=data, + timeout=params.get('_timeout')) + except Exception as e: + import traceback + traceback.print_exc() + raise LiveResourceException(load_url) + + out_headers = upstream_res.headers + + return out_headers, StreamIter(upstream_res.raw) + + def __str__(self): + return 'UpstreamProxyLoader' + + #============================================================================= class LiveWebLoader(BaseLoader): SKIP_HEADERS = (b'link',