mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 08:04:49 +01:00

inputrequest: add input request handling, either directly from WSGI headers or as a prepared POST request

add timemap link output
rename source_name -> source
Ilya Kreymer 2016-02-24 14:22:29 -08:00
commit 398e8f1a77, parent 1a0b2fba17
7 changed files with 441 additions and 76 deletions
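
The pieces fit together roughly as follows: an input request object wraps the incoming request and travels through the query params as '_input_req', where index sources use it to fold POST bodies into the lookup URL and LiveWebLoader replays the original method, headers and body upstream. A minimal sketch of the direct-WSGI path, assuming a local CDXJ file named sample.cdxj (the file name and target URL are illustrative, not part of this commit):

from inputrequest import WSGIInputRequest
from indexsource import FileIndexSource

source = FileIndexSource('sample.cdxj')

def handle(env):
    # Wrap the raw WSGI environ directly; POSTInputRequest would be used
    # instead when the original request arrives re-encoded as a POST body.
    input_req = WSGIInputRequest(env)

    params = {'url': 'http://example.com/',
              '_input_req': input_req}   # read by _include_post_query() below

    # Returns an iterator of CDX records; the 'output' param (including the
    # new 'link' timemap output) is interpreted later by IndexLoader.
    return source(params)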

View File

@@ -2,29 +2,27 @@ from gevent.pool import Pool
import gevent
import json
import time
import os
from heapq import merge
from collections import deque
from indexsource import BaseIndexSource
from indexsource import BaseIndexSource, FileIndexSource
from pywb.utils.wbexception import NotFoundException
#=============================================================================
class BaseAggIndexSource(BaseIndexSource):
def __init__(self, sources):
self.sources = sources
def do_query(self, name, source, params):
try:
cdx_iter = source.load_index(params)
cdx_iter = source.load_index(dict(params))
except NotFoundException as nf:
print('Not found in ' + name)
cdx_iter = iter([])
def add_name(cdx_iter):
for cdx in cdx_iter:
cdx['source_name'] = name
cdx['source'] = name
yield cdx
return add_name(cdx_iter)
@@ -36,6 +34,9 @@ class BaseAggIndexSource(BaseIndexSource):
return cdx_iter
def _load_all(self):
raise NotImplementedError()
#=============================================================================
class TimingOutMixin(object):
@@ -63,7 +64,7 @@ class TimingOutMixin(object):
return False
def get_valid_sources(self, sources):
for name in sources.keys():
for name in sources:
if not self.is_timed_out(name):
yield name
@@ -79,10 +80,19 @@ class TimingOutMixin(object):
#=============================================================================
class GeventAggIndexSource(BaseAggIndexSource):
def __init__(self, sources, timeout=5.0, size=None):
super(GeventAggIndexSource, self).__init__(sources)
self.sources = sources
self.pool = Pool(size=size)
self.timeout = timeout
def get_sources(self, params):
srcs_list = params.get('sources')
if not srcs_list:
return self.sources
sel_sources = tuple(srcs_list.split(','))
return [src for src in self.sources if src in sel_sources]
def get_valid_sources(self, sources):
return sources.keys()
@@ -90,15 +100,18 @@ class GeventAggIndexSource(BaseAggIndexSource):
pass
def _load_all(self, params):
params['_timeout'] = self.timeout
def do_spawn(n):
return self.pool.spawn(self.do_query, n, self.sources[n], params)
jobs = [do_spawn(src) for src in self.get_valid_sources(self.sources)]
sources = self.get_sources(params)
jobs = [do_spawn(src) for src in self.get_valid_sources(sources)]
gevent.joinall(jobs, timeout=self.timeout)
res = []
for name, job in zip(self.sources.keys(), jobs):
for name, job in zip(sources, jobs):
if job.value:
res.append(job.value)
else:
@@ -113,29 +126,30 @@ class AggIndexSource(TimingOutMixin, GeventAggIndexSource):
#=============================================================================
class SimpleAggIndexSource(BaseAggIndexSource):
class DirAggIndexSource(BaseAggIndexSource):
CDX_EXT = ('.cdx', '.cdxj')
def __init__(self, base_dir):
self.index_template = base_dir
def _init_files(self, the_dir):
sources = {}
for name in os.listdir(the_dir):
filename = os.path.join(the_dir, name)
if filename.endswith(self.CDX_EXT):
print('Adding ' + filename)
sources[name] = FileIndexSource(filename)
return sources
def _load_all(self, params):
return list(map(lambda n: self.do_query(n, self.sources[n], params),
self.sources))
#=============================================================================
class ResourceLoadAgg(object):
def __init__(self, load_index, load_resource):
self.load_index = load_index
self.load_resource = load_resource
def __call__(self, params):
cdx_iter = self.load_index(params)
for cdx in cdx_iter:
for loader in self.load_resource:
try:
resp = loader(cdx)
if resp:
return resp
except Exception:
pass
raise Exception('Not Found')
the_dir = self.get_index(params)
try:
sources = self._init_files(the_dir)
except Exception:
raise NotFoundException(the_dir)
return list([self.do_query(src, sources[src], params)
for src in sources.keys()])
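
A sketch of the new per-query source selection and the directory-based aggregator. The aggregator module name, the endpoint URLs, the RemoteIndexSource arguments (CDX API endpoint plus replay URL template) and the directory layout are assumptions; the classes, the 'sources' parameter and the renamed 'source' result field come from this diff:

# assuming the aggregator file above is importable as aggindexsource
from aggindexsource import AggIndexSource, DirAggIndexSource
from indexsource import RemoteIndexSource

agg = AggIndexSource({
    'ia':  RemoteIndexSource('http://web.archive.org/cdx/search/cdx',
                             'http://web.archive.org/web/{timestamp}id_/{url}'),
    'ait': RemoteIndexSource('http://wayback.archive-it.org/all/cdx',
                             'http://wayback.archive-it.org/all/{timestamp}id_/{url}'),
}, timeout=5.0)

# Only the sources named in the comma-separated 'sources' param are queried;
# each returned record reports its origin in the renamed 'source' field.
cdx_iter = agg(dict(url='http://example.com/', sources='ia'))

# DirAggIndexSource builds a FileIndexSource per *.cdx / *.cdxj file found in
# the directory, re-scanning it on every query.
dir_agg = DirAggIndexSource('/archives/{coll}/indexes/')
results = dir_agg(dict(url='http://example.com/', coll='main'))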

View File

@@ -21,10 +21,14 @@ class BaseIndexSource(object):
self.index_template = index_template
def get_index(self, params):
return self.index_template.format(params.get('coll'))
res = self.index_template.format(**params)
return res
def load_index(self, params):
raise NotImplementedError()
def __call__(self, params):
query = CDXQuery(**params)
query = CDXQuery(params)
try:
cdx_iter = self.load_index(query.params)
@@ -34,10 +38,20 @@ class BaseIndexSource(object):
cdx_iter = process_cdx(cdx_iter, query)
return cdx_iter
def _include_post_query(self, params):
input_req = params.get('_input_req')
if input_req:
orig_url = params['url']
params['url'] = input_req.include_post_query(params['url'])
return (params['url'] != orig_url)
#=============================================================================
class FileIndexSource(BaseIndexSource):
def load_index(self, params):
if self._include_post_query(params):
params = CDXQuery(params).params
filename = self.get_index(params)
with open(filename, 'rb') as fh:
@@ -45,6 +59,8 @@ class FileIndexSource(BaseIndexSource):
for line in gen:
yield CDXObject(line)
#return do_load(filename)
#=============================================================================
class RemoteIndexSource(BaseIndexSource):
@@ -53,11 +69,14 @@ class RemoteIndexSource(BaseIndexSource):
self.replay_url = replay_url
def load_index(self, params):
url = self.get_index(params)
url += '?url=' + params['url']
r = requests.get(url)
if self._include_post_query(params):
params = CDXQuery(**params).params
api_url = self.get_index(params)
api_url += '?url=' + params['url']
r = requests.get(api_url, timeout=params.get('_timeout'))
if r.status_code >= 400:
raise NotFoundException(url)
raise NotFoundException(api_url)
lines = r.content.strip().split(b'\n')
def do_load(lines):
@@ -103,8 +122,11 @@ class RedisIndexSource(BaseIndexSource):
b'[' + params['key'],
b'(' + params['end_key'])
for line in index_list:
yield CDXObject(line)
def do_load(index_list):
for line in index_list:
yield CDXObject(line)
return do_load(index_list)
#=============================================================================
@@ -166,7 +188,7 @@ class MementoIndexSource(BaseIndexSource):
def get_timemap_links(self, params):
url = self.timemap_url + params['url']
res = requests.get(url)
res = requests.get(url, timeout=params.get('_timeout'))
if res.status_code >= 400:
raise NotFoundException(url)
@@ -182,9 +204,6 @@ class MementoIndexSource(BaseIndexSource):
links = self.get_timegate_links(params, closest)
def_name = 'timegate'
#if not links:
# return iter([])
return self.links_to_cdxobject(links, def_name)
@staticmethod
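
_include_post_query above leans on two helpers from pywb.utils.loaders. A rough illustration of what they do with a form POST (the request data is invented, and the exact shape of the rewritten URL can vary between pywb versions):

from io import BytesIO
from pywb.utils.loaders import extract_post_query, append_post_query

body = b'user=alice&action=login'
post_query = extract_post_query('POST', 'application/x-www-form-urlencoded',
                                len(body), BytesIO(body))

url = append_post_query('http://example.com/login', post_query)
# roughly 'http://example.com/login?user=alice&action=login', so captured POST
# requests can be matched by the same URL-keyed CDX lookup.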

inputrequest.py (new file, 136 lines)
View File

@@ -0,0 +1,136 @@
from pywb.utils.loaders import extract_client_cookie
from pywb.utils.loaders import extract_post_query, append_post_query
from pywb.utils.loaders import LimitReader
from pywb.utils.statusandheaders import StatusAndHeadersParser
from six.moves.urllib.parse import urlsplit
from six import StringIO
import six
#=============================================================================
class WSGIInputRequest(object):
def __init__(self, env):
self.env = env
def get_req_method(self):
return self.env['REQUEST_METHOD'].upper()
def get_req_headers(self, url):
headers = {}
splits = urlsplit(url)
for name, value in six.iteritems(self.env):
if name == 'HTTP_HOST':
name = 'Host'
value = splits.netloc
elif name == 'HTTP_ORIGIN':
name = 'Origin'
value = (splits.scheme + '://' + splits.netloc)
elif name == 'HTTP_X_CSRFTOKEN':
name = 'X-CSRFToken'
cookie_val = extract_client_cookie(self.env, 'csrftoken')
if cookie_val:
value = cookie_val
elif name == 'HTTP_X_FORWARDED_PROTO':
name = 'X-Forwarded-Proto'
value = splits.scheme
elif name.startswith('HTTP_'):
name = name[5:].title().replace('_', '-')
elif name in ('CONTENT_LENGTH', 'CONTENT_TYPE'):
name = name.title().replace('_', '-')
else:
value = None
if value:
headers[name] = value
return headers
def get_req_body(self):
input_ = self.env.get('wsgi.input')
if not input_:
return None
len_ = self._get_content_length()
enc = self._get_header('Transfer-Encoding')
if len_:
data = LimitReader(input_, int(len_))
elif enc:
data = input_
else:
data = None
return data
#buf = data.read().decode('utf-8')
#print(buf)
#return StringIO(buf)
def _get_content_type(self):
return self.env.get('CONTENT_TYPE')
def _get_content_length(self):
return self.env.get('CONTENT_LENGTH')
def _get_header(self, name):
return self.env.get('HTTP_' + name.upper().replace('-', '_'))
def include_post_query(self, url):
if self.get_req_method() != 'POST':
return url
mime = self._get_content_type()
mime = mime.split(';')[0] if mime else ''
length = self._get_content_length()
stream = self.env['wsgi.input']
buffered_stream = StringIO()
post_query = extract_post_query('POST', mime, length, stream,
buffered_stream=buffered_stream)
if post_query:
self.env['wsgi.input'] = buffered_stream
url = append_post_query(url, post_query)
return url
#=============================================================================
class POSTInputRequest(WSGIInputRequest):
def __init__(self, env):
self.env = env
parser = StatusAndHeadersParser([], verify=False)
self.status_headers = parser.parse(self.env['wsgi.input'])
def get_req_method(self):
return self.status_headers.protocol
def get_req_headers(self, url):
headers = {}
for n, v in self.status_headers.headers:
headers[n] = v
return headers
def _get_content_type(self):
return self.status_headers.get_header('Content-Type')
def _get_content_length(self):
return self.status_headers.get_header('Content-Length')
def _get_header(self, name):
return self.status_headers.get_header(name)
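
Both request flavors expose the same small interface (get_req_method, get_req_headers, get_req_body). A hedged sketch of each, using made-up request data:

from io import BytesIO
from inputrequest import WSGIInputRequest, POSTInputRequest

# Direct WSGI headers: HTTP_* keys are mapped back to header names, with
# Host / Origin / X-Forwarded-Proto rewritten against the target url.
env = {'REQUEST_METHOD': 'GET',
       'HTTP_HOST': 'localhost:8080',
       'HTTP_USER_AGENT': 'test-agent'}
req = WSGIInputRequest(env)
req.get_req_method()                             # 'GET'
req.get_req_headers('http://example.com/path')   # {'Host': 'example.com',
                                                 #  'User-Agent': 'test-agent'}

# Prepared POST request: the original request (request line + headers) arrives
# as the POST body and is re-parsed from wsgi.input.
raw = (b'GET /path HTTP/1.1\r\n'
       b'Host: example.com\r\n'
       b'User-Agent: test-agent\r\n'
       b'\r\n')
post_req = POSTInputRequest({'REQUEST_METHOD': 'POST',
                             'wsgi.input': BytesIO(raw)})
post_req.get_req_method()                        # 'GET'
post_req.get_req_headers('http://example.com/path')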

View File

@@ -95,7 +95,7 @@ class RecordingHTTPConnection(httplib.HTTPConnection):
if hasattr(data,'read') and not isinstance(data, array):
url = None
while True:
buff = data.read(self.BUFF_SIZE)
buff = data.read(BUFF_SIZE)
if not buff:
break

View File

@@ -9,6 +9,7 @@ from io import BytesIO
from bottle import response
import uuid
from utils import MementoUtils
#=============================================================================
@@ -23,24 +24,46 @@ def incr_reader(stream, header=None, size=8192):
else:
break
try:
stream.close()
except:
pass
#=============================================================================
class WARCPathPrefixLoader(object):
def __init__(self, prefix, cdx_loader):
self.prefix = prefix
class WARCPathLoader(object):
def __init__(self, paths, cdx_source):
self.paths = paths
if isinstance(paths, str):
self.paths = [paths]
def add_prefix(filename, cdx):
return [self.prefix + filename]
self.path_checks = list(self.warc_paths())
self.resolve_loader = ResolvingLoader([add_prefix], no_record_parse=True)
self.cdx_loader = cdx_loader
self.resolve_loader = ResolvingLoader(self.path_checks,
no_record_parse=True)
self.cdx_source = cdx_source
def __call__(self, cdx):
def warc_paths(self):
for path in self.paths:
def check(filename, cdx):
try:
full_path = path.format(**cdx)
return full_path
except KeyError:
return None
yield check
def __call__(self, cdx, params):
if not cdx.get('filename') or cdx.get('offset') is None:
return None
failed_files = []
headers, payload = self.resolve_loader.load_headers_and_payload(cdx, failed_files, self.cdx_loader)
headers, payload = (self.resolve_loader.
load_headers_and_payload(cdx,
failed_files,
self.cdx_source))
if headers != payload:
headers.stream.close()
@@ -50,6 +73,8 @@ class WARCPathPrefixLoader(object):
for n, v in record.rec_headers.headers:
response.headers[n] = v
response.headers['WARC-Coll'] = cdx.get('source')
return incr_reader(record.stream)
@@ -82,24 +107,33 @@ class LiveWebLoader(object):
b'content-location',
b'x-archive')
def __call__(self, cdx):
def __call__(self, cdx, params):
load_url = cdx.get('load_url')
if not load_url:
return None
recorder = HeaderRecorder(self.SKIP_HEADERS)
req_headers = {}
input_req = params['_input_req']
req_headers = input_req.get_req_headers(cdx['url'])
dt = timestamp_to_datetime(cdx['timestamp'])
if not cdx.get('is_live'):
req_headers['Accept-Datetime'] = datetime_to_http_date(dt)
upstream_res = remote_request(load_url,
method = input_req.get_req_method()
data = input_req.get_req_body()
upstream_res = remote_request(url=load_url,
method=method,
recorder=recorder,
stream=True,
headers=req_headers)
allow_redirects=False,
headers=req_headers,
data=data,
timeout=params.get('_timeout'))
resp_headers = recorder.get_header()
@@ -109,6 +143,7 @@ class LiveWebLoader(object):
#response.headers['WARC-Record-ID'] = self._make_warc_id()
response.headers['WARC-Target-URI'] = cdx['url']
response.headers['WARC-Date'] = self._make_date(dt)
response.headers['WARC-Coll'] = cdx.get('source', '')
# Try to set content-length, if it is available and valid
try:
@@ -131,3 +166,110 @@ class LiveWebLoader(object):
id_ = uuid.uuid1()
return '<urn:uuid:{0}>'.format(id_)
#=============================================================================
def to_cdxj(cdx_iter, fields):
response.headers['Content-Type'] = 'text/x-cdxj'
return [cdx.to_cdxj(fields) for cdx in cdx_iter]
def to_json(cdx_iter, fields):
response.headers['Content-Type'] = 'application/x-ndjson'
return [cdx.to_json(fields) for cdx in cdx_iter]
def to_text(cdx_iter, fields):
response.headers['Content-Type'] = 'text/plain'
return [cdx.to_text(fields) for cdx in cdx_iter]
def to_link(cdx_iter, fields):
response.headers['Content-Type'] = 'application/link'
return MementoUtils.make_timemap(cdx_iter)
#=============================================================================
class IndexLoader(object):
OUTPUTS = {
'cdxj': to_cdxj,
'json': to_json,
'text': to_text,
'link': to_link,
}
DEF_OUTPUT = 'cdxj'
def __init__(self, index_source):
self.index_source = index_source
def __call__(self, params):
cdx_iter = self.index_source(params)
output = params.get('output', self.DEF_OUTPUT)
fields = params.get('fields')
handler = self.OUTPUTS.get(output)
if not handler:
handler = self.OUTPUTS[self.DEF_OUTPUT]
res = handler(cdx_iter, fields)
return res
#=============================================================================
class ResourceLoader(IndexLoader):
def __init__(self, index_source, resource_loaders):
super(ResourceLoader, self).__init__(index_source)
self.resource_loaders = resource_loaders
def __call__(self, params):
output = params.get('output')
if output != 'resource':
return super(ResourceLoader, self).__call__(params)
cdx_iter = self.index_source(params)
any_found = False
for cdx in cdx_iter:
any_found = True
cdx['coll'] = params.get('coll', '')
for loader in self.resource_loaders:
try:
resp = loader(cdx, params)
if resp:
return resp
except ArchiveLoadFailed as e:
print(e)
pass
if any_found:
raise ArchiveLoadFailed('Resource Found, could not be Loaded')
else:
raise ArchiveLoadFailed('No Resource Found')
#=============================================================================
class DefaultResourceLoader(ResourceLoader):
def __init__(self, index_source, warc_paths=''):
loaders = [WARCPathLoader(warc_paths, index_source),
LiveWebLoader()
]
super(DefaultResourceLoader, self).__init__(index_source, loaders)
#=============================================================================
class LoaderSeq(object):
def __init__(self, loaders):
self.loaders = loaders
def __call__(self, params):
for loader in self.loaders:
try:
res = loader(params)
if res:
return res
except ArchiveLoadFailed:
pass
raise ArchiveLoadFailed('No Resource Found')
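
Putting the loaders together; the module name responseloader, the paths and the collection name below are assumptions, while the classes and output keys come from this diff:

from indexsource import FileIndexSource
from responseloader import DefaultResourceLoader

index = FileIndexSource('/archives/{coll}/indexes/index.cdxj')
loader = DefaultResourceLoader(index,
                               warc_paths='/archives/{coll}/warcs/{filename}')

# Index outputs ('cdxj', 'json', 'text' and the new 'link' timemap) are handled
# by IndexLoader; output='resource' goes through WARCPathLoader and then
# LiveWebLoader, and additionally expects an '_input_req' in the params.
timemap = loader(dict(url='http://example.com/', coll='main', output='link'))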

View File

@@ -15,7 +15,7 @@ sources = {
source = AggIndexSource(sources, timeout=5.0)
def select_json(cdxlist, fields=['timestamp', 'load_url', 'filename', 'source_name']):
def select_json(cdxlist, fields=['timestamp', 'load_url', 'filename', 'source']):
return list([json.loads(cdx.to_json(fields)) for cdx in cdxlist])
@@ -24,11 +24,11 @@ def test_agg_index_1():
res = source(dict(url=url, closest='20140126000000', limit=5))
exp = [{"timestamp": "20140126093743", "load_url": "http://web.archive.org/web/20140126093743id_/http://iana.org/", "source_name": "ia"},
{"timestamp": "20140126200624", "filename": "iana.warc.gz", "source_name": "local"},
{"timestamp": "20140123034755", "load_url": "http://web.archive.org/web/20140123034755id_/http://iana.org/", "source_name": "ia"},
{"timestamp": "20140129175203", "load_url": "http://web.archive.org/web/20140129175203id_/http://iana.org/", "source_name": "ia"},
{"timestamp": "20140107040552", "load_url": "http://wayback.archive-it.org/all/20140107040552id_/http://iana.org/", "source_name": "ait"}
exp = [{"timestamp": "20140126093743", "load_url": "http://web.archive.org/web/20140126093743id_/http://iana.org/", "source": "ia"},
{"timestamp": "20140126200624", "filename": "iana.warc.gz", "source": "local"},
{"timestamp": "20140123034755", "load_url": "http://web.archive.org/web/20140123034755id_/http://iana.org/", "source": "ia"},
{"timestamp": "20140129175203", "load_url": "http://web.archive.org/web/20140129175203id_/http://iana.org/", "source": "ia"},
{"timestamp": "20140107040552", "load_url": "http://wayback.archive-it.org/all/20140107040552id_/http://iana.org/", "source": "ait"}
]
assert(select_json(res) == exp)
@@ -38,12 +38,12 @@ def test_agg_index_2():
url = 'http://example.com/'
res = source(dict(url=url, closest='20100512', limit=6))
exp = [{"timestamp": "20100513010014", "load_url": "http://www.webarchive.org.uk/wayback/archive/20100513010014id_/http://example.com/", "source_name": "bl"},
{"timestamp": "20100512204410", "load_url": "http://www.webarchive.org.uk/wayback/archive/20100512204410id_/http://example.com/", "source_name": "bl"},
{"timestamp": "20100513052358", "load_url": "http://web.archive.org/web/20100513052358id_/http://example.com/", "source_name": "ia"},
{"timestamp": "20100511201151", "load_url": "http://wayback.archive-it.org/all/20100511201151id_/http://example.com/", "source_name": "ait"},
{"timestamp": "20100514231857", "load_url": "http://wayback.archive-it.org/all/20100514231857id_/http://example.com/", "source_name": "ait"},
{"timestamp": "20100514231857", "load_url": "http://web.archive.org/web/20100514231857id_/http://example.com/", "source_name": "ia"}]
exp = [{"timestamp": "20100513010014", "load_url": "http://www.webarchive.org.uk/wayback/archive/20100513010014id_/http://example.com/", "source": "bl"},
{"timestamp": "20100512204410", "load_url": "http://www.webarchive.org.uk/wayback/archive/20100512204410id_/http://example.com/", "source": "bl"},
{"timestamp": "20100513052358", "load_url": "http://web.archive.org/web/20100513052358id_/http://example.com/", "source": "ia"},
{"timestamp": "20100511201151", "load_url": "http://wayback.archive-it.org/all/20100511201151id_/http://example.com/", "source": "ait"},
{"timestamp": "20100514231857", "load_url": "http://wayback.archive-it.org/all/20100514231857id_/http://example.com/", "source": "ait"},
{"timestamp": "20100514231857", "load_url": "http://web.archive.org/web/20100514231857id_/http://example.com/", "source": "ia"}]
assert(select_json(res) == exp)
@@ -52,11 +52,22 @@ def test_agg_index_3():
url = 'http://vvork.com/'
res = source(dict(url=url, closest='20141001', limit=5))
exp = [{"timestamp": "20141006184357", "load_url": "http://webenact.rhizome.org/vvork/20141006184357id_/http://www.vvork.com/", "source_name": "rhiz"},
{"timestamp": "20141018133107", "load_url": "http://web.archive.org/web/20141018133107id_/http://vvork.com/", "source_name": "ia"},
{"timestamp": "20141020161243", "load_url": "http://web.archive.org/web/20141020161243id_/http://vvork.com/", "source_name": "ia"},
{"timestamp": "20140806161228", "load_url": "http://web.archive.org/web/20140806161228id_/http://vvork.com/", "source_name": "ia"},
{"timestamp": "20131004231540", "load_url": "http://wayback.archive-it.org/all/20131004231540id_/http://vvork.com/", "source_name": "ait"}]
exp = [{"timestamp": "20141006184357", "load_url": "http://webenact.rhizome.org/vvork/20141006184357id_/http://www.vvork.com/", "source": "rhiz"},
{"timestamp": "20141018133107", "load_url": "http://web.archive.org/web/20141018133107id_/http://vvork.com/", "source": "ia"},
{"timestamp": "20141020161243", "load_url": "http://web.archive.org/web/20141020161243id_/http://vvork.com/", "source": "ia"},
{"timestamp": "20140806161228", "load_url": "http://web.archive.org/web/20140806161228id_/http://vvork.com/", "source": "ia"},
{"timestamp": "20131004231540", "load_url": "http://wayback.archive-it.org/all/20131004231540id_/http://vvork.com/", "source": "ait"}]
assert(select_json(res) == exp)
def test_agg_index_4():
url = 'http://vvork.com/'
res = source(dict(url=url, closest='20141001', limit=2, sources='rhiz,ait'))
exp = [{"timestamp": "20141006184357", "load_url": "http://webenact.rhizome.org/vvork/20141006184357id_/http://www.vvork.com/", "source": "rhiz"},
{"timestamp": "20131004231540", "load_url": "http://wayback.archive-it.org/all/20131004231540id_/http://vvork.com/", "source": "ait"}]
assert(select_json(res) == exp)

View File

@ -1,4 +1,8 @@
import re
import six
from pywb.utils.timeutils import timestamp_to_http_date
LINK_SPLIT = re.compile(',\s*(?=[<])')
LINK_SEG_SPLIT = re.compile(';\s*')
@@ -50,3 +54,42 @@ class MementoUtils(object):
results['mementos'] = mementos
return results
@staticmethod
def make_timemap_memento_link(cdx, datetime=None, rel='memento', end=',\n'):
url = cdx.get('load_url')
if not url:
url = 'filename://' + cdx.get('filename')
memento = '<{0}>; rel="{1}"; datetime="{2}"; src="{3}"' + end
if not datetime:
datetime = timestamp_to_http_date(cdx['timestamp'])
return memento.format(url, rel, datetime, cdx.get('source', ''))
@staticmethod
def make_timemap(cdx_iter):
# get first memento as it'll be used for 'from' field
try:
first_cdx = six.next(cdx_iter)
from_date = timestamp_to_http_date(first_cdx['timestamp'])
except StopIteration:
# nothing to emit for an empty cdx iterator
return
# first memento link
yield MementoUtils.make_timemap_memento_link(first_cdx, datetime=from_date)
prev_cdx = None
for cdx in cdx_iter:
if prev_cdx:
yield MementoUtils.make_timemap_memento_link(prev_cdx)
prev_cdx = cdx
# last memento link, if any
if prev_cdx:
yield MementoUtils.make_timemap_memento_link(prev_cdx, end='')
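
For reference, a single generated timemap line looks roughly like this (the CDX values are invented; the src attribute carries the renamed 'source' field):

from utils import MementoUtils

cdx = {'timestamp': '20140126093743',
       'load_url': 'http://web.archive.org/web/20140126093743id_/http://iana.org/',
       'source': 'ia'}

print(MementoUtils.make_timemap_memento_link(cdx, end=''))
# <http://web.archive.org/web/20140126093743id_/http://iana.org/>; rel="memento"; datetime="Sun, 26 Jan 2014 09:37:43 GMT"; src="ia"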