1
0
mirror of https://github.com/webrecorder/pywb.git synced 2025-03-25 23:47:47 +01:00
pywb/pywb/utils/statusandheaders.py
Ilya Kreymer bd841b91a9 more python 3 support work -- pywb.cdx, pywb.warc tests succeed
most relative imports replaced with absolute
2016-02-18 21:26:40 -08:00

238 lines
7.8 KiB
Python

"""
Representation and parsing of HTTP-style status + headers
"""
import pprint
from copy import copy
from six.moves import range
import six
from pywb.utils.loaders import to_native_str
WRAP_WIDTH = 80
#=================================================================
class StatusAndHeaders(object):
"""
Representation of parsed http-style status line and headers
Status Line if first line of request/response
Headers is a list of (name, value) tuples
An optional protocol which appears on first line may be specified
"""
def __init__(self, statusline, headers, protocol='', total_len=0):
self.statusline = statusline
self.headers = headers
self.protocol = protocol
self.total_len = total_len
def get_header(self, name):
"""
return header (name, value)
if found
"""
name_lower = name.lower()
for value in self.headers:
if value[0].lower() == name_lower:
return value[1]
def replace_header(self, name, value):
"""
replace header with new value or add new header
return old header value, if any
"""
name_lower = name.lower()
for index in range(len(self.headers) - 1, -1, -1):
curr_name, curr_value = self.headers[index]
if curr_name.lower() == name_lower:
self.headers[index] = (curr_name, value)
return curr_value
self.headers.append((name, value))
return None
def replace_headers(self, header_dict):
"""
replace all headers in header_dict that already exist
add any remaining headers
"""
header_dict = copy(header_dict)
for index in range(len(self.headers) - 1, -1, -1):
curr_name, curr_value = self.headers[index]
name_lower = curr_name.lower()
if name_lower in header_dict:
self.headers[index] = (curr_name, header_dict[name_lower])
del header_dict[name_lower]
for name, value in header_dict.iteritems():
self.headers.append((name, value))
def remove_header(self, name):
"""
Remove header (case-insensitive)
return True if header removed, False otherwise
"""
name_lower = name.lower()
for index in range(len(self.headers) - 1, -1, -1):
if self.headers[index][0].lower() == name_lower:
del self.headers[index]
return True
return False
def get_statuscode(self):
"""
Return the statuscode part of the status response line
(Assumes no protocol in the statusline)
"""
code = self.statusline.split(' ', 1)[0]
return code
def validate_statusline(self, valid_statusline):
"""
Check that the statusline is valid, eg. starts with a numeric
code. If not, replace with passed in valid_statusline
"""
code = self.get_statuscode()
try:
code = int(code)
assert(code > 0)
return True
except(ValueError, AssertionError):
self.statusline = valid_statusline
return False
def add_range(self, start, part_len, total_len):
"""
Add range headers indicating that this a partial response
"""
content_range = 'bytes {0}-{1}/{2}'.format(start,
start + part_len - 1,
total_len)
self.statusline = '206 Partial Content'
self.replace_header('Content-Range', content_range)
self.replace_header('Accept-Ranges', 'bytes')
return self
def __repr__(self):
headers_str = pprint.pformat(self.headers, indent=2, width=WRAP_WIDTH)
return "StatusAndHeaders(protocol = '{0}', statusline = '{1}', \
headers = {2})".format(self.protocol, self.statusline, headers_str)
def __eq__(self, other):
return (self.statusline == other.statusline and
self.headers == other.headers and
self.protocol == other.protocol)
#=================================================================
def _strip_count(string, total_read):
length = len(string)
return string.rstrip(), total_read + length
#=================================================================
class StatusAndHeadersParser(object):
"""
Parser which consumes a stream support readline() to read
status and headers and return a StatusAndHeaders object
"""
def __init__(self, statuslist, verify=True):
self.statuslist = statuslist
self.verify = verify
def parse(self, stream, full_statusline=None):
"""
parse stream for status line and headers
return a StatusAndHeaders object
support continuation headers starting with space or tab
"""
def readline():
return to_native_str(stream.readline())
# status line w newlines intact
if full_statusline is None:
full_statusline = readline()
else:
full_statusline = to_native_str(full_statusline)
statusline, total_read = _strip_count(full_statusline, 0)
headers = []
# at end of stream
if total_read == 0:
raise EOFError()
elif not statusline:
return StatusAndHeaders(statusline=statusline,
headers=headers,
protocol='',
total_len=total_read)
# validate only if verify is set
if self.verify:
protocol_status = self.split_prefix(statusline, self.statuslist)
if not protocol_status:
msg = 'Expected Status Line starting with {0} - Found: {1}'
msg = msg.format(self.statuslist, statusline)
raise StatusAndHeadersParserException(msg, full_statusline)
else:
protocol_status = statusline.split(' ', 1)
line, total_read = _strip_count(readline(), total_read)
while line:
result = line.split(':', 1)
if len(result) == 2:
name = result[0].rstrip(' \t')
value = result[1].lstrip()
else:
name = result[0]
value = None
next_line, total_read = _strip_count(readline(),
total_read)
# append continuation lines, if any
while next_line and next_line.startswith((' ', '\t')):
if value is not None:
value += next_line
next_line, total_read = _strip_count(readline(),
total_read)
if value is not None:
header = (name, value)
headers.append(header)
line = next_line
return StatusAndHeaders(statusline=protocol_status[-1].strip(),
headers=headers,
protocol=protocol_status[0],
total_len=total_read)
@staticmethod
def split_prefix(key, prefixs):
"""
split key string into prefix and remainder
for first matching prefix from a list
"""
key_upper = key.upper()
for prefix in prefixs:
if key_upper.startswith(prefix):
plen = len(prefix)
return (key_upper[:plen], key[plen:])
#=================================================================
class StatusAndHeadersParserException(Exception):
"""
status + headers parsing exception
"""
def __init__(self, msg, statusline):
super(StatusAndHeadersParserException, self).__init__(msg)
self.statusline = statusline