mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 08:04:49 +01:00

add aggregate index source and tests!

Ilya Kreymer 2016-02-22 13:30:12 -08:00
parent 37198767ed
commit 1a0b2fba17
7 changed files with 388 additions and 170 deletions
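
At a glance, the new AggIndexSource fans a CDX query out to several index sources and merges the results. A minimal usage sketch, assembled from the new test file below (sample.cdxj and the timegate URL are the ones used in those tests):

from gevent import monkey; monkey.patch_all()

from aggindexsource import AggIndexSource
from indexsource import FileIndexSource, MementoIndexSource

# one local source and one remote Memento timegate; each merged CDX row
# is tagged with the name of the source it came from
sources = {
    'local': FileIndexSource('sample.cdxj'),
    'ia': MementoIndexSource.from_timegate_url('http://web.archive.org/web/'),
}

source = AggIndexSource(sources, timeout=5.0)

for cdx in source(dict(url='http://iana.org/', closest='20140126000000', limit=5)):
    print(cdx['source_name'], cdx['timestamp'], cdx.get('load_url'))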

aggindexsource.py Normal file

@@ -0,0 +1,141 @@
from gevent.pool import Pool
import gevent

import json
import time

from heapq import merge
from collections import deque

from indexsource import BaseIndexSource
from pywb.utils.wbexception import NotFoundException


#=============================================================================
class BaseAggIndexSource(BaseIndexSource):
    def __init__(self, sources):
        self.sources = sources

    def do_query(self, name, source, params):
        try:
            cdx_iter = source.load_index(params)
        except NotFoundException as nf:
            print('Not found in ' + name)
            cdx_iter = iter([])

        def add_name(cdx_iter):
            for cdx in cdx_iter:
                cdx['source_name'] = name
                yield cdx

        return add_name(cdx_iter)

    def load_index(self, params):
        iter_list = self._load_all(params)

        cdx_iter = merge(*(iter_list))

        return cdx_iter


#=============================================================================
class TimingOutMixin(object):
    def __init__(self, *args, **kwargs):
        super(TimingOutMixin, self).__init__(*args, **kwargs)
        self.t_count = kwargs.get('t_count', 3)
        self.t_dura = kwargs.get('t_duration', 20)
        self.timeouts = {}

    def is_timed_out(self, name):
        timeout_deq = self.timeouts.get(name)
        if not timeout_deq:
            return False

        the_time = time.time()
        for t in list(timeout_deq):
            if (the_time - t) > self.t_dura:
                timeout_deq.popleft()

        if len(timeout_deq) >= self.t_count:
            print('Skipping {0}, {1} timeouts in {2} seconds'.
                  format(name, self.t_count, self.t_dura))
            return True

        return False

    def get_valid_sources(self, sources):
        for name in sources.keys():
            if not self.is_timed_out(name):
                yield name

    def track_source_error(self, name):
        the_time = time.time()
        if name not in self.timeouts:
            self.timeouts[name] = deque()

        self.timeouts[name].append(the_time)
        print(name + ' timed out!')


#=============================================================================
class GeventAggIndexSource(BaseAggIndexSource):
    def __init__(self, sources, timeout=5.0, size=None):
        super(GeventAggIndexSource, self).__init__(sources)
        self.pool = Pool(size=size)
        self.timeout = timeout

    def get_valid_sources(self, sources):
        return sources.keys()

    def track_source_error(self, name):
        pass

    def _load_all(self, params):
        def do_spawn(n):
            return self.pool.spawn(self.do_query, n, self.sources[n], params)

        # materialize the (possibly filtered) source names so that the zip
        # below pairs each job with the source it was actually spawned for
        valid_sources = list(self.get_valid_sources(self.sources))
        jobs = [do_spawn(src) for src in valid_sources]

        gevent.joinall(jobs, timeout=self.timeout)

        res = []
        for name, job in zip(valid_sources, jobs):
            if job.value:
                res.append(job.value)
            else:
                self.track_source_error(name)

        return res


#=============================================================================
class AggIndexSource(TimingOutMixin, GeventAggIndexSource):
    pass


#=============================================================================
class SimpleAggIndexSource(BaseAggIndexSource):
    def _load_all(self, params):
        return list(map(lambda n: self.do_query(n, self.sources[n], params),
                        self.sources))


#=============================================================================
class ResourceLoadAgg(object):
    def __init__(self, load_index, load_resource):
        self.load_index = load_index
        self.load_resource = load_resource

    def __call__(self, params):
        cdx_iter = self.load_index(params)
        for cdx in cdx_iter:
            for loader in self.load_resource:
                try:
                    resp = loader(cdx)
                    if resp:
                        return resp
                except Exception:
                    pass

        raise Exception('Not Found')
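
ResourceLoadAgg glues an index lookup to an ordered list of resource loaders: the first loader to return a non-empty response wins, and any loader exception falls through to the next. A sketch of one plausible wiring, assuming the loader classes from this commit's loader changes below (the 'warcs/' prefix and the None cdx_loader are illustrative):

from aggindexsource import AggIndexSource, ResourceLoadAgg

# 'sources' is a dict of name -> index source, as in the tests below
agg = AggIndexSource(sources, timeout=5.0)

# try local WARCs first, then fall back to fetching from the live web
loaders = [WARCPathPrefixLoader('warcs/', None), LiveWebLoader()]

load = ResourceLoadAgg(agg, loaders)
resp = load(dict(url='http://iana.org/', closest='20140126'))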

indexsource.py

@@ -3,10 +3,12 @@ import redis
from pywb.utils.binsearch import iter_range
from pywb.utils.timeutils import timestamp_to_http_date, http_date_to_timestamp
from pywb.utils.timeutils import timestamp_to_sec, timestamp_now
from pywb.utils.canonicalize import calc_search_range
from pywb.utils.canonicalize import canonicalize, calc_search_range
from pywb.utils.wbexception import NotFoundException

from pywb.cdx.cdxobject import CDXObject
from pywb.cdx.cdxops import cdx_sort_closest, cdx_limit
from pywb.cdx.query import CDXQuery
from pywb.cdx.cdxops import process_cdx

import requests

@@ -21,6 +23,17 @@ class BaseIndexSource(object):
    def get_index(self, params):
        return self.index_template.format(params.get('coll'))

    def __call__(self, params):
        query = CDXQuery(**params)

        try:
            cdx_iter = self.load_index(query.params)
        except NotFoundException as nf:
            cdx_iter = iter([])

        cdx_iter = process_cdx(cdx_iter, query)
        return cdx_iter


#=============================================================================
class FileIndexSource(BaseIndexSource):

@@ -28,7 +41,7 @@ class FileIndexSource(BaseIndexSource):
        filename = self.get_index(params)

        with open(filename, 'rb') as fh:
            gen = iter_range(fh, params['start_key'], params['end_key'])
            gen = iter_range(fh, params['key'], params['end_key'])
            for line in gen:
                yield CDXObject(line)

@@ -43,21 +56,28 @@ class RemoteIndexSource(BaseIndexSource):
        url = self.get_index(params)
        url += '?url=' + params['url']
        r = requests.get(url)
        if r.status_code >= 400:
            raise NotFoundException(url)

        lines = r.content.strip().split(b'\n')
        for line in lines:
            cdx = CDXObject(line)
            cdx['load_url'] = self.replay_url.format(timestamp=cdx['timestamp'], url=cdx['url'])
            yield cdx

        def do_load(lines):
            for line in lines:
                cdx = CDXObject(line)
                cdx['load_url'] = self.replay_url.format(timestamp=cdx['timestamp'], url=cdx['url'])
                yield cdx

        return do_load(lines)


#=============================================================================
class LiveIndexSource(BaseIndexSource):
    def load_index(self, params):
        cdx = CDXObject()
        cdx['urlkey'] = params.get('start_key').decode('utf-8')
        cdx['urlkey'] = params.get('key').decode('utf-8')
        cdx['timestamp'] = timestamp_now()
        cdx['url'] = params['url']
        cdx['load_url'] = params['url']
        cdx['is_live'] = True

        def live():
            yield cdx

@@ -80,7 +100,7 @@ class RedisIndexSource(BaseIndexSource):
    def load_index(self, params):
        z_key = self.get_index(params)
        index_list = self.redis.zrangebylex(z_key,
                                            b'[' + params['start_key'],
                                            b'[' + params['key'],
                                            b'(' + params['end_key'])

        for line in index_list:

@@ -94,66 +114,84 @@ class MementoIndexSource(BaseIndexSource):
        self.timemap_url = timemap_url
        self.replay_url = replay_url

    def make_iter(self, links, def_name):
        original, link_iter = MementoUtils.links_to_json(links, def_name)

    def links_to_cdxobject(self, link_header, def_name, sort=False):
        results = MementoUtils.parse_links(link_header, def_name)

        for cdx in link_iter():
            cdx['load_url'] = self.replay_url.format(timestamp=cdx['timestamp'], url=original)

        #meta = MementoUtils.meta_field('timegate', results)
        #if meta:
        #    yield meta

        #meta = MementoUtils.meta_field('timemap', results)
        #if meta:
        #    yield meta

        #meta = MementoUtils.meta_field('original', results)
        #if meta:
        #    yield meta

        original = results['original']['url']
        key = canonicalize(original)

        mementos = results['mementos']
        if sort:
            mementos = sorted(mementos)

        for val in mementos:
            dt = val.get('datetime')
            if not dt:
                continue

            ts = http_date_to_timestamp(dt)
            cdx = CDXObject()
            cdx['urlkey'] = key
            cdx['timestamp'] = ts
            cdx['url'] = original
            cdx['mem_rel'] = val.get('rel', '')
            cdx['memento_url'] = val['url']

            load_url = self.replay_url.format(timestamp=cdx['timestamp'],
                                              url=original)

            cdx['load_url'] = load_url
            yield cdx

    def load_timegate(self, params, closest):
    def get_timegate_links(self, params, closest):
        url = self.timegate_url.format(coll=params.get('coll')) + params['url']
        accept_dt = timestamp_to_http_date(closest)
        res = requests.head(url, headers={'Accept-Datetime': accept_dt})
        return self.make_iter(res.headers.get('Link'), 'timegate')
        if res.status_code >= 400:
            raise NotFoundException(url)

    def load_timemap(self, params):
        return res.headers.get('Link')

    def get_timemap_links(self, params):
        url = self.timemap_url + params['url']
        r = requests.get(url)
        return self.make_iter(r.text, 'timemap')
        res = requests.get(url)
        if res.status_code >= 400:
            raise NotFoundException(url)

        return res.text

    def load_index(self, params):
        closest = params.get('closest')

        if not closest:
            return self.load_timemap(params)
            links = self.get_timemap_links(params)
            def_name = 'timemap'
        else:
            return self.load_timegate(params, closest)
            links = self.get_timegate_links(params, closest)
            def_name = 'timegate'

        #if not links:
        #    return iter([])

        return self.links_to_cdxobject(links, def_name)

    @staticmethod
    def from_timegate_url(timegate_url, type_='link'):
    def from_timegate_url(timegate_url, path='link'):
        return MementoIndexSource(timegate_url,
                                  timegate_url + 'timemap/' + type_ + '/',
                                  timegate_url + 'timemap/' + path + '/',
                                  timegate_url + '{timestamp}id_/{url}')


def query_index(source, params):
    url = params.get('url', '')

    if not params.get('matchType'):
        if url.startswith('*.'):
            params['url'] = url[2:]
            params['matchType'] = 'domain'
        elif url.endswith('*'):
            params['url'] = url[:-1]
            params['matchType'] = 'prefix'
        else:
            params['matchType'] = 'exact'

    start, end = calc_search_range(url=params['url'],
                                   match_type=params['matchType'])

    params['start_key'] = start.encode('utf-8')
    params['end_key'] = end.encode('utf-8')

    res = source.load_index(params)

    limit = int(params.get('limit', 10))

    closest = params.get('closest')
    if closest:
        res = cdx_sort_closest(closest, res, limit)
    elif limit:
        res = cdx_limit(res, limit)

    return res
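
For reference, the wildcard handling in query_index above (which is what lets the tests below pass bare URLs) turns a trailing '*' into a prefix query and a leading '*.' into a domain query. A quick illustration; the key values shown are approximate SURT ranges:

from pywb.utils.canonicalize import calc_search_range

params = dict(url='http://iana.org/domains/root/*')
# trailing '*'  -> matchType='prefix', url='http://iana.org/domains/root/'
# leading '*.'  -> matchType='domain'
# otherwise     -> matchType='exact'

start, end = calc_search_range(url='http://iana.org/domains/root/',
                               match_type='prefix')
# start is roughly 'org,iana)/domains/root/' and end is the next key
# after that prefix, so a binary search covers exactly that subtree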

liverec.py

@@ -157,6 +157,7 @@ class BaseRecorder(object):
    def finish_response(self, incomplete=False):
        pass


#=================================================================
class ReadFullyStream(object):
    def __init__(self, stream):


@@ -2,7 +2,8 @@ from liverec import BaseRecorder
from liverec import request as remote_request

from pywb.warc.recordloader import ArcWarcRecordLoader, ArchiveLoadFailed
from pywb.utils.timeutils import timestamp_to_datetime
from pywb.utils.timeutils import timestamp_to_datetime, datetime_to_http_date
from pywb.warc.resolvingloader import ResolvingLoader

from io import BytesIO
from bottle import response

@@ -25,22 +26,26 @@ def incr_reader(stream, header=None, size=8192):

#=============================================================================
class WARCPathPrefixLoader(object):
    def __init__(self, prefix):
    def __init__(self, prefix, cdx_loader):
        self.prefix = prefix
        self.record_loader = ArcWarcRecordLoader()

        def add_prefix(filename, cdx):
            return [self.prefix + filename]

        self.resolve_loader = ResolvingLoader([add_prefix], no_record_parse=True)
        self.cdx_loader = cdx_loader

    def __call__(self, cdx):
        filename = cdx.get('filename')
        offset = cdx.get('offset')
        length = cdx.get('length', -1)

        if not cdx.get('filename') or cdx.get('offset') is None:
            return None

        if filename is None or offset is None:
            raise Exception

        failed_files = []
        headers, payload = self.resolve_loader.load_headers_and_payload(cdx, failed_files, self.cdx_loader)

        record = self.record_loader.load(self.prefix + filename,
                                         offset,
                                         length,
                                         no_record_parse=True)

        if headers != payload:
            headers.stream.close()

        record = payload

        for n, v in record.rec_headers.headers:
            response.headers[n] = v

@@ -75,40 +80,50 @@ class LiveWebLoader(object):
    SKIP_HEADERS = (b'link',
                    b'memento-datetime',
                    b'content-location',
                    b'x-archive',
                    b'set-cookie')
                    b'x-archive')

    def __call__(self, cdx):
        load_url = cdx.get('load_url')
        if not load_url:
            raise Exception
            return None

        recorder = HeaderRecorder(self.SKIP_HEADERS)

        upstream_res = remote_request(load_url, recorder=recorder, stream=True,
                                      headers={'Accept-Encoding': 'identity'})

        req_headers = {}

        dt = timestamp_to_datetime(cdx['timestamp'])

        if not cdx.get('is_live'):
            req_headers['Accept-Datetime'] = datetime_to_http_date(dt)

        upstream_res = remote_request(load_url,
                                      recorder=recorder,
                                      stream=True,
                                      headers=req_headers)

        resp_headers = recorder.get_header()

        response.headers['Content-Type'] = 'application/http; msgtype=response'

        response.headers['WARC-Type'] = 'response'
        response.headers['WARC-Record-ID'] = self._make_warc_id()
        #response.headers['WARC-Type'] = 'response'
        #response.headers['WARC-Record-ID'] = self._make_warc_id()
        response.headers['WARC-Target-URI'] = cdx['url']
        response.headers['WARC-Date'] = self._make_date(cdx['timestamp'])
        response.headers['WARC-Date'] = self._make_date(dt)

        # Try to set content-length, if it is available and valid
        try:
            content_len = int(upstream_res.headers.get('content-length', 0))
            if content_len > 0:
                content_len += len(recorder.get_header())
                content_len += len(resp_headers)
                response.headers['Content-Length'] = content_len
        except:
            pass
            raise

        return incr_reader(upstream_res.raw, header=recorder.get_header())
        return incr_reader(upstream_res.raw, header=resp_headers)

    @staticmethod
    def _make_date(ts):
        return timestamp_to_datetime(ts).strftime('%Y-%m-%dT%H:%M:%SZ')
    def _make_date(dt):
        return dt.strftime('%Y-%m-%dT%H:%M:%SZ')

    @staticmethod
    def _make_warc_id(id_=None):
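
The Accept-Datetime handling above follows the Memento pattern: for a non-live capture, the loader asks the upstream archive for the capture nearest the CDX timestamp. A condensed sketch of just that header construction (the timestamp value is illustrative):

from pywb.utils.timeutils import timestamp_to_datetime, datetime_to_http_date

dt = timestamp_to_datetime('20140126093743')
req_headers = {'Accept-Datetime': datetime_to_http_date(dt)}
# -> 'Sun, 26 Jan 2014 09:37:43 GMT'; the same dt object also feeds the
#    WARC-Date response header via _make_date(dt)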

test_aggindexsource.py Normal file

@@ -0,0 +1,62 @@
from gevent import monkey; monkey.patch_all()
from aggindexsource import AggIndexSource
from indexsource import FileIndexSource, RemoteIndexSource, MementoIndexSource

import json

sources = {
    'local': FileIndexSource('sample.cdxj'),
    'ia': MementoIndexSource.from_timegate_url('http://web.archive.org/web/'),
    'ait': MementoIndexSource.from_timegate_url('http://wayback.archive-it.org/all/'),
    'bl': MementoIndexSource.from_timegate_url('http://www.webarchive.org.uk/wayback/archive/'),
    'rhiz': MementoIndexSource.from_timegate_url('http://webenact.rhizome.org/vvork/', path='*')
}

source = AggIndexSource(sources, timeout=5.0)


def select_json(cdxlist, fields=['timestamp', 'load_url', 'filename', 'source_name']):
    return list([json.loads(cdx.to_json(fields)) for cdx in cdxlist])


def test_agg_index_1():
    url = 'http://iana.org/'
    res = source(dict(url=url, closest='20140126000000', limit=5))

    exp = [{"timestamp": "20140126093743", "load_url": "http://web.archive.org/web/20140126093743id_/http://iana.org/", "source_name": "ia"},
           {"timestamp": "20140126200624", "filename": "iana.warc.gz", "source_name": "local"},
           {"timestamp": "20140123034755", "load_url": "http://web.archive.org/web/20140123034755id_/http://iana.org/", "source_name": "ia"},
           {"timestamp": "20140129175203", "load_url": "http://web.archive.org/web/20140129175203id_/http://iana.org/", "source_name": "ia"},
           {"timestamp": "20140107040552", "load_url": "http://wayback.archive-it.org/all/20140107040552id_/http://iana.org/", "source_name": "ait"}
          ]

    assert(select_json(res) == exp)


def test_agg_index_2():
    url = 'http://example.com/'
    res = source(dict(url=url, closest='20100512', limit=6))

    exp = [{"timestamp": "20100513010014", "load_url": "http://www.webarchive.org.uk/wayback/archive/20100513010014id_/http://example.com/", "source_name": "bl"},
           {"timestamp": "20100512204410", "load_url": "http://www.webarchive.org.uk/wayback/archive/20100512204410id_/http://example.com/", "source_name": "bl"},
           {"timestamp": "20100513052358", "load_url": "http://web.archive.org/web/20100513052358id_/http://example.com/", "source_name": "ia"},
           {"timestamp": "20100511201151", "load_url": "http://wayback.archive-it.org/all/20100511201151id_/http://example.com/", "source_name": "ait"},
           {"timestamp": "20100514231857", "load_url": "http://wayback.archive-it.org/all/20100514231857id_/http://example.com/", "source_name": "ait"},
           {"timestamp": "20100514231857", "load_url": "http://web.archive.org/web/20100514231857id_/http://example.com/", "source_name": "ia"}]

    assert(select_json(res) == exp)


def test_agg_index_3():
    url = 'http://vvork.com/'
    res = source(dict(url=url, closest='20141001', limit=5))

    exp = [{"timestamp": "20141006184357", "load_url": "http://webenact.rhizome.org/vvork/20141006184357id_/http://www.vvork.com/", "source_name": "rhiz"},
           {"timestamp": "20141018133107", "load_url": "http://web.archive.org/web/20141018133107id_/http://vvork.com/", "source_name": "ia"},
           {"timestamp": "20141020161243", "load_url": "http://web.archive.org/web/20141020161243id_/http://vvork.com/", "source_name": "ia"},
           {"timestamp": "20140806161228", "load_url": "http://web.archive.org/web/20140806161228id_/http://vvork.com/", "source_name": "ia"},
           {"timestamp": "20131004231540", "load_url": "http://wayback.archive-it.org/all/20131004231540id_/http://vvork.com/", "source_name": "ait"}]

    assert(select_json(res) == exp)


@@ -1,6 +1,5 @@
from indexloader import FileIndexSource, RemoteIndexSource, MementoIndexSource, RedisIndexSource
from indexloader import LiveIndexSource
from indexloader import query_index
from indexsource import FileIndexSource, RemoteIndexSource, MementoIndexSource, RedisIndexSource
from indexsource import LiveIndexSource

from pywb.utils.timeutils import timestamp_now

@@ -42,11 +41,10 @@ remote_sources = [

# Url Match -- Local Loaders
# ============================================================================
@pytest.mark.parametrize("source1", local_sources, ids=["file", "redis"])
def test_local_cdxj_loader(source1):
@pytest.mark.parametrize("source", local_sources, ids=["file", "redis"])
def test_local_cdxj_loader(source):
    url = 'http://www.iana.org/_css/2013.1/fonts/Inconsolata.otf'
    res = query_index(source1, dict(url=url,
                                    limit=3))
    res = source(dict(url=url, limit=3))

    expected = """\
org,iana)/_css/2013.1/fonts/inconsolata.otf 20140126200826 iana.warc.gz

@@ -58,12 +56,12 @@ org,iana)/_css/2013.1/fonts/inconsolata.otf 20140126200930 iana.warc.gz"""

# Closest -- Local Loaders
# ============================================================================
@pytest.mark.parametrize("source1", local_sources, ids=["file", "redis"])
def test_local_closest_loader(source1):
@pytest.mark.parametrize("source", local_sources, ids=["file", "redis"])
def test_local_closest_loader(source):
    url = 'http://www.iana.org/_css/2013.1/fonts/Inconsolata.otf'
    res = query_index(source1, dict(url=url,
                                    closest='20140126200930',
                                    limit=3))
    res = source(dict(url=url,
                      closest='20140126200930',
                      limit=3))

    expected = """\
org,iana)/_css/2013.1/fonts/inconsolata.otf 20140126200930 iana.warc.gz

@@ -75,9 +73,9 @@ org,iana)/_css/2013.1/fonts/inconsolata.otf 20140126200826 iana.warc.gz"""

# Prefix -- Local Loaders
# ============================================================================
@pytest.mark.parametrize("source1", local_sources, ids=["file", "redis"])
def test_file_prefix_loader(source1):
    res = query_index(source1, dict(url='http://iana.org/domains/root/*'))
@pytest.mark.parametrize("source", local_sources, ids=["file", "redis"])
def test_file_prefix_loader(source):
    res = source(dict(url='http://iana.org/domains/root/*'))

    expected = """\
org,iana)/domains/root/db 20140126200927 iana.warc.gz

@@ -89,10 +87,10 @@ org,iana)/domains/root/servers 20140126201227 iana.warc.gz"""

# Url Match -- Remote Loaders
# ============================================================================
@pytest.mark.parametrize("source2", remote_sources, ids=["remote_cdx", "memento"])
def test_remote_loader(source2):
@pytest.mark.parametrize("source", remote_sources, ids=["remote_cdx", "memento"])
def test_remote_loader(source):
    url = 'http://instagram.com/amaliaulman'
    res = query_index(source2, dict(url=url))
    res = source(dict(url=url))

    expected = """\
com,instagram)/amaliaulman 20141014150552 http://webenact.rhizome.org/all/20141014150552id_/http://instagram.com/amaliaulman

@@ -105,10 +103,10 @@ com,instagram)/amaliaulman 20141014171636 http://webenact.rhizome.org/all/201410

# Url Match -- Remote Loaders
# ============================================================================
@pytest.mark.parametrize("source2", remote_sources, ids=["remote_cdx", "memento"])
def test_remote_closest_loader(source2):
@pytest.mark.parametrize("source", remote_sources, ids=["remote_cdx", "memento"])
def test_remote_closest_loader(source):
    url = 'http://instagram.com/amaliaulman'
    res = query_index(source2, dict(url=url, closest='20141014162332', limit=1))
    res = source(dict(url=url, closest='20141014162332', limit=1))

    expected = """\
com,instagram)/amaliaulman 20141014162333 http://webenact.rhizome.org/all/20141014162333id_/http://instagram.com/amaliaulman"""

@@ -116,12 +114,24 @@ com,instagram)/amaliaulman 20141014162333 http://webenact.rhizome.org/all/201410
    assert(key_ts_res(res, 'load_url') == expected)


# Url Match -- Memento
# ============================================================================
@pytest.mark.parametrize("source", remote_sources, ids=["remote_cdx", "memento"])
def test_remote_closest_loader(source):
    url = 'http://instagram.com/amaliaulman'
    res = source(dict(url=url, closest='20141014162332', limit=1))

    expected = """\
com,instagram)/amaliaulman 20141014162333 http://webenact.rhizome.org/all/20141014162333id_/http://instagram.com/amaliaulman"""

    assert(key_ts_res(res, 'load_url') == expected)


# Live Index -- No Load!
# ============================================================================
def test_live():
    url = 'http://example.com/'
    source = LiveIndexSource()
    res = query_index(source, dict(url=url))
    res = source(dict(url=url))

    expected = 'com,example)/ {0} http://example.com/'.format(timestamp_now())

@@ -130,5 +140,26 @@ def test_live():

# Errors -- Not Found All
# ============================================================================
@pytest.mark.parametrize("source", local_sources + remote_sources, ids=["file", "redis", "remote_cdx", "memento"])
def test_all_not_found(source):
    url = 'http://x-not-found-x.notfound/'
    res = source(dict(url=url, limit=3))

    expected = ''
    assert(key_ts_res(res) == expected)


# ============================================================================
def test_another_remote_not_found():
    source = MementoIndexSource.from_timegate_url('http://www.webarchive.org.uk/wayback/archive/')
    url = 'http://x-not-found-x.notfound/'
    res = source(dict(url=url, limit=3))

    expected = ''
    assert(key_ts_res(res) == expected)


@@ -1,8 +1,4 @@
import re, json
from pywb.utils.canonicalize import canonicalize
from pywb.utils.timeutils import timestamp_to_sec, http_date_to_timestamp
from pywb.cdx.cdxobject import CDXObject

import re

LINK_SPLIT = re.compile(',\s*(?=[<])')
LINK_SEG_SPLIT = re.compile(';\s*')

@@ -54,69 +50,3 @@ class MementoUtils(object):
        results['mementos'] = mementos
        return results

    @staticmethod
    def links_to_json(link_header, def_name='timemap', sort=False):
        results = MementoUtils.parse_links(link_header, def_name)

        #meta = MementoUtils.meta_field('timegate', results)
        #if meta:
        #    yield meta

        #meta = MementoUtils.meta_field('timemap', results)
        #if meta:
        #    yield meta

        #meta = MementoUtils.meta_field('original', results)
        #if meta:
        #    yield meta

        original = results['original']['url']
        key = canonicalize(original)

        mementos = results['mementos']
        if sort:
            mementos = sorted(mementos)

        def link_iter():
            for val in mementos:
                dt = val.get('datetime')
                if not dt:
                    continue

                ts = http_date_to_timestamp(dt)
                line = CDXObject()
                line['urlkey'] = key
                line['timestamp'] = ts
                line['url'] = original
                line['mem_rel'] = val.get('rel', '')
                line['memento_url'] = val['url']
                yield line

        return original, link_iter

    @staticmethod
    def meta_field(name, results):
        v = results.get(name)
        if v:
            c = CDXObject()
            c['key'] = '@' + name
            c['url'] = v['url']
            return c


#=================================================================
def cdx_sort_closest(closest, cdx_json):
    closest_sec = timestamp_to_sec(closest)

    def get_key(cdx):
        sec = timestamp_to_sec(cdx['timestamp'])
        return abs(closest_sec - sec)

    cdx_sorted = sorted(cdx_json, key=get_key)
    return cdx_sorted