2013-10-15 10:54:18 -07:00
# vim:set sw=4 et:
2013-10-15 15:52:26 -07:00
import BaseHTTPServer , SocketServer
import socket
import urlparse
2013-10-15 17:51:09 -07:00
import OpenSSL
2013-10-15 15:52:26 -07:00
import ssl
2013-10-15 10:54:18 -07:00
import logging
import sys
2013-10-15 15:52:26 -07:00
from hanzo import warctools
2013-10-15 10:54:18 -07:00
import uuid
import hashlib
from datetime import datetime
import Queue
import threading
2013-10-16 01:05:06 -07:00
import os
2013-10-15 14:11:31 -07:00
import argparse
2013-10-16 01:05:06 -07:00
import random
2013-10-16 18:13:56 -07:00
import httplib
2013-10-16 19:10:04 -07:00
import re
2013-10-17 01:58:07 -07:00
import signal
import time
2013-10-17 12:58:17 -07:00
import tempfile
2013-10-16 14:36:19 -07:00
class CertificateAuthority(object):
    """Minimal certificate authority used to man-in-the-middle HTTPS.

    On construction the CA private key and certificate are read from
    ``ca_file`` (a PEM file holding the key followed by the cert); if the
    file does not exist a new self-signed CA is generated and written there.

    Indexing the instance with a hostname, ``ca[hostname]``, returns the path
    of a PEM file containing a fresh private key and a certificate for that
    hostname signed by this CA. The file is generated on first use and kept
    under ``certs_dir`` for later lookups.

    Args:
        ca_file: path of the PEM file holding the CA key and certificate.
        certs_dir: directory where per-host key+cert PEM files are stored;
            created if missing.
        digest: name of the message digest used to sign certificates.
            Defaults to sha256 because SHA-1 signatures are deprecated and
            rejected by modern TLS clients.
    """

    # Anything outside this set (notably path separators) is replaced in the
    # per-host file name, so a client-supplied hostname cannot escape
    # certs_dir. Valid DNS hostnames only use these characters anyway.
    _UNSAFE_FILENAME_CHARS = re.compile(r'[^A-Za-z0-9._-]')

    def __init__(self, ca_file='warcprox-ca.pem', certs_dir='./warcprox-ca', digest='sha256'):
        self.ca_file = ca_file
        self.certs_dir = certs_dir
        self.digest = digest
        # The proxy server in this file handles requests on several threads
        # (ThreadingMixIn), so two CONNECTs for the same host can race to
        # create the same cert file; serialize that check-then-create.
        self._lock = threading.Lock()

        if not os.path.exists(ca_file):
            self._generate_ca()
        else:
            self._read_ca(ca_file)

        if not os.path.exists(certs_dir):
            logging.info("directory for generated certs {} doesn't exist, creating it".format(certs_dir))
            os.mkdir(certs_dir)

    def _generate_ca(self):
        """Create a self-signed CA key+cert and write both to ``self.ca_file``."""
        # Generate key
        self.key = OpenSSL.crypto.PKey()
        self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)

        # Generate certificate
        self.cert = OpenSSL.crypto.X509()
        # The X.509 version field is zero-based: 2 means v3, which is what the
        # extensions below require. (3 would encode an invalid "v4" cert.)
        self.cert.set_version(2)
        # random serial avoids sec_error_reused_issuer_and_serial
        self.cert.set_serial_number(random.randint(0, 2**64 - 1))
        self.cert.get_subject().CN = 'CA for warcprox MITM archiving proxy'
        self.cert.gmtime_adj_notBefore(0)                # now
        self.cert.gmtime_adj_notAfter(100*365*24*60*60)  # 100 yrs in future
        self.cert.set_issuer(self.cert.get_subject())
        self.cert.set_pubkey(self.key)
        self.cert.add_extensions([
            OpenSSL.crypto.X509Extension("basicConstraints", True, "CA:TRUE, pathlen:0"),
            OpenSSL.crypto.X509Extension("keyUsage", True, "keyCertSign, cRLSign"),
            OpenSSL.crypto.X509Extension("subjectKeyIdentifier", False, "hash", subject=self.cert),
        ])
        self.cert.sign(self.key, self.digest)

        with open(self.ca_file, 'wb+') as f:
            f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.SSL.FILETYPE_PEM, self.key))
            f.write(OpenSSL.crypto.dump_certificate(OpenSSL.SSL.FILETYPE_PEM, self.cert))

        logging.info('generated CA key+cert and wrote to {}'.format(self.ca_file))

    def _read_ca(self, filename):
        """Load the CA certificate and private key from PEM file ``filename``."""
        # The one file holds both the key and the cert: read it once, and
        # close it instead of leaking two open handles.
        with open(filename, 'rb') as f:
            pem = f.read()
        self.cert = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM, pem)
        self.key = OpenSSL.crypto.load_privatekey(OpenSSL.SSL.FILETYPE_PEM, pem)

        logging.info('read CA key+cert from {}'.format(self.ca_file))

    def _cert_path(self, cn):
        """Return the path of the key+cert PEM file for hostname ``cn``."""
        safe_cn = self._UNSAFE_FILENAME_CHARS.sub('_', cn)
        return os.path.join(self.certs_dir, '{}.pem'.format(safe_cn))

    def __getitem__(self, cn):
        """Return the path of a PEM key+cert for ``cn``, generating it if needed."""
        cnp = self._cert_path(cn)
        with self._lock:
            if not os.path.exists(cnp):
                self._generate_host_cert(cn, cnp)
        return cnp

    def _generate_host_cert(self, cn, cnp):
        """Generate a key and a CA-signed cert with CN ``cn``; write both to ``cnp``."""
        # create certificate
        key = OpenSSL.crypto.PKey()
        key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)

        # Generate CSR
        req = OpenSSL.crypto.X509Req()
        req.get_subject().CN = cn
        req.set_pubkey(key)
        req.sign(key, self.digest)

        # Sign CSR
        cert = OpenSSL.crypto.X509()
        cert.set_subject(req.get_subject())
        cert.set_serial_number(random.randint(0, 2**64 - 1))
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(10*365*24*60*60)
        cert.set_issuer(self.cert.get_subject())
        cert.set_pubkey(req.get_pubkey())
        cert.sign(self.key, self.digest)

        with open(cnp, 'wb+') as f:
            f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.SSL.FILETYPE_PEM, key))
            f.write(OpenSSL.crypto.dump_certificate(OpenSSL.SSL.FILETYPE_PEM, cert))

        logging.info('wrote generated key+cert to {}'.format(cnp))
2012-07-19 11:08:14 -04:00
class UnsupportedSchemeException(Exception):
    """Raised when a proxied (non-CONNECT) request URL's scheme is not http."""
# This class intercepts the raw bytes, so it's the easiest place to hook in to
# send the raw bytes on to the proxy destination.
class ProxyingRecorder(object):
    """File-like wrapper that records and relays every byte read through it.

    Each hunk returned by read()/readline() is also:
      - written to a spooled temporary file (``self.tempfile``),
      - fed to ``block_sha1``, a SHA-1 of everything read,
      - fed to ``payload_sha1``, a SHA-1 of everything after the first empty
        line (LF LF or LF CR LF) that terminates the headers,
      - sent unchanged to ``proxy_dest`` via ``sendall``.

    ``payload_sha1`` stays None until the end of the headers has been seen.
    ``len(recorder)`` is the number of bytes read so far.

    Args:
        fp: underlying file-like object providing read(size)/readline(size).
        proxy_dest: object with a ``sendall`` method receiving the raw bytes.
    """

    # An empty line ends the headers. A match is at most 3 bytes long, so
    # keeping the last 2 bytes of the stream is enough to catch one that is
    # split across hunks.
    _HEADER_END_RE = re.compile(br'\n\r?\n')

    def __init__(self, fp, proxy_dest):
        self.fp = fp
        # "The file has no name, and will cease to exist when it is closed."
        self.tempfile = tempfile.SpooledTemporaryFile(max_size=512*1024)
        self.block_sha1 = hashlib.sha1()
        self.payload_sha1 = None
        self.proxy_dest = proxy_dest
        # Tail of the stream read so far. It is only maintained until the
        # start of the payload has been found.
        self._prev_hunk_last_two_bytes = b''
        self.len = 0

    def _update(self, hunk):
        """Record, hash and relay one hunk of raw bytes."""
        if self.payload_sha1 is None:
            self._look_for_payload_start(hunk)
        else:
            self.payload_sha1.update(hunk)

        self.block_sha1.update(hunk)
        self.tempfile.write(hunk)
        self.proxy_dest.sendall(hunk)
        self.len += len(hunk)

    def _look_for_payload_start(self, hunk):
        """Start ``payload_sha1`` if the end of the headers lies in ``hunk``."""
        prev = self._prev_hunk_last_two_bytes
        # Search the previous tail plus this hunk, so that a terminator
        # straddling the boundary is found. A terminator lying wholly inside
        # this hunk is found too, regardless of how the previous hunk ended.
        buf = prev + hunk
        m = self._HEADER_END_RE.search(buf)
        if m is not None:
            # Offset in hunk of the first payload byte. A match cannot end
            # inside prev (it would have been found last time); max() is only
            # defensive.
            start = max(m.end() - len(prev), 0)
            self.payload_sha1 = hashlib.sha1()
            self.payload_sha1.update(hunk[start:])
        else:
            # Keep the tail of the whole stream, not just of this hunk, so
            # very short hunks (e.g. 1 byte) don't lose context.
            self._prev_hunk_last_two_bytes = buf[-2:]

    def read(self, size=-1):
        hunk = self.fp.read(size)
        self._update(hunk)
        return hunk

    def readline(self, size=-1):
        # Delegates to fp.readline rather than self.read(), so each byte is
        # recorded exactly once; if it ever called self.read() this would
        # double-record.
        hunk = self.fp.readline(size)
        self._update(hunk)
        return hunk

    def close(self):
        return self.fp.close()

    def __len__(self):
        return self.len
2013-10-16 18:13:56 -07:00
2013-10-17 02:47:55 -07:00
class ProxyingRecordingHTTPResponse(httplib.HTTPResponse):
    """HTTPResponse whose raw reads are recorded and relayed to ``proxy_dest``.

    All bytes read from the server socket pass through a ProxyingRecorder,
    exposed as ``self.recorder``. The recorder keeps a copy of them and
    forwards them unchanged to ``proxy_dest``.
    """

    def __init__(self, sock, debuglevel=0, strict=0, method=None, buffering=False, proxy_dest=None):
        httplib.HTTPResponse.__init__(self, sock, debuglevel=debuglevel,
                strict=strict, method=method, buffering=buffering)

        # HTTPResponse sets self.fp to None once it has finished reading, but
        # the recorded data is still needed afterwards, so hold on to the
        # recorder under its own attribute as well.
        recorder = ProxyingRecorder(self.fp, proxy_dest)
        self.recorder = recorder
        self.fp = recorder
2013-10-17 18:12:33 -07:00
class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Request handler of the MITM archiving proxy.

    Plain requests carry an absolute http URL in the request line. They are
    forwarded to the origin server.

    A CONNECT request is answered with 200. The client connection is then
    wrapped in server-side TLS using a certificate for the requested host
    from ``self.server.ca``. The next request read from that tunnel is
    forwarded over TLS to the origin server.

    In both cases, the raw response bytes are relayed to the client as they
    are read. The request and the recorded response are then passed to
    ``self.server.recordset_q.create_and_queue``.
    """

    def __init__(self, request, client_address, server):
        # must be set before the base __init__, which handles the request
        self.is_connect = False
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, request, client_address, server)

    def _connect_to_host(self):
        """Open ``self._proxy_sock`` to the destination of the current request.

        For CONNECT, ``self.path`` is "host:port". Otherwise ``self.path`` is
        an absolute http URL. It is saved in ``self.url``, and ``self.path``
        is rewritten to the path/params/query/fragment sent upstream.

        Raises:
            UnsupportedSchemeException: a non-CONNECT URL whose scheme is not http.
            socket.error / ssl.SSLError: the connection could not be set up.
        """
        # Get hostname and port to connect to
        if self.is_connect:
            self.hostname, self.port = self.path.split(':')
        else:
            self.url = self.path
            u = urlparse.urlparse(self.url)
            if u.scheme != 'http':
                raise UnsupportedSchemeException('Unknown scheme %s' % repr(u.scheme))
            self.hostname = u.hostname
            self.port = u.port or 80
            self.path = urlparse.urlunparse(
                urlparse.ParseResult(
                    scheme='',
                    netloc='',
                    params=u.params,
                    path=u.path or '/',
                    query=u.query,
                    fragment=u.fragment
                )
            )

        # Connect to destination
        self._proxy_sock = socket.socket()
        try:
            self._proxy_sock.settimeout(10)
            self._proxy_sock.connect((self.hostname, int(self.port)))

            # Wrap socket if SSL is required
            if self.is_connect:
                # NOTE(review): no certificate verification and no SNI for the
                # upstream TLS connection -- confirm that is intended.
                self._proxy_sock = ssl.wrap_socket(self._proxy_sock)
        except Exception:
            # don't leak the half-set-up socket; the caller reports the error
            self._proxy_sock.close()
            raise

    def _transition_to_ssl(self):
        """Wrap the client connection in server-side TLS for ``self.hostname``."""
        self.connection = ssl.wrap_socket(self.connection, server_side=True,
                certfile=self.server.ca[self.hostname])

    def do_CONNECT(self):
        """Handle CONNECT: connect upstream, then MITM the client's tunnel."""
        self.is_connect = True
        try:
            # Connect to destination first
            self._connect_to_host()

            # If successful, let's do this!
            self.send_response(200, 'Connection established')
            self.end_headers()
            self._transition_to_ssl()
        except Exception as e:
            self.send_error(500, str(e))
            # the tunnel could not be set up; don't try to read a request from it
            return

        # Reload! Rebuild rfile/wfile on the TLS-wrapped connection and handle
        # the request sent through the tunnel.
        self.setup()
        self.handle_one_request()

    def _construct_tunneled_url(self):
        """Return the https URL of a request received through a CONNECT tunnel."""
        if int(self.port) == 443:
            netloc = self.hostname
        else:
            netloc = '{}:{}'.format(self.hostname, self.port)

        result = urlparse.urlunparse(
            urlparse.ParseResult(
                scheme='https',
                netloc=netloc,
                params='',
                path=self.path,
                query='',
                fragment=''
            )
        )
        return result

    def do_COMMAND(self):
        """Forward the current request upstream; relay and record the response."""
        if not self.is_connect:
            try:
                # Connect to destination
                self._connect_to_host()
            except Exception as e:
                self.send_error(500, str(e))
                # nothing to forward to (_proxy_sock may not even exist)
                return
        else:
            # inside a tunnel self.path is relative; rebuild the full URL
            self.url = self._construct_tunneled_url()

        # Build request
        req = '%s %s %s\r\n' % (self.command, self.path, self.request_version)

        # Add headers to the request
        req += '%s\r\n' % self.headers

        # Append message body if present to the request
        # NOTE(review): only Content-Length bodies are forwarded; a chunked
        # request body is not read here.
        if 'Content-Length' in self.headers:
            req += self.rfile.read(int(self.headers['Content-Length']))

        try:
            # Send it down the pipe!
            self._proxy_sock.sendall(req)

            # We want HTTPResponse's smarts about http and handling of
            # non-compliant servers. But HTTPResponse.read() doesn't return
            # the raw bytes read from the server: it unchunks them if they're
            # chunked, and might do other stuff. We want to send the raw bytes
            # back to the client. So we ignore the values returned by h.read()
            # below. Instead the ProxyingRecordingHTTPResponse takes care of
            # sending the raw bytes to the proxy client.
            # Proxy and record the response
            h = ProxyingRecordingHTTPResponse(self._proxy_sock, proxy_dest=self.connection)
            h.begin()

            buf = h.read(8192)
            while buf:
                buf = h.read(8192)

            # Let's close off the remote end
            h.close()
        finally:
            self._proxy_sock.close()

        self.server.recordset_q.create_and_queue(self.url, req, h.recorder)

    def __getattr__(self, item):
        # Route every do_<METHOD> lookup (GET, POST, ...) to do_COMMAND.
        # Anything else really is missing: raise, so hasattr()/getattr()
        # behave normally instead of seeing None.
        if item.startswith('do_'):
            return self.do_COMMAND
        raise AttributeError(item)

    def log_error(self, fmt, *args):
        logging.error("{0} - - [{1}] {2}".format(self.address_string(),
            self.log_date_time_string(), fmt % args))

    def log_message(self, fmt, *args):
        logging.info("{0} - - [{1}] {2}".format(self.address_string(),
            self.log_date_time_string(), fmt % args))
2013-10-15 14:11:31 -07:00
2013-10-17 18:12:33 -07:00
class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    """Threaded HTTP server that serves each request with ``req_handler_class``.

    Handlers reach the shared state through ``self.server``: ``ca``, the
    CertificateAuthority built from ``ca_file``/``certs_dir``, and
    ``recordset_q``, the queue that captured records are put on.
    """

    def __init__(self, server_address, req_handler_class=WarcProxyHandler,
                 bind_and_activate=True, ca_file='./warcprox-ca.pem',
                 certs_dir='./warcprox-ca', recordset_q=None):
        BaseHTTPServer.HTTPServer.__init__(self, server_address,
                req_handler_class, bind_and_activate)
        self.ca = CertificateAuthority(ca_file, certs_dir)
        self.recordset_q = recordset_q

    def server_activate(self):
        BaseHTTPServer.HTTPServer.server_activate(self)
        host, port = self.server_address[:2]
        logging.info('listening on {0}:{1}'.format(host, port))

    def server_close(self):
        logging.info('shutting down')
        BaseHTTPServer.HTTPServer.server_close(self)
2013-10-15 10:54:18 -07:00
2013-10-17 18:12:33 -07:00
# Each item in the queue is a tuple of warc records, which should be written
# consecutively in the same warc.
class WarcRecordsetQueue(Queue.Queue):
    """Queue of WARC record groups passed from request handlers to the writer.

    Each item is a tuple of warctools.WarcRecord objects that should be
    written consecutively in the same WARC file.
    """

    def create_and_queue(self, url, request_data, response_recorder):
        """Build the response and request records for one fetch and enqueue them.

        Both records share one WARC-Date. The request record is
        WARC-Concurrent-To the response record. The queued item is the tuple
        ``(response_record, request_record)``.

        Args:
            url: target URL of the fetch.
            request_data: the raw request as sent to the remote server.
            response_recorder: ProxyingRecorder that captured the raw response.
        """
        # WARC-Date is a UTC timestamp (WARC 1.0), so don't use local time
        warc_date = warctools.warc.warc_datetime_str(datetime.utcnow())

        response_record, response_record_id = self.make_record(url=url,
                warc_date=warc_date, recorder=response_recorder,
                warc_type=warctools.WarcRecord.RESPONSE,
                content_type="application/http;msgtype=response")

        request_record, request_record_id = self.make_record(url=url,
                warc_date=warc_date, data=request_data,
                warc_type=warctools.WarcRecord.REQUEST,
                content_type="application/http;msgtype=request",
                concurrent_to=response_record_id)

        record_group = (response_record, request_record)
        self.put(record_group)

    # A staticmethod: it uses no queue state, and callers invoke it as
    # self.make_record(url=...). As a plain function that call passed the
    # queue as ``url`` too and raised TypeError.
    @staticmethod
    def make_record(url, warc_date=None, recorder=None, data=None,
            concurrent_to=None, warc_type=None, content_type=None):
        """Build one WARC record; return ``(record, record_id)``.

        The record block comes from one of two sources:
          - ``recorder``, when given: length and digests are taken from it,
            and its temp file (rewound) becomes the record's content file;
          - otherwise ``data``: length and SHA-1 block digest are computed
            from it.
        The WARC-Type, WARC-Concurrent-To and Content-Type headers are added
        only for arguments that are not None. ``warc_date`` defaults to now
        (UTC).
        """
        if warc_date is None:
            warc_date = warctools.warc.warc_datetime_str(datetime.utcnow())

        record_id = warctools.WarcRecord.random_warc_uuid()

        headers = []
        headers.append((warctools.WarcRecord.ID, record_id))
        headers.append((warctools.WarcRecord.DATE, warc_date))
        headers.append((warctools.WarcRecord.URL, url))
        # headers.append((warctools.WarcRecord.IP_ADDRESS, ip))
        if warc_type is not None:
            headers.append((warctools.WarcRecord.TYPE, warc_type))
        if concurrent_to is not None:
            headers.append((warctools.WarcRecord.CONCURRENT_TO, concurrent_to))
        if content_type is not None:
            headers.append((warctools.WarcRecord.CONTENT_TYPE, content_type))

        if recorder is not None:
            headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder))))
            headers.append((warctools.WarcRecord.BLOCK_DIGEST, 'sha1:{}'.format(recorder.block_sha1.hexdigest())))
            if recorder.payload_sha1 is not None:
                headers.append((warctools.WarcRecord.PAYLOAD_DIGEST, 'sha1:{}'.format(recorder.payload_sha1.hexdigest())))

            recorder.tempfile.seek(0)
            record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile)
        else:
            headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data))))
            headers.append((warctools.WarcRecord.BLOCK_DIGEST, 'sha1:{}'.format(hashlib.sha1(data).hexdigest())))

            content_tuple = content_type, data
            record = warctools.WarcRecord(headers=headers, content=content_tuple)

        return record, record_id
2013-10-15 10:54:18 -07:00
2012-07-19 11:08:14 -04:00
2013-10-15 10:54:18 -07:00
class WarcWriterThread(threading.Thread):
    """Thread that writes queued record groups to rolling WARC files.

    Record groups taken from ``recordset_q`` are appended to the current WARC
    file in ``directory``. Each file is created with a ".open" suffix and
    begins with a warcinfo record. It is renamed to its final name when it
    grows beyond ``size`` bytes, or when the thread stops.

    Setting ``self.stop`` makes the thread write whatever is still queued,
    close the current file, and exit.
    """

    def __init__(self, recordset_q, directory, gzip, prefix, size, port):
        threading.Thread.__init__(self, name='WarcWriterThread')

        self.recordset_q = recordset_q

        self.directory = directory
        self.gzip = gzip
        self.prefix = prefix
        self.size = size
        self.port = port

        self._f = None
        self._fpath = None
        self._serial = 0

        if not os.path.exists(directory):
            logging.info("warc destination directory {} doesn't exist, creating it".format(directory))
            # makedirs so nested destinations like a/b/warcs also work
            os.makedirs(directory)

        self.stop = threading.Event()

    def timestamp17(self, now=None):
        """Return a 17-digit timestamp: yyyyMMddHHmmss plus 3-digit milliseconds.

        Args:
            now: datetime to format; defaults to the current UTC time (UTC
                like the WARC-Date of the records, and like Heritrix's
                timestamp17).
        """
        if now is None:
            now = datetime.utcnow()
        # zero-pad the milliseconds, otherwise e.g. 5 ms gives only 15 digits
        return '{}{:03d}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond // 1000)

    def _close_writer(self):
        """Close the current WARC file, if any, and drop its ".open" suffix."""
        if self._fpath:
            final_name = self._fpath[:-len('.open')]
            logging.info('closing {0}'.format(final_name))
            self._f.close()
            os.rename(self._fpath, final_name)

            self._fpath = None
            self._f = None

    def _make_warcinfo_record(self, filename):
        """Build the warcinfo record that starts WARC file ``filename``."""
        # WARC-Date is a UTC timestamp (WARC 1.0), so don't use local time
        warc_record_date = warctools.warc.warc_datetime_str(datetime.utcnow())
        record_id = warctools.WarcRecord.random_warc_uuid()

        headers = []
        headers.append((warctools.WarcRecord.ID, record_id))
        headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.WARCINFO))
        headers.append((warctools.WarcRecord.FILENAME, filename))
        headers.append((warctools.WarcRecord.DATE, warc_record_date))
        # headers.append((warctools.WarcRecord.IP_ADDRESS, ip))

        warcinfo_fields = []
        warcinfo_fields.append('software: warcprox.py https://github.com/nlevitt/warcprox')
        hostname = socket.gethostname()
        warcinfo_fields.append('hostname: {0}'.format(hostname))
        try:
            warcinfo_fields.append('ip: {0}'.format(socket.gethostbyname(hostname)))
        except socket.error as e:
            # an unresolvable local hostname must not stop WARC writing
            logging.warning('unable to resolve ip of {0}: {1}'.format(hostname, e))
        warcinfo_fields.append('format: WARC File Format 1.0')
        warcinfo_fields.append('robots: ignore')   # XXX implement robots support
        # warcinfo_fields.append('description: {0}'.format(self.description))
        # warcinfo_fields.append('isPartOf: {0}'.format(self.is_part_of))
        data = '\r\n'.join(warcinfo_fields) + '\r\n'

        record = warctools.WarcRecord(headers=headers, content=('application/warc-fields', data))

        return record

    # <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
    def _writer(self):
        """Return the open WARC file, rolling over to a new one if needed."""
        if self._fpath and os.path.getsize(self._fpath) > self.size:
            self._close_writer()

        if self._f is None:
            filename = '{}-{}-{:05d}-{}-{}-{}.warc{}'.format(
                    self.prefix, self.timestamp17(), self._serial, os.getpid(),
                    socket.gethostname(), self.port, '.gz' if self.gzip else '')
            self._fpath = os.path.sep.join([self.directory, filename + '.open'])

            self._f = open(self._fpath, 'wb')

            warcinfo_record = self._make_warcinfo_record(filename)
            warcinfo_record.write_to(self._f, gzip=self.gzip)

            self._serial += 1

        return self._f

    def _write_recordset(self, recordset):
        """Write one group of records consecutively to the current WARC file."""
        try:
            writer = self._writer()
            for record in recordset:
                record.write_to(writer, gzip=self.gzip)
                logging.info('wrote warc record {}'.format(record))
            writer.flush()
        except Exception:
            # A failed write must not kill the thread; otherwise every later
            # record would silently pile up in the queue.
            logging.exception('problem writing warc records')
        finally:
            # XXX now we know we're done with these... messy to handle this
            # here, but where else can it happen?
            for record in recordset:
                if record.content_file:
                    record.content_file.close()

    def run(self):
        logging.info('WarcWriterThread starting, directory={0} gzip={1} prefix={2} size={3} port={4}'.format(
                os.path.abspath(self.directory), self.gzip, self.prefix, self.size, self.port))

        while not self.stop.is_set():
            try:
                recordset = self.recordset_q.get(block=True, timeout=0.5)
            except Queue.Empty:
                continue
            self._write_recordset(recordset)

        # Write out anything queued before stop was set, so records of
        # requests that completed during shutdown are not lost.
        while True:
            try:
                recordset = self.recordset_q.get_nowait()
            except Queue.Empty:
                break
            self._write_recordset(recordset)

        logging.info('WarcWriterThread shutting down')
        self._close_writer()
2012-07-19 11:08:14 -04:00
if __name__ == '__main__':

    arg_parser = argparse.ArgumentParser(description='warcprox - WARC writing MITM HTTP/S proxy',
            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    arg_parser.add_argument('-p', '--port', dest='port', default=8080, type=int, help='port to listen on')
    arg_parser.add_argument('-b', '--address', dest='address', default='localhost', help='address to listen on')
    arg_parser.add_argument('-c', '--cacert', dest='cacert', default='./warcprox-ca.pem', help='CA certificate file; if file does not exist, it will be created')
    arg_parser.add_argument('--certs-dir', dest='certs_dir', default='./warcprox-ca', help='where to store and load generated certificates')
    arg_parser.add_argument('-d', '--dir', dest='directory', default='./warcs', help='where to write warcs')
    arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records')
    arg_parser.add_argument('-n', '--prefix', dest='prefix', default='WARCPROX', help='WARC filename prefix')
    arg_parser.add_argument('-s', '--size', dest='size', default=1000*1000*1000, type=int, help='WARC file rollover size threshold in bytes')
    arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
    arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
    # [--ispartof=warcinfo ispartof]
    # [--description=warcinfo description]
    # [--operator=warcinfo operator]
    # [--httpheader=warcinfo httpheader]
    args = arg_parser.parse_args()

    if args.verbose:
        loglevel = logging.DEBUG
    elif args.quiet:
        loglevel = logging.WARNING
    else:
        loglevel = logging.INFO

    logging.basicConfig(stream=sys.stdout, level=loglevel,
            format='%(asctime)s %(process)d %(threadName)s %(levelname)s %(funcName)s(%(filename)s:%(lineno)d) %(message)s')

    recordset_q = WarcRecordsetQueue()

    proxy = WarcProxy(server_address=(args.address, args.port),
            ca_file=args.cacert, certs_dir=args.certs_dir,
            recordset_q=recordset_q)

    warc_writer = WarcWriterThread(recordset_q=recordset_q,
            directory=args.directory, gzip=args.gzip, prefix=args.prefix,
            size=args.size, port=args.port)

    proxy_thread = threading.Thread(target=proxy.serve_forever, name='ProxyThread')
    proxy_thread.start()
    warc_writer.start()

    stop = threading.Event()
    # Signal handlers are called with (signum, frame); Event.set takes no
    # arguments, so passing it directly raised TypeError on SIGTERM.
    signal.signal(signal.SIGTERM, lambda signum, frame: stop.set())

    try:
        while not stop.is_set():
            time.sleep(0.5)
    except KeyboardInterrupt:
        # Ctrl-C: fall through to a clean shutdown
        pass
    finally:
        proxy.shutdown()
        warc_writer.stop.set()
        proxy.server_close()
        # let the writer flush queued records and close its current file
        warc_writer.join()