diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py new file mode 100644 index 0000000..60be77a --- /dev/null +++ b/warcprox/bigtable.py @@ -0,0 +1,47 @@ +# vim:set sw=4 et: + +from __future__ import absolute_import + +import logging +from hanzo import warctools +import rethinkdb +r = rethinkdb +import random + +class RethinkCaptures: + logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") + + def __init__(self, servers=["localhost"], db="warcprox", table="captures", shards=3, replicas=3): + self.servers = servers + self.db = db + self.table = table + self.shards = shards + self.replicas = replicas + self._ensure_db_table() + + # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py + # "Best practices: Managing connections: a connection per request" + def _random_server_connection(self): + server = random.choice(self.servers) + try: + host, port = server.split(":") + return r.connect(host=host, port=port) + except ValueError: + return r.connect(host=server) + + def _ensure_db_table(self): + with self._random_server_connection() as conn: + dbs = r.db_list().run(conn) + if not self.db in dbs: + self.logger.info("creating rethinkdb database %s", repr(self.db)) + r.db_create(self.db).run(conn) + tables = r.db(self.db).table_list().run(conn) + if not self.table in tables: + self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db)) + r.db(db).table_create(table, shards=3, replicas=3).run(conn) + r.db(db).table(table).index_create("abbr_canon_surt_timesamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]).run(conn) + r.db(db).table(table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"]]).run(conn) + # r.db(self.db).table_create(self.table, primary_key="canon_surt", shards=self.shards, replicas=self.replicas).run(conn) + # r.db(self.db).table(self.table).index_create("timestamp").run(conn) + # r.db(self.db).table(self.table).index_create("sha1base32").run(conn) + diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 6839ddc..21c72b8 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -63,35 +63,12 @@ class ProxyingRecorder(object): self.len = 0 self.url = url - def _update_payload_digest(self, hunk): - if self.payload_digest is None: - # convoluted handling of two newlines crossing hunks - # XXX write tests for this - if self._prev_hunk_last_two_bytes.endswith(b'\n'): - if hunk.startswith(b'\n'): - self.payload_digest = hashlib.new(self.digest_algorithm) - self.payload_digest.update(hunk[1:]) - self.payload_offset = self.len + 1 - elif hunk.startswith(b'\r\n'): - self.payload_digest = hashlib.new(self.digest_algorithm) - self.payload_digest.update(hunk[2:]) - self.payload_offset = self.len + 2 - elif self._prev_hunk_last_two_bytes == b'\n\r': - if hunk.startswith(b'\n'): - self.payload_digest = hashlib.new(self.digest_algorithm) - self.payload_digest.update(hunk[1:]) - self.payload_offset = self.len + 1 - else: - m = re.search(br'\n\r?\n', hunk) - if m is not None: - self.payload_digest = hashlib.new(self.digest_algorithm) - self.payload_digest.update(hunk[m.end():]) - self.payload_offset = self.len + m.end() + def payload_starts_now(self): + self.payload_digest = hashlib.new(self.digest_algorithm) + self.payload_offset = self.len - # if we still haven't found start of payload hold on to these bytes - if self.payload_digest is None: - self._prev_hunk_last_two_bytes = hunk[-2:] - else: + def _update_payload_digest(self, hunk): + if self.payload_digest: self.payload_digest.update(hunk) def _update(self, hunk): @@ -100,7 +77,7 @@ class ProxyingRecorder(object): self.tempfile.write(hunk) - if self._proxy_dest_conn_open: + if self.payload_digest and self._proxy_dest_conn_open: try: self.proxy_dest.sendall(hunk) except BaseException as e: @@ -147,6 +124,7 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): def __init__(self, sock, debuglevel=0, method=None, proxy_dest=None, digest_algorithm='sha1', url=None): http_client.HTTPResponse.__init__(self, sock, debuglevel=debuglevel, method=method) + self.proxy_dest = proxy_dest self.url = url # Keep around extra reference to self.fp because HTTPResponse sets @@ -154,6 +132,19 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): self.recorder = ProxyingRecorder(self.fp, proxy_dest, digest_algorithm, url=url) self.fp = self.recorder + def begin(self): + http_client.HTTPResponse.begin(self) # reads status line, headers + + status_and_headers = 'HTTP/1.1 {} {}\r\n'.format(self.status, self.reason) + for k,v in self.msg.items(): + if k.lower() not in ('connection', 'proxy-connection', 'keep-alive', + 'proxy-authenticate', 'proxy-authorization', 'upgrade'): + status_and_headers += '{}: {}\r\n'.format(k, v) + status_and_headers += 'Connection: close\r\n\r\n' + self.proxy_dest.sendall(status_and_headers.encode('latin1')) + + self.recorder.payload_starts_now() + class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): # self.server is WarcProxy @@ -207,7 +198,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): # XXX in at least python3.3 str(self.headers) uses \n not \r\n :( req_str += '\r\n'.join('{}: {}'.format(k,v) for (k,v) in self.headers.items()) - req = req_str.encode('utf-8') + b'\r\n\r\n' + req = req_str.encode('latin1') + b'\r\n\r\n' # Append message body if present to the request if 'Content-Length' in self.headers: @@ -261,14 +252,6 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): prox_rec_res.close() self._proxy_sock.close() - # XXX Close connection to proxy client. Doing this because we were - # seeing some connection hangs and this seems to solve that problem. - # Not clear what the correct, optimal behavior is. One thing we - # should probably(?) do is add "Connection: close" to response - # headers. Right now the response is passed through blindly as raw - # bytes so it's not completely trivial to do that. - self.connection.close() - return recorded_url # deprecated @@ -360,7 +343,7 @@ class RecordedUrl: self.response_recorder = None -class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): +class SingleThreadedWarcProxy(http_server.HTTPServer): logger = logging.getLogger("warcprox.warcproxy.WarcProxy") def __init__(self, server_address=('localhost', 8000), @@ -394,3 +377,6 @@ class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): self.logger.info('WarcProxy shutting down') http_server.HTTPServer.server_close(self) + +class WarcProxy(socketserver.ThreadingMixIn, SingleThreadedWarcProxy): + pass