use SpooledTemporaryFile for WARCPROX_WRITE_RECORD

payloads, because as of https://github.com/internetarchive/brozzler/pull/115
brozzler will be sending big videos via WARCPROX_WRITE_RECORD.
This commit is contained in:
Noah Levitt 2018-08-16 11:06:58 -07:00
parent 0031091d4f
commit 17a5fabb75
6 changed files with 63 additions and 27 deletions

View File

@ -40,7 +40,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.4b3.dev180',
version='2.4b3.dev181',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',

View File

@ -34,6 +34,7 @@ import warcprox
import io
import tempfile
import logging
import hashlib
def lock_file(queue, filename):
"""Try to lock file and return 1 if successful, else return 0.
@ -58,7 +59,7 @@ def test_warc_writer_locking(tmpdir):
url='http://example.com', content_type='text/plain', status=200,
client_ip='127.0.0.2', request_data=b'abc',
response_recorder=recorder, remote_ip='127.0.0.3',
timestamp=datetime.utcnow())
timestamp=datetime.utcnow(), payload_digest=hashlib.sha1())
dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
wwriter = WarcWriter(Options(
@ -247,7 +248,7 @@ def test_warc_writer_filename(tmpdir):
url='http://example.com', content_type='text/plain', status=200,
client_ip='127.0.0.2', request_data=b'abc',
response_recorder=recorder, remote_ip='127.0.0.3',
timestamp=datetime.utcnow())
timestamp=datetime.utcnow(), payload_digest=hashlib.sha1())
dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
wwriter = WarcWriter(Options(directory=dirname, prefix='foo',

View File

@ -49,7 +49,7 @@ class CrawlLogger(object):
self.options.base32)
else:
# WARCPROX_WRITE_RECORD request
content_length = len(recorded_url.request_data)
content_length = int(records[0].get_header(b'Content-Length'))
payload_digest = records[0].get_header(b'WARC-Payload-Digest')
fields = [
'{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000),

View File

@ -283,9 +283,9 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self._remote_server_conn.sock)
except ssl.SSLError:
self.logger.warn(
"failed to establish ssl connection to %s; python "
"ssl library does not support SNI, considering "
"upgrading to python >= 2.7.9 or python 3.4",
"failed to establish ssl connection to %s; "
"python ssl library does not support SNI, "
"consider upgrading to python 2.7.9+ or 3.4+",
self.hostname)
raise
return self._remote_server_conn.sock
@ -424,8 +424,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.command, self.path, self.request_version)
# Swallow headers that don't make sense to forward on, i.e. most
# hop-by-hop headers, see
# http://tools.ietf.org/html/rfc2616#section-13.5.
# hop-by-hop headers. http://tools.ietf.org/html/rfc2616#section-13.5.
# self.headers is an email.message.Message, which is case-insensitive
# and doesn't throw KeyError in __delitem__
for key in (

View File

@ -19,8 +19,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
'''
from __future__ import absolute_import
import logging
import warcprox
import hashlib
@ -83,16 +81,21 @@ class WarcRecordBuilder:
concurrent_to=principal_record.id)
return principal_record, request_record
else:
principal_record = self.build_warc_record(url=recorded_url.url,
principal_record = self.build_warc_record(
url=recorded_url.url,
warc_date=warc_date, data=recorded_url.request_data,
warc_type=recorded_url.custom_type,
content_type=recorded_url.content_type.encode("latin1"))
content_type=recorded_url.content_type.encode("latin1"),
payload_digest=warcprox.digest_str(
recorded_url.payload_digest, self.base32),
content_length=recorded_url.size)
return (principal_record,)
def build_warc_record(self, url, warc_date=None, recorder=None, data=None,
concurrent_to=None, warc_type=None, content_type=None, remote_ip=None,
profile=None, refers_to=None, refers_to_target_uri=None,
refers_to_date=None, payload_digest=None, truncated=None):
refers_to_date=None, payload_digest=None, truncated=None,
content_length=None):
if warc_date is None:
warc_date = warctools.warc.warc_datetime_str(datetime.datetime.utcnow())
@ -126,21 +129,41 @@ class WarcRecordBuilder:
headers.append((b'WARC-Truncated', truncated))
if recorder is not None:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)).encode('latin1')))
if content_length is not None:
headers.append((
warctools.WarcRecord.CONTENT_LENGTH,
str(content_length).encode('latin1')))
else:
headers.append((
warctools.WarcRecord.CONTENT_LENGTH,
str(len(recorder)).encode('latin1')))
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
warcprox.digest_str(recorder.block_digest, self.base32)))
recorder.tempfile.seek(0)
record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile)
else:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data)).encode('latin1')))
digest = hashlib.new(self.digest_algorithm, data)
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
warcprox.digest_str(digest, self.base32)))
if content_length is not None:
headers.append((
warctools.WarcRecord.CONTENT_LENGTH,
str(content_length).encode('latin1')))
else:
headers.append((
warctools.WarcRecord.CONTENT_LENGTH,
str(len(data)).encode('latin1')))
# no http headers so block digest == payload digest
if not payload_digest:
headers.append((warctools.WarcRecord.PAYLOAD_DIGEST,
warcprox.digest_str(digest, self.base32)))
content_tuple = content_type, data
record = warctools.WarcRecord(headers=headers, content=content_tuple)
payload_digest = warcprox.digest_str(
hashlib.new(self.digest_algorithm, data), self.base32)
headers.append((
warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest))
headers.append((warctools.WarcRecord.BLOCK_DIGEST, payload_digest))
if hasattr(data, 'read'):
record = warctools.WarcRecord(
headers=headers, content_file=data)
else:
content_tuple = content_type, data
record = warctools.WarcRecord(
headers=headers, content=content_tuple)
return record

View File

@ -44,6 +44,8 @@ import datetime
import urlcanon
import os
from urllib3 import PoolManager
import tempfile
import hashlib
class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
'''
@ -285,8 +287,16 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
and (warc_type or 'WARC-Type' in self.headers)):
timestamp = datetime.datetime.utcnow()
# stream this?
request_data = self.rfile.read(int(self.headers['Content-Length']))
request_data = tempfile.SpooledTemporaryFile(max_size=524288)
payload_digest = hashlib.new(self.server.digest_algorithm)
length = int(self.headers['Content-Length'])
buf = self.rfile.read(min(65536, length - request_data.tell()))
while buf != b'':
request_data.write(buf)
payload_digest.update(buf)
buf = self.rfile.read(
min(65536, length - request_data.tell()))
warcprox_meta = None
raw_warcprox_meta = self.headers.get('Warcprox-Meta')
@ -301,11 +311,14 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
warcprox_meta=warcprox_meta,
content_type=self.headers['Content-Type'],
custom_type=warc_type or self.headers['WARC-Type'].encode('utf-8'),
status=204, size=len(request_data),
status=204,
size=request_data.tell(),
client_ip=self.client_address[0],
method=self.command,
timestamp=timestamp,
duration=datetime.datetime.utcnow()-timestamp)
duration=datetime.datetime.utcnow()-timestamp,
payload_digest=payload_digest)
request_data.seek(0)
self.server.recorded_url_q.put(rec_custom)
self.send_response(204, 'OK')