mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
factor out warc record building into its own class
This commit is contained in:
parent
89fab33295
commit
10c724637f
@ -19,38 +19,13 @@ import hanzo.httptools
|
|||||||
from hanzo import warctools
|
from hanzo import warctools
|
||||||
import warcprox
|
import warcprox
|
||||||
|
|
||||||
class WarcWriter:
|
class WarcRecordBuilder:
|
||||||
logger = logging.getLogger("warcprox.warcwriter.WarcWriter")
|
logger = logging.getLogger("warcprox.warcwriter.WarcRecordBuilder")
|
||||||
|
|
||||||
# port is only used for warc filename
|
def __init__(self, dedup_db=None, digest_algorithm="sha1", base32=False):
|
||||||
def __init__(self, directory='./warcs', rollover_size=1000000000,
|
self.dedup_db = dedup_db
|
||||||
gzip=False, prefix='WARCPROX', port=0,
|
|
||||||
digest_algorithm='sha1', base32=False, dedup_db=None,
|
|
||||||
playback_index_db=None, rollover_idle_time=None):
|
|
||||||
|
|
||||||
self.rollover_size = rollover_size
|
|
||||||
self.rollover_idle_time = rollover_idle_time
|
|
||||||
self._last_activity = time.time()
|
|
||||||
|
|
||||||
self.gzip = gzip
|
|
||||||
self.digest_algorithm = digest_algorithm
|
self.digest_algorithm = digest_algorithm
|
||||||
self.base32 = base32
|
self.base32 = base32
|
||||||
self.dedup_db = dedup_db
|
|
||||||
|
|
||||||
self.playback_index_db = playback_index_db
|
|
||||||
|
|
||||||
# warc path and filename stuff
|
|
||||||
self.directory = directory
|
|
||||||
self.prefix = prefix
|
|
||||||
self.port = port
|
|
||||||
|
|
||||||
self._f = None
|
|
||||||
self._fpath = None
|
|
||||||
self._serial = 0
|
|
||||||
|
|
||||||
if not os.path.exists(directory):
|
|
||||||
self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory))
|
|
||||||
os.mkdir(directory)
|
|
||||||
|
|
||||||
def _build_response_principal_record(self, recorded_url, warc_date):
|
def _build_response_principal_record(self, recorded_url, warc_date):
|
||||||
"""Builds response or revisit record, whichever is appropriate."""
|
"""Builds response or revisit record, whichever is appropriate."""
|
||||||
@ -163,21 +138,7 @@ class WarcWriter:
|
|||||||
|
|
||||||
return record
|
return record
|
||||||
|
|
||||||
def timestamp17(self):
|
def build_warcinfo_record(self, filename):
|
||||||
now = datetime.utcnow()
|
|
||||||
return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000)
|
|
||||||
|
|
||||||
def close_writer(self):
|
|
||||||
if self._fpath:
|
|
||||||
self.logger.info('closing {0}'.format(self._f_finalname))
|
|
||||||
self._f.close()
|
|
||||||
finalpath = os.path.sep.join([self.directory, self._f_finalname])
|
|
||||||
os.rename(self._fpath, finalpath)
|
|
||||||
|
|
||||||
self._fpath = None
|
|
||||||
self._f = None
|
|
||||||
|
|
||||||
def _build_warcinfo_record(self, filename):
|
|
||||||
warc_record_date = warctools.warc.warc_datetime_str(datetime.utcnow())
|
warc_record_date = warctools.warc.warc_datetime_str(datetime.utcnow())
|
||||||
record_id = warctools.WarcRecord.random_warc_uuid()
|
record_id = warctools.WarcRecord.random_warc_uuid()
|
||||||
|
|
||||||
@ -202,6 +163,52 @@ class WarcWriter:
|
|||||||
|
|
||||||
return record
|
return record
|
||||||
|
|
||||||
|
class WarcWriter:
|
||||||
|
logger = logging.getLogger("warcprox.warcwriter.WarcWriter")
|
||||||
|
|
||||||
|
# port is only used for warc filename
|
||||||
|
def __init__(self, directory='./warcs', rollover_size=1000000000,
|
||||||
|
gzip=False, prefix='WARCPROX', port=0,
|
||||||
|
digest_algorithm='sha1', base32=False, dedup_db=None,
|
||||||
|
playback_index_db=None, rollover_idle_time=None):
|
||||||
|
|
||||||
|
self.rollover_size = rollover_size
|
||||||
|
self.rollover_idle_time = rollover_idle_time
|
||||||
|
self._last_activity = time.time()
|
||||||
|
|
||||||
|
self.gzip = gzip
|
||||||
|
self.record_builder = WarcRecordBuilder(dedup_db=dedup_db, digest_algorithm=digest_algorithm, base32=base32)
|
||||||
|
self.dedup_db = dedup_db
|
||||||
|
|
||||||
|
self.playback_index_db = playback_index_db
|
||||||
|
|
||||||
|
# warc path and filename stuff
|
||||||
|
self.directory = directory
|
||||||
|
self.prefix = prefix
|
||||||
|
self.port = port
|
||||||
|
|
||||||
|
self._f = None
|
||||||
|
self._fpath = None
|
||||||
|
self._serial = 0
|
||||||
|
|
||||||
|
if not os.path.exists(directory):
|
||||||
|
self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory))
|
||||||
|
os.mkdir(directory)
|
||||||
|
|
||||||
|
def timestamp17(self):
|
||||||
|
now = datetime.utcnow()
|
||||||
|
return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000)
|
||||||
|
|
||||||
|
def close_writer(self):
|
||||||
|
if self._fpath:
|
||||||
|
self.logger.info('closing {0}'.format(self._f_finalname))
|
||||||
|
self._f.close()
|
||||||
|
finalpath = os.path.sep.join([self.directory, self._f_finalname])
|
||||||
|
os.rename(self._fpath, finalpath)
|
||||||
|
|
||||||
|
self._fpath = None
|
||||||
|
self._f = None
|
||||||
|
|
||||||
# <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
|
# <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
|
||||||
def _writer(self):
|
def _writer(self):
|
||||||
if self._fpath and os.path.getsize(self._fpath) > self.rollover_size:
|
if self._fpath and os.path.getsize(self._fpath) > self.rollover_size:
|
||||||
@ -215,7 +222,7 @@ class WarcWriter:
|
|||||||
|
|
||||||
self._f = open(self._fpath, 'wb')
|
self._f = open(self._fpath, 'wb')
|
||||||
|
|
||||||
warcinfo_record = self._build_warcinfo_record(self._f_finalname)
|
warcinfo_record = self.record_builder.build_warcinfo_record(self._f_finalname)
|
||||||
self.logger.debug('warcinfo_record.headers={}'.format(warcinfo_record.headers))
|
self.logger.debug('warcinfo_record.headers={}'.format(warcinfo_record.headers))
|
||||||
warcinfo_record.write_to(self._f, gzip=self.gzip)
|
warcinfo_record.write_to(self._f, gzip=self.gzip)
|
||||||
|
|
||||||
@ -233,7 +240,7 @@ class WarcWriter:
|
|||||||
if (self.dedup_db is not None
|
if (self.dedup_db is not None
|
||||||
and recordset[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
|
and recordset[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
|
||||||
and recorded_url.response_recorder.payload_size() > 0):
|
and recorded_url.response_recorder.payload_size() > 0):
|
||||||
key = self.digest_str(recorded_url.response_recorder.payload_digest)
|
key = self.record_builder.digest_str(recorded_url.response_recorder.payload_digest)
|
||||||
self.dedup_db.save(key, recordset[0], recordset_offset)
|
self.dedup_db.save(key, recordset[0], recordset_offset)
|
||||||
|
|
||||||
if self.playback_index_db is not None:
|
if self.playback_index_db is not None:
|
||||||
@ -267,7 +274,7 @@ class WarcWriter:
|
|||||||
recordset_offset))
|
recordset_offset))
|
||||||
|
|
||||||
def write_records(self, recorded_url):
|
def write_records(self, recorded_url):
|
||||||
recordset = self.build_warc_records(recorded_url)
|
recordset = self.record_builder.build_warc_records(recorded_url)
|
||||||
|
|
||||||
writer = self._writer()
|
writer = self._writer()
|
||||||
recordset_offset = writer.tell()
|
recordset_offset = writer.tell()
|
||||||
@ -294,7 +301,7 @@ class WarcWriter:
|
|||||||
self.close_writer()
|
self.close_writer()
|
||||||
|
|
||||||
class WarcWriterPool:
|
class WarcWriterPool:
|
||||||
logger = logging.getLogger("warcprox.warcwriter.WarcWriterThread")
|
logger = logging.getLogger("warcprox.warcwriter.WarcWriterPool")
|
||||||
|
|
||||||
def __init__(self, default_warc_writer):
|
def __init__(self, default_warc_writer):
|
||||||
if default_warc_writer:
|
if default_warc_writer:
|
||||||
@ -321,8 +328,8 @@ class WarcWriterPool:
|
|||||||
rollover_idle_time=self.default_warc_writer.rollover_idle_time,
|
rollover_idle_time=self.default_warc_writer.rollover_idle_time,
|
||||||
gzip=self.default_warc_writer.gzip,
|
gzip=self.default_warc_writer.gzip,
|
||||||
port=self.default_warc_writer.port,
|
port=self.default_warc_writer.port,
|
||||||
digest_algorithm=self.default_warc_writer.digest_algorithm,
|
digest_algorithm=self.default_warc_writer.record_builder.digest_algorithm,
|
||||||
base32=self.default_warc_writer.base32,
|
base32=self.default_warc_writer.record_builder.base32,
|
||||||
dedup_db=self.default_warc_writer.dedup_db,
|
dedup_db=self.default_warc_writer.dedup_db,
|
||||||
playback_index_db=self.default_warc_writer.playback_index_db)
|
playback_index_db=self.default_warc_writer.playback_index_db)
|
||||||
w = self.warc_writers[prefix]
|
w = self.warc_writers[prefix]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user