diff --git a/warcprox/dedup.py b/warcprox/dedup.py index dea2fbb..731f618 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -28,8 +28,10 @@ class DedupDb(object): self.db.close() def sync(self): - if hasattr(self.db, 'sync'): + try: self.db.sync() + except: + pass def save(self, key, response_record, offset): record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') diff --git a/warcprox/playback.py b/warcprox/playback.py index fc7e479..65e1d28 100644 --- a/warcprox/playback.py +++ b/warcprox/playback.py @@ -210,8 +210,10 @@ class PlaybackIndexDb(object): self.db.close() def sync(self): - if hasattr(self.db, 'sync'): + try: self.db.sync() + except: + pass def save(self, warcfile, recordset, offset): response_record = recordset[0] diff --git a/warcprox/tests/test_warcprox.py b/warcprox/tests/test_warcprox.py index e2364d0..35f48e4 100755 --- a/warcprox/tests/test_warcprox.py +++ b/warcprox/tests/test_warcprox.py @@ -139,11 +139,13 @@ class WarcproxTest(unittest.TestCase): self._dedup_db_file = f.name dedup_db = warcprox.dedup.DedupDb(self._dedup_db_file) - warc_writer = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q, - directory=self._warcs_dir, port=proxy.server_port, - dedup_db=dedup_db, playback_index_db=playback_index_db) + warc_writer = warcprox.warcwriter.WarcWriter(directory=self._warcs_dir, + port=proxy.server_port, dedup_db=dedup_db, + playback_index_db=playback_index_db) + warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q, + warc_writer=warc_writer) - self.warcprox = warcprox.warcprox.WarcproxController(proxy, warc_writer, playback_proxy) + self.warcprox = warcprox.warcprox.WarcproxController(proxy, warc_writer_thread, playback_proxy) self.logger.info('starting warcprox') self.warcprox_thread = threading.Thread(name='WarcproxThread', target=self.warcprox.run_until_shutdown) @@ -279,7 +281,7 @@ class WarcproxTest(unittest.TestCase): self.assertEqual(response.content, b'404 Not in Archive\n') # check not in dedup db - dedup_lookup = self.warcprox.warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') + dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') self.assertIsNone(dedup_lookup) # archive @@ -296,7 +298,7 @@ class WarcproxTest(unittest.TestCase): # check in dedup db # {u'i': u'', u'u': u'https://localhost:62841/c/d', u'd': u'2013-11-22T00:14:37Z'} - dedup_lookup = self.warcprox.warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') + dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') self.assertEqual(dedup_lookup['u'], url.encode('ascii')) self.assertRegexpMatches(dedup_lookup['i'], br'^$') self.assertRegexpMatches(dedup_lookup['d'], br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$') @@ -317,7 +319,7 @@ class WarcproxTest(unittest.TestCase): time.sleep(2.0) # check in dedup db (no change from prev) - dedup_lookup = self.warcprox.warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') + dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') self.assertEqual(dedup_lookup['u'], url.encode('ascii')) self.assertEqual(dedup_lookup['i'], record_id) self.assertEqual(dedup_lookup['d'], dedup_date) @@ -341,7 +343,7 @@ class WarcproxTest(unittest.TestCase): self.assertEqual(response.content, b'404 Not in Archive\n') # check not in dedup db - dedup_lookup = self.warcprox.warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') + dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') self.assertIsNone(dedup_lookup) # archive @@ -358,7 +360,7 @@ class WarcproxTest(unittest.TestCase): # check in dedup db # {u'i': u'', u'u': u'https://localhost:62841/c/d', u'd': u'2013-11-22T00:14:37Z'} - dedup_lookup = self.warcprox.warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') + dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') self.assertEqual(dedup_lookup['u'], url.encode('ascii')) self.assertRegexpMatches(dedup_lookup['i'], br'^$') self.assertRegexpMatches(dedup_lookup['d'], br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$') @@ -379,7 +381,7 @@ class WarcproxTest(unittest.TestCase): time.sleep(2.0) # check in dedup db (no change from prev) - dedup_lookup = self.warcprox.warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') + dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') self.assertEqual(dedup_lookup['u'], url.encode('ascii')) self.assertEqual(dedup_lookup['i'], record_id) self.assertEqual(dedup_lookup['d'], dedup_date) diff --git a/warcprox/warcprox.py b/warcprox/warcprox.py index 35dee56..76f7c78 100644 --- a/warcprox/warcprox.py +++ b/warcprox/warcprox.py @@ -43,12 +43,11 @@ import tempfile import json import traceback -import warcprox -from warcprox.mitmproxy import MitmProxyHandler -from warcprox.dedup import DedupDb -from warcprox.certauth import CertificateAuthority -from warcprox.warcwriter import WarcWriterThread -from warcprox import playback +import warcprox.certauth +import warcprox.mitmproxy +import warcprox.playback +import warcprox.dedup +import warcprox.warcwriter class ProxyingRecorder(object): @@ -159,7 +158,7 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): self.fp = self.recorder -class WarcProxyHandler(MitmProxyHandler): +class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): logger = logging.getLogger(__module__ + "." + __qualname__) def _proxy_request(self): @@ -243,7 +242,7 @@ class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): if ca is not None: self.ca = ca else: - self.ca = CertificateAuthority() + self.ca = warcprox.certauth.CertificateAuthority() if recorded_url_q is not None: self.recorded_url_q = recorded_url_q @@ -262,13 +261,13 @@ class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): class WarcproxController(object): logger = logging.getLogger(__module__ + "." + __qualname__) - def __init__(self, proxy=None, warc_writer=None, playback_proxy=None): + def __init__(self, proxy=None, warc_writer_thread=None, playback_proxy=None): """ Create warcprox controller. - If supplied, proxy should be an instance of WarcProxy, and warc_writer - should be an instance of WarcWriterThread. If not supplied, they are - created with default values. + If supplied, proxy should be an instance of WarcProxy, and + warc_writer_thread should be an instance of WarcWriterThread. If not + supplied, they are created with default values. If supplied, playback_proxy should be an instance of PlaybackProxy. If not supplied, no playback proxy will run. @@ -278,10 +277,10 @@ class WarcproxController(object): else: self.proxy = WarcProxy() - if warc_writer is not None: - self.warc_writer = warc_writer + if warc_writer_thread is not None: + self.warc_writer_thread = warc_writer_thread else: - self.warc_writer = WarcWriterThread(recorded_url_q=self.proxy.recorded_url_q) + self.warc_writer_thread = WarcWriterThread(recorded_url_q=self.proxy.recorded_url_q) self.playback_proxy = playback_proxy @@ -294,7 +293,7 @@ class WarcproxController(object): """ proxy_thread = threading.Thread(target=self.proxy.serve_forever, name='ProxyThread') proxy_thread.start() - self.warc_writer.start() + self.warc_writer_thread.start() if self.playback_proxy is not None: playback_proxy_thread = threading.Thread(target=self.playback_proxy.serve_forever, name='PlaybackProxyThread') @@ -314,12 +313,12 @@ class WarcproxController(object): except: pass finally: - self.warc_writer.stop.set() + self.warc_writer_thread.stop.set() self.proxy.shutdown() self.proxy.server_close() - if self.warc_writer.dedup_db is not None: - self.warc_writer.dedup_db.close() + if self.warc_writer_thread.warc_writer.dedup_db is not None: + self.warc_writer_thread.warc_writer.dedup_db.close() if self.playback_proxy is not None: self.playback_proxy.shutdown() @@ -328,7 +327,7 @@ class WarcproxController(object): self.playback_proxy.playback_index_db.close() # wait for threads to finish - self.warc_writer.join() + self.warc_writer_thread.join() proxy_thread.join() if self.playback_proxy is not None: playback_proxy_thread.join() @@ -411,36 +410,37 @@ def main(argv=sys.argv): logging.info('deduplication disabled') dedup_db = None else: - dedup_db = DedupDb(args.dedup_db_file) + dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file) recorded_url_q = queue.Queue() - ca = CertificateAuthority(args.cacert, args.certs_dir) + ca = warcprox.certauth.CertificateAuthority(args.cacert, args.certs_dir) proxy = WarcProxy(server_address=(args.address, int(args.port)), ca=ca, recorded_url_q=recorded_url_q, digest_algorithm=args.digest_algorithm) if args.playback_port is not None: - playback_index_db = playback.PlaybackIndexDb(args.playback_index_db_file) + playback_index_db = warcprox.playback.PlaybackIndexDb(args.playback_index_db_file) playback_server_address=(args.address, int(args.playback_port)) - playback_proxy = playback.PlaybackProxy(server_address=playback_server_address, + playback_proxy = warcprox.playback.PlaybackProxy(server_address=playback_server_address, ca=ca, playback_index_db=playback_index_db, warcs_dir=args.directory) else: playback_index_db = None playback_proxy = None - warc_writer = WarcWriterThread(recorded_url_q=recorded_url_q, - directory=args.directory, gzip=args.gzip, prefix=args.prefix, - port=int(args.port), rollover_size=int(args.size), - rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None, - base32=args.base32, dedup_db=dedup_db, - digest_algorithm=args.digest_algorithm, + warc_writer = warcprox.warcwriter.WarcWriter(directory=args.directory, + gzip=args.gzip, prefix=args.prefix, port=int(args.port), + rollover_size=int(args.size), base32=args.base32, + dedup_db=dedup_db, digest_algorithm=args.digest_algorithm, playback_index_db=playback_index_db) + warc_writer_thread = warcprox.warcwriter.WarcWriterThread( + recorded_url_q=recorded_url_q, warc_writer=warc_writer, + rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None) - warcprox = WarcproxController(proxy, warc_writer, playback_proxy) - warcprox.run_until_shutdown() + controller = WarcproxController(proxy, warc_writer_thread, playback_proxy) + controller.run_until_shutdown() if __name__ == '__main__': diff --git a/warcprox/warcwriter.py b/warcprox/warcwriter.py index d7b4da6..80c9b9e 100644 --- a/warcprox/warcwriter.py +++ b/warcprox/warcwriter.py @@ -17,22 +17,16 @@ import hanzo.httptools from hanzo import warctools import warcprox - -class WarcWriterThread(threading.Thread): +class WarcWriter: logger = logging.getLogger(__module__ + "." + __qualname__) # port is only used for warc filename - def __init__(self, recorded_url_q=None, directory='./warcs', - rollover_size=1000000000, rollover_idle_time=None, gzip=False, - prefix='WARCPROX', port=0, digest_algorithm='sha1', base32=False, - dedup_db=None, playback_index_db=None): - - threading.Thread.__init__(self, name='WarcWriterThread') - - self.recorded_url_q = recorded_url_q + def __init__(self, directory='./warcs', rollover_size=1000000000, + gzip=False, prefix='WARCPROX', port=0, + digest_algorithm='sha1', base32=False, dedup_db=None, + playback_index_db=None): self.rollover_size = rollover_size - self.rollover_idle_time = rollover_idle_time self.gzip = gzip self.digest_algorithm = digest_algorithm @@ -54,8 +48,6 @@ class WarcWriterThread(threading.Thread): self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory)) os.mkdir(directory) - self.stop = threading.Event() - # returns a tuple (principal_record, request_record) where principal_record is either a response or revisit record def build_warc_records(self, recorded_url): @@ -165,7 +157,7 @@ class WarcWriterThread(threading.Thread): now = datetime.utcnow() return '{}{}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond//1000) - def _close_writer(self): + def close_writer(self): if self._fpath: self.logger.info('closing {0}'.format(self._f_finalname)) self._f.close() @@ -204,7 +196,7 @@ class WarcWriterThread(threading.Thread): # def _writer(self): if self._fpath and os.path.getsize(self._fpath) > self.rollover_size: - self._close_writer() + self.close_writer() if self._f == None: self._f_finalname = '{}-{}-{:05d}-{}-{}-{}.warc{}'.format( @@ -235,53 +227,69 @@ class WarcWriterThread(threading.Thread): recorded_url.response_recorder.tempfile.close() + def write_records(self, recorded_url): + recordset = self.build_warc_records(recorded_url) + + writer = self._writer() + recordset_offset = writer.tell() + + for record in recordset: + offset = writer.tell() + record.write_to(writer, gzip=self.gzip) + self.logger.debug('wrote warc record: warc_type={} content_length={} url={} warc={} offset={}'.format( + record.get_header(warctools.WarcRecord.TYPE), + record.get_header(warctools.WarcRecord.CONTENT_LENGTH), + record.get_header(warctools.WarcRecord.URL), + self._fpath, offset)) + + self._f.flush() + + self._final_tasks(recorded_url, recordset, recordset_offset) + + + +class WarcWriterThread(threading.Thread): + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, recorded_url_q=None, warc_writer=None, rollover_idle_time=None): + """recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl.""" + threading.Thread.__init__(self, name='WarcWriterThread') + self.recorded_url_q = recorded_url_q + self.rollover_idle_time = rollover_idle_time + self.stop = threading.Event() + if warc_writer: + self.warc_writer = warc_writer + else: + self.warc_writer = WarcWriter() + def run(self): self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( - os.path.abspath(self.directory), self.gzip, self.rollover_size, - self.rollover_idle_time, self.prefix, self.port)) + os.path.abspath(self.warc_writer.directory), self.warc_writer.gzip, self.warc_writer.rollover_size, + self.rollover_idle_time, self.warc_writer.prefix, self.warc_writer.port)) self._last_sync = self._last_activity = time.time() while not self.stop.is_set(): try: recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) - + self.warc_writer.write_records(recorded_url) self._last_activity = time.time() - - recordset = self.build_warc_records(recorded_url) - - writer = self._writer() - recordset_offset = writer.tell() - - for record in recordset: - offset = writer.tell() - record.write_to(writer, gzip=self.gzip) - self.logger.debug('wrote warc record: warc_type={} content_length={} url={} warc={} offset={}'.format( - record.get_header(warctools.WarcRecord.TYPE), - record.get_header(warctools.WarcRecord.CONTENT_LENGTH), - record.get_header(warctools.WarcRecord.URL), - self._fpath, offset)) - - self._f.flush() - - self._final_tasks(recorded_url, recordset, recordset_offset) - except queue.Empty: - if (self._fpath is not None + if (self.warc_writer._fpath is not None and self.rollover_idle_time is not None and self.rollover_idle_time > 0 and time.time() - self._last_activity > self.rollover_idle_time): self.logger.debug('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity)) - self._close_writer() + self.warc_writer.close_writer() if time.time() - self._last_sync > 60: - if self.dedup_db: - self.dedup_db.sync() - if self.playback_index_db: - self.playback_index_db.sync() + if self.warc_writer.dedup_db: + self.warc_writer.dedup_db.sync() + if self.warc_writer.playback_index_db: + self.warc_writer.playback_index_db.sync() self._last_sync = time.time() self.logger.info('WarcWriterThread shutting down') - self._close_writer(); + self.warc_writer.close_writer();