From 398e8f1a7749572d0cea523741e808195bb6cbfe Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Wed, 24 Feb 2016 14:22:29 -0800
Subject: [PATCH] inputrequest: add input request handling (direct wsgi
 headers) or as a prepared post request

add timemap link output
rename source_name -> source
---
 aggindexsource.py      |  80 ++++++++++++--------
 indexsource.py         |  43 ++++++++---
 inputrequest.py        | 136 +++++++++++++++++++++++++++++++++
 liverec.py             |   2 +-
 responseloader.py      | 168 +++++++++++++++++++++++++++++++++++++----
 test_aggindexsource.py |  45 ++++++-----
 utils.py               |  43 +++++++++++
 7 files changed, 441 insertions(+), 76 deletions(-)
 create mode 100644 inputrequest.py

diff --git a/aggindexsource.py b/aggindexsource.py
index 12af280a..546032f5 100644
--- a/aggindexsource.py
+++ b/aggindexsource.py
@@ -2,29 +2,27 @@ from gevent.pool import Pool
 import gevent
 import json
 import time
+import os
 
 from heapq import merge
 from collections import deque
 
-from indexsource import BaseIndexSource
+from indexsource import BaseIndexSource, FileIndexSource
 from pywb.utils.wbexception import NotFoundException
 
 
 #=============================================================================
 class BaseAggIndexSource(BaseIndexSource):
-    def __init__(self, sources):
-        self.sources = sources
-
     def do_query(self, name, source, params):
         try:
-            cdx_iter = source.load_index(params)
+            cdx_iter = source.load_index(dict(params))
         except NotFoundException as nf:
             print('Not found in ' + name)
             cdx_iter = iter([])
 
         def add_name(cdx_iter):
             for cdx in cdx_iter:
-                cdx['source_name'] = name
+                cdx['source'] = name
                 yield cdx
 
         return add_name(cdx_iter)
@@ -36,6 +34,9 @@ class BaseAggIndexSource(BaseIndexSource):
 
         return cdx_iter
 
+    def _load_all(self, params):
+        raise NotImplementedError()
+
 
 #=============================================================================
 class TimingOutMixin(object):
@@ -63,7 +64,7 @@ class TimingOutMixin(object):
         return False
 
     def get_valid_sources(self, sources):
-        for name in sources.keys():
+        for name in sources:
             if not self.is_timed_out(name):
                 yield name
 
@@ -79,10 +80,19 @@ class TimingOutMixin(object):
 #=============================================================================
 class GeventAggIndexSource(BaseAggIndexSource):
     def __init__(self, sources, timeout=5.0, size=None):
-        super(GeventAggIndexSource, self).__init__(sources)
+        self.sources = sources
         self.pool = Pool(size=size)
         self.timeout = timeout
 
+    def get_sources(self, params):
+        srcs_list = params.get('sources')
+        if not srcs_list:
+            return self.sources
+
+        sel_sources = tuple(srcs_list.split(','))
+
+        return [src for src in self.sources if src in sel_sources]
+
     def get_valid_sources(self, sources):
-        return sources.keys()
+        # iterating a dict yields its keys, so this also handles the
+        # filtered list returned by get_sources()
+        return sources
 
     def track_source_error(self, name):
         pass
@@ -90,15 +100,18 @@ class GeventAggIndexSource(BaseAggIndexSource):
     def _load_all(self, params):
+        params['_timeout'] = self.timeout
+
         def do_spawn(n):
             return self.pool.spawn(self.do_query, n, self.sources[n], params)
 
-        jobs = [do_spawn(src) for src in self.get_valid_sources(self.sources)]
+        sources = self.get_sources(params)
+        valid_sources = list(self.get_valid_sources(sources))
+        jobs = [do_spawn(src) for src in valid_sources]
 
         gevent.joinall(jobs, timeout=self.timeout)
 
         res = []
-        for name, job in zip(self.sources.keys(), jobs):
+        # zip over the same names the jobs were spawned for, so names and
+        # results stay aligned even when some sources are filtered out
+        for name, job in zip(valid_sources, jobs):
             if job.value:
                 res.append(job.value)
             else:
@@ -113,29 +126,30 @@ class AggIndexSource(TimingOutMixin, GeventAggIndexSource):
 
 
 #=============================================================================
-class SimpleAggIndexSource(BaseAggIndexSource):
+class DirAggIndexSource(BaseAggIndexSource):
+    CDX_EXT = ('.cdx', '.cdxj')
+
+    def __init__(self, base_dir):
+        self.index_template = base_dir
+
+    def _init_files(self, the_dir):
+        sources = {}
+        for name in os.listdir(the_dir):
+            filename = os.path.join(the_dir, name)
+
+            if filename.endswith(self.CDX_EXT):
+                print('Adding ' + filename)
+                sources[name] = FileIndexSource(filename)
+
+        return sources
+
     def _load_all(self, params):
-        return list(map(lambda n: self.do_query(n, self.sources[n], params),
-                        self.sources))
-
-
-#=============================================================================
-class ResourceLoadAgg(object):
-    def __init__(self, load_index, load_resource):
-        self.load_index = load_index
-        self.load_resource = load_resource
-
-    def __call__(self, params):
-        cdx_iter = self.load_index(params)
-        for cdx in cdx_iter:
-            for loader in self.load_resource:
-                try:
-                    resp = loader(cdx)
-                    if resp:
-                        return resp
-                except Exception:
-                    pass
-
-        raise Exception('Not Found')
+        the_dir = self.get_index(params)
+        try:
+            sources = self._init_files(the_dir)
+        except Exception:
+            raise NotFoundException(the_dir)
+
+        return [self.do_query(name, source, params)
+                for name, source in sources.items()]
diff --git a/indexsource.py b/indexsource.py
index 4d6971a9..e332e6bf 100644
--- a/indexsource.py
+++ b/indexsource.py
@@ -21,10 +21,14 @@ class BaseIndexSource(object):
         self.index_template = index_template
 
     def get_index(self, params):
-        return self.index_template.format(params.get('coll'))
+        res = self.index_template.format(**params)
+        return res
+
+    def load_index(self, params):
+        raise NotImplementedError()
 
     def __call__(self, params):
-        query = CDXQuery(**params)
+        query = CDXQuery(params)
 
         try:
             cdx_iter = self.load_index(query.params)
@@ -34,10 +38,20 @@ class BaseIndexSource(object):
         cdx_iter = process_cdx(cdx_iter, query)
         return cdx_iter
 
+    def _include_post_query(self, params):
+        input_req = params.get('_input_req')
+        if input_req:
+            orig_url = params['url']
+            params['url'] = input_req.include_post_query(params['url'])
+            return (params['url'] != orig_url)
+
 
 #=============================================================================
 class FileIndexSource(BaseIndexSource):
     def load_index(self, params):
+        if self._include_post_query(params):
+            params = CDXQuery(params).params
+
         filename = self.get_index(params)
 
         with open(filename, 'rb') as fh:
@@ -45,6 +59,8 @@ class FileIndexSource(BaseIndexSource):
             for line in gen:
                 yield CDXObject(line)
 
 
 #=============================================================================
 class RemoteIndexSource(BaseIndexSource):
@@ -53,11 +69,14 @@ class RemoteIndexSource(BaseIndexSource):
         self.replay_url = replay_url
 
     def load_index(self, params):
-        url = self.get_index(params)
-        url += '?url=' + params['url']
-        r = requests.get(url)
+        if self._include_post_query(params):
+            params = CDXQuery(params).params
+
+        api_url = self.get_index(params)
+        api_url += '?url=' + params['url']
+        r = requests.get(api_url, timeout=params.get('_timeout'))
         if r.status_code >= 400:
-            raise NotFoundException(url)
+            raise NotFoundException(api_url)
 
         lines = r.content.strip().split(b'\n')
         def do_load(lines):
@@ -103,8 +122,11 @@ class RedisIndexSource(BaseIndexSource):
                                         b'[' + params['key'],
                                         b'(' + params['end_key'])
 
-        for line in index_list:
-            yield CDXObject(line)
+        def do_load(index_list):
+            for line in index_list:
+                yield CDXObject(line)
+
+        return do_load(index_list)
 
 
 #=============================================================================
@@ -166,7 +188,7 @@ class MementoIndexSource(BaseIndexSource):
     def get_timemap_links(self, params):
         url = self.timemap_url + params['url']
-        res = requests.get(url)
+        res = requests.get(url, timeout=params.get('_timeout'))
 
         if res.status_code >= 400:
             raise NotFoundException(url)
 
@@ -182,9 +204,6 @@ class MementoIndexSource(BaseIndexSource):
             links = self.get_timegate_links(params, closest)
             def_name = 'timegate'
 
-        #if not links:
-        #    return iter([])
-
         return self.links_to_cdxobject(links, def_name)
 
     @staticmethod
diff --git a/inputrequest.py b/inputrequest.py
new file mode 100644
index 00000000..4e8964e3
--- /dev/null
+++ b/inputrequest.py
@@ -0,0 +1,136 @@
+from pywb.utils.loaders import extract_client_cookie
+from pywb.utils.loaders import extract_post_query, append_post_query
+from pywb.utils.loaders import LimitReader
+from pywb.utils.statusandheaders import StatusAndHeadersParser
+
+from six.moves.urllib.parse import urlsplit
+from six import StringIO
+import six
+
+
+#=============================================================================
+class WSGIInputRequest(object):
+    def __init__(self, env):
+        self.env = env
+
+    def get_req_method(self):
+        return self.env['REQUEST_METHOD'].upper()
+
+    def get_req_headers(self, url):
+        headers = {}
+
+        splits = urlsplit(url)
+
+        for name, value in six.iteritems(self.env):
+            if name == 'HTTP_HOST':
+                name = 'Host'
+                value = splits.netloc
+
+            elif name == 'HTTP_ORIGIN':
+                name = 'Origin'
+                value = (splits.scheme + '://' + splits.netloc)
+
+            elif name == 'HTTP_X_CSRFTOKEN':
+                name = 'X-CSRFToken'
+                cookie_val = extract_client_cookie(self.env, 'csrftoken')
+                if cookie_val:
+                    value = cookie_val
+
+            elif name == 'HTTP_X_FORWARDED_PROTO':
+                name = 'X-Forwarded-Proto'
+                value = splits.scheme
+
+            elif name.startswith('HTTP_'):
+                name = name[5:].title().replace('_', '-')
+
+            elif name in ('CONTENT_LENGTH', 'CONTENT_TYPE'):
+                name = name.title().replace('_', '-')
+
+            else:
+                value = None
+
+            if value:
+                headers[name] = value
+
+        return headers
+
+    def get_req_body(self):
+        input_ = self.env.get('wsgi.input')
+        if not input_:
+            return None
+
+        len_ = self._get_content_length()
+        enc = self._get_header('Transfer-Encoding')
+
+        if len_:
+            data = LimitReader(input_, int(len_))
+        elif enc:
+            data = input_
+        else:
+            data = None
+
+        return data
+
+    def _get_content_type(self):
+        return self.env.get('CONTENT_TYPE')
+
+    def _get_content_length(self):
+        return self.env.get('CONTENT_LENGTH')
+
+    def _get_header(self, name):
+        return self.env.get('HTTP_' + name.upper().replace('-', '_'))
+
+    def include_post_query(self, url):
+        if self.get_req_method() != 'POST':
+            return url
+
+        mime = self._get_content_type()
+        mime = mime.split(';')[0] if mime else ''
+        length = self._get_content_length()
+        stream = self.env['wsgi.input']
+
+        buffered_stream = StringIO()
+
+        post_query = extract_post_query('POST', mime, length, stream,
+                                        buffered_stream=buffered_stream)
+
+        if post_query:
+            self.env['wsgi.input'] = buffered_stream
+            url = append_post_query(url, post_query)
+
+        return url
+
+
+#=============================================================================
+class POSTInputRequest(WSGIInputRequest):
+    def __init__(self, env):
+        self.env = env
+
+        parser = StatusAndHeadersParser([], verify=False)
+
+        self.status_headers = parser.parse(self.env['wsgi.input'])
+
+    def get_req_method(self):
+        return self.status_headers.protocol
+
+    def get_req_headers(self, url):
+        headers = {}
+        for n, v in self.status_headers.headers:
+            headers[n] = v
+
+        return headers
+
+    def _get_content_type(self):
+        return self.status_headers.get_header('Content-Type')
+
+    def _get_content_length(self):
+        return self.status_headers.get_header('Content-Length')
+
+    def _get_header(self, name):
+        return self.status_headers.get_header(name)
diff --git a/liverec.py b/liverec.py
index c17d39d0..5d8bacf0 100644
--- a/liverec.py
+++ b/liverec.py
@@ -95,7 +95,7 @@ class RecordingHTTPConnection(httplib.HTTPConnection):
         if hasattr(data,'read') and not isinstance(data, array):
             url = None
             while True:
-                buff = data.read(self.BUFF_SIZE)
+                buff = data.read(BUFF_SIZE)
                 if not buff:
                     break
diff --git a/responseloader.py b/responseloader.py
index baf9d7bc..17533d40 100644
--- a/responseloader.py
+++ b/responseloader.py
@@ -9,6 +9,7 @@ from io import BytesIO
 from bottle import response
 import uuid
+from utils import MementoUtils
 
 
 #=============================================================================
@@ -23,24 +24,46 @@ def incr_reader(stream, header=None, size=8192):
         else:
             break
 
+    try:
+        stream.close()
+    except Exception:
+        pass
+
 
 #=============================================================================
-class WARCPathPrefixLoader(object):
-    def __init__(self, prefix, cdx_loader):
-        self.prefix = prefix
+class WARCPathLoader(object):
+    def __init__(self, paths, cdx_source):
+        self.paths = paths
+        if isinstance(paths, str):
+            self.paths = [paths]
 
-        def add_prefix(filename, cdx):
-            return [self.prefix + filename]
+        self.path_checks = list(self.warc_paths())
 
-        self.resolve_loader = ResolvingLoader([add_prefix], no_record_parse=True)
-        self.cdx_loader = cdx_loader
+        self.resolve_loader = ResolvingLoader(self.path_checks,
+                                              no_record_parse=True)
+        self.cdx_source = cdx_source
 
-    def __call__(self, cdx):
+    def warc_paths(self):
+        for path in self.paths:
+            # bind path per iteration: a plain closure would late-bind and
+            # leave every check pointing at the last path
+            def check(filename, cdx, path=path):
+                try:
+                    full_path = path.format(**cdx)
+                    return full_path
+                except KeyError:
+                    return None
+
+            yield check
+
+    def __call__(self, cdx, params):
         if not cdx.get('filename') or cdx.get('offset') is None:
             return None
 
         failed_files = []
-        headers, payload = self.resolve_loader.load_headers_and_payload(cdx, failed_files, self.cdx_loader)
+        headers, payload = (self.resolve_loader.
+                            load_headers_and_payload(cdx,
+                                                     failed_files,
+                                                     self.cdx_source))
 
         if headers != payload:
             headers.stream.close()
@@ -50,6 +73,8 @@ class WARCPathLoader(object):
         for n, v in record.rec_headers.headers:
             response.headers[n] = v
 
+        response.headers['WARC-Coll'] = cdx.get('source', '')
+
         return incr_reader(record.stream)
 
 
@@ -82,24 +107,33 @@ class LiveWebLoader(object):
                       b'content-location',
                       b'x-archive')
 
-    def __call__(self, cdx):
+    def __call__(self, cdx, params):
         load_url = cdx.get('load_url')
         if not load_url:
             return None
 
         recorder = HeaderRecorder(self.SKIP_HEADERS)
 
-        req_headers = {}
+        input_req = params['_input_req']
+
+        req_headers = input_req.get_req_headers(cdx['url'])
 
         dt = timestamp_to_datetime(cdx['timestamp'])
 
         if not cdx.get('is_live'):
             req_headers['Accept-Datetime'] = datetime_to_http_date(dt)
 
-        upstream_res = remote_request(load_url,
+        method = input_req.get_req_method()
+        data = input_req.get_req_body()
+
+        upstream_res = remote_request(url=load_url,
+                                      method=method,
                                       recorder=recorder,
                                       stream=True,
-                                      headers=req_headers)
+                                      allow_redirects=False,
+                                      headers=req_headers,
+                                      data=data,
+                                      timeout=params.get('_timeout'))
 
         resp_headers = recorder.get_header()
 
@@ -109,6 +143,7 @@ class LiveWebLoader(object):
         #response.headers['WARC-Record-ID'] = self._make_warc_id()
         response.headers['WARC-Target-URI'] = cdx['url']
         response.headers['WARC-Date'] = self._make_date(dt)
+        response.headers['WARC-Coll'] = cdx.get('source', '')
 
         # Try to set content-length, if it is available and valid
         try:
@@ -131,3 +166,110 @@ class LiveWebLoader(object):
         id_ = uuid.uuid1()
         return '<urn:uuid:{0}>'.format(id_)
+
+
+#=============================================================================
+def to_cdxj(cdx_iter, fields):
+    response.headers['Content-Type'] = 'text/x-cdxj'
+    return [cdx.to_cdxj(fields) for cdx in cdx_iter]
+
+def to_json(cdx_iter, fields):
+    response.headers['Content-Type'] = 'application/x-ndjson'
+    return [cdx.to_json(fields) for cdx in cdx_iter]
+
+def to_text(cdx_iter, fields):
+    response.headers['Content-Type'] = 'text/plain'
+    return [cdx.to_text(fields) for cdx in cdx_iter]
+
+def to_link(cdx_iter, fields):
+    response.headers['Content-Type'] = 'application/link-format'
+    return MementoUtils.make_timemap(cdx_iter)
+
+
+#=============================================================================
+class IndexLoader(object):
+    OUTPUTS = {
+        'cdxj': to_cdxj,
+        'json': to_json,
+        'text': to_text,
+        'link': to_link,
+    }
+
+    DEF_OUTPUT = 'cdxj'
+
+    def __init__(self, index_source):
+        self.index_source = index_source
+
+    def __call__(self, params):
+        cdx_iter = self.index_source(params)
+
+        output = params.get('output', self.DEF_OUTPUT)
+        fields = params.get('fields')
+
+        handler = self.OUTPUTS.get(output)
+        if not handler:
+            handler = self.OUTPUTS[self.DEF_OUTPUT]
+
+        res = handler(cdx_iter, fields)
+        return res
+
+
+#=============================================================================
+class ResourceLoader(IndexLoader):
+    def __init__(self, index_source, resource_loaders):
+        super(ResourceLoader, self).__init__(index_source)
+        self.resource_loaders = resource_loaders
+
+    def __call__(self, params):
+        output = params.get('output')
+        if output != 'resource':
+            return super(ResourceLoader, self).__call__(params)
+
+        cdx_iter = self.index_source(params)
+
+        any_found = False
+
+        for cdx in cdx_iter:
+            any_found = True
+            cdx['coll'] = params.get('coll', '')
+
+            for loader in self.resource_loaders:
+                try:
+                    resp = loader(cdx, params)
+                    if resp:
+                        return resp
+                except ArchiveLoadFailed as e:
+                    print(e)
+
+        if any_found:
+            raise ArchiveLoadFailed('Resource found, but could not be loaded')
+        else:
+            raise ArchiveLoadFailed('No Resource Found')
+
+
+#=============================================================================
+class DefaultResourceLoader(ResourceLoader):
+    def __init__(self, index_source, warc_paths=''):
+        loaders = [WARCPathLoader(warc_paths, index_source),
+                   LiveWebLoader()
+                  ]
+        super(DefaultResourceLoader, self).__init__(index_source, loaders)
+
+
+#=============================================================================
+class LoaderSeq(object):
+    def __init__(self, loaders):
+        self.loaders = loaders
+
+    def __call__(self, params):
+        for loader in self.loaders:
+            try:
+                res = loader(params)
+                if res:
+                    return res
+            except ArchiveLoadFailed:
+                pass
+
+        raise ArchiveLoadFailed('No Resource Found')
diff --git a/test_aggindexsource.py b/test_aggindexsource.py
index d0866c0a..db93dd26 100644
--- a/test_aggindexsource.py
+++ b/test_aggindexsource.py
@@ -15,7 +15,7 @@ sources = {
 
 source = AggIndexSource(sources, timeout=5.0)
 
-def select_json(cdxlist, fields=['timestamp', 'load_url', 'filename', 'source_name']):
+def select_json(cdxlist, fields=['timestamp', 'load_url', 'filename', 'source']):
     return list([json.loads(cdx.to_json(fields)) for cdx in cdxlist])
 
 
@@ -24,11 +24,11 @@ def test_agg_index_1():
     res = source(dict(url=url, closest='20140126000000', limit=5))
 
-    exp = [{"timestamp": "20140126093743", "load_url": "http://web.archive.org/web/20140126093743id_/http://iana.org/", "source_name": "ia"},
-           {"timestamp": "20140126200624", "filename": "iana.warc.gz", "source_name": "local"},
-           {"timestamp": "20140123034755", "load_url": "http://web.archive.org/web/20140123034755id_/http://iana.org/", "source_name": "ia"},
-           {"timestamp": "20140129175203", "load_url": "http://web.archive.org/web/20140129175203id_/http://iana.org/", "source_name": "ia"},
-           {"timestamp": "20140107040552", "load_url": "http://wayback.archive-it.org/all/20140107040552id_/http://iana.org/", "source_name": "ait"}
+    exp = [{"timestamp": "20140126093743", "load_url": "http://web.archive.org/web/20140126093743id_/http://iana.org/", "source": "ia"},
+           {"timestamp": "20140126200624", "filename": "iana.warc.gz", "source": "local"},
+           {"timestamp": "20140123034755", "load_url": "http://web.archive.org/web/20140123034755id_/http://iana.org/", "source": "ia"},
+           {"timestamp": "20140129175203", "load_url": "http://web.archive.org/web/20140129175203id_/http://iana.org/", "source": "ia"},
+           {"timestamp": "20140107040552", "load_url": "http://wayback.archive-it.org/all/20140107040552id_/http://iana.org/", "source": "ait"}
           ]
 
     assert(select_json(res) == exp)
 
@@ -38,12 +38,12 @@ def test_agg_index_2():
     url = 'http://example.com/'
     res = source(dict(url=url, closest='20100512', limit=6))
 
-    exp = [{"timestamp": "20100513010014", "load_url": "http://www.webarchive.org.uk/wayback/archive/20100513010014id_/http://example.com/", "source_name": "bl"},
-           {"timestamp": "20100512204410", "load_url": "http://www.webarchive.org.uk/wayback/archive/20100512204410id_/http://example.com/", "source_name": "bl"},
-           {"timestamp": "20100513052358", "load_url": "http://web.archive.org/web/20100513052358id_/http://example.com/", "source_name": "ia"},
-           {"timestamp": "20100511201151", "load_url": "http://wayback.archive-it.org/all/20100511201151id_/http://example.com/", "source_name": "ait"},
-           {"timestamp": "20100514231857", "load_url": "http://wayback.archive-it.org/all/20100514231857id_/http://example.com/", "source_name": "ait"},
{"timestamp": "20100514231857", "load_url": "http://web.archive.org/web/20100514231857id_/http://example.com/", "source_name": "ia"}] + exp = [{"timestamp": "20100513010014", "load_url": "http://www.webarchive.org.uk/wayback/archive/20100513010014id_/http://example.com/", "source": "bl"}, + {"timestamp": "20100512204410", "load_url": "http://www.webarchive.org.uk/wayback/archive/20100512204410id_/http://example.com/", "source": "bl"}, + {"timestamp": "20100513052358", "load_url": "http://web.archive.org/web/20100513052358id_/http://example.com/", "source": "ia"}, + {"timestamp": "20100511201151", "load_url": "http://wayback.archive-it.org/all/20100511201151id_/http://example.com/", "source": "ait"}, + {"timestamp": "20100514231857", "load_url": "http://wayback.archive-it.org/all/20100514231857id_/http://example.com/", "source": "ait"}, + {"timestamp": "20100514231857", "load_url": "http://web.archive.org/web/20100514231857id_/http://example.com/", "source": "ia"}] assert(select_json(res) == exp) @@ -52,11 +52,22 @@ def test_agg_index_3(): url = 'http://vvork.com/' res = source(dict(url=url, closest='20141001', limit=5)) - exp = [{"timestamp": "20141006184357", "load_url": "http://webenact.rhizome.org/vvork/20141006184357id_/http://www.vvork.com/", "source_name": "rhiz"}, - {"timestamp": "20141018133107", "load_url": "http://web.archive.org/web/20141018133107id_/http://vvork.com/", "source_name": "ia"}, - {"timestamp": "20141020161243", "load_url": "http://web.archive.org/web/20141020161243id_/http://vvork.com/", "source_name": "ia"}, - {"timestamp": "20140806161228", "load_url": "http://web.archive.org/web/20140806161228id_/http://vvork.com/", "source_name": "ia"}, - {"timestamp": "20131004231540", "load_url": "http://wayback.archive-it.org/all/20131004231540id_/http://vvork.com/", "source_name": "ait"}] + exp = [{"timestamp": "20141006184357", "load_url": "http://webenact.rhizome.org/vvork/20141006184357id_/http://www.vvork.com/", "source": "rhiz"}, + {"timestamp": "20141018133107", "load_url": "http://web.archive.org/web/20141018133107id_/http://vvork.com/", "source": "ia"}, + {"timestamp": "20141020161243", "load_url": "http://web.archive.org/web/20141020161243id_/http://vvork.com/", "source": "ia"}, + {"timestamp": "20140806161228", "load_url": "http://web.archive.org/web/20140806161228id_/http://vvork.com/", "source": "ia"}, + {"timestamp": "20131004231540", "load_url": "http://wayback.archive-it.org/all/20131004231540id_/http://vvork.com/", "source": "ait"}] assert(select_json(res) == exp) + +def test_agg_index_4(): + url = 'http://vvork.com/' + res = source(dict(url=url, closest='20141001', limit=2, sources='rhiz,ait')) + + exp = [{"timestamp": "20141006184357", "load_url": "http://webenact.rhizome.org/vvork/20141006184357id_/http://www.vvork.com/", "source": "rhiz"}, + {"timestamp": "20131004231540", "load_url": "http://wayback.archive-it.org/all/20131004231540id_/http://vvork.com/", "source": "ait"}] + + assert(select_json(res) == exp) + + diff --git a/utils.py b/utils.py index 6f9df22d..6d36162c 100644 --- a/utils.py +++ b/utils.py @@ -1,4 +1,8 @@ import re +import six + +from pywb.utils.timeutils import timestamp_to_http_date + LINK_SPLIT = re.compile(',\s*(?=[<])') LINK_SEG_SPLIT = re.compile(';\s*') @@ -50,3 +54,42 @@ class MementoUtils(object): results['mementos'] = mementos return results + + @staticmethod + def make_timemap_memento_link(cdx, datetime=None, rel='memento', end=',\n'): + + url = cdx.get('load_url') + if not url: + url = 'filename://' + cdx.get('filename') + + 
+        memento = '<{0}>; rel="{1}"; datetime="{2}"; src="{3}"' + end
+
+        if not datetime:
+            datetime = timestamp_to_http_date(cdx['timestamp'])
+
+        return memento.format(url, rel, datetime, cdx.get('source', ''))
+
+    @staticmethod
+    def make_timemap(cdx_iter):
+        # get first memento as it'll be used for 'from' field
+        try:
+            first_cdx = six.next(cdx_iter)
+            from_date = timestamp_to_http_date(first_cdx['timestamp'])
+        except StopIteration:
+            # no mementos at all: nothing to emit
+            return
+
+        # first memento link
+        yield MementoUtils.make_timemap_memento_link(first_cdx, datetime=from_date)
+
+        prev_cdx = None
+
+        for cdx in cdx_iter:
+            if prev_cdx:
+                yield MementoUtils.make_timemap_memento_link(prev_cdx)
+
+            prev_cdx = cdx
+
+        # last memento link, if any
+        if prev_cdx:
+            yield MementoUtils.make_timemap_memento_link(prev_cdx, end='')
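---

Reviewer notes (sketches, not part of the patch).

First, a minimal usage sketch of the new 'sources' filter and the 'link'
(timemap) output. The wiring mirrors test_aggindexsource.py; the .cdxj paths
below are hypothetical, and bottle's thread-local response object is assumed
to be available for the Content-Type header that to_link() sets.

    import sys

    from aggindexsource import AggIndexSource
    from indexsource import FileIndexSource
    from responseloader import IndexLoader

    # two local sources; the filenames are placeholders
    sources = {'iana': FileIndexSource('testdata/iana.cdxj'),
               'example': FileIndexSource('testdata/example.cdxj')}

    agg = AggIndexSource(sources, timeout=5.0)

    # query only the 'iana' source via the new 'sources' param;
    # each result carries the renamed 'source' field
    for cdx in agg(dict(url='http://iana.org/', sources='iana', limit=3)):
        print(cdx['source'] + ' ' + cdx['timestamp'])

    # render the same query as a Link-format timemap via the new 'link' output
    loader = IndexLoader(agg)
    for line in loader(dict(url='http://iana.org/', output='link')):
        sys.stdout.write(line)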
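Second, a sketch of the direct-WSGI input mode: get_req_headers() rewrites
Host and X-Forwarded-Proto against the target url while passing other HTTP_*
headers through unchanged. The environ below is illustrative only.

    from inputrequest import WSGIInputRequest

    env = {'REQUEST_METHOD': 'GET',
           'HTTP_HOST': 'localhost:8080',
           'HTTP_USER_AGENT': 'test-agent',
           'HTTP_X_FORWARDED_PROTO': 'https'}

    req = WSGIInputRequest(env)

    # Host becomes the target's netloc and X-Forwarded-Proto its scheme;
    # REQUEST_METHOD is not an HTTP_* key, so it is skipped entirely
    print(req.get_req_headers('http://example.com/page'))
    # -> {'Host': 'example.com', 'User-Agent': 'test-agent',
    #     'X-Forwarded-Proto': 'http'}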