split into multiple files

This commit is contained in:
Noah Levitt 2014-11-15 03:20:05 -08:00
parent e8438dc8ad
commit b34edf8fb1
11 changed files with 899 additions and 845 deletions

View File

@ -13,7 +13,7 @@ pymiproxy, warcprox is also GPL.
Install
~~~~~~~
Warcprox runs on python 2.7 or python 3.2+.
Warcprox runs on python 3.4.
To install latest release run:

View File

@ -58,10 +58,6 @@ setuptools.setup(name='warcprox',
'Development Status :: 5 - Production/Stable',
'Environment :: Console',
'License :: OSI Approved :: GNU General Public License (GPL)',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.2',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Topic :: Internet :: Proxy Servers',
'Topic :: Internet :: WWW/HTTP',

View File

@ -4,7 +4,7 @@
# and then run "tox" from this directory.
[tox]
envlist = py27, py32, py33, py34
envlist = py34
[testenv]
commands = py.test

View File

@ -0,0 +1,8 @@
def _read_version_bytes():
import os
version_txt = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['version.txt'])
with open(version_txt, 'rb') as fin:
return fin.read().strip()
version_bytes = _read_version_bytes().strip()
version_str = version_bytes.decode('utf-8')

91
warcprox/certauth.py Normal file
View File

@ -0,0 +1,91 @@
# vim:set sw=4 et:
import logging
import os
import OpenSSL
import socket
import random
class CertificateAuthority(object):
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, ca_file='warcprox-ca.pem', certs_dir='./warcprox-ca'):
self.ca_file = ca_file
self.certs_dir = certs_dir
if not os.path.exists(ca_file):
self._generate_ca()
else:
self._read_ca(ca_file)
if not os.path.exists(certs_dir):
self.logger.info("directory for generated certs {} doesn't exist, creating it".format(certs_dir))
os.mkdir(certs_dir)
def _generate_ca(self):
# Generate key
self.key = OpenSSL.crypto.PKey()
self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
# Generate certificate
self.cert = OpenSSL.crypto.X509()
self.cert.set_version(2)
# avoid sec_error_reused_issuer_and_serial
self.cert.set_serial_number(random.randint(0,2**64-1))
self.cert.get_subject().CN = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
self.cert.gmtime_adj_notBefore(0) # now
self.cert.gmtime_adj_notAfter(100*365*24*60*60) # 100 yrs in future
self.cert.set_issuer(self.cert.get_subject())
self.cert.set_pubkey(self.key)
self.cert.add_extensions([
OpenSSL.crypto.X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:0"),
OpenSSL.crypto.X509Extension(b"keyUsage", True, b"keyCertSign, cRLSign"),
OpenSSL.crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash", subject=self.cert),
])
self.cert.sign(self.key, "sha1")
with open(self.ca_file, 'wb+') as f:
f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.SSL.FILETYPE_PEM, self.key))
f.write(OpenSSL.crypto.dump_certificate(OpenSSL.SSL.FILETYPE_PEM, self.cert))
self.logger.info('generated CA key+cert and wrote to {}'.format(self.ca_file))
def _read_ca(self, filename):
self.cert = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM, open(filename).read())
self.key = OpenSSL.crypto.load_privatekey(OpenSSL.SSL.FILETYPE_PEM, open(filename).read())
self.logger.info('read CA key+cert from {}'.format(self.ca_file))
def __getitem__(self, cn):
cnp = os.path.sep.join([self.certs_dir, '%s.pem' % cn])
if not os.path.exists(cnp):
# create certificate
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
# Generate CSR
req = OpenSSL.crypto.X509Req()
req.get_subject().CN = cn
req.set_pubkey(key)
req.sign(key, 'sha1')
# Sign CSR
cert = OpenSSL.crypto.X509()
cert.set_subject(req.get_subject())
cert.set_serial_number(random.randint(0,2**64-1))
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(10*365*24*60*60)
cert.set_issuer(self.cert.get_subject())
cert.set_pubkey(req.get_pubkey())
cert.sign(self.key, 'sha1')
with open(cnp, 'wb+') as f:
f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.SSL.FILETYPE_PEM, key))
f.write(OpenSSL.crypto.dump_certificate(OpenSSL.SSL.FILETYPE_PEM, cert))
self.logger.info('wrote generated key+cert to {}'.format(cnp))
return cnp

56
warcprox/dedup.py Normal file
View File

@ -0,0 +1,56 @@
# vim:set sw=4 et:
try:
import dbm.gnu as dbm_gnu
except ImportError:
try:
import gdbm as dbm_gnu
except ImportError:
import anydbm as dbm_gnu
import logging
import os
import json
from hanzo import warctools
class DedupDb(object):
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, dbm_file='./warcprox-dedup.db'):
if os.path.exists(dbm_file):
self.logger.info('opening existing deduplication database {}'.format(dbm_file))
else:
self.logger.info('creating new deduplication database {}'.format(dbm_file))
self.db = dbm_gnu.open(dbm_file, 'c')
def close(self):
self.db.close()
def sync(self):
if hasattr(self.db, 'sync'):
self.db.sync()
def save(self, key, response_record, offset):
record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
py_value = {'i':record_id, 'u':url, 'd':date}
json_value = json.dumps(py_value, separators=(',',':'))
self.db[key] = json_value.encode('utf-8')
self.logger.debug('dedup db saved {}:{}'.format(key, json_value))
def lookup(self, key):
if key in self.db:
json_result = self.db[key]
result = json.loads(json_result.decode('utf-8'))
result['i'] = result['i'].encode('latin1')
result['u'] = result['u'].encode('latin1')
result['d'] = result['d'].encode('latin1')
return result
else:
return None

145
warcprox/mitmproxy.py Normal file
View File

@ -0,0 +1,145 @@
# vim:set sw=4 et:
try:
import http.server as http_server
except ImportError:
import BaseHTTPServer as http_server
try:
import urllib.parse as urllib_parse
except ImportError:
import urlparse as urllib_parse
import socket
import logging
import ssl
class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, request, client_address, server):
self.is_connect = False
## XXX hack around bizarre bug on my mac python 3.2 in http.server
## where hasattr returns true in the code snippet below, but
## self._headers_buffer is None
#
# if not hasattr(self, '_headers_buffer'):
# self._headers_buffer = []
# self._headers_buffer.append(
self._headers_buffer = []
http_server.BaseHTTPRequestHandler.__init__(self, request, client_address, server)
def _determine_host_port(self):
# Get hostname and port to connect to
if self.is_connect:
self.hostname, self.port = self.path.split(':')
else:
self.url = self.path
u = urllib_parse.urlparse(self.url)
if u.scheme != 'http':
raise Exception('Unknown scheme %s' % repr(u.scheme))
self.hostname = u.hostname
self.port = u.port or 80
self.path = urllib_parse.urlunparse(
urllib_parse.ParseResult(
scheme='',
netloc='',
params=u.params,
path=u.path or '/',
query=u.query,
fragment=u.fragment
)
)
def _connect_to_host(self):
# Connect to destination
self._proxy_sock = socket.socket()
self._proxy_sock.settimeout(10)
self._proxy_sock.connect((self.hostname, int(self.port)))
# Wrap socket if SSL is required
if self.is_connect:
self._proxy_sock = ssl.wrap_socket(self._proxy_sock)
def _transition_to_ssl(self):
self.request = self.connection = ssl.wrap_socket(self.connection,
server_side=True, certfile=self.server.ca[self.hostname])
def do_CONNECT(self):
self.is_connect = True
try:
# Connect to destination first
self._determine_host_port()
self._connect_to_host()
# If successful, let's do this!
self.send_response(200, 'Connection established')
self.end_headers()
self._transition_to_ssl()
except Exception as e:
self.send_error(500, str(e))
return
# Reload!
self.setup()
self.handle_one_request()
def _construct_tunneled_url(self):
if int(self.port) == 443:
netloc = self.hostname
else:
netloc = '{}:{}'.format(self.hostname, self.port)
result = urllib_parse.urlunparse(
urllib_parse.ParseResult(
scheme='https',
netloc=netloc,
params='',
path=self.path,
query='',
fragment=''
)
)
return result
def do_COMMAND(self):
if not self.is_connect:
try:
# Connect to destination
self._determine_host_port()
self._connect_to_host()
assert self.url
except Exception as e:
self.send_error(500, str(e))
return
else:
# if self.is_connect we already connected in do_CONNECT
self.url = self._construct_tunneled_url()
self._proxy_request()
def _proxy_request(self):
raise Exception('_proxy_request() not implemented in MitmProxyHandler, must be implemented in subclass!')
def __getattr__(self, item):
if item.startswith('do_'):
return self.do_COMMAND
def log_error(self, fmt, *args):
self.logger.error("{0} - - [{1}] {2}".format(self.address_string(),
self.log_date_time_string(), fmt % args))
def log_message(self, fmt, *args):
self.logger.info("{} {} - - [{}] {}".format(self.__class__.__name__,
self.address_string(), self.log_date_time_string(), fmt % args))

280
warcprox/playback.py Normal file
View File

@ -0,0 +1,280 @@
# vim:set sw=4 et:
try:
import http.server as http_server
except ImportError:
import BaseHTTPServer as http_server
try:
import socketserver
except ImportError:
import SocketServer as socketserver
try:
import dbm.gnu as dbm_gnu
except ImportError:
try:
import gdbm as dbm_gnu
except ImportError:
import anydbm as dbm_gnu
import logging
import os
from hanzo import warctools
import json
import traceback
import re
from warcprox.mitmproxy import MitmProxyHandler
class PlaybackProxyHandler(MitmProxyHandler):
logger = logging.getLogger(__module__ + "." + __qualname__)
# @Override
def _connect_to_host(self):
# don't connect to host!
pass
# @Override
def _proxy_request(self):
date, location = self.server.playback_index_db.lookup_latest(self.url.encode('utf-8'))
self.logger.debug('lookup_latest returned {}:{}'.format(date, location))
status = None
if location is not None:
try:
status, sz = self._send_response_from_warc(location['f'], location['o'])
except:
status = 500
self.logger.error('PlaybackProxyHandler problem playing back {}'.format(self.url), exc_info=1)
payload = '500 Warcprox Error\n\n{}\n'.format(traceback.format_exc()).encode('utf-8')
headers = (b'HTTP/1.1 500 Internal Server Error\r\n'
+ b'Content-Type: text/plain;charset=utf-8\r\n'
+ b'Content-Length: ' + str(len(payload)).encode('utf-8') + b'\r\n'
+ b'\r\n')
self.connection.sendall(headers)
self.connection.sendall(payload)
sz = len(headers) + len(payload)
else:
status = 404
payload = b'404 Not in Archive\n'
headers = (b'HTTP/1.1 404 Not Found\r\n'
+ b'Content-Type: text/plain;charset=utf-8\r\n'
+ b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n'
+ b'\r\n')
self.connection.sendall(headers)
self.connection.sendall(payload)
sz = len(headers) + len(payload)
self.log_message('"%s" %s %s %s',
self.requestline, str(status), str(sz), repr(location) if location else '-')
def _open_warc_at_offset(self, warcfilename, offset):
self.logger.debug('opening {} at offset {}'.format(warcfilename, offset))
warcpath = None
for p in (os.path.sep.join([self.server.warcs_dir, warcfilename]),
os.path.sep.join([self.server.warcs_dir, '{}.open'.format(warcfilename)])):
if os.path.exists(p):
warcpath = p
if warcpath is None:
raise Exception('{} not found'.format(warcfilename))
return warctools.warc.WarcRecord.open_archive(filename=warcpath, mode='rb', offset=offset)
def _send_response(self, headers, payload_fh):
status = '-'
m = re.match(br'^HTTP/\d\.\d (\d{3})', headers)
if m is not None:
status = m.group(1)
self.connection.sendall(headers)
sz = len(headers)
while True:
buf = payload_fh.read(8192)
if buf == b'': break
self.connection.sendall(buf)
sz += len(buf)
return status, sz
def _send_headers_and_refd_payload(self, headers, refers_to, refers_to_target_uri, refers_to_date):
location = self.server.playback_index_db.lookup_exact(refers_to_target_uri, refers_to_date, record_id=refers_to)
self.logger.debug('loading http payload from {}'.format(location))
fh = self._open_warc_at_offset(location['f'], location['o'])
try:
for (offset, record, errors) in fh.read_records(limit=1, offsets=True):
pass
if errors:
raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors))
warc_type = record.get_header(warctools.WarcRecord.TYPE)
if warc_type != warctools.WarcRecord.RESPONSE:
raise Exception('invalid attempt to retrieve http payload of "{}" record'.format(warc_type))
# find end of headers
while True:
line = record.content_file.readline()
if line == b'' or re.match(br'^\r?\n$', line):
break
return self._send_response(headers, record.content_file)
finally:
fh.close()
def _send_response_from_warc(self, warcfilename, offset):
fh = self._open_warc_at_offset(warcfilename, offset)
try:
for (offset, record, errors) in fh.read_records(limit=1, offsets=True):
pass
if errors:
raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors))
warc_type = record.get_header(warctools.WarcRecord.TYPE)
if warc_type == warctools.WarcRecord.RESPONSE:
headers_buf = bytearray()
while True:
line = record.content_file.readline()
headers_buf.extend(line)
if line == b'' or re.match(b'^\r?\n$', line):
break
return self._send_response(headers_buf, record.content_file)
elif warc_type == warctools.WarcRecord.REVISIT:
# response consists of http headers from revisit record and
# payload from the referenced record
warc_profile = record.get_header(warctools.WarcRecord.PROFILE)
if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST:
raise Exception('unknown revisit record profile {}'.format(warc_profile))
refers_to = record.get_header(warctools.WarcRecord.REFERS_TO)
refers_to_target_uri = record.get_header(warctools.WarcRecord.REFERS_TO_TARGET_URI)
refers_to_date = record.get_header(warctools.WarcRecord.REFERS_TO_DATE)
self.logger.debug('revisit record references {}:{} capture of {}'.format(refers_to_date, refers_to, refers_to_target_uri))
return self._send_headers_and_refd_payload(record.content[1], refers_to, refers_to_target_uri, refers_to_date)
else:
raise Exception('unknown warc record type {}'.format(warc_type))
finally:
fh.close()
raise Exception('should not reach this point')
class PlaybackProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, server_address, req_handler_class=PlaybackProxyHandler,
bind_and_activate=True, ca=None, playback_index_db=None,
warcs_dir=None):
http_server.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate)
self.ca = ca
self.playback_index_db = playback_index_db
self.warcs_dir = warcs_dir
def server_activate(self):
http_server.HTTPServer.server_activate(self)
self.logger.info('PlaybackProxy listening on {0}:{1}'.format(self.server_address[0], self.server_address[1]))
def server_close(self):
self.logger.info('PlaybackProxy shutting down')
http_server.HTTPServer.server_close(self)
class PlaybackIndexDb(object):
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, dbm_file='./warcprox-playback-index.db'):
if os.path.exists(dbm_file):
self.logger.info('opening existing playback index database {}'.format(dbm_file))
else:
self.logger.info('creating new playback index database {}'.format(dbm_file))
self.db = dbm_gnu.open(dbm_file, 'c')
def close(self):
self.db.close()
def sync(self):
if hasattr(self.db, 'sync'):
self.db.sync()
def save(self, warcfile, recordset, offset):
response_record = recordset[0]
# XXX canonicalize url?
url = response_record.get_header(warctools.WarcRecord.URL)
date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
record_id_str = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
# there could be two visits of same url in the same second, and WARC-Date is
# prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\
# url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...}
if url in self.db:
existing_json_value = self.db[url].decode('utf-8')
py_value = json.loads(existing_json_value)
else:
py_value = {}
if date_str in py_value:
py_value[date_str].append({'f':warcfile, 'o':offset, 'i':record_id_str})
else:
py_value[date_str] = [{'f':warcfile, 'o':offset, 'i':record_id_str}]
json_value = json.dumps(py_value, separators=(',',':'))
self.db[url] = json_value.encode('utf-8')
self.logger.debug('playback index saved: {}:{}'.format(url, json_value))
def lookup_latest(self, url):
if url not in self.db:
return None, None
json_value = self.db[url].decode('utf-8')
self.logger.debug("{}:{}".format(repr(url), repr(json_value)))
py_value = json.loads(json_value)
latest_date = max(py_value)
result = py_value[latest_date][0]
result['i'] = result['i'].encode('ascii')
return latest_date, result
# in python3 params are bytes
def lookup_exact(self, url, warc_date, record_id):
if url not in self.db:
return None
json_value = self.db[url].decode('utf-8')
self.logger.debug("{}:{}".format(repr(url), repr(json_value)))
py_value = json.loads(json_value)
warc_date_str = warc_date.decode('ascii')
if warc_date_str in py_value:
for record in py_value[warc_date_str]:
if record['i'].encode('ascii') == record_id:
self.logger.debug("found exact match for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url)))
record['i'] = record['i'].encode('ascii')
return record
else:
self.logger.info("match not found for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url)))
return None

View File

@ -1,7 +1,8 @@
#!/usr/bin/env python
# vim: set sw=4 et:
from warcprox import warcprox
import warcprox.warcprox
import warcprox.certauth
import unittest
import threading
import time
@ -117,11 +118,11 @@ class WarcproxTest(unittest.TestCase):
f.close() # delete it, or CertificateAuthority will try to read it
self._ca_file = f.name
self._ca_dir = tempfile.mkdtemp(prefix='warcprox-test-', suffix='-ca')
ca = warcprox.CertificateAuthority(self._ca_file, self._ca_dir)
ca = warcprox.certauth.CertificateAuthority(self._ca_file, self._ca_dir)
recorded_url_q = queue.Queue()
proxy = warcprox.WarcProxy(server_address=('localhost', 0), ca=ca,
proxy = warcprox.warcprox.WarcProxy(server_address=('localhost', 0), ca=ca,
recorded_url_q=recorded_url_q)
self._warcs_dir = tempfile.mkdtemp(prefix='warcprox-test-warcs-')
@ -129,20 +130,20 @@ class WarcproxTest(unittest.TestCase):
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-playback-index-', suffix='.db', delete=False)
f.close()
self._playback_index_db_file = f.name
playback_index_db = warcprox.PlaybackIndexDb(self._playback_index_db_file)
playback_proxy = warcprox.PlaybackProxy(server_address=('localhost', 0), ca=ca,
playback_index_db = warcprox.playback.PlaybackIndexDb(self._playback_index_db_file)
playback_proxy = warcprox.playback.PlaybackProxy(server_address=('localhost', 0), ca=ca,
playback_index_db=playback_index_db, warcs_dir=self._warcs_dir)
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-dedup-', suffix='.db', delete=False)
f.close()
self._dedup_db_file = f.name
dedup_db = warcprox.DedupDb(self._dedup_db_file)
dedup_db = warcprox.dedup.DedupDb(self._dedup_db_file)
warc_writer = warcprox.WarcWriterThread(recorded_url_q=recorded_url_q,
warc_writer = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q,
directory=self._warcs_dir, port=proxy.server_port,
dedup_db=dedup_db, playback_index_db=playback_index_db)
self.warcprox = warcprox.WarcproxController(proxy, warc_writer, playback_proxy)
self.warcprox = warcprox.warcprox.WarcproxController(proxy, warc_writer, playback_proxy)
self.logger.info('starting warcprox')
self.warcprox_thread = threading.Thread(name='WarcproxThread',
target=self.warcprox.run_until_shutdown)

View File

@ -7,164 +7,48 @@ WARC writing MITM HTTP/S proxy
See README.rst or https://github.com/internetarchive/warcprox
"""
from __future__ import absolute_import
try:
import http.server
http_server = http.server
import http.server as http_server
except ImportError:
import BaseHTTPServer
http_server = BaseHTTPServer
import BaseHTTPServer as http_server
try:
import socketserver
except ImportError:
import SocketServer
socketserver = SocketServer
try:
import urllib.parse
urllib_parse = urllib.parse
except ImportError:
import urlparse
urllib_parse = urlparse
import SocketServer as socketserver
try:
import queue
except ImportError:
import Queue
queue = Queue
import Queue as queue
try:
import http.client
http_client = http.client
import http.client as http_client
except ImportError:
import httplib
http_client = httplib
try:
import dbm.gnu
dbm_gnu = dbm.gnu
except ImportError:
try:
import gdbm
dbm_gnu = gdbm
except ImportError:
import anydbm
dbm_gnu = anydbm
try:
from io import StringIO
except ImportError:
from StringIO import StringIO
import httplib as http_client
import socket
import OpenSSL
import ssl
import logging
import sys
from hanzo import warctools, httptools
import hashlib
from datetime import datetime
import threading
import os
import argparse
import random
import re
import signal
import time
import tempfile
import base64
import json
import traceback
def _read_version_bytes():
version_txt = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['version.txt'])
with open(version_txt, 'rb') as fin:
return fin.read().strip()
version_bytes = _read_version_bytes()
version_str = version_bytes.strip().decode('utf-8')
class CertificateAuthority(object):
logger = logging.getLogger('warcprox.CertificateAuthority')
def __init__(self, ca_file='warcprox-ca.pem', certs_dir='./warcprox-ca'):
self.ca_file = ca_file
self.certs_dir = certs_dir
if not os.path.exists(ca_file):
self._generate_ca()
else:
self._read_ca(ca_file)
if not os.path.exists(certs_dir):
self.logger.info("directory for generated certs {} doesn't exist, creating it".format(certs_dir))
os.mkdir(certs_dir)
def _generate_ca(self):
# Generate key
self.key = OpenSSL.crypto.PKey()
self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
# Generate certificate
self.cert = OpenSSL.crypto.X509()
self.cert.set_version(2)
# avoid sec_error_reused_issuer_and_serial
self.cert.set_serial_number(random.randint(0,2**64-1))
self.cert.get_subject().CN = 'Warcprox CA on {}'.format(socket.gethostname())[:64]
self.cert.gmtime_adj_notBefore(0) # now
self.cert.gmtime_adj_notAfter(100*365*24*60*60) # 100 yrs in future
self.cert.set_issuer(self.cert.get_subject())
self.cert.set_pubkey(self.key)
self.cert.add_extensions([
OpenSSL.crypto.X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:0"),
OpenSSL.crypto.X509Extension(b"keyUsage", True, b"keyCertSign, cRLSign"),
OpenSSL.crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash", subject=self.cert),
])
self.cert.sign(self.key, "sha1")
with open(self.ca_file, 'wb+') as f:
f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.SSL.FILETYPE_PEM, self.key))
f.write(OpenSSL.crypto.dump_certificate(OpenSSL.SSL.FILETYPE_PEM, self.cert))
self.logger.info('generated CA key+cert and wrote to {}'.format(self.ca_file))
def _read_ca(self, filename):
self.cert = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM, open(filename).read())
self.key = OpenSSL.crypto.load_privatekey(OpenSSL.SSL.FILETYPE_PEM, open(filename).read())
self.logger.info('read CA key+cert from {}'.format(self.ca_file))
def __getitem__(self, cn):
cnp = os.path.sep.join([self.certs_dir, '%s.pem' % cn])
if not os.path.exists(cnp):
# create certificate
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
# Generate CSR
req = OpenSSL.crypto.X509Req()
req.get_subject().CN = cn
req.set_pubkey(key)
req.sign(key, 'sha1')
# Sign CSR
cert = OpenSSL.crypto.X509()
cert.set_subject(req.get_subject())
cert.set_serial_number(random.randint(0,2**64-1))
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(10*365*24*60*60)
cert.set_issuer(self.cert.get_subject())
cert.set_pubkey(req.get_pubkey())
cert.sign(self.key, 'sha1')
with open(cnp, 'wb+') as f:
f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.SSL.FILETYPE_PEM, key))
f.write(OpenSSL.crypto.dump_certificate(OpenSSL.SSL.FILETYPE_PEM, cert))
self.logger.info('wrote generated key+cert to {}'.format(cnp))
return cnp
import warcprox
from warcprox.mitmproxy import MitmProxyHandler
from warcprox.dedup import DedupDb
from warcprox.certauth import CertificateAuthority
from warcprox.warcwriter import WarcWriterThread
from warcprox import playback
class ProxyingRecorder(object):
@ -173,7 +57,7 @@ class ProxyingRecorder(object):
calculating digests, and sending them on to the proxy client.
"""
logger = logging.getLogger('warcprox.ProxyingRecordingHTTPResponse')
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, fp, proxy_dest, digest_algorithm='sha1'):
self.fp = fp
@ -275,138 +159,8 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
self.fp = self.recorder
class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
logger = logging.getLogger('warcprox.MitmProxyHandler')
def __init__(self, request, client_address, server):
self.is_connect = False
## XXX hack around bizarre bug on my mac python 3.2 in http.server
## where hasattr returns true in the code snippet below, but
## self._headers_buffer is None
#
# if not hasattr(self, '_headers_buffer'):
# self._headers_buffer = []
# self._headers_buffer.append(
self._headers_buffer = []
http_server.BaseHTTPRequestHandler.__init__(self, request, client_address, server)
def _determine_host_port(self):
# Get hostname and port to connect to
if self.is_connect:
self.hostname, self.port = self.path.split(':')
else:
self.url = self.path
u = urllib_parse.urlparse(self.url)
if u.scheme != 'http':
raise Exception('Unknown scheme %s' % repr(u.scheme))
self.hostname = u.hostname
self.port = u.port or 80
self.path = urllib_parse.urlunparse(
urllib_parse.ParseResult(
scheme='',
netloc='',
params=u.params,
path=u.path or '/',
query=u.query,
fragment=u.fragment
)
)
def _connect_to_host(self):
# Connect to destination
self._proxy_sock = socket.socket()
self._proxy_sock.settimeout(10)
self._proxy_sock.connect((self.hostname, int(self.port)))
# Wrap socket if SSL is required
if self.is_connect:
self._proxy_sock = ssl.wrap_socket(self._proxy_sock)
def _transition_to_ssl(self):
self.request = self.connection = ssl.wrap_socket(self.connection,
server_side=True, certfile=self.server.ca[self.hostname])
def do_CONNECT(self):
self.is_connect = True
try:
# Connect to destination first
self._determine_host_port()
self._connect_to_host()
# If successful, let's do this!
self.send_response(200, 'Connection established')
self.end_headers()
self._transition_to_ssl()
except Exception as e:
self.send_error(500, str(e))
return
# Reload!
self.setup()
self.handle_one_request()
def _construct_tunneled_url(self):
if int(self.port) == 443:
netloc = self.hostname
else:
netloc = '{}:{}'.format(self.hostname, self.port)
result = urllib_parse.urlunparse(
urllib_parse.ParseResult(
scheme='https',
netloc=netloc,
params='',
path=self.path,
query='',
fragment=''
)
)
return result
def do_COMMAND(self):
if not self.is_connect:
try:
# Connect to destination
self._determine_host_port()
self._connect_to_host()
assert self.url
except Exception as e:
self.send_error(500, str(e))
return
else:
# if self.is_connect we already connected in do_CONNECT
self.url = self._construct_tunneled_url()
self._proxy_request()
def _proxy_request(self):
raise Exception('_proxy_request() not implemented in MitmProxyHandler, must be implemented in subclass!')
def __getattr__(self, item):
if item.startswith('do_'):
return self.do_COMMAND
def log_error(self, fmt, *args):
self.logger.error("{0} - - [{1}] {2}".format(self.address_string(),
self.log_date_time_string(), fmt % args))
def log_message(self, fmt, *args):
self.logger.info("{} {} - - [{}] {}".format(self.__class__.__name__,
self.address_string(), self.log_date_time_string(), fmt % args))
class WarcProxyHandler(MitmProxyHandler):
logger = logging.getLogger('warcprox.WarcProxyHandler')
logger = logging.getLogger(__module__ + "." + __qualname__)
def _proxy_request(self):
# Build request
@ -477,7 +231,7 @@ class RecordedUrl(object):
class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
logger = logging.getLogger('warcprox.WarcProxy')
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, server_address=('localhost', 8000),
req_handler_class=WarcProxyHandler, bind_and_activate=True,
@ -505,572 +259,8 @@ class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
http_server.HTTPServer.server_close(self)
class PlaybackProxyHandler(MitmProxyHandler):
logger = logging.getLogger('warcprox.PlaybackProxyHandler')
# @Override
def _connect_to_host(self):
# don't connect to host!
pass
# @Override
def _proxy_request(self):
date, location = self.server.playback_index_db.lookup_latest(self.url.encode('utf-8'))
self.logger.debug('lookup_latest returned {}:{}'.format(date, location))
status = None
if location is not None:
try:
status, sz = self._send_response_from_warc(location['f'], location['o'])
except:
status = 500
self.logger.error('PlaybackProxyHandler problem playing back {}'.format(self.url), exc_info=1)
payload = b'500 Warcprox Error\n\n{}\n'.format(traceback.format_exc()).encode('utf-8')
headers = (b'HTTP/1.1 500 Internal Server Error\r\n'
+ b'Content-Type: text/plain;charset=utf-8\r\n'
+ b'Content-Length: ' + len(payload) + b'\r\n'
+ b'\r\n')
self.connection.sendall(headers)
self.connection.sendall(payload)
sz = len(headers) + len(payload)
else:
status = 404
payload = b'404 Not in Archive\n'
headers = (b'HTTP/1.1 404 Not Found\r\n'
+ b'Content-Type: text/plain;charset=utf-8\r\n'
+ b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n'
+ b'\r\n')
self.connection.sendall(headers)
self.connection.sendall(payload)
sz = len(headers) + len(payload)
self.log_message('"%s" %s %s %s',
self.requestline, str(status), str(sz), repr(location) if location else '-')
def _open_warc_at_offset(self, warcfilename, offset):
self.logger.debug('opening {} at offset {}'.format(warcfilename, offset))
warcpath = None
for p in (os.path.sep.join([self.server.warcs_dir, warcfilename]),
os.path.sep.join([self.server.warcs_dir, '{}.open'.format(warcfilename)])):
if os.path.exists(p):
warcpath = p
if warcpath is None:
raise Exception('{} not found'.format(warcfilename))
return warctools.warc.WarcRecord.open_archive(filename=warcpath, mode='rb', offset=offset)
def _send_response(self, headers, payload_fh):
status = '-'
m = re.match(br'^HTTP/\d\.\d (\d{3})', headers)
if m is not None:
status = m.group(1)
self.connection.sendall(headers)
sz = len(headers)
while True:
buf = payload_fh.read(8192)
if buf == b'': break
self.connection.sendall(buf)
sz += len(buf)
return status, sz
def _send_headers_and_refd_payload(self, headers, refers_to, refers_to_target_uri, refers_to_date):
location = self.server.playback_index_db.lookup_exact(refers_to_target_uri, refers_to_date, record_id=refers_to)
self.logger.debug('loading http payload from {}'.format(location))
fh = self._open_warc_at_offset(location['f'], location['o'])
try:
for (offset, record, errors) in fh.read_records(limit=1, offsets=True):
pass
if errors:
raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors))
warc_type = record.get_header(warctools.WarcRecord.TYPE)
if warc_type != warctools.WarcRecord.RESPONSE:
raise Exception('invalid attempt to retrieve http payload of "{}" record'.format(warc_type))
# find end of headers
while True:
line = record.content_file.readline()
if line == b'' or re.match(br'^\r?\n$', line):
break
return self._send_response(headers, record.content_file)
finally:
fh.close()
def _send_response_from_warc(self, warcfilename, offset):
fh = self._open_warc_at_offset(warcfilename, offset)
try:
for (offset, record, errors) in fh.read_records(limit=1, offsets=True):
pass
if errors:
raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors))
warc_type = record.get_header(warctools.WarcRecord.TYPE)
if warc_type == warctools.WarcRecord.RESPONSE:
headers_buf = bytearray()
while True:
line = record.content_file.readline()
headers_buf.extend(line)
if line == b'' or re.match(b'^\r?\n$', line):
break
return self._send_response(headers_buf, record.content_file)
elif warc_type == warctools.WarcRecord.REVISIT:
# response consists of http headers from revisit record and
# payload from the referenced record
warc_profile = record.get_header(warctools.WarcRecord.PROFILE)
if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST:
raise Exception('unknown revisit record profile {}'.format(warc_profile))
refers_to = record.get_header(warctools.WarcRecord.REFERS_TO)
refers_to_target_uri = record.get_header(warctools.WarcRecord.REFERS_TO_TARGET_URI)
refers_to_date = record.get_header(warctools.WarcRecord.REFERS_TO_DATE)
self.logger.debug('revisit record references {}:{} capture of {}'.format(refers_to_date, refers_to, refers_to_target_uri))
return self._send_headers_and_refd_payload(record.content[1], refers_to, refers_to_target_uri, refers_to_date)
else:
raise Exception('unknown warc record type {}'.format(warc_type))
finally:
fh.close()
raise Exception('should not reach this point')
class PlaybackProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
logger = logging.getLogger('warcprox.PlaybackProxy')
def __init__(self, server_address, req_handler_class=PlaybackProxyHandler,
bind_and_activate=True, ca=None, playback_index_db=None,
warcs_dir=None):
http_server.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate)
self.ca = ca
self.playback_index_db = playback_index_db
self.warcs_dir = warcs_dir
def server_activate(self):
http_server.HTTPServer.server_activate(self)
self.logger.info('PlaybackProxy listening on {0}:{1}'.format(self.server_address[0], self.server_address[1]))
def server_close(self):
self.logger.info('PlaybackProxy shutting down')
http_server.HTTPServer.server_close(self)
class DedupDb(object):
logger = logging.getLogger('warcprox.DedupDb')
def __init__(self, dbm_file='./warcprox-dedup.db'):
if os.path.exists(dbm_file):
self.logger.info('opening existing deduplication database {}'.format(dbm_file))
else:
self.logger.info('creating new deduplication database {}'.format(dbm_file))
self.db = dbm_gnu.open(dbm_file, 'c')
def close(self):
self.db.close()
def sync(self):
if hasattr(self.db, 'sync'):
self.db.sync()
def save(self, key, response_record, offset):
record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
py_value = {'i':record_id, 'u':url, 'd':date}
json_value = json.dumps(py_value, separators=(',',':'))
self.db[key] = json_value.encode('utf-8')
self.logger.debug('dedup db saved {}:{}'.format(key, json_value))
def lookup(self, key):
if key in self.db:
json_result = self.db[key]
result = json.loads(json_result.decode('utf-8'))
result['i'] = result['i'].encode('latin1')
result['u'] = result['u'].encode('latin1')
result['d'] = result['d'].encode('latin1')
return result
else:
return None
class WarcWriterThread(threading.Thread):
logger = logging.getLogger('warcprox.WarcWriterThread')
# port is only used for warc filename
def __init__(self, recorded_url_q=None, directory='./warcs',
rollover_size=1000000000, rollover_idle_time=None, gzip=False,
prefix='WARCPROX', port=0, digest_algorithm='sha1', base32=False,
dedup_db=None, playback_index_db=None):
threading.Thread.__init__(self, name='WarcWriterThread')
self.recorded_url_q = recorded_url_q
self.rollover_size = rollover_size
self.rollover_idle_time = rollover_idle_time
self.gzip = gzip
self.digest_algorithm = digest_algorithm
self.base32 = base32
self.dedup_db = dedup_db
self.playback_index_db = playback_index_db
# warc path and filename stuff
self.directory = directory
self.prefix = prefix
self.port = port
self._f = None
self._fpath = None
self._serial = 0
if not os.path.exists(directory):
self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory))
os.mkdir(directory)
self.stop = threading.Event()
# returns a tuple (principal_record, request_record) where principal_record is either a response or revisit record
def build_warc_records(self, recorded_url):
warc_date = warctools.warc.warc_datetime_str(datetime.utcnow())
dedup_info = None
if self.dedup_db is not None and recorded_url.response_recorder.payload_digest is not None:
key = self.digest_str(recorded_url.response_recorder.payload_digest)
dedup_info = self.dedup_db.lookup(key)
if dedup_info is not None:
# revisit record
recorded_url.response_recorder.tempfile.seek(0)
if recorded_url.response_recorder.payload_offset is not None:
response_header_block = recorded_url.response_recorder.tempfile.read(recorded_url.response_recorder.payload_offset)
else:
response_header_block = recorded_url.response_recorder.tempfile.read()
principal_record = self.build_warc_record(
url=recorded_url.url, warc_date=warc_date,
data=response_header_block,
warc_type=warctools.WarcRecord.REVISIT,
refers_to=dedup_info['i'],
refers_to_target_uri=dedup_info['u'],
refers_to_date=dedup_info['d'],
profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST,
content_type=httptools.ResponseMessage.CONTENT_TYPE,
remote_ip=recorded_url.remote_ip)
else:
# response record
principal_record = self.build_warc_record(
url=recorded_url.url, warc_date=warc_date,
recorder=recorded_url.response_recorder,
warc_type=warctools.WarcRecord.RESPONSE,
content_type=httptools.ResponseMessage.CONTENT_TYPE,
remote_ip=recorded_url.remote_ip)
request_record = self.build_warc_record(
url=recorded_url.url, warc_date=warc_date,
data=recorded_url.request_data,
warc_type=warctools.WarcRecord.REQUEST,
content_type=httptools.RequestMessage.CONTENT_TYPE,
concurrent_to=principal_record.id)
return principal_record, request_record
def digest_str(self, hash_obj):
return hash_obj.name.encode('utf-8') + b':' + (base64.b32encode(hash_obj.digest()) if self.base32 else hash_obj.hexdigest().encode('ascii'))
def build_warc_record(self, url, warc_date=None, recorder=None, data=None,
concurrent_to=None, warc_type=None, content_type=None, remote_ip=None,
profile=None, refers_to=None, refers_to_target_uri=None,
refers_to_date=None):
if warc_date is None:
warc_date = warctools.warc.warc_datetime_str(datetime.utcnow())
record_id = warctools.WarcRecord.random_warc_uuid()
headers = []
if warc_type is not None:
headers.append((warctools.WarcRecord.TYPE, warc_type))
headers.append((warctools.WarcRecord.ID, record_id))
headers.append((warctools.WarcRecord.DATE, warc_date))
headers.append((warctools.WarcRecord.URL, url))
if remote_ip is not None:
headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip))
if profile is not None:
headers.append((warctools.WarcRecord.PROFILE, profile))
if refers_to is not None:
headers.append((warctools.WarcRecord.REFERS_TO, refers_to))
if refers_to_target_uri is not None:
headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri))
if refers_to_date is not None:
headers.append((warctools.WarcRecord.REFERS_TO_DATE, refers_to_date))
if concurrent_to is not None:
headers.append((warctools.WarcRecord.CONCURRENT_TO, concurrent_to))
if content_type is not None:
headers.append((warctools.WarcRecord.CONTENT_TYPE, content_type))
if recorder is not None:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)).encode('latin1')))
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
self.digest_str(recorder.block_digest)))
if recorder.payload_digest is not None:
headers.append((warctools.WarcRecord.PAYLOAD_DIGEST,
self.digest_str(recorder.payload_digest)))
recorder.tempfile.seek(0)
record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile)
else:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data)).encode('latin1')))
block_digest = hashlib.new(self.digest_algorithm, data)
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
self.digest_str(block_digest)))
content_tuple = content_type, data
record = warctools.WarcRecord(headers=headers, content=content_tuple)
return record
def timestamp17(self):
now = datetime.utcnow()
return '{}{}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond//1000)
def _close_writer(self):
if self._fpath:
self.logger.info('closing {0}'.format(self._f_finalname))
self._f.close()
finalpath = os.path.sep.join([self.directory, self._f_finalname])
os.rename(self._fpath, finalpath)
self._fpath = None
self._f = None
def _build_warcinfo_record(self, filename):
warc_record_date = warctools.warc.warc_datetime_str(datetime.utcnow())
record_id = warctools.WarcRecord.random_warc_uuid()
headers = []
headers.append((warctools.WarcRecord.ID, record_id))
headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.WARCINFO))
headers.append((warctools.WarcRecord.FILENAME, filename.encode('latin1')))
headers.append((warctools.WarcRecord.DATE, warc_record_date))
warcinfo_fields = []
warcinfo_fields.append(b'software: warcprox ' + version_bytes)
hostname = socket.gethostname()
warcinfo_fields.append('hostname: {}'.format(hostname).encode('latin1'))
warcinfo_fields.append('ip: {0}'.format(socket.gethostbyname(hostname)).encode('latin1'))
warcinfo_fields.append(b'format: WARC File Format 1.0')
# warcinfo_fields.append('robots: ignore')
# warcinfo_fields.append('description: {0}'.format(self.description))
# warcinfo_fields.append('isPartOf: {0}'.format(self.is_part_of))
data = b'\r\n'.join(warcinfo_fields) + b'\r\n'
record = warctools.WarcRecord(headers=headers, content=(b'application/warc-fields', data))
return record
# <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
def _writer(self):
if self._fpath and os.path.getsize(self._fpath) > self.rollover_size:
self._close_writer()
if self._f == None:
self._f_finalname = '{}-{}-{:05d}-{}-{}-{}.warc{}'.format(
self.prefix, self.timestamp17(), self._serial, os.getpid(),
socket.gethostname(), self.port, '.gz' if self.gzip else '')
self._fpath = os.path.sep.join([self.directory, self._f_finalname + '.open'])
self._f = open(self._fpath, 'wb')
warcinfo_record = self._build_warcinfo_record(self._f_finalname)
self.logger.debug('warcinfo_record.headers={}'.format(warcinfo_record.headers))
warcinfo_record.write_to(self._f, gzip=self.gzip)
self._serial += 1
return self._f
def _final_tasks(self, recorded_url, recordset, recordset_offset):
if (self.dedup_db is not None
and recordset[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
and recorded_url.response_recorder.payload_size() > 0):
key = self.digest_str(recorded_url.response_recorder.payload_digest)
self.dedup_db.save(key, recordset[0], recordset_offset)
if self.playback_index_db is not None:
self.playback_index_db.save(self._f_finalname, recordset, recordset_offset)
recorded_url.response_recorder.tempfile.close()
def run(self):
self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format(
os.path.abspath(self.directory), self.gzip, self.rollover_size,
self.rollover_idle_time, self.prefix, self.port))
self._last_sync = self._last_activity = time.time()
while not self.stop.is_set():
try:
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
self._last_activity = time.time()
recordset = self.build_warc_records(recorded_url)
writer = self._writer()
recordset_offset = writer.tell()
for record in recordset:
offset = writer.tell()
record.write_to(writer, gzip=self.gzip)
self.logger.debug('wrote warc record: warc_type={} content_length={} url={} warc={} offset={}'.format(
record.get_header(warctools.WarcRecord.TYPE),
record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
record.get_header(warctools.WarcRecord.URL),
self._fpath, offset))
self._f.flush()
self._final_tasks(recorded_url, recordset, recordset_offset)
except queue.Empty:
if (self._fpath is not None
and self.rollover_idle_time is not None
and self.rollover_idle_time > 0
and time.time() - self._last_activity > self.rollover_idle_time):
self.logger.debug('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity))
self._close_writer()
if time.time() - self._last_sync > 60:
if self.dedup_db:
self.dedup_db.sync()
if self.playback_index_db:
self.playback_index_db.sync()
self._last_sync = time.time()
self.logger.info('WarcWriterThread shutting down')
self._close_writer();
class PlaybackIndexDb(object):
logger = logging.getLogger('warcprox.PlaybackIndexDb')
def __init__(self, dbm_file='./warcprox-playback-index.db'):
if os.path.exists(dbm_file):
self.logger.info('opening existing playback index database {}'.format(dbm_file))
else:
self.logger.info('creating new playback index database {}'.format(dbm_file))
self.db = dbm_gnu.open(dbm_file, 'c')
def close(self):
self.db.close()
def sync(self):
if hasattr(self.db, 'sync'):
self.db.sync()
def save(self, warcfile, recordset, offset):
response_record = recordset[0]
# XXX canonicalize url?
url = response_record.get_header(warctools.WarcRecord.URL)
date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
record_id_str = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
# there could be two visits of same url in the same second, and WARC-Date is
# prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\
# url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...}
if url in self.db:
existing_json_value = self.db[url].decode('utf-8')
py_value = json.loads(existing_json_value)
else:
py_value = {}
if date_str in py_value:
py_value[date_str].append({'f':warcfile, 'o':offset, 'i':record_id_str})
else:
py_value[date_str] = [{'f':warcfile, 'o':offset, 'i':record_id_str}]
json_value = json.dumps(py_value, separators=(',',':'))
self.db[url] = json_value.encode('utf-8')
self.logger.debug('playback index saved: {}:{}'.format(url, json_value))
def lookup_latest(self, url):
if url not in self.db:
return None, None
json_value = self.db[url].decode('utf-8')
self.logger.debug("{}:{}".format(repr(url), repr(json_value)))
py_value = json.loads(json_value)
latest_date = max(py_value)
result = py_value[latest_date][0]
result['i'] = result['i'].encode('ascii')
return latest_date, result
# in python3 params are bytes
def lookup_exact(self, url, warc_date, record_id):
if url not in self.db:
return None
json_value = self.db[url].decode('utf-8')
self.logger.debug("{}:{}".format(repr(url), repr(json_value)))
py_value = json.loads(json_value)
warc_date_str = warc_date.decode('ascii')
if warc_date_str in py_value:
for record in py_value[warc_date_str]:
if record['i'].encode('ascii') == record_id:
self.logger.debug("found exact match for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url)))
record['i'] = record['i'].encode('ascii')
return record
else:
self.logger.info("match not found for ({},{},{})".format(repr(warc_date), repr(record_id), repr(url)))
return None
class WarcproxController(object):
logger = logging.getLogger('warcprox.WarcproxController')
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, proxy=None, warc_writer=None, playback_proxy=None):
"""
@ -1186,7 +376,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
default='./warcprox-playback-index.db',
help='playback index database file (only used if --playback-port is specified)')
arg_parser.add_argument('--version', action='version',
version="warcprox {}".format(version_str))
version="warcprox {}".format(warcprox.version_str))
arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
# [--ispartof=warcinfo ispartof]
@ -1232,9 +422,9 @@ def main(argv=sys.argv):
digest_algorithm=args.digest_algorithm)
if args.playback_port is not None:
playback_index_db = PlaybackIndexDb(args.playback_index_db_file)
playback_index_db = playback.PlaybackIndexDb(args.playback_index_db_file)
playback_server_address=(args.address, int(args.playback_port))
playback_proxy = PlaybackProxy(server_address=playback_server_address,
playback_proxy = playback.PlaybackProxy(server_address=playback_server_address,
ca=ca, playback_index_db=playback_index_db,
warcs_dir=args.directory)
else:

287
warcprox/warcwriter.py Normal file
View File

@ -0,0 +1,287 @@
# vim:set sw=4 et:
try:
import queue
except ImportError:
import Queue as queue
import logging
import threading
import os
import hashlib
import time
import socket
import base64
from datetime import datetime
import hanzo.httptools
from hanzo import warctools
import warcprox
class WarcWriterThread(threading.Thread):
logger = logging.getLogger(__module__ + "." + __qualname__)
# port is only used for warc filename
def __init__(self, recorded_url_q=None, directory='./warcs',
rollover_size=1000000000, rollover_idle_time=None, gzip=False,
prefix='WARCPROX', port=0, digest_algorithm='sha1', base32=False,
dedup_db=None, playback_index_db=None):
threading.Thread.__init__(self, name='WarcWriterThread')
self.recorded_url_q = recorded_url_q
self.rollover_size = rollover_size
self.rollover_idle_time = rollover_idle_time
self.gzip = gzip
self.digest_algorithm = digest_algorithm
self.base32 = base32
self.dedup_db = dedup_db
self.playback_index_db = playback_index_db
# warc path and filename stuff
self.directory = directory
self.prefix = prefix
self.port = port
self._f = None
self._fpath = None
self._serial = 0
if not os.path.exists(directory):
self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory))
os.mkdir(directory)
self.stop = threading.Event()
# returns a tuple (principal_record, request_record) where principal_record is either a response or revisit record
def build_warc_records(self, recorded_url):
warc_date = warctools.warc.warc_datetime_str(datetime.utcnow())
dedup_info = None
if self.dedup_db is not None and recorded_url.response_recorder.payload_digest is not None:
key = self.digest_str(recorded_url.response_recorder.payload_digest)
dedup_info = self.dedup_db.lookup(key)
if dedup_info is not None:
# revisit record
recorded_url.response_recorder.tempfile.seek(0)
if recorded_url.response_recorder.payload_offset is not None:
response_header_block = recorded_url.response_recorder.tempfile.read(recorded_url.response_recorder.payload_offset)
else:
response_header_block = recorded_url.response_recorder.tempfile.read()
principal_record = self.build_warc_record(
url=recorded_url.url, warc_date=warc_date,
data=response_header_block,
warc_type=warctools.WarcRecord.REVISIT,
refers_to=dedup_info['i'],
refers_to_target_uri=dedup_info['u'],
refers_to_date=dedup_info['d'],
profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST,
content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE,
remote_ip=recorded_url.remote_ip)
else:
# response record
principal_record = self.build_warc_record(
url=recorded_url.url, warc_date=warc_date,
recorder=recorded_url.response_recorder,
warc_type=warctools.WarcRecord.RESPONSE,
content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE,
remote_ip=recorded_url.remote_ip)
request_record = self.build_warc_record(
url=recorded_url.url, warc_date=warc_date,
data=recorded_url.request_data,
warc_type=warctools.WarcRecord.REQUEST,
content_type=hanzo.httptools.RequestMessage.CONTENT_TYPE,
concurrent_to=principal_record.id)
return principal_record, request_record
def digest_str(self, hash_obj):
return hash_obj.name.encode('utf-8') + b':' + (base64.b32encode(hash_obj.digest()) if self.base32 else hash_obj.hexdigest().encode('ascii'))
def build_warc_record(self, url, warc_date=None, recorder=None, data=None,
concurrent_to=None, warc_type=None, content_type=None, remote_ip=None,
profile=None, refers_to=None, refers_to_target_uri=None,
refers_to_date=None):
if warc_date is None:
warc_date = warctools.warc.warc_datetime_str(datetime.utcnow())
record_id = warctools.WarcRecord.random_warc_uuid()
headers = []
if warc_type is not None:
headers.append((warctools.WarcRecord.TYPE, warc_type))
headers.append((warctools.WarcRecord.ID, record_id))
headers.append((warctools.WarcRecord.DATE, warc_date))
headers.append((warctools.WarcRecord.URL, url))
if remote_ip is not None:
headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip))
if profile is not None:
headers.append((warctools.WarcRecord.PROFILE, profile))
if refers_to is not None:
headers.append((warctools.WarcRecord.REFERS_TO, refers_to))
if refers_to_target_uri is not None:
headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri))
if refers_to_date is not None:
headers.append((warctools.WarcRecord.REFERS_TO_DATE, refers_to_date))
if concurrent_to is not None:
headers.append((warctools.WarcRecord.CONCURRENT_TO, concurrent_to))
if content_type is not None:
headers.append((warctools.WarcRecord.CONTENT_TYPE, content_type))
if recorder is not None:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)).encode('latin1')))
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
self.digest_str(recorder.block_digest)))
if recorder.payload_digest is not None:
headers.append((warctools.WarcRecord.PAYLOAD_DIGEST,
self.digest_str(recorder.payload_digest)))
recorder.tempfile.seek(0)
record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile)
else:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data)).encode('latin1')))
block_digest = hashlib.new(self.digest_algorithm, data)
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
self.digest_str(block_digest)))
content_tuple = content_type, data
record = warctools.WarcRecord(headers=headers, content=content_tuple)
return record
def timestamp17(self):
now = datetime.utcnow()
return '{}{}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond//1000)
def _close_writer(self):
if self._fpath:
self.logger.info('closing {0}'.format(self._f_finalname))
self._f.close()
finalpath = os.path.sep.join([self.directory, self._f_finalname])
os.rename(self._fpath, finalpath)
self._fpath = None
self._f = None
def _build_warcinfo_record(self, filename):
warc_record_date = warctools.warc.warc_datetime_str(datetime.utcnow())
record_id = warctools.WarcRecord.random_warc_uuid()
headers = []
headers.append((warctools.WarcRecord.ID, record_id))
headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.WARCINFO))
headers.append((warctools.WarcRecord.FILENAME, filename.encode('latin1')))
headers.append((warctools.WarcRecord.DATE, warc_record_date))
warcinfo_fields = []
warcinfo_fields.append(b'software: warcprox ' + warcprox.version_bytes)
hostname = socket.gethostname()
warcinfo_fields.append('hostname: {}'.format(hostname).encode('latin1'))
warcinfo_fields.append('ip: {0}'.format(socket.gethostbyname(hostname)).encode('latin1'))
warcinfo_fields.append(b'format: WARC File Format 1.0')
# warcinfo_fields.append('robots: ignore')
# warcinfo_fields.append('description: {0}'.format(self.description))
# warcinfo_fields.append('isPartOf: {0}'.format(self.is_part_of))
data = b'\r\n'.join(warcinfo_fields) + b'\r\n'
record = warctools.WarcRecord(headers=headers, content=(b'application/warc-fields', data))
return record
# <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
def _writer(self):
if self._fpath and os.path.getsize(self._fpath) > self.rollover_size:
self._close_writer()
if self._f == None:
self._f_finalname = '{}-{}-{:05d}-{}-{}-{}.warc{}'.format(
self.prefix, self.timestamp17(), self._serial, os.getpid(),
socket.gethostname(), self.port, '.gz' if self.gzip else '')
self._fpath = os.path.sep.join([self.directory, self._f_finalname + '.open'])
self._f = open(self._fpath, 'wb')
warcinfo_record = self._build_warcinfo_record(self._f_finalname)
self.logger.debug('warcinfo_record.headers={}'.format(warcinfo_record.headers))
warcinfo_record.write_to(self._f, gzip=self.gzip)
self._serial += 1
return self._f
def _final_tasks(self, recorded_url, recordset, recordset_offset):
if (self.dedup_db is not None
and recordset[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
and recorded_url.response_recorder.payload_size() > 0):
key = self.digest_str(recorded_url.response_recorder.payload_digest)
self.dedup_db.save(key, recordset[0], recordset_offset)
if self.playback_index_db is not None:
self.playback_index_db.save(self._f_finalname, recordset, recordset_offset)
recorded_url.response_recorder.tempfile.close()
def run(self):
self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format(
os.path.abspath(self.directory), self.gzip, self.rollover_size,
self.rollover_idle_time, self.prefix, self.port))
self._last_sync = self._last_activity = time.time()
while not self.stop.is_set():
try:
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
self._last_activity = time.time()
recordset = self.build_warc_records(recorded_url)
writer = self._writer()
recordset_offset = writer.tell()
for record in recordset:
offset = writer.tell()
record.write_to(writer, gzip=self.gzip)
self.logger.debug('wrote warc record: warc_type={} content_length={} url={} warc={} offset={}'.format(
record.get_header(warctools.WarcRecord.TYPE),
record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
record.get_header(warctools.WarcRecord.URL),
self._fpath, offset))
self._f.flush()
self._final_tasks(recorded_url, recordset, recordset_offset)
except queue.Empty:
if (self._fpath is not None
and self.rollover_idle_time is not None
and self.rollover_idle_time > 0
and time.time() - self._last_activity > self.rollover_idle_time):
self.logger.debug('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity))
self._close_writer()
if time.time() - self._last_sync > 60:
if self.dedup_db:
self.dedup_db.sync()
if self.playback_index_db:
self.playback_index_db.sync()
self._last_sync = time.time()
self.logger.info('WarcWriterThread shutting down')
self._close_writer();