fix failing test just committed, which involves running "listeners" for all urls, including those not archived; make adjustments accordingly

Noah Levitt 2017-11-09 13:10:57 -08:00
parent df6d7f1ce6
commit 700056cc04
7 changed files with 35 additions and 29 deletions
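With this change, warcprox runs its listeners' notify(recorded_url, records) hooks for every url, including urls that were filtered out and never written to a WARC, so records may be an empty list and listeners must guard before touching records[0]. A minimal sketch of that pattern, using a hypothetical listener that is not part of this commit:

    # hypothetical listener, for illustration only: notify() now fires for
    # every url, so `records` is empty when nothing was archived
    class ExampleListener:
        def notify(self, recorded_url, records):
            if records:
                # records[0] is the principal WARC record for this url
                print('wrote %s to %s at offset %s' % (
                        recorded_url.url.decode('utf-8'),
                        records[0].warc_filename, records[0].offset))
            else:
                print('nothing archived for %s'
                        % recorded_url.url.decode('utf-8'))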

setup.py

@@ -51,7 +51,7 @@ except:
 setuptools.setup(
         name='warcprox',
-        version='2.2.1b2.dev111',
+        version='2.2.1b2.dev112',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',

warcprox/bigtable.py

@@ -200,9 +200,10 @@ class RethinkCaptures:
         return entry

     def notify(self, recorded_url, records):
-        entry = self._assemble_entry(recorded_url, records)
-        with self._batch_lock:
-            self._batch.append(entry)
+        if records:
+            entry = self._assemble_entry(recorded_url, records)
+            with self._batch_lock:
+                self._batch.append(entry)

     def close(self):
         self.stop()

warcprox/crawl_log.py

@@ -36,11 +36,10 @@ class CrawlLogger(object):
     def notify(self, recorded_url, records):
         # 2017-08-03T21:45:24.496Z 200 2189 https://autismcouncil.wisconsin.gov/robots.txt P https://autismcouncil.wisconsin.gov/ text/plain #001 20170803214523617+365 sha1:PBS2CEF7B4OSEXZZF3QE2XN2VHYCPNPX https://autismcouncil.wisconsin.gov/ duplicate:digest {"warcFileOffset":942,"contentSize":2495,"warcFilename":"ARCHIVEIT-2159-TEST-JOB319150-20170803214522386-00000.warc.gz"}
         now = datetime.datetime.utcnow()
-        extra_info = {
-                'contentSize': recorded_url.size,
-                'warcFilename': records[0].warc_filename,
-                'warcFileOffset': records[0].offset,
-        }
+        extra_info = {'contentSize': recorded_url.size,}
+        if records:
+            extra_info['warcFilename'] = records[0].warc_filename
+            extra_info['warcFileOffset'] = records[0].offset
         if recorded_url.response_recorder:
             content_length = recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset
             payload_digest = warcprox.digest_str(
@@ -49,8 +48,7 @@
         else:
             # WARCPROX_WRITE_RECORD request
             content_length = len(recorded_url.request_data)
-            payload_digest = records[0].get_header(
-                    b'WARC-Payload-Digest')
+            payload_digest = records[0].get_header(b'WARC-Payload-Digest')
         fields = [
                 '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000),
                 '% 5s' % recorded_url.status,
@@ -66,7 +64,7 @@
                         recorded_url.duration.microseconds//1000),
                 payload_digest,
                 recorded_url.warcprox_meta.get('metadata', {}).get('seed', '-'),
-                'duplicate:digest' if records[0].type == b'revisit' else '-',
+                'duplicate:digest' if records and records[0].type == b'revisit' else '-',
                 json.dumps(extra_info, separators=(',',':')),
         ]
         for i in range(len(fields)):

warcprox/dedup.py

@@ -93,7 +93,7 @@ class DedupDb(object):
         return result

     def notify(self, recorded_url, records):
-        if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
+        if (records and records[0].type == b'response'
                 and recorded_url.response_recorder.payload_size() > 0):
             digest_key = warcprox.digest_str(
                     recorded_url.response_recorder.payload_digest,
@@ -172,7 +172,7 @@ class RethinkDedupDb:
         return result

     def notify(self, recorded_url, records):
-        if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
+        if (records and records[0].type == b'response'
                 and recorded_url.response_recorder.payload_size() > 0):
             digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest,
                     self.options.base32)

warcprox/playback.py

@@ -259,7 +259,8 @@ class PlaybackIndexDb(object):
         pass

     def notify(self, recorded_url, records):
-        self.save(records[0].warc_filename, records, records[0].offset)
+        if records:
+            self.save(records[0].warc_filename, records, records[0].offset)

     def save(self, warcfile, recordset, offset):
         response_record = recordset[0]

warcprox/stats.py

@@ -170,12 +170,13 @@ class StatsDb:
             bucket_stats["total"]["urls"] += 1
             bucket_stats["total"]["wire_bytes"] += recorded_url.size

-            if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT:
-                bucket_stats["revisit"]["urls"] += 1
-                bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
-            else:
-                bucket_stats["new"]["urls"] += 1
-                bucket_stats["new"]["wire_bytes"] += recorded_url.size
+            if records:
+                if records[0].type == b'revisit':
+                    bucket_stats["revisit"]["urls"] += 1
+                    bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
+                else:
+                    bucket_stats["new"]["urls"] += 1
+                    bucket_stats["new"]["wire_bytes"] += recorded_url.size

             json_value = json.dumps(bucket_stats, separators=(',',':'))
             conn.execute(
@@ -304,8 +305,7 @@ class RethinkStatsDb(StatsDb):

     def tally(self, recorded_url, records):
         buckets = self.buckets(recorded_url)
-        is_revisit = records[0].get_header(
-                warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT
+        is_revisit = records[0].type == b'revisit'
         with self._batch_lock:
             for bucket in buckets:
                 bucket_stats = self._batch.setdefault(

warcprox/writerthread.py

@@ -82,13 +82,15 @@ class WarcWriterThread(threading.Thread):
                     self.logger.info("%s urls left to write", qsize)

                 recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
+                records = []
                 self.idle = None
                 if self._filter_accepts(recorded_url):
                     if self.dedup_db:
                         warcprox.dedup.decorate_with_dedup_info(self.dedup_db,
                                 recorded_url, base32=self.options.base32)
                     records = self.writer_pool.write_records(recorded_url)
-                    self._final_tasks(recorded_url, records)
+
+                self._final_tasks(recorded_url, records)

                 # try to release resources in a timely fashion
                 if recorded_url.response_recorder and recorded_url.response_recorder.tempfile:
@@ -134,11 +136,15 @@
             payload_digest = "-"

         # 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"}
-        self.logger.info("{} {} {} {} {} size={} {} {} {} offset={}".format(
-                recorded_url.client_ip, recorded_url.status, recorded_url.method,
-                recorded_url.url.decode("utf-8"), recorded_url.mimetype,
-                recorded_url.size, payload_digest, records[0].type.decode("utf-8"),
-                records[0].warc_filename, records[0].offset))
+        type_ = records[0].type.decode("utf-8") if records else '-'
+        filename = records[0].warc_filename if records else '-'
+        offset = records[0].offset if records else '-'
+        self.logger.info(
+                "%s %s %s %s %s size=%s %s %s %s offset=%s",
+                recorded_url.client_ip, recorded_url.status,
+                recorded_url.method, recorded_url.url.decode("utf-8"),
+                recorded_url.mimetype, recorded_url.size, payload_digest,
+                type_, filename, offset)

     def _final_tasks(self, recorded_url, records):
         if self.listeners: