diff --git a/README.md b/README.md index 88c2bea..39a3e04 100644 --- a/README.md +++ b/README.md @@ -49,3 +49,32 @@ incorporated into warctools mainline. 1000000000) -v, --verbose -q, --quiet + +###To do + +- integration tests, unit tests +- url-agnostic deduplication +- unchunk and/or ungzip before storing payload, or alter request to discourage server from chunking/gzipping +- check certs from proxied website, like browser does, and present browser-like warning if appropriate +- keep statistics, produce reports +- write cdx while crawling? +- performance testing +- base32 sha1 like heritrix? +- configurable timeouts and stuff +- evaluate ipv6 support +- more explicit handling of connection closed exception during transfer? other error cases? +- dns cache?? the system already does a fine job I'm thinking +- keepalive with remote servers? +- python3 + +#### To not do + +The features below could also be part of warcprox. But maybe they don't belong +here, since this is a proxy, not a crawler/robot. It can be used by a human +with a browser, or by something automated, i.e. a robot. My feeling is that +it's more appropriate to implement these in the robot. + +- politeness, i.e. throttle requests per server +- fetch and obey robots.txt +- alter user-agent, maybe insert something like "warcprox mitm archiving proxy; +http://archive.org/details/archive.org_bot" + diff --git a/warcprox.py b/warcprox.py index f64a915..f485215 100755 --- a/warcprox.py +++ b/warcprox.py @@ -310,11 +310,13 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): while buf != '': buf = h.read(8192) + remote_ip = self._proxy_sock.getpeername()[0] + # Let's close off the remote end h.close() self._proxy_sock.close() - self.server.recordset_q.create_and_queue(self.url, req, h.recorder) + self.server.recordset_q.create_and_queue(self.url, req, h.recorder, remote_ip) def __getattr__(self, item): @@ -352,13 +354,14 @@ class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): # consecutively in the same warc. class WarcRecordsetQueue(Queue.Queue): - def create_and_queue(self, url, request_data, response_recorder): + def create_and_queue(self, url, request_data, response_recorder, remote_ip): warc_date = warctools.warc.warc_datetime_str(datetime.now()) response_record, response_record_id = self.make_record(url=url, warc_date=warc_date, recorder=response_recorder, - warc_type=warctools.WarcRecord.RESPONSE, - content_type="application/http;msgtype=response") + warc_type=warctools.WarcRecord.RESPONSE, + content_type="application/http;msgtype=response", + remote_ip=remote_ip) request_record, request_record_id = self.make_record(url=url, warc_date=warc_date, data=request_data, @@ -372,7 +375,7 @@ class WarcRecordsetQueue(Queue.Queue): @staticmethod def make_record(url, warc_date=None, recorder=None, data=None, - concurrent_to=None, warc_type=None, content_type=None): + concurrent_to=None, warc_type=None, content_type=None, remote_ip=None): if warc_date is None: warc_date = warctools.warc.warc_datetime_str(datetime.now()) @@ -380,12 +383,13 @@ class WarcRecordsetQueue(Queue.Queue): record_id = warctools.WarcRecord.random_warc_uuid() headers = [] + if warc_type is not None: + headers.append((warctools.WarcRecord.TYPE, warc_type)) headers.append((warctools.WarcRecord.ID, record_id)) headers.append((warctools.WarcRecord.DATE, warc_date)) headers.append((warctools.WarcRecord.URL, url)) - # headers.append((warctools.WarcRecord.IP_ADDRESS, ip)) - if warc_type is not None: - headers.append((warctools.WarcRecord.TYPE, warc_type)) + if remote_ip is not None: + headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip)) if concurrent_to is not None: headers.append((warctools.WarcRecord.CONCURRENT_TO, concurrent_to)) if content_type is not None: @@ -412,15 +416,20 @@ class WarcRecordsetQueue(Queue.Queue): class WarcWriterThread(threading.Thread): - def __init__(self, recordset_q, directory, gzip, prefix, size, port): + # port is only used for warc filename + def __init__(self, recordset_q, directory, rollover_size=1000000000, rollover_idle_time=None, gzip=False, prefix='WARCPROX', port=0): threading.Thread.__init__(self, name='WarcWriterThread') self.recordset_q = recordset_q - self.directory = directory + self.rollover_size = rollover_size + self.rollover_idle_time = rollover_idle_time + self.gzip = gzip + + # warc path and filename stuff + self.directory = directory self.prefix = prefix - self.size = size self.port = port self._f = None @@ -457,7 +466,6 @@ class WarcWriterThread(threading.Thread): headers.append((warctools.WarcRecord.TYPE, warctools.WarcRecord.WARCINFO)) headers.append((warctools.WarcRecord.FILENAME, filename)) headers.append((warctools.WarcRecord.DATE, warc_record_date)) - # headers.append((warctools.WarcRecord.IP_ADDRESS, ip)) warcinfo_fields = [] warcinfo_fields.append('software: warcprox.py https://github.com/nlevitt/warcprox') @@ -477,7 +485,7 @@ class WarcWriterThread(threading.Thread): # def _writer(self): - if self._fpath and os.path.getsize(self._fpath) > self.size: + if self._fpath and os.path.getsize(self._fpath) > self.rollover_size: self._close_writer() if self._f == None: @@ -497,12 +505,18 @@ class WarcWriterThread(threading.Thread): def run(self): - logging.info('WarcWriterThread starting, directory={0} gzip={1} prefix={2} size={3} port={4}'.format( - os.path.abspath(self.directory), self.gzip, self.prefix, self.size, self.port)) + logging.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( + os.path.abspath(self.directory), self.gzip, self.rollover_size, + self.rollover_idle_time, self.prefix, self.port)) + + self._last_activity = time.time() while not self.stop.is_set(): try: recordset = self.recordset_q.get(block=True, timeout=0.5) + + self._last_activity = time.time() + writer = self._writer() for record in recordset: @@ -521,7 +535,12 @@ class WarcWriterThread(threading.Thread): self._f.flush() except Queue.Empty: - pass + if (self._fpath is not None + and self.rollover_idle_time is not None + and self.rollover_idle_time > 0 + and time.time() - self._last_activity > self.rollover_idle_time): + logging.info('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity)) + self._close_writer() logging.info('WarcWriterThread shutting down') self._close_writer(); @@ -529,16 +548,31 @@ class WarcWriterThread(threading.Thread): if __name__ == '__main__': - arg_parser = argparse.ArgumentParser(description='warcprox - WARC writing MITM HTTP/S proxy', + arg_parser = argparse.ArgumentParser( + description='warcprox - WARC writing MITM HTTP/S proxy', formatter_class=argparse.ArgumentDefaultsHelpFormatter) - arg_parser.add_argument('-p', '--port', dest='port', default='8080', help='port to listen on') - arg_parser.add_argument('-b', '--address', dest='address', default='localhost', help='address to listen on') - arg_parser.add_argument('-c', '--cacert', dest='cacert', default='./warcprox-ca.pem', help='CA certificate file; if file does not exist, it will be created') - arg_parser.add_argument('--certs-dir', dest='certs_dir', default='./warcprox-ca', help='where to store and load generated certificates') - arg_parser.add_argument('-d', '--dir', dest='directory', default='./warcs', help='where to write warcs') - arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records') - arg_parser.add_argument('-n', '--prefix', dest='prefix', default='WARCPROX', help='WARC filename prefix') - arg_parser.add_argument('-s', '--size', dest='size', default=1000*1000*1000, help='WARC file rollover size threshold in bytes') + arg_parser.add_argument('-p', '--port', dest='port', default='8080', + help='port to listen on') + arg_parser.add_argument('-b', '--address', dest='address', + default='localhost', help='address to listen on') + arg_parser.add_argument('-c', '--cacert', dest='cacert', + default='./warcprox-ca.pem', + help='CA certificate file; if file does not exist, it will be created') + arg_parser.add_argument('--certs-dir', dest='certs_dir', + default='./warcprox-ca', + help='where to store and load generated certificates') + arg_parser.add_argument('-d', '--dir', dest='directory', + default='./warcs', help='where to write warcs') + arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', + help='write gzip-compressed warc records') + arg_parser.add_argument('-n', '--prefix', dest='prefix', + default='WARCPROX', help='WARC filename prefix') + arg_parser.add_argument('-s', '--size', dest='size', + default=1000*1000*1000, + help='WARC file rollover size threshold in bytes') + arg_parser.add_argument('--rollover-idle-time', + dest='rollover_idle_time', default=None, + help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)") arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true') # [--ispartof=warcinfo ispartof] @@ -565,7 +599,8 @@ if __name__ == '__main__': warc_writer = WarcWriterThread(recordset_q=recordset_q, directory=args.directory, gzip=args.gzip, prefix=args.prefix, - size=int(args.size), port=int(args.port)) + port=int(args.port), rollover_size=int(args.size), + rollover_idle_time=int(args.rollover_idle_time)) proxy_thread = threading.Thread(target=proxy.serve_forever, name='ProxyThread') proxy_thread.start()