Merge branch 'master' of github.com:nlevitt/warcprox

This commit is contained in:
Noah Levitt 2013-10-21 15:09:05 -07:00
commit bb148cce4c
2 changed files with 90 additions and 26 deletions

View File

@ -49,3 +49,32 @@ incorporated into warctools mainline.
1000000000)
-v, --verbose
-q, --quiet
### To do
- integration tests, unit tests
- url-agnostic deduplication
- unchunk and/or ungzip before storing payload, or alter request to discourage server from chunking/gzipping
- check certs from proxied website, like browser does, and present browser-like warning if appropriate
- keep statistics, produce reports
- write cdx while crawling?
- performance testing
- base32 sha1 like heritrix?
- configurable timeouts and stuff
- evaluate ipv6 support
- more explicit handling of connection closed exception during transfer? other error cases?
- dns cache? (probably unnecessary; the system resolver already seems to do a fine job)
- keepalive with remote servers?
- python3
#### To not do
The features below could also be part of warcprox. But maybe they don't belong
here, since this is a proxy, not a crawler/robot. It can be used by a human
with a browser, or by something automated, i.e. a robot. My feeling is that
it's more appropriate to implement these in the robot.
- politeness, i.e. throttle requests per server
- fetch and obey robots.txt
- alter user-agent, maybe insert something like "warcprox mitm archiving proxy; +http://archive.org/details/archive.org_bot"

View File

@ -310,11 +310,13 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
while buf != '':
buf = h.read(8192)
remote_ip = self._proxy_sock.getpeername()[0]
# Let's close off the remote end
h.close()
self._proxy_sock.close()
self.server.recordset_q.create_and_queue(self.url, req, h.recorder)
self.server.recordset_q.create_and_queue(self.url, req, h.recorder, remote_ip)
def __getattr__(self, item):
@ -352,13 +354,14 @@ class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
# consecutively in the same warc.
class WarcRecordsetQueue(Queue.Queue):
def create_and_queue(self, url, request_data, response_recorder):
def create_and_queue(self, url, request_data, response_recorder, remote_ip):
warc_date = warctools.warc.warc_datetime_str(datetime.now())
response_record, response_record_id = self.make_record(url=url,
warc_date=warc_date, recorder=response_recorder,
warc_type=warctools.WarcRecord.RESPONSE,
content_type="application/http;msgtype=response")
warc_type=warctools.WarcRecord.RESPONSE,
content_type="application/http;msgtype=response",
remote_ip=remote_ip)
request_record, request_record_id = self.make_record(url=url,
warc_date=warc_date, data=request_data,
@ -372,7 +375,7 @@ class WarcRecordsetQueue(Queue.Queue):
@staticmethod
def make_record(url, warc_date=None, recorder=None, data=None,
concurrent_to=None, warc_type=None, content_type=None):
concurrent_to=None, warc_type=None, content_type=None, remote_ip=None):
if warc_date is None:
warc_date = warctools.warc.warc_datetime_str(datetime.now())
@ -380,12 +383,13 @@ class WarcRecordsetQueue(Queue.Queue):
record_id = warctools.WarcRecord.random_warc_uuid()
headers = []
if warc_type is not None:
headers.append((warctools.WarcRecord.TYPE, warc_type))
headers.append((warctools.WarcRecord.ID, record_id))
headers.append((warctools.WarcRecord.DATE, warc_date))
headers.append((warctools.WarcRecord.URL, url))
# headers.append((warctools.WarcRecord.IP_ADDRESS, ip))
if warc_type is not None:
headers.append((warctools.WarcRecord.TYPE, warc_type))
if remote_ip is not None:
headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip))
if concurrent_to is not None:
headers.append((warctools.WarcRecord.CONCURRENT_TO, concurrent_to))
if content_type is not None:
@ -412,15 +416,20 @@ class WarcRecordsetQueue(Queue.Queue):
class WarcWriterThread(threading.Thread):
def __init__(self, recordset_q, directory, gzip, prefix, size, port):
# port is only used for warc filename
def __init__(self, recordset_q, directory, rollover_size=1000000000, rollover_idle_time=None, gzip=False, prefix='WARCPROX', port=0):
threading.Thread.__init__(self, name='WarcWriterThread')
self.recordset_q = recordset_q
self.directory = directory
self.rollover_size = rollover_size
self.rollover_idle_time = rollover_idle_time
self.gzip = gzip
# warc path and filename stuff
self.directory = directory
self.prefix = prefix
self.size = size
self.port = port
self._f = None
@ -457,7 +466,6 @@ class WarcWriterThread(threading.Thread):
headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.WARCINFO))
headers.append((warctools.WarcRecord.FILENAME, filename))
headers.append((warctools.WarcRecord.DATE, warc_record_date))
# headers.append((warctools.WarcRecord.IP_ADDRESS, ip))
warcinfo_fields = []
warcinfo_fields.append('software: warcprox.py https://github.com/nlevitt/warcprox')
@ -477,7 +485,7 @@ class WarcWriterThread(threading.Thread):
# <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
def _writer(self):
if self._fpath and os.path.getsize(self._fpath) > self.size:
if self._fpath and os.path.getsize(self._fpath) > self.rollover_size:
self._close_writer()
if self._f == None:
@ -497,12 +505,18 @@ class WarcWriterThread(threading.Thread):
def run(self):
logging.info('WarcWriterThread starting, directory={0} gzip={1} prefix={2} size={3} port={4}'.format(
os.path.abspath(self.directory), self.gzip, self.prefix, self.size, self.port))
logging.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format(
os.path.abspath(self.directory), self.gzip, self.rollover_size,
self.rollover_idle_time, self.prefix, self.port))
self._last_activity = time.time()
while not self.stop.is_set():
try:
recordset = self.recordset_q.get(block=True, timeout=0.5)
self._last_activity = time.time()
writer = self._writer()
for record in recordset:
@ -521,7 +535,12 @@ class WarcWriterThread(threading.Thread):
self._f.flush()
except Queue.Empty:
pass
if (self._fpath is not None
and self.rollover_idle_time is not None
and self.rollover_idle_time > 0
and time.time() - self._last_activity > self.rollover_idle_time):
logging.info('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity))
self._close_writer()
logging.info('WarcWriterThread shutting down')
self._close_writer();
@ -529,16 +548,31 @@ class WarcWriterThread(threading.Thread):
if __name__ == '__main__':
arg_parser = argparse.ArgumentParser(description='warcprox - WARC writing MITM HTTP/S proxy',
arg_parser = argparse.ArgumentParser(
description='warcprox - WARC writing MITM HTTP/S proxy',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('-p', '--port', dest='port', default='8080', help='port to listen on')
arg_parser.add_argument('-b', '--address', dest='address', default='localhost', help='address to listen on')
arg_parser.add_argument('-c', '--cacert', dest='cacert', default='./warcprox-ca.pem', help='CA certificate file; if file does not exist, it will be created')
arg_parser.add_argument('--certs-dir', dest='certs_dir', default='./warcprox-ca', help='where to store and load generated certificates')
arg_parser.add_argument('-d', '--dir', dest='directory', default='./warcs', help='where to write warcs')
arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records')
arg_parser.add_argument('-n', '--prefix', dest='prefix', default='WARCPROX', help='WARC filename prefix')
arg_parser.add_argument('-s', '--size', dest='size', default=1000*1000*1000, help='WARC file rollover size threshold in bytes')
arg_parser.add_argument('-p', '--port', dest='port', default='8080',
help='port to listen on')
arg_parser.add_argument('-b', '--address', dest='address',
default='localhost', help='address to listen on')
arg_parser.add_argument('-c', '--cacert', dest='cacert',
default='./warcprox-ca.pem',
help='CA certificate file; if file does not exist, it will be created')
arg_parser.add_argument('--certs-dir', dest='certs_dir',
default='./warcprox-ca',
help='where to store and load generated certificates')
arg_parser.add_argument('-d', '--dir', dest='directory',
default='./warcs', help='where to write warcs')
arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
help='write gzip-compressed warc records')
arg_parser.add_argument('-n', '--prefix', dest='prefix',
default='WARCPROX', help='WARC filename prefix')
arg_parser.add_argument('-s', '--size', dest='size',
default=1000*1000*1000,
help='WARC file rollover size threshold in bytes')
arg_parser.add_argument('--rollover-idle-time',
dest='rollover_idle_time', default=None,
help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)")
arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
# [--ispartof=warcinfo ispartof]
@ -565,7 +599,8 @@ if __name__ == '__main__':
warc_writer = WarcWriterThread(recordset_q=recordset_q,
directory=args.directory, gzip=args.gzip, prefix=args.prefix,
size=int(args.size), port=int(args.port))
port=int(args.port), rollover_size=int(args.size),
rollover_idle_time=int(args.rollover_idle_time))
proxy_thread = threading.Thread(target=proxy.serve_forever, name='ProxyThread')
proxy_thread.start()