Merge branch 'log-long-fetches' into qa

This commit is contained in:
Barbara Miller 2019-09-13 11:44:14 -07:00
commit 999332ef3f
3 changed files with 47 additions and 24 deletions

View File

@ -28,7 +28,7 @@ deps = [
'warctools>=4.10.0', 'warctools>=4.10.0',
'urlcanon>=0.3.0', 'urlcanon>=0.3.0',
'doublethink>=0.2.0.dev87', 'doublethink>=0.2.0.dev87',
'urllib3>=1.14,<1.25', 'urllib3>=1.14',
'requests>=2.0.1', 'requests>=2.0.1',
'PySocks>=1.6.8', 'PySocks>=1.6.8',
'cryptography>=2.3', 'cryptography>=2.3',
@ -43,7 +43,7 @@ except:
setuptools.setup( setuptools.setup(
name='warcprox', name='warcprox',
version='2.4.14', version='2.4.18',
description='WARC writing MITM HTTP/S proxy', description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox', url='https://github.com/internetarchive/warcprox',
author='Noah Levitt', author='Noah Levitt',

View File

@ -276,6 +276,8 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
host=self.hostname, port=int(self.port), scheme='http', host=self.hostname, port=int(self.port), scheme='http',
pool_kwargs={'maxsize': 12, 'timeout': self._socket_timeout}) pool_kwargs={'maxsize': 12, 'timeout': self._socket_timeout})
remote_ip = None
self._remote_server_conn = self._conn_pool._get_conn() self._remote_server_conn = self._conn_pool._get_conn()
if is_connection_dropped(self._remote_server_conn): if is_connection_dropped(self._remote_server_conn):
if self.onion_tor_socks_proxy_host and self.hostname.endswith('.onion'): if self.onion_tor_socks_proxy_host and self.hostname.endswith('.onion'):
@ -291,6 +293,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self._remote_server_conn.sock.connect((self.hostname, int(self.port))) self._remote_server_conn.sock.connect((self.hostname, int(self.port)))
else: else:
self._remote_server_conn.connect() self._remote_server_conn.connect()
remote_ip = self._remote_server_conn.sock.getpeername()[0]
# Wrap socket if SSL is required # Wrap socket if SSL is required
if self.is_connect: if self.is_connect:
@ -312,6 +315,11 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
"consider upgrading to python 2.7.9+ or 3.4+", "consider upgrading to python 2.7.9+ or 3.4+",
self.hostname) self.hostname)
raise raise
except ssl.SSLError as e:
self.logger.error(
'error connecting to %s (%s) port %s: %s',
self.hostname, remote_ip, self.port, e)
raise
return self._remote_server_conn.sock return self._remote_server_conn.sock
def _transition_to_ssl(self): def _transition_to_ssl(self):
@ -553,15 +561,18 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
'bytes exceeded for URL %s', 'bytes exceeded for URL %s',
self._max_resource_size, self.url) self._max_resource_size, self.url)
break break
elif (not 'content-length' in self.headers elif time.time() - start > 3 * 60 * 60:
and time.time() - start > 3 * 60 * 60): if not 'content-length' in self.headers:
prox_rec_res.truncated = b'time' prox_rec_res.truncated = b'time'
self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR) self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR)
self._remote_server_conn.sock.close() self._remote_server_conn.sock.close()
self.logger.info( self.logger.info(
'reached hard timeout of 3 hours fetching url ' 'reached hard timeout of 3 hours fetching url '
'without content-length: %s', self.url) 'without content-length: %s', self.url)
break break
else:
self.logger.info(
'long-running fetch for URL %s', self.url)
self.log_request(prox_rec_res.status, prox_rec_res.recorder.len) self.log_request(prox_rec_res.status, prox_rec_res.recorder.len)
# Let's close off the remote end. If remote connection is fine, # Let's close off the remote end. If remote connection is fine,
@ -572,17 +583,27 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
# A common error is to connect to the remote server successfully # A common error is to connect to the remote server successfully
# but raise a `RemoteDisconnected` exception when trying to begin # but raise a `RemoteDisconnected` exception when trying to begin
# downloading. It's caused by prox_rec_res.begin(...) which calls # downloading. It's caused by prox_rec_res.begin(...) which calls
# http_client._read_status(). In that case, the host is also bad # http_client._read_status(). The connection fails there.
# and we must add it to `bad_hostnames_ports` cache. # https://github.com/python/cpython/blob/3.7/Lib/http/client.py#L275
if isinstance(e, http_client.RemoteDisconnected): # Another case is when the connection is fine but the response
# status is problematic, raising `BadStatusLine`.
# https://github.com/python/cpython/blob/3.7/Lib/http/client.py#L296
# In both cases, the host is bad and we must add it to
# `bad_hostnames_ports` cache.
if isinstance(e, (http_client.RemoteDisconnected,
http_client.BadStatusLine)):
host_port = self._hostname_port_cache_key() host_port = self._hostname_port_cache_key()
with self.server.bad_hostnames_ports_lock: with self.server.bad_hostnames_ports_lock:
self.server.bad_hostnames_ports[host_port] = 502 self.server.bad_hostnames_ports[host_port] = 502
self.logger.info('bad_hostnames_ports cache size: %d', self.logger.info('bad_hostnames_ports cache size: %d',
len(self.server.bad_hostnames_ports)) len(self.server.bad_hostnames_ports))
self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR) # Close the connection only if it's still open. If it's already
self._remote_server_conn.sock.close() # closed, an `OSError` "([Errno 107] Transport endpoint is not
# connected)" would be raised.
if not is_connection_dropped(self._remote_server_conn):
self._remote_server_conn.sock.shutdown(socket.SHUT_RDWR)
self._remote_server_conn.sock.close()
raise raise
finally: finally:
if prox_rec_res: if prox_rec_res:

View File

@ -115,10 +115,8 @@ class WarcWriter:
''' '''
Ensures `self.f` is ready to write the next warc record. Ensures `self.f` is ready to write the next warc record.
Closes current warc if size limit has been reached. Then, if warc is If warc is not open, opens one, and writes the warcinfo record.
not open, opens one, and writes the warcinfo record.
''' '''
self.maybe_size_rollover()
if not self.f: if not self.f:
serial = self.serial serial = self.serial
self.serial += 1 self.serial += 1
@ -136,11 +134,14 @@ class WarcWriter:
records = self.record_builder.build_warc_records(recorded_url) records = self.record_builder.build_warc_records(recorded_url)
self.ensure_open() self.ensure_open()
total_warc_file_size = None
for record in records: for record in records:
offset = self.f.tell() offset = self.f.tell()
record.write_to(self.f, gzip=self.gzip) record.write_to(self.f, gzip=self.gzip)
record.offset = offset record.offset = offset
record.length = self.f.tell() - offset offset2 = self.f.tell()
record.length = offset2 - offset
total_warc_file_size = offset2
record.warc_filename = self.finalname record.warc_filename = self.finalname
self.logger.trace( self.logger.trace(
'wrote warc record: warc_type=%s content_length=%s ' 'wrote warc record: warc_type=%s content_length=%s '
@ -150,7 +151,8 @@ class WarcWriter:
self.path, record.get_header(warctools.WarcRecord.URL)) self.path, record.get_header(warctools.WarcRecord.URL))
self.f.flush() self.f.flush()
self.last_activity = time.time() self.last_activity = time.time()
# Closes current warc if size limit has been reached.
self.maybe_size_rollover(total_warc_file_size)
return records return records
def close(self): def close(self):
@ -185,11 +187,11 @@ class WarcWriter:
self.finalname, time.time() - self.last_activity) self.finalname, time.time() - self.last_activity)
self.close() self.close()
def maybe_size_rollover(self): def maybe_size_rollover(self, total_warc_file_size):
if self.path and os.path.getsize(self.path) > self.rollover_size: if total_warc_file_size and total_warc_file_size > self.rollover_size:
self.logger.info( self.logger.info(
'rolling over %s because it has reached %s bytes in size', 'rolling over %s because it has reached %s bytes in size',
self.finalname, os.path.getsize(self.path)) self.finalname, total_warc_file_size)
self.close() self.close()
class WarcWriterPool: class WarcWriterPool: