Refactor failed requests into new class.

This commit is contained in:
Adam Miller 2020-01-03 20:43:47 +00:00
parent f9c9443d2f
commit e88a88f247
7 changed files with 84 additions and 61 deletions

View File

@ -223,7 +223,7 @@ class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor):
self.name = listener.__class__.__name__ self.name = listener.__class__.__name__
def _process_url(self, recorded_url): def _process_url(self, recorded_url):
return self.listener.notify(recorded_url, recorded_url.warc_records) return self.listener.notify(recorded_url, recorded_url.warc_records if hasattr(recorded_url, "warc_records") else None)
def start(self): def start(self):
if hasattr(self.listener, 'start'): if hasattr(self.listener, 'start'):

View File

@ -25,6 +25,7 @@ import json
import os import os
import warcprox import warcprox
import socket import socket
from urllib3.exceptions import TimeoutError, HTTPError
class CrawlLogger(object): class CrawlLogger(object):
def __init__(self, dir_, options=warcprox.Options()): def __init__(self, dir_, options=warcprox.Options()):
@ -40,13 +41,14 @@ class CrawlLogger(object):
def notify(self, recorded_url, records): def notify(self, recorded_url, records):
# 2017-08-03T21:45:24.496Z 200 2189 https://autismcouncil.wisconsin.gov/robots.txt P https://autismcouncil.wisconsin.gov/ text/plain #001 20170803214523617+365 sha1:PBS2CEF7B4OSEXZZF3QE2XN2VHYCPNPX https://autismcouncil.wisconsin.gov/ duplicate:digest {"warcFileOffset":942,"contentSize":2495,"warcFilename":"ARCHIVEIT-2159-TEST-JOB319150-20170803214522386-00000.warc.gz"} # 2017-08-03T21:45:24.496Z 200 2189 https://autismcouncil.wisconsin.gov/robots.txt P https://autismcouncil.wisconsin.gov/ text/plain #001 20170803214523617+365 sha1:PBS2CEF7B4OSEXZZF3QE2XN2VHYCPNPX https://autismcouncil.wisconsin.gov/ duplicate:digest {"warcFileOffset":942,"contentSize":2495,"warcFilename":"ARCHIVEIT-2159-TEST-JOB319150-20170803214522386-00000.warc.gz"}
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
extra_info = {'contentSize': recorded_url.size,} if recorded_url.size > 0 else {} status = self.get_artificial_status(recorded_url)
extra_info = {'contentSize': recorded_url.size,} if hasattr(recorded_url, "size") and recorded_url.size > 0 else {}
if records: if records:
extra_info['warcFilename'] = records[0].warc_filename extra_info['warcFilename'] = records[0].warc_filename
extra_info['warcFileOffset'] = records[0].offset extra_info['warcFileOffset'] = records[0].offset
if recorded_url.method != 'GET': if recorded_url.method != 'GET':
extra_info['method'] = recorded_url.method extra_info['method'] = recorded_url.method
if recorded_url.response_recorder: if hasattr(recorded_url, "response_recorder") and recorded_url.response_recorder:
content_length = recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset content_length = recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset
payload_digest = warcprox.digest_str( payload_digest = warcprox.digest_str(
recorded_url.payload_digest, recorded_url.payload_digest,
@ -60,12 +62,12 @@ class CrawlLogger(object):
payload_digest = '-' payload_digest = '-'
fields = [ fields = [
'{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000), '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000),
'% 5s' % recorded_url.status, '% 5s' % status,
'% 10s' % content_length, '% 10s' % content_length,
recorded_url.url, recorded_url.url,
'-', # hop path '-', # hop path
recorded_url.referer or '-', recorded_url.referer or '-',
recorded_url.mimetype or '-', recorded_url.mimetype if hasattr(recorded_url, "mimetype") and recorded_url.mimetype is not None else '-',
'-', '-',
'{:%Y%m%d%H%M%S}{:03d}+{:03d}'.format( '{:%Y%m%d%H%M%S}{:03d}+{:03d}'.format(
recorded_url.timestamp, recorded_url.timestamp,
@ -92,3 +94,9 @@ class CrawlLogger(object):
with open(crawl_log_path, 'ab') as f: with open(crawl_log_path, 'ab') as f:
f.write(line) f.write(line)
def get_artificial_status(self, recorded_url):
if hasattr(recorded_url, 'exception') and isinstance(recorded_url.exception, (socket.timeout, TimeoutError, )):
return '-2'
else:
return recorded_url.status

View File

@ -65,6 +65,8 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
self.dedup_db = dedup_db self.dedup_db = dedup_db
def _process_url(self, recorded_url): def _process_url(self, recorded_url):
if not hasattr(recorded_url, 'response_recorder'):
return
if (recorded_url.response_recorder if (recorded_url.response_recorder
and recorded_url.payload_digest and recorded_url.payload_digest
and self.should_dedup(recorded_url)): and self.should_dedup(recorded_url)):

View File

@ -359,7 +359,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.logger.error( self.logger.error(
"problem handling %r: %r", self.requestline, e) "problem handling %r: %r", self.requestline, e)
if type(e) is socket.timeout: if type(e) is socket.timeout:
self.send_error(-2, str(e)) self.send_error(504, str(e), exception=e)
else: else:
self.send_error(500, str(e)) self.send_error(500, str(e))
except Exception as f: except Exception as f:
@ -425,7 +425,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
response_code = 500 response_code = 500
cache = False cache = False
if isinstance(e, (socket.timeout, TimeoutError,)): if isinstance(e, (socket.timeout, TimeoutError,)):
response_code = -2 response_code = 504
cache = True cache = True
elif isinstance(e, HTTPError): elif isinstance(e, HTTPError):
response_code = 502 response_code = 502
@ -440,7 +440,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.logger.error( self.logger.error(
"problem processing request %r: %r", "problem processing request %r: %r",
self.requestline, e, exc_info=True) self.requestline, e, exc_info=True)
self.send_error(response_code) self.send_error(response_code, exception=e)
return return
try: try:
@ -458,13 +458,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.send_error(502) self.send_error(502)
return return
def send_error(self, code, message=None, explain=None): def send_error(self, code, message=None, explain=None, exception=None):
if code == -2:
return_code = 504
else:
return_code = code
# BaseHTTPRequestHandler.send_response_only() in http/server.py # BaseHTTPRequestHandler.send_response_only() in http/server.py
# does this: # does this:
# if not hasattr(self, '_headers_buffer'): # if not hasattr(self, '_headers_buffer'):
@ -476,7 +470,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self._headers_buffer = [] self._headers_buffer = []
try: try:
return http_server.BaseHTTPRequestHandler.send_error( return http_server.BaseHTTPRequestHandler.send_error(
self, return_code, message, explain) self, code, message, explain)
except Exception as e: except Exception as e:
level = logging.ERROR level = logging.ERROR
if isinstance(e, OSError) and e.errno == 9: if isinstance(e, OSError) and e.errno == 9:

View File

@ -162,6 +162,8 @@ class StatsProcessor(warcprox.BaseBatchPostfetchProcessor):
def _tally_batch(self, batch): def _tally_batch(self, batch):
batch_buckets = {} batch_buckets = {}
for recorded_url in batch: for recorded_url in batch:
if not hasattr(recorded_url, 'response_recorder'):
continue
for bucket in self.buckets(recorded_url): for bucket in self.buckets(recorded_url):
bucket_stats = batch_buckets.get(bucket) bucket_stats = batch_buckets.get(bucket)
if not bucket_stats: if not bucket_stats:
@ -297,6 +299,8 @@ class RunningStats:
(self.first_snap_time - 120 + i * 10, 0, 0)) (self.first_snap_time - 120 + i * 10, 0, 0))
def notify(self, recorded_url, records): def notify(self, recorded_url, records):
if not hasattr(recorded_url, 'response_recorder'):
return
with self._lock: with self._lock:
self.urls += 1 self.urls += 1
if records: if records:

View File

@ -343,8 +343,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
except: except:
self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True) self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True)
raise raise
def send_error(self, code, message=None, explain=None): def send_error(self, code, message=None, explain=None, exception=None):
super().send_error(code, message, explain) super().send_error(code, message=message, explain=explain, exception=exception)
# Build request # Build request
req_str = '{} {} {}\r\n'.format( req_str = '{} {} {}\r\n'.format(
@ -359,10 +359,6 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade'): 'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade'):
del self.headers[key] del self.headers[key]
self.headers['Via'] = via_header_value(
self.headers.get('Via'),
self.request_version.replace('HTTP/', ''))
# Add headers to the request # Add headers to the request
# XXX in at least python3.3 str(self.headers) uses \n not \r\n :( # XXX in at least python3.3 str(self.headers) uses \n not \r\n :(
req_str += '\r\n'.join( req_str += '\r\n'.join(
@ -374,22 +370,21 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
warcprox_meta = json.loads(raw_warcprox_meta) warcprox_meta = json.loads(raw_warcprox_meta)
req = req_str.encode('latin1') + b'\r\n\r\n' req = req_str.encode('latin1') + b'\r\n\r\n'
recorded_url = RecordedUrl( failed_url = FailedUrl(
url=self.url, url=self.url,
remote_ip=b'', request_data=req,
warcprox_meta=warcprox_meta, warcprox_meta=warcprox_meta,
status=code, status=code,
client_ip=self.client_address[0], client_ip=self.client_address[0],
method=self.command, method=self.command,
content_type="unknown", timestamp=None,
response_recorder=None, host=self.hostname,
request_data=req, duration=None,
duration=None ,size=0, referer=self.headers.get('referer'),
timestamp=None, host=self.hostname,
do_not_archive=True, do_not_archive=True,
referer=self.headers.get('referer')) exception=exception)
self.server.recorded_url_q.put(recorded_url) self.server.recorded_url_q.put(failed_url)
def log_message(self, fmt, *args): def log_message(self, fmt, *args):
@ -397,23 +392,12 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
pass pass
RE_MIMETYPE = re.compile(r'[;\s]') RE_MIMETYPE = re.compile(r'[;\s]')
def via_header_value(orig, request_version):
via = orig
if via:
via += ', '
else:
via = ''
via = via + '%s %s' % (request_version, 'warcprox')
return via
class RecordedUrl:
logger = logging.getLogger("warcprox.warcproxy.RecordedUrl")
def __init__(self, url, request_data, response_recorder, remote_ip, class RequestedUrl:
warcprox_meta=None, content_type=None, custom_type=None, logger = logging.getLogger("warcprox.warcproxy.RequestedUrl")
status=None, size=None, client_ip=None, method=None, def __init__(self, url, request_data, warcprox_meta=None, status=None,
timestamp=None, host=None, duration=None, referer=None, client_ip=None, method=None, timestamp=None, host=None, duration=None,
payload_digest=None, truncated=None, warc_records=None, referer=None, do_not_archive=True):
do_not_archive=False):
# XXX should test what happens with non-ascii url (when does # XXX should test what happens with non-ascii url (when does
# url-encoding happen?) # url-encoding happen?)
if type(url) is not bytes: if type(url) is not bytes:
@ -421,13 +405,7 @@ class RecordedUrl:
else: else:
self.url = url self.url = url
if type(remote_ip) is not bytes:
self.remote_ip = remote_ip.encode('ascii')
else:
self.remote_ip = remote_ip
self.request_data = request_data self.request_data = request_data
self.response_recorder = response_recorder
if warcprox_meta: if warcprox_meta:
if 'captures-bucket' in warcprox_meta: if 'captures-bucket' in warcprox_meta:
@ -444,6 +422,47 @@ class RecordedUrl:
else: else:
self.warcprox_meta = {} self.warcprox_meta = {}
self.status = status
self.client_ip = client_ip
self.method = method
self.timestamp = timestamp
self.host = host
self.duration = duration
self.referer = referer
self.do_not_archive = do_not_archive
class FailedUrl(RequestedUrl):
logger = logging.getLogger("warcprox.warcproxy.FailedUrl")
def __init__(self, url, request_data, warcprox_meta=None, status=None,
client_ip=None, method=None, timestamp=None, host=None, duration=None,
referer=None, do_not_archive=True, exception=None):
super().__init__(url, request_data, warcprox_meta=warcprox_meta, status=status,
client_ip=client_ip, method=method, timestamp=timestamp, host=host, duration=duration,
referer=referer, do_not_archive=do_not_archive)
self.exception = exception
class RecordedUrl(RequestedUrl):
logger = logging.getLogger("warcprox.warcproxy.RecordedUrl")
def __init__(self, url, request_data, response_recorder, remote_ip,
warcprox_meta=None, content_type=None, custom_type=None,
status=None, size=None, client_ip=None, method=None,
timestamp=None, host=None, duration=None, referer=None,
payload_digest=None, truncated=None, warc_records=None,
do_not_archive=False):
super().__init__(url, request_data, warcprox_meta=warcprox_meta, status=status,
client_ip=client_ip, method=method, timestamp=timestamp, host=host, duration=duration,
referer=referer, do_not_archive=do_not_archive)
if type(remote_ip) is not bytes:
self.remote_ip = remote_ip.encode('ascii')
else:
self.remote_ip = remote_ip
self.content_type = content_type self.content_type = content_type
self.mimetype = content_type self.mimetype = content_type
@ -452,18 +471,12 @@ class RecordedUrl:
self.mimetype = RE_MIMETYPE.split(self.mimetype, 2)[0] self.mimetype = RE_MIMETYPE.split(self.mimetype, 2)[0]
self.custom_type = custom_type self.custom_type = custom_type
self.status = status
self.size = size self.size = size
self.client_ip = client_ip self.response_recorder = response_recorder
self.method = method self.custom_type = custom_type
self.timestamp = timestamp
self.host = host
self.duration = duration
self.referer = referer
self.payload_digest = payload_digest self.payload_digest = payload_digest
self.truncated = truncated self.truncated = truncated
self.warc_records = warc_records self.warc_records = warc_records
self.do_not_archive = do_not_archive
def is_text(self): def is_text(self):
"""Ref: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Complete_list_of_MIME_types """Ref: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Complete_list_of_MIME_types

View File

@ -72,6 +72,8 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
self.close_prefix_reqs.put(prefix) self.close_prefix_reqs.put(prefix)
def _process_url(self, recorded_url): def _process_url(self, recorded_url):
if not hasattr(recorded_url, 'response_recorder'):
return
try: try:
records = [] records = []
if self._should_archive(recorded_url): if self._should_archive(recorded_url):