modify response headers from server, always send connection:close to proxy client

This commit is contained in:
Noah Levitt 2015-08-24 20:44:35 +00:00
parent f000d413a2
commit cc71c331a1
2 changed files with 72 additions and 39 deletions

47
warcprox/bigtable.py Normal file
View File

@ -0,0 +1,47 @@
# vim:set sw=4 et:
from __future__ import absolute_import
import logging
from hanzo import warctools
import rethinkdb
r = rethinkdb
import random
class RethinkCaptures:
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
def __init__(self, servers=["localhost"], db="warcprox", table="captures", shards=3, replicas=3):
self.servers = servers
self.db = db
self.table = table
self.shards = shards
self.replicas = replicas
self._ensure_db_table()
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
# "Best practices: Managing connections: a connection per request"
def _random_server_connection(self):
server = random.choice(self.servers)
try:
host, port = server.split(":")
return r.connect(host=host, port=port)
except ValueError:
return r.connect(host=server)
def _ensure_db_table(self):
with self._random_server_connection() as conn:
dbs = r.db_list().run(conn)
if not self.db in dbs:
self.logger.info("creating rethinkdb database %s", repr(self.db))
r.db_create(self.db).run(conn)
tables = r.db(self.db).table_list().run(conn)
if not self.table in tables:
self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db))
r.db(db).table_create(table, shards=3, replicas=3).run(conn)
r.db(db).table(table).index_create("abbr_canon_surt_timesamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]).run(conn)
r.db(db).table(table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"]]).run(conn)
# r.db(self.db).table_create(self.table, primary_key="canon_surt", shards=self.shards, replicas=self.replicas).run(conn)
# r.db(self.db).table(self.table).index_create("timestamp").run(conn)
# r.db(self.db).table(self.table).index_create("sha1base32").run(conn)

View File

@ -63,35 +63,12 @@ class ProxyingRecorder(object):
self.len = 0
self.url = url
def _update_payload_digest(self, hunk):
if self.payload_digest is None:
# convoluted handling of two newlines crossing hunks
# XXX write tests for this
if self._prev_hunk_last_two_bytes.endswith(b'\n'):
if hunk.startswith(b'\n'):
self.payload_digest = hashlib.new(self.digest_algorithm)
self.payload_digest.update(hunk[1:])
self.payload_offset = self.len + 1
elif hunk.startswith(b'\r\n'):
self.payload_digest = hashlib.new(self.digest_algorithm)
self.payload_digest.update(hunk[2:])
self.payload_offset = self.len + 2
elif self._prev_hunk_last_two_bytes == b'\n\r':
if hunk.startswith(b'\n'):
self.payload_digest = hashlib.new(self.digest_algorithm)
self.payload_digest.update(hunk[1:])
self.payload_offset = self.len + 1
else:
m = re.search(br'\n\r?\n', hunk)
if m is not None:
self.payload_digest = hashlib.new(self.digest_algorithm)
self.payload_digest.update(hunk[m.end():])
self.payload_offset = self.len + m.end()
def payload_starts_now(self):
self.payload_digest = hashlib.new(self.digest_algorithm)
self.payload_offset = self.len
# if we still haven't found start of payload hold on to these bytes
if self.payload_digest is None:
self._prev_hunk_last_two_bytes = hunk[-2:]
else:
def _update_payload_digest(self, hunk):
if self.payload_digest:
self.payload_digest.update(hunk)
def _update(self, hunk):
@ -100,7 +77,7 @@ class ProxyingRecorder(object):
self.tempfile.write(hunk)
if self._proxy_dest_conn_open:
if self.payload_digest and self._proxy_dest_conn_open:
try:
self.proxy_dest.sendall(hunk)
except BaseException as e:
@ -147,6 +124,7 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
def __init__(self, sock, debuglevel=0, method=None, proxy_dest=None, digest_algorithm='sha1', url=None):
http_client.HTTPResponse.__init__(self, sock, debuglevel=debuglevel, method=method)
self.proxy_dest = proxy_dest
self.url = url
# Keep around extra reference to self.fp because HTTPResponse sets
@ -154,6 +132,19 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
self.recorder = ProxyingRecorder(self.fp, proxy_dest, digest_algorithm, url=url)
self.fp = self.recorder
def begin(self):
http_client.HTTPResponse.begin(self) # reads status line, headers
status_and_headers = 'HTTP/1.1 {} {}\r\n'.format(self.status, self.reason)
for k,v in self.msg.items():
if k.lower() not in ('connection', 'proxy-connection', 'keep-alive',
'proxy-authenticate', 'proxy-authorization', 'upgrade'):
status_and_headers += '{}: {}\r\n'.format(k, v)
status_and_headers += 'Connection: close\r\n\r\n'
self.proxy_dest.sendall(status_and_headers.encode('latin1'))
self.recorder.payload_starts_now()
class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
# self.server is WarcProxy
@ -207,7 +198,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
# XXX in at least python3.3 str(self.headers) uses \n not \r\n :(
req_str += '\r\n'.join('{}: {}'.format(k,v) for (k,v) in self.headers.items())
req = req_str.encode('utf-8') + b'\r\n\r\n'
req = req_str.encode('latin1') + b'\r\n\r\n'
# Append message body if present to the request
if 'Content-Length' in self.headers:
@ -261,14 +252,6 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
prox_rec_res.close()
self._proxy_sock.close()
# XXX Close connection to proxy client. Doing this because we were
# seeing some connection hangs and this seems to solve that problem.
# Not clear what the correct, optimal behavior is. One thing we
# should probably(?) do is add "Connection: close" to response
# headers. Right now the response is passed through blindly as raw
# bytes so it's not completely trivial to do that.
self.connection.close()
return recorded_url
# deprecated
@ -360,7 +343,7 @@ class RecordedUrl:
self.response_recorder = None
class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
class SingleThreadedWarcProxy(http_server.HTTPServer):
logger = logging.getLogger("warcprox.warcproxy.WarcProxy")
def __init__(self, server_address=('localhost', 8000),
@ -394,3 +377,6 @@ class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
self.logger.info('WarcProxy shutting down')
http_server.HTTPServer.server_close(self)
class WarcProxy(socketserver.ThreadingMixIn, SingleThreadedWarcProxy):
pass