From ebb9b6d625af4d0f0faed0b7a00a1c97d558d55d Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Sat, 19 Oct 2013 15:25:42 -0700 Subject: [PATCH] new option --rollover-idle-time - WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data) (default: None) --- warcprox.py | 66 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 49 insertions(+), 17 deletions(-) diff --git a/warcprox.py b/warcprox.py index 007906d..c335446 100755 --- a/warcprox.py +++ b/warcprox.py @@ -416,15 +416,20 @@ class WarcRecordsetQueue(Queue.Queue): class WarcWriterThread(threading.Thread): - def __init__(self, recordset_q, directory, gzip, prefix, size, port): + # port is only used for warc filename + def __init__(self, recordset_q, directory, rollover_size=1000000000, rollover_idle_time=None, gzip=False, prefix='WARCPROX', port=0): threading.Thread.__init__(self, name='WarcWriterThread') self.recordset_q = recordset_q - self.directory = directory + self.rollover_size = rollover_size + self.rollover_idle_time = rollover_idle_time + self.gzip = gzip + + # warc path and filename stuff + self.directory = directory self.prefix = prefix - self.size = size self.port = port self._f = None @@ -480,7 +485,7 @@ class WarcWriterThread(threading.Thread): # def _writer(self): - if self._fpath and os.path.getsize(self._fpath) > self.size: + if self._fpath and os.path.getsize(self._fpath) > self.rollover_size: self._close_writer() if self._f == None: @@ -500,12 +505,18 @@ class WarcWriterThread(threading.Thread): def run(self): - logging.info('WarcWriterThread starting, directory={0} gzip={1} prefix={2} size={3} port={4}'.format( - os.path.abspath(self.directory), self.gzip, self.prefix, self.size, self.port)) + logging.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( + os.path.abspath(self.directory), self.gzip, self.rollover_size, + self.rollover_idle_time, self.prefix, self.port)) + + self._last_activity = time.time() while not self.stop.is_set(): try: recordset = self.recordset_q.get(block=True, timeout=0.5) + + self._last_activity = time.time() + writer = self._writer() for record in recordset: @@ -524,7 +535,12 @@ class WarcWriterThread(threading.Thread): self._f.flush() except Queue.Empty: - pass + if (self._fpath is not None + and self.rollover_idle_time is not None + and self.rollover_idle_time > 0 + and time.time() - self._last_activity > self.rollover_idle_time): + logging.info('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity)) + self._close_writer() logging.info('WarcWriterThread shutting down') self._close_writer(); @@ -532,16 +548,31 @@ class WarcWriterThread(threading.Thread): if __name__ == '__main__': - arg_parser = argparse.ArgumentParser(description='warcprox - WARC writing MITM HTTP/S proxy', + arg_parser = argparse.ArgumentParser( + description='warcprox - WARC writing MITM HTTP/S proxy', formatter_class=argparse.ArgumentDefaultsHelpFormatter) - arg_parser.add_argument('-p', '--port', dest='port', default='8080', help='port to listen on') - arg_parser.add_argument('-b', '--address', dest='address', default='localhost', help='address to listen on') - arg_parser.add_argument('-c', '--cacert', dest='cacert', default='./warcprox-ca.pem', help='CA certificate file; if file does not exist, it will be created') - arg_parser.add_argument('--certs-dir', dest='certs_dir', default='./warcprox-ca', help='where to store and load generated certificates') - arg_parser.add_argument('-d', '--dir', dest='directory', default='./warcs', help='where to write warcs') - arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records') - arg_parser.add_argument('-n', '--prefix', dest='prefix', default='WARCPROX', help='WARC filename prefix') - arg_parser.add_argument('-s', '--size', dest='size', default=1000*1000*1000, help='WARC file rollover size threshold in bytes') + arg_parser.add_argument('-p', '--port', dest='port', default='8080', + help='port to listen on') + arg_parser.add_argument('-b', '--address', dest='address', + default='localhost', help='address to listen on') + arg_parser.add_argument('-c', '--cacert', dest='cacert', + default='./warcprox-ca.pem', + help='CA certificate file; if file does not exist, it will be created') + arg_parser.add_argument('--certs-dir', dest='certs_dir', + default='./warcprox-ca', + help='where to store and load generated certificates') + arg_parser.add_argument('-d', '--dir', dest='directory', + default='./warcs', help='where to write warcs') + arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', + help='write gzip-compressed warc records') + arg_parser.add_argument('-n', '--prefix', dest='prefix', + default='WARCPROX', help='WARC filename prefix') + arg_parser.add_argument('-s', '--size', dest='size', + default=1000*1000*1000, + help='WARC file rollover size threshold in bytes') + arg_parser.add_argument('--rollover-idle-time', + dest='rollover_idle_time', default=None, + help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)") arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true') # [--ispartof=warcinfo ispartof] @@ -568,7 +599,8 @@ if __name__ == '__main__': warc_writer = WarcWriterThread(recordset_q=recordset_q, directory=args.directory, gzip=args.gzip, prefix=args.prefix, - size=int(args.size), port=int(args.port)) + port=int(args.port), rollover_size=int(args.size), + rollover_idle_time=int(args.rollover_idle_time)) proxy_thread = threading.Thread(target=proxy.serve_forever, name='ProxyThread') proxy_thread.start()