From 10c724637f34eb8e7002c6a6b24685f0e66ecf7c Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 29 Jul 2015 21:23:46 +0000 Subject: [PATCH] factor out warc record building into its own class --- warcprox/warcwriter.py | 107 ++++++++++++++++++++++------------------- 1 file changed, 57 insertions(+), 50 deletions(-) diff --git a/warcprox/warcwriter.py b/warcprox/warcwriter.py index ac69cb9..f219abf 100644 --- a/warcprox/warcwriter.py +++ b/warcprox/warcwriter.py @@ -19,38 +19,13 @@ import hanzo.httptools from hanzo import warctools import warcprox -class WarcWriter: - logger = logging.getLogger("warcprox.warcwriter.WarcWriter") +class WarcRecordBuilder: + logger = logging.getLogger("warcprox.warcwriter.WarcRecordBuilder") - # port is only used for warc filename - def __init__(self, directory='./warcs', rollover_size=1000000000, - gzip=False, prefix='WARCPROX', port=0, - digest_algorithm='sha1', base32=False, dedup_db=None, - playback_index_db=None, rollover_idle_time=None): - - self.rollover_size = rollover_size - self.rollover_idle_time = rollover_idle_time - self._last_activity = time.time() - - self.gzip = gzip + def __init__(self, dedup_db=None, digest_algorithm="sha1", base32=False): + self.dedup_db = dedup_db self.digest_algorithm = digest_algorithm self.base32 = base32 - self.dedup_db = dedup_db - - self.playback_index_db = playback_index_db - - # warc path and filename stuff - self.directory = directory - self.prefix = prefix - self.port = port - - self._f = None - self._fpath = None - self._serial = 0 - - if not os.path.exists(directory): - self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory)) - os.mkdir(directory) def _build_response_principal_record(self, recorded_url, warc_date): """Builds response or revisit record, whichever is appropriate.""" @@ -163,21 +138,7 @@ class WarcWriter: return record - def timestamp17(self): - now = datetime.utcnow() - return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000) - - def close_writer(self): - if self._fpath: - self.logger.info('closing {0}'.format(self._f_finalname)) - self._f.close() - finalpath = os.path.sep.join([self.directory, self._f_finalname]) - os.rename(self._fpath, finalpath) - - self._fpath = None - self._f = None - - def _build_warcinfo_record(self, filename): + def build_warcinfo_record(self, filename): warc_record_date = warctools.warc.warc_datetime_str(datetime.utcnow()) record_id = warctools.WarcRecord.random_warc_uuid() @@ -202,6 +163,52 @@ class WarcWriter: return record +class WarcWriter: + logger = logging.getLogger("warcprox.warcwriter.WarcWriter") + + # port is only used for warc filename + def __init__(self, directory='./warcs', rollover_size=1000000000, + gzip=False, prefix='WARCPROX', port=0, + digest_algorithm='sha1', base32=False, dedup_db=None, + playback_index_db=None, rollover_idle_time=None): + + self.rollover_size = rollover_size + self.rollover_idle_time = rollover_idle_time + self._last_activity = time.time() + + self.gzip = gzip + self.record_builder = WarcRecordBuilder(dedup_db=dedup_db, digest_algorithm=digest_algorithm, base32=base32) + self.dedup_db = dedup_db + + self.playback_index_db = playback_index_db + + # warc path and filename stuff + self.directory = directory + self.prefix = prefix + self.port = port + + self._f = None + self._fpath = None + self._serial = 0 + + if not os.path.exists(directory): + self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory)) + os.mkdir(directory) + + def timestamp17(self): + now = datetime.utcnow() + return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000) + + def close_writer(self): + if self._fpath: + self.logger.info('closing {0}'.format(self._f_finalname)) + self._f.close() + finalpath = os.path.sep.join([self.directory, self._f_finalname]) + os.rename(self._fpath, finalpath) + + self._fpath = None + self._f = None + # def _writer(self): if self._fpath and os.path.getsize(self._fpath) > self.rollover_size: @@ -215,7 +222,7 @@ class WarcWriter: self._f = open(self._fpath, 'wb') - warcinfo_record = self._build_warcinfo_record(self._f_finalname) + warcinfo_record = self.record_builder.build_warcinfo_record(self._f_finalname) self.logger.debug('warcinfo_record.headers={}'.format(warcinfo_record.headers)) warcinfo_record.write_to(self._f, gzip=self.gzip) @@ -233,7 +240,7 @@ class WarcWriter: if (self.dedup_db is not None and recordset[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE and recorded_url.response_recorder.payload_size() > 0): - key = self.digest_str(recorded_url.response_recorder.payload_digest) + key = self.record_builder.digest_str(recorded_url.response_recorder.payload_digest) self.dedup_db.save(key, recordset[0], recordset_offset) if self.playback_index_db is not None: @@ -267,7 +274,7 @@ class WarcWriter: recordset_offset)) def write_records(self, recorded_url): - recordset = self.build_warc_records(recorded_url) + recordset = self.record_builder.build_warc_records(recorded_url) writer = self._writer() recordset_offset = writer.tell() @@ -294,7 +301,7 @@ class WarcWriter: self.close_writer() class WarcWriterPool: - logger = logging.getLogger("warcprox.warcwriter.WarcWriterThread") + logger = logging.getLogger("warcprox.warcwriter.WarcWriterPool") def __init__(self, default_warc_writer): if default_warc_writer: @@ -321,8 +328,8 @@ class WarcWriterPool: rollover_idle_time=self.default_warc_writer.rollover_idle_time, gzip=self.default_warc_writer.gzip, port=self.default_warc_writer.port, - digest_algorithm=self.default_warc_writer.digest_algorithm, - base32=self.default_warc_writer.base32, + digest_algorithm=self.default_warc_writer.record_builder.digest_algorithm, + base32=self.default_warc_writer.record_builder.base32, dedup_db=self.default_warc_writer.dedup_db, playback_index_db=self.default_warc_writer.playback_index_db) w = self.warc_writers[prefix]