diff --git a/.travis.yml b/.travis.yml index 10d902e..4643ef3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,3 @@ -sudo: required dist: xenial language: python python: @@ -60,5 +59,5 @@ after_script: notifications: slack: - secure: UJzNe+kEJ8QhNxrdqObroisJAO2ipr+Sr2+u1e2euQdIkacyX+nZ88jSk6uDKniAemSfFDI8Ty5a7++2wSbE//Hr3jOSNOJMZLzockafzvIYrq9bP7V97j1gQ4u7liWd19VBnbf0pULuwEfy/n5PdOBR/TiPrgMuYjfZseV+alo= - secure: S1SK52178uywcWLMO4S5POdjMv1MQjR061CKprjVn2d8x5RBbg8QZtumA6Xt+pByvJzh8vk+ITHCN57tcdi51yL6Z0QauXwxwzTsZmjrhxWOybAO2uOHliqQSDgxKcbXIqJKg7Yv19eLQYWDVJVGuwlMfVBS0hOHtTTpVuLuGuc= + - secure: UJzNe+kEJ8QhNxrdqObroisJAO2ipr+Sr2+u1e2euQdIkacyX+nZ88jSk6uDKniAemSfFDI8Ty5a7++2wSbE//Hr3jOSNOJMZLzockafzvIYrq9bP7V97j1gQ4u7liWd19VBnbf0pULuwEfy/n5PdOBR/TiPrgMuYjfZseV+alo= + - secure: S1SK52178uywcWLMO4S5POdjMv1MQjR061CKprjVn2d8x5RBbg8QZtumA6Xt+pByvJzh8vk+ITHCN57tcdi51yL6Z0QauXwxwzTsZmjrhxWOybAO2uOHliqQSDgxKcbXIqJKg7Yv19eLQYWDVJVGuwlMfVBS0hOHtTTpVuLuGuc= diff --git a/warcprox/crawl_log.py b/warcprox/crawl_log.py index 2f7ea5e..eea17d5 100644 --- a/warcprox/crawl_log.py +++ b/warcprox/crawl_log.py @@ -25,6 +25,7 @@ import json import os import warcprox import socket +from urllib3.exceptions import TimeoutError, HTTPError class CrawlLogger(object): def __init__(self, dir_, options=warcprox.Options()): @@ -40,7 +41,8 @@ class CrawlLogger(object): def notify(self, recorded_url, records): # 2017-08-03T21:45:24.496Z 200 2189 https://autismcouncil.wisconsin.gov/robots.txt P https://autismcouncil.wisconsin.gov/ text/plain #001 20170803214523617+365 sha1:PBS2CEF7B4OSEXZZF3QE2XN2VHYCPNPX https://autismcouncil.wisconsin.gov/ duplicate:digest {"warcFileOffset":942,"contentSize":2495,"warcFilename":"ARCHIVEIT-2159-TEST-JOB319150-20170803214522386-00000.warc.gz"} now = datetime.datetime.utcnow() - extra_info = {'contentSize': recorded_url.size,} + status = self.get_artificial_status(recorded_url) + extra_info = {'contentSize': recorded_url.size,} if recorded_url.size is not None and recorded_url.size > 0 else {} if records: extra_info['warcFilename'] = records[0].warc_filename extra_info['warcFileOffset'] = records[0].offset @@ -51,23 +53,26 @@ class CrawlLogger(object): payload_digest = warcprox.digest_str( recorded_url.payload_digest, self.options.base32) - else: + elif records is not None and len(records) > 0: # WARCPROX_WRITE_RECORD request content_length = int(records[0].get_header(b'Content-Length')) payload_digest = records[0].get_header(b'WARC-Payload-Digest') + else: + content_length = 0 + payload_digest = '-' fields = [ '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000), - '% 5s' % recorded_url.status, + '% 5s' % status, '% 10s' % content_length, recorded_url.url, '-', # hop path recorded_url.referer or '-', - recorded_url.mimetype or '-', + recorded_url.mimetype if recorded_url.mimetype is not None else '-', '-', '{:%Y%m%d%H%M%S}{:03d}+{:03d}'.format( recorded_url.timestamp, recorded_url.timestamp.microsecond//1000, - recorded_url.duration.microseconds//1000), + recorded_url.duration.microseconds//1000) if (recorded_url.timestamp is not None and recorded_url.duration is not None) else '-', payload_digest, recorded_url.warcprox_meta.get('metadata', {}).get('seed', '-'), 'duplicate:digest' if records and records[0].type == b'revisit' else '-', @@ -89,3 +94,9 @@ class CrawlLogger(object): with open(crawl_log_path, 'ab') as f: f.write(line) + def get_artificial_status(self, recorded_url): + if hasattr(recorded_url, 'exception') and isinstance(recorded_url.exception, (socket.timeout, TimeoutError, )): + return '-2' + else: + return recorded_url.status + diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 0181019..f1644ec 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -64,6 +64,8 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin): self.dedup_db = dedup_db def _process_url(self, recorded_url): + if isinstance(recorded_url, warcprox.warcproxy.FailedUrl): + return if (recorded_url.response_recorder and recorded_url.payload_digest and self.should_dedup(recorded_url)): diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 8f86cd0..78d0ec3 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -359,7 +359,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.logger.error( "problem handling %r: %r", self.requestline, e) if type(e) is socket.timeout: - self.send_error(504, str(e)) + self.send_error(504, str(e), exception=e) else: self.send_error(500, str(e)) except Exception as f: @@ -440,7 +440,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.logger.error( "problem processing request %r: %r", self.requestline, e, exc_info=True) - self.send_error(response_code) + self.send_error(response_code, exception=e) return try: @@ -458,7 +458,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.send_error(502) return - def send_error(self, code, message=None, explain=None): + def send_error(self, code, message=None, explain=None, exception=None): # BaseHTTPRequestHandler.send_response_only() in http/server.py # does this: # if not hasattr(self, '_headers_buffer'): @@ -489,6 +489,33 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.server.unregister_remote_server_sock( self._remote_server_conn.sock) + def _swallow_hop_by_hop_headers(self): + ''' + Swallow headers that don't make sense to forward on, i.e. + most hop-by-hop headers. + + http://tools.ietf.org/html/rfc2616#section-13.5. + ''' + # self.headers is an email.message.Message, which is case-insensitive + # and doesn't throw KeyError in __delitem__ + for key in ( + 'Warcprox-Meta', 'Connection', 'Proxy-Connection', 'Keep-Alive', + 'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade'): + del self.headers[key] + + def _build_request(self): + req_str = '{} {} {}\r\n'.format( + self.command, self.path, self.request_version) + + # Add headers to the request + # XXX in at least python3.3 str(self.headers) uses \n not \r\n :( + req_str += '\r\n'.join( + '{}: {}'.format(k,v) for (k,v) in self.headers.items()) + + req = req_str.encode('latin1') + b'\r\n\r\n' + + return req + def _inner_proxy_request(self, extra_response_headers={}): ''' Sends the request to the remote server, then uses a ProxyingRecorder to @@ -500,29 +527,11 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): It may contain extra HTTP headers such as ``Warcprox-Meta`` which are written in the WARC record for this request. ''' - # Build request - req_str = '{} {} {}\r\n'.format( - self.command, self.path, self.request_version) - - # Swallow headers that don't make sense to forward on, i.e. most - # hop-by-hop headers. http://tools.ietf.org/html/rfc2616#section-13.5. - # self.headers is an email.message.Message, which is case-insensitive - # and doesn't throw KeyError in __delitem__ - for key in ( - 'Connection', 'Proxy-Connection', 'Keep-Alive', - 'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade'): - del self.headers[key] - + self._swallow_hop_by_hop_headers() self.headers['Via'] = via_header_value( self.headers.get('Via'), self.request_version.replace('HTTP/', '')) - - # Add headers to the request - # XXX in at least python3.3 str(self.headers) uses \n not \r\n :( - req_str += '\r\n'.join( - '{}: {}'.format(k,v) for (k,v) in self.headers.items()) - - req = req_str.encode('latin1') + b'\r\n\r\n' + req = self._build_request() # Append message body if present to the request if 'Content-Length' in self.headers: @@ -548,7 +557,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): try: buf = prox_rec_res.read(65536) except http_client.IncompleteRead as e: - self.logger.warn('%s from %s', e, self.url) + self.logger.warning('%s from %s', e, self.url) buf = e.partial if (self._max_resource_size and diff --git a/warcprox/stats.py b/warcprox/stats.py index 1a71cad..3bc560e 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -162,6 +162,8 @@ class StatsProcessor(warcprox.BaseBatchPostfetchProcessor): def _tally_batch(self, batch): batch_buckets = {} for recorded_url in batch: + if isinstance(recorded_url, warcprox.warcproxy.FailedUrl): + continue for bucket in self.buckets(recorded_url): bucket_stats = batch_buckets.get(bucket) if not bucket_stats: @@ -297,6 +299,8 @@ class RunningStats: (self.first_snap_time - 120 + i * 10, 0, 0)) def notify(self, recorded_url, records): + if isinstance(recorded_url, warcprox.warcproxy.FailedUrl): + return with self._lock: self.urls += 1 if records: diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 9d23244..a586cee 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -188,16 +188,21 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): self._enforce_limits_and_blocks() return warcprox.mitmproxy.MitmProxyHandler._connect_to_remote_server(self) - def _proxy_request(self): - warcprox_meta = None + def _parse_warcprox_meta(self): + ''' + :return: Warcprox-Meta request header value as a dictionary, or None + ''' raw_warcprox_meta = self.headers.get('Warcprox-Meta') self.logger.trace( - 'request for %s Warcprox-Meta header: %s', self.url, - raw_warcprox_meta) + 'request for %s Warcprox-Meta header: %s', self.url, + raw_warcprox_meta) if raw_warcprox_meta: - warcprox_meta = json.loads(raw_warcprox_meta) - del self.headers['Warcprox-Meta'] + return json.loads(raw_warcprox_meta) + else: + return None + def _proxy_request(self): + warcprox_meta = self._parse_warcprox_meta() remote_ip = self._remote_server_conn.sock.getpeername()[0] timestamp = doublethink.utcnow() extra_response_headers = {} @@ -344,16 +349,42 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True) raise + def send_error(self, code, message=None, explain=None, exception=None): + super().send_error(code, message=message, explain=explain, exception=exception) + + # If error happens during CONNECT handling and before the inner request, self.url + # is unset, and self.path is something like 'example.com:443' + urlish = self.url or self.path + + warcprox_meta = self._parse_warcprox_meta() + self._swallow_hop_by_hop_headers() + request_data = self._build_request() + + failed_url = FailedUrl( + url=urlish, + request_data=request_data, + warcprox_meta=warcprox_meta, + status=code, + client_ip=self.client_address[0], + method=self.command, + timestamp=None, + host=self.hostname, + duration=None, + referer=self.headers.get('referer'), + do_not_archive=True, + exception=exception) + + self.server.recorded_url_q.put(failed_url) + def log_message(self, fmt, *args): # logging better handled elsewhere? pass RE_MIMETYPE = re.compile(r'[;\s]') -class RecordedUrl: - logger = logging.getLogger("warcprox.warcproxy.RecordedUrl") - - def __init__(self, url, request_data, response_recorder, remote_ip, +class RequestedUrl: + logger = logging.getLogger("warcprox.warcproxy.RequestedUrl") + def __init__(self, url, request_data, response_recorder=None, remote_ip=None, warcprox_meta=None, content_type=None, custom_type=None, status=None, size=None, client_ip=None, method=None, timestamp=None, host=None, duration=None, referer=None, @@ -366,11 +397,6 @@ class RecordedUrl: else: self.url = url - if type(remote_ip) is not bytes: - self.remote_ip = remote_ip.encode('ascii') - else: - self.remote_ip = remote_ip - self.request_data = request_data self.response_recorder = response_recorder @@ -410,6 +436,42 @@ class RecordedUrl: self.warc_records = warc_records self.do_not_archive = do_not_archive +class FailedUrl(RequestedUrl): + logger = logging.getLogger("warcprox.warcproxy.FailedUrl") + + def __init__(self, url, request_data, warcprox_meta=None, status=None, + client_ip=None, method=None, timestamp=None, host=None, duration=None, + referer=None, do_not_archive=True, exception=None): + + super().__init__(url, request_data, warcprox_meta=warcprox_meta, + status=status, client_ip=client_ip, method=method, + timestamp=timestamp, host=host, duration=duration, + referer=referer, do_not_archive=do_not_archive) + + self.exception = exception + +class RecordedUrl(RequestedUrl): + logger = logging.getLogger("warcprox.warcproxy.RecordedUrl") + + def __init__(self, url, request_data, response_recorder, remote_ip, + warcprox_meta=None, content_type=None, custom_type=None, + status=None, size=None, client_ip=None, method=None, + timestamp=None, host=None, duration=None, referer=None, + payload_digest=None, truncated=None, warc_records=None, + do_not_archive=False): + + super().__init__(url, request_data, response_recorder=response_recorder, + warcprox_meta=warcprox_meta, content_type=content_type, + custom_type=custom_type, status=status, size=size, client_ip=client_ip, + method=method, timestamp=timestamp, host=host, duration=duration, + referer=referer, payload_digest=payload_digest, truncated=truncated, + warc_records=warc_records, do_not_archive=do_not_archive) + + if type(remote_ip) is not bytes: + self.remote_ip = remote_ip.encode('ascii') + else: + self.remote_ip = remote_ip + def is_text(self): """Ref: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Complete_list_of_MIME_types Alternative method: try to decode('ascii') first N bytes to make sure diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index b929a7f..3cd6bc6 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -72,6 +72,8 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): self.close_prefix_reqs.put(prefix) def _process_url(self, recorded_url): + if isinstance(recorded_url, warcprox.warcproxy.FailedUrl): + return try: records = [] if self._should_archive(recorded_url):