diff --git a/warcprox/controller.py b/warcprox/controller.py index 89d420d..db76135 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -61,8 +61,8 @@ class WarcproxController(object): self.proxy.shutdown() self.proxy.server_close() - if self.warc_writer_thread.default_warc_writer.dedup_db is not None: - self.warc_writer_thread.default_warc_writer.dedup_db.close() + if self.warc_writer_thread.writer_pool.default_warc_writer.dedup_db is not None: + self.warc_writer_thread.writer_pool.default_warc_writer.dedup_db.close() if self.playback_proxy is not None: self.playback_proxy.shutdown() diff --git a/warcprox/main.py b/warcprox/main.py index 0ab6885..7a9f0b3 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -145,8 +145,9 @@ def main(argv=sys.argv): dedup_db=dedup_db, digest_algorithm=args.digest_algorithm, playback_index_db=playback_index_db, rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None) + writer_pool=warcprox.warcwriter.WarcWriterPool(default_warc_writer) warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q, - default_warc_writer=default_warc_writer) + writer_pool=writer_pool) controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index edc9657..9d57b44 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -83,7 +83,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self._transition_to_ssl() except Exception as e: try: - self.logger.error("problem with connect line {}: {}".format(repr(self.requestline), e), exc_info=True) + self.logger.error("problem handling {}: {}".format(repr(self.requestline), e), exc_info=True) if type(e) is socket.timeout: self.send_error(504, str(e)) else: @@ -117,13 +117,6 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): def do_COMMAND(self): if not self.is_connect: - if self.command == 'PUTMETA': - self._special_request(method=self.command, type_='metadata') - return - # if self.command == 'PUTRES': - # self._special_request(type_='resource') - # return - try: # Connect to destination self._determine_host_port() diff --git a/warcprox/warcprox.py b/warcprox/warcprox.py index 930a290..c5d6b4e 100644 --- a/warcprox/warcprox.py +++ b/warcprox/warcprox.py @@ -36,6 +36,7 @@ import traceback import hashlib import json import socket +from hanzo import warctools from certauth.certauth import CertificateAuthority import warcprox.mitmproxy @@ -179,7 +180,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): if 'Content-Length' in self.headers: req += self.rfile.read(int(self.headers['Content-Length'])) - self.logger.debug('req={}'.format(repr(req))) + self.logger.debug('sending to remote server req={}'.format(repr(req))) # Send it down the pipe! self._proxy_sock.sendall(req) @@ -222,12 +223,17 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): return recorded_url - def _special_request(self, method, type_): + # deprecated + def do_PUTMETA(self): + self.do_WARCPROX_WRITE_RECORD(warc_type=warctools.WarcRecord.METADATA) + + def do_WARCPROX_WRITE_RECORD(self, warc_type=None): try: self.url = self.path - if (method == 'PUTMETA' and 'Content-Length' in self.headers - and 'Content-Type' in self.headers): + if ('Content-Length' in self.headers and 'Content-Type' in self.headers + and (warc_type or 'WARC-Type' in self.headers)): + # stream this? request_data = self.rfile.read(int(self.headers['Content-Length'])) warcprox_meta = self.headers.get('Warcprox-Meta') @@ -238,10 +244,10 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): remote_ip=b'', warcprox_meta=warcprox_meta, content_type=self.headers['Content-Type'].encode('latin1'), - custom_type=type_, + custom_type=warc_type or self.headers['WARC-Type'], status=204, size=len(request_data), client_ip=self.client_address[0], - method=method) + method=self.command) self.server.recorded_url_q.put(rec_custom) self.send_response(204, 'OK') @@ -250,7 +256,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): self.end_headers() except: - self.logger.error("uncaught except in _special_request", exc_info=True) + self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True) raise def log_error(self, fmt, *args): diff --git a/warcprox/warcwriter.py b/warcprox/warcwriter.py index 0d57bda..ac69cb9 100644 --- a/warcprox/warcwriter.py +++ b/warcprox/warcwriter.py @@ -52,21 +52,8 @@ class WarcWriter: self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory)) os.mkdir(directory) - # returns a tuple (principal_record, request_record) where principal_record is either a response or revisit record - def build_warc_records(self, recorded_url): - warc_date = warctools.warc.warc_datetime_str(datetime.utcnow()) - - dedup_info = None - - # metadata special case - if recorded_url.custom_type == 'metadata': - metadata_rec = self.build_warc_record(url=recorded_url.url, - warc_date=warc_date, - data=recorded_url.request_data, - warc_type=warctools.WarcRecord.METADATA, - content_type=recorded_url.content_type) - return [metadata_rec] - + def _build_response_principal_record(self, recorded_url, warc_date): + """Builds response or revisit record, whichever is appropriate.""" if self.dedup_db is not None and recorded_url.response_recorder.payload_digest is not None: key = self.digest_str(recorded_url.response_recorder.payload_digest) dedup_info = self.dedup_db.lookup(key) @@ -79,7 +66,7 @@ class WarcWriter: else: response_header_block = recorded_url.response_recorder.tempfile.read() - principal_record = self.build_warc_record( + return self.build_warc_record( url=recorded_url.url, warc_date=warc_date, data=response_header_block, warc_type=warctools.WarcRecord.REVISIT, @@ -92,21 +79,31 @@ class WarcWriter: remote_ip=recorded_url.remote_ip) else: # response record - principal_record = self.build_warc_record( + return self.build_warc_record( url=recorded_url.url, warc_date=warc_date, recorder=recorded_url.response_recorder, warc_type=warctools.WarcRecord.RESPONSE, content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE, remote_ip=recorded_url.remote_ip) - request_record = self.build_warc_record( - url=recorded_url.url, warc_date=warc_date, - data=recorded_url.request_data, - warc_type=warctools.WarcRecord.REQUEST, - content_type=hanzo.httptools.RequestMessage.CONTENT_TYPE, - concurrent_to=principal_record.id) + # returns a tuple (principal_record, ...) + def build_warc_records(self, recorded_url): + warc_date = warctools.warc.warc_datetime_str(datetime.utcnow()) - return principal_record, request_record + if recorded_url.response_recorder: + principal_record = self._build_response_principal_record(recorded_url, warc_date) + request_record = self.build_warc_record(url=recorded_url.url, + warc_date=warc_date, data=recorded_url.request_data, + warc_type=warctools.WarcRecord.REQUEST, + content_type=hanzo.httptools.RequestMessage.CONTENT_TYPE, + concurrent_to=principal_record.id) + return principal_record, request_record + else: + principal_record = self.build_warc_record(url=recorded_url.url, + warc_date=warc_date, data=recorded_url.request_data, + warc_type=recorded_url.custom_type, + content_type=recorded_url.content_type) + return (principal_record,) def digest_str(self, hash_obj): return hash_obj.name.encode('utf-8') + b':' + (base64.b32encode(hash_obj.digest()) if self.base32 else hash_obj.hexdigest().encode('ascii')) @@ -278,11 +275,11 @@ class WarcWriter: for record in recordset: offset = writer.tell() record.write_to(writer, gzip=self.gzip) - self.logger.debug('wrote warc record: warc_type={} content_length={} url={} warc={} offset={}'.format( + self.logger.debug('wrote warc record: warc_type=%s content_length=%s url=%s warc=%s offset=%d', record.get_header(warctools.WarcRecord.TYPE), record.get_header(warctools.WarcRecord.CONTENT_LENGTH), record.get_header(warctools.WarcRecord.URL), - self._fpath, offset)) + self._fpath, offset) self._f.flush() @@ -296,21 +293,23 @@ class WarcWriter: self.logger.debug('rolling over {} after {} seconds idle'.format(self._f_finalname, time.time() - self._last_activity)) self.close_writer() -class WarcWriterThread(threading.Thread): +class WarcWriterPool: logger = logging.getLogger("warcprox.warcwriter.WarcWriterThread") - def __init__(self, recorded_url_q=None, default_warc_writer=None): - """recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl.""" - threading.Thread.__init__(self, name='WarcWriterThread') - self.recorded_url_q = recorded_url_q - self.stop = threading.Event() + def __init__(self, default_warc_writer): if default_warc_writer: self.default_warc_writer = default_warc_writer else: self.default_warc_writer = WarcWriter() self.warc_writers = {} # {prefix:WarcWriter} + self._last_sync = time.time() - def write_records(self, recorded_url): + self.logger.info('directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( + os.path.abspath(self.default_warc_writer.directory), self.default_warc_writer.gzip, self.default_warc_writer.rollover_size, + self.default_warc_writer.rollover_idle_time, self.default_warc_writer.prefix, self.default_warc_writer.port)) + + # chooses writer for filename specified by warcprox_meta["warc-prefix"] if set + def _writer(self, recorded_url): w = self.default_warc_writer if recorded_url.warcprox_meta and "warc-prefix" in recorded_url.warcprox_meta: # self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url)) @@ -327,37 +326,55 @@ class WarcWriterThread(threading.Thread): dedup_db=self.default_warc_writer.dedup_db, playback_index_db=self.default_warc_writer.playback_index_db) w = self.warc_writers[prefix] - w.write_records(recorded_url) + return w + + def write_records(self, recorded_url): + self._writer(recorded_url).write_records(recorded_url) + + def maybe_idle_rollover(self): + self.default_warc_writer.maybe_idle_rollover() + for w in self.warc_writers.values(): + w.maybe_idle_rollover() + + def sync(self): + # XXX prob doesn't belong here (do we need it at all?) + if time.time() - self._last_sync > 60: + if self.default_warc_writer.dedup_db: + self.default_warc_writer.dedup_db.sync() + if self.default_warc_writer.playback_index_db: + self.default_warc_writer.playback_index_db.sync() + self._last_sync = time.time() + + def close_writers(self): + self.default_warc_writer.close_writer() + for w in self.warc_writers.values(): + w.close_writer() + +class WarcWriterThread(threading.Thread): + logger = logging.getLogger("warcprox.warcwriter.WarcWriterThread") + + def __init__(self, recorded_url_q=None, writer_pool=None): + """recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl.""" + threading.Thread.__init__(self, name='WarcWriterThread') + self.recorded_url_q = recorded_url_q + self.stop = threading.Event() + if writer_pool: + self.writer_pool = writer_pool + else: + self.writer_pool = WarcWriterPool() def run(self): try: - self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( - os.path.abspath(self.default_warc_writer.directory), self.default_warc_writer.gzip, self.default_warc_writer.rollover_size, - self.default_warc_writer.rollover_idle_time, self.default_warc_writer.prefix, self.default_warc_writer.port)) - - self._last_sync = time.time() - while not self.stop.is_set(): try: recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) - self.write_records(recorded_url) + self.writer_pool.write_records(recorded_url) except queue.Empty: - self.default_warc_writer.maybe_idle_rollover() - for w in self.warc_writers.values(): - w.maybe_idle_rollover() - - # XXX prob doesn't belong here (do we need it at all?) - if time.time() - self._last_sync > 60: - if self.default_warc_writer.dedup_db: - self.default_warc_writer.dedup_db.sync() - if self.default_warc_writer.playback_index_db: - self.default_warc_writer.playback_index_db.sync() - self._last_sync = time.time() + self.writer_pool.maybe_idle_rollover() + self.writer_pool.sync() self.logger.info('WarcWriterThread shutting down') - self.default_warc_writer.close_writer() - for w in self.warc_writers.values(): - w.close_writer() + self.writer_pool.close_writers() except: self.logger.critical("WarcWriterThread shutting down after unexpected error", exc_info=True)