#import chardet
import pkgutil
import yaml
import re

from chardet.universaldetector import UniversalDetector
from io import BytesIO

from header_rewriter import RewrittenStatusAndHeaders

from rewriterules import RewriteRules

from pywb.utils.dsrules import RuleSet
from pywb.utils.statusandheaders import StatusAndHeaders
from pywb.utils.bufferedreaders import DecompressingBufferedReader
from pywb.utils.bufferedreaders import ChunkedDataReader, BufferedReader


#=================================================================
class RewriteContent:
    """
    Rewrite the headers and body of a response.

    Rewriter classes are looked up per-url from a RuleSet; the body is
    returned as a generator so it can be streamed.
    """

    # matches the opening <head ...> tag, used to position the head insert
    HEAD_REGEX = re.compile(r'<\s*head\b[^>]*[>]+', re.I)

    # matches a buffer whose first non-whitespace char opens a tag
    TAG_REGEX = re.compile(r'^\s*\<')

    # charset declared inside a <meta> tag; anchored on '<meta' so that a
    # 'charset=' occurring elsewhere in the buffer is not picked up
    CHARSET_REGEX = re.compile(
        r'<meta[^>]*?[\s;"\']charset\s*=[\s"\']*([^\s"\'/>]*)')

    # number of bytes read from the stream per generator step
    BUFF_SIZE = 16384

    def __init__(self, ds_rules_file=None, is_framed_replay=False):
        """
        :param ds_rules_file: passed through to RuleSet
        :param is_framed_replay: when equal to 'inverse', the default
            modifier handed to html rewriters is 'mp_', otherwise ''
        """
        self.ruleset = RuleSet(RewriteRules, 'rewrite',
                               default_rule_config={},
                               ds_rules_file=ds_rules_file)

        if is_framed_replay == 'inverse':
            self.defmod = 'mp_'
        else:
            self.defmod = ''

    def sanitize_content(self, status_headers, stream):
        """
        Remove the transfer-encoding header and, if one was removed,
        wrap the stream in a dechunking reader.

        :return: (status_headers, stream)
        """
        # remove transfer encoding chunked and wrap in a dechunking stream
        # NOTE(review): relies on remove_header() returning a truthy value
        # only when a header was actually removed -- confirm
        if (status_headers.remove_header('transfer-encoding')):
            stream = ChunkedDataReader(stream)

        return (status_headers, stream)

    def _rewrite_headers(self, urlrewriter, rule, status_headers, stream,
                         urlkey=''):
        """
        Run the rule's header rewriter over status_headers.

        :param urlrewriter: may be None, in which case no cookie rewriter
            is passed to the header rewriter
        :param urlkey: unused here, kept for interface compatibility
        :return: (rewritten_headers, stream), where stream is wrapped in a
            ChunkedDataReader if 'transfer-encoding: chunked' was removed
        """
        header_rewriter_class = rule.rewriters['header']

        cookie_rewriter = None

        if urlrewriter:
            cookie_rewriter = urlrewriter.get_cookie_rewriter(rule)

        rewritten_headers = (header_rewriter_class().
                             rewrite(status_headers,
                                     urlrewriter,
                                     cookie_rewriter))

        # note: since chunk encoding may/may not be valid,
        # the approach taken here is to *always* attempt
        # to dechunk if 'transfer-encoding: chunked' is present
        #
        # an alternative may be to serve chunked unless
        # content rewriting is needed
        # todo: possibly revisit this approach
        if (rewritten_headers.
             contains_removed_header('transfer-encoding', 'chunked')):

            stream = ChunkedDataReader(stream)

        return (rewritten_headers, stream)

    def _check_encoding(self, rewritten_headers, stream, enc):
        """
        If a 'content-encoding: <enc>' header was removed by the header
        rewriter, return a stream that decompresses with <enc>;
        otherwise return the stream unchanged.
        """
        if (rewritten_headers.
             contains_removed_header('content-encoding', enc)):

            #optimize: if already a ChunkedDataReader, add the encoding
            if isinstance(stream, ChunkedDataReader):
                stream.set_decomp(enc)
            else:
                stream = DecompressingBufferedReader(stream,
                                                     decomp_type=enc)

        return stream

    def rewrite_content(self, urlrewriter, headers, stream,
                        head_insert_func=None, urlkey='',
                        cdx=None):
        """
        Rewrite response headers and, for text content, the body.

        :param urlrewriter: url rewriter; its wburl decides the mode
            (identity, banner only, or full rewrite)
        :param headers: original status and headers
        :param stream: body stream
        :param head_insert_func: optional callable (rule, cdx) returning
            the text to insert after the <head> tag of html content
        :param urlkey: key used to select the matching rule
        :param cdx: passed through to head_insert_func
        :return: (status_headers, generator, is_rewritten) where
            is_rewritten is True only if a content rewriter was applied
        """
        wb_url = urlrewriter.wburl

        # passthrough: only dechunk, no header or content rewriting
        if (wb_url.is_identity or
             (not head_insert_func and wb_url.is_banner_only)):
            status_headers, stream = self.sanitize_content(headers, stream)
            return (status_headers, self.stream_to_gen(stream), False)

        if wb_url.is_banner_only:
            urlrewriter = None

        rule = self.ruleset.get_first_match(urlkey)

        (rewritten_headers, stream) = self._rewrite_headers(urlrewriter,
                                                            rule,
                                                            headers,
                                                            stream)

        status_headers = rewritten_headers.status_headers

        # use rewritten headers, but no further rewriting needed
        if rewritten_headers.text_type is None:
            return (status_headers, self.stream_to_gen(stream), False)

        # Handle text content rewriting
        # ====================================================================
        # special case -- need to ungzip the body

        text_type = rewritten_headers.text_type

        # if a known js/css modifier is specified, it may override the
        # default text_type
        mod = wb_url.mod

        first_buff = ''

        stream = self._check_encoding(rewritten_headers, stream, 'gzip')
        stream = self._check_encoding(rewritten_headers, stream, 'deflate')

        if mod == 'js_':
            text_type, stream = self._resolve_text_type('js',
                                                        text_type,
                                                        stream)
        elif mod == 'cs_':
            text_type, stream = self._resolve_text_type('css',
                                                        text_type,
                                                        stream)

        rewriter_class = rule.rewriters[text_type]

        # for html, need to perform header insert, supply js, css, xml
        # rewriters
        if text_type == 'html':
            head_insert_str = ''
            charset = rewritten_headers.charset

            # if no charset set, attempt to extract from first 1024
            if not rewritten_headers.charset:
                first_buff = stream.read(1024)
                charset = self._extract_html_charset(first_buff,
                                                     status_headers)

            if head_insert_func:
                if not charset:
                    charset = 'utf-8'
                head_insert_str = head_insert_func(rule, cdx)
                head_insert_str = head_insert_str.encode(charset)

            if wb_url.is_banner_only:
                gen = self._head_insert_only_gen(head_insert_str,
                                                 stream,
                                                 first_buff)

                # the insert lengthens the body, so adjust Content-Length
                # when the original value is a usable integer
                content_len = headers.get_header('Content-Length')
                try:
                    content_len = int(content_len)
                except (TypeError, ValueError):
                    # header missing (None) or not numeric
                    content_len = None

                if content_len and content_len >= 0:
                    content_len = str(content_len + len(head_insert_str))
                    status_headers.replace_header('Content-Length',
                                                  content_len)

                return (status_headers, gen, False)

            rewriter = rewriter_class(urlrewriter,
                                      js_rewriter_class=rule.rewriters['js'],
                                      css_rewriter_class=rule.rewriters['css'],
                                      head_insert=head_insert_str,
                                      defmod=self.defmod,
                                      parse_comments=rule.parse_comments)

        else:
            if wb_url.is_banner_only:
                return (status_headers, self.stream_to_gen(stream), False)

            # apply one of (js, css, xml) rewriters
            rewriter = rewriter_class(urlrewriter)

        # align to line end for all non-html rewriting
        align = (text_type != 'html')

        # Create rewriting generator
        gen = self.rewrite_text_stream_to_gen(stream,
                                              rewrite_func=rewriter.rewrite,
                                              final_read_func=rewriter.close,
                                              first_buff=first_buff,
                                              align_to_line=align)

        return (status_headers, gen, True)

    @staticmethod
    def _extract_html_charset(buff, status_headers):
        """
        Search buff for a charset declaration (CHARSET_REGEX).

        If found, the content-type header is replaced with
        'text/html; charset=<charset>'.

        :return: the charset found, or None
        """
        charset = None
        m = RewriteContent.CHARSET_REGEX.search(buff)
        if m:
            charset = m.group(1)
            content_type = 'text/html; charset=' + charset
            status_headers.replace_header('content-type', content_type)

        return charset

    @staticmethod
    def _resolve_text_type(mod, text_type, stream):
        """
        Reconcile the text type requested by the url modifier (mod) with
        the text type detected from the headers (text_type).

        :return: (resolved text type, stream); the stream is re-wrapped
            if some of it had to be read to make the decision
        """
        # css content requested as js stays css
        if text_type == 'css' and mod == 'js':
            return 'css', stream

        # only attempt to resolve between html and other text types
        if text_type != 'html':
            return mod, stream

        # peek at the start of the content, then put the bytes back
        buff = stream.read(128)

        wrapped_stream = BufferedReader(stream, starting_data=buff)

        # check if starts with a tag, then likely html
        if RewriteContent.TAG_REGEX.match(buff):
            mod = 'html'

        return mod, wrapped_stream

    def _head_insert_only_gen(self, insert_str, stream, first_buff=''):
        """
        Generator yielding the stream content with insert_str placed
        right after the opening <head> tag.

        Only the first 1024 bytes (including first_buff) are searched for
        the tag; if it is not found there, insert_str is yielded first.
        The rest of the stream follows unmodified.
        """
        buff = first_buff
        max_len = 1024 - len(first_buff)

        # read() may return less than requested, so keep reading until
        # the look-ahead budget is used up or the stream ends
        while max_len > 0:
            curr = stream.read(max_len)
            if not curr:
                break

            # charge only the bytes just read against the budget
            max_len -= len(curr)
            buff += curr

        matcher = self.HEAD_REGEX.search(buff)

        if matcher:
            yield buff[:matcher.end()]
            yield insert_str
            yield buff[matcher.end():]
        else:
            yield insert_str
            yield buff

        for buff in self.stream_to_gen(stream):
            yield buff

    @staticmethod
    def _decode_buff(buff, stream, encoding):  # pragma: no coverage
        """
        Decode buff with the given encoding.

        If decoding fails, up to 3 more single-byte reads from stream are
        appended, retrying after each one. The original UnicodeDecodeError
        is re-raised if all retries fail.
        """
        try:
            buff = buff.decode(encoding)
        except UnicodeDecodeError:
            # chunk may have cut apart unicode bytes -- add 1-3 bytes and retry
            for _ in range(3):
                buff += stream.read(1)
                try:
                    buff = buff.decode(encoding)
                    break
                except UnicodeDecodeError:
                    pass
            else:
                # no retry succeeded: propagate the original error
                raise

        return buff

    @staticmethod
    def stream_to_gen(stream):
        """
        Convert stream to an iterator, reading BUFF_SIZE bytes
        at a time.

        The final empty read is yielded as well, and the stream is
        closed when the generator finishes or is closed.
        """
        try:
            while True:
                buff = stream.read(RewriteContent.BUFF_SIZE)
                yield buff
                if not buff:
                    break

        finally:
            stream.close()

    @staticmethod
    def rewrite_text_stream_to_gen(stream, rewrite_func,
                                   final_read_func, first_buff,
                                   align_to_line):
        """
        Convert stream to a generator, applying rewrite_func to each
        portion of the stream. Align to line boundaries if needed.

        :param rewrite_func: called with each buffer, result is yielded
        :param final_read_func: called once the stream is exhausted; a
            non-empty result is yielded as the tail
        :param first_buff: data already read from the stream, rewritten
            and yielded before anything else
        :param align_to_line: if true, each buffer is extended up to the
            end of the current line before being rewritten
        """
        try:
            has_closed = hasattr(stream, 'closed')
            buff = first_buff

            while True:
                if buff:
                    buff = rewrite_func(buff)
                    yield buff

                buff = stream.read(RewriteContent.BUFF_SIZE)
                # on 2.6, readline() (but not read()) throws an exception
                # if stream already closed, so check stream.closed if present
                if (buff and align_to_line and
                    (not has_closed or not stream.closed)):
                    buff += stream.readline()

                if not buff:
                    break

            # For adding a tail/handling final buffer
            buff = final_read_func()
            if buff:
                yield buff

        finally:
            stream.close()