Refactor so that warc records are constructed in the warc writer thread. This way the disk-based dedup lookup, which decides whether to write a revisit record, happens out-of-band. Maybe more importantly, all dedup db reading and writing now happens in a single thread, so we don't have to worry about dbm thread safety. Also, dedup info is no longer saved or looked up for urls with an empty payload.

Noah Levitt 2013-10-30 13:36:32 -07:00
parent 1967b6aabf
commit e370ec6fe2


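Before the diff, here is a minimal sketch of the producer/consumer shape this commit moves to. It is not the actual warcprox code: the dedup db is a plain dict, and build_records/write_records are hypothetical stand-ins for the record construction and warc writing shown in the diff below. The proxy handler only enqueues a RecordedUrl; the single WarcWriterThread does the dedup lookup out-of-band, chooses between a response and a revisit record, writes, and saves dedup info only when the payload is non-empty.

import Queue
import threading

class RecordedUrl:
    # raw capture data handed from a proxy thread to the single writer thread
    def __init__(self, url, request_data, response_recorder, remote_ip):
        self.url = url
        self.request_data = request_data
        self.response_recorder = response_recorder
        self.remote_ip = remote_ip

class WarcWriterThread(threading.Thread):
    # the only thread that builds warc records and touches the dedup db,
    # so dbm thread safety stops being a concern
    def __init__(self, recorded_url_q, dedup_db, build_records, write_records):
        threading.Thread.__init__(self, name='WarcWriterThread')
        self.recorded_url_q = recorded_url_q
        self.dedup_db = dedup_db            # a plain dict in this sketch
        self.build_records = build_records  # hypothetical: (recorded_url, dedup_info) -> records
        self.write_records = write_records  # hypothetical: records -> offset in the warc
        self.stop = threading.Event()

    def run(self):
        while not self.stop.is_set():
            try:
                recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
            except Queue.Empty:
                continue
            recorder = recorded_url.response_recorder
            # dedup key from the payload digest (assumed to be a hashlib object here)
            key = ('sha1:{}'.format(recorder.payload_sha1.hexdigest())
                   if recorder.payload_sha1 is not None else None)
            # disk-based dedup lookup happens here, out-of-band of the proxy request
            dedup_info = self.dedup_db.get(key) if key is not None else None
            # dedup_info present -> revisit record; otherwise a full response record
            records = self.build_records(recorded_url, dedup_info)
            offset = self.write_records(records)
            # remember only responses that actually carried a payload
            if key is not None and dedup_info is None and recorder.payload_size() > 0:
                self.dedup_db[key] = {'offset': offset}
            recorder.tempfile.close()

With this shape, the proxy handler's job shrinks to recorded_url_q.put(RecordedUrl(...)), which is exactly the change to WarcProxyHandler in the diff.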
@@ -183,6 +183,12 @@ class ProxyingRecorder:
def __len__(self):
return self.len
def payload_size(self):
if self.payload_offset is not None:
return self.len - self.payload_offset
else:
return 0
class ProxyingRecordingHTTPResponse(httplib.HTTPResponse):
@@ -325,7 +331,9 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
h.close()
self._proxy_sock.close()
self.server.recordset_q.create_and_queue(self.url, req, h.recorder, remote_ip)
recorded_url = RecordedUrl(url=self.url, request_data=req,
response_recorder=h.recorder, remote_ip=remote_ip)
self.server.recorded_url_q.put(recorded_url)
def __getattr__(self, item):
@@ -341,14 +349,22 @@ class WarcProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
self.log_date_time_string(), fmt % args))
class RecordedUrl:
def __init__(self, url, request_data, response_recorder, remote_ip):
self.url = url
self.request_data = request_data
self.response_recorder = response_recorder
self.remote_ip = remote_ip
class WarcProxy(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
def __init__(self, server_address, req_handler_class=WarcProxyHandler,
bind_and_activate=True, ca_file='./warcprox-ca.pem',
certs_dir='./warcprox-ca', recordset_q=None):
certs_dir='./warcprox-ca', recorded_url_q=None):
BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate)
self.ca = CertificateAuthority(ca_file, certs_dir)
self.recordset_q = recordset_q
self.recorded_url_q = recorded_url_q
def server_activate(self):
BaseHTTPServer.HTTPServer.server_activate(self)
@@ -374,21 +390,16 @@ class DedupDb:
self.db.close()
def warc_record_written(self, record, warcfile, offset):
warc_type = record.get_header(warctools.WarcRecord.TYPE)
if warc_type != warctools.WarcRecord.RESPONSE:
return
def save(self, key, response_record, offset):
record_id = response_record.get_header(warctools.WarcRecord.ID)
url = response_record.get_header(warctools.WarcRecord.URL)
date = response_record.get_header(warctools.WarcRecord.DATE)
key = record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST)
if key is None:
return
py_value = {'i':record_id, 'u':url, 'd':date}
json_value = json.dumps(py_value, separators=(',',':'))
record_id = record.get_header(warctools.WarcRecord.ID)
url = record.get_header(warctools.WarcRecord.URL)
date = record.get_header(warctools.WarcRecord.DATE)
value = json.dumps({'i':record_id, 'u':url, 'd':date})
self.db[key] = value
self.db[key] = json_value
logging.info('dedup db saved {}={}'.format(key, json_value))
def lookup(self, key):
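The save()/lookup() pair above stores one compact JSON entry per payload digest. A small illustrative sketch, with made-up example values (the real ones come from the response record's headers), of what ends up in the dbm file:

import json

record_id = '<urn:uuid:00000000-0000-0000-0000-000000000000>'  # example value
url = 'http://example.com/'                                    # example value
date = '2013-10-30T20:36:32Z'                                  # example value

key = 'sha1:' + '0' * 40   # payload digest, hex or base32 depending on --base32
value = json.dumps({'i': record_id, 'u': url, 'd': date}, separators=(',', ':'))
# db[key] = value; lookup() later json.loads() this and feeds 'i', 'u', 'd'
# into the revisit record as refers_to, refers_to_target_uri and refers_to_date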
@@ -400,57 +411,85 @@ class DedupDb:
return None
# Each item in the queue is a tuple of warc records, which should be written
# consecutively in the same warc.
class WarcRecordsetQueue(Queue.Queue):
class WarcWriterThread(threading.Thread):
def __init__(self, base32=False, dedup_db=None):
Queue.Queue.__init__(self)
# port is only used for warc filename
def __init__(self, recorded_url_q, directory, rollover_size=1000000000,
rollover_idle_time=None, gzip=False, prefix='WARCPROX', port=0,
base32=False, dedup_db=None):
threading.Thread.__init__(self, name='WarcWriterThread')
self.recorded_url_q = recorded_url_q
self.rollover_size = rollover_size
self.rollover_idle_time = rollover_idle_time
self.gzip = gzip
self.base32 = base32
self.dedup_db = dedup_db
# warc path and filename stuff
self.directory = directory
self.prefix = prefix
self.port = port
def create_and_queue(self, url, request_data, response_recorder, remote_ip):
self._f = None
self._fpath = None
self._serial = 0
if not os.path.exists(directory):
logging.info("warc destination directory {} doesn't exist, creating it".format(directory))
os.mkdir(directory)
self.stop = threading.Event()
self.listeners = []
# returns a tuple (principal_record, request_record) where principal_record is either a response or revisit record
def build_warc_records(self, recorded_url):
warc_date = warctools.warc.warc_datetime_str(datetime.now())
dedup_info = None
if dedup_db is not None and response_recorder.payload_sha1 is not None:
key = 'sha1:{}'.format(self.digest_str(response_recorder.payload_sha1))
if dedup_db is not None and recorded_url.response_recorder.payload_sha1 is not None:
key = 'sha1:{}'.format(self.digest_str(recorded_url.response_recorder.payload_sha1))
dedup_info = dedup_db.lookup(key)
if dedup_info is not None:
# revisit record
response_recorder.tempfile.seek(0)
if response_recorder.payload_offset is not None:
response_header_block = response_recorder.tempfile.read(response_recorder.payload_offset)
recorded_url.response_recorder.tempfile.seek(0)
if recorded_url.response_recorder.payload_offset is not None:
response_header_block = recorded_url.response_recorder.tempfile.read(recorded_url.response_recorder.payload_offset)
else:
response_header_block = response_recorder.tempfile.read()
response_header_block = recorded_url.response_recorder.tempfile.read()
principal_record, principal_record_id = self.make_record(url=url,
warc_date=warc_date, data=response_header_block,
principal_record, principal_record_id = self.build_warc_record(
url=recorded_url.url, warc_date=warc_date,
data=response_header_block,
warc_type=warctools.WarcRecord.REVISIT,
refers_to=dedup_info['i'],
refers_to_target_uri=dedup_info['u'],
refers_to_date=dedup_info['d'],
profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST,
content_type=warctools.WarcRecord.HTTP_RESPONSE_MIMETYPE,
remote_ip=remote_ip)
remote_ip=recorded_url.remote_ip)
else:
# response record
principal_record, principal_record_id = self.make_record(url=url,
warc_date=warc_date, recorder=response_recorder,
principal_record, principal_record_id = self.build_warc_record(
url=recorded_url.url, warc_date=warc_date,
recorder=recorded_url.response_recorder,
warc_type=warctools.WarcRecord.RESPONSE,
content_type=warctools.WarcRecord.HTTP_RESPONSE_MIMETYPE,
remote_ip=remote_ip)
remote_ip=recorded_url.remote_ip)
request_record, request_record_id = self.make_record(url=url,
warc_date=warc_date, data=request_data,
request_record, request_record_id = self.build_warc_record(
url=recorded_url.url, warc_date=warc_date,
data=recorded_url.request_data,
warc_type=warctools.WarcRecord.REQUEST,
content_type=warctools.WarcRecord.HTTP_REQUEST_MIMETYPE,
concurrent_to=principal_record_id)
record_group = (principal_record, request_record)
self.put(record_group)
return principal_record, request_record
def digest_str(self, hash_obj):
@@ -460,7 +499,7 @@ class WarcRecordsetQueue(Queue.Queue):
return hash_obj.hexdigest()
def make_record(self, url, warc_date=None, recorder=None, data=None,
def build_warc_record(self, url, warc_date=None, recorder=None, data=None,
concurrent_to=None, warc_type=None, content_type=None, remote_ip=None,
profile=None, refers_to=None, refers_to_target_uri=None,
refers_to_date=None):
@@ -491,7 +530,6 @@ class WarcRecordsetQueue(Queue.Queue):
if content_type is not None:
headers.append((warctools.WarcRecord.CONTENT_TYPE, content_type))
if recorder is not None:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder))))
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
@@ -513,38 +551,6 @@ class WarcRecordsetQueue(Queue.Queue):
return record, record_id
class WarcWriterThread(threading.Thread):
# port is only used for warc filename
def __init__(self, recordset_q, directory, rollover_size=1000000000, rollover_idle_time=None, gzip=False, prefix='WARCPROX', port=0):
threading.Thread.__init__(self, name='WarcWriterThread')
self.recordset_q = recordset_q
self.rollover_size = rollover_size
self.rollover_idle_time = rollover_idle_time
self.gzip = gzip
# warc path and filename stuff
self.directory = directory
self.prefix = prefix
self.port = port
self._f = None
self._fpath = None
self._serial = 0
if not os.path.exists(directory):
logging.info("warc destination directory {} doesn't exist, creating it".format(directory))
os.mkdir(directory)
self.stop = threading.Event()
self.listeners = []
def timestamp17(self):
now = datetime.now()
return '{}{}'.format(now.strftime('%Y%m%d%H%M%S'), now.microsecond//1000)
@@ -559,7 +565,7 @@ class WarcWriterThread(threading.Thread):
self._fpath = None
self._f = None
def _make_warcinfo_record(self, filename):
def _build_warcinfo_record(self, filename):
warc_record_date = warctools.warc.warc_datetime_str(datetime.now())
record_id = warctools.WarcRecord.random_warc_uuid()
@@ -575,7 +581,7 @@ class WarcWriterThread(threading.Thread):
warcinfo_fields.append('hostname: {0}'.format(hostname))
warcinfo_fields.append('ip: {0}'.format(socket.gethostbyname(hostname)))
warcinfo_fields.append('format: WARC File Format 1.0')
warcinfo_fields.append('robots: ignore') # XXX implement robots support
# warcinfo_fields.append('robots: ignore')
# warcinfo_fields.append('description: {0}'.format(self.description))
# warcinfo_fields.append('isPartOf: {0}'.format(self.is_part_of))
data = '\r\n'.join(warcinfo_fields) + '\r\n'
@@ -598,7 +604,7 @@ class WarcWriterThread(threading.Thread):
self._f = open(self._fpath, 'wb')
warcinfo_record = self._make_warcinfo_record(filename)
warcinfo_record = self._build_warcinfo_record(filename)
warcinfo_record.write_to(self._f, gzip=self.gzip)
self._serial += 1
@@ -606,10 +612,14 @@ class WarcWriterThread(threading.Thread):
return self._f
def register_listener(self, listener):
"""listener should be a function that takes 3 arguments (record, warcfile, offset)"""
self.listeners.append(listener)
def _final_tasks(self, recorded_url, recordset, recordset_offset):
if (self.dedup_db is not None
and recordset[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
and recorded_url.response_recorder.payload_size() > 0):
key = 'sha1:{}'.format(self.digest_str(recorded_url.response_recorder.payload_sha1))
self.dedup_db.save(key, recordset[0], recordset_offset)
recorded_url.response_recorder.tempfile.close()
def run(self):
logging.info('WarcWriterThread starting, directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format(
@@ -620,11 +630,14 @@ class WarcWriterThread(threading.Thread):
while not self.stop.is_set():
try:
recordset = self.recordset_q.get(block=True, timeout=0.5)
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
self._last_activity = time.time()
recordset = self.build_warc_records(recorded_url)
writer = self._writer()
recordset_offset = writer.tell()
for record in recordset:
offset = writer.tell()
@@ -635,11 +648,10 @@ class WarcWriterThread(threading.Thread):
record.get_header(warctools.WarcRecord.URL),
self._fpath, offset))
for listener in self.listeners:
listener(record, self._fpath, offset)
self._f.flush()
self._final_tasks(recorded_url, recordset, recordset_offset)
except Queue.Empty:
if (self._fpath is not None
and self.rollover_idle_time is not None
@@ -707,25 +719,17 @@ if __name__ == '__main__':
else:
dedup_db = DedupDb(args.dedup_db_file)
recordset_q = WarcRecordsetQueue(base32=args.base32, dedup_db=dedup_db)
recorded_url_q = Queue.Queue()
proxy = WarcProxy(server_address=(args.address, int(args.port)),
ca_file=args.cacert, certs_dir=args.certs_dir,
recordset_q=recordset_q)
recorded_url_q=recorded_url_q)
warc_writer = WarcWriterThread(recordset_q=recordset_q,
warc_writer = WarcWriterThread(recorded_url_q=recorded_url_q,
directory=args.directory, gzip=args.gzip, prefix=args.prefix,
port=int(args.port), rollover_size=int(args.size),
rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None)
def close_content_file(record, warcfile, offset):
if record.content_file:
logging.info('closing record.content_file={}'.format(record.content_file))
record.content_file.close()
warc_writer.register_listener(close_content_file)
if dedup_db is not None:
warc_writer.register_listener(dedup_db.warc_record_written)
rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None,
base32=args.base32, dedup_db=dedup_db)
proxy_thread = threading.Thread(target=proxy.serve_forever, name='ProxyThread')
proxy_thread.start()