1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 00:03:28 +01:00

warcserver: aggregator + live url rewriter

- add support for conditional RulesAggregator
- add LiveRewriterIndexSource which can rewrite live web urls via regex
This commit is contained in:
Ilya Kreymer 2018-04-09 21:22:35 -07:00
parent b7bf693885
commit 0bab36b26e
7 changed files with 154 additions and 12 deletions

View File

@ -128,7 +128,7 @@ class ResourceHandler(IndexHandler):
return out_headers, resp, errs return out_headers, resp, errs
except (WbException, ArchiveLoadFailed) as e: except (WbException, ArchiveLoadFailed) as e:
last_exc = e last_exc = e
if logger.isEnabledFor(logging.DEBUG): if True or logger.isEnabledFor(logging.DEBUG):
traceback.print_exc() traceback.print_exc()
errs[str(loader)] = str(e) errs[str(loader)] = str(e)

View File

@ -4,6 +4,7 @@ import gevent
import json import json
import time import time
import os import os
import re
from warcio.timeutils import timestamp_now from warcio.timeutils import timestamp_now
@ -392,3 +393,33 @@ class BaseRedisMultiKeyIndexSource(BaseAggregator, RedisIndexSource):
class RedisMultiKeyIndexSource(SeqAggMixin, BaseRedisMultiKeyIndexSource): class RedisMultiKeyIndexSource(SeqAggMixin, BaseRedisMultiKeyIndexSource):
pass pass
#=============================================================================
class BaseRulesAggregator(BaseSourceListAggregator):
def __init__(self, sources, **kwargs):
super(BaseRulesAggregator, self).__init__(sources, **kwargs)
rules = kwargs.get('rules', [])
self.rules = []
for rule in rules:
match = rule['match']
name = rule['name']
self.rules.append((re.compile(match), name))
def get_all_sources(self, params):
url = params['url']
for rx, name in self.rules:
if rx.match(url):
source = self.sources.get(name)
if source:
return {name: source}
return []
#=============================================================================
class RulesAggregator(SeqAggMixin, BaseRulesAggregator):
pass

View File

@ -196,8 +196,7 @@ class RemoteIndexSource(BaseIndexSource):
#============================================================================= #=============================================================================
class LiveIndexSource(BaseIndexSource): class LiveIndexSource(BaseIndexSource):
def __init__(self, proxy_url='{url}'): def __init__(self):
self.proxy_url = proxy_url
self._init_sesh(DefaultAdapters.live_adapter) self._init_sesh(DefaultAdapters.live_adapter)
def load_index(self, params): def load_index(self, params):
@ -209,7 +208,7 @@ class LiveIndexSource(BaseIndexSource):
cdx['urlkey'] = params.get('key').decode('utf-8') cdx['urlkey'] = params.get('key').decode('utf-8')
cdx['timestamp'] = timestamp_now() cdx['timestamp'] = timestamp_now()
cdx['url'] = params['url'] cdx['url'] = params['url']
cdx['load_url'] = res_template(self.proxy_url, params) cdx['load_url'] = self.get_load_url(params)
cdx['is_live'] = 'true' cdx['is_live'] = 'true'
mime = params.get('content_type', '') mime = params.get('content_type', '')
@ -231,6 +230,9 @@ class LiveIndexSource(BaseIndexSource):
return iter([cdx]) return iter([cdx])
def get_load_url(self, params):
return params['url']
def __repr__(self): def __repr__(self):
return '{0}()'.format(self.__class__.__name__) return '{0}()'.format(self.__class__.__name__)
@ -259,6 +261,26 @@ class LiveIndexSource(BaseIndexSource):
return cls() return cls()
#=============================================================================
class LiveRewriteIndexSource(LiveIndexSource):
def __init__(self, match, replace):
super(LiveRewriteIndexSource, self).__init__()
self.rx = re.compile(match)
self.replace = replace
def get_load_url(self, params):
res = self.rx.sub(self.replace, params['url'])
print(res)
return res
@classmethod
def init_from_config(cls, config):
if config['type'] != 'live_rw':
return
return cls(config['match'], config['replace'])
#============================================================================= #=============================================================================
class RedisIndexSource(BaseIndexSource): class RedisIndexSource(BaseIndexSource):
def __init__(self, redis_url=None, redis=None, key_template=None, **kwargs): def __init__(self, redis_url=None, redis=None, key_template=None, **kwargs):
@ -579,3 +601,4 @@ class WBMementoIndexSource(MementoIndexSource):
@classmethod @classmethod
def _init_id(cls): def _init_id(cls):
return 'wb-memento' return 'wb-memento'

View File

@ -1,7 +1,7 @@
from pywb.warcserver.index.indexsource import FileIndexSource, RemoteIndexSource, MementoIndexSource, RedisIndexSource from pywb.warcserver.index.indexsource import FileIndexSource, RemoteIndexSource, MementoIndexSource, RedisIndexSource
from pywb.warcserver.index.indexsource import LiveIndexSource, WBMementoIndexSource from pywb.warcserver.index.indexsource import LiveIndexSource, WBMementoIndexSource, LiveRewriteIndexSource
from pywb.warcserver.index.aggregator import SimpleAggregator from pywb.warcserver.index.aggregator import SimpleAggregator, RulesAggregator
from warcio.timeutils import timestamp_now from warcio.timeutils import timestamp_now
@ -144,6 +144,51 @@ com,instagram)/amaliaulman 20141014162333 http://webenact.rhizome.org/all/201410
assert(key_ts_res(res, 'load_url') == expected) assert(key_ts_res(res, 'load_url') == expected)
assert(errs == {}) assert(errs == {})
def test_live_rewrite(self):
url = 'http://example.com/some/path?A=B'
source = LiveRewriteIndexSource(match='https?:\/\/example.com\/(.*)',
replace=r'https://other-host.example.org/\1')
res, errs = self.query_single_source(source, dict(url=url))
assert list(res)[0]['load_url'] == 'https://other-host.example.org/some/path?A=B'
assert(errs == {})
def test_cond_aggregator_rewrite(self):
match_url = 'https?:\/\/example.com\/(.*)'
live_rw = LiveRewriteIndexSource(match=match_url,
replace=r'https://other-host.example.org/\1')
rules = [dict(match=match_url, name='live_rw'),
dict(match='.*', name='live')
]
rules_agg = RulesAggregator(sources={'live': LiveIndexSource(),
'live_rw': live_rw},
rules=rules)
# Rewrite Matching url
res, errs = rules_agg({'url': 'http://example.com/some/path?A=B'})
cdx = list(res)[0]
# assert rewriting source used
assert cdx['load_url'] == 'https://other-host.example.org/some/path?A=B'
assert cdx['source'] == 'live_rw'
assert(errs == {})
# Don't rewrite other urls
res, errs = rules_agg({'url': 'http://example.net/some/path?A=B'})
cdx = list(res)[0]
# assert live source used
assert cdx['load_url'] == 'http://example.net/some/path?A=B'
assert cdx['source'] == 'live'
assert(errs == {})
# Errors -- Not Found All # Errors -- Not Found All
def test_all_not_found(self, all_source): def test_all_not_found(self, all_source):
url = 'http://x-not-found-x.notfound/' url = 'http://x-not-found-x.notfound/'

View File

@ -5,6 +5,7 @@ import os
from pywb.warcserver.index.indexsource import RemoteIndexSource, LiveIndexSource, MementoIndexSource from pywb.warcserver.index.indexsource import RemoteIndexSource, LiveIndexSource, MementoIndexSource
from pywb.warcserver.index.indexsource import WBMementoIndexSource, FileIndexSource from pywb.warcserver.index.indexsource import WBMementoIndexSource, FileIndexSource
from pywb.warcserver.index.aggregator import BaseSourceListAggregator, DirectoryIndexSource from pywb.warcserver.index.aggregator import BaseSourceListAggregator, DirectoryIndexSource
from pywb.warcserver.index.aggregator import RulesAggregator
from pywb.warcserver.handlers import ResourceHandler, HandlerSeq from pywb.warcserver.handlers import ResourceHandler, HandlerSeq
@ -50,7 +51,7 @@ class TestWarcServer(TempDirTests, BaseTestClass):
return handler.index_source.sources return handler.index_source.sources
def test_list_static(self): def test_list_static(self):
assert len(self.loader.list_fixed_routes()) == 13 assert len(self.loader.list_fixed_routes()) == 14
def test_list_dynamic(self): def test_list_dynamic(self):
assert set(self.loader.list_dynamic_routes()) == set(['auto1', 'auto2']) assert set(self.loader.list_dynamic_routes()) == set(['auto1', 'auto2'])
@ -125,3 +126,18 @@ class TestWarcServer(TempDirTests, BaseTestClass):
assert len(sources) == 1 assert len(sources) == 1
assert isinstance(sources['live'], LiveIndexSource) assert isinstance(sources['live'], LiveIndexSource)
def test_rules_agg(self):
handler = self.loader.fixed_routes.get('many_rules')
assert(handler)
assert isinstance(handler.index_source, RulesAggregator)
res = handler.index_source.get_all_sources({'url': 'http://example.com/path'})
assert len(res) == 1
assert list(res.keys()) == ['local']
res = handler.index_source.get_all_sources({'url': 'http://httpbin.org/'})
assert len(res) == 1
assert list(res.keys()) == ['liveweb']

View File

@ -1,3 +1,5 @@
debug: true
collections: collections:
# Live Index # Live Index
@ -45,6 +47,23 @@ collections:
timeout: 10 timeout: 10
# many sources, conditional rules
many_rules:
index_group:
local:
path: ./local/indexes
archive_paths: ./local/data
type: file
liveweb: live
rules:
- match: 'https?:\/\/example\.com\/.*'
name: local
- match: '.*'
name: liveweb
# Local Dir CDX # Local Dir CDX
local: local:
index: ./local/indexes index: ./local/indexes

View File

@ -3,13 +3,14 @@ from pywb.utils.loaders import load_yaml_config, load_overlay_config
from pywb.warcserver.basewarcserver import BaseWarcServer from pywb.warcserver.basewarcserver import BaseWarcServer
from pywb.warcserver.index.aggregator import CacheDirectoryIndexSource, RedisMultiKeyIndexSource from pywb.warcserver.index.aggregator import CacheDirectoryIndexSource, RedisMultiKeyIndexSource
from pywb.warcserver.index.aggregator import GeventTimeoutAggregator, SimpleAggregator from pywb.warcserver.index.aggregator import GeventTimeoutAggregator, SimpleAggregator, RulesAggregator
from pywb.warcserver.handlers import DefaultResourceHandler, HandlerSeq from pywb.warcserver.handlers import DefaultResourceHandler, HandlerSeq
from pywb.warcserver.index.indexsource import FileIndexSource, RemoteIndexSource from pywb.warcserver.index.indexsource import FileIndexSource, RemoteIndexSource
from pywb.warcserver.index.indexsource import MementoIndexSource, RedisIndexSource from pywb.warcserver.index.indexsource import MementoIndexSource, WBMementoIndexSource
from pywb.warcserver.index.indexsource import LiveIndexSource, WBMementoIndexSource from pywb.warcserver.index.indexsource import LiveIndexSource, LiveRewriteIndexSource
from pywb.warcserver.index.indexsource import RedisIndexSource
from pywb.warcserver.index.zipnum import ZipNumIndexSource from pywb.warcserver.index.zipnum import ZipNumIndexSource
from pywb import DEFAULT_CONFIG from pywb import DEFAULT_CONFIG
@ -27,6 +28,7 @@ SOURCE_LIST = [LiveIndexSource,
FileIndexSource, FileIndexSource,
RemoteIndexSource, RemoteIndexSource,
ZipNumIndexSource, ZipNumIndexSource,
LiveRewriteIndexSource,
] ]
@ -137,6 +139,7 @@ class WarcServer(BaseWarcServer):
handler = self.load_coll(name, coll_config) handler = self.load_coll(name, coll_config)
except: except:
print('Invalid Collection: ' + name) print('Invalid Collection: ' + name)
raise
if self.debug: if self.debug:
import traceback import traceback
traceback.print_exc() traceback.print_exc()
@ -178,7 +181,8 @@ class WarcServer(BaseWarcServer):
raise Exception('no index, index_group or sequence found') raise Exception('no index, index_group or sequence found')
timeout = int(coll_config.get('timeout', 0)) timeout = int(coll_config.get('timeout', 0))
agg = init_index_agg(index_group, True, timeout) rules = coll_config.get('rules')
agg = init_index_agg(index_group, True, timeout, rules=rules)
if not archive_paths: if not archive_paths:
archive_paths = self.config.get('archive_paths') archive_paths = self.config.get('archive_paths')
@ -232,11 +236,15 @@ def register_source(source_cls, end=False):
# ============================================================================ # ============================================================================
def init_index_agg(source_configs, use_gevent=False, timeout=0, source_list=None): def init_index_agg(source_configs, use_gevent=False, timeout=0, source_list=None,
rules=None):
sources = {} sources = {}
for n, v in iteritems(source_configs): for n, v in iteritems(source_configs):
sources[n] = init_index_source(v, source_list=source_list) sources[n] = init_index_source(v, source_list=source_list)
if rules:
return RulesAggregator(sources, rules=rules)
if use_gevent: if use_gevent:
return GeventTimeoutAggregator(sources, timeout=timeout) return GeventTimeoutAggregator(sources, timeout=timeout)
else: else: