From 0370470e685c15db096c003f81cd6d80f78e657e Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Fri, 15 Apr 2016 02:21:39 +0000 Subject: [PATCH] urlrewrite: http range: support skipping record for range requests not starting at 0- and performing async request, support converting unbounded 0- to non-ranged and back --- urlrewrite/rewriteinputreq.py | 43 +++++++++++++- urlrewrite/rewriterapp.py | 107 +++++++++++++++++++++++++++++----- 2 files changed, 136 insertions(+), 14 deletions(-) diff --git a/urlrewrite/rewriteinputreq.py b/urlrewrite/rewriteinputreq.py index 10c929fc..fec5797b 100644 --- a/urlrewrite/rewriteinputreq.py +++ b/urlrewrite/rewriteinputreq.py @@ -3,10 +3,15 @@ from pywb.utils.loaders import extract_client_cookie from six import iteritems from six.moves.urllib.parse import urlsplit +import re #============================================================================= class RewriteInputRequest(DirectWSGIInputRequest): + RANGE_ARG_RX = re.compile('.*.googlevideo.com/videoplayback.*([&?]range=(\d+)-(\d+))') + + RANGE_HEADER = re.compile('bytes=(\d+)-(\d+)?') + def __init__(self, env, urlkey, url, rewriter): super(RewriteInputRequest, self).__init__(env) self.urlkey = urlkey @@ -38,7 +43,7 @@ class RewriteInputRequest(DirectWSGIInputRequest): elif name == 'HTTP_X_CSRFTOKEN': name = 'X-CSRFToken' - cookie_val = extract_client_cookie(env, 'csrftoken') + cookie_val = extract_client_cookie(self.env, 'csrftoken') if cookie_val: value = cookie_val @@ -86,3 +91,39 @@ class RewriteInputRequest(DirectWSGIInputRequest): return value + def extract_range(self): + use_206 = False + start = None + end = None + url = self.url + + range_h = self.env.get('HTTP_RANGE') + + if range_h: + m = self.RANGE_HEADER.match(range_h) + if m: + start = m.group(1) + end = m.group(2) + use_206 = True + + else: + m = self.RANGE_ARG_RX.match(url) + if m: + start = m.group(2) + end = m.group(3) + url = url[:m.start(1)] + url[m.end(1):] + use_206 = False + + if not start: + return None + + start = int(start) + + if end: + end = int(end) + else: + end = '' + + result = (url, start, end, use_206) + return result + diff --git a/urlrewrite/rewriterapp.py b/urlrewrite/rewriterapp.py index 4d9836f4..b0d1e8f9 100644 --- a/urlrewrite/rewriterapp.py +++ b/urlrewrite/rewriterapp.py @@ -17,6 +17,7 @@ from urlrewrite.rewriteinputreq import RewriteInputRequest from urlrewrite.templateview import JinjaEnv, HeadInsertView, TopFrameView from io import BytesIO +import gevent # ============================================================================ @@ -65,22 +66,33 @@ class RewriterApp(object): inputreq = RewriteInputRequest(request.environ, urlkey, url, self.content_rewriter) - req_data = inputreq.reconstruct_request(url) + mod_url = None + use_206 = False + rangeres = None - headers = {'Content-Length': len(req_data), - 'Content-Type': 'application/request'} + readd_range = False + async_record_url = None - if wb_url.is_latest_replay(): - closest = 'now' - else: - closest = wb_url.timestamp + if kwargs.get('type') == 'record': + rangeres = inputreq.extract_range() - upstream_url = self.get_upstream_url(url, wb_url, closest, kwargs) + if rangeres: + mod_url, start, end, use_206 = rangeres - r = requests.post(upstream_url, - data=BytesIO(req_data), - headers=headers, - stream=True) + # if bytes=0- Range request, + # simply remove the range and still proxy + if start == 0 and not end and use_206: + url = mod_url + wb_url.url = mod_url + inputreq.url = mod_url + + del request.environ['HTTP_RANGE'] + readd_range = True + else: + async_record_url = mod_url + + r = self._do_req(inputreq, url, wb_url, kwargs, + async_record_url is not None) if r.status_code >= 400: error = None @@ -98,6 +110,16 @@ class RewriterApp(object): data = dict(url=url, args=kwargs, error=error) raise HTTPError(r.status_code, exception=data) + if async_record_url: + #print('ASYNC REC', async_record_url) + request.environ.pop('HTTP_RANGE', '') + gevent.spawn(self._do_async_req, + inputreq, + async_record_url, + wb_url, + kwargs, + False) + record = self.loader.parse_record_stream(r.raw) cdx = CDXObject() @@ -107,6 +129,16 @@ class RewriterApp(object): self._add_custom_params(cdx, r.headers, kwargs) + if readd_range: + content_length = (record.status_headers. + get_header('Content-Length')) + try: + content_length = int(content_length) + record.status_headers.add_range(0, content_length, + content_length) + except (ValueError, TypeError): + pass + if self.is_ajax(): head_insert_func = None else: @@ -133,6 +165,54 @@ class RewriterApp(object): return gen + def _do_async_req(self, *args): + count = 0 + #print('ASYNC') + try: + r = self._do_req(*args) + while True: + buff = r.raw.read(8192) + count += len(buff) + if not buff: + return + except: + import traceback + traceback.print_exc() + + finally: + #print('CLOSING') + #print('READ ASYNC', count) + try: + r.raw.close() + except: + pass + + + def _do_req(self, inputreq, url, wb_url, kwargs, skip): + req_data = inputreq.reconstruct_request(url) + + headers = {'Content-Length': len(req_data), + 'Content-Type': 'application/request'} + + if skip: + headers['Recorder-Skip'] = '1' + + if wb_url.is_latest_replay(): + closest = 'now' + else: + closest = wb_url.timestamp + + upstream_url = self.get_upstream_url(url, wb_url, closest, kwargs) + r = requests.post(upstream_url, + data=BytesIO(req_data), + headers=headers, + stream=True) + + return r + + + + def get_host_prefix(self): return request.urlparts.scheme + '://' + request.urlparts.netloc @@ -151,7 +231,7 @@ class RewriterApp(object): if referrer.startswith(full_prefix): referrer = referrer[len(full_prefix):] - request.environ['HTTP_REFERER'] = referrer + request.environ['HTTP_REFERER'] = WbUrl(referrer).url return True return False @@ -168,6 +248,7 @@ class RewriterApp(object): raise NotImplemented() def _add_custom_params(self, cdx, headers, kwargs): + cdx['is_live'] = 'true' pass def get_top_frame_params(self, wb_url, kwargs):