From d7c74b68de98c2e849b13b221342141c207a5f1b Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sat, 28 May 2016 15:01:33 -0700 Subject: [PATCH] video loader support: add VideoLoader, which uses youtube-dl to create a metadata record of video info. Activated with explicit content_type param 'application/vnd.youtube-dl_formats+json' --- recorder/recorderapp.py | 12 ++++-- recorder/test/test_recorder.py | 43 +++++++++++++++++++-- recorder/warcwriter.py | 40 ++++++++++++-------- urlrewrite/rewriterapp.py | 60 +++++++++++++++++++----------- urlrewrite/test/simpleapp.py | 17 ++++----- webagg/aggregator.py | 4 ++ webagg/handlers.py | 3 +- webagg/indexsource.py | 1 + webagg/responseloader.py | 68 ++++++++++++++++++++++++++++++---- webagg/test/test_handlers.py | 42 +++++++++++++++++++++ 10 files changed, 229 insertions(+), 61 deletions(-) diff --git a/recorder/recorderapp.py b/recorder/recorderapp.py index 73d8500f..d61e3df1 100644 --- a/recorder/recorderapp.py +++ b/recorder/recorderapp.py @@ -57,10 +57,16 @@ class RecorderApp(object): req_head, req_pay, resp_head, resp_pay, params = result - req = self.writer.create_req_record(req_head, req_pay, 'request') - resp = self.writer.create_resp_record(resp_head, resp_pay, 'response') + resp_type, resp = self.writer.read_resp_record(resp_head, resp_pay) + + if resp_type == 'response': + req = self.writer.create_req_record(req_head, req_pay) + + self.writer.write_req_resp(req, resp, params) + + else: + self.writer.write_record(resp, params) - self.writer.write_req_resp(req, resp, params) finally: try: diff --git a/recorder/test/test_recorder.py b/recorder/test/test_recorder.py index 9c4acef6..5320800f 100644 --- a/recorder/test/test_recorder.py +++ b/recorder/test/test_recorder.py @@ -25,7 +25,7 @@ from pywb.warc.recordloader import ArcWarcRecordLoader from pywb.warc.cdxindexer import write_cdx_index from pywb.warc.archiveiterator import ArchiveIterator -from six.moves.urllib.parse import quote, unquote +from six.moves.urllib.parse import quote, unquote, urlencode from io import BytesIO import time import json @@ -67,7 +67,7 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) return dedup_index - def _test_warc_write(self, recorder_app, host, path, other_params=''): + def _test_warc_write(self, recorder_app, host, path, other_params='', link_url=''): url = 'http://' + host + path req_url = '/live/resource/postreq?url=' + url + other_params testapp = webtest.TestApp(recorder_app) @@ -78,7 +78,10 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) assert resp.headers['WebAgg-Source-Coll'] == 'live' - assert resp.headers['Link'] == MementoUtils.make_link(unquote(url), 'original') + if not link_url: + link_url = unquote(url) + + assert resp.headers['Link'] == MementoUtils.make_link(link_url, 'original') assert resp.headers['Memento-Datetime'] != '' return resp @@ -303,7 +306,6 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) assert len(res) == 2 def test_record_param_user_coll_write_dupe_no_revisit(self): - warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/') dedup_index = self._get_dedup_index(dupe_policy=WriteDupePolicy()) @@ -524,4 +526,37 @@ class TestRecorder(LiveServerTests, FakeRedisTests, TempDirTests, BaseTestClass) assert status_headers.get_header('Content-Type') == 'text/plain' assert status_headers.get_header('Content-Length') == str(len(buff)) + def test_record_video_metadata(self): + warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/') + + dedup_index = self._get_dedup_index() + + writer = PerRecordWARCWriter(warc_path, dedup_index=dedup_index) + recorder_app = RecorderApp(self.upstream_url, writer) + + params = {'param.recorder.user': 'USER', + 'param.recorder.coll': 'VIDEO', + 'content_type': 'application/vnd.youtube-dl_formats+json' + } + + resp = self._test_warc_write(recorder_app, + 'www.youtube.com', '/v/BfBgWtAIbRc', '&' + urlencode(params), + link_url='metadata://www.youtube.com/v/BfBgWtAIbRc') + + r = FakeStrictRedis.from_url('redis://localhost/2') + + warcs = r.hgetall('USER:VIDEO:warc') + assert len(warcs) == 1 + + filename = list(warcs.values())[0] + + with open(filename, 'rb') as fh: + decomp = DecompressingBufferedReader(fh) + record = ArcWarcRecordLoader().parse_record_stream(decomp) + + status_headers = record.rec_headers + assert status_headers.get_header('WARC-Type') == 'metadata' + assert status_headers.get_header('Content-Type') == 'application/vnd.youtube-dl_formats+json' + assert status_headers.get_header('WARC-Block-Digest') != '' + assert status_headers.get_header('WARC-Block-Digest') == status_headers.get_header('WARC-Payload-Digest') diff --git a/recorder/warcwriter.py b/recorder/warcwriter.py index 39d791f1..7fce2dd1 100644 --- a/recorder/warcwriter.py +++ b/recorder/warcwriter.py @@ -94,13 +94,9 @@ class BaseWARCWriter(object): url = resp.rec_headers.get('WARC-Target-URI') dt = resp.rec_headers.get('WARC-Date') - if not req.rec_headers.get('WARC-Record-ID'): - req.rec_headers['WARC-Record-ID'] = self._make_warc_id() - + #req.rec_headers['Content-Type'] = req.content_type req.rec_headers['WARC-Target-URI'] = url req.rec_headers['WARC-Date'] = dt - req.rec_headers['WARC-Type'] = 'request' - #req.rec_headers['Content-Type'] = req.content_type resp_id = resp.rec_headers.get('WARC-Record-ID') if resp_id: @@ -114,37 +110,47 @@ class BaseWARCWriter(object): params['_formatter'] = ParamFormatter(params, name=self.rec_source_name) self._do_write_req_resp(req, resp, params) - def create_req_record(self, req_headers, payload, type_, content_type=''): + def create_req_record(self, req_headers, payload): len_ = payload.tell() payload.seek(0) warc_headers = req_headers + warc_headers['WARC-Type'] = 'request' + if not warc_headers.get('WARC-Record-ID'): + warc_headers['WARC-Record-ID'] = self._make_warc_id() + status_headers = self.parser.parse(payload) - record = ArcWarcRecord('warc', type_, warc_headers, payload, - status_headers, content_type, len_) + record = ArcWarcRecord('warc', 'request', warc_headers, payload, + status_headers, '', len_) self._set_header_buff(record) return record - def create_resp_record(self, resp_headers, payload, type_, content_type=''): + def read_resp_record(self, resp_headers, payload): len_ = payload.tell() payload.seek(0) warc_headers = self.parser.parse(payload) warc_headers = CaseInsensitiveDict(warc_headers.headers) - status_headers = self.parser.parse(payload) + record_type = warc_headers.get('WARC-Type', 'response') - record = ArcWarcRecord('warc', type_, warc_headers, payload, - status_headers, content_type, len_) + if record_type == 'response': + status_headers = self.parser.parse(payload) + else: + status_headers = None - self._set_header_buff(record) + record = ArcWarcRecord('warc', record_type, warc_headers, payload, + status_headers, '', len_) + + if record_type == 'response': + self._set_header_buff(record) self.ensure_digest(record) - return record + return record_type, record def create_warcinfo_record(self, filename, **kwargs): warc_headers = {} @@ -220,7 +226,11 @@ class BaseWARCWriter(object): self._header(out, n, v) - content_type = record.content_type + content_type = record.rec_headers.get('Content-Type') + + if not content_type: + content_type = record.content_type + if not content_type: content_type = self.WARC_RECORDS.get(record.rec_headers['WARC-Type']) diff --git a/urlrewrite/rewriterapp.py b/urlrewrite/rewriterapp.py index 7522cd97..b1938407 100644 --- a/urlrewrite/rewriterapp.py +++ b/urlrewrite/rewriterapp.py @@ -13,6 +13,8 @@ from pywb.cdx.cdxobject import CDXObject from pywb.warc.recordloader import ArcWarcRecordLoader from pywb.framework.wbrequestresponse import WbResponse +from six.moves.urllib.parse import urlencode + from urlrewrite.rewriteinputreq import RewriteInputRequest from urlrewrite.templateview import JinjaEnv, HeadInsertView, TopFrameView, BaseInsertView @@ -31,10 +33,13 @@ class UpstreamException(WbException): # ============================================================================ class RewriterApp(object): + VIDEO_INFO_CONTENT_TYPE = 'application/vnd.youtube-dl_formats+json' + def __init__(self, framed_replay=False, jinja_env=None, config=None): self.loader = ArcWarcRecordLoader() config = config or {} + self.paths = config['url_templates'] self.framed_replay = framed_replay self.frame_mod = '' @@ -76,8 +81,6 @@ class RewriterApp(object): def render_content(self, wb_url, kwargs, environ): wb_url = WbUrl(wb_url) - #if wb_url.mod == 'vi_': - # return self._get_video_info(wbrequest) host_prefix = self.get_host_prefix(environ) rel_prefix = self.get_rel_prefix(environ) @@ -95,13 +98,12 @@ class RewriterApp(object): self.unrewrite_referrer(environ) - url = wb_url.url - urlkey = canonicalize(url) + urlkey = canonicalize(wb_url.url) - inputreq = RewriteInputRequest(environ, urlkey, url, + inputreq = RewriteInputRequest(environ, urlkey, wb_url.url, self.content_rewriter) - inputreq.include_post_query(url) + inputreq.include_post_query(wb_url.url) mod_url = None use_206 = False @@ -119,7 +121,6 @@ class RewriterApp(object): # if bytes=0- Range request, # simply remove the range and still proxy if start == 0 and not end and use_206: - url = mod_url wb_url.url = mod_url inputreq.url = mod_url @@ -133,10 +134,10 @@ class RewriterApp(object): setcookie_headers = None if self.cookie_tracker: cookie_key = self.get_cookie_key(kwargs) - res = self.cookie_tracker.get_cookie_headers(url, cookie_key) + res = self.cookie_tracker.get_cookie_headers(wb_url.url, cookie_key) inputreq.extra_cookie, setcookie_headers = res - r = self._do_req(inputreq, url, wb_url, kwargs, skip) + r = self._do_req(inputreq, wb_url, kwargs, skip) if r.status_code >= 400: error = None @@ -152,7 +153,7 @@ class RewriterApp(object): error = '' details = dict(args=kwargs, error=error) - raise UpstreamException(r.status_code, url=url, details=details) + raise UpstreamException(r.status_code, url=wb_url.url, details=details) if async_record_url: environ.pop('HTTP_RANGE', '') @@ -168,7 +169,7 @@ class RewriterApp(object): cdx = CDXObject() cdx['urlkey'] = urlkey cdx['timestamp'] = http_date_to_timestamp(r.headers.get('Memento-Datetime')) - cdx['url'] = url + cdx['url'] = wb_url.url self._add_custom_params(cdx, r.headers, kwargs) @@ -246,8 +247,8 @@ class RewriterApp(object): return WbResponse.text_response(error_html, content_type='text/html') - def _do_req(self, inputreq, url, wb_url, kwargs, skip): - req_data = inputreq.reconstruct_request(url) + def _do_req(self, inputreq, wb_url, kwargs, skip): + req_data = inputreq.reconstruct_request(wb_url.url) headers = {'Content-Length': len(req_data), 'Content-Type': 'application/request'} @@ -260,7 +261,15 @@ class RewriterApp(object): else: closest = wb_url.timestamp - upstream_url = self.get_upstream_url(url, wb_url, closest, kwargs) + params = {} + params['url'] = wb_url.url + params['closest'] = closest + + if wb_url.mod == 'vi_': + params['content_type'] = self.VIDEO_INFO_CONTENT_TYPE + + upstream_url = self.get_upstream_url(wb_url, kwargs, params) + r = requests.post(upstream_url, data=BytesIO(req_data), headers=headers, @@ -269,11 +278,14 @@ class RewriterApp(object): return r def do_query(self, wb_url, kwargs): - upstream_url = self.get_upstream_url(wb_url.url, wb_url, 'now', kwargs) - upstream_url = upstream_url.replace('/resource/postreq', '/index') + params = {} + params['url'] = wb_url.url + params['output'] = 'json' + params['from'] = wb_url.timestamp + params['to'] = wb_url.end_timestamp - upstream_url += '&output=json' - upstream_url += '&from=' + wb_url.timestamp + '&to=' + wb_url.end_timestamp + upstream_url = self.get_upstream_url(wb_url, kwargs, params) + upstream_url = upstream_url.replace('/resource/postreq', '/index') r = requests.get(upstream_url) @@ -362,8 +374,15 @@ class RewriterApp(object): return False - def get_upstream_url(self, url, wb_url, closest, kwargs): - raise NotImplemented() + def get_base_url(self, wb_url, kwargs): + type = kwargs.get('type') + return self.paths[type] + + def get_upstream_url(self, wb_url, kwargs, params): + base_url = self.get_base_url(wb_url, kwargs) + #params['filter'] = tuple(params['filter']) + base_url += '&' + urlencode(params, True) + return base_url def get_cookie_key(self, kwargs): raise NotImplemented() @@ -378,7 +397,6 @@ class RewriterApp(object): def handle_custom_response(self, environ, wb_url, full_prefix, host_prefix, kwargs): if wb_url.is_query(): return self.handle_query(environ, wb_url, kwargs) - #return self.do_query(wb_url, kwargs) if self.framed_replay and wb_url.mod == self.frame_mod: extra_params = self.get_top_frame_params(wb_url, kwargs) diff --git a/urlrewrite/test/simpleapp.py b/urlrewrite/test/simpleapp.py index b741962d..e0137b99 100644 --- a/urlrewrite/test/simpleapp.py +++ b/urlrewrite/test/simpleapp.py @@ -16,13 +16,15 @@ from urlrewrite.cookies import CookieTracker # ============================================================================ class RWApp(RewriterApp): def __init__(self, upstream_urls, cookie_key_templ, redis): - self.upstream_urls = upstream_urls + config = {} + config['url_templates'] = upstream_urls + self.cookie_key_templ = cookie_key_templ self.app = Bottle() self.block_loader = LocalFileLoader() self.init_routes() - super(RWApp, self).__init__(True) + super(RWApp, self).__init__(True, config=config) self.cookie_tracker = CookieTracker(redis) @@ -34,11 +36,6 @@ class RWApp(RewriterApp): traceback.print_exc() return self.orig_error_handler(exc) - def get_upstream_url(self, url, wb_url, closest, kwargs): - type = kwargs.get('type') - return self.upstream_urls[type].format(url=quote(url), - closest=closest) - def get_cookie_key(self, kwargs): return self.cookie_key_templ.format(**kwargs) @@ -58,9 +55,9 @@ class RWApp(RewriterApp): @staticmethod def create_app(replay_port=8080, record_port=8010): - upstream_urls = {'live': 'http://localhost:%s/live/resource/postreq?url={url}&closest={closest}' % replay_port, - 'record': 'http://localhost:%s/live/resource/postreq?url={url}&closest={closest}' % record_port, - 'replay': 'http://localhost:%s/replay/resource/postreq?url={url}&closest={closest}' % replay_port, + upstream_urls = {'live': 'http://localhost:%s/live/resource/postreq?' % replay_port, + 'record': 'http://localhost:%s/live/resource/postreq?' % record_port, + 'replay': 'http://localhost:%s/replay/resource/postreq?' % replay_port, } r = redis.StrictRedis.from_url('redis://localhost/2') diff --git a/webagg/aggregator.py b/webagg/aggregator.py index 8ddbe04f..9ca59b52 100644 --- a/webagg/aggregator.py +++ b/webagg/aggregator.py @@ -30,6 +30,10 @@ class BaseAggregator(object): if params.get('closest') == 'now': params['closest'] = timestamp_now() + content_type = params.get('content_type') + if content_type: + params['filter'] = '=mime:' + content_type + query = CDXQuery(params) cdx_iter, errs = self.load_index(query.params) diff --git a/webagg/handlers.py b/webagg/handlers.py index e7e1acf0..a8e067f3 100644 --- a/webagg/handlers.py +++ b/webagg/handlers.py @@ -1,4 +1,4 @@ -from webagg.responseloader import WARCPathLoader, LiveWebLoader +from webagg.responseloader import WARCPathLoader, LiveWebLoader, VideoLoader from webagg.utils import MementoUtils from pywb.utils.wbexception import BadRequestException, WbException from pywb.utils.wbexception import NotFoundException @@ -165,6 +165,7 @@ class DefaultResourceHandler(ResourceHandler): def __init__(self, index_source, warc_paths=''): loaders = [WARCPathLoader(warc_paths, index_source), LiveWebLoader(), + VideoLoader() ] super(DefaultResourceHandler, self).__init__(index_source, loaders) diff --git a/webagg/indexsource.py b/webagg/indexsource.py index 505e00d0..a52bb11a 100644 --- a/webagg/indexsource.py +++ b/webagg/indexsource.py @@ -92,6 +92,7 @@ class LiveIndexSource(BaseIndexSource): cdx['url'] = params['url'] cdx['load_url'] = res_template(self.proxy_url, params) cdx['is_live'] = 'true' + cdx['mime'] = params.get('content_type', '') def live(): yield cdx diff --git a/webagg/responseloader.py b/webagg/responseloader.py index 7da983cb..d38c27b2 100644 --- a/webagg/responseloader.py +++ b/webagg/responseloader.py @@ -13,11 +13,12 @@ from pywb.warc.resolvingloader import ResolvingLoader from six.moves.urllib.parse import urlsplit -#from io import BytesIO +from io import BytesIO import uuid import six import itertools +import json from requests.models import PreparedRequest import urllib3 @@ -105,6 +106,12 @@ class BaseLoader(object): #print(msg) raise WbException(msg) + @staticmethod + def _make_warc_id(id_=None): + if not id_: + id_ = uuid.uuid1() + return ''.format(id_) + #============================================================================= class PrefixResolver(object): @@ -230,6 +237,9 @@ class LiveWebLoader(BaseLoader): if not load_url: return None + if params.get('content_type') == VideoLoader.CONTENT_TYPE: + return None + input_req = params['_input_req'] req_headers = input_req.get_req_headers() @@ -340,12 +350,56 @@ class LiveWebLoader(BaseLoader): warc_headers = StatusAndHeaders('WARC/1.0', warc_headers.items()) return (warc_headers, http_headers_buff, upstream_res) - @staticmethod - def _make_warc_id(id_=None): - if not id_: - id_ = uuid.uuid1() - return ''.format(id_) - def __str__(self): return 'LiveWebLoader' + +#============================================================================= +class VideoLoader(BaseLoader): + CONTENT_TYPE = 'application/vnd.youtube-dl_formats+json' + + def __init__(self): + try: + from youtube_dl import YoutubeDL as YoutubeDL + except ImportError: + self.ydl = None + return + + self.ydl = YoutubeDL(dict(simulate=True, + youtube_include_dash_manifest=False)) + + self.ydl.add_default_info_extractors() + + def load_resource(self, cdx, params): + load_url = cdx.get('load_url') + if not load_url: + return None + + if params.get('content_type') != self.CONTENT_TYPE: + return None + + if not self.ydl: + return None + + info = self.ydl.extract_info(load_url) + info_buff = json.dumps(info) + info_buff = info_buff.encode('utf-8') + + warc_headers = {} + + schema, rest = load_url.split('://', 1) + target_url = 'metadata://' + rest + + dt = timestamp_to_datetime(cdx['timestamp']) + + warc_headers['WARC-Type'] = 'metadata' + warc_headers['WARC-Record-ID'] = self._make_warc_id() + warc_headers['WARC-Target-URI'] = target_url + warc_headers['WARC-Date'] = datetime_to_iso_date(dt) + warc_headers['Content-Type'] = self.CONTENT_TYPE + warc_headers['Content-Length'] = str(len(info_buff)) + + warc_headers = StatusAndHeaders('WARC/1.0', warc_headers.items()) + + return warc_headers, None, BytesIO(info_buff) + diff --git a/webagg/test/test_handlers.py b/webagg/test/test_handlers.py index 84bf4e5a..603f109d 100644 --- a/webagg/test/test_handlers.py +++ b/webagg/test/test_handlers.py @@ -14,6 +14,7 @@ from webagg.utils import MementoUtils from pywb.utils.statusandheaders import StatusAndHeadersParser from pywb.utils.bufferedreaders import ChunkedDataReader from io import BytesIO +from six.moves.urllib.parse import urlencode import webtest from fakeredis import FakeStrictRedis @@ -330,6 +331,47 @@ foo=bar&test=abc""" assert resp.headers['WebAgg-Source-Coll'] == 'example' + def test_live_video_loader(self): + params = {'url': 'http://www.youtube.com/v/BfBgWtAIbRc', + 'content_type': 'application/vnd.youtube-dl_formats+json' + } + + resp = self.testapp.get('/live/resource', params=params) + + assert resp.headers['WebAgg-Source-Coll'] == 'live' + + self._check_uri_date(resp, 'metadata://www.youtube.com/v/BfBgWtAIbRc', True) + + assert resp.headers['Link'] == MementoUtils.make_link('metadata://www.youtube.com/v/BfBgWtAIbRc', 'original') + assert resp.headers['Memento-Datetime'] != '' + + assert b'WARC-Type: metadata' in resp.body + assert b'Content-Type: application/vnd.youtube-dl_formats+json' in resp.body + + def test_live_video_loader_post(self): + req_data = """\ +GET /v/BfBgWtAIbRc HTTP/1.1 +accept-encoding: gzip, deflate +accept: */* +host: www.youtube.com\ +""" + + params = {'url': 'http://www.youtube.com/v/BfBgWtAIbRc', + 'content_type': 'application/vnd.youtube-dl_formats+json' + } + + resp = self.testapp.post('/live/resource/postreq?&' + urlencode(params), req_data) + + assert resp.headers['WebAgg-Source-Coll'] == 'live' + + self._check_uri_date(resp, 'metadata://www.youtube.com/v/BfBgWtAIbRc', True) + + assert resp.headers['Link'] == MementoUtils.make_link('metadata://www.youtube.com/v/BfBgWtAIbRc', 'original') + assert resp.headers['Memento-Datetime'] != '' + + assert b'WARC-Type: metadata' in resp.body + assert b'Content-Type: application/vnd.youtube-dl_formats+json' in resp.body + def test_error_redis_file_not_found(self): f = FakeStrictRedis.from_url('redis://localhost/2') f.hset('test:warc', 'example.warc.gz', './testdata/example2.warc.gz')