Add a "length" field to each record in the big captures table (the size in bytes of the compressed WARC record), because pywayback needs it

This commit is contained in:
Noah Levitt 2015-09-01 00:53:38 +00:00
parent a9986e4ce3
commit decb985250
3 changed files with 13 additions and 4 deletions

View File

@@ -63,6 +63,13 @@ class RethinkCaptures:
bucket = "__unspecified__" bucket = "__unspecified__"
canon_surt = surt.surt(recorded_url.url.decode("utf-8"), trailing_comma=True, host_massage=False) canon_surt = surt.surt(recorded_url.url.decode("utf-8"), trailing_comma=True, host_massage=False)
mimetype = recorded_url.content_type
if mimetype:
n = mimetype.find(";")
if n >= 0:
mimetype = mimetype[:n]
entry = { entry = {
# id only specified for rethinkdb partitioning # id only specified for rethinkdb partitioning
"id": "{} {}".format(canon_surt[:20], records[0].id.decode("utf-8")[10:-1]), "id": "{} {}".format(canon_surt[:20], records[0].id.decode("utf-8")[10:-1]),
@@ -75,10 +82,11 @@ class RethinkCaptures:
"warc_type": records[0].type.decode("utf-8"), "warc_type": records[0].type.decode("utf-8"),
"warc_id": records[0].id.decode("utf-8"), "warc_id": records[0].id.decode("utf-8"),
"sha1base32": base64.b32encode(recorded_url.response_recorder.payload_digest.digest()).decode("utf-8"), "sha1base32": base64.b32encode(recorded_url.response_recorder.payload_digest.digest()).decode("utf-8"),
"content_type": recorded_url.content_type, "content_type": mimetype,
"response_code": recorded_url.status, "response_code": recorded_url.status,
"http_method": recorded_url.method, "http_method": recorded_url.method,
"bucket": bucket, "bucket": bucket,
"length": records[0].length,
} }
result = self.r.run(r.table(self.table).insert(entry)) result = self.r.run(r.table(self.table).insert(entry))

View File

@@ -147,25 +147,25 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
logger = logging.getLogger("warcprox.warcprox.WarcProxyHandler") logger = logging.getLogger("warcprox.warcprox.WarcProxyHandler")
def _enforce_limits(self, warcprox_meta): def _enforce_limits(self, warcprox_meta):
if (warcprox_meta and "limits" in warcprox_meta): if warcprox_meta and "limits" in warcprox_meta:
# self.logger.info("warcprox_meta['limits']=%s", warcprox_meta['limits']) # self.logger.info("warcprox_meta['limits']=%s", warcprox_meta['limits'])
for item in warcprox_meta["limits"].items(): for item in warcprox_meta["limits"].items():
key, limit = item key, limit = item
bucket0, bucket1, bucket2 = key.rsplit(".", 2) bucket0, bucket1, bucket2 = key.rsplit(".", 2)
value = self.server.stats_db.value(bucket0, bucket1, bucket2) value = self.server.stats_db.value(bucket0, bucket1, bucket2)
if value and value >= limit: if value and value >= limit:
self.logger.info('sending "420 Reached limit" %s=%s', key, limit)
body = "request rejected by warcprox: reached limit {}={}\n".format(key, limit).encode("utf-8") body = "request rejected by warcprox: reached limit {}={}\n".format(key, limit).encode("utf-8")
self.send_response(420, "Reached limit") self.send_response(420, "Reached limit")
self.send_header("Content-Type", "text/plain;charset=utf-8") self.send_header("Content-Type", "text/plain;charset=utf-8")
self.send_header("Connection", "close") self.send_header("Connection", "close")
self.send_header("Content-Length", len(body)) self.send_header("Content-Length", len(body))
response_meta = {"reached-limit":{key:limit}, "stats":{bucket0: self.server.stats_db.value(bucket0)}} response_meta = {"reached-limit":{key:limit}, "stats":{bucket0:self.server.stats_db.value(bucket0)}}
self.send_header("Warcprox-Meta", json.dumps(response_meta, separators=(",",":"))) self.send_header("Warcprox-Meta", json.dumps(response_meta, separators=(",",":")))
self.end_headers() self.end_headers()
if self.command != "HEAD": if self.command != "HEAD":
self.wfile.write(body) self.wfile.write(body)
self.connection.close() self.connection.close()
self.logger.info("%s 420 %s %s -- reached limit %s=%s", self.client_address[0], self.command, self.url, key, limit)
return True return True
return False return False

View File

@@ -89,6 +89,7 @@ class WarcWriter:
offset = writer.tell() offset = writer.tell()
record.write_to(writer, gzip=self.gzip) record.write_to(writer, gzip=self.gzip)
record.offset = offset record.offset = offset
record.length = writer.tell() - offset
record.warc_filename = self._f_finalname record.warc_filename = self._f_finalname
self.logger.debug('wrote warc record: warc_type=%s content_length=%s url=%s warc=%s offset=%d', self.logger.debug('wrote warc record: warc_type=%s content_length=%s url=%s warc=%s offset=%d',
record.get_header(warctools.WarcRecord.TYPE), record.get_header(warctools.WarcRecord.TYPE),