mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
support for writing to different warcs based on Warcprox-Meta http request header warc-prefix setting
This commit is contained in:
parent
403404f590
commit
0647c0c76d
@ -69,8 +69,8 @@ class WarcproxController(object):
|
|||||||
self.proxy.shutdown()
|
self.proxy.shutdown()
|
||||||
self.proxy.server_close()
|
self.proxy.server_close()
|
||||||
|
|
||||||
if self.warc_writer_thread.warc_writer.dedup_db is not None:
|
if self.warc_writer_thread.default_warc_writer.dedup_db is not None:
|
||||||
self.warc_writer_thread.warc_writer.dedup_db.close()
|
self.warc_writer_thread.default_warc_writer.dedup_db.close()
|
||||||
|
|
||||||
if self.playback_proxy is not None:
|
if self.playback_proxy is not None:
|
||||||
self.playback_proxy.shutdown()
|
self.playback_proxy.shutdown()
|
||||||
|
@ -123,14 +123,14 @@ def main(argv=sys.argv):
|
|||||||
playback_index_db = None
|
playback_index_db = None
|
||||||
playback_proxy = None
|
playback_proxy = None
|
||||||
|
|
||||||
warc_writer = warcprox.warcwriter.WarcWriter(directory=args.directory,
|
default_warc_writer = warcprox.warcwriter.WarcWriter(directory=args.directory,
|
||||||
gzip=args.gzip, prefix=args.prefix, port=int(args.port),
|
gzip=args.gzip, prefix=args.prefix, port=int(args.port),
|
||||||
rollover_size=int(args.size), base32=args.base32,
|
rollover_size=int(args.size), base32=args.base32,
|
||||||
dedup_db=dedup_db, digest_algorithm=args.digest_algorithm,
|
dedup_db=dedup_db, digest_algorithm=args.digest_algorithm,
|
||||||
playback_index_db=playback_index_db)
|
playback_index_db=playback_index_db,
|
||||||
warc_writer_thread = warcprox.warcwriter.WarcWriterThread(
|
|
||||||
recorded_url_q=recorded_url_q, warc_writer=warc_writer,
|
|
||||||
rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None)
|
rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None)
|
||||||
|
warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q,
|
||||||
|
default_warc_writer=default_warc_writer)
|
||||||
|
|
||||||
controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy)
|
controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy)
|
||||||
controller.run_until_shutdown()
|
controller.run_until_shutdown()
|
||||||
|
@ -152,7 +152,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
|
|||||||
self.log_date_time_string(), fmt % args))
|
self.log_date_time_string(), fmt % args))
|
||||||
|
|
||||||
def log_message(self, fmt, *args):
|
def log_message(self, fmt, *args):
|
||||||
self.logger.info("{} {} - - [{}] {}".format(self.__class__.__name__,
|
self.logger.debug("{} {} - - [{}] {}".format(self.__class__.__name__,
|
||||||
self.address_string(), self.log_date_time_string(), fmt % args))
|
self.address_string(), self.log_date_time_string(), fmt % args))
|
||||||
|
|
||||||
|
|
||||||
|
@ -210,7 +210,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
|||||||
|
|
||||||
recorded_url = RecordedUrl(url=self.url, request_data=req,
|
recorded_url = RecordedUrl(url=self.url, request_data=req,
|
||||||
response_recorder=h.recorder, remote_ip=remote_ip,
|
response_recorder=h.recorder, remote_ip=remote_ip,
|
||||||
warcprox_meta=warcprox_meta)
|
warcprox_meta=warcprox_meta, method=self.command,
|
||||||
|
status=h.status, size=h.recorder.len)
|
||||||
self.server.recorded_url_q.put(recorded_url)
|
self.server.recorded_url_q.put(recorded_url)
|
||||||
|
|
||||||
return recorded_url
|
return recorded_url
|
||||||
@ -241,7 +242,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
|||||||
|
|
||||||
class RecordedUrl(object):
|
class RecordedUrl(object):
|
||||||
def __init__(self, url, request_data, response_recorder, remote_ip,
|
def __init__(self, url, request_data, response_recorder, remote_ip,
|
||||||
warcprox_meta=None, content_type=None, custom_type=None):
|
warcprox_meta=None, content_type=None, custom_type=None,
|
||||||
|
method=None, status=None, size=None):
|
||||||
# XXX should test what happens with non-ascii url (when does
|
# XXX should test what happens with non-ascii url (when does
|
||||||
# url-encoding happen?)
|
# url-encoding happen?)
|
||||||
if type(url) is not bytes:
|
if type(url) is not bytes:
|
||||||
@ -265,6 +267,10 @@ class RecordedUrl(object):
|
|||||||
self.content_type = content_type
|
self.content_type = content_type
|
||||||
self.custom_type = custom_type
|
self.custom_type = custom_type
|
||||||
|
|
||||||
|
self.method = method
|
||||||
|
self.status = status
|
||||||
|
self.size = size
|
||||||
|
|
||||||
|
|
||||||
class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
|
class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
|
||||||
logger = logging.getLogger("warcprox.warcprox.WarcProxy")
|
logger = logging.getLogger("warcprox.warcprox.WarcProxy")
|
||||||
|
@ -26,9 +26,11 @@ class WarcWriter:
|
|||||||
def __init__(self, directory='./warcs', rollover_size=1000000000,
|
def __init__(self, directory='./warcs', rollover_size=1000000000,
|
||||||
gzip=False, prefix='WARCPROX', port=0,
|
gzip=False, prefix='WARCPROX', port=0,
|
||||||
digest_algorithm='sha1', base32=False, dedup_db=None,
|
digest_algorithm='sha1', base32=False, dedup_db=None,
|
||||||
playback_index_db=None):
|
playback_index_db=None, rollover_idle_time=None):
|
||||||
|
|
||||||
self.rollover_size = rollover_size
|
self.rollover_size = rollover_size
|
||||||
|
self.rollover_idle_time = rollover_idle_time
|
||||||
|
self._last_activity = time.time()
|
||||||
|
|
||||||
self.gzip = gzip
|
self.gzip = gzip
|
||||||
self.digest_algorithm = digest_algorithm
|
self.digest_algorithm = digest_algorithm
|
||||||
@ -50,7 +52,6 @@ class WarcWriter:
|
|||||||
self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory))
|
self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory))
|
||||||
os.mkdir(directory)
|
os.mkdir(directory)
|
||||||
|
|
||||||
|
|
||||||
# returns a tuple (principal_record, request_record) where principal_record is either a response or revisit record
|
# returns a tuple (principal_record, request_record) where principal_record is either a response or revisit record
|
||||||
def build_warc_records(self, recorded_url):
|
def build_warc_records(self, recorded_url):
|
||||||
warc_date = warctools.warc.warc_datetime_str(datetime.utcnow())
|
warc_date = warctools.warc.warc_datetime_str(datetime.utcnow())
|
||||||
@ -107,11 +108,9 @@ class WarcWriter:
|
|||||||
|
|
||||||
return principal_record, request_record
|
return principal_record, request_record
|
||||||
|
|
||||||
|
|
||||||
def digest_str(self, hash_obj):
|
def digest_str(self, hash_obj):
|
||||||
return hash_obj.name.encode('utf-8') + b':' + (base64.b32encode(hash_obj.digest()) if self.base32 else hash_obj.hexdigest().encode('ascii'))
|
return hash_obj.name.encode('utf-8') + b':' + (base64.b32encode(hash_obj.digest()) if self.base32 else hash_obj.hexdigest().encode('ascii'))
|
||||||
|
|
||||||
|
|
||||||
def build_warc_record(self, url, warc_date=None, recorder=None, data=None,
|
def build_warc_record(self, url, warc_date=None, recorder=None, data=None,
|
||||||
concurrent_to=None, warc_type=None, content_type=None, remote_ip=None,
|
concurrent_to=None, warc_type=None, content_type=None, remote_ip=None,
|
||||||
profile=None, refers_to=None, refers_to_target_uri=None,
|
profile=None, refers_to=None, refers_to_target_uri=None,
|
||||||
@ -167,7 +166,6 @@ class WarcWriter:
|
|||||||
|
|
||||||
return record
|
return record
|
||||||
|
|
||||||
|
|
||||||
def timestamp17(self):
|
def timestamp17(self):
|
||||||
now = datetime.utcnow()
|
now = datetime.utcnow()
|
||||||
return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000)
|
return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000)
|
||||||
@ -207,7 +205,6 @@ class WarcWriter:
|
|||||||
|
|
||||||
return record
|
return record
|
||||||
|
|
||||||
|
|
||||||
# <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
|
# <!-- <property name="template" value="${prefix}-${timestamp17}-${serialno}-${heritrix.pid}~${heritrix.hostname}~${heritrix.port}" /> -->
|
||||||
def _writer(self):
|
def _writer(self):
|
||||||
if self._fpath and os.path.getsize(self._fpath) > self.rollover_size:
|
if self._fpath and os.path.getsize(self._fpath) > self.rollover_size:
|
||||||
@ -229,7 +226,6 @@ class WarcWriter:
|
|||||||
|
|
||||||
return self._f
|
return self._f
|
||||||
|
|
||||||
|
|
||||||
def _final_tasks(self, recorded_url, recordset, recordset_offset):
|
def _final_tasks(self, recorded_url, recordset, recordset_offset):
|
||||||
if (self.dedup_db is not None
|
if (self.dedup_db is not None
|
||||||
and recordset[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
|
and recordset[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
|
||||||
@ -243,6 +239,15 @@ class WarcWriter:
|
|||||||
if recorded_url.response_recorder is not None:
|
if recorded_url.response_recorder is not None:
|
||||||
recorded_url.response_recorder.tempfile.close()
|
recorded_url.response_recorder.tempfile.close()
|
||||||
|
|
||||||
|
self._last_activity = time.time()
|
||||||
|
|
||||||
|
# 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"}
|
||||||
|
self.logger.info("{} {} {} size={} {} {} offset={}".format(
|
||||||
|
recorded_url.status, recorded_url.method,
|
||||||
|
recorded_url.url.decode('utf-8'), recorded_url.size,
|
||||||
|
recordset[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode("utf-8"),
|
||||||
|
self._f_finalname, recordset_offset))
|
||||||
|
|
||||||
def write_records(self, recorded_url):
|
def write_records(self, recorded_url):
|
||||||
recordset = self.build_warc_records(recorded_url)
|
recordset = self.build_warc_records(recorded_url)
|
||||||
|
|
||||||
@ -262,50 +267,74 @@ class WarcWriter:
|
|||||||
|
|
||||||
self._final_tasks(recorded_url, recordset, recordset_offset)
|
self._final_tasks(recorded_url, recordset, recordset_offset)
|
||||||
|
|
||||||
|
def maybe_idle_rollover(self):
|
||||||
|
if (self._fpath is not None
|
||||||
|
and self.rollover_idle_time is not None
|
||||||
|
and self.rollover_idle_time > 0
|
||||||
|
and time.time() - self._last_activity > self.rollover_idle_time):
|
||||||
|
self.logger.debug('rolling over {} after {} seconds idle'.format(self._f_finalname, time.time() - self._last_activity))
|
||||||
|
self.close_writer()
|
||||||
|
|
||||||
class WarcWriterThread(threading.Thread):
|
class WarcWriterThread(threading.Thread):
|
||||||
logger = logging.getLogger("warcprox.warcwriter.WarcWriterThread")
|
logger = logging.getLogger("warcprox.warcwriter.WarcWriterThread")
|
||||||
|
|
||||||
def __init__(self, recorded_url_q=None, warc_writer=None, rollover_idle_time=None):
|
def __init__(self, recorded_url_q=None, default_warc_writer=None):
|
||||||
"""recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl."""
|
"""recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl."""
|
||||||
threading.Thread.__init__(self, name='WarcWriterThread')
|
threading.Thread.__init__(self, name='WarcWriterThread')
|
||||||
self.recorded_url_q = recorded_url_q
|
self.recorded_url_q = recorded_url_q
|
||||||
self.rollover_idle_time = rollover_idle_time
|
|
||||||
self.stop = threading.Event()
|
self.stop = threading.Event()
|
||||||
if warc_writer:
|
if default_warc_writer:
|
||||||
self.warc_writer = warc_writer
|
self.default_warc_writer = default_warc_writer
|
||||||
else:
|
else:
|
||||||
self.warc_writer = WarcWriter()
|
self.default_warc_writer = WarcWriter()
|
||||||
|
self.warc_writers = {} # {prefix:WarcWriter}
|
||||||
|
|
||||||
|
def write_records(self, recorded_url):
|
||||||
|
w = self.default_warc_writer
|
||||||
|
if recorded_url.warcprox_meta and "warc-prefix" in recorded_url.warcprox_meta:
|
||||||
|
# self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url))
|
||||||
|
prefix = recorded_url.warcprox_meta["warc-prefix"]
|
||||||
|
if not prefix in self.warc_writers:
|
||||||
|
self.warc_writers[prefix] = WarcWriter(prefix=prefix,
|
||||||
|
directory=self.default_warc_writer.directory,
|
||||||
|
rollover_size=self.default_warc_writer.rollover_size,
|
||||||
|
rollover_idle_time=self.default_warc_writer.rollover_idle_time,
|
||||||
|
gzip=self.default_warc_writer.gzip,
|
||||||
|
port=self.default_warc_writer.port,
|
||||||
|
digest_algorithm=self.default_warc_writer.digest_algorithm,
|
||||||
|
base32=self.default_warc_writer.base32,
|
||||||
|
dedup_db=self.default_warc_writer.dedup_db,
|
||||||
|
playback_index_db=self.default_warc_writer.playback_index_db)
|
||||||
|
w = self.warc_writers[prefix]
|
||||||
|
w.write_records(recorded_url)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format(
|
self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format(
|
||||||
os.path.abspath(self.warc_writer.directory), self.warc_writer.gzip, self.warc_writer.rollover_size,
|
os.path.abspath(self.default_warc_writer.directory), self.default_warc_writer.gzip, self.default_warc_writer.rollover_size,
|
||||||
self.rollover_idle_time, self.warc_writer.prefix, self.warc_writer.port))
|
self.default_warc_writer.rollover_idle_time, self.default_warc_writer.prefix, self.default_warc_writer.port))
|
||||||
|
|
||||||
self._last_sync = self._last_activity = time.time()
|
self._last_sync = time.time()
|
||||||
|
|
||||||
while not self.stop.is_set():
|
while not self.stop.is_set():
|
||||||
try:
|
try:
|
||||||
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
|
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
|
||||||
self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url))
|
self.write_records(recorded_url)
|
||||||
self.warc_writer.write_records(recorded_url)
|
|
||||||
self._last_activity = time.time()
|
|
||||||
except queue.Empty:
|
except queue.Empty:
|
||||||
if (self.warc_writer._fpath is not None
|
self.default_warc_writer.maybe_idle_rollover()
|
||||||
and self.rollover_idle_time is not None
|
for w in self.warc_writers.values():
|
||||||
and self.rollover_idle_time > 0
|
w.maybe_idle_rollover()
|
||||||
and time.time() - self._last_activity > self.rollover_idle_time):
|
|
||||||
self.logger.debug('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity))
|
|
||||||
self.warc_writer.close_writer()
|
|
||||||
|
|
||||||
|
# XXX prob doesn't belong here (do we need it at all?)
|
||||||
if time.time() - self._last_sync > 60:
|
if time.time() - self._last_sync > 60:
|
||||||
if self.warc_writer.dedup_db:
|
if self.default_warc_writer.dedup_db:
|
||||||
self.warc_writer.dedup_db.sync()
|
self.default_warc_writer.dedup_db.sync()
|
||||||
if self.warc_writer.playback_index_db:
|
if self.default_warc_writer.playback_index_db:
|
||||||
self.warc_writer.playback_index_db.sync()
|
self.default_warc_writer.playback_index_db.sync()
|
||||||
self._last_sync = time.time()
|
self._last_sync = time.time()
|
||||||
|
|
||||||
self.logger.info('WarcWriterThread shutting down')
|
self.logger.info('WarcWriterThread shutting down')
|
||||||
self.warc_writer.close_writer();
|
self.default_warc_writer.close_writer()
|
||||||
|
for w in self.warc_writers.values():
|
||||||
|
w.close_writer()
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user