diff --git a/setup.py b/setup.py index d8afc87..5bde72b 100755 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ from setuptools.command.test import test as TestCommand import sys import setuptools -VERSION_BYTES = b'1.4' +VERSION_BYTES = b'1.5' def full_version_bytes(): import subprocess, time diff --git a/warcprox/__init__.py b/warcprox/__init__.py index e061a70..c3379c6 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -1,3 +1,18 @@ +# vim:set sw=4 et: + +import warcprox.controller as controller +import warcprox.playback as playback +import warcprox.dedup as dedup +import warcprox.warcproxy as warcproxy +import warcprox.mitmproxy as mitmproxy +import warcprox.writer as writer +import warcprox.warc as warc +import warcprox.writerthread as writerthread + +def digest_str(hash_obj, base32): + import base64 + return hash_obj.name.encode('utf-8') + b':' + (base64.b32encode(hash_obj.digest()) if base32 else hash_obj.hexdigest().encode('ascii')) + def _read_version_bytes(): import os version_txt = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['version.txt']) diff --git a/warcprox/controller.py b/warcprox/controller.py index db76135..ba73859 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -5,9 +5,7 @@ from __future__ import absolute_import import logging import threading import time - -import warcprox.warcprox -import warcprox.warcwriter +import warcprox class WarcproxController(object): logger = logging.getLogger("warcprox.controller.WarcproxController") @@ -61,8 +59,8 @@ class WarcproxController(object): self.proxy.shutdown() self.proxy.server_close() - if self.warc_writer_thread.writer_pool.default_warc_writer.dedup_db is not None: - self.warc_writer_thread.writer_pool.default_warc_writer.dedup_db.close() + if self.warc_writer_thread.dedup_db is not None: + self.warc_writer_thread.dedup_db.close() if self.playback_proxy is not None: self.playback_proxy.shutdown() diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 99a8d55..65962f9 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -14,6 +14,7 @@ import logging import os import json from hanzo import warctools +import warcprox class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -44,17 +45,21 @@ class DedupDb(object): json_value = json.dumps(py_value, separators=(',',':')) self.db[key] = json_value.encode('utf-8') - self.logger.debug('dedup db saved {}:{}'.format(key, json_value)) + self.logger.debug('dedup db saved %s:%s', key, json_value) def lookup(self, key): + result = None if key in self.db: json_result = self.db[key] result = json.loads(json_result.decode('utf-8')) result['i'] = result['i'].encode('latin1') result['u'] = result['u'].encode('latin1') result['d'] = result['d'].encode('latin1') - return result - else: - return None + self.logger.debug('dedup db lookup of key=%s returning %s', key, result) + return result +def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): + if recorded_url.response_recorder.payload_digest: + key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) + recorded_url.dedup_info = dedup_db.lookup(key) diff --git a/warcprox/main.py b/warcprox/main.py index 7a9f0b3..a98691d 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -18,14 +18,8 @@ import pprint import traceback import signal import threading - import certauth.certauth - -import warcprox.playback -import warcprox.dedup -import warcprox.warcwriter -import warcprox.warcprox -import warcprox.controller +import warcprox def 
_build_arg_parser(prog=os.path.basename(sys.argv[0])): arg_parser = argparse.ArgumentParser(prog=prog, @@ -124,7 +118,7 @@ def main(argv=sys.argv): ca = certauth.certauth.CertificateAuthority(args.cacert, args.certs_dir, ca_name=ca_name) - proxy = warcprox.warcprox.WarcProxy( + proxy = warcprox.warcproxy.WarcProxy( server_address=(args.address, int(args.port)), ca=ca, recorded_url_q=recorded_url_q, digest_algorithm=args.digest_algorithm) @@ -139,15 +133,15 @@ def main(argv=sys.argv): playback_index_db = None playback_proxy = None - default_warc_writer = warcprox.warcwriter.WarcWriter(directory=args.directory, + default_warc_writer = warcprox.writer.WarcWriter(directory=args.directory, gzip=args.gzip, prefix=args.prefix, port=int(args.port), rollover_size=int(args.size), base32=args.base32, - dedup_db=dedup_db, digest_algorithm=args.digest_algorithm, - playback_index_db=playback_index_db, + digest_algorithm=args.digest_algorithm, rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None) - writer_pool=warcprox.warcwriter.WarcWriterPool(default_warc_writer) - warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q, - writer_pool=writer_pool) + writer_pool=warcprox.writer.WarcWriterPool(default_warc_writer) + warc_writer_thread = warcprox.writerthread.WarcWriterThread( + recorded_url_q=recorded_url_q, writer_pool=writer_pool, + dedup_db=dedup_db, playback_index_db=playback_index_db) controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy) diff --git a/warcprox/warc.py b/warcprox/warc.py new file mode 100644 index 0000000..91843a7 --- /dev/null +++ b/warcprox/warc.py @@ -0,0 +1,149 @@ +# vim:set sw=4 et: + +from __future__ import absolute_import + +import logging +import warcprox +import hashlib +import socket +import hanzo.httptools +from hanzo import warctools +import warcprox +from datetime import datetime + +class WarcRecordBuilder: + logger = logging.getLogger("warcprox.warc.WarcRecordBuilder") + + def __init__(self, digest_algorithm="sha1", base32=False): + self.digest_algorithm = digest_algorithm + self.base32 = base32 + + def _build_response_principal_record(self, recorded_url, warc_date): + """Builds response or revisit record, whichever is appropriate.""" + if hasattr(recorded_url, "dedup_info") and recorded_url.dedup_info: + # revisit record + recorded_url.response_recorder.tempfile.seek(0) + if recorded_url.response_recorder.payload_offset is not None: + response_header_block = recorded_url.response_recorder.tempfile.read(recorded_url.response_recorder.payload_offset) + else: + response_header_block = recorded_url.response_recorder.tempfile.read() + + return self.build_warc_record( + url=recorded_url.url, warc_date=warc_date, + data=response_header_block, + warc_type=warctools.WarcRecord.REVISIT, + refers_to=recorded_url.dedup_info['i'], + refers_to_target_uri=recorded_url.dedup_info['u'], + refers_to_date=recorded_url.dedup_info['d'], + payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32), + profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST, + content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE, + remote_ip=recorded_url.remote_ip) + else: + # response record + return self.build_warc_record( + url=recorded_url.url, warc_date=warc_date, + recorder=recorded_url.response_recorder, + warc_type=warctools.WarcRecord.RESPONSE, + content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE, + remote_ip=recorded_url.remote_ip) 
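
Reviewer note, not part of the patch: the revisit branch above is driven by recorded_url.dedup_info, which the new decorate_with_dedup_info() in dedup.py fills in from a DedupDb.lookup() keyed on the new warcprox.digest_str() helper. A minimal sketch of that key format and round trip; the payload bytes are made up for illustration.

import hashlib
import warcprox

payload_digest = hashlib.new('sha1', b'<html>example payload</html>')  # hypothetical payload

# key format produced by the new helper: b'sha1:' followed by the hex or base32 digest
key_hex = warcprox.digest_str(payload_digest, False)
key_b32 = warcprox.digest_str(payload_digest, True)

# WarcWriterThread._save_dedup_info() stores the response record under this key after a
# write; decorate_with_dedup_info() later calls dedup_db.lookup(key) and, on a hit,
# attaches the {'i': record_id, 'u': target_uri, 'd': date} dict that the revisit
# branch above reads back out.
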
+ + def build_warc_records(self, recorded_url): + """Returns a tuple of hanzo.warctools.warc.WarcRecord (principal_record, ...)""" + warc_date = warctools.warc.warc_datetime_str(datetime.utcnow()) + + if recorded_url.response_recorder: + principal_record = self._build_response_principal_record(recorded_url, warc_date) + request_record = self.build_warc_record(url=recorded_url.url, + warc_date=warc_date, data=recorded_url.request_data, + warc_type=warctools.WarcRecord.REQUEST, + content_type=hanzo.httptools.RequestMessage.CONTENT_TYPE, + concurrent_to=principal_record.id) + return principal_record, request_record + else: + principal_record = self.build_warc_record(url=recorded_url.url, + warc_date=warc_date, data=recorded_url.request_data, + warc_type=recorded_url.custom_type, + content_type=recorded_url.content_type) + return (principal_record,) + + def build_warc_record(self, url, warc_date=None, recorder=None, data=None, + concurrent_to=None, warc_type=None, content_type=None, remote_ip=None, + profile=None, refers_to=None, refers_to_target_uri=None, + refers_to_date=None, payload_digest=None): + + if warc_date is None: + warc_date = warctools.warc.warc_datetime_str(datetime.utcnow()) + + record_id = warctools.WarcRecord.random_warc_uuid() + + headers = [] + if warc_type is not None: + headers.append((warctools.WarcRecord.TYPE, warc_type)) + headers.append((warctools.WarcRecord.ID, record_id)) + headers.append((warctools.WarcRecord.DATE, warc_date)) + headers.append((warctools.WarcRecord.URL, url)) + if remote_ip is not None: + headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip)) + if profile is not None: + headers.append((warctools.WarcRecord.PROFILE, profile)) + if refers_to is not None: + headers.append((warctools.WarcRecord.REFERS_TO, refers_to)) + if refers_to_target_uri is not None: + headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri)) + if refers_to_date is not None: + headers.append((warctools.WarcRecord.REFERS_TO_DATE, refers_to_date)) + if concurrent_to is not None: + headers.append((warctools.WarcRecord.CONCURRENT_TO, concurrent_to)) + if content_type is not None: + headers.append((warctools.WarcRecord.CONTENT_TYPE, content_type)) + if payload_digest is not None: + headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, payload_digest)) + + if recorder is not None: + headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)).encode('latin1'))) + headers.append((warctools.WarcRecord.BLOCK_DIGEST, + warcprox.digest_str(recorder.block_digest, self.base32))) + if recorder.payload_digest is not None: + headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, + warcprox.digest_str(recorder.payload_digest, self.base32))) + + recorder.tempfile.seek(0) + record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile) + + else: + headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data)).encode('latin1'))) + block_digest = hashlib.new(self.digest_algorithm, data) + headers.append((warctools.WarcRecord.BLOCK_DIGEST, + warcprox.digest_str(block_digest, self.base32))) + + content_tuple = content_type, data + record = warctools.WarcRecord(headers=headers, content=content_tuple) + + return record + + def build_warcinfo_record(self, filename): + warc_record_date = warctools.warc.warc_datetime_str(datetime.utcnow()) + record_id = warctools.WarcRecord.random_warc_uuid() + + headers = [] + headers.append((warctools.WarcRecord.ID, record_id)) + headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.WARCINFO)) + 
headers.append((warctools.WarcRecord.FILENAME, filename.encode('latin1'))) + headers.append((warctools.WarcRecord.DATE, warc_record_date)) + + warcinfo_fields = [] + warcinfo_fields.append(b'software: warcprox ' + warcprox.version_bytes) + hostname = socket.gethostname() + warcinfo_fields.append('hostname: {}'.format(hostname).encode('latin1')) + warcinfo_fields.append('ip: {}'.format(socket.gethostbyname(hostname)).encode('latin1')) + warcinfo_fields.append(b'format: WARC File Format 1.0') + # warcinfo_fields.append('robots: ignore') + # warcinfo_fields.append('description: {0}'.format(self.description)) + # warcinfo_fields.append('isPartOf: {0}'.format(self.is_part_of)) + data = b'\r\n'.join(warcinfo_fields) + b'\r\n' + + record = warctools.WarcRecord(headers=headers, content=(b'application/warc-fields', data)) + + return record + diff --git a/warcprox/warcprox.py b/warcprox/warcprox.py index 5a0dc58..19e207f 100644 --- a/warcprox/warcprox.py +++ b/warcprox/warcprox.py @@ -39,7 +39,7 @@ import socket from hanzo import warctools from certauth.certauth import CertificateAuthority -import warcprox.mitmproxy +import warcprox class ProxyingRecorder(object): """ @@ -47,7 +47,7 @@ class ProxyingRecorder(object): calculating digests, and sending them on to the proxy client. """ - logger = logging.getLogger("warcprox.warcprox.ProxyingRecorder") + logger = logging.getLogger("warcprox.warcproxy.ProxyingRecorder") def __init__(self, fp, proxy_dest, digest_algorithm='sha1', url=None): self.fp = fp @@ -153,7 +153,7 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): - logger = logging.getLogger("warcprox.warcprox.WarcProxyHandler") + logger = logging.getLogger("warcprox.warcproxy.WarcProxyHandler") def _proxy_request(self): # Build request @@ -273,7 +273,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): pass -class RecordedUrl(object): +class RecordedUrl: def __init__(self, url, request_data, response_recorder, remote_ip, warcprox_meta=None, content_type=None, custom_type=None, status=None, size=None, client_ip=None, method=None): @@ -305,8 +305,15 @@ class RecordedUrl(object): self.client_ip = client_ip self.method = method + def __del__(self): + self.logger.info("finished with %s", self) + if self.response_recorder: + self.response_recorder.tempfile.close() + self.response_recorder = None + + class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): - logger = logging.getLogger("warcprox.warcprox.WarcProxy") + logger = logging.getLogger("warcprox.warcproxy.WarcProxy") def __init__(self, server_address=('localhost', 8000), req_handler_class=WarcProxyHandler, bind_and_activate=True, diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py new file mode 100644 index 0000000..c47e11c --- /dev/null +++ b/warcprox/warcproxy.py @@ -0,0 +1,345 @@ +#!/usr/bin/env python +# vim:set sw=4 et: +# +""" +WARC writing MITM HTTP/S proxy + +See README.rst or https://github.com/internetarchive/warcprox +""" + +from __future__ import absolute_import + +try: + import http.server as http_server +except ImportError: + import BaseHTTPServer as http_server + +try: + import socketserver +except ImportError: + import SocketServer as socketserver + +try: + import queue +except ImportError: + import Queue as queue + +try: + import http.client as http_client +except ImportError: + import httplib as http_client + +import logging +import re +import tempfile +import traceback +import hashlib +import json +import socket +from 
hanzo import warctools + +from certauth.certauth import CertificateAuthority +import warcprox.mitmproxy + +class ProxyingRecorder(object): + """ + Wraps a socket._fileobject, recording the bytes as they are read, + calculating digests, and sending them on to the proxy client. + """ + + logger = logging.getLogger("warcprox.warcproxy.ProxyingRecorder") + + def __init__(self, fp, proxy_dest, digest_algorithm='sha1', url=None): + self.fp = fp + # "The file has no name, and will cease to exist when it is closed." + self.tempfile = tempfile.SpooledTemporaryFile(max_size=512*1024) + self.digest_algorithm = digest_algorithm + self.block_digest = hashlib.new(digest_algorithm) + self.payload_offset = None + self.payload_digest = None + self.proxy_dest = proxy_dest + self._proxy_dest_conn_open = True + self._prev_hunk_last_two_bytes = b'' + self.len = 0 + self.url = url + + def _update_payload_digest(self, hunk): + if self.payload_digest is None: + # convoluted handling of two newlines crossing hunks + # XXX write tests for this + if self._prev_hunk_last_two_bytes.endswith(b'\n'): + if hunk.startswith(b'\n'): + self.payload_digest = hashlib.new(self.digest_algorithm) + self.payload_digest.update(hunk[1:]) + self.payload_offset = self.len + 1 + elif hunk.startswith(b'\r\n'): + self.payload_digest = hashlib.new(self.digest_algorithm) + self.payload_digest.update(hunk[2:]) + self.payload_offset = self.len + 2 + elif self._prev_hunk_last_two_bytes == b'\n\r': + if hunk.startswith(b'\n'): + self.payload_digest = hashlib.new(self.digest_algorithm) + self.payload_digest.update(hunk[1:]) + self.payload_offset = self.len + 1 + else: + m = re.search(br'\n\r?\n', hunk) + if m is not None: + self.payload_digest = hashlib.new(self.digest_algorithm) + self.payload_digest.update(hunk[m.end():]) + self.payload_offset = self.len + m.end() + + # if we still haven't found start of payload hold on to these bytes + if self.payload_digest is None: + self._prev_hunk_last_two_bytes = hunk[-2:] + else: + self.payload_digest.update(hunk) + + def _update(self, hunk): + self._update_payload_digest(hunk) + self.block_digest.update(hunk) + + self.tempfile.write(hunk) + + if self._proxy_dest_conn_open: + try: + self.proxy_dest.sendall(hunk) + except BaseException as e: + self._proxy_dest_conn_open = False + self.logger.warn('{} sending data to proxy client for url {}'.format(e, self.url)) + self.logger.info('will continue downloading from remote server without sending to client {}'.format(self.url)) + + self.len += len(hunk) + + def read(self, size=-1): + hunk = self.fp.read(size) + self._update(hunk) + return hunk + + def readinto(self, b): + n = self.fp.readinto(b) + self._update(b[:n]) + return n + + def readline(self, size=-1): + # XXX depends on implementation details of self.fp.readline(), in + # particular that it doesn't call self.fp.read() + hunk = self.fp.readline(size) + self._update(hunk) + return hunk + + def close(self): + return self.fp.close() + + def __len__(self): + return self.len + + def payload_size(self): + if self.payload_offset is not None: + return self.len - self.payload_offset + else: + return 0 + + +class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): + + def __init__(self, sock, debuglevel=0, method=None, proxy_dest=None, digest_algorithm='sha1', url=None): + http_client.HTTPResponse.__init__(self, sock, debuglevel=debuglevel, method=method) + self.url = url + + # Keep around extra reference to self.fp because HTTPResponse sets + # self.fp=None after it finishes reading, but we still need it 
+ self.recorder = ProxyingRecorder(self.fp, proxy_dest, digest_algorithm, url=url) + self.fp = self.recorder + + +class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): + logger = logging.getLogger("warcprox.warcprox.WarcProxyHandler") + + def _proxy_request(self): + # Build request + req_str = '{} {} {}\r\n'.format(self.command, self.path, self.request_version) + + warcprox_meta = self.headers.get('Warcprox-Meta') + + # Swallow headers that don't make sense to forward on, i.e. most + # hop-by-hop headers, see http://tools.ietf.org/html/rfc2616#section-13.5 + # self.headers is an email.message.Message, which is case-insensitive + # and doesn't throw KeyError in __delitem__ + for h in ('Connection', 'Proxy-Connection', 'Keep-Alive', + 'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade', + 'Warcprox-Meta'): + del self.headers[h] + + # Add headers to the request + # XXX in at least python3.3 str(self.headers) uses \n not \r\n :( + req_str += '\r\n'.join('{}: {}'.format(k,v) for (k,v) in self.headers.items()) + + req = req_str.encode('utf-8') + b'\r\n\r\n' + + # Append message body if present to the request + if 'Content-Length' in self.headers: + req += self.rfile.read(int(self.headers['Content-Length'])) + + self.logger.debug('sending to remote server req={}'.format(repr(req))) + + # Send it down the pipe! + self._proxy_sock.sendall(req) + + # We want HTTPResponse's smarts about http and handling of + # non-compliant servers. But HTTPResponse.read() doesn't return the raw + # bytes read from the server, it unchunks them if they're chunked, and + # might do other stuff. We want to send the raw bytes back to the + # client. So we ignore the values returned by h.read() below. Instead + # the ProxyingRecordingHTTPResponse takes care of sending the raw bytes + # to the proxy client. + + # Proxy and record the response + h = ProxyingRecordingHTTPResponse(self._proxy_sock, + proxy_dest=self.connection, + digest_algorithm=self.server.digest_algorithm, + url=self.url) + h.begin() + + buf = h.read(8192) + while buf != b'': + buf = h.read(8192) + + self.log_request(h.status, h.recorder.len) + + remote_ip = self._proxy_sock.getpeername()[0] + + # Let's close off the remote end + h.close() + self._proxy_sock.close() + + # XXX Close connection to proxy client. Doing this because we were + # seeing some connection hangs and this seems to solve that problem. + # Not clear what the correct, optimal behavior is. + self.connection.close() + + recorded_url = RecordedUrl(url=self.url, request_data=req, + response_recorder=h.recorder, remote_ip=remote_ip, + warcprox_meta=warcprox_meta, + status=h.status, size=h.recorder.len, + client_ip=self.client_address[0], + content_type=h.getheader("Content-Type"), + method=self.command) + self.server.recorded_url_q.put(recorded_url) + + # deprecated + def do_PUTMETA(self): + self.do_WARCPROX_WRITE_RECORD(warc_type=warctools.WarcRecord.METADATA) + + def do_WARCPROX_WRITE_RECORD(self, warc_type=None): + try: + self.url = self.path + + if ('Content-Length' in self.headers and 'Content-Type' in self.headers + and (warc_type or 'WARC-Type' in self.headers)): + # stream this? 
+ request_data = self.rfile.read(int(self.headers['Content-Length'])) + + warcprox_meta = self.headers.get('Warcprox-Meta') + + rec_custom = RecordedUrl(url=self.url, + request_data=request_data, + response_recorder=None, + remote_ip=b'', + warcprox_meta=warcprox_meta, + content_type=self.headers['Content-Type'].encode('latin1'), + custom_type=warc_type or self.headers['WARC-Type'], + status=204, size=len(request_data), + client_ip=self.client_address[0], + method=self.command) + + self.server.recorded_url_q.put(rec_custom) + self.send_response(204, 'OK') + else: + self.send_error(400, 'Bad request') + + self.end_headers() + except: + self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True) + raise + + def log_error(self, fmt, *args): + # logging better handled elsewhere? + pass + + def log_message(self, fmt, *args): + # logging better handled elsewhere? + pass + + +class RecordedUrl: + logger = logging.getLogger("warcprox.warcproxy.RecordedUrl") + + def __init__(self, url, request_data, response_recorder, remote_ip, + warcprox_meta=None, content_type=None, custom_type=None, + status=None, size=None, client_ip=None, method=None): + # XXX should test what happens with non-ascii url (when does + # url-encoding happen?) + if type(url) is not bytes: + self.url = url.encode('ascii') + else: + self.url = url + + if type(remote_ip) is not bytes: + self.remote_ip = remote_ip.encode('ascii') + else: + self.remote_ip = remote_ip + + self.request_data = request_data + self.response_recorder = response_recorder + + if warcprox_meta: + self.warcprox_meta = json.loads(warcprox_meta) + else: + self.warcprox_meta = {} + + self.content_type = content_type + self.custom_type = custom_type + + self.status = status + self.size = size + self.client_ip = client_ip + self.method = method + + def __del__(self): + self.logger.debug("finished with %s", self) + if self.response_recorder: + self.response_recorder.tempfile.close() + self.response_recorder = None + + +class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): + logger = logging.getLogger("warcprox.warcproxy.WarcProxy") + + def __init__(self, server_address=('localhost', 8000), + req_handler_class=WarcProxyHandler, bind_and_activate=True, + ca=None, recorded_url_q=None, digest_algorithm='sha1'): + http_server.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) + + self.digest_algorithm = digest_algorithm + + if ca is not None: + self.ca = ca + else: + ca_name = 'Warcprox CA on {}'.format(socket.gethostname())[:64] + self.ca = CertificateAuthority(ca_file='warcprox-ca.pem', + certs_dir='./warcprox-ca', + ca_name=ca_name) + + if recorded_url_q is not None: + self.recorded_url_q = recorded_url_q + else: + self.recorded_url_q = queue.Queue() + + def server_activate(self): + http_server.HTTPServer.server_activate(self) + self.logger.info('WarcProxy listening on {0}:{1}'.format(self.server_address[0], self.server_address[1])) + + def server_close(self): + self.logger.info('WarcProxy shutting down') + http_server.HTTPServer.server_close(self) + diff --git a/warcprox/writer.py b/warcprox/writer.py new file mode 100644 index 0000000..02dee72 --- /dev/null +++ b/warcprox/writer.py @@ -0,0 +1,158 @@ +# vim:set sw=4 et: + +from __future__ import absolute_import + +import logging +from datetime import datetime +from hanzo import warctools +import time +import warcprox +import os +import socket + +class WarcWriter: + logger = logging.getLogger("warcprox.writer.WarcWriter") + + # port is only used for 
warc filename + def __init__(self, directory='./warcs', rollover_size=1000000000, + gzip=False, prefix='WARCPROX', port=0, digest_algorithm='sha1', + base32=False, rollover_idle_time=None): + + self.rollover_size = rollover_size + self.rollover_idle_time = rollover_idle_time + self._last_activity = time.time() + + self.gzip = gzip + self.record_builder = warcprox.warc.WarcRecordBuilder(digest_algorithm=digest_algorithm, base32=base32) + + # warc path and filename stuff + self.directory = directory + self.prefix = prefix + self.port = port + + self._f = None + self._fpath = None + self._f_finalname = None + self._serial = 0 + + if not os.path.exists(directory): + self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory)) + os.mkdir(directory) + + def timestamp17(self): + now = datetime.utcnow() + return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000) + + def close_writer(self): + if self._fpath: + self.logger.info('closing {0}'.format(self._f_finalname)) + self._f.close() + finalpath = os.path.sep.join([self.directory, self._f_finalname]) + os.rename(self._fpath, finalpath) + + self._fpath = None + self._f = None + + # + def _writer(self): + if self._fpath and os.path.getsize(self._fpath) > self.rollover_size: + self.close_writer() + + if self._f == None: + self._f_finalname = '{}-{}-{:05d}-{}-{}-{}.warc{}'.format( + self.prefix, self.timestamp17(), self._serial, os.getpid(), + socket.gethostname(), self.port, '.gz' if self.gzip else '') + self._fpath = os.path.sep.join([self.directory, self._f_finalname + '.open']) + + self._f = open(self._fpath, 'wb') + + warcinfo_record = self.record_builder.build_warcinfo_record(self._f_finalname) + self.logger.debug('warcinfo_record.headers={}'.format(warcinfo_record.headers)) + warcinfo_record.write_to(self._f, gzip=self.gzip) + + self._serial += 1 + + return self._f + + def write_records(self, recorded_url): + """Returns tuple of records written, which are instances of + hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and + "offset" attributes.""" + records = self.record_builder.build_warc_records(recorded_url) + + writer = self._writer() + recordset_offset = writer.tell() + + for record in records: + offset = writer.tell() + record.write_to(writer, gzip=self.gzip) + record.offset = offset + record.warc_filename = self._f_finalname + self.logger.debug('wrote warc record: warc_type=%s content_length=%s url=%s warc=%s offset=%d', + record.get_header(warctools.WarcRecord.TYPE), + record.get_header(warctools.WarcRecord.CONTENT_LENGTH), + record.get_header(warctools.WarcRecord.URL), + self._fpath, record.offset) + + self._f.flush() + self._last_activity = time.time() + + return records + + def maybe_idle_rollover(self): + if (self._fpath is not None + and self.rollover_idle_time is not None + and self.rollover_idle_time > 0 + and time.time() - self._last_activity > self.rollover_idle_time): + self.logger.debug('rolling over {} after {} seconds idle'.format(self._f_finalname, time.time() - self._last_activity)) + self.close_writer() + +class WarcWriterPool: + logger = logging.getLogger("warcprox.writer.WarcWriterPool") + + def __init__(self, default_warc_writer=None): + if default_warc_writer: + self.default_warc_writer = default_warc_writer + else: + self.default_warc_writer = WarcWriter() + self.warc_writers = {} # {prefix:WarcWriter} + self._last_sync = time.time() + + self.logger.info('directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( + 
os.path.abspath(self.default_warc_writer.directory), self.default_warc_writer.gzip, self.default_warc_writer.rollover_size, + self.default_warc_writer.rollover_idle_time, self.default_warc_writer.prefix, self.default_warc_writer.port)) + + # chooses writer for filename specified by warcprox_meta["warc-prefix"] if set + def _writer(self, recorded_url): + w = self.default_warc_writer + if recorded_url.warcprox_meta and "warc-prefix" in recorded_url.warcprox_meta: + # self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url)) + prefix = recorded_url.warcprox_meta["warc-prefix"] + if not prefix in self.warc_writers: + self.warc_writers[prefix] = WarcWriter(prefix=prefix, + directory=self.default_warc_writer.directory, + rollover_size=self.default_warc_writer.rollover_size, + rollover_idle_time=self.default_warc_writer.rollover_idle_time, + gzip=self.default_warc_writer.gzip, + port=self.default_warc_writer.port, + digest_algorithm=self.default_warc_writer.record_builder.digest_algorithm, + base32=self.default_warc_writer.record_builder.base32) + w = self.warc_writers[prefix] + return w + + def write_records(self, recorded_url): + """Returns tuple of records written, which are instances of + hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and + "offset" attributes.""" + return self._writer(recorded_url).write_records(recorded_url) + + def maybe_idle_rollover(self): + self.default_warc_writer.maybe_idle_rollover() + for w in self.warc_writers.values(): + w.maybe_idle_rollover() + + def close_writers(self): + self.default_warc_writer.close_writer() + for w in self.warc_writers.values(): + w.close_writer() + diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py new file mode 100644 index 0000000..ceb34cd --- /dev/null +++ b/warcprox/writerthread.py @@ -0,0 +1,112 @@ +# vim:set sw=4 et: + +from __future__ import absolute_import + +try: + import queue +except ImportError: + import Queue as queue + +import logging +import threading +import os +import hashlib +import time +import socket +import base64 +from datetime import datetime +import hanzo.httptools +from hanzo import warctools +import warcprox + +class WarcWriterThread(threading.Thread): + logger = logging.getLogger("warcprox.warcproxwriter.WarcWriterThread") + + def __init__(self, recorded_url_q=None, writer_pool=None, dedup_db=None, playback_index_db=None): + """recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl.""" + threading.Thread.__init__(self, name='WarcWriterThread') + self.recorded_url_q = recorded_url_q + self.stop = threading.Event() + if writer_pool: + self.writer_pool = writer_pool + else: + self.writer_pool = WarcWriterPool() + self.dedup_db = dedup_db + self.playback_index_db = playback_index_db + self._last_sync = time.time() + + def run(self): + try: + while not self.stop.is_set(): + try: + recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) + if self.dedup_db: + warcprox.dedup.decorate_with_dedup_info(self.dedup_db, recorded_url, + base32=self.writer_pool.default_warc_writer.record_builder.base32) + records = self.writer_pool.write_records(recorded_url) + self._final_tasks(recorded_url, records) + except queue.Empty: + self.writer_pool.maybe_idle_rollover() + self._sync() + + self.logger.info('WarcWriterThread shutting down') + self.writer_pool.close_writers() + except: + self.logger.critical("WarcWriterThread shutting down after unexpected error", exc_info=True) + + def _sync(self): + # XXX prob doesn't belong here (do we 
need it at all?) + if time.time() - self._last_sync > 60: + if self.dedup_db: + self.dedup_db.sync() + if self.playback_index_db: + self.playback_index_db.sync() + self._last_sync = time.time() + + def _save_dedup_info(self, recorded_url, records): + if (self.dedup_db + and records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE + and recorded_url.response_recorder.payload_size() > 0): + key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, + self.writer_pool.default_warc_writer.record_builder.base32) + self.dedup_db.save(key, records[0], records[0].offset) + + def _save_playback_info(self, recorded_url, records): + if self.playback_index_db is not None: + self.playback_index_db.save(records[0].warc_filename, records, records[0].offset) + + # closest thing we have to heritrix crawl log at the moment + def _log(self, recorded_url, records): + def _decode(x): + if isinstance(x, bytes): + return x.decode("utf-8") + else: + return x + + try: + payload_digest = records[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode("utf-8") + except: + payload_digest = "-" + mimetype = _decode(recorded_url.content_type) + if mimetype: + n = mimetype.find(";") + if n >= 0: + mimetype = mimetype[:n] + + # 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"} + self.logger.info("{} {} {} {} {} size={} {} {} {} offset={}".format( + _decode(recorded_url.client_ip), + _decode(recorded_url.status), + _decode(recorded_url.method), + _decode(recorded_url.url), + mimetype, + recorded_url.size, + _decode(payload_digest), + _decode(records[0].get_header(warctools.WarcRecord.TYPE)), + _decode(records[0].warc_filename), + records[0].offset)) + + def _final_tasks(self, recorded_url, records): + self._save_dedup_info(recorded_url, records) + self._save_playback_info(recorded_url, records) + self._log(recorded_url, records)
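
Reviewer note, not part of the patch: a sketch of how the refactored modules are wired together, following the updated warcprox/main.py hunk above. Argument values are illustrative defaults, and run_until_shutdown() is assumed from the existing controller (that call sits outside the hunks shown here).

import queue
import warcprox

recorded_url_q = queue.Queue()

# with ca=None, WarcProxy creates its own CertificateAuthority (warcprox-ca.pem)
proxy = warcprox.warcproxy.WarcProxy(
        server_address=('localhost', 8000), recorded_url_q=recorded_url_q,
        digest_algorithm='sha1')

# dedup_db and playback_index_db now belong to the writer thread, not the writer
default_warc_writer = warcprox.writer.WarcWriter(
        directory='./warcs', gzip=False, prefix='WARCPROX', port=8000,
        rollover_size=1000000000, base32=False, digest_algorithm='sha1')
writer_pool = warcprox.writer.WarcWriterPool(default_warc_writer)

warc_writer_thread = warcprox.writerthread.WarcWriterThread(
        recorded_url_q=recorded_url_q, writer_pool=writer_pool,
        dedup_db=None, playback_index_db=None)

# third positional argument is the playback proxy; None disables playback
controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, None)
controller.run_until_shutdown()  # assumed entry point, as called from main.py
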