refactor some general mitm proxy stuff into mitmproxy.py

This commit is contained in:
Noah Levitt 2016-10-19 15:32:58 -07:00
parent 15eeaebde5
commit 719380e612
4 changed files with 112 additions and 79 deletions

View File

@ -51,7 +51,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.0b2.dev31',
version='2.0b2.dev32',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',

View File

@ -1,27 +1,25 @@
#!/usr/bin/env python
#
# tests/single-threaded-proxy.py - single-threaded recording proxy, useful for
# debugging
#
# Copyright (C) 2015-2016 Internet Archive
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
#
"""
tests/single-threaded-proxy.py - single-threaded MITM proxy, useful for
debugging, does not write warcs
"""Useful for debugging. Does not write warcs."""
Copyright (C) 2015-2016 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
"""
from __future__ import absolute_import

View File

@ -48,6 +48,12 @@ import datetime
import socks
import tempfile
import hashlib
try:
import socketserver
except ImportError:
import SocketServer as socketserver
import resource
import concurrent.futures
class ProxyingRecorder(object):
"""
@ -397,3 +403,67 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
def log_error(self, fmt, *args):
    '''
    Sends an error message from the request handler to this handler's
    logger at WARNING level.

    `fmt` is a %-style format string and `args` are its arguments; they are
    passed through to the logger unformatted so that formatting only
    happens if the record is actually emitted.
    '''
    # Logger.warn() is a deprecated alias of Logger.warning() and is gone
    # in recent python versions; warning() works everywhere.
    self.logger.warning(fmt, *args)
class PooledMixIn(socketserver.ThreadingMixIn):
    '''
    socketserver mix-in that submits each incoming request to a
    concurrent.futures.ThreadPoolExecutor of bounded size.
    '''
    logger = logging.getLogger("warcprox.mitmproxy.PooledMixIn")

    # Pool size used when neither resource limit constrains the thread
    # count (both unlimited). An arbitrary but finite value is required
    # because ThreadPoolExecutor needs a positive integer.
    _FALLBACK_MAX_THREADS = 500

    def __init__(self, max_threads=None):
        '''
        If max_threads is not supplied, calculates a reasonable value based
        on system resource limits.
        '''
        if not max_threads:
            # man getrlimit: "RLIMIT_NPROC The maximum number of processes (or,
            # more precisely on Linux, threads) that can be created for the
            # real user ID of the calling process."
            rlimit_nproc = resource.getrlimit(resource.RLIMIT_NPROC)[0]
            rlimit_nofile = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
            max_threads = self._max_threads_from_rlimits(
                    rlimit_nofile, rlimit_nproc)
            self.logger.info(
                    "max_threads=%s (rlimit_nproc=%s, rlimit_nofile=%s)",
                    max_threads, rlimit_nproc, rlimit_nofile)
        self.pool = concurrent.futures.ThreadPoolExecutor(max_threads)

    @classmethod
    def _max_threads_from_rlimits(cls, rlimit_nofile, rlimit_nproc):
        '''
        Returns a positive pool size derived from the soft limits on open
        files (a tenth of it) and on processes/threads (half of it).

        A limit equal to resource.RLIM_INFINITY (or otherwise negative) is
        treated as "no constraint" rather than being fed into the
        arithmetic, where it would produce a zero or negative pool size and
        make ThreadPoolExecutor raise ValueError.
        '''
        candidates = [
            limit // divisor
            for limit, divisor in ((rlimit_nofile, 10), (rlimit_nproc, 2))
            if limit != resource.RLIM_INFINITY and limit >= 0]
        if not candidates:
            return cls._FALLBACK_MAX_THREADS
        # a very small limit (e.g. nofile < 10) would round down to zero
        return max(1, min(candidates))

    def process_request(self, request, client_address):
        '''
        Queues the request on the thread pool; process_request_thread does
        the actual work in a pool thread.
        '''
        self.pool.submit(self.process_request_thread, request, client_address)
class MitmProxy(http_server.HTTPServer):
    '''
    HTTP server whose request hooks shut down the socket the request
    handler ends up holding, rather than the socket originally accepted.
    '''

    def finish_request(self, request, client_address):
        '''
        Replacement for socketserver.BaseServer.finish_request that exposes
        the handler's self.request to the caller.

        In an ordinary socket server the handler's self.request is set to
        `request` and stays that way. Here the handler may swap it for an
        SSL socket, and whoever called this method (e.g.
        self.process_request or PooledMitmProxy.process_request_thread) has
        to close that socket, so it is handed back as the return value.
        '''
        handler = self.RequestHandlerClass(request, client_address, self)
        return handler.request

    def process_request(self, request, client_address):
        '''
        This is an almost verbatim copy of
        socketserver.BaseServer.process_request.

        It differs only in taking the return value of self.finish_request
        (the socket the handler finished with) and passing that, instead of
        the original `request`, to self.shutdown_request. The reason is
        explained in the finish_request docstring of this class.
        '''
        final_request = self.finish_request(request, client_address)
        self.shutdown_request(final_request)
class PooledMitmProxy(PooledMixIn, MitmProxy):
    '''
    MitmProxy combined with PooledMixIn.
    '''

    def process_request_thread(self, request, client_address):
        '''
        This is an almost verbatim copy of
        socketserver.ThreadingMixIn.process_request_thread.

        It differs only in rebinding `request` to whatever
        self.finish_request returns (i.e. the socket) before calling
        self.shutdown_request, so the socket that gets shut down is the one
        finish_request handed back. The rationale is given in the docstring
        of MitmProxy.finish_request.
        '''
        try:
            # On success `request` now names the returned socket; if
            # finish_request raises, it still names the original one.
            request = self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            # Deliberately a bare except, as in the method this was copied
            # from: report the failure, then shut down whichever socket
            # `request` currently refers to.
            self.handle_error(request, client_address)
            self.shutdown_request(request)

View File

@ -43,8 +43,6 @@ from hanzo import warctools
from certauth.certauth import CertificateAuthority
import warcprox
import datetime
import concurrent.futures
import resource
import ipaddress
import surt
@ -387,64 +385,31 @@ class SingleThreadedWarcProxy(http_server.HTTPServer):
self.options = options
class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy):
logger = logging.getLogger("warcprox.warcproxy.WarcProxy")
def __init__(
        self, ca=None, recorded_url_q=None, stats_db=None,
        options=warcprox.Options()):
    '''
    Initializes both base classes: first
    warcprox.mitmproxy.PooledMitmProxy with options.max_threads, then
    SingleThreadedWarcProxy with ca, recorded_url_q, stats_db and options.

    A truthy options.max_threads is logged as coming from the command line.
    '''
    max_threads = options.max_threads
    if max_threads:
        self.logger.info(
                "max_threads=%s set by command line option", max_threads)
    warcprox.mitmproxy.PooledMitmProxy.__init__(self, max_threads)
    SingleThreadedWarcProxy.__init__(
            self, ca, recorded_url_q, stats_db, options)
def server_activate(self):
http_server.HTTPServer.server_activate(self)
self.logger.info('WarcProxy listening on {0}:{1}'.format(self.server_address[0], self.server_address[1]))
self.logger.info(
'listening on %s:%s', self.server_address[0],
self.server_address[1])
def server_close(self):
self.logger.info('WarcProxy shutting down')
self.logger.info('shutting down')
http_server.HTTPServer.server_close(self)
def handle_error(self, request, client_address):
self.logger.warn("exception processing request %s from %s", request, client_address, exc_info=True)
def finish_request(self, request, client_address):
    '''
    Replacement for socketserver.BaseServer.finish_request that exposes
    WarcProxyHandler's self.request to the caller.

    In an ordinary socket server the handler's self.request is set to
    `request` and stays that way. Here it may have been swapped for an SSL
    socket, and the caller (e.g. PooledMixIn.process_request_thread) has to
    close that socket, so it is handed back as the return value.
    '''
    return WarcProxyHandler(request, client_address, self).request
class PooledMixIn(socketserver.ThreadingMixIn):
    '''
    socketserver mix-in that submits each incoming request to self.pool.
    self.pool is not created here; it has to be set elsewhere.
    '''

    def process_request(self, request, client_address):
        '''
        Queues the request on self.pool; process_request_thread does the
        actual work.
        '''
        self.pool.submit(self.process_request_thread, request, client_address)

    def process_request_thread(self, request, client_address):
        '''
        This is an almost verbatim copy of
        socketserver.ThreadingMixIn.process_request_thread.

        It differs only in expecting self.finish_request to return a
        request, which then replaces `request` for the shutdown call. The
        reason is given in the docstring of
        SingleThreadedWarcProxy.finish_request.
        '''
        try:
            # On success `request` now names the returned value; if
            # finish_request raises, it still names the original one.
            request = self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            # Deliberately a bare except, as in the method this was copied
            # from.
            self.handle_error(request, client_address)
            self.shutdown_request(request)
class WarcProxy(PooledMixIn, SingleThreadedWarcProxy):
    '''
    SingleThreadedWarcProxy that processes requests on a thread pool
    (self.pool) via PooledMixIn.
    '''
    logger = logging.getLogger("warcprox.warcproxy.WarcProxy")

    def __init__(self, *args, **kwargs):
        '''
        Passes all arguments to SingleThreadedWarcProxy.__init__, then
        creates self.pool. The pool size is self.options.max_threads if
        that is set, otherwise a value derived from system resource limits.
        '''
        SingleThreadedWarcProxy.__init__(self, *args, **kwargs)

        max_threads = self.options.max_threads
        if max_threads:
            self.logger.info("max_threads=%s set by command line option",
                    max_threads)
        else:
            # man getrlimit: "RLIMIT_NPROC The maximum number of processes (or,
            # more precisely on Linux, threads) that can be created for the
            # real user ID of the calling process."
            rlimit_nproc = resource.getrlimit(resource.RLIMIT_NPROC)[0]
            rlimit_nofile = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
            # An unlimited limit (resource.RLIM_INFINITY, -1 on some
            # platforms) must not enter the arithmetic: it would yield a
            # non-positive pool size and ThreadPoolExecutor would raise
            # ValueError. Only finite limits constrain the pool.
            candidates = [
                limit // divisor
                for limit, divisor in ((rlimit_nofile, 10), (rlimit_nproc, 2))
                if limit != resource.RLIM_INFINITY and limit >= 0]
            if candidates:
                # a very small limit would otherwise round down to zero
                max_threads = max(1, min(candidates))
            else:
                # nothing constrains us; pick an arbitrary finite size
                max_threads = 500
            self.logger.info("max_threads=%s (rlimit_nproc=%s, rlimit_nofile=%s)",
                    max_threads, rlimit_nproc, rlimit_nofile)

        self.pool = concurrent.futures.ThreadPoolExecutor(max_threads)
self.logger.warn(
"exception processing request %s from %s", request,
client_address, exc_info=True)