diff --git a/warcprox/__init__.py b/warcprox/__init__.py
index 9cd09a8..3dd84c8 100644
--- a/warcprox/__init__.py
+++ b/warcprox/__init__.py
@@ -223,7 +223,7 @@ class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor):
         self.name = listener.__class__.__name__
 
     def _process_url(self, recorded_url):
-        return self.listener.notify(recorded_url, recorded_url.warc_records)
+        return self.listener.notify(recorded_url, getattr(recorded_url, 'warc_records', None))
 
     def start(self):
         if hasattr(self.listener, 'start'):
diff --git a/warcprox/crawl_log.py b/warcprox/crawl_log.py
index 0ba075b..b30dd30 100644
--- a/warcprox/crawl_log.py
+++ b/warcprox/crawl_log.py
@@ -25,6 +25,7 @@ import json
 import os
 import warcprox
 import socket
+from urllib3.exceptions import TimeoutError
 
 class CrawlLogger(object):
     def __init__(self, dir_, options=warcprox.Options()):
@@ -40,13 +41,14 @@ class CrawlLogger(object):
     def notify(self, recorded_url, records):
         # 2017-08-03T21:45:24.496Z 200 2189 https://autismcouncil.wisconsin.gov/robots.txt P https://autismcouncil.wisconsin.gov/ text/plain #001 20170803214523617+365 sha1:PBS2CEF7B4OSEXZZF3QE2XN2VHYCPNPX https://autismcouncil.wisconsin.gov/ duplicate:digest {"warcFileOffset":942,"contentSize":2495,"warcFilename":"ARCHIVEIT-2159-TEST-JOB319150-20170803214522386-00000.warc.gz"}
         now = datetime.datetime.utcnow()
-        extra_info = {'contentSize': recorded_url.size,} if recorded_url.size > 0 else {}
+        status = self.get_artificial_status(recorded_url)
+        extra_info = {'contentSize': recorded_url.size} if (getattr(recorded_url, 'size', None) or 0) > 0 else {}
         if records:
             extra_info['warcFilename'] = records[0].warc_filename
             extra_info['warcFileOffset'] = records[0].offset
         if recorded_url.method != 'GET':
             extra_info['method'] = recorded_url.method
-        if recorded_url.response_recorder:
+        if getattr(recorded_url, 'response_recorder', None):
             content_length = recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset
             payload_digest = warcprox.digest_str(
                     recorded_url.payload_digest,
@@ -60,12 +62,12 @@ class CrawlLogger(object):
             payload_digest = '-'
         fields = [
             '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000),
-            '% 5s' % recorded_url.status,
+            '% 5s' % status,
             '% 10s' % content_length,
             recorded_url.url,
             '-', # hop path
             recorded_url.referer or '-',
-            recorded_url.mimetype or '-',
+            getattr(recorded_url, 'mimetype', None) or '-',
             '-',
             '{:%Y%m%d%H%M%S}{:03d}+{:03d}'.format(
                 recorded_url.timestamp,
@@ -92,3 +94,10 @@ class CrawlLogger(object):
         with open(crawl_log_path, 'ab') as f:
             f.write(line)
 
+    def get_artificial_status(self, recorded_url):
+        """Return '-2' if the fetch failed with a timeout, else the recorded status."""
+        exception = getattr(recorded_url, 'exception', None)
+        if isinstance(exception, (socket.timeout, TimeoutError)):
+            return '-2'
+        return recorded_url.status
+
diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 0e09239..a07091e 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -65,6 +65,8 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
         self.dedup_db = dedup_db
 
     def _process_url(self, recorded_url):
+        if not hasattr(recorded_url, 'response_recorder'):
+            return
         if (recorded_url.response_recorder
                 and recorded_url.payload_digest
                 and self.should_dedup(recorded_url)):
diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py
index f4279de..3bbefb9 100644
--- a/warcprox/mitmproxy.py
+++ b/warcprox/mitmproxy.py
@@ -359,7 +359,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
                 self.logger.error(
                         "problem handling %r: %r", self.requestline, e)
                 if type(e) is socket.timeout:
-                    self.send_error(-2, str(e))
+                    self.send_error(504, str(e), exception=e)
                 else:
-                    self.send_error(500, str(e))
+                    self.send_error(500, str(e), exception=e)
             except Exception as f:
@@ -425,7 +425,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             response_code = 500
             cache = False
             if isinstance(e, (socket.timeout, TimeoutError,)):
-                response_code = -2
+                response_code = 504
                 cache = True
             elif isinstance(e, HTTPError):
                 response_code = 502
@@ -440,7 +440,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             self.logger.error(
                     "problem processing request %r: %r",
                     self.requestline, e, exc_info=True)
-            self.send_error(response_code)
+            self.send_error(response_code, exception=e)
             return
 
         try:
@@ -458,13 +458,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
                 self.send_error(502)
             return
 
-    def send_error(self, code, message=None, explain=None):
-
-        if code == -2:
-            return_code = 504
-        else:
-            return_code = code
-
+    def send_error(self, code, message=None, explain=None, exception=None):
         # BaseHTTPRequestHandler.send_response_only() in http/server.py
         # does this:
         #     if not hasattr(self, '_headers_buffer'):
@@ -476,7 +470,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             self._headers_buffer = []
         try:
             return http_server.BaseHTTPRequestHandler.send_error(
-                    self, return_code, message, explain)
+                    self, code, message, explain)
         except Exception as e:
             level = logging.ERROR
             if isinstance(e, OSError) and e.errno == 9:
diff --git a/warcprox/stats.py b/warcprox/stats.py
index 1a71cad..2fe0848 100644
--- a/warcprox/stats.py
+++ b/warcprox/stats.py
@@ -162,6 +162,8 @@ class StatsProcessor(warcprox.BaseBatchPostfetchProcessor):
     def _tally_batch(self, batch):
         batch_buckets = {}
         for recorded_url in batch:
+            if not hasattr(recorded_url, 'response_recorder'):
+                continue
             for bucket in self.buckets(recorded_url):
                 bucket_stats = batch_buckets.get(bucket)
                 if not bucket_stats:
@@ -297,6 +299,8 @@ class RunningStats:
                     (self.first_snap_time - 120 + i * 10, 0, 0))
 
     def notify(self, recorded_url, records):
+        if not hasattr(recorded_url, 'response_recorder'):
+            return
         with self._lock:
             self.urls += 1
             if records:
diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py
index 0c780c3..2603759 100644
--- a/warcprox/warcproxy.py
+++ b/warcprox/warcproxy.py
@@ -343,8 +343,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
         except:
             self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True)
             raise
-    def send_error(self, code, message=None, explain=None):
-        super().send_error(code, message, explain)
+    def send_error(self, code, message=None, explain=None, exception=None):
+        super().send_error(code, message=message, explain=explain, exception=exception)
 
         # Build request
         req_str = '{} {} {}\r\n'.format(
@@ -359,10 +359,6 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
                 'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade'):
             del self.headers[key]
 
-        self.headers['Via'] = via_header_value(
-                self.headers.get('Via'),
-                self.request_version.replace('HTTP/', ''))
-
         # Add headers to the request
         # XXX in at least python3.3 str(self.headers) uses \n not \r\n :(
         req_str += '\r\n'.join(
@@ -374,22 +370,21 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
             warcprox_meta = json.loads(raw_warcprox_meta)
 
         req = req_str.encode('latin1') + b'\r\n\r\n'
-        recorded_url = RecordedUrl(
+        failed_url = FailedUrl(
                 url=self.url,
-                remote_ip=b'',
+                request_data=req,
                 warcprox_meta=warcprox_meta,
                 status=code,
                 client_ip=self.client_address[0],
                 method=self.command,
-                content_type="unknown",
-                response_recorder=None,
-                request_data=req,
-                duration=None ,size=0,
-                timestamp=None, host=self.hostname,
+                timestamp=None,
+                host=self.hostname,
+                duration=None,
+                referer=self.headers.get('referer'),
                 do_not_archive=True,
-                referer=self.headers.get('referer'))
+                exception=exception)
 
-        self.server.recorded_url_q.put(recorded_url)
+        self.server.recorded_url_q.put(failed_url)
 
 
     def log_message(self, fmt, *args):
@@ -397,23 +392,13 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
         pass
 
 RE_MIMETYPE = re.compile(r'[;\s]')
-def via_header_value(orig, request_version):
-    via = orig
-    if via:
-        via += ', '
-    else:
-        via = ''
-    via = via + '%s %s' % (request_version, 'warcprox')
-    return via
 
-class RecordedUrl:
-    logger = logging.getLogger("warcprox.warcproxy.RecordedUrl")
-    def __init__(self, url, request_data, response_recorder, remote_ip,
-            warcprox_meta=None, content_type=None, custom_type=None,
-            status=None, size=None, client_ip=None, method=None,
-            timestamp=None, host=None, duration=None, referer=None,
-            payload_digest=None, truncated=None, warc_records=None,
-            do_not_archive=False):
+class RequestedUrl:
+    """Attributes common to every URL requested through the proxy."""
+    logger = logging.getLogger("warcprox.warcproxy.RequestedUrl")
+    def __init__(self, url, request_data, warcprox_meta=None, status=None,
+            client_ip=None, method=None, timestamp=None, host=None, duration=None,
+            referer=None, do_not_archive=True):
         # XXX should test what happens with non-ascii url (when does
         # url-encoding happen?)
         if type(url) is not bytes:
@@ -421,13 +406,7 @@ class RecordedUrl:
         else:
             self.url = url
 
-        if type(remote_ip) is not bytes:
-            self.remote_ip = remote_ip.encode('ascii')
-        else:
-            self.remote_ip = remote_ip
-
         self.request_data = request_data
-        self.response_recorder = response_recorder
 
         if warcprox_meta:
             if 'captures-bucket' in warcprox_meta:
@@ -444,6 +423,49 @@ class RecordedUrl:
         else:
             self.warcprox_meta = {}
 
+        self.status = status
+        self.client_ip = client_ip
+        self.method = method
+        self.timestamp = timestamp
+        self.host = host
+        self.duration = duration
+        self.referer = referer
+        self.do_not_archive = do_not_archive
+
+class FailedUrl(RequestedUrl):
+    """A requested URL whose handling ended in an error; keeps the exception, if any."""
+    logger = logging.getLogger("warcprox.warcproxy.FailedUrl")
+
+    def __init__(self, url, request_data, warcprox_meta=None, status=None,
+            client_ip=None, method=None, timestamp=None, host=None, duration=None,
+            referer=None, do_not_archive=True, exception=None):
+
+        super().__init__(url, request_data, warcprox_meta=warcprox_meta, status=status,
+            client_ip=client_ip, method=method, timestamp=timestamp, host=host, duration=duration,
+            referer=referer, do_not_archive=do_not_archive)
+
+        self.exception = exception
+
+class RecordedUrl(RequestedUrl):
+    """A requested URL together with its response recorder and capture metadata."""
+    logger = logging.getLogger("warcprox.warcproxy.RecordedUrl")
+
+    def __init__(self, url, request_data, response_recorder, remote_ip,
+            warcprox_meta=None, content_type=None, custom_type=None,
+            status=None, size=None, client_ip=None, method=None,
+            timestamp=None, host=None, duration=None, referer=None,
+            payload_digest=None, truncated=None, warc_records=None,
+            do_not_archive=False):
+
+        super().__init__(url, request_data, warcprox_meta=warcprox_meta, status=status,
+            client_ip=client_ip, method=method, timestamp=timestamp, host=host, duration=duration,
+            referer=referer, do_not_archive=do_not_archive)
+
+        if type(remote_ip) is not bytes:
+            self.remote_ip = remote_ip.encode('ascii')
+        else:
+            self.remote_ip = remote_ip
+
         self.content_type = content_type
 
         self.mimetype = content_type
@@ -452,18 +474,11 @@ class RecordedUrl:
             self.mimetype = RE_MIMETYPE.split(self.mimetype, 2)[0]
 
         self.custom_type = custom_type
-        self.status = status
         self.size = size
-        self.client_ip = client_ip
-        self.method = method
-        self.timestamp = timestamp
-        self.host = host
-        self.duration = duration
-        self.referer = referer
+        self.response_recorder = response_recorder
         self.payload_digest = payload_digest
         self.truncated = truncated
         self.warc_records = warc_records
-        self.do_not_archive = do_not_archive
 
     def is_text(self):
         """Ref: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Complete_list_of_MIME_types
diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py
index b929a7f..968a90c 100644
--- a/warcprox/writerthread.py
+++ b/warcprox/writerthread.py
@@ -72,6 +72,8 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
         self.close_prefix_reqs.put(prefix)
 
     def _process_url(self, recorded_url):
+        if not hasattr(recorded_url, 'response_recorder'):
+            return
         try:
             records = []
             if self._should_archive(recorded_url):