separate WarcWriter and WarcWriterThread

This commit is contained in:
Noah Levitt 2014-11-15 04:47:26 -08:00
parent b34edf8fb1
commit 9b8ffbbb51
5 changed files with 102 additions and 88 deletions

View File

@ -28,8 +28,10 @@ class DedupDb(object):
self.db.close() self.db.close()
def sync(self): def sync(self):
if hasattr(self.db, 'sync'): try:
self.db.sync() self.db.sync()
except:
pass
def save(self, key, response_record, offset): def save(self, key, response_record, offset):
record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')

View File

@ -210,8 +210,10 @@ class PlaybackIndexDb(object):
self.db.close() self.db.close()
def sync(self): def sync(self):
if hasattr(self.db, 'sync'): try:
self.db.sync() self.db.sync()
except:
pass
def save(self, warcfile, recordset, offset): def save(self, warcfile, recordset, offset):
response_record = recordset[0] response_record = recordset[0]

View File

@ -139,11 +139,13 @@ class WarcproxTest(unittest.TestCase):
self._dedup_db_file = f.name self._dedup_db_file = f.name
dedup_db = warcprox.dedup.DedupDb(self._dedup_db_file) dedup_db = warcprox.dedup.DedupDb(self._dedup_db_file)
warc_writer = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q, warc_writer = warcprox.warcwriter.WarcWriter(directory=self._warcs_dir,
directory=self._warcs_dir, port=proxy.server_port, port=proxy.server_port, dedup_db=dedup_db,
dedup_db=dedup_db, playback_index_db=playback_index_db) playback_index_db=playback_index_db)
warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q,
warc_writer=warc_writer)
self.warcprox = warcprox.warcprox.WarcproxController(proxy, warc_writer, playback_proxy) self.warcprox = warcprox.warcprox.WarcproxController(proxy, warc_writer_thread, playback_proxy)
self.logger.info('starting warcprox') self.logger.info('starting warcprox')
self.warcprox_thread = threading.Thread(name='WarcproxThread', self.warcprox_thread = threading.Thread(name='WarcproxThread',
target=self.warcprox.run_until_shutdown) target=self.warcprox.run_until_shutdown)
@ -279,7 +281,7 @@ class WarcproxTest(unittest.TestCase):
self.assertEqual(response.content, b'404 Not in Archive\n') self.assertEqual(response.content, b'404 Not in Archive\n')
# check not in dedup db # check not in dedup db
dedup_lookup = self.warcprox.warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
self.assertIsNone(dedup_lookup) self.assertIsNone(dedup_lookup)
# archive # archive
@ -296,7 +298,7 @@ class WarcproxTest(unittest.TestCase):
# check in dedup db # check in dedup db
# {u'i': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'u': u'https://localhost:62841/c/d', u'd': u'2013-11-22T00:14:37Z'} # {u'i': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'u': u'https://localhost:62841/c/d', u'd': u'2013-11-22T00:14:37Z'}
dedup_lookup = self.warcprox.warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
self.assertEqual(dedup_lookup['u'], url.encode('ascii')) self.assertEqual(dedup_lookup['u'], url.encode('ascii'))
self.assertRegexpMatches(dedup_lookup['i'], br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$') self.assertRegexpMatches(dedup_lookup['i'], br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$')
self.assertRegexpMatches(dedup_lookup['d'], br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$') self.assertRegexpMatches(dedup_lookup['d'], br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$')
@ -317,7 +319,7 @@ class WarcproxTest(unittest.TestCase):
time.sleep(2.0) time.sleep(2.0)
# check in dedup db (no change from prev) # check in dedup db (no change from prev)
dedup_lookup = self.warcprox.warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
self.assertEqual(dedup_lookup['u'], url.encode('ascii')) self.assertEqual(dedup_lookup['u'], url.encode('ascii'))
self.assertEqual(dedup_lookup['i'], record_id) self.assertEqual(dedup_lookup['i'], record_id)
self.assertEqual(dedup_lookup['d'], dedup_date) self.assertEqual(dedup_lookup['d'], dedup_date)
@ -341,7 +343,7 @@ class WarcproxTest(unittest.TestCase):
self.assertEqual(response.content, b'404 Not in Archive\n') self.assertEqual(response.content, b'404 Not in Archive\n')
# check not in dedup db # check not in dedup db
dedup_lookup = self.warcprox.warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
self.assertIsNone(dedup_lookup) self.assertIsNone(dedup_lookup)
# archive # archive
@ -358,7 +360,7 @@ class WarcproxTest(unittest.TestCase):
# check in dedup db # check in dedup db
# {u'i': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'u': u'https://localhost:62841/c/d', u'd': u'2013-11-22T00:14:37Z'} # {u'i': u'<urn:uuid:e691dc0f-4bb9-4ad8-9afb-2af836aa05e4>', u'u': u'https://localhost:62841/c/d', u'd': u'2013-11-22T00:14:37Z'}
dedup_lookup = self.warcprox.warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
self.assertEqual(dedup_lookup['u'], url.encode('ascii')) self.assertEqual(dedup_lookup['u'], url.encode('ascii'))
self.assertRegexpMatches(dedup_lookup['i'], br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$') self.assertRegexpMatches(dedup_lookup['i'], br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$')
self.assertRegexpMatches(dedup_lookup['d'], br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$') self.assertRegexpMatches(dedup_lookup['d'], br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$')
@ -379,7 +381,7 @@ class WarcproxTest(unittest.TestCase):
time.sleep(2.0) time.sleep(2.0)
# check in dedup db (no change from prev) # check in dedup db (no change from prev)
dedup_lookup = self.warcprox.warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') dedup_lookup = self.warcprox.warc_writer_thread.warc_writer.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
self.assertEqual(dedup_lookup['u'], url.encode('ascii')) self.assertEqual(dedup_lookup['u'], url.encode('ascii'))
self.assertEqual(dedup_lookup['i'], record_id) self.assertEqual(dedup_lookup['i'], record_id)
self.assertEqual(dedup_lookup['d'], dedup_date) self.assertEqual(dedup_lookup['d'], dedup_date)

View File

@ -43,12 +43,11 @@ import tempfile
import json import json
import traceback import traceback
import warcprox import warcprox.certauth
from warcprox.mitmproxy import MitmProxyHandler import warcprox.mitmproxy
from warcprox.dedup import DedupDb import warcprox.playback
from warcprox.certauth import CertificateAuthority import warcprox.dedup
from warcprox.warcwriter import WarcWriterThread import warcprox.warcwriter
from warcprox import playback
class ProxyingRecorder(object): class ProxyingRecorder(object):
@ -159,7 +158,7 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
self.fp = self.recorder self.fp = self.recorder
class WarcProxyHandler(MitmProxyHandler): class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
logger = logging.getLogger(__module__ + "." + __qualname__) logger = logging.getLogger(__module__ + "." + __qualname__)
def _proxy_request(self): def _proxy_request(self):
@ -243,7 +242,7 @@ class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
if ca is not None: if ca is not None:
self.ca = ca self.ca = ca
else: else:
self.ca = CertificateAuthority() self.ca = warcprox.certauth.CertificateAuthority()
if recorded_url_q is not None: if recorded_url_q is not None:
self.recorded_url_q = recorded_url_q self.recorded_url_q = recorded_url_q
@ -262,13 +261,13 @@ class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
class WarcproxController(object): class WarcproxController(object):
logger = logging.getLogger(__module__ + "." + __qualname__) logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, proxy=None, warc_writer=None, playback_proxy=None): def __init__(self, proxy=None, warc_writer_thread=None, playback_proxy=None):
""" """
Create warcprox controller. Create warcprox controller.
If supplied, proxy should be an instance of WarcProxy, and warc_writer If supplied, proxy should be an instance of WarcProxy, and
should be an instance of WarcWriterThread. If not supplied, they are warc_writer_thread should be an instance of WarcWriterThread. If not
created with default values. supplied, they are created with default values.
If supplied, playback_proxy should be an instance of PlaybackProxy. If not If supplied, playback_proxy should be an instance of PlaybackProxy. If not
supplied, no playback proxy will run. supplied, no playback proxy will run.
@ -278,10 +277,10 @@ class WarcproxController(object):
else: else:
self.proxy = WarcProxy() self.proxy = WarcProxy()
if warc_writer is not None: if warc_writer_thread is not None:
self.warc_writer = warc_writer self.warc_writer_thread = warc_writer_thread
else: else:
self.warc_writer = WarcWriterThread(recorded_url_q=self.proxy.recorded_url_q) self.warc_writer_thread = WarcWriterThread(recorded_url_q=self.proxy.recorded_url_q)
self.playback_proxy = playback_proxy self.playback_proxy = playback_proxy
@ -294,7 +293,7 @@ class WarcproxController(object):
""" """
proxy_thread = threading.Thread(target=self.proxy.serve_forever, name='ProxyThread') proxy_thread = threading.Thread(target=self.proxy.serve_forever, name='ProxyThread')
proxy_thread.start() proxy_thread.start()
self.warc_writer.start() self.warc_writer_thread.start()
if self.playback_proxy is not None: if self.playback_proxy is not None:
playback_proxy_thread = threading.Thread(target=self.playback_proxy.serve_forever, name='PlaybackProxyThread') playback_proxy_thread = threading.Thread(target=self.playback_proxy.serve_forever, name='PlaybackProxyThread')
@ -314,12 +313,12 @@ class WarcproxController(object):
except: except:
pass pass
finally: finally:
self.warc_writer.stop.set() self.warc_writer_thread.stop.set()
self.proxy.shutdown() self.proxy.shutdown()
self.proxy.server_close() self.proxy.server_close()
if self.warc_writer.dedup_db is not None: if self.warc_writer_thread.warc_writer.dedup_db is not None:
self.warc_writer.dedup_db.close() self.warc_writer_thread.warc_writer.dedup_db.close()
if self.playback_proxy is not None: if self.playback_proxy is not None:
self.playback_proxy.shutdown() self.playback_proxy.shutdown()
@ -328,7 +327,7 @@ class WarcproxController(object):
self.playback_proxy.playback_index_db.close() self.playback_proxy.playback_index_db.close()
# wait for threads to finish # wait for threads to finish
self.warc_writer.join() self.warc_writer_thread.join()
proxy_thread.join() proxy_thread.join()
if self.playback_proxy is not None: if self.playback_proxy is not None:
playback_proxy_thread.join() playback_proxy_thread.join()
@ -411,36 +410,37 @@ def main(argv=sys.argv):
logging.info('deduplication disabled') logging.info('deduplication disabled')
dedup_db = None dedup_db = None
else: else:
dedup_db = DedupDb(args.dedup_db_file) dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file)
recorded_url_q = queue.Queue() recorded_url_q = queue.Queue()
ca = CertificateAuthority(args.cacert, args.certs_dir) ca = warcprox.certauth.CertificateAuthority(args.cacert, args.certs_dir)
proxy = WarcProxy(server_address=(args.address, int(args.port)), proxy = WarcProxy(server_address=(args.address, int(args.port)),
ca=ca, recorded_url_q=recorded_url_q, ca=ca, recorded_url_q=recorded_url_q,
digest_algorithm=args.digest_algorithm) digest_algorithm=args.digest_algorithm)
if args.playback_port is not None: if args.playback_port is not None:
playback_index_db = playback.PlaybackIndexDb(args.playback_index_db_file) playback_index_db = warcprox.playback.PlaybackIndexDb(args.playback_index_db_file)
playback_server_address=(args.address, int(args.playback_port)) playback_server_address=(args.address, int(args.playback_port))
playback_proxy = playback.PlaybackProxy(server_address=playback_server_address, playback_proxy = warcprox.playback.PlaybackProxy(server_address=playback_server_address,
ca=ca, playback_index_db=playback_index_db, ca=ca, playback_index_db=playback_index_db,
warcs_dir=args.directory) warcs_dir=args.directory)
else: else:
playback_index_db = None playback_index_db = None
playback_proxy = None playback_proxy = None
warc_writer = WarcWriterThread(recorded_url_q=recorded_url_q, warc_writer = warcprox.warcwriter.WarcWriter(directory=args.directory,
directory=args.directory, gzip=args.gzip, prefix=args.prefix, gzip=args.gzip, prefix=args.prefix, port=int(args.port),
port=int(args.port), rollover_size=int(args.size), rollover_size=int(args.size), base32=args.base32,
rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None, dedup_db=dedup_db, digest_algorithm=args.digest_algorithm,
base32=args.base32, dedup_db=dedup_db,
digest_algorithm=args.digest_algorithm,
playback_index_db=playback_index_db) playback_index_db=playback_index_db)
warc_writer_thread = warcprox.warcwriter.WarcWriterThread(
recorded_url_q=recorded_url_q, warc_writer=warc_writer,
rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None)
warcprox = WarcproxController(proxy, warc_writer, playback_proxy) controller = WarcproxController(proxy, warc_writer_thread, playback_proxy)
warcprox.run_until_shutdown() controller.run_until_shutdown()
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -17,22 +17,16 @@ import hanzo.httptools
from hanzo import warctools from hanzo import warctools
import warcprox import warcprox
class WarcWriter:
class WarcWriterThread(threading.Thread):
logger = logging.getLogger(__module__ + "." + __qualname__) logger = logging.getLogger(__module__ + "." + __qualname__)
# port is only used for warc filename # port is only used for warc filename
def __init__(self, recorded_url_q=None, directory='./warcs', def __init__(self, directory='./warcs', rollover_size=1000000000,
rollover_size=1000000000, rollover_idle_time=None, gzip=False, gzip=False, prefix='WARCPROX', port=0,
prefix='WARCPROX', port=0, digest_algorithm='sha1', base32=False, digest_algorithm='sha1', base32=False, dedup_db=None,
dedup_db=None, playback_index_db=None): playback_index_db=None):
threading.Thread.__init__(self, name='WarcWriterThread')
self.recorded_url_q = recorded_url_q
self.rollover_size = rollover_size self.rollover_size = rollover_size
self.rollover_idle_time = rollover_idle_time
self.gzip = gzip self.gzip = gzip
self.digest_algorithm = digest_algorithm self.digest_algorithm = digest_algorithm
@ -54,8 +48,6 @@ class WarcWriterThread(threading.Thread):
self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory)) self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory))
os.mkdir(directory) os.mkdir(directory)
self.stop = threading.Event()
# returns a tuple (principal_record, request_record) where principal_record is either a response or revisit record # returns a tuple (principal_record, request_record) where principal_record is either a response or revisit record
def build_warc_records(self, recorded_url): def build_warc_records(self, recorded_url):
@ -165,7 +157,7 @@ class WarcWriterThread(threading.Thread):
now = datetime.utcnow() now = datetime.utcnow()
return '{}{}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond//1000) return '{}{}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond//1000)
def _close_writer(self): def close_writer(self):
if self._fpath: if self._fpath:
self.logger.info('closing {0}'.format(self._f_finalname)) self.logger.info('closing {0}'.format(self._f_finalname))
self._f.close() self._f.close()
@ -204,7 +196,7 @@ class WarcWriterThread(threading.Thread):
# <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> --> # <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
def _writer(self): def _writer(self):
if self._fpath and os.path.getsize(self._fpath) > self.rollover_size: if self._fpath and os.path.getsize(self._fpath) > self.rollover_size:
self._close_writer() self.close_writer()
if self._f == None: if self._f == None:
self._f_finalname = '{}-{}-{:05d}-{}-{}-{}.warc{}'.format( self._f_finalname = '{}-{}-{:05d}-{}-{}-{}.warc{}'.format(
@ -235,53 +227,69 @@ class WarcWriterThread(threading.Thread):
recorded_url.response_recorder.tempfile.close() recorded_url.response_recorder.tempfile.close()
def write_records(self, recorded_url):
recordset = self.build_warc_records(recorded_url)
writer = self._writer()
recordset_offset = writer.tell()
for record in recordset:
offset = writer.tell()
record.write_to(writer, gzip=self.gzip)
self.logger.debug('wrote warc record: warc_type={} content_length={} url={} warc={} offset={}'.format(
record.get_header(warctools.WarcRecord.TYPE),
record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
record.get_header(warctools.WarcRecord.URL),
self._fpath, offset))
self._f.flush()
self._final_tasks(recorded_url, recordset, recordset_offset)
class WarcWriterThread(threading.Thread):
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, recorded_url_q=None, warc_writer=None, rollover_idle_time=None):
"""recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl."""
threading.Thread.__init__(self, name='WarcWriterThread')
self.recorded_url_q = recorded_url_q
self.rollover_idle_time = rollover_idle_time
self.stop = threading.Event()
if warc_writer:
self.warc_writer = warc_writer
else:
self.warc_writer = WarcWriter()
def run(self): def run(self):
self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format(
os.path.abspath(self.directory), self.gzip, self.rollover_size, os.path.abspath(self.warc_writer.directory), self.warc_writer.gzip, self.warc_writer.rollover_size,
self.rollover_idle_time, self.prefix, self.port)) self.rollover_idle_time, self.warc_writer.prefix, self.warc_writer.port))
self._last_sync = self._last_activity = time.time() self._last_sync = self._last_activity = time.time()
while not self.stop.is_set(): while not self.stop.is_set():
try: try:
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
self.warc_writer.write_records(recorded_url)
self._last_activity = time.time() self._last_activity = time.time()
recordset = self.build_warc_records(recorded_url)
writer = self._writer()
recordset_offset = writer.tell()
for record in recordset:
offset = writer.tell()
record.write_to(writer, gzip=self.gzip)
self.logger.debug('wrote warc record: warc_type={} content_length={} url={} warc={} offset={}'.format(
record.get_header(warctools.WarcRecord.TYPE),
record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
record.get_header(warctools.WarcRecord.URL),
self._fpath, offset))
self._f.flush()
self._final_tasks(recorded_url, recordset, recordset_offset)
except queue.Empty: except queue.Empty:
if (self._fpath is not None if (self.warc_writer._fpath is not None
and self.rollover_idle_time is not None and self.rollover_idle_time is not None
and self.rollover_idle_time > 0 and self.rollover_idle_time > 0
and time.time() - self._last_activity > self.rollover_idle_time): and time.time() - self._last_activity > self.rollover_idle_time):
self.logger.debug('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity)) self.logger.debug('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity))
self._close_writer() self.warc_writer.close_writer()
if time.time() - self._last_sync > 60: if time.time() - self._last_sync > 60:
if self.dedup_db: if self.warc_writer.dedup_db:
self.dedup_db.sync() self.warc_writer.dedup_db.sync()
if self.playback_index_db: if self.warc_writer.playback_index_db:
self.playback_index_db.sync() self.warc_writer.playback_index_db.sync()
self._last_sync = time.time() self._last_sync = time.time()
self.logger.info('WarcWriterThread shutting down') self.logger.info('WarcWriterThread shutting down')
self._close_writer(); self.warc_writer.close_writer();