
s3 and zipnum fixes: (#253)

* s3 and zipnum fixes:
- update s3 to use boto3
- ensure zipnum indexes (.idx, .summary) are picked up automatically via DirectoryAggregator
- ensure showNumPages query always returns a JSON object, ignoring output=
- add tests for auto-configured zipnum indexes

* reqs: add boto3 dependency, init boto Config only if avail

* s3 loader: first try with credentials, then with no-cred config
archive paths: don't add anything if path is fully qualified (contains '://')

* s3 loader: on first load, if credentialed load fails, try uncredentialed
fix typo
tests: add zipnum auto collection tests

* zipnum page count query: don't add 'source' field to the page count result (skipped when the 'url' key is not present in the cdx dict)

* s3 loader: fix no-range load, add test, update skip check to boto3

* fix spacing

* boto -> boto3: rename in error message, clean up comments
Ilya Kreymer 2017-10-11 15:33:57 -07:00 committed by GitHub
parent 22ff4bd976
commit 54b265aaa8
13 changed files with 163 additions and 69 deletions
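
For context, a minimal usage sketch of the loader behavior this commit targets (boto3 required; the object URL is the same public Common Crawl file used by the tests below):

    from pywb.utils.loaders import BlockLoader

    # BlockLoader dispatches s3:// URLs to S3Loader, which now uses boto3 and
    # falls back to unsigned (anonymous) access if a credentialed request fails.
    body = BlockLoader().load('s3://commoncrawl/crawl-data/CC-MAIN-2015-11/index.html')
    print(body.read(15))  # b'<!DOCTYPE html>'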

View File

@@ -5,12 +5,6 @@ MAINTAINER Ilya Kreymer <ikreymer at gmail.com>
 RUN mkdir /uwsgi
 COPY uwsgi.ini /uwsgi/
 
-#RUN pip install gevent==1.1.2 certauth youtube-dl boto uwsgi urllib3
-#RUN pip install git+https://github.com/t0m/pyamf.git@python3
-#RUN pip install webassets pyyaml brotlipy
-#RUN pip install six chardet 'requests<2.12' redis jinja2 'surt>=0.3.0' webencodings portalocker
-#RUN mkdir /pywb
-
 WORKDIR /pywb
 ADD requirements.txt .

View File

@@ -17,7 +17,7 @@ install:
   - "pip install coverage pytest-cov coveralls"
   - "pip install cffi"
   - "pip install pyopenssl"
-  - "pip install certauth boto youtube-dl"
+  - "pip install certauth boto3 youtube-dl"
 
 build_script:
   - "python setup.py install"

View File

@@ -1,6 +1,6 @@
 certauth
 youtube-dl
-boto
+boto3
 uwsgi
 git+https://github.com/t0m/pyamf.git@python3
 git+https://github.com/esnme/ultrajson.git

View File

@@ -61,7 +61,7 @@ class FrontEndApp(object):
         static_path = config.get('static_url_path', 'pywb/static/').replace('/', os.path.sep)
         self.static_handler = StaticHandler(static_path)
 
-        self.all_coll = config.get('all_coll', None)
+        self.cdx_api_endpoint = config.get('cdx_api_endpoint', '/cdx')
 
         self._init_routes()
@@ -90,9 +90,9 @@ class FrontEndApp(object):
         coll_prefix = '/<coll>'
 
         self.url_map.add(Rule('/', endpoint=self.serve_home))
+        self.url_map.add(Rule(coll_prefix + self.cdx_api_endpoint, endpoint=self.serve_cdx))
         self.url_map.add(Rule(coll_prefix + '/', endpoint=self.serve_coll_page))
         self.url_map.add(Rule(coll_prefix + '/timemap/<timemap_output>/<path:url>', endpoint=self.serve_content))
-        self.url_map.add(Rule(coll_prefix + '/cdx', endpoint=self.serve_cdx))
 
         if self.recorder_path:
             self.url_map.add(Rule(coll_prefix + self.RECORD_ROUTE + '/<path:url>', endpoint=self.serve_record))
@@ -197,7 +197,8 @@ class FrontEndApp(object):
         content = view.render_to_string(environ,
                                         wb_prefix=wb_prefix,
-                                        metadata=metadata)
+                                        metadata=metadata,
+                                        coll=coll)
 
         return WbResponse.text_response(content, content_type='text/html; charset="utf-8"')
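
Note: a sketch of querying the now-configurable CDX API route (host and collection name are hypothetical; the default endpoint remains '/cdx'):

    import requests

    # query the per-collection CDX API mounted at coll_prefix + cdx_api_endpoint
    r = requests.get('http://localhost:8080/my-coll/cdx',
                     params={'url': 'example.com/*', 'output': 'json'})
    print(r.text)  # one JSON object per line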

View File

@@ -22,7 +22,9 @@ from io import open, BytesIO
 from warcio.limitreader import LimitReader
 
 try:
-    from boto import connect_s3
+    import boto3
+    from botocore import UNSIGNED
+    from botocore.client import Config
     s3_avail = True
 except ImportError:  #pragma: no cover
     s3_avail = False
@@ -325,14 +327,14 @@ class HttpLoader(BaseLoader):
 #=================================================================
 class S3Loader(BaseLoader):
     def __init__(self, **kwargs):
-        self.s3conn = None
+        self.client = None
         self.aws_access_key_id = kwargs.get('aws_access_key_id')
         self.aws_secret_access_key = kwargs.get('aws_secret_access_key')
 
     def load(self, url, offset, length):
         if not s3_avail:  #pragma: no cover
             raise IOError('To load from s3 paths, ' +
-                          'you must install boto: pip install boto')
+                          'you must install boto3: pip install boto3')
 
         aws_access_key_id = self.aws_access_key_id
         aws_secret_access_key = self.aws_secret_access_key
@@ -346,24 +348,45 @@ class S3Loader(BaseLoader):
         else:
             bucket_name = parts.netloc
 
-        if not self.s3conn:
-            try:
-                self.s3conn = connect_s3(aws_access_key_id, aws_secret_access_key)
-            except Exception:  #pragma: no cover
-                self.s3conn = connect_s3(anon=True)
-
-        bucket = self.s3conn.get_bucket(bucket_name)
-
-        key = bucket.get_key(parts.path)
+        key = parts.path[1:]
 
         if offset == 0 and length == -1:
-            headers = {}
+            range_ = ''
         else:
-            headers = {'Range': BlockLoader._make_range_header(offset, length)}
-
-        # Read range
-        key.open_read(headers=headers)
-        return key
+            range_ = BlockLoader._make_range_header(offset, length)
+
+        def s3_load(anon=False):
+            if not self.client:
+                if anon:
+                    config = Config(signature_version=UNSIGNED)
+                else:
+                    config = None
+
+                client = boto3.client('s3',
+                                      aws_access_key_id=aws_access_key_id,
+                                      aws_secret_access_key=aws_secret_access_key,
+                                      config=config)
+            else:
+                client = self.client
+
+            res = client.get_object(Bucket=bucket_name,
+                                    Key=key,
+                                    Range=range_)
+
+            if not self.client:
+                self.client = client
+
+            return res
+
+        try:
+            obj = s3_load(anon=False)
+        except Exception:
+            if not self.client:
+                obj = s3_load(anon=True)
+            else:
+                raise
+
+        return obj['Body']
 
 #=================================================================
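
Note: the credentialed-then-anonymous pattern above, as a standalone sketch (boto3/botocore calls as imported in this diff; bucket, key, and byte range are illustrative):

    import boto3
    from botocore import UNSIGNED
    from botocore.client import Config

    def fetch_s3(bucket, key, range_header=''):
        # try default credentials first, then fall back to an unsigned client
        for config in (None, Config(signature_version=UNSIGNED)):
            client = boto3.client('s3', config=config)
            try:
                kwargs = {'Bucket': bucket, 'Key': key}
                if range_header:  # e.g. 'bytes=0-99', as _make_range_header builds
                    kwargs['Range'] = range_header
                return client.get_object(**kwargs)['Body']
            except Exception:
                if config is not None:  # anonymous attempt also failed
                    raise

    body = fetch_s3('commoncrawl', 'crawl-data/CC-MAIN-2015-11/index.html', 'bytes=0-99')
    print(len(body.read()))  # 100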

View File

@@ -93,7 +93,7 @@ test_cdx_dir = get_test_dir() + 'cdx/'
 def test_s3_read_1():
-    pytest.importorskip('boto')
+    pytest.importorskip('boto3')
 
     res = BlockLoader().load('s3://commoncrawl/crawl-data/CC-MAIN-2015-11/segments/1424936462700.28/warc/CC-MAIN-20150226074102-00159-ip-10-28-5-156.ec2.internal.warc.gz',
                              offset=53235662,
@@ -106,6 +106,19 @@ def test_s3_read_1():
     assert reader.readline() == b'WARC/1.0\r\n'
     assert reader.readline() == b'WARC-Type: response\r\n'
 
+def test_s3_read_2():
+    pytest.importorskip('boto3')
+
+    res = BlockLoader().load('s3://commoncrawl/crawl-data/CC-MAIN-2015-11/index.html')
+
+    buff = res.read()
+    assert len(buff) == 2082
+
+    reader = DecompressingBufferedReader(BytesIO(buff))
+    assert reader.readline() == b'<!DOCTYPE html>\n'
+
 # Error
 def test_err_no_such_file():
     # no such file

View File

@@ -16,7 +16,7 @@ def to_cdxj(cdx_iter, fields):
     return content_type, (cdx.to_cdxj(fields) for cdx in cdx_iter)
 
 def to_json(cdx_iter, fields):
-    content_type = 'application/x-ndjson'
+    content_type = 'text/x-ndjson'
     return content_type, (cdx.to_json(fields) for cdx in cdx_iter)
 
 def to_text(cdx_iter, fields):
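
Note: x-ndjson is newline-delimited JSON, one object per line; a quick sketch of the shape to_json produces (field values illustrative):

    import json

    records = [{'urlkey': 'org,iana)/', 'timestamp': '20140126200624'},
               {'urlkey': 'org,iana)/', 'timestamp': '20140126200654'}]
    print('\n'.join(json.dumps(r) for r in records))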

View File

@@ -17,6 +17,7 @@ from pywb.utils.format import ParamFormatter, res_template
 from pywb.warcserver.index.indexsource import FileIndexSource, RedisIndexSource
 from pywb.warcserver.index.cdxops import process_cdx
 from pywb.warcserver.index.query import CDXQuery
+from pywb.warcserver.index.zipnum import ZipNumIndexSource
 
 import six
 import glob
@@ -55,6 +56,9 @@ class BaseAggregator(object):
             err_list = [(name, repr(wbe))]
 
         def add_name(cdx, name):
+            if not cdx.get('url'):
+                return cdx
+
             if cdx.get('source'):
                 cdx['source'] = name + ':' + cdx['source']
             else:
@@ -245,10 +249,11 @@ class GeventTimeoutAggregator(TimeoutMixin, GeventMixin, BaseSourceListAggregator):
 #=============================================================================
 class BaseDirectoryIndexSource(BaseAggregator):
-    def __init__(self, base_prefix, base_dir='', name=''):
+    def __init__(self, base_prefix, base_dir='', name='', config=None):
         self.base_prefix = base_prefix
         self.base_dir = base_dir
         self.name = name
+        self.config = config
 
     def _iter_sources(self, params):
         the_dir = res_template(self.base_dir, params)
@@ -269,7 +274,10 @@ class BaseDirectoryIndexSource(BaseAggregator):
         for name in os.listdir(the_dir):
             filename = os.path.join(the_dir, name)
 
-            if filename.endswith(FileIndexSource.CDX_EXT):
+            is_cdx = filename.endswith(FileIndexSource.CDX_EXT)
+            is_zip = filename.endswith(ZipNumIndexSource.IDX_EXT)
+
+            if is_cdx or is_zip:
                 #print('Adding ' + filename)
                 rel_path = os.path.relpath(the_dir, self.base_prefix)
                 if rel_path == '.':
@@ -280,7 +288,12 @@ class BaseDirectoryIndexSource(BaseAggregator):
                 if self.name:
                     full_name = self.name + ':' + full_name
 
-                yield full_name, FileIndexSource(filename)
+                if is_cdx:
+                    index_src = FileIndexSource(filename)
+                else:
+                    index_src = ZipNumIndexSource(filename, self.config)
+
+                yield full_name, index_src
 
     def __repr__(self):
         return '{0}(file://{1})'.format(self.__class__.__name__,
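
Note: a small sketch of the extension-based dispatch above (the CDX_EXT value is an assumption; IDX_EXT is the tuple added to ZipNumIndexSource below):

    CDX_EXT = ('.cdx', '.cdxj')     # assumed value of FileIndexSource.CDX_EXT
    IDX_EXT = ('.idx', '.summary')  # ZipNumIndexSource.IDX_EXT, added in this commit

    for name in ('index.cdxj', 'zipnum-sample.idx', 'zipnum-sample.loc'):
        if name.endswith(CDX_EXT):
            print(name, '-> FileIndexSource')
        elif name.endswith(IDX_EXT):
            print(name, '-> ZipNumIndexSource')
        else:
            print(name, '-> ignored (.loc files are read by the zipnum source itself)')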

View File

@@ -7,8 +7,8 @@ from pywb.utils.canonicalize import calc_search_range
 class CDXQuery(object):
     def __init__(self, params):
         self.params = params
-        url = self.url
-        url = self.params.get('alt_url', url)
+        alt_url = self.params.get('alt_url')
+        url = alt_url or self.url
 
         if not self.params.get('matchType'):
             if url.startswith('*.'):
                 url = self.params['url'] = url[2:]
@@ -19,6 +19,9 @@ class CDXQuery(object):
             else:
                 self.params['matchType'] = 'exact'
 
+        if alt_url:
+            self.params['alt_url'] = url
+
         start, end = calc_search_range(url=url,
                                        match_type=self.params['matchType'],
                                        url_canon=self.params.get('_url_canon'))
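
Note: a sketch of the alt_url handling above; after construction, the normalized alt_url (wildcard prefix stripped, matchType inferred) is written back into params:

    from pywb.warcserver.index.query import CDXQuery

    params = {'url': 'http://example.com/', 'alt_url': '*.iana.org'}
    query = CDXQuery(params)
    print(query.params['matchType'])  # 'domain', inferred from the '*.' prefix
    print(query.params['alt_url'])    # 'iana.org', the normalized lookup url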

View File

@@ -175,7 +175,7 @@ def test_zip_prefix_load():
     cdx_iter, err = results
     results = list(cdx_iter)
     assert len(results) == 1, results
-    assert results[0] == {"blocks": 38, "pages": 4, "pageSize": 10, "source": "zip"}
+    assert results[0] == {"blocks": 38, "pages": 4, "pageSize": 10}
 
 # Test simple query

View File

@@ -1,8 +1,9 @@
+from io import BytesIO
 import os
 import collections
 import itertools
 import logging
-from io import BytesIO
 import datetime
 import json
 import six
@@ -20,8 +21,8 @@ from pywb.utils.loaders import BlockLoader, read_last_line
 from pywb.utils.binsearch import iter_range, linearsearch, search
 
-#=================================================================
-class ZipBlocks:
+# ============================================================================
+class ZipBlocks(object):
     def __init__(self, part, offset, length, count):
         self.part = part
         self.offset = offset
@@ -29,7 +30,19 @@ class ZipBlocks:
         self.count = count
 
-#=================================================================
+# ============================================================================
+class AlwaysJsonResponse(dict):
+    def to_json(self, *args):
+        return json.dumps(self)
+
+    def to_text(self, *args):
+        return json.dumps(self)
+
+    def to_cdxj(self, *args):
+        return json.dumps(self)
+
+
+# ============================================================================
 #TODO: see if these could be combined with warc path resolvers
 class LocMapResolver(object):
@@ -76,7 +89,7 @@ class LocMapResolver(object):
         return self.loc_map[part]
 
-#=================================================================
+# ============================================================================
 class LocPrefixResolver(object):
     """ Use a prefix lookup, where the prefix can either be a fixed
         string or can be a regex replacement of the index summary path
@@ -95,10 +108,11 @@ class LocPrefixResolver(object):
         return [self.prefix + part]
 
-#=================================================================
+# ============================================================================
 class ZipNumIndexSource(BaseIndexSource):
     DEFAULT_RELOAD_INTERVAL = 10  # in minutes
     DEFAULT_MAX_BLOCKS = 10
+    IDX_EXT = ('.idx', '.summary')
 
     def __init__(self, summary, config=None):
         self.max_blocks = self.DEFAULT_MAX_BLOCKS
@@ -118,7 +132,6 @@ class ZipNumIndexSource(BaseIndexSource):
                 reload_ival = config.get('reload_interval', reload_ival)
-
 
         if isinstance(loc, dict):
             self.loc_resolver = LocPrefixResolver(summary, loc)
         else:
@@ -132,23 +145,6 @@ class ZipNumIndexSource(BaseIndexSource):
         self.blk_loader = BlockLoader(cookie_maker=cookie_maker)
 
-#    @staticmethod
-#    def reload_timed(timestamp, val, delta, func):
-#        now = datetime.datetime.now()
-#        if now - timestamp >= delta:
-#            func()
-#            return now
-#        return None
-#
-#    def reload_loc(self):
-#        reload_time = self.reload_timed(self.loc_update_time,
-#                                        self.loc_map,
-#                                        self.reload_interval,
-#                                        self.load_loc)
-#
-#        if reload_time:
-#            self.loc_update_time = reload_time
-
     def load_index(self, params):
         self.loc_resolver.load_loc()
         return self._do_load_cdx(self.summary, CDXQuery(params))
@@ -177,12 +173,12 @@ class ZipNumIndexSource(BaseIndexSource):
         return gen_cdx()
 
     def _page_info(self, pages, pagesize, blocks):
-        info = dict(pages=pages,
+        info = AlwaysJsonResponse(
+                    pages=pages,
                     pageSize=pagesize,
                     blocks=blocks)
 
-        #return json.dumps(info) + '\n'
         return info
 
     def compute_page_range(self, reader, query):
@@ -338,7 +334,6 @@ class ZipNumIndexSource(BaseIndexSource):
            a line iterator which decompresses and returns one line at a time,
            bounded by query.key and query.end_key
         """
-
         if (logging.getLogger().getEffectiveLevel() <= logging.DEBUG):
             msg = 'Loading {b.count} blocks from {loc}:{b.offset}+{b.length}'
             logging.debug(msg.format(b=blocks, loc=location))
@@ -391,7 +386,7 @@ class ZipNumIndexSource(BaseIndexSource):
         if value.startswith('file://'):
             value = value[7:]
 
-        if is_zipnum or value.endswith(('.idx', '.summary')):
+        if is_zipnum or value.endswith(cls.IDX_EXT):
            return cls(value, None)
 
     @classmethod
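
Note: the effect of AlwaysJsonResponse, sketched standalone: every output formatter (json, text, cdxj) gets JSON for the page-count result, which is what makes showNumPages ignore output=:

    import json

    class AlwaysJsonResponse(dict):
        def to_json(self, *args):
            return json.dumps(self)
        to_text = to_cdxj = to_json  # same JSON for every output= value

    info = AlwaysJsonResponse(pages=4, pageSize=10, blocks=38)
    print(info.to_text())  # {"pages": 4, "pageSize": 10, "blocks": 38}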

View File

@@ -78,9 +78,10 @@ class WarcServer(BaseWarcServer):
         templ = self.config.get(name)
 
         def get_full_path(path):
-            path = os.path.join(self.AUTO_COLL_TEMPL, path, '')
-            if abs_path and '://' not in path:
-                path = os.path.join(abs_path, path)
+            if '://' not in path:
+                path = os.path.join(self.AUTO_COLL_TEMPL, path, '')
+                if abs_path:
+                    path = os.path.join(abs_path, path)
 
             return path
 
         if isinstance(templ, str):
@@ -94,7 +95,8 @@ class WarcServer(BaseWarcServer):
             return
 
         dir_source = CacheDirectoryIndexSource(base_prefix=self.root_dir,
-                                               base_dir=self.index_paths,
+                                               base_dir=self.index_paths,
+                                               config=self.config)
 
         return DefaultResourceHandler(dir_source, self.archive_paths)
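
Note: a standalone sketch of the new path logic: fully qualified locations pass through untouched (the AUTO_COLL_TEMPL value and root dir here are illustrative, not pywb's actual constants):

    import os

    AUTO_COLL_TEMPL = '{coll}'              # assumed template value
    abs_path = '/webarchive/collections'    # hypothetical collections root

    def get_full_path(path):
        if '://' not in path:  # skip fully qualified paths, e.g. s3:// or http://
            path = os.path.join(AUTO_COLL_TEMPL, path, '')
            if abs_path:
                path = os.path.join(abs_path, path)
        return path

    print(get_full_path('indexes'))            # /webarchive/collections/{coll}/indexes/
    print(get_full_path('s3://bucket/index/')) # s3://bucket/index/  (unchanged)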

View File

@@ -0,0 +1,50 @@
+from .base_config_test import BaseConfigTest, CollsDirMixin
+from pywb.manager.manager import main as manager
+from pywb.warcserver.index.cdxobject import CDXObject
+
+import shutil
+from pywb import get_test_dir
+import os
+import json
+
+
+# ============================================================================
+class TestZipnumAutoDir(CollsDirMixin, BaseConfigTest):
+    @classmethod
+    def setup_class(cls):
+        super(TestZipnumAutoDir, cls).setup_class('config_test.yaml')
+
+        manager(['init', 'testzip'])
+
+        cls.archive_dir = os.path.join(cls.root_dir, '_test_colls', 'testzip', 'archive')
+        cls.index_dir = os.path.join(cls.root_dir, '_test_colls', 'testzip', 'indexes')
+
+        zip_cdx = os.path.join(get_test_dir(), 'zipcdx')
+
+        shutil.copy(os.path.join(zip_cdx, 'zipnum-sample.idx'), cls.index_dir)
+        shutil.copy(os.path.join(zip_cdx, 'zipnum-sample.cdx.gz'), cls.index_dir)
+        shutil.copy(os.path.join(zip_cdx, 'zipnum-sample.loc'), cls.index_dir)
+
+        shutil.copy(os.path.join(get_test_dir(), 'warcs', 'iana.warc.gz'), cls.archive_dir)
+
+    def test_cdxj_query(self):
+        res = self.testapp.get('/testzip/cdx?url=iana.org/domains/*')
+        assert len(res.text.rstrip().split('\n')) == 9
+
+    def test_num_pages_query(self):
+        res = self.testapp.get('/testzip/cdx?url=http://iana.org/domains/&matchType=domain&showNumPages=true&pageSize=4')
+        res.content_type = 'text/json'
+        assert(res.json == {"blocks": 38, "pages": 10, "pageSize": 4})
+
+    def test_paged_index_query(self):
+        res = self.testapp.get('/testzip/cdx?url=http://iana.org/domains/&matchType=domain&output=json&showPagedIndex=true&pageSize=4&page=1')
+
+        lines = [json.loads(line) for line in res.text.rstrip().split('\n')]
+
+        assert lines[0] == {"urlkey": "org,iana)/_css/2013.1/fonts/opensans-bold.ttf 20140126200912", "part": "zipnum", "offset": 1150, "length": 235, "lineno": 5}
+        assert lines[1] == {"urlkey": "org,iana)/_css/2013.1/fonts/opensans-bold.ttf 20140126201240", "part": "zipnum", "offset": 1385, "length": 307, "lineno": 6}
+        assert lines[2] == {"urlkey": "org,iana)/_css/2013.1/fonts/opensans-regular.ttf 20140126200654", "part": "zipnum", "offset": 1692, "length": 235, "lineno": 7}
+        assert lines[3] == {"urlkey": "org,iana)/_css/2013.1/fonts/opensans-regular.ttf 20140126200816", "part": "zipnum", "offset": 1927, "length": 231, "lineno": 8}
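
Note: like the rest of the suite, these tests run under pytest; a minimal invocation sketch (selecting by class name, since the new file's path is not shown here):

    import pytest
    pytest.main(['-v', '-k', 'TestZipnumAutoDir'])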