From f9c9443d2f3bfb4d0960d68b41873aeb6cc8fe60 Mon Sep 17 00:00:00 2001 From: Adam Miller Date: Wed, 11 Dec 2019 01:54:11 +0000 Subject: [PATCH 1/8] Beginning modifications to pass along a dummy RecordedUrl on connection timeout for logging --- warcprox/crawl_log.py | 9 ++++--- warcprox/mitmproxy.py | 12 ++++++--- warcprox/warcproxy.py | 57 ++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 71 insertions(+), 7 deletions(-) diff --git a/warcprox/crawl_log.py b/warcprox/crawl_log.py index 2f7ea5e..0ba075b 100644 --- a/warcprox/crawl_log.py +++ b/warcprox/crawl_log.py @@ -40,7 +40,7 @@ class CrawlLogger(object): def notify(self, recorded_url, records): # 2017-08-03T21:45:24.496Z 200 2189 https://autismcouncil.wisconsin.gov/robots.txt P https://autismcouncil.wisconsin.gov/ text/plain #001 20170803214523617+365 sha1:PBS2CEF7B4OSEXZZF3QE2XN2VHYCPNPX https://autismcouncil.wisconsin.gov/ duplicate:digest {"warcFileOffset":942,"contentSize":2495,"warcFilename":"ARCHIVEIT-2159-TEST-JOB319150-20170803214522386-00000.warc.gz"} now = datetime.datetime.utcnow() - extra_info = {'contentSize': recorded_url.size,} + extra_info = {'contentSize': recorded_url.size,} if recorded_url.size > 0 else {} if records: extra_info['warcFilename'] = records[0].warc_filename extra_info['warcFileOffset'] = records[0].offset @@ -51,10 +51,13 @@ class CrawlLogger(object): payload_digest = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) - else: + elif records is not None and len(records) > 0: # WARCPROX_WRITE_RECORD request content_length = int(records[0].get_header(b'Content-Length')) payload_digest = records[0].get_header(b'WARC-Payload-Digest') + else: + content_length = 0 + payload_digest = '-' fields = [ '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000), '% 5s' % recorded_url.status, @@ -67,7 +70,7 @@ class CrawlLogger(object): '{:%Y%m%d%H%M%S}{:03d}+{:03d}'.format( recorded_url.timestamp, recorded_url.timestamp.microsecond//1000, - recorded_url.duration.microseconds//1000), + recorded_url.duration.microseconds//1000) if (recorded_url.timestamp is not None and recorded_url.duration is not None) else '-', payload_digest, recorded_url.warcprox_meta.get('metadata', {}).get('seed', '-'), 'duplicate:digest' if records and records[0].type == b'revisit' else '-', diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 6b32a40..f4279de 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -359,7 +359,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.logger.error( "problem handling %r: %r", self.requestline, e) if type(e) is socket.timeout: - self.send_error(504, str(e)) + self.send_error(-2, str(e)) else: self.send_error(500, str(e)) except Exception as f: @@ -425,7 +425,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): response_code = 500 cache = False if isinstance(e, (socket.timeout, TimeoutError,)): - response_code = 504 + response_code = -2 cache = True elif isinstance(e, HTTPError): response_code = 502 @@ -459,6 +459,12 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): return def send_error(self, code, message=None, explain=None): + + if code == -2: + return_code = 504 + else: + return_code = code + # BaseHTTPRequestHandler.send_response_only() in http/server.py # does this: # if not hasattr(self, '_headers_buffer'): @@ -470,7 +476,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self._headers_buffer = [] try: return http_server.BaseHTTPRequestHandler.send_error( - self, code, message, explain) + self, return_code, message, explain) except Exception as e: level = logging.ERROR if isinstance(e, OSError) and e.errno == 9: diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 9d23244..0c780c3 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -343,13 +343,68 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): except: self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True) raise + def send_error(self, code, message=None, explain=None): + super().send_error(code, message, explain) + + # Build request + req_str = '{} {} {}\r\n'.format( + self.command, self.path, self.request_version) + + # Swallow headers that don't make sense to forward on, i.e. most + # hop-by-hop headers. http://tools.ietf.org/html/rfc2616#section-13.5. + # self.headers is an email.message.Message, which is case-insensitive + # and doesn't throw KeyError in __delitem__ + for key in ( + 'Connection', 'Proxy-Connection', 'Keep-Alive', + 'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade'): + del self.headers[key] + + self.headers['Via'] = via_header_value( + self.headers.get('Via'), + self.request_version.replace('HTTP/', '')) + + # Add headers to the request + # XXX in at least python3.3 str(self.headers) uses \n not \r\n :( + req_str += '\r\n'.join( + '{}: {}'.format(k,v) for (k,v) in self.headers.items()) + + warcprox_meta = None + raw_warcprox_meta = self.headers.get('Warcprox-Meta') + if raw_warcprox_meta: + warcprox_meta = json.loads(raw_warcprox_meta) + + req = req_str.encode('latin1') + b'\r\n\r\n' + recorded_url = RecordedUrl( + url=self.url, + remote_ip=b'', + warcprox_meta=warcprox_meta, + status=code, + client_ip=self.client_address[0], + method=self.command, + content_type="unknown", + response_recorder=None, + request_data=req, + duration=None ,size=0, + timestamp=None, host=self.hostname, + do_not_archive=True, + referer=self.headers.get('referer')) + + self.server.recorded_url_q.put(recorded_url) + def log_message(self, fmt, *args): # logging better handled elsewhere? pass RE_MIMETYPE = re.compile(r'[;\s]') - +def via_header_value(orig, request_version): + via = orig + if via: + via += ', ' + else: + via = '' + via = via + '%s %s' % (request_version, 'warcprox') + return via class RecordedUrl: logger = logging.getLogger("warcprox.warcproxy.RecordedUrl") From e88a88f247bc601edde940af1adc920c21b8240f Mon Sep 17 00:00:00 2001 From: Adam Miller Date: Fri, 3 Jan 2020 20:43:47 +0000 Subject: [PATCH 2/8] Refactor failed requests into new class. --- warcprox/__init__.py | 2 +- warcprox/crawl_log.py | 16 ++++-- warcprox/dedup.py | 2 + warcprox/mitmproxy.py | 16 ++---- warcprox/stats.py | 4 ++ warcprox/warcproxy.py | 103 ++++++++++++++++++++++----------------- warcprox/writerthread.py | 2 + 7 files changed, 84 insertions(+), 61 deletions(-) diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 9cd09a8..3dd84c8 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -223,7 +223,7 @@ class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor): self.name = listener.__class__.__name__ def _process_url(self, recorded_url): - return self.listener.notify(recorded_url, recorded_url.warc_records) + return self.listener.notify(recorded_url, recorded_url.warc_records if hasattr(recorded_url, "warc_records") else None) def start(self): if hasattr(self.listener, 'start'): diff --git a/warcprox/crawl_log.py b/warcprox/crawl_log.py index 0ba075b..b30dd30 100644 --- a/warcprox/crawl_log.py +++ b/warcprox/crawl_log.py @@ -25,6 +25,7 @@ import json import os import warcprox import socket +from urllib3.exceptions import TimeoutError, HTTPError class CrawlLogger(object): def __init__(self, dir_, options=warcprox.Options()): @@ -40,13 +41,14 @@ class CrawlLogger(object): def notify(self, recorded_url, records): # 2017-08-03T21:45:24.496Z 200 2189 https://autismcouncil.wisconsin.gov/robots.txt P https://autismcouncil.wisconsin.gov/ text/plain #001 20170803214523617+365 sha1:PBS2CEF7B4OSEXZZF3QE2XN2VHYCPNPX https://autismcouncil.wisconsin.gov/ duplicate:digest {"warcFileOffset":942,"contentSize":2495,"warcFilename":"ARCHIVEIT-2159-TEST-JOB319150-20170803214522386-00000.warc.gz"} now = datetime.datetime.utcnow() - extra_info = {'contentSize': recorded_url.size,} if recorded_url.size > 0 else {} + status = self.get_artificial_status(recorded_url) + extra_info = {'contentSize': recorded_url.size,} if hasattr(recorded_url, "size") and recorded_url.size > 0 else {} if records: extra_info['warcFilename'] = records[0].warc_filename extra_info['warcFileOffset'] = records[0].offset if recorded_url.method != 'GET': extra_info['method'] = recorded_url.method - if recorded_url.response_recorder: + if hasattr(recorded_url, "response_recorder") and recorded_url.response_recorder: content_length = recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset payload_digest = warcprox.digest_str( recorded_url.payload_digest, @@ -60,12 +62,12 @@ class CrawlLogger(object): payload_digest = '-' fields = [ '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000), - '% 5s' % recorded_url.status, + '% 5s' % status, '% 10s' % content_length, recorded_url.url, '-', # hop path recorded_url.referer or '-', - recorded_url.mimetype or '-', + recorded_url.mimetype if hasattr(recorded_url, "mimetype") and recorded_url.mimetype is not None else '-', '-', '{:%Y%m%d%H%M%S}{:03d}+{:03d}'.format( recorded_url.timestamp, @@ -92,3 +94,9 @@ class CrawlLogger(object): with open(crawl_log_path, 'ab') as f: f.write(line) + def get_artificial_status(self, recorded_url): + if hasattr(recorded_url, 'exception') and isinstance(recorded_url.exception, (socket.timeout, TimeoutError, )): + return '-2' + else: + return recorded_url.status + diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 0e09239..a07091e 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -65,6 +65,8 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin): self.dedup_db = dedup_db def _process_url(self, recorded_url): + if not hasattr(recorded_url, 'response_recorder'): + return if (recorded_url.response_recorder and recorded_url.payload_digest and self.should_dedup(recorded_url)): diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index f4279de..3bbefb9 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -359,7 +359,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.logger.error( "problem handling %r: %r", self.requestline, e) if type(e) is socket.timeout: - self.send_error(-2, str(e)) + self.send_error(504, str(e), exception=e) else: self.send_error(500, str(e)) except Exception as f: @@ -425,7 +425,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): response_code = 500 cache = False if isinstance(e, (socket.timeout, TimeoutError,)): - response_code = -2 + response_code = 504 cache = True elif isinstance(e, HTTPError): response_code = 502 @@ -440,7 +440,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.logger.error( "problem processing request %r: %r", self.requestline, e, exc_info=True) - self.send_error(response_code) + self.send_error(response_code, exception=e) return try: @@ -458,13 +458,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.send_error(502) return - def send_error(self, code, message=None, explain=None): - - if code == -2: - return_code = 504 - else: - return_code = code - + def send_error(self, code, message=None, explain=None, exception=None): # BaseHTTPRequestHandler.send_response_only() in http/server.py # does this: # if not hasattr(self, '_headers_buffer'): @@ -476,7 +470,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self._headers_buffer = [] try: return http_server.BaseHTTPRequestHandler.send_error( - self, return_code, message, explain) + self, code, message, explain) except Exception as e: level = logging.ERROR if isinstance(e, OSError) and e.errno == 9: diff --git a/warcprox/stats.py b/warcprox/stats.py index 1a71cad..2fe0848 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -162,6 +162,8 @@ class StatsProcessor(warcprox.BaseBatchPostfetchProcessor): def _tally_batch(self, batch): batch_buckets = {} for recorded_url in batch: + if not hasattr(recorded_url, 'response_recorder'): + continue for bucket in self.buckets(recorded_url): bucket_stats = batch_buckets.get(bucket) if not bucket_stats: @@ -297,6 +299,8 @@ class RunningStats: (self.first_snap_time - 120 + i * 10, 0, 0)) def notify(self, recorded_url, records): + if not hasattr(recorded_url, 'response_recorder'): + return with self._lock: self.urls += 1 if records: diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 0c780c3..2603759 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -343,8 +343,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): except: self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True) raise - def send_error(self, code, message=None, explain=None): - super().send_error(code, message, explain) + def send_error(self, code, message=None, explain=None, exception=None): + super().send_error(code, message=message, explain=explain, exception=exception) # Build request req_str = '{} {} {}\r\n'.format( @@ -359,10 +359,6 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): 'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade'): del self.headers[key] - self.headers['Via'] = via_header_value( - self.headers.get('Via'), - self.request_version.replace('HTTP/', '')) - # Add headers to the request # XXX in at least python3.3 str(self.headers) uses \n not \r\n :( req_str += '\r\n'.join( @@ -374,22 +370,21 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): warcprox_meta = json.loads(raw_warcprox_meta) req = req_str.encode('latin1') + b'\r\n\r\n' - recorded_url = RecordedUrl( + failed_url = FailedUrl( url=self.url, - remote_ip=b'', + request_data=req, warcprox_meta=warcprox_meta, status=code, client_ip=self.client_address[0], method=self.command, - content_type="unknown", - response_recorder=None, - request_data=req, - duration=None ,size=0, - timestamp=None, host=self.hostname, + timestamp=None, + host=self.hostname, + duration=None, + referer=self.headers.get('referer'), do_not_archive=True, - referer=self.headers.get('referer')) + exception=exception) - self.server.recorded_url_q.put(recorded_url) + self.server.recorded_url_q.put(failed_url) def log_message(self, fmt, *args): @@ -397,23 +392,12 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): pass RE_MIMETYPE = re.compile(r'[;\s]') -def via_header_value(orig, request_version): - via = orig - if via: - via += ', ' - else: - via = '' - via = via + '%s %s' % (request_version, 'warcprox') - return via -class RecordedUrl: - logger = logging.getLogger("warcprox.warcproxy.RecordedUrl") - def __init__(self, url, request_data, response_recorder, remote_ip, - warcprox_meta=None, content_type=None, custom_type=None, - status=None, size=None, client_ip=None, method=None, - timestamp=None, host=None, duration=None, referer=None, - payload_digest=None, truncated=None, warc_records=None, - do_not_archive=False): +class RequestedUrl: + logger = logging.getLogger("warcprox.warcproxy.RequestedUrl") + def __init__(self, url, request_data, warcprox_meta=None, status=None, + client_ip=None, method=None, timestamp=None, host=None, duration=None, + referer=None, do_not_archive=True): # XXX should test what happens with non-ascii url (when does # url-encoding happen?) if type(url) is not bytes: @@ -421,13 +405,7 @@ class RecordedUrl: else: self.url = url - if type(remote_ip) is not bytes: - self.remote_ip = remote_ip.encode('ascii') - else: - self.remote_ip = remote_ip - self.request_data = request_data - self.response_recorder = response_recorder if warcprox_meta: if 'captures-bucket' in warcprox_meta: @@ -444,6 +422,47 @@ class RecordedUrl: else: self.warcprox_meta = {} + self.status = status + self.client_ip = client_ip + self.method = method + self.timestamp = timestamp + self.host = host + self.duration = duration + self.referer = referer + self.do_not_archive = do_not_archive + +class FailedUrl(RequestedUrl): + logger = logging.getLogger("warcprox.warcproxy.FailedUrl") + + def __init__(self, url, request_data, warcprox_meta=None, status=None, + client_ip=None, method=None, timestamp=None, host=None, duration=None, + referer=None, do_not_archive=True, exception=None): + + super().__init__(url, request_data, warcprox_meta=warcprox_meta, status=status, + client_ip=client_ip, method=method, timestamp=timestamp, host=host, duration=duration, + referer=referer, do_not_archive=do_not_archive) + + self.exception = exception + +class RecordedUrl(RequestedUrl): + logger = logging.getLogger("warcprox.warcproxy.RecordedUrl") + + def __init__(self, url, request_data, response_recorder, remote_ip, + warcprox_meta=None, content_type=None, custom_type=None, + status=None, size=None, client_ip=None, method=None, + timestamp=None, host=None, duration=None, referer=None, + payload_digest=None, truncated=None, warc_records=None, + do_not_archive=False): + + super().__init__(url, request_data, warcprox_meta=warcprox_meta, status=status, + client_ip=client_ip, method=method, timestamp=timestamp, host=host, duration=duration, + referer=referer, do_not_archive=do_not_archive) + + if type(remote_ip) is not bytes: + self.remote_ip = remote_ip.encode('ascii') + else: + self.remote_ip = remote_ip + self.content_type = content_type self.mimetype = content_type @@ -452,18 +471,12 @@ class RecordedUrl: self.mimetype = RE_MIMETYPE.split(self.mimetype, 2)[0] self.custom_type = custom_type - self.status = status self.size = size - self.client_ip = client_ip - self.method = method - self.timestamp = timestamp - self.host = host - self.duration = duration - self.referer = referer + self.response_recorder = response_recorder + self.custom_type = custom_type self.payload_digest = payload_digest self.truncated = truncated self.warc_records = warc_records - self.do_not_archive = do_not_archive def is_text(self): """Ref: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Complete_list_of_MIME_types diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index b929a7f..968a90c 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -72,6 +72,8 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): self.close_prefix_reqs.put(prefix) def _process_url(self, recorded_url): + if not hasattr(recorded_url, 'response_recorder'): + return try: records = [] if self._should_archive(recorded_url): From 4ceebe1fa92166844483eab599f6bf4d91af3bdb Mon Sep 17 00:00:00 2001 From: Adam Miller Date: Sat, 4 Jan 2020 01:41:28 +0000 Subject: [PATCH 3/8] Moving more variables from RecordedUrl to RequiredUrl --- warcprox/__init__.py | 2 +- warcprox/crawl_log.py | 6 ++--- warcprox/dedup.py | 2 +- warcprox/stats.py | 4 ++-- warcprox/warcproxy.py | 50 ++++++++++++++++++++++------------------ warcprox/writerthread.py | 2 +- 6 files changed, 35 insertions(+), 31 deletions(-) diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 3dd84c8..9cd09a8 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -223,7 +223,7 @@ class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor): self.name = listener.__class__.__name__ def _process_url(self, recorded_url): - return self.listener.notify(recorded_url, recorded_url.warc_records if hasattr(recorded_url, "warc_records") else None) + return self.listener.notify(recorded_url, recorded_url.warc_records) def start(self): if hasattr(self.listener, 'start'): diff --git a/warcprox/crawl_log.py b/warcprox/crawl_log.py index b30dd30..eea17d5 100644 --- a/warcprox/crawl_log.py +++ b/warcprox/crawl_log.py @@ -42,13 +42,13 @@ class CrawlLogger(object): # 2017-08-03T21:45:24.496Z 200 2189 https://autismcouncil.wisconsin.gov/robots.txt P https://autismcouncil.wisconsin.gov/ text/plain #001 20170803214523617+365 sha1:PBS2CEF7B4OSEXZZF3QE2XN2VHYCPNPX https://autismcouncil.wisconsin.gov/ duplicate:digest {"warcFileOffset":942,"contentSize":2495,"warcFilename":"ARCHIVEIT-2159-TEST-JOB319150-20170803214522386-00000.warc.gz"} now = datetime.datetime.utcnow() status = self.get_artificial_status(recorded_url) - extra_info = {'contentSize': recorded_url.size,} if hasattr(recorded_url, "size") and recorded_url.size > 0 else {} + extra_info = {'contentSize': recorded_url.size,} if recorded_url.size is not None and recorded_url.size > 0 else {} if records: extra_info['warcFilename'] = records[0].warc_filename extra_info['warcFileOffset'] = records[0].offset if recorded_url.method != 'GET': extra_info['method'] = recorded_url.method - if hasattr(recorded_url, "response_recorder") and recorded_url.response_recorder: + if recorded_url.response_recorder: content_length = recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset payload_digest = warcprox.digest_str( recorded_url.payload_digest, @@ -67,7 +67,7 @@ class CrawlLogger(object): recorded_url.url, '-', # hop path recorded_url.referer or '-', - recorded_url.mimetype if hasattr(recorded_url, "mimetype") and recorded_url.mimetype is not None else '-', + recorded_url.mimetype if recorded_url.mimetype is not None else '-', '-', '{:%Y%m%d%H%M%S}{:03d}+{:03d}'.format( recorded_url.timestamp, diff --git a/warcprox/dedup.py b/warcprox/dedup.py index a07091e..223427e 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -65,7 +65,7 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin): self.dedup_db = dedup_db def _process_url(self, recorded_url): - if not hasattr(recorded_url, 'response_recorder'): + if isinstance(recorded_url, warcprox.warcproxy.FailedUrl): return if (recorded_url.response_recorder and recorded_url.payload_digest diff --git a/warcprox/stats.py b/warcprox/stats.py index 2fe0848..3bc560e 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -162,7 +162,7 @@ class StatsProcessor(warcprox.BaseBatchPostfetchProcessor): def _tally_batch(self, batch): batch_buckets = {} for recorded_url in batch: - if not hasattr(recorded_url, 'response_recorder'): + if isinstance(recorded_url, warcprox.warcproxy.FailedUrl): continue for bucket in self.buckets(recorded_url): bucket_stats = batch_buckets.get(bucket) @@ -299,7 +299,7 @@ class RunningStats: (self.first_snap_time - 120 + i * 10, 0, 0)) def notify(self, recorded_url, records): - if not hasattr(recorded_url, 'response_recorder'): + if isinstance(recorded_url, warcprox.warcproxy.FailedUrl): return with self._lock: self.urls += 1 diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 2603759..17d0682 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -395,9 +395,12 @@ RE_MIMETYPE = re.compile(r'[;\s]') class RequestedUrl: logger = logging.getLogger("warcprox.warcproxy.RequestedUrl") - def __init__(self, url, request_data, warcprox_meta=None, status=None, - client_ip=None, method=None, timestamp=None, host=None, duration=None, - referer=None, do_not_archive=True): + def __init__(self, url, request_data, response_recorder=None, remote_ip=None, + warcprox_meta=None, content_type=None, custom_type=None, + status=None, size=None, client_ip=None, method=None, + timestamp=None, host=None, duration=None, referer=None, + payload_digest=None, truncated=None, warc_records=None, + do_not_archive=False): # XXX should test what happens with non-ascii url (when does # url-encoding happen?) if type(url) is not bytes: @@ -406,6 +409,7 @@ class RequestedUrl: self.url = url self.request_data = request_data + self.response_recorder = response_recorder if warcprox_meta: if 'captures-bucket' in warcprox_meta: @@ -422,13 +426,25 @@ class RequestedUrl: else: self.warcprox_meta = {} + self.content_type = content_type + + self.mimetype = content_type + if self.mimetype: + # chop off subtype, and ensure there's no whitespace + self.mimetype = RE_MIMETYPE.split(self.mimetype, 2)[0] + + self.custom_type = custom_type self.status = status + self.size = size self.client_ip = client_ip self.method = method self.timestamp = timestamp self.host = host self.duration = duration self.referer = referer + self.payload_digest = payload_digest + self.truncated = truncated + self.warc_records = warc_records self.do_not_archive = do_not_archive class FailedUrl(RequestedUrl): @@ -438,9 +454,9 @@ class FailedUrl(RequestedUrl): client_ip=None, method=None, timestamp=None, host=None, duration=None, referer=None, do_not_archive=True, exception=None): - super().__init__(url, request_data, warcprox_meta=warcprox_meta, status=status, + super().__init__(url, request_data, response_recorder=None, warcprox_meta=warcprox_meta, content_type=None, custom_type=None, status=status, size=None, client_ip=client_ip, method=method, timestamp=timestamp, host=host, duration=duration, - referer=referer, do_not_archive=do_not_archive) + referer=referer, payload_digest=None, truncated=None, warc_records=None, do_not_archive=do_not_archive) self.exception = exception @@ -454,30 +470,18 @@ class RecordedUrl(RequestedUrl): payload_digest=None, truncated=None, warc_records=None, do_not_archive=False): - super().__init__(url, request_data, warcprox_meta=warcprox_meta, status=status, - client_ip=client_ip, method=method, timestamp=timestamp, host=host, duration=duration, - referer=referer, do_not_archive=do_not_archive) + super().__init__(url, request_data, response_recorder=response_recorder, + warcprox_meta=warcprox_meta, content_type=content_type, + custom_type=custom_type, status=status, size=size, client_ip=client_ip, + method=method, timestamp=timestamp, host=host, duration=duration, + referer=referer, payload_digest=payload_digest, truncated=truncated, + warc_records=warc_records, do_not_archive=do_not_archive) if type(remote_ip) is not bytes: self.remote_ip = remote_ip.encode('ascii') else: self.remote_ip = remote_ip - self.content_type = content_type - - self.mimetype = content_type - if self.mimetype: - # chop off subtype, and ensure there's no whitespace - self.mimetype = RE_MIMETYPE.split(self.mimetype, 2)[0] - - self.custom_type = custom_type - self.size = size - self.response_recorder = response_recorder - self.custom_type = custom_type - self.payload_digest = payload_digest - self.truncated = truncated - self.warc_records = warc_records - def is_text(self): """Ref: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Complete_list_of_MIME_types Alternative method: try to decode('ascii') first N bytes to make sure diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 968a90c..3cd6bc6 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -72,7 +72,7 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): self.close_prefix_reqs.put(prefix) def _process_url(self, recorded_url): - if not hasattr(recorded_url, 'response_recorder'): + if isinstance(recorded_url, warcprox.warcproxy.FailedUrl): return try: records = [] From a5e9c2722317fef56f2ba476203880d031e6d9a3 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 6 May 2020 09:54:17 -0700 Subject: [PATCH 4/8] Share code, handle exception during CONNECT --- warcprox/mitmproxy.py | 51 ++++++++++++++++++++++++------------------- warcprox/warcproxy.py | 51 +++++++++++++++++-------------------------- 2 files changed, 49 insertions(+), 53 deletions(-) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 3bbefb9..8b61d52 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -363,7 +363,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): else: self.send_error(500, str(e)) except Exception as f: - self.logger.warning("failed to send error response ({}) to proxy client: {}".format(e, f)) + self.logger.warning("failed to send error response ({}) to proxy client: {}".format(e, f), exc_info=True) return # Reload! @@ -489,6 +489,31 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.server.unregister_remote_server_sock( self._remote_server_conn.sock) + def _swallow_hop_by_hop_headers(self): + ''' + Swallow headers that don't make sense to forward on, i.e. + most hop-by-hop headers. + + http://tools.ietf.org/html/rfc2616#section-13.5. + ''' + # self.headers is an email.message.Message, which is case-insensitive + # and doesn't throw KeyError in __delitem__ + for key in ( + 'Warcprox-Meta', 'Connection', 'Proxy-Connection', 'Keep-Alive', + 'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade'): + del self.headers[key] + + def _build_request(self): + req_str = '{} {} {}\r\n'.format( + self.command, self.path, self.request_version) + + # Add headers to the request + # XXX in at least python3.3 str(self.headers) uses \n not \r\n :( + req_str += '\r\n'.join( + '{}: {}'.format(k,v) for (k,v) in self.headers.items()) + + req = req_str.encode('latin1') + b'\r\n\r\n' + def _inner_proxy_request(self, extra_response_headers={}): ''' Sends the request to the remote server, then uses a ProxyingRecorder to @@ -500,29 +525,11 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): It may contain extra HTTP headers such as ``Warcprox-Meta`` which are written in the WARC record for this request. ''' - # Build request - req_str = '{} {} {}\r\n'.format( - self.command, self.path, self.request_version) - - # Swallow headers that don't make sense to forward on, i.e. most - # hop-by-hop headers. http://tools.ietf.org/html/rfc2616#section-13.5. - # self.headers is an email.message.Message, which is case-insensitive - # and doesn't throw KeyError in __delitem__ - for key in ( - 'Connection', 'Proxy-Connection', 'Keep-Alive', - 'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade'): - del self.headers[key] - + self._swallow_hop_by_hop_headers() self.headers['Via'] = via_header_value( self.headers.get('Via'), self.request_version.replace('HTTP/', '')) - - # Add headers to the request - # XXX in at least python3.3 str(self.headers) uses \n not \r\n :( - req_str += '\r\n'.join( - '{}: {}'.format(k,v) for (k,v) in self.headers.items()) - - req = req_str.encode('latin1') + b'\r\n\r\n' + req = self._build_request() # Append message body if present to the request if 'Content-Length' in self.headers: @@ -548,7 +555,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): try: buf = prox_rec_res.read(65536) except http_client.IncompleteRead as e: - self.logger.warn('%s from %s', e, self.url) + self.logger.warning('%s from %s', e, self.url) buf = e.partial if (self._max_resource_size and diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 17d0682..c6f9f34 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -188,16 +188,21 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): self._enforce_limits_and_blocks() return warcprox.mitmproxy.MitmProxyHandler._connect_to_remote_server(self) - def _proxy_request(self): - warcprox_meta = None + def _parse_warcprox_meta(self): + ''' + :return: Warcprox-Meta request header value as a dictionary, or None + ''' raw_warcprox_meta = self.headers.get('Warcprox-Meta') self.logger.trace( - 'request for %s Warcprox-Meta header: %s', self.url, - raw_warcprox_meta) + 'request for %s Warcprox-Meta header: %s', self.url, + raw_warcprox_meta) if raw_warcprox_meta: - warcprox_meta = json.loads(raw_warcprox_meta) - del self.headers['Warcprox-Meta'] + return json.loads(raw_warcprox_meta) + else: + return None + def _proxy_request(self): + warcprox_meta = self._parse_warcprox_meta() remote_ip = self._remote_server_conn.sock.getpeername()[0] timestamp = doublethink.utcnow() extra_response_headers = {} @@ -343,36 +348,21 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): except: self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True) raise + def send_error(self, code, message=None, explain=None, exception=None): super().send_error(code, message=message, explain=explain, exception=exception) - # Build request - req_str = '{} {} {}\r\n'.format( - self.command, self.path, self.request_version) + # If error happens during CONNECT handling and before the inner request, self.url + # is unset, and self.path is something like 'example.com:443' + urlish = self.url or self.path - # Swallow headers that don't make sense to forward on, i.e. most - # hop-by-hop headers. http://tools.ietf.org/html/rfc2616#section-13.5. - # self.headers is an email.message.Message, which is case-insensitive - # and doesn't throw KeyError in __delitem__ - for key in ( - 'Connection', 'Proxy-Connection', 'Keep-Alive', - 'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade'): - del self.headers[key] + warcprox_meta = self._parse_warcprox_meta() + self._swallow_hop_by_hop_headers() + request_data = self._build_request() - # Add headers to the request - # XXX in at least python3.3 str(self.headers) uses \n not \r\n :( - req_str += '\r\n'.join( - '{}: {}'.format(k,v) for (k,v) in self.headers.items()) - - warcprox_meta = None - raw_warcprox_meta = self.headers.get('Warcprox-Meta') - if raw_warcprox_meta: - warcprox_meta = json.loads(raw_warcprox_meta) - - req = req_str.encode('latin1') + b'\r\n\r\n' failed_url = FailedUrl( - url=self.url, - request_data=req, + url=urlish, + request_data=request_data, warcprox_meta=warcprox_meta, status=code, client_ip=self.client_address[0], @@ -386,7 +376,6 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): self.server.recorded_url_q.put(failed_url) - def log_message(self, fmt, *args): # logging better handled elsewhere? pass From 36711c0148b87b05b7282d1108d0f471cfe8d5c0 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 6 May 2020 14:19:19 -0700 Subject: [PATCH 5/8] try to fix .travis.yml --- .travis.yml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1a351d3..f0069ac 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,3 @@ -sudo: required dist: xenial language: python python: @@ -65,5 +64,5 @@ after_script: notifications: slack: - secure: UJzNe+kEJ8QhNxrdqObroisJAO2ipr+Sr2+u1e2euQdIkacyX+nZ88jSk6uDKniAemSfFDI8Ty5a7++2wSbE//Hr3jOSNOJMZLzockafzvIYrq9bP7V97j1gQ4u7liWd19VBnbf0pULuwEfy/n5PdOBR/TiPrgMuYjfZseV+alo= - secure: S1SK52178uywcWLMO4S5POdjMv1MQjR061CKprjVn2d8x5RBbg8QZtumA6Xt+pByvJzh8vk+ITHCN57tcdi51yL6Z0QauXwxwzTsZmjrhxWOybAO2uOHliqQSDgxKcbXIqJKg7Yv19eLQYWDVJVGuwlMfVBS0hOHtTTpVuLuGuc= + - secure: UJzNe+kEJ8QhNxrdqObroisJAO2ipr+Sr2+u1e2euQdIkacyX+nZ88jSk6uDKniAemSfFDI8Ty5a7++2wSbE//Hr3jOSNOJMZLzockafzvIYrq9bP7V97j1gQ4u7liWd19VBnbf0pULuwEfy/n5PdOBR/TiPrgMuYjfZseV+alo= + - secure: S1SK52178uywcWLMO4S5POdjMv1MQjR061CKprjVn2d8x5RBbg8QZtumA6Xt+pByvJzh8vk+ITHCN57tcdi51yL6Z0QauXwxwzTsZmjrhxWOybAO2uOHliqQSDgxKcbXIqJKg7Yv19eLQYWDVJVGuwlMfVBS0hOHtTTpVuLuGuc= From d0b21f5dc44c1c2802326187aec7578715601ad9 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 6 May 2020 14:27:34 -0700 Subject: [PATCH 6/8] Undo accidentally committed code --- warcprox/mitmproxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 8b61d52..5f48ed7 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -363,7 +363,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): else: self.send_error(500, str(e)) except Exception as f: - self.logger.warning("failed to send error response ({}) to proxy client: {}".format(e, f), exc_info=True) + self.logger.warning("failed to send error response ({}) to proxy client: {}".format(e, f)) return # Reload! From 5e397e9bca104aa0fdd3ece4ddd4fda67430593e Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 6 May 2020 14:28:00 -0700 Subject: [PATCH 7/8] Elide unnecessary params --- warcprox/warcproxy.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index c6f9f34..a586cee 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -443,9 +443,10 @@ class FailedUrl(RequestedUrl): client_ip=None, method=None, timestamp=None, host=None, duration=None, referer=None, do_not_archive=True, exception=None): - super().__init__(url, request_data, response_recorder=None, warcprox_meta=warcprox_meta, content_type=None, custom_type=None, status=status, size=None, - client_ip=client_ip, method=method, timestamp=timestamp, host=host, duration=duration, - referer=referer, payload_digest=None, truncated=None, warc_records=None, do_not_archive=do_not_archive) + super().__init__(url, request_data, warcprox_meta=warcprox_meta, + status=status, client_ip=client_ip, method=method, + timestamp=timestamp, host=host, duration=duration, + referer=referer, do_not_archive=do_not_archive) self.exception = exception From b34419543f979d08686f7b0d598206a10b0c20be Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 6 May 2020 14:52:32 -0700 Subject: [PATCH 8/8] Oops! --- warcprox/mitmproxy.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 5f48ed7..36d20db 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -514,6 +514,8 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): req = req_str.encode('latin1') + b'\r\n\r\n' + return req + def _inner_proxy_request(self, extra_response_headers={}): ''' Sends the request to the remote server, then uses a ProxyingRecorder to