1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-15 00:03:28 +01:00

loaders: webhdfs loader: support optional '&user.name=<name>' param from WEBHDFS_USER env var or '&delegation=<token>' from WEBHDFS_TOKEN env var (fixes ukwa/ukwa-pywb#5)

This commit is contained in:
Ilya Kreymer 2018-02-07 23:22:47 -08:00 committed by John Berlin
parent ec88e962b3
commit 959481fd48
No known key found for this signature in database
GPG Key ID: 6EF5E4B442011B02
2 changed files with 44 additions and 19 deletions

View File

@ -404,23 +404,30 @@ class S3Loader(BaseLoader):
# =================================================================
class WebHDFSLoader(HttpLoader):
    """Loader for ``webhdfs://`` URLs, served through the WebHDFS HTTP API.

    The incoming URL is rewritten to an ``http://<host>/webhdfs/v1<path>``
    ``op=OPEN`` request and handed to ``HttpLoader.load``.

    Optional authentication is taken from the environment:

    * ``WEBHDFS_USER``  -> adds ``user.name=<value>``
    * ``WEBHDFS_TOKEN`` -> adds ``delegation=<value>``

    Unset or empty variables add nothing. If both are set, both
    parameters are sent.
    """

    # Base of the HTTP request; the query string is appended in load().
    HTTP_URL = 'http://{host}/webhdfs/v1{path}?'

    def load(self, url, offset, length):
        """Load ``url`` starting at ``offset`` for ``length`` bytes.

        :param url: URL whose netloc and path identify the WebHDFS host
            and file.
        :param offset: byte offset, sent as the ``offset`` query parameter.
        :param length: number of bytes to read; the ``length`` query
            parameter is only sent when this is greater than zero.
        :return: whatever ``HttpLoader.load`` returns for the built URL.
        """
        parts = urlsplit(url)

        http_url = self.HTTP_URL.format(host=parts.netloc,
                                        path=parts.path)

        params = {'op': 'OPEN',
                  'offset': str(offset)}

        if length > 0:
            params['length'] = str(length)

        # Read each variable once so the test and the stored value
        # cannot disagree.
        user = os.environ.get('WEBHDFS_USER')
        if user:
            params['user.name'] = user

        token = os.environ.get('WEBHDFS_TOKEN')
        if token:
            params['delegation'] = token

        http_url += urlencode(params)

        # The byte range is already expressed in the query string, so the
        # parent loader is called with offset 0 and length -1.
        return super(WebHDFSLoader, self).load(http_url, 0, -1)
# =================================================================

View File

@ -85,6 +85,8 @@ from pywb.utils.loaders import BlockLoader, HMACCookieMaker, to_file_url
from pywb.utils.loaders import extract_client_cookie
from pywb.utils.loaders import read_last_line
from pywb.utils.canonicalize import canonicalize
from mock import patch
from warcio.bufferedreaders import DecompressingBufferedReader
@ -119,20 +121,36 @@ def test_s3_read_2():
reader = DecompressingBufferedReader(BytesIO(buff))
assert reader.readline() == b'<!DOCTYPE html>\n'
def mock_load(expected):
    """Build a stand-in for ``HttpLoader.load`` that checks its arguments.

    :param expected: the HTTP URL the WebHDFS loader is expected to request.
    :return: a function with the ``load(self, url, offset, length)``
        signature that asserts on its arguments and returns ``None``.
    """
    def mock(self, url, offset, length):
        # Compare canonicalized forms rather than raw strings, so the
        # check does not hinge on an exact textual match of the query
        # string built from a dict.
        assert canonicalize(url) == canonicalize(expected)

        # The byte range travels in the query string, so the underlying
        # HTTP loader must be called with offset 0 and length -1.
        assert offset == 0
        assert length == -1
        return None

    return mock
def test_mock_webhdfs_load_1():
    """Offset and a positive length are both sent as query parameters."""
    expected = 'http://remote-host:1234/webhdfs/v1/some/file.warc.gz?op=OPEN&offset=10&length=50'

    # Blank the auth variables for the duration of the test so that an
    # ambient or leaked WEBHDFS_USER / WEBHDFS_TOKEN cannot add extra
    # parameters; patch.dict restores the environment on exit.
    no_auth = {'WEBHDFS_USER': '', 'WEBHDFS_TOKEN': ''}

    with patch.dict(os.environ, no_auth):
        with patch('pywb.utils.loaders.HttpLoader.load', mock_load(expected)):
            BlockLoader().load('webhdfs://remote-host:1234/some/file.warc.gz', 10, 50)
def test_mock_webhdfs_load_2():
    """A length of -1 sends no ``length`` query parameter."""
    expected = 'http://remote-host/webhdfs/v1/some/file.warc.gz?op=OPEN&offset=10'

    # Blank the auth variables for the duration of the test so that an
    # ambient or leaked WEBHDFS_USER / WEBHDFS_TOKEN cannot add extra
    # parameters; patch.dict restores the environment on exit.
    no_auth = {'WEBHDFS_USER': '', 'WEBHDFS_TOKEN': ''}

    with patch.dict(os.environ, no_auth):
        with patch('pywb.utils.loaders.HttpLoader.load', mock_load(expected)):
            BlockLoader().load('webhdfs://remote-host/some/file.warc.gz', 10, -1)
def test_mock_webhdfs_load_3_username():
    """With WEBHDFS_USER set, ``user.name=<user>`` is added to the request."""
    expected = 'http://remote-host/webhdfs/v1/some/file.warc.gz?op=OPEN&offset=10&user.name=someuser'

    # patch.dict restores os.environ on exit, so the user name does not
    # leak into other tests. The token is blanked so an ambient
    # WEBHDFS_TOKEN cannot add a 'delegation' parameter.
    env = {'WEBHDFS_USER': 'someuser', 'WEBHDFS_TOKEN': ''}

    with patch.dict(os.environ, env):
        with patch('pywb.utils.loaders.HttpLoader.load', mock_load(expected)):
            BlockLoader().load('webhdfs://remote-host/some/file.warc.gz', 10, -1)
def test_mock_webhdfs_load_4_token():
    """With only WEBHDFS_TOKEN set, ``delegation=<token>`` is added."""
    expected = 'http://remote-host/webhdfs/v1/some/file.warc.gz?op=OPEN&offset=10&delegation=ATOKEN'

    # An empty WEBHDFS_USER is treated as unset by the loader. patch.dict
    # restores os.environ on exit, so neither value leaks into other tests.
    env = {'WEBHDFS_USER': '', 'WEBHDFS_TOKEN': 'ATOKEN'}

    with patch.dict(os.environ, env):
        with patch('pywb.utils.loaders.HttpLoader.load', mock_load(expected)):
            BlockLoader().load('webhdfs://remote-host/some/file.warc.gz', 10, -1)