diff --git a/warcprox/controller.py b/warcprox/controller.py index 185ce9f..26e88fc 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -69,8 +69,8 @@ class WarcproxController(object): self.proxy.shutdown() self.proxy.server_close() - if self.warc_writer_thread.warc_writer.dedup_db is not None: - self.warc_writer_thread.warc_writer.dedup_db.close() + if self.warc_writer_thread.default_warc_writer.dedup_db is not None: + self.warc_writer_thread.default_warc_writer.dedup_db.close() if self.playback_proxy is not None: self.playback_proxy.shutdown() diff --git a/warcprox/main.py b/warcprox/main.py index 04156d3..147a030 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -123,14 +123,14 @@ def main(argv=sys.argv): playback_index_db = None playback_proxy = None - warc_writer = warcprox.warcwriter.WarcWriter(directory=args.directory, + default_warc_writer = warcprox.warcwriter.WarcWriter(directory=args.directory, gzip=args.gzip, prefix=args.prefix, port=int(args.port), rollover_size=int(args.size), base32=args.base32, dedup_db=dedup_db, digest_algorithm=args.digest_algorithm, - playback_index_db=playback_index_db) - warc_writer_thread = warcprox.warcwriter.WarcWriterThread( - recorded_url_q=recorded_url_q, warc_writer=warc_writer, + playback_index_db=playback_index_db, rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None) + warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q, + default_warc_writer=default_warc_writer) controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy) controller.run_until_shutdown() diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index cbd3992..24758f0 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -152,7 +152,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.log_date_time_string(), fmt % args)) def log_message(self, fmt, *args): - self.logger.info("{} {} - - [{}] {}".format(self.__class__.__name__, + self.logger.debug("{} {} - - [{}] {}".format(self.__class__.__name__, self.address_string(), self.log_date_time_string(), fmt % args)) diff --git a/warcprox/warcprox.py b/warcprox/warcprox.py index 10e5b12..68184b2 100644 --- a/warcprox/warcprox.py +++ b/warcprox/warcprox.py @@ -210,7 +210,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): recorded_url = RecordedUrl(url=self.url, request_data=req, response_recorder=h.recorder, remote_ip=remote_ip, - warcprox_meta=warcprox_meta) + warcprox_meta=warcprox_meta, method=self.command, + status=h.status, size=h.recorder.len) self.server.recorded_url_q.put(recorded_url) return recorded_url @@ -241,7 +242,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): class RecordedUrl(object): def __init__(self, url, request_data, response_recorder, remote_ip, - warcprox_meta=None, content_type=None, custom_type=None): + warcprox_meta=None, content_type=None, custom_type=None, + method=None, status=None, size=None): # XXX should test what happens with non-ascii url (when does # url-encoding happen?) if type(url) is not bytes: @@ -265,6 +267,10 @@ class RecordedUrl(object): self.content_type = content_type self.custom_type = custom_type + self.method = method + self.status = status + self.size = size + class WarcProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): logger = logging.getLogger("warcprox.warcprox.WarcProxy") diff --git a/warcprox/warcwriter.py b/warcprox/warcwriter.py index d92f98a..4736a86 100644 --- a/warcprox/warcwriter.py +++ b/warcprox/warcwriter.py @@ -26,9 +26,11 @@ class WarcWriter: def __init__(self, directory='./warcs', rollover_size=1000000000, gzip=False, prefix='WARCPROX', port=0, digest_algorithm='sha1', base32=False, dedup_db=None, - playback_index_db=None): + playback_index_db=None, rollover_idle_time=None): self.rollover_size = rollover_size + self.rollover_idle_time = rollover_idle_time + self._last_activity = time.time() self.gzip = gzip self.digest_algorithm = digest_algorithm @@ -50,7 +52,6 @@ class WarcWriter: self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory)) os.mkdir(directory) - # returns a tuple (principal_record, request_record) where principal_record is either a response or revisit record def build_warc_records(self, recorded_url): warc_date = warctools.warc.warc_datetime_str(datetime.utcnow()) @@ -107,11 +108,9 @@ class WarcWriter: return principal_record, request_record - def digest_str(self, hash_obj): return hash_obj.name.encode('utf-8') + b':' + (base64.b32encode(hash_obj.digest()) if self.base32 else hash_obj.hexdigest().encode('ascii')) - def build_warc_record(self, url, warc_date=None, recorder=None, data=None, concurrent_to=None, warc_type=None, content_type=None, remote_ip=None, profile=None, refers_to=None, refers_to_target_uri=None, @@ -167,7 +166,6 @@ class WarcWriter: return record - def timestamp17(self): now = datetime.utcnow() return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000) @@ -207,7 +205,6 @@ class WarcWriter: return record - # def _writer(self): if self._fpath and os.path.getsize(self._fpath) > self.rollover_size: @@ -229,7 +226,6 @@ class WarcWriter: return self._f - def _final_tasks(self, recorded_url, recordset, recordset_offset): if (self.dedup_db is not None and recordset[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE @@ -243,6 +239,15 @@ class WarcWriter: if recorded_url.response_recorder is not None: recorded_url.response_recorder.tempfile.close() + self._last_activity = time.time() + + # 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"} + self.logger.info("{} {} {} size={} {} {} offset={}".format( + recorded_url.status, recorded_url.method, + recorded_url.url.decode('utf-8'), recorded_url.size, + recordset[0].get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode("utf-8"), + self._f_finalname, recordset_offset)) + def write_records(self, recorded_url): recordset = self.build_warc_records(recorded_url) @@ -262,50 +267,74 @@ class WarcWriter: self._final_tasks(recorded_url, recordset, recordset_offset) + def maybe_idle_rollover(self): + if (self._fpath is not None + and self.rollover_idle_time is not None + and self.rollover_idle_time > 0 + and time.time() - self._last_activity > self.rollover_idle_time): + self.logger.debug('rolling over {} after {} seconds idle'.format(self._f_finalname, time.time() - self._last_activity)) + self.close_writer() class WarcWriterThread(threading.Thread): logger = logging.getLogger("warcprox.warcwriter.WarcWriterThread") - def __init__(self, recorded_url_q=None, warc_writer=None, rollover_idle_time=None): + def __init__(self, recorded_url_q=None, default_warc_writer=None): """recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl.""" threading.Thread.__init__(self, name='WarcWriterThread') self.recorded_url_q = recorded_url_q - self.rollover_idle_time = rollover_idle_time self.stop = threading.Event() - if warc_writer: - self.warc_writer = warc_writer + if default_warc_writer: + self.default_warc_writer = default_warc_writer else: - self.warc_writer = WarcWriter() + self.default_warc_writer = WarcWriter() + self.warc_writers = {} # {prefix:WarcWriter} + + def write_records(self, recorded_url): + w = self.default_warc_writer + if recorded_url.warcprox_meta and "warc-prefix" in recorded_url.warcprox_meta: + # self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url)) + prefix = recorded_url.warcprox_meta["warc-prefix"] + if not prefix in self.warc_writers: + self.warc_writers[prefix] = WarcWriter(prefix=prefix, + directory=self.default_warc_writer.directory, + rollover_size=self.default_warc_writer.rollover_size, + rollover_idle_time=self.default_warc_writer.rollover_idle_time, + gzip=self.default_warc_writer.gzip, + port=self.default_warc_writer.port, + digest_algorithm=self.default_warc_writer.digest_algorithm, + base32=self.default_warc_writer.base32, + dedup_db=self.default_warc_writer.dedup_db, + playback_index_db=self.default_warc_writer.playback_index_db) + w = self.warc_writers[prefix] + w.write_records(recorded_url) def run(self): self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( - os.path.abspath(self.warc_writer.directory), self.warc_writer.gzip, self.warc_writer.rollover_size, - self.rollover_idle_time, self.warc_writer.prefix, self.warc_writer.port)) + os.path.abspath(self.default_warc_writer.directory), self.default_warc_writer.gzip, self.default_warc_writer.rollover_size, + self.default_warc_writer.rollover_idle_time, self.default_warc_writer.prefix, self.default_warc_writer.port)) - self._last_sync = self._last_activity = time.time() + self._last_sync = time.time() while not self.stop.is_set(): try: recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) - self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url)) - self.warc_writer.write_records(recorded_url) - self._last_activity = time.time() + self.write_records(recorded_url) except queue.Empty: - if (self.warc_writer._fpath is not None - and self.rollover_idle_time is not None - and self.rollover_idle_time > 0 - and time.time() - self._last_activity > self.rollover_idle_time): - self.logger.debug('rolling over warc file after {} seconds idle'.format(time.time() - self._last_activity)) - self.warc_writer.close_writer() + self.default_warc_writer.maybe_idle_rollover() + for w in self.warc_writers.values(): + w.maybe_idle_rollover() + # XXX prob doesn't belong here (do we need it at all?) if time.time() - self._last_sync > 60: - if self.warc_writer.dedup_db: - self.warc_writer.dedup_db.sync() - if self.warc_writer.playback_index_db: - self.warc_writer.playback_index_db.sync() + if self.default_warc_writer.dedup_db: + self.default_warc_writer.dedup_db.sync() + if self.default_warc_writer.playback_index_db: + self.default_warc_writer.playback_index_db.sync() self._last_sync = time.time() self.logger.info('WarcWriterThread shutting down') - self.warc_writer.close_writer(); + self.default_warc_writer.close_writer() + for w in self.warc_writers.values(): + w.close_writer()