refactor warc writing and deduplication for somewhat cleaner separation of concerns

Noah Levitt 2015-07-30 00:12:59 +00:00
parent 10c724637f
commit 274a2f6b1d
10 changed files with 812 additions and 29 deletions

setup.py
View File

@@ -5,7 +5,7 @@ from setuptools.command.test import test as TestCommand
import sys
import setuptools
VERSION_BYTES = b'1.4'
VERSION_BYTES = b'1.5'
def full_version_bytes():
import subprocess, time

warcprox/__init__.py
View File

@@ -1,3 +1,18 @@
# vim:set sw=4 et:
import warcprox.controller as controller
import warcprox.playback as playback
import warcprox.dedup as dedup
import warcprox.warcproxy as warcproxy
import warcprox.mitmproxy as mitmproxy
import warcprox.writer as writer
import warcprox.warc as warc
import warcprox.writerthread as writerthread
def digest_str(hash_obj, base32):
import base64
return hash_obj.name.encode('utf-8') + b':' + (base64.b32encode(hash_obj.digest()) if base32 else hash_obj.hexdigest().encode('ascii'))
def _read_version_bytes():
import os
version_txt = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['version.txt'])
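The new digest_str() helper is the common format for dedup-db keys and WARC digest headers. A minimal sketch, not part of this diff, of what it produces for a sha1 digest:

import hashlib
import warcprox

h = hashlib.sha1(b'hello')
print(warcprox.digest_str(h, base32=False))  # b'sha1:aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d'
print(warcprox.digest_str(h, base32=True))   # b'sha1:' followed by the same digest, base32-encoded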

warcprox/controller.py
View File

@@ -5,9 +5,7 @@ from __future__ import absolute_import
import logging
import threading
import time
import warcprox.warcprox
import warcprox.warcwriter
import warcprox
class WarcproxController(object):
logger = logging.getLogger("warcprox.controller.WarcproxController")
@@ -61,8 +59,8 @@ class WarcproxController(object):
self.proxy.shutdown()
self.proxy.server_close()
if self.warc_writer_thread.writer_pool.default_warc_writer.dedup_db is not None:
self.warc_writer_thread.writer_pool.default_warc_writer.dedup_db.close()
if self.warc_writer_thread.dedup_db is not None:
self.warc_writer_thread.dedup_db.close()
if self.playback_proxy is not None:
self.playback_proxy.shutdown()

warcprox/dedup.py
View File

@@ -14,6 +14,7 @@ import logging
import os
import json
from hanzo import warctools
import warcprox
class DedupDb(object):
logger = logging.getLogger("warcprox.dedup.DedupDb")
@@ -44,17 +45,21 @@ class DedupDb(object):
json_value = json.dumps(py_value, separators=(',',':'))
self.db[key] = json_value.encode('utf-8')
self.logger.debug('dedup db saved {}:{}'.format(key, json_value))
self.logger.debug('dedup db saved %s:%s', key, json_value)
def lookup(self, key):
result = None
if key in self.db:
json_result = self.db[key]
result = json.loads(json_result.decode('utf-8'))
result['i'] = result['i'].encode('latin1')
result['u'] = result['u'].encode('latin1')
result['d'] = result['d'].encode('latin1')
return result
else:
return None
self.logger.debug('dedup db lookup of key=%s returning %s', key, result)
return result
def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
if recorded_url.response_recorder.payload_digest:
key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
recorded_url.dedup_info = dedup_db.lookup(key)
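For orientation, the dedup db maps a digest_str() key to a small JSON record whose 'i'/'u'/'d' fields become WARC-Refers-To, WARC-Refers-To-Target-URI and WARC-Refers-To-Date on the revisit record built in warc.py below. A hedged sketch of one entry, with made-up values:

import json

key = b'sha1:aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d'       # warcprox.digest_str() of the payload
entry = {
    'i': '<urn:uuid:00000000-0000-0000-0000-000000000000>',  # WARC-Record-ID of the original response
    'u': 'http://example.com/',                              # its WARC-Target-URI
    'd': '2015-07-30T00:12:59Z',                             # its WARC-Date
}
value = json.dumps(entry, separators=(',', ':')).encode('utf-8')   # stored form, as in save()
# lookup() json-decodes the value and re-encodes each field to latin1 bytes so
# the values can be used directly as WARC header values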

warcprox/main.py
View File

@@ -18,14 +18,8 @@ import pprint
import traceback
import signal
import threading
import certauth.certauth
import warcprox.playback
import warcprox.dedup
import warcprox.warcwriter
import warcprox.warcprox
import warcprox.controller
import warcprox
def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
arg_parser = argparse.ArgumentParser(prog=prog,
@@ -124,7 +118,7 @@ def main(argv=sys.argv):
ca = certauth.certauth.CertificateAuthority(args.cacert, args.certs_dir,
ca_name=ca_name)
proxy = warcprox.warcprox.WarcProxy(
proxy = warcprox.warcproxy.WarcProxy(
server_address=(args.address, int(args.port)), ca=ca,
recorded_url_q=recorded_url_q,
digest_algorithm=args.digest_algorithm)
@@ -139,15 +133,15 @@
playback_index_db = None
playback_proxy = None
default_warc_writer = warcprox.warcwriter.WarcWriter(directory=args.directory,
default_warc_writer = warcprox.writer.WarcWriter(directory=args.directory,
gzip=args.gzip, prefix=args.prefix, port=int(args.port),
rollover_size=int(args.size), base32=args.base32,
dedup_db=dedup_db, digest_algorithm=args.digest_algorithm,
playback_index_db=playback_index_db,
digest_algorithm=args.digest_algorithm,
rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None)
writer_pool=warcprox.warcwriter.WarcWriterPool(default_warc_writer)
warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q,
writer_pool=writer_pool)
writer_pool=warcprox.writer.WarcWriterPool(default_warc_writer)
warc_writer_thread = warcprox.writerthread.WarcWriterThread(
recorded_url_q=recorded_url_q, writer_pool=writer_pool,
dedup_db=dedup_db, playback_index_db=playback_index_db)
controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy)

warcprox/warc.py Normal file (149 lines)
View File

@@ -0,0 +1,149 @@
# vim:set sw=4 et:
from __future__ import absolute_import
import logging
import warcprox
import hashlib
import socket
import hanzo.httptools
from hanzo import warctools
import warcprox
from datetime import datetime
class WarcRecordBuilder:
logger = logging.getLogger("warcprox.warc.WarcRecordBuilder")
def __init__(self, digest_algorithm="sha1", base32=False):
self.digest_algorithm = digest_algorithm
self.base32 = base32
def _build_response_principal_record(self, recorded_url, warc_date):
"""Builds response or revisit record, whichever is appropriate."""
if hasattr(recorded_url, "dedup_info") and recorded_url.dedup_info:
# revisit record
recorded_url.response_recorder.tempfile.seek(0)
if recorded_url.response_recorder.payload_offset is not None:
response_header_block = recorded_url.response_recorder.tempfile.read(recorded_url.response_recorder.payload_offset)
else:
response_header_block = recorded_url.response_recorder.tempfile.read()
return self.build_warc_record(
url=recorded_url.url, warc_date=warc_date,
data=response_header_block,
warc_type=warctools.WarcRecord.REVISIT,
refers_to=recorded_url.dedup_info['i'],
refers_to_target_uri=recorded_url.dedup_info['u'],
refers_to_date=recorded_url.dedup_info['d'],
payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32),
profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST,
content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE,
remote_ip=recorded_url.remote_ip)
else:
# response record
return self.build_warc_record(
url=recorded_url.url, warc_date=warc_date,
recorder=recorded_url.response_recorder,
warc_type=warctools.WarcRecord.RESPONSE,
content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE,
remote_ip=recorded_url.remote_ip)
def build_warc_records(self, recorded_url):
"""Returns a tuple of hanzo.warctools.warc.WarcRecord (principal_record, ...)"""
warc_date = warctools.warc.warc_datetime_str(datetime.utcnow())
if recorded_url.response_recorder:
principal_record = self._build_response_principal_record(recorded_url, warc_date)
request_record = self.build_warc_record(url=recorded_url.url,
warc_date=warc_date, data=recorded_url.request_data,
warc_type=warctools.WarcRecord.REQUEST,
content_type=hanzo.httptools.RequestMessage.CONTENT_TYPE,
concurrent_to=principal_record.id)
return principal_record, request_record
else:
principal_record = self.build_warc_record(url=recorded_url.url,
warc_date=warc_date, data=recorded_url.request_data,
warc_type=recorded_url.custom_type,
content_type=recorded_url.content_type)
return (principal_record,)
def build_warc_record(self, url, warc_date=None, recorder=None, data=None,
concurrent_to=None, warc_type=None, content_type=None, remote_ip=None,
profile=None, refers_to=None, refers_to_target_uri=None,
refers_to_date=None, payload_digest=None):
if warc_date is None:
warc_date = warctools.warc.warc_datetime_str(datetime.utcnow())
record_id = warctools.WarcRecord.random_warc_uuid()
headers = []
if warc_type is not None:
headers.append((warctools.WarcRecord.TYPE, warc_type))
headers.append((warctools.WarcRecord.ID, record_id))
headers.append((warctools.WarcRecord.DATE, warc_date))
headers.append((warctools.WarcRecord.URL, url))
if remote_ip is not None:
headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip))
if profile is not None:
headers.append((warctools.WarcRecord.PROFILE, profile))
if refers_to is not None:
headers.append((warctools.WarcRecord.REFERS_TO, refers_to))
if refers_to_target_uri is not None:
headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri))
if refers_to_date is not None:
headers.append((warctools.WarcRecord.REFERS_TO_DATE, refers_to_date))
if concurrent_to is not None:
headers.append((warctools.WarcRecord.CONCURRENT_TO, concurrent_to))
if content_type is not None:
headers.append((warctools.WarcRecord.CONTENT_TYPE, content_type))
if payload_digest is not None:
headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest))
if recorder is not None:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)).encode('latin1')))
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
warcprox.digest_str(recorder.block_digest, self.base32)))
if recorder.payload_digest is not None:
headers.append((warctools.WarcRecord.PAYLOAD_DIGEST,
warcprox.digest_str(recorder.payload_digest, self.base32)))
recorder.tempfile.seek(0)
record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile)
else:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data)).encode('latin1')))
block_digest = hashlib.new(self.digest_algorithm, data)
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
warcprox.digest_str(block_digest, self.base32)))
content_tuple = content_type, data
record = warctools.WarcRecord(headers=headers, content=content_tuple)
return record
def build_warcinfo_record(self, filename):
warc_record_date = warctools.warc.warc_datetime_str(datetime.utcnow())
record_id = warctools.WarcRecord.random_warc_uuid()
headers = []
headers.append((warctools.WarcRecord.ID, record_id))
headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.WARCINFO))
headers.append((warctools.WarcRecord.FILENAME, filename.encode('latin1')))
headers.append((warctools.WarcRecord.DATE, warc_record_date))
warcinfo_fields = []
warcinfo_fields.append(b'software: warcprox ' + warcprox.version_bytes)
hostname = socket.gethostname()
warcinfo_fields.append('hostname: {}'.format(hostname).encode('latin1'))
warcinfo_fields.append('ip: {}'.format(socket.gethostbyname(hostname)).encode('latin1'))
warcinfo_fields.append(b'format: WARC File Format 1.0')
# warcinfo_fields.append('robots: ignore')
# warcinfo_fields.append('description: {0}'.format(self.description))
# warcinfo_fields.append('isPartOf: {0}'.format(self.is_part_of))
data = b'\r\n'.join(warcinfo_fields) + b'\r\n'
record = warctools.WarcRecord(headers=headers, content=(b'application/warc-fields', data))
return record
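All record construction now lives in this class; WarcWriter below only handles files and rollover. A minimal, hedged sketch of calling WarcRecordBuilder directly for a custom metadata record (URL and body made up), the same kind of record a WARCPROX_WRITE_RECORD request produces:

from hanzo import warctools
import warcprox.warc

builder = warcprox.warc.WarcRecordBuilder(digest_algorithm='sha1', base32=True)
record = builder.build_warc_record(
        url=b'http://example.com/capture-metadata',
        data=b'{"note": "illustrative payload"}',
        warc_type=warctools.WarcRecord.METADATA,
        content_type=b'application/json')
# record is a hanzo.warctools.warc.WarcRecord with WARC-Type, WARC-Record-ID,
# WARC-Date, WARC-Target-URI, Content-Type, Content-Length and WARC-Block-Digest
# headers; WarcWriter.write_records() writes it with record.write_to(f, gzip=...)
with open('example.warc', 'wb') as f:
    record.write_to(f, gzip=False)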

View File

@@ -39,7 +39,7 @@ import socket
from hanzo import warctools
from certauth.certauth import CertificateAuthority
import warcprox.mitmproxy
import warcprox
class ProxyingRecorder(object):
"""
@@ -47,7 +47,7 @@ class ProxyingRecorder(object):
calculating digests, and sending them on to the proxy client.
"""
logger = logging.getLogger("warcprox.warcprox.ProxyingRecorder")
logger = logging.getLogger("warcprox.warcproxy.ProxyingRecorder")
def __init__(self, fp, proxy_dest, digest_algorithm='sha1', url=None):
self.fp = fp
@@ -153,7 +153,7 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
logger = logging.getLogger("warcprox.warcprox.WarcProxyHandler")
logger = logging.getLogger("warcprox.warcproxy.WarcProxyHandler")
def _proxy_request(self):
# Build request
@@ -273,7 +273,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
pass
class RecordedUrl(object):
class RecordedUrl:
def __init__(self, url, request_data, response_recorder, remote_ip,
warcprox_meta=None, content_type=None, custom_type=None,
status=None, size=None, client_ip=None, method=None):
@@ -305,8 +305,15 @@ class RecordedUrl(object):
self.client_ip = client_ip
self.method = method
def __del__(self):
self.logger.info("finished with %s", self)
if self.response_recorder:
self.response_recorder.tempfile.close()
self.response_recorder = None
class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
logger = logging.getLogger("warcprox.warcprox.WarcProxy")
logger = logging.getLogger("warcprox.warcproxy.WarcProxy")
def __init__(self, server_address=('localhost', 8000),
req_handler_class=WarcProxyHandler, bind_and_activate=True,

warcprox/warcproxy.py Normal file (345 lines)
View File

@@ -0,0 +1,345 @@
#!/usr/bin/env python
# vim:set sw=4 et:
#
"""
WARC writing MITM HTTP/S proxy
See README.rst or https://github.com/internetarchive/warcprox
"""
from __future__ import absolute_import
try:
import http.server as http_server
except ImportError:
import BaseHTTPServer as http_server
try:
import socketserver
except ImportError:
import SocketServer as socketserver
try:
import queue
except ImportError:
import Queue as queue
try:
import http.client as http_client
except ImportError:
import httplib as http_client
import logging
import re
import tempfile
import traceback
import hashlib
import json
import socket
from hanzo import warctools
from certauth.certauth import CertificateAuthority
import warcprox.mitmproxy
class ProxyingRecorder(object):
"""
Wraps a socket._fileobject, recording the bytes as they are read,
calculating digests, and sending them on to the proxy client.
"""
logger = logging.getLogger("warcprox.warcproxy.ProxyingRecorder")
def __init__(self, fp, proxy_dest, digest_algorithm='sha1', url=None):
self.fp = fp
# "The file has no name, and will cease to exist when it is closed."
self.tempfile = tempfile.SpooledTemporaryFile(max_size=512*1024)
self.digest_algorithm = digest_algorithm
self.block_digest = hashlib.new(digest_algorithm)
self.payload_offset = None
self.payload_digest = None
self.proxy_dest = proxy_dest
self._proxy_dest_conn_open = True
self._prev_hunk_last_two_bytes = b''
self.len = 0
self.url = url
def _update_payload_digest(self, hunk):
if self.payload_digest is None:
# convoluted handling of two newlines crossing hunks
# XXX write tests for this
if self._prev_hunk_last_two_bytes.endswith(b'\n'):
if hunk.startswith(b'\n'):
self.payload_digest = hashlib.new(self.digest_algorithm)
self.payload_digest.update(hunk[1:])
self.payload_offset = self.len + 1
elif hunk.startswith(b'\r\n'):
self.payload_digest = hashlib.new(self.digest_algorithm)
self.payload_digest.update(hunk[2:])
self.payload_offset = self.len + 2
elif self._prev_hunk_last_two_bytes == b'\n\r':
if hunk.startswith(b'\n'):
self.payload_digest = hashlib.new(self.digest_algorithm)
self.payload_digest.update(hunk[1:])
self.payload_offset = self.len + 1
else:
m = re.search(br'\n\r?\n', hunk)
if m is not None:
self.payload_digest = hashlib.new(self.digest_algorithm)
self.payload_digest.update(hunk[m.end():])
self.payload_offset = self.len + m.end()
# if we still haven't found start of payload hold on to these bytes
if self.payload_digest is None:
self._prev_hunk_last_two_bytes = hunk[-2:]
else:
self.payload_digest.update(hunk)
def _update(self, hunk):
self._update_payload_digest(hunk)
self.block_digest.update(hunk)
self.tempfile.write(hunk)
if self._proxy_dest_conn_open:
try:
self.proxy_dest.sendall(hunk)
except BaseException as e:
self._proxy_dest_conn_open = False
self.logger.warn('{} sending data to proxy client for url {}'.format(e, self.url))
self.logger.info('will continue downloading from remote server without sending to client {}'.format(self.url))
self.len += len(hunk)
def read(self, size=-1):
hunk = self.fp.read(size)
self._update(hunk)
return hunk
def readinto(self, b):
n = self.fp.readinto(b)
self._update(b[:n])
return n
def readline(self, size=-1):
# XXX depends on implementation details of self.fp.readline(), in
# particular that it doesn't call self.fp.read()
hunk = self.fp.readline(size)
self._update(hunk)
return hunk
def close(self):
return self.fp.close()
def __len__(self):
return self.len
def payload_size(self):
if self.payload_offset is not None:
return self.len - self.payload_offset
else:
return 0
class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
def __init__(self, sock, debuglevel=0, method=None, proxy_dest=None, digest_algorithm='sha1', url=None):
http_client.HTTPResponse.__init__(self, sock, debuglevel=debuglevel, method=method)
self.url = url
# Keep around extra reference to self.fp because HTTPResponse sets
# self.fp=None after it finishes reading, but we still need it
self.recorder = ProxyingRecorder(self.fp, proxy_dest, digest_algorithm, url=url)
self.fp = self.recorder
class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
logger = logging.getLogger("warcprox.warcprox.WarcProxyHandler")
def _proxy_request(self):
# Build request
req_str = '{} {} {}\r\n'.format(self.command, self.path, self.request_version)
warcprox_meta = self.headers.get('Warcprox-Meta')
# Swallow headers that don't make sense to forward on, i.e. most
# hop-by-hop headers, see http://tools.ietf.org/html/rfc2616#section-13.5
# self.headers is an email.message.Message, which is case-insensitive
# and doesn't throw KeyError in __delitem__
for h in ('Connection', 'Proxy-Connection', 'Keep-Alive',
'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade',
'Warcprox-Meta'):
del self.headers[h]
# Add headers to the request
# XXX in at least python3.3 str(self.headers) uses \n not \r\n :(
req_str += '\r\n'.join('{}: {}'.format(k,v) for (k,v) in self.headers.items())
req = req_str.encode('utf-8') + b'\r\n\r\n'
# Append message body if present to the request
if 'Content-Length' in self.headers:
req += self.rfile.read(int(self.headers['Content-Length']))
self.logger.debug('sending to remote server req={}'.format(repr(req)))
# Send it down the pipe!
self._proxy_sock.sendall(req)
# We want HTTPResponse's smarts about http and handling of
# non-compliant servers. But HTTPResponse.read() doesn't return the raw
# bytes read from the server, it unchunks them if they're chunked, and
# might do other stuff. We want to send the raw bytes back to the
# client. So we ignore the values returned by h.read() below. Instead
# the ProxyingRecordingHTTPResponse takes care of sending the raw bytes
# to the proxy client.
# Proxy and record the response
h = ProxyingRecordingHTTPResponse(self._proxy_sock,
proxy_dest=self.connection,
digest_algorithm=self.server.digest_algorithm,
url=self.url)
h.begin()
buf = h.read(8192)
while buf != b'':
buf = h.read(8192)
self.log_request(h.status, h.recorder.len)
remote_ip = self._proxy_sock.getpeername()[0]
# Let's close off the remote end
h.close()
self._proxy_sock.close()
# XXX Close connection to proxy client. Doing this because we were
# seeing some connection hangs and this seems to solve that problem.
# Not clear what the correct, optimal behavior is.
self.connection.close()
recorded_url = RecordedUrl(url=self.url, request_data=req,
response_recorder=h.recorder, remote_ip=remote_ip,
warcprox_meta=warcprox_meta,
status=h.status, size=h.recorder.len,
client_ip=self.client_address[0],
content_type=h.getheader("Content-Type"),
method=self.command)
self.server.recorded_url_q.put(recorded_url)
# deprecated
def do_PUTMETA(self):
self.do_WARCPROX_WRITE_RECORD(warc_type=warctools.WarcRecord.METADATA)
def do_WARCPROX_WRITE_RECORD(self, warc_type=None):
try:
self.url = self.path
if ('Content-Length' in self.headers and 'Content-Type' in self.headers
and (warc_type or 'WARC-Type' in self.headers)):
# stream this?
request_data = self.rfile.read(int(self.headers['Content-Length']))
warcprox_meta = self.headers.get('Warcprox-Meta')
rec_custom = RecordedUrl(url=self.url,
request_data=request_data,
response_recorder=None,
remote_ip=b'',
warcprox_meta=warcprox_meta,
content_type=self.headers['Content-Type'].encode('latin1'),
custom_type=warc_type or self.headers['WARC-Type'],
status=204, size=len(request_data),
client_ip=self.client_address[0],
method=self.command)
self.server.recorded_url_q.put(rec_custom)
self.send_response(204, 'OK')
else:
self.send_error(400, 'Bad request')
self.end_headers()
except:
self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True)
raise
def log_error(self, fmt, *args):
# logging better handled elsewhere?
pass
def log_message(self, fmt, *args):
# logging better handled elsewhere?
pass
class RecordedUrl:
logger = logging.getLogger("warcprox.warcproxy.RecordedUrl")
def __init__(self, url, request_data, response_recorder, remote_ip,
warcprox_meta=None, content_type=None, custom_type=None,
status=None, size=None, client_ip=None, method=None):
# XXX should test what happens with non-ascii url (when does
# url-encoding happen?)
if type(url) is not bytes:
self.url = url.encode('ascii')
else:
self.url = url
if type(remote_ip) is not bytes:
self.remote_ip = remote_ip.encode('ascii')
else:
self.remote_ip = remote_ip
self.request_data = request_data
self.response_recorder = response_recorder
if warcprox_meta:
self.warcprox_meta = json.loads(warcprox_meta)
else:
self.warcprox_meta = {}
self.content_type = content_type
self.custom_type = custom_type
self.status = status
self.size = size
self.client_ip = client_ip
self.method = method
def __del__(self):
self.logger.debug("finished with %s", self)
if self.response_recorder:
self.response_recorder.tempfile.close()
self.response_recorder = None
class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
logger = logging.getLogger("warcprox.warcproxy.WarcProxy")
def __init__(self, server_address=('localhost', 8000),
req_handler_class=WarcProxyHandler, bind_and_activate=True,
ca=None, recorded_url_q=None, digest_algorithm='sha1'):
http_server.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate)
self.digest_algorithm = digest_algorithm
if ca is not None:
self.ca = ca
else:
ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
self.ca = CertificateAuthority(ca_file='warcprox-ca.pem',
certs_dir='./warcprox-ca',
ca_name=ca_name)
if recorded_url_q is not None:
self.recorded_url_q = recorded_url_q
else:
self.recorded_url_q = queue.Queue()
def server_activate(self):
http_server.HTTPServer.server_activate(self)
self.logger.info('WarcProxy listening on {0}:{1}'.format(self.server_address[0], self.server_address[1]))
def server_close(self):
self.logger.info('WarcProxy shutting down')
http_server.HTTPServer.server_close(self)
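_update_payload_digest() above is flagged "XXX write tests for this". A hedged test sketch for one cross-hunk case, where the blank line separating headers from body arrives in a later read than the preceding CRLF; DummySink is a made-up stand-in for the proxy client socket:

import io
from warcprox.warcproxy import ProxyingRecorder

class DummySink:
    def sendall(self, data):
        pass   # the real proxy_dest is the client socket; this one just discards

raw = b'HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello'
rec = ProxyingRecorder(io.BytesIO(raw), DummySink(), digest_algorithm='sha1')
while rec.read(2):   # small hunks so the header/body boundary straddles reads
    pass
assert rec.payload_offset == raw.index(b'hello')
assert rec.payload_size() == 5
assert rec.payload_digest.hexdigest() == 'aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d'  # sha1 of b'hello'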

warcprox/writer.py Normal file (158 lines)
View File

@@ -0,0 +1,158 @@
# vim:set sw=4 et:
from __future__ import absolute_import
import logging
from datetime import datetime
from hanzo import warctools
import time
import warcprox
import os
import socket
class WarcWriter:
logger = logging.getLogger("warcprox.writer.WarcWriter")
# port is only used for warc filename
def __init__(self, directory='./warcs', rollover_size=1000000000,
gzip=False, prefix='WARCPROX', port=0, digest_algorithm='sha1',
base32=False, rollover_idle_time=None):
self.rollover_size = rollover_size
self.rollover_idle_time = rollover_idle_time
self._last_activity = time.time()
self.gzip = gzip
self.record_builder = warcprox.warc.WarcRecordBuilder(digest_algorithm=digest_algorithm, base32=base32)
# warc path and filename stuff
self.directory = directory
self.prefix = prefix
self.port = port
self._f = None
self._fpath = None
self._f_finalname = None
self._serial = 0
if not os.path.exists(directory):
self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory))
os.mkdir(directory)
def timestamp17(self):
now = datetime.utcnow()
return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000)
def close_writer(self):
if self._fpath:
self.logger.info('closing {0}'.format(self._f_finalname))
self._f.close()
finalpath = os.path.sep.join([self.directory, self._f_finalname])
os.rename(self._fpath, finalpath)
self._fpath = None
self._f = None
# <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
def _writer(self):
if self._fpath and os.path.getsize(self._fpath) > self.rollover_size:
self.close_writer()
if self._f == None:
self._f_finalname = '{}-{}-{:05d}-{}-{}-{}.warc{}'.format(
self.prefix, self.timestamp17(), self._serial, os.getpid(),
socket.gethostname(), self.port, '.gz' if self.gzip else '')
self._fpath = os.path.sep.join([self.directory, self._f_finalname + '.open'])
self._f = open(self._fpath, 'wb')
warcinfo_record = self.record_builder.build_warcinfo_record(self._f_finalname)
self.logger.debug('warcinfo_record.headers={}'.format(warcinfo_record.headers))
warcinfo_record.write_to(self._f, gzip=self.gzip)
self._serial += 1
return self._f
def write_records(self, recorded_url):
"""Returns tuple of records written, which are instances of
hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and
"offset" attributes."""
records = self.record_builder.build_warc_records(recorded_url)
writer = self._writer()
recordset_offset = writer.tell()
for record in records:
offset = writer.tell()
record.write_to(writer, gzip=self.gzip)
record.offset = offset
record.warc_filename = self._f_finalname
self.logger.debug('wrote warc record: warc_type=%s content_length=%s url=%s warc=%s offset=%d',
record.get_header(warctools.WarcRecord.TYPE),
record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
record.get_header(warctools.WarcRecord.URL),
self._fpath, record.offset)
self._f.flush()
self._last_activity = time.time()
return records
def maybe_idle_rollover(self):
if (self._fpath is not None
and self.rollover_idle_time is not None
and self.rollover_idle_time > 0
and time.time() - self._last_activity > self.rollover_idle_time):
self.logger.debug('rolling over {} after {} seconds idle'.format(self._f_finalname, time.time() - self._last_activity))
self.close_writer()
class WarcWriterPool:
logger = logging.getLogger("warcprox.writer.WarcWriterPool")
def __init__(self, default_warc_writer=None):
if default_warc_writer:
self.default_warc_writer = default_warc_writer
else:
self.default_warc_writer = WarcWriter()
self.warc_writers = {} # {prefix:WarcWriter}
self._last_sync = time.time()
self.logger.info('directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format(
os.path.abspath(self.default_warc_writer.directory), self.default_warc_writer.gzip, self.default_warc_writer.rollover_size,
self.default_warc_writer.rollover_idle_time, self.default_warc_writer.prefix, self.default_warc_writer.port))
# chooses writer for filename specified by warcprox_meta["warc-prefix"] if set
def _writer(self, recorded_url):
w = self.default_warc_writer
if recorded_url.warcprox_meta and "warc-prefix" in recorded_url.warcprox_meta:
# self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url))
prefix = recorded_url.warcprox_meta["warc-prefix"]
if not prefix in self.warc_writers:
self.warc_writers[prefix] = WarcWriter(prefix=prefix,
directory=self.default_warc_writer.directory,
rollover_size=self.default_warc_writer.rollover_size,
rollover_idle_time=self.default_warc_writer.rollover_idle_time,
gzip=self.default_warc_writer.gzip,
port=self.default_warc_writer.port,
digest_algorithm=self.default_warc_writer.record_builder.digest_algorithm,
base32=self.default_warc_writer.record_builder.base32)
w = self.warc_writers[prefix]
return w
def write_records(self, recorded_url):
"""Returns tuple of records written, which are instances of
hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and
"offset" attributes."""
return self._writer(recorded_url).write_records(recorded_url)
def maybe_idle_rollover(self):
self.default_warc_writer.maybe_idle_rollover()
for w in self.warc_writers.values():
w.maybe_idle_rollover()
def close_writers(self):
self.default_warc_writer.close_writer()
for w in self.warc_writers.values():
w.close_writer()
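The pool picks a writer per warcprox_meta['warc-prefix'], which WarcProxyHandler parses from the Warcprox-Meta request header. A hedged client-side sketch (proxy address and URL are illustrative; requests is just one convenient HTTP client):

import json
import requests

proxies = {'http': 'http://localhost:8000'}
headers = {'Warcprox-Meta': json.dumps({'warc-prefix': 'special-crawl'})}
requests.get('http://example.com/', proxies=proxies, headers=headers)
# the records for this capture go to a dedicated WarcWriter, so its WARC
# filenames start with 'special-crawl' instead of the default 'WARCPROX' prefix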

warcprox/writerthread.py Normal file (112 lines)
View File

@@ -0,0 +1,112 @@
# vim:set sw=4 et:
from __future__ import absolute_import
try:
import queue
except ImportError:
import Queue as queue
import logging
import threading
import os
import hashlib
import time
import socket
import base64
from datetime import datetime
import hanzo.httptools
from hanzo import warctools
import warcprox
class WarcWriterThread(threading.Thread):
logger = logging.getLogger("warcprox.warcproxwriter.WarcWriterThread")
def __init__(self, recorded_url_q=None, writer_pool=None, dedup_db=None, playback_index_db=None):
"""recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl."""
threading.Thread.__init__(self, name='WarcWriterThread')
self.recorded_url_q = recorded_url_q
self.stop = threading.Event()
if writer_pool:
self.writer_pool = writer_pool
else:
self.writer_pool = WarcWriterPool()
self.dedup_db = dedup_db
self.playback_index_db = playback_index_db
self._last_sync = time.time()
def run(self):
try:
while not self.stop.is_set():
try:
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
if self.dedup_db:
warcprox.dedup.decorate_with_dedup_info(self.dedup_db, recorded_url,
base32=self.writer_pool.default_warc_writer.record_builder.base32)
records = self.writer_pool.write_records(recorded_url)
self._final_tasks(recorded_url, records)
except queue.Empty:
self.writer_pool.maybe_idle_rollover()
self._sync()
self.logger.info('WarcWriterThread shutting down')
self.writer_pool.close_writers()
except:
self.logger.critical("WarcWriterThread shutting down after unexpected error", exc_info=True)
def _sync(self):
# XXX prob doesn't belong here (do we need it at all?)
if time.time() - self._last_sync > 60:
if self.dedup_db:
self.dedup_db.sync()
if self.playback_index_db:
self.playback_index_db.sync()
self._last_sync = time.time()
def _save_dedup_info(self, recorded_url, records):
if (self.dedup_db
and records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
and recorded_url.response_recorder.payload_size() > 0):
key = warcprox.digest_str(recorded_url.response_recorder.payload_digest,
self.writer_pool.default_warc_writer.record_builder.base32)
self.dedup_db.save(key, records[0], records[0].offset)
def _save_playback_info(self, recorded_url, records):
if self.playback_index_db is not None:
self.playback_index_db.save(records[0].warc_filename, records, records[0].offset)
# closest thing we have to heritrix crawl log at the moment
def _log(self, recorded_url, records):
def _decode(x):
if isinstance(x, bytes):
return x.decode("utf-8")
else:
return x
try:
payload_digest = records[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode("utf-8")
except:
payload_digest = "-"
mimetype = _decode(recorded_url.content_type)
if mimetype:
n = mimetype.find(";")
if n >= 0:
mimetype = mimetype[:n]
# 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"}
self.logger.info("{} {} {} {} {} size={} {} {} {} offset={}".format(
_decode(recorded_url.client_ip),
_decode(recorded_url.status),
_decode(recorded_url.method),
_decode(recorded_url.url),
mimetype,
recorded_url.size,
_decode(payload_digest),
_decode(records[0].get_header(warctools.WarcRecord.TYPE)),
_decode(records[0].warc_filename),
records[0].offset))
def _final_tasks(self, recorded_url, records):
self._save_dedup_info(recorded_url, records)
self._save_playback_info(recorded_url, records)
self._log(recorded_url, records)
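WarcproxController normally owns this thread, but it can also be driven directly. A hedged sketch of the lifecycle implied by run() above (queue and directory wiring are illustrative):

import queue
import warcprox.writer
import warcprox.writerthread

recorded_url_q = queue.Queue()
pool = warcprox.writer.WarcWriterPool(
        warcprox.writer.WarcWriter(directory='./warcs', gzip=True))
wwt = warcprox.writerthread.WarcWriterThread(
        recorded_url_q=recorded_url_q, writer_pool=pool,
        dedup_db=None, playback_index_db=None)
wwt.start()
# producers (normally WarcProxyHandler) put RecordedUrl objects on recorded_url_q;
# the thread writes them, saves dedup/playback info when those dbs are configured,
# and logs the crawl-log style line from _log()
wwt.stop.set()   # run() polls this Event and calls writer_pool.close_writers() on exit
wwt.join()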