mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
refactor proxy handler to use do_* methods for custom http verbs; refactor warc writer thread to use new WarcWriterPool class
This commit is contained in:
parent
084bd75ed6
commit
771383d0a6
@ -61,8 +61,8 @@ class WarcproxController(object):
|
|||||||
self.proxy.shutdown()
|
self.proxy.shutdown()
|
||||||
self.proxy.server_close()
|
self.proxy.server_close()
|
||||||
|
|
||||||
if self.warc_writer_thread.default_warc_writer.dedup_db is not None:
|
if self.warc_writer_thread.writer_pool.default_warc_writer.dedup_db is not None:
|
||||||
self.warc_writer_thread.default_warc_writer.dedup_db.close()
|
self.warc_writer_thread.writer_pool.default_warc_writer.dedup_db.close()
|
||||||
|
|
||||||
if self.playback_proxy is not None:
|
if self.playback_proxy is not None:
|
||||||
self.playback_proxy.shutdown()
|
self.playback_proxy.shutdown()
|
||||||
|
@ -145,8 +145,9 @@ def main(argv=sys.argv):
|
|||||||
dedup_db=dedup_db, digest_algorithm=args.digest_algorithm,
|
dedup_db=dedup_db, digest_algorithm=args.digest_algorithm,
|
||||||
playback_index_db=playback_index_db,
|
playback_index_db=playback_index_db,
|
||||||
rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None)
|
rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None)
|
||||||
|
writer_pool=warcprox.warcwriter.WarcWriterPool(default_warc_writer)
|
||||||
warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q,
|
warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=recorded_url_q,
|
||||||
default_warc_writer=default_warc_writer)
|
writer_pool=writer_pool)
|
||||||
|
|
||||||
controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy)
|
controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy)
|
||||||
|
|
||||||
|
@ -83,7 +83,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
|
|||||||
self._transition_to_ssl()
|
self._transition_to_ssl()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
try:
|
try:
|
||||||
self.logger.error("problem with connect line {}: {}".format(repr(self.requestline), e), exc_info=True)
|
self.logger.error("problem handling {}: {}".format(repr(self.requestline), e), exc_info=True)
|
||||||
if type(e) is socket.timeout:
|
if type(e) is socket.timeout:
|
||||||
self.send_error(504, str(e))
|
self.send_error(504, str(e))
|
||||||
else:
|
else:
|
||||||
@ -117,13 +117,6 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
|
|||||||
|
|
||||||
def do_COMMAND(self):
|
def do_COMMAND(self):
|
||||||
if not self.is_connect:
|
if not self.is_connect:
|
||||||
if self.command == 'PUTMETA':
|
|
||||||
self._special_request(method=self.command, type_='metadata')
|
|
||||||
return
|
|
||||||
# if self.command == 'PUTRES':
|
|
||||||
# self._special_request(type_='resource')
|
|
||||||
# return
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Connect to destination
|
# Connect to destination
|
||||||
self._determine_host_port()
|
self._determine_host_port()
|
||||||
|
@ -36,6 +36,7 @@ import traceback
|
|||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
import socket
|
import socket
|
||||||
|
from hanzo import warctools
|
||||||
|
|
||||||
from certauth.certauth import CertificateAuthority
|
from certauth.certauth import CertificateAuthority
|
||||||
import warcprox.mitmproxy
|
import warcprox.mitmproxy
|
||||||
@ -179,7 +180,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
|||||||
if 'Content-Length' in self.headers:
|
if 'Content-Length' in self.headers:
|
||||||
req += self.rfile.read(int(self.headers['Content-Length']))
|
req += self.rfile.read(int(self.headers['Content-Length']))
|
||||||
|
|
||||||
self.logger.debug('req={}'.format(repr(req)))
|
self.logger.debug('sending to remote server req={}'.format(repr(req)))
|
||||||
|
|
||||||
# Send it down the pipe!
|
# Send it down the pipe!
|
||||||
self._proxy_sock.sendall(req)
|
self._proxy_sock.sendall(req)
|
||||||
@ -222,12 +223,17 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
|||||||
|
|
||||||
return recorded_url
|
return recorded_url
|
||||||
|
|
||||||
def _special_request(self, method, type_):
|
# deprecated
|
||||||
|
def do_PUTMETA(self):
|
||||||
|
self.do_WARCPROX_WRITE_RECORD(warc_type=warctools.WarcRecord.METADATA)
|
||||||
|
|
||||||
|
def do_WARCPROX_WRITE_RECORD(self, warc_type=None):
|
||||||
try:
|
try:
|
||||||
self.url = self.path
|
self.url = self.path
|
||||||
|
|
||||||
if (method == 'PUTMETA' and 'Content-Length' in self.headers
|
if ('Content-Length' in self.headers and 'Content-Type' in self.headers
|
||||||
and 'Content-Type' in self.headers):
|
and (warc_type or 'WARC-Type' in self.headers)):
|
||||||
|
# stream this?
|
||||||
request_data = self.rfile.read(int(self.headers['Content-Length']))
|
request_data = self.rfile.read(int(self.headers['Content-Length']))
|
||||||
|
|
||||||
warcprox_meta = self.headers.get('Warcprox-Meta')
|
warcprox_meta = self.headers.get('Warcprox-Meta')
|
||||||
@ -238,10 +244,10 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
|||||||
remote_ip=b'',
|
remote_ip=b'',
|
||||||
warcprox_meta=warcprox_meta,
|
warcprox_meta=warcprox_meta,
|
||||||
content_type=self.headers['Content-Type'].encode('latin1'),
|
content_type=self.headers['Content-Type'].encode('latin1'),
|
||||||
custom_type=type_,
|
custom_type=warc_type or self.headers['WARC-Type'],
|
||||||
status=204, size=len(request_data),
|
status=204, size=len(request_data),
|
||||||
client_ip=self.client_address[0],
|
client_ip=self.client_address[0],
|
||||||
method=method)
|
method=self.command)
|
||||||
|
|
||||||
self.server.recorded_url_q.put(rec_custom)
|
self.server.recorded_url_q.put(rec_custom)
|
||||||
self.send_response(204, 'OK')
|
self.send_response(204, 'OK')
|
||||||
@ -250,7 +256,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
|||||||
|
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
except:
|
except:
|
||||||
self.logger.error("uncaught except in _special_request", exc_info=True)
|
self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def log_error(self, fmt, *args):
|
def log_error(self, fmt, *args):
|
||||||
|
@ -52,21 +52,8 @@ class WarcWriter:
|
|||||||
self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory))
|
self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory))
|
||||||
os.mkdir(directory)
|
os.mkdir(directory)
|
||||||
|
|
||||||
# returns a tuple (principal_record, request_record) where principal_record is either a response or revisit record
|
def _build_response_principal_record(self, recorded_url, warc_date):
|
||||||
def build_warc_records(self, recorded_url):
|
"""Builds response or revisit record, whichever is appropriate."""
|
||||||
warc_date = warctools.warc.warc_datetime_str(datetime.utcnow())
|
|
||||||
|
|
||||||
dedup_info = None
|
|
||||||
|
|
||||||
# metadata special case
|
|
||||||
if recorded_url.custom_type == 'metadata':
|
|
||||||
metadata_rec = self.build_warc_record(url=recorded_url.url,
|
|
||||||
warc_date=warc_date,
|
|
||||||
data=recorded_url.request_data,
|
|
||||||
warc_type=warctools.WarcRecord.METADATA,
|
|
||||||
content_type=recorded_url.content_type)
|
|
||||||
return [metadata_rec]
|
|
||||||
|
|
||||||
if self.dedup_db is not None and recorded_url.response_recorder.payload_digest is not None:
|
if self.dedup_db is not None and recorded_url.response_recorder.payload_digest is not None:
|
||||||
key = self.digest_str(recorded_url.response_recorder.payload_digest)
|
key = self.digest_str(recorded_url.response_recorder.payload_digest)
|
||||||
dedup_info = self.dedup_db.lookup(key)
|
dedup_info = self.dedup_db.lookup(key)
|
||||||
@ -79,7 +66,7 @@ class WarcWriter:
|
|||||||
else:
|
else:
|
||||||
response_header_block = recorded_url.response_recorder.tempfile.read()
|
response_header_block = recorded_url.response_recorder.tempfile.read()
|
||||||
|
|
||||||
principal_record = self.build_warc_record(
|
return self.build_warc_record(
|
||||||
url=recorded_url.url, warc_date=warc_date,
|
url=recorded_url.url, warc_date=warc_date,
|
||||||
data=response_header_block,
|
data=response_header_block,
|
||||||
warc_type=warctools.WarcRecord.REVISIT,
|
warc_type=warctools.WarcRecord.REVISIT,
|
||||||
@ -92,21 +79,31 @@ class WarcWriter:
|
|||||||
remote_ip=recorded_url.remote_ip)
|
remote_ip=recorded_url.remote_ip)
|
||||||
else:
|
else:
|
||||||
# response record
|
# response record
|
||||||
principal_record = self.build_warc_record(
|
return self.build_warc_record(
|
||||||
url=recorded_url.url, warc_date=warc_date,
|
url=recorded_url.url, warc_date=warc_date,
|
||||||
recorder=recorded_url.response_recorder,
|
recorder=recorded_url.response_recorder,
|
||||||
warc_type=warctools.WarcRecord.RESPONSE,
|
warc_type=warctools.WarcRecord.RESPONSE,
|
||||||
content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE,
|
content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE,
|
||||||
remote_ip=recorded_url.remote_ip)
|
remote_ip=recorded_url.remote_ip)
|
||||||
|
|
||||||
request_record = self.build_warc_record(
|
# returns a tuple (principal_record, ...)
|
||||||
url=recorded_url.url, warc_date=warc_date,
|
def build_warc_records(self, recorded_url):
|
||||||
data=recorded_url.request_data,
|
warc_date = warctools.warc.warc_datetime_str(datetime.utcnow())
|
||||||
warc_type=warctools.WarcRecord.REQUEST,
|
|
||||||
content_type=hanzo.httptools.RequestMessage.CONTENT_TYPE,
|
|
||||||
concurrent_to=principal_record.id)
|
|
||||||
|
|
||||||
return principal_record, request_record
|
if recorded_url.response_recorder:
|
||||||
|
principal_record = self._build_response_principal_record(recorded_url, warc_date)
|
||||||
|
request_record = self.build_warc_record(url=recorded_url.url,
|
||||||
|
warc_date=warc_date, data=recorded_url.request_data,
|
||||||
|
warc_type=warctools.WarcRecord.REQUEST,
|
||||||
|
content_type=hanzo.httptools.RequestMessage.CONTENT_TYPE,
|
||||||
|
concurrent_to=principal_record.id)
|
||||||
|
return principal_record, request_record
|
||||||
|
else:
|
||||||
|
principal_record = self.build_warc_record(url=recorded_url.url,
|
||||||
|
warc_date=warc_date, data=recorded_url.request_data,
|
||||||
|
warc_type=recorded_url.custom_type,
|
||||||
|
content_type=recorded_url.content_type)
|
||||||
|
return (principal_record,)
|
||||||
|
|
||||||
def digest_str(self, hash_obj):
|
def digest_str(self, hash_obj):
|
||||||
return hash_obj.name.encode('utf-8') + b':' + (base64.b32encode(hash_obj.digest()) if self.base32 else hash_obj.hexdigest().encode('ascii'))
|
return hash_obj.name.encode('utf-8') + b':' + (base64.b32encode(hash_obj.digest()) if self.base32 else hash_obj.hexdigest().encode('ascii'))
|
||||||
@ -278,11 +275,11 @@ class WarcWriter:
|
|||||||
for record in recordset:
|
for record in recordset:
|
||||||
offset = writer.tell()
|
offset = writer.tell()
|
||||||
record.write_to(writer, gzip=self.gzip)
|
record.write_to(writer, gzip=self.gzip)
|
||||||
self.logger.debug('wrote warc record: warc_type={} content_length={} url={} warc={} offset={}'.format(
|
self.logger.debug('wrote warc record: warc_type=%s content_length=%s url=%s warc=%s offset=%d',
|
||||||
record.get_header(warctools.WarcRecord.TYPE),
|
record.get_header(warctools.WarcRecord.TYPE),
|
||||||
record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
|
record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
|
||||||
record.get_header(warctools.WarcRecord.URL),
|
record.get_header(warctools.WarcRecord.URL),
|
||||||
self._fpath, offset))
|
self._fpath, offset)
|
||||||
|
|
||||||
self._f.flush()
|
self._f.flush()
|
||||||
|
|
||||||
@ -296,21 +293,23 @@ class WarcWriter:
|
|||||||
self.logger.debug('rolling over {} after {} seconds idle'.format(self._f_finalname, time.time() - self._last_activity))
|
self.logger.debug('rolling over {} after {} seconds idle'.format(self._f_finalname, time.time() - self._last_activity))
|
||||||
self.close_writer()
|
self.close_writer()
|
||||||
|
|
||||||
class WarcWriterThread(threading.Thread):
|
class WarcWriterPool:
|
||||||
logger = logging.getLogger("warcprox.warcwriter.WarcWriterThread")
|
logger = logging.getLogger("warcprox.warcwriter.WarcWriterThread")
|
||||||
|
|
||||||
def __init__(self, recorded_url_q=None, default_warc_writer=None):
|
def __init__(self, default_warc_writer):
|
||||||
"""recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl."""
|
|
||||||
threading.Thread.__init__(self, name='WarcWriterThread')
|
|
||||||
self.recorded_url_q = recorded_url_q
|
|
||||||
self.stop = threading.Event()
|
|
||||||
if default_warc_writer:
|
if default_warc_writer:
|
||||||
self.default_warc_writer = default_warc_writer
|
self.default_warc_writer = default_warc_writer
|
||||||
else:
|
else:
|
||||||
self.default_warc_writer = WarcWriter()
|
self.default_warc_writer = WarcWriter()
|
||||||
self.warc_writers = {} # {prefix:WarcWriter}
|
self.warc_writers = {} # {prefix:WarcWriter}
|
||||||
|
self._last_sync = time.time()
|
||||||
|
|
||||||
def write_records(self, recorded_url):
|
self.logger.info('directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format(
|
||||||
|
os.path.abspath(self.default_warc_writer.directory), self.default_warc_writer.gzip, self.default_warc_writer.rollover_size,
|
||||||
|
self.default_warc_writer.rollover_idle_time, self.default_warc_writer.prefix, self.default_warc_writer.port))
|
||||||
|
|
||||||
|
# chooses writer for filename specified by warcprox_meta["warc-prefix"] if set
|
||||||
|
def _writer(self, recorded_url):
|
||||||
w = self.default_warc_writer
|
w = self.default_warc_writer
|
||||||
if recorded_url.warcprox_meta and "warc-prefix" in recorded_url.warcprox_meta:
|
if recorded_url.warcprox_meta and "warc-prefix" in recorded_url.warcprox_meta:
|
||||||
# self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url))
|
# self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url))
|
||||||
@ -327,37 +326,55 @@ class WarcWriterThread(threading.Thread):
|
|||||||
dedup_db=self.default_warc_writer.dedup_db,
|
dedup_db=self.default_warc_writer.dedup_db,
|
||||||
playback_index_db=self.default_warc_writer.playback_index_db)
|
playback_index_db=self.default_warc_writer.playback_index_db)
|
||||||
w = self.warc_writers[prefix]
|
w = self.warc_writers[prefix]
|
||||||
w.write_records(recorded_url)
|
return w
|
||||||
|
|
||||||
|
def write_records(self, recorded_url):
|
||||||
|
self._writer(recorded_url).write_records(recorded_url)
|
||||||
|
|
||||||
|
def maybe_idle_rollover(self):
|
||||||
|
self.default_warc_writer.maybe_idle_rollover()
|
||||||
|
for w in self.warc_writers.values():
|
||||||
|
w.maybe_idle_rollover()
|
||||||
|
|
||||||
|
def sync(self):
|
||||||
|
# XXX prob doesn't belong here (do we need it at all?)
|
||||||
|
if time.time() - self._last_sync > 60:
|
||||||
|
if self.default_warc_writer.dedup_db:
|
||||||
|
self.default_warc_writer.dedup_db.sync()
|
||||||
|
if self.default_warc_writer.playback_index_db:
|
||||||
|
self.default_warc_writer.playback_index_db.sync()
|
||||||
|
self._last_sync = time.time()
|
||||||
|
|
||||||
|
def close_writers(self):
|
||||||
|
self.default_warc_writer.close_writer()
|
||||||
|
for w in self.warc_writers.values():
|
||||||
|
w.close_writer()
|
||||||
|
|
||||||
|
class WarcWriterThread(threading.Thread):
|
||||||
|
logger = logging.getLogger("warcprox.warcwriter.WarcWriterThread")
|
||||||
|
|
||||||
|
def __init__(self, recorded_url_q=None, writer_pool=None):
|
||||||
|
"""recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl."""
|
||||||
|
threading.Thread.__init__(self, name='WarcWriterThread')
|
||||||
|
self.recorded_url_q = recorded_url_q
|
||||||
|
self.stop = threading.Event()
|
||||||
|
if writer_pool:
|
||||||
|
self.writer_pool = writer_pool
|
||||||
|
else:
|
||||||
|
self.writer_pool = WarcWriterPool()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
try:
|
try:
|
||||||
self.logger.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format(
|
|
||||||
os.path.abspath(self.default_warc_writer.directory), self.default_warc_writer.gzip, self.default_warc_writer.rollover_size,
|
|
||||||
self.default_warc_writer.rollover_idle_time, self.default_warc_writer.prefix, self.default_warc_writer.port))
|
|
||||||
|
|
||||||
self._last_sync = time.time()
|
|
||||||
|
|
||||||
while not self.stop.is_set():
|
while not self.stop.is_set():
|
||||||
try:
|
try:
|
||||||
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
|
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
|
||||||
self.write_records(recorded_url)
|
self.writer_pool.write_records(recorded_url)
|
||||||
except queue.Empty:
|
except queue.Empty:
|
||||||
self.default_warc_writer.maybe_idle_rollover()
|
self.writer_pool.maybe_idle_rollover()
|
||||||
for w in self.warc_writers.values():
|
self.writer_pool.sync()
|
||||||
w.maybe_idle_rollover()
|
|
||||||
|
|
||||||
# XXX prob doesn't belong here (do we need it at all?)
|
|
||||||
if time.time() - self._last_sync > 60:
|
|
||||||
if self.default_warc_writer.dedup_db:
|
|
||||||
self.default_warc_writer.dedup_db.sync()
|
|
||||||
if self.default_warc_writer.playback_index_db:
|
|
||||||
self.default_warc_writer.playback_index_db.sync()
|
|
||||||
self._last_sync = time.time()
|
|
||||||
|
|
||||||
self.logger.info('WarcWriterThread shutting down')
|
self.logger.info('WarcWriterThread shutting down')
|
||||||
self.default_warc_writer.close_writer()
|
self.writer_pool.close_writers()
|
||||||
for w in self.warc_writers.values():
|
|
||||||
w.close_writer()
|
|
||||||
except:
|
except:
|
||||||
self.logger.critical("WarcWriterThread shutting down after unexpected error", exc_info=True)
|
self.logger.critical("WarcWriterThread shutting down after unexpected error", exc_info=True)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user