Merge pull request #145 from internetarchive/adds-logging-for-failed-connections

Adds logging for failed connections
commit f19ead0058
jkafader, 2020-09-23 12:22:12 -07:00 (committed by GitHub)
7 changed files with 164 additions and 48 deletions

File: .travis.yml

@@ -61,5 +61,5 @@ after_script:
 notifications:
   slack:
-    secure: UJzNe+kEJ8QhNxrdqObroisJAO2ipr+Sr2+u1e2euQdIkacyX+nZ88jSk6uDKniAemSfFDI8Ty5a7++2wSbE//Hr3jOSNOJMZLzockafzvIYrq9bP7V97j1gQ4u7liWd19VBnbf0pULuwEfy/n5PdOBR/TiPrgMuYjfZseV+alo=
-    secure: S1SK52178uywcWLMO4S5POdjMv1MQjR061CKprjVn2d8x5RBbg8QZtumA6Xt+pByvJzh8vk+ITHCN57tcdi51yL6Z0QauXwxwzTsZmjrhxWOybAO2uOHliqQSDgxKcbXIqJKg7Yv19eLQYWDVJVGuwlMfVBS0hOHtTTpVuLuGuc=
+    - secure: UJzNe+kEJ8QhNxrdqObroisJAO2ipr+Sr2+u1e2euQdIkacyX+nZ88jSk6uDKniAemSfFDI8Ty5a7++2wSbE//Hr3jOSNOJMZLzockafzvIYrq9bP7V97j1gQ4u7liWd19VBnbf0pULuwEfy/n5PdOBR/TiPrgMuYjfZseV+alo=
+    - secure: S1SK52178uywcWLMO4S5POdjMv1MQjR061CKprjVn2d8x5RBbg8QZtumA6Xt+pByvJzh8vk+ITHCN57tcdi51yL6Z0QauXwxwzTsZmjrhxWOybAO2uOHliqQSDgxKcbXIqJKg7Yv19eLQYWDVJVGuwlMfVBS0hOHtTTpVuLuGuc=

File: warcprox/crawl_log.py

@@ -25,6 +25,7 @@ import json
 import os
 import warcprox
 import socket
+from urllib3.exceptions import TimeoutError, HTTPError, NewConnectionError, MaxRetryError

 class CrawlLogger(object):
     def __init__(self, dir_, options=warcprox.Options()):
@@ -40,7 +41,12 @@ class CrawlLogger(object):
     def notify(self, recorded_url, records):
         # 2017-08-03T21:45:24.496Z 200 2189 https://autismcouncil.wisconsin.gov/robots.txt P https://autismcouncil.wisconsin.gov/ text/plain #001 20170803214523617+365 sha1:PBS2CEF7B4OSEXZZF3QE2XN2VHYCPNPX https://autismcouncil.wisconsin.gov/ duplicate:digest {"warcFileOffset":942,"contentSize":2495,"warcFilename":"ARCHIVEIT-2159-TEST-JOB319150-20170803214522386-00000.warc.gz"}
         now = datetime.datetime.utcnow()
-        extra_info = {'contentSize': recorded_url.size,}
+        status = self.get_artificial_status(recorded_url)
+        extra_info = {'contentSize': recorded_url.size,} if recorded_url.size is not None and recorded_url.size > 0 else {}
+        if hasattr(recorded_url, 'exception') and recorded_url.exception is not None:
+            extra_info['exception'] = str(recorded_url.exception)
+        if hasattr(recorded_url, 'message') and recorded_url.message is not None:
+            extra_info['exceptionMessage'] = str(recorded_url.message)
         if records:
             extra_info['warcFilename'] = records[0].warc_filename
             extra_info['warcFileOffset'] = records[0].offset
@@ -51,23 +57,26 @@ class CrawlLogger(object):
             payload_digest = warcprox.digest_str(
                     recorded_url.payload_digest,
                     self.options.base32)
-        else:
+        elif records is not None and len(records) > 0:
             # WARCPROX_WRITE_RECORD request
             content_length = int(records[0].get_header(b'Content-Length'))
             payload_digest = records[0].get_header(b'WARC-Payload-Digest')
+        else:
+            content_length = 0
+            payload_digest = '-'
         fields = [
             '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000),
-            '% 5s' % recorded_url.status,
+            '% 5s' % status,
             '% 10s' % content_length,
             recorded_url.url,
             '-', # hop path
             recorded_url.referer or '-',
-            recorded_url.mimetype or '-',
+            recorded_url.mimetype if recorded_url.mimetype is not None else '-',
             '-',
             '{:%Y%m%d%H%M%S}{:03d}+{:03d}'.format(
                 recorded_url.timestamp,
                 recorded_url.timestamp.microsecond//1000,
-                recorded_url.duration.microseconds//1000),
+                recorded_url.duration.microseconds//1000) if (recorded_url.timestamp is not None and recorded_url.duration is not None) else '-',
             payload_digest,
             recorded_url.warcprox_meta.get('metadata', {}).get('seed', '-'),
             'duplicate:digest' if records and records[0].type == b'revisit' else '-',
@@ -89,3 +98,29 @@ class CrawlLogger(object):
         with open(crawl_log_path, 'ab') as f:
             f.write(line)
+
+    def get_artificial_status(self, recorded_url):
+        # urllib3 does not specify DNS errors, so we must parse them out of
+        # the exception string. Unfortunately, the errors are reported
+        # differently on different systems.
+        # https://stackoverflow.com/questions/40145631
+        if hasattr(recorded_url, 'exception') and isinstance(recorded_url.exception, (MaxRetryError, )):
+            return '-8'
+        elif hasattr(recorded_url, 'exception') and isinstance(recorded_url.exception, (NewConnectionError, )):
+            exception_string = str(recorded_url.exception)
+            if ("[Errno 11001] getaddrinfo failed" in exception_string or                    # Windows
+                    "[Errno -2] Name or service not known" in exception_string or            # Linux
+                    "[Errno -3] Temporary failure in name resolution" in exception_string or # Linux
+                    "[Errno 8] nodename nor servname " in exception_string):                 # OS X
+                return '-6' # DNS failure
+            else:
+                return '-2' # other connection failure
+        elif hasattr(recorded_url, 'exception') and isinstance(recorded_url.exception, (socket.timeout, TimeoutError, )):
+            return '-2' # connection timeout
+        elif isinstance(recorded_url, warcprox.warcproxy.FailedUrl):
+            # synthetic status, used when some other status (such as
+            # connection-lost) is considered by policy the same as a
+            # document-not-found; cached failures result in a FailedUrl
+            # with no exception
+            return '-404'
+        else:
+            return recorded_url.status
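
The negative statuses above appear to follow Heritrix's crawl.log status conventions (-6 for a DNS failure, -2 for a failed or timed-out connection, -8 for retries exhausted), with -404 as warcprox's own synthetic value for cached failures. Here is a runnable sketch of the classification, not warcprox code: Failed is an invented stand-in for warcprox.warcproxy.FailedUrl, and classify() mirrors get_artificial_status():

import socket
from urllib3.exceptions import TimeoutError, NewConnectionError, MaxRetryError

class Failed:
    # invented stand-in for warcprox.warcproxy.FailedUrl
    def __init__(self, exception=None, status=502):
        self.exception, self.status = exception, status

def classify(u):
    # mirrors CrawlLogger.get_artificial_status()
    if isinstance(u.exception, MaxRetryError):
        return '-8'                              # retries exhausted
    if isinstance(u.exception, NewConnectionError):
        s = str(u.exception)
        dns = ("[Errno 11001] getaddrinfo failed" in s           # Windows
               or "[Errno -2] Name or service not known" in s    # Linux
               or "[Errno -3] Temporary failure in name resolution" in s
               or "[Errno 8] nodename nor servname " in s)       # OS X
        return '-6' if dns else '-2'
    if isinstance(u.exception, (socket.timeout, TimeoutError)):
        return '-2'                              # connection timeout
    if u.exception is None:
        return '-404'                            # cached failure, no exception
    return u.status

print(classify(Failed(MaxRetryError(None, 'http://x/'))))  # -8
print(classify(Failed(NewConnectionError(
        None, '[Errno -2] Name or service not known'))))   # -6
print(classify(Failed(socket.timeout())))                  # -2
print(classify(Failed()))                                  # -404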

File: warcprox/dedup.py

@@ -68,6 +68,8 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
         self.dedup_db = dedup_db

     def _process_url(self, recorded_url):
+        if isinstance(recorded_url, warcprox.warcproxy.FailedUrl):
+            return
         if (recorded_url.response_recorder
                 and recorded_url.payload_digest
                 and self.should_dedup(recorded_url)):

File: warcprox/mitmproxy.py

@@ -78,7 +78,7 @@ import collections
 import cProfile
 from urllib3 import PoolManager
 from urllib3.util import is_connection_dropped
-from urllib3.exceptions import TimeoutError, HTTPError
+from urllib3.exceptions import TimeoutError, HTTPError, NewConnectionError
 import doublethink
 from cachetools import TTLCache
 from threading import RLock
@@ -359,7 +359,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
                 self.logger.error(
                         "problem handling %r: %r", self.requestline, e)
                 if type(e) is socket.timeout:
-                    self.send_error(504, str(e))
+                    self.send_error(504, str(e), exception=e)
                 else:
                     self.send_error(500, str(e))
         except Exception as f:
@@ -407,7 +407,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
         cached = self.server.bad_hostnames_ports.get(hostname_port)
         if cached:
             self.logger.info('Cannot connect to %s (cache)', hostname_port)
-            self.send_error(cached)
+            self.send_error(cached, exception=Exception('Cached Failed Connection'))
             return
         # Connect to destination
         self._connect_to_remote_server()
@@ -440,7 +440,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             self.logger.error(
                     "problem processing request %r: %r",
                     self.requestline, e, exc_info=True)
-            self.send_error(response_code)
+            self.send_error(response_code, exception=e)
             return

         try:
@@ -458,7 +458,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             self.send_error(502)
             return

-    def send_error(self, code, message=None, explain=None):
+    def send_error(self, code, message=None, explain=None, exception=None):
         # BaseHTTPRequestHandler.send_response_only() in http/server.py
         # does this:
         #     if not hasattr(self, '_headers_buffer'):
@@ -489,6 +489,33 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             self.server.unregister_remote_server_sock(
                     self._remote_server_conn.sock)

+    def _swallow_hop_by_hop_headers(self):
+        '''
+        Swallow headers that don't make sense to forward on, i.e. most
+        hop-by-hop headers. See http://tools.ietf.org/html/rfc2616#section-13.5
+        '''
+        # self.headers is an email.message.Message, which is case-insensitive
+        # and doesn't throw KeyError in __delitem__
+        for key in (
+                'Warcprox-Meta', 'Connection', 'Proxy-Connection', 'Keep-Alive',
+                'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade'):
+            del self.headers[key]
+
+    def _build_request(self):
+        req_str = '{} {} {}\r\n'.format(
+                self.command, self.path, self.request_version)
+        # Add headers to the request
+        # XXX in at least python3.3 str(self.headers) uses \n not \r\n :(
+        req_str += '\r\n'.join(
+                '{}: {}'.format(k, v) for (k, v) in self.headers.items())
+        req = req_str.encode('latin1') + b'\r\n\r\n'
+        return req
+
     def _inner_proxy_request(self, extra_response_headers={}):
         '''
         Sends the request to the remote server, then uses a ProxyingRecorder to
@@ -500,29 +527,11 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
         It may contain extra HTTP headers such as ``Warcprox-Meta`` which
         are written in the WARC record for this request.
         '''
-        # Build request
-        req_str = '{} {} {}\r\n'.format(
-                self.command, self.path, self.request_version)
-
-        # Swallow headers that don't make sense to forward on, i.e. most
-        # hop-by-hop headers. http://tools.ietf.org/html/rfc2616#section-13.5
-        # self.headers is an email.message.Message, which is case-insensitive
-        # and doesn't throw KeyError in __delitem__
-        for key in (
-                'Connection', 'Proxy-Connection', 'Keep-Alive',
-                'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade'):
-            del self.headers[key]
-
+        self._swallow_hop_by_hop_headers()
         self.headers['Via'] = via_header_value(
                 self.headers.get('Via'),
                 self.request_version.replace('HTTP/', ''))
-
-        # Add headers to the request
-        # XXX in at least python3.3 str(self.headers) uses \n not \r\n :(
-        req_str += '\r\n'.join(
-                '{}: {}'.format(k,v) for (k,v) in self.headers.items())
-        req = req_str.encode('latin1') + b'\r\n\r\n'
+        req = self._build_request()

         # Append message body if present to the request
         if 'Content-Length' in self.headers:
@@ -548,7 +557,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
             try:
                 buf = prox_rec_res.read(65536)
             except http_client.IncompleteRead as e:
-                self.logger.warn('%s from %s', e, self.url)
+                self.logger.warning('%s from %s', e, self.url)
                 buf = e.partial

             if (self._max_resource_size and
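
For reference, a standalone sketch (not warcprox code) of the request serialization that the new _build_request() helper performs; the request line and header values here are invented:

from email.message import Message

headers = Message()                 # same type as self.headers in the handler
headers['Host'] = 'example.com'
headers['Referer'] = 'https://example.com/'

req_str = '{} {} {}\r\n'.format('GET', '/robots.txt', 'HTTP/1.1')
req_str += '\r\n'.join('{}: {}'.format(k, v) for k, v in headers.items())
req = req_str.encode('latin1') + b'\r\n\r\n'
print(req)
# b'GET /robots.txt HTTP/1.1\r\nHost: example.com\r\nReferer: https://example.com/\r\n\r\n'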

File: warcprox/stats.py

@@ -162,6 +162,8 @@ class StatsProcessor(warcprox.BaseBatchPostfetchProcessor):
     def _tally_batch(self, batch):
         batch_buckets = {}
         for recorded_url in batch:
+            if isinstance(recorded_url, warcprox.warcproxy.FailedUrl):
+                continue
             for bucket in self.buckets(recorded_url):
                 bucket_stats = batch_buckets.get(bucket)
                 if not bucket_stats:
@@ -297,6 +299,8 @@ class RunningStats:
                     (self.first_snap_time - 120 + i * 10, 0, 0))

     def notify(self, recorded_url, records):
+        if isinstance(recorded_url, warcprox.warcproxy.FailedUrl):
+            return
         with self._lock:
             self.urls += 1
             if records:

File: warcprox/warcproxy.py

@@ -188,16 +188,21 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
         self._enforce_limits_and_blocks()
         return warcprox.mitmproxy.MitmProxyHandler._connect_to_remote_server(self)

-    def _proxy_request(self):
-        warcprox_meta = None
+    def _parse_warcprox_meta(self):
+        '''
+        :return: Warcprox-Meta request header value as a dictionary, or None
+        '''
         raw_warcprox_meta = self.headers.get('Warcprox-Meta')
         self.logger.trace(
                 'request for %s Warcprox-Meta header: %s', self.url,
                 raw_warcprox_meta)
         if raw_warcprox_meta:
-            warcprox_meta = json.loads(raw_warcprox_meta)
-            del self.headers['Warcprox-Meta']
+            return json.loads(raw_warcprox_meta)
+        else:
+            return None

+    def _proxy_request(self):
+        warcprox_meta = self._parse_warcprox_meta()
         remote_ip = self._remote_server_conn.sock.getpeername()[0]
         timestamp = doublethink.utcnow()
         extra_response_headers = {}
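
_parse_warcprox_meta() factors the JSON decoding of the Warcprox-Meta request header out of _proxy_request() so that send_error() (below) can reuse it. A small illustration with a made-up header value; parse_warcprox_meta here is a standalone copy of the logic, not the method itself:

import json

def parse_warcprox_meta(raw):
    # raw is the Warcprox-Meta header string, or None if absent
    return json.loads(raw) if raw else None

print(parse_warcprox_meta(
        '{"warc-prefix": "job1", "metadata": {"seed": "https://example.com/"}}'))
# {'warc-prefix': 'job1', 'metadata': {'seed': 'https://example.com/'}}
print(parse_warcprox_meta(None))  # None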
@@ -344,16 +349,43 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
             self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True)
             raise

+    def send_error(self, code, message=None, explain=None, exception=None):
+        super().send_error(code, message=message, explain=explain, exception=exception)
+        # If the error happens during CONNECT handling and before the inner
+        # request, self.url is unset, and self.path is something like
+        # 'example.com:443'
+        urlish = self.url or self.path
+        warcprox_meta = self._parse_warcprox_meta()
+        self._swallow_hop_by_hop_headers()
+        request_data = self._build_request()
+        failed_url = FailedUrl(
+                url=urlish,
+                request_data=request_data,
+                warcprox_meta=warcprox_meta,
+                status=code,
+                client_ip=self.client_address[0],
+                method=self.command,
+                timestamp=None,
+                host=self.hostname,
+                duration=None,
+                referer=self.headers.get('referer'),
+                do_not_archive=True,
+                message=message,
+                exception=exception)
+        self.server.recorded_url_q.put(failed_url)
+
     def log_message(self, fmt, *args):
         # logging better handled elsewhere?
         pass


 RE_MIMETYPE = re.compile(r'[;\s]')

-class RecordedUrl:
-    logger = logging.getLogger("warcprox.warcproxy.RecordedUrl")
+class RequestedUrl:
+    logger = logging.getLogger("warcprox.warcproxy.RequestedUrl")

-    def __init__(self, url, request_data, response_recorder, remote_ip,
+    def __init__(self, url, request_data, response_recorder=None, remote_ip=None,
             warcprox_meta=None, content_type=None, custom_type=None,
             status=None, size=None, client_ip=None, method=None,
             timestamp=None, host=None, duration=None, referer=None,
@@ -366,11 +398,6 @@ class RequestedUrl:
         else:
             self.url = url

-        if type(remote_ip) is not bytes:
-            self.remote_ip = remote_ip.encode('ascii')
-        else:
-            self.remote_ip = remote_ip
-
         self.request_data = request_data
         self.response_recorder = response_recorder
@@ -410,6 +437,43 @@ class RequestedUrl:
         self.warc_records = warc_records
         self.do_not_archive = do_not_archive

+class FailedUrl(RequestedUrl):
+    logger = logging.getLogger("warcprox.warcproxy.FailedUrl")
+
+    def __init__(self, url, request_data, warcprox_meta=None, status=None,
+            client_ip=None, method=None, timestamp=None, host=None,
+            duration=None, referer=None, do_not_archive=True, message=None,
+            exception=None):
+        super().__init__(url, request_data, warcprox_meta=warcprox_meta,
+                status=status, client_ip=client_ip, method=method,
+                timestamp=timestamp, host=host, duration=duration,
+                referer=referer, do_not_archive=do_not_archive)
+        self.message = message
+        self.exception = exception
+
+class RecordedUrl(RequestedUrl):
+    logger = logging.getLogger("warcprox.warcproxy.RecordedUrl")
+
+    def __init__(self, url, request_data, response_recorder, remote_ip,
+            warcprox_meta=None, content_type=None, custom_type=None,
+            status=None, size=None, client_ip=None, method=None,
+            timestamp=None, host=None, duration=None, referer=None,
+            payload_digest=None, truncated=None, warc_records=None,
+            do_not_archive=False):
+        super().__init__(url, request_data, response_recorder=response_recorder,
+                warcprox_meta=warcprox_meta, content_type=content_type,
+                custom_type=custom_type, status=status, size=size,
+                client_ip=client_ip, method=method, timestamp=timestamp,
+                host=host, duration=duration, referer=referer,
+                payload_digest=payload_digest, truncated=truncated,
+                warc_records=warc_records, do_not_archive=do_not_archive)
+        if type(remote_ip) is not bytes:
+            self.remote_ip = remote_ip.encode('ascii')
+        else:
+            self.remote_ip = remote_ip
+
     def is_text(self):
         """Ref: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Complete_list_of_MIME_types
         Alternative method: try to decode('ascii') first N bytes to make sure
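
The refactor turns the old RecordedUrl into a small hierarchy: RequestedUrl carries the fields common to any proxied request, FailedUrl adds message and exception (and defaults do_not_archive=True), and RecordedUrl keeps the response-side details such as remote_ip. A hedged construction sketch, assuming a warcprox with this change installed; all field values are invented:

from warcprox.warcproxy import FailedUrl, RecordedUrl

failed = FailedUrl(
        url='https://no-such-host.example/',
        request_data=b'GET / HTTP/1.1\r\nHost: no-such-host.example\r\n\r\n',
        status=502, method='GET',
        exception=Exception('[Errno -2] Name or service not known'))
assert failed.do_not_archive              # never written to a WARC
assert failed.response_recorder is None   # nothing was fetched

recorded = RecordedUrl(
        url='https://example.com/',
        request_data=b'GET / HTTP/1.1\r\nHost: example.com\r\n\r\n',
        response_recorder=None,
        remote_ip='93.184.216.34',        # str is encoded to bytes in __init__
        status=200, method='GET')
assert recorded.remote_ip == b'93.184.216.34'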

File: warcprox/writerthread.py

@@ -72,6 +72,8 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
         self.close_prefix_reqs.put(prefix)

     def _process_url(self, recorded_url):
+        if isinstance(recorded_url, warcprox.warcproxy.FailedUrl):
+            return
         try:
             records = []
             if self._should_archive(recorded_url):
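
The same isinstance guard appears in dedup.py, stats.py, and this file: a FailedUrl travels down the postfetch chain like any other url object, but only the crawl logger acts on it. A minimal sketch of the pattern, with invented stand-in classes rather than warcprox's real ones:

class RecordedUrl: ...   # stand-ins, not the real warcprox classes
class FailedUrl: ...

def dedup_step(u):
    if isinstance(u, FailedUrl):
        return                          # nothing fetched, nothing to dedup
    print('dedup lookup for', type(u).__name__)

def warc_write_step(u):
    if isinstance(u, FailedUrl):
        return                          # do_not_archive: no WARC record
    print('warc record for', type(u).__name__)

def crawl_log_step(u):
    print('crawl log line for', type(u).__name__)   # logged for both kinds

for u in (RecordedUrl(), FailedUrl()):
    for step in (dedup_step, warc_write_step, crawl_log_step):
        step(u)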