mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
limit max number of threads to 500; make sure connection with proxy client has a timeout; log errors from connection with proxy client
This commit is contained in:
parent
d8f97ad472
commit
734b2f5396
@ -78,7 +78,9 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
|
|||||||
default=None, help='kafka broker list for capture feed')
|
default=None, help='kafka broker list for capture feed')
|
||||||
arg_parser.add_argument('--kafka-capture-feed-topic', dest='kafka_capture_feed_topic',
|
arg_parser.add_argument('--kafka-capture-feed-topic', dest='kafka_capture_feed_topic',
|
||||||
default=None, help='kafka capture feed topic')
|
default=None, help='kafka capture feed topic')
|
||||||
arg_parser.add_argument('--queue-size', dest='queue_size', default=1000,
|
arg_parser.add_argument('--queue-size', dest='queue_size', default=500,
|
||||||
|
help=argparse.SUPPRESS)
|
||||||
|
arg_parser.add_argument('--max-threads', dest='max_threads', default=500,
|
||||||
help=argparse.SUPPRESS)
|
help=argparse.SUPPRESS)
|
||||||
arg_parser.add_argument('--profile', action='store_true', default=False,
|
arg_parser.add_argument('--profile', action='store_true', default=False,
|
||||||
help=argparse.SUPPRESS)
|
help=argparse.SUPPRESS)
|
||||||
|
@ -1,5 +1,3 @@
|
|||||||
# vim:set sw=4 et:
|
|
||||||
|
|
||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -17,14 +15,16 @@ import logging
|
|||||||
import ssl
|
import ssl
|
||||||
import warcprox
|
import warcprox
|
||||||
import threading
|
import threading
|
||||||
|
import datetime
|
||||||
|
|
||||||
class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
|
class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
|
||||||
logger = logging.getLogger("warcprox.mitmproxy.MitmProxyHandler")
|
logger = logging.getLogger("warcprox.mitmproxy.MitmProxyHandler")
|
||||||
|
|
||||||
def __init__(self, request, client_address, server):
|
def __init__(self, request, client_address, server):
|
||||||
threading.current_thread.name = 'MitmProxyHandler-thread(tid={})'.format(warcprox.gettid())
|
threading.current_thread().name = 'MitmProxyHandler(tid={},started={},client={}:{})'.format(warcprox.gettid(), datetime.datetime.utcnow().isoformat(), client_address[0], client_address[1])
|
||||||
self.is_connect = False
|
self.is_connect = False
|
||||||
self._headers_buffer = []
|
self._headers_buffer = []
|
||||||
|
request.settimeout(60) # XXX what value should this have?
|
||||||
http_server.BaseHTTPRequestHandler.__init__(self, request, client_address, server)
|
http_server.BaseHTTPRequestHandler.__init__(self, request, client_address, server)
|
||||||
|
|
||||||
def _determine_host_port(self):
|
def _determine_host_port(self):
|
||||||
@ -52,7 +52,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
|
|||||||
def _connect_to_host(self):
|
def _connect_to_host(self):
|
||||||
# Connect to destination
|
# Connect to destination
|
||||||
self._proxy_sock = socket.socket()
|
self._proxy_sock = socket.socket()
|
||||||
self._proxy_sock.settimeout(60)
|
self._proxy_sock.settimeout(60) # XXX what value should this have?
|
||||||
self._proxy_sock.connect((self.hostname, int(self.port)))
|
self._proxy_sock.connect((self.hostname, int(self.port)))
|
||||||
|
|
||||||
# Wrap socket if SSL is required
|
# Wrap socket if SSL is required
|
||||||
@ -146,4 +146,6 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
|
|||||||
if item.startswith('do_'):
|
if item.startswith('do_'):
|
||||||
return self.do_COMMAND
|
return self.do_COMMAND
|
||||||
|
|
||||||
|
def log_error(self, fmt, *args):
    """Route error messages from the HTTP request handler to the class logger.

    Takes the same ``(format, *args)`` pair that
    ``BaseHTTPRequestHandler.log_error`` receives and emits it at WARNING
    level on ``self.logger`` with lazy %-style interpolation.

    Args:
        fmt: %-style format string.
        *args: values interpolated into ``fmt`` by the logging module.
    """
    # Logger.warn is a deprecated alias (removed in Python 3.13);
    # Logger.warning is the supported spelling on both Python 2 and 3.
    self.logger.warning(fmt, *args)
|
||||||
|
|
||||||
|
@ -35,6 +35,7 @@ from hanzo import warctools
|
|||||||
from certauth.certauth import CertificateAuthority
|
from certauth.certauth import CertificateAuthority
|
||||||
import warcprox
|
import warcprox
|
||||||
import datetime
|
import datetime
|
||||||
|
import concurrent.futures
|
||||||
|
|
||||||
class ProxyingRecorder(object):
|
class ProxyingRecorder(object):
|
||||||
"""
|
"""
|
||||||
@ -294,10 +295,6 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
|||||||
self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True)
|
self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def log_error(self, fmt, *args):
|
|
||||||
# logging better handled elsewhere?
|
|
||||||
pass
|
|
||||||
|
|
||||||
def log_message(self, fmt, *args):
|
def log_message(self, fmt, *args):
|
||||||
# logging better handled elsewhere?
|
# logging better handled elsewhere?
|
||||||
pass
|
pass
|
||||||
@ -385,5 +382,14 @@ class SingleThreadedWarcProxy(http_server.HTTPServer):
|
|||||||
def handle_error(self, request, client_address):
|
def handle_error(self, request, client_address):
|
||||||
self.logger.warn("exception processing request %s from %s", request, client_address, exc_info=True)
|
self.logger.warn("exception processing request %s from %s", request, client_address, exc_info=True)
|
||||||
|
|
||||||
class WarcProxy(socketserver.ThreadingMixIn, SingleThreadedWarcProxy):
|
class PooledMixIn(socketserver.ThreadingMixIn):
|
||||||
pass
|
def process_request(self, request, client_address):
    """Dispatch an accepted connection to a worker.

    If the server instance carries a truthy ``pool`` attribute, the request
    is submitted to it so that ``process_request_thread`` runs on one of the
    pool's workers. Otherwise the stock ``ThreadingMixIn.process_request``
    is used.

    Args:
        request: the accepted client socket.
        client_address: address of the connecting client.
    """
    executor = getattr(self, 'pool', None)
    if not executor:
        # No pool has been set up on this instance; defer to the mix-in.
        socketserver.ThreadingMixIn.process_request(self, request, client_address)
        return
    executor.submit(self.process_request_thread, request, client_address)
|
||||||
|
|
||||||
|
class WarcProxy(PooledMixIn, SingleThreadedWarcProxy):
    """Warc-writing proxy server that serves requests from a bounded thread pool.

    Combines ``PooledMixIn`` with ``SingleThreadedWarcProxy`` and creates the
    ``pool`` attribute that ``PooledMixIn.process_request`` looks for.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the underlying proxy, then create the worker pool.

        All arguments are forwarded unchanged to
        ``SingleThreadedWarcProxy.__init__``, which is assumed to set
        ``self.options`` (not visible here -- confirm).
        """
        SingleThreadedWarcProxy.__init__(self, *args, **kwargs)
        # The --max-threads option is declared without type=, so a value given
        # on the command line arrives as a str; ThreadPoolExecutor needs an
        # int. Unset, empty or zero falls back to 500 workers.
        configured = self.options.max_threads
        max_threads = int(configured) if configured else 0
        self.pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_threads or 500)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user