diff --git a/setup.py b/setup.py
index 02853d5..0b5c891 100755
--- a/setup.py
+++ b/setup.py
@@ -51,7 +51,7 @@ except:
 
 setuptools.setup(
         name='warcprox',
-        version='2.2.1b2.dev111',
+        version='2.2.1b2.dev112',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',
diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py
index f3d897d..c9547b2 100644
--- a/warcprox/bigtable.py
+++ b/warcprox/bigtable.py
@@ -200,9 +200,10 @@ class RethinkCaptures:
         return entry
 
     def notify(self, recorded_url, records):
-        entry = self._assemble_entry(recorded_url, records)
-        with self._batch_lock:
-            self._batch.append(entry)
+        if records:
+            entry = self._assemble_entry(recorded_url, records)
+            with self._batch_lock:
+                self._batch.append(entry)
 
     def close(self):
         self.stop()
diff --git a/warcprox/crawl_log.py b/warcprox/crawl_log.py
index 68d1fbf..5b4a4fc 100644
--- a/warcprox/crawl_log.py
+++ b/warcprox/crawl_log.py
@@ -36,11 +36,10 @@ class CrawlLogger(object):
     def notify(self, recorded_url, records):
         # 2017-08-03T21:45:24.496Z 200 2189 https://autismcouncil.wisconsin.gov/robots.txt P https://autismcouncil.wisconsin.gov/ text/plain #001 20170803214523617+365 sha1:PBS2CEF7B4OSEXZZF3QE2XN2VHYCPNPX https://autismcouncil.wisconsin.gov/ duplicate:digest {"warcFileOffset":942,"contentSize":2495,"warcFilename":"ARCHIVEIT-2159-TEST-JOB319150-20170803214522386-00000.warc.gz"}
         now = datetime.datetime.utcnow()
-        extra_info = {
-            'contentSize': recorded_url.size,
-            'warcFilename': records[0].warc_filename,
-            'warcFileOffset': records[0].offset,
-        }
+        extra_info = {'contentSize': recorded_url.size,}
+        if records:
+            extra_info['warcFilename'] = records[0].warc_filename
+            extra_info['warcFileOffset'] = records[0].offset
         if recorded_url.response_recorder:
             content_length = recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset
             payload_digest = warcprox.digest_str(
@@ -49,8 +48,7 @@ class CrawlLogger(object):
         else:
             # WARCPROX_WRITE_RECORD request
             content_length = len(recorded_url.request_data)
-            payload_digest = records[0].get_header(
-                    b'WARC-Payload-Digest')
+            payload_digest = records[0].get_header(b'WARC-Payload-Digest')
         fields = [
             '{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000),
             '% 5s' % recorded_url.status,
@@ -66,7 +64,7 @@ class CrawlLogger(object):
                 recorded_url.duration.microseconds//1000),
             payload_digest,
             recorded_url.warcprox_meta.get('metadata', {}).get('seed', '-'),
-            'duplicate:digest' if records[0].type == b'revisit' else '-',
+            'duplicate:digest' if records and records[0].type == b'revisit' else '-',
             json.dumps(extra_info, separators=(',',':')),
         ]
         for i in range(len(fields)):
diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index e70f5f9..b9cd223 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -93,7 +93,7 @@ class DedupDb(object):
         return result
 
     def notify(self, recorded_url, records):
-        if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
+        if (records and records[0].type == b'response'
                 and recorded_url.response_recorder.payload_size() > 0):
             digest_key = warcprox.digest_str(
                     recorded_url.response_recorder.payload_digest,
@@ -172,7 +172,7 @@ class RethinkDedupDb:
         return result
 
     def notify(self, recorded_url, records):
-        if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
+        if (records and records[0].type == b'response'
                 and recorded_url.response_recorder.payload_size() > 0):
             digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest,
                                              self.options.base32)
diff --git a/warcprox/playback.py b/warcprox/playback.py
index a9aa47d..1a698c0 100644
--- a/warcprox/playback.py
+++ b/warcprox/playback.py
@@ -259,7 +259,8 @@ class PlaybackIndexDb(object):
         pass
 
     def notify(self, recorded_url, records):
-        self.save(records[0].warc_filename, records, records[0].offset)
+        if records:
+            self.save(records[0].warc_filename, records, records[0].offset)
 
     def save(self, warcfile, recordset, offset):
         response_record = recordset[0]
diff --git a/warcprox/stats.py b/warcprox/stats.py
index 55693a2..99e6804 100644
--- a/warcprox/stats.py
+++ b/warcprox/stats.py
@@ -170,12 +170,13 @@ class StatsDb:
             bucket_stats["total"]["urls"] += 1
             bucket_stats["total"]["wire_bytes"] += recorded_url.size
 
-            if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT:
-                bucket_stats["revisit"]["urls"] += 1
-                bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
-            else:
-                bucket_stats["new"]["urls"] += 1
-                bucket_stats["new"]["wire_bytes"] += recorded_url.size
+            if records:
+                if records[0].type == b'revisit':
+                    bucket_stats["revisit"]["urls"] += 1
+                    bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
+                else:
+                    bucket_stats["new"]["urls"] += 1
+                    bucket_stats["new"]["wire_bytes"] += recorded_url.size
 
             json_value = json.dumps(bucket_stats, separators=(',',':'))
             conn.execute(
@@ -304,8 +305,7 @@ class RethinkStatsDb(StatsDb):
 
     def tally(self, recorded_url, records):
         buckets = self.buckets(recorded_url)
-        is_revisit = records[0].get_header(
-                warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT
+        is_revisit = records[0].type == b'revisit'
         with self._batch_lock:
             for bucket in buckets:
                 bucket_stats = self._batch.setdefault(
diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py
index e422a65..a8a6ef7 100644
--- a/warcprox/writerthread.py
+++ b/warcprox/writerthread.py
@@ -82,13 +82,15 @@ class WarcWriterThread(threading.Thread):
                                 self.logger.info("%s urls left to write", qsize)
 
                         recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
+                        records = []
                         self.idle = None
                         if self._filter_accepts(recorded_url):
                             if self.dedup_db:
                                 warcprox.dedup.decorate_with_dedup_info(self.dedup_db,
                                         recorded_url, base32=self.options.base32)
                             records = self.writer_pool.write_records(recorded_url)
-                            self._final_tasks(recorded_url, records)
+
+                        self._final_tasks(recorded_url, records)
 
                         # try to release resources in a timely fashion
                         if recorded_url.response_recorder and recorded_url.response_recorder.tempfile:
@@ -134,11 +136,15 @@
             payload_digest = "-"
 
         # 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"}
-        self.logger.info("{} {} {} {} {} size={} {} {} {} offset={}".format(
-            recorded_url.client_ip, recorded_url.status, recorded_url.method,
-            recorded_url.url.decode("utf-8"), recorded_url.mimetype,
-            recorded_url.size, payload_digest, records[0].type.decode("utf-8"),
-            records[0].warc_filename, records[0].offset))
+        type_ = records[0].type.decode("utf-8") if records else '-'
+        filename = records[0].warc_filename if records else '-'
+        offset = records[0].offset if records else '-'
+        self.logger.info(
+                "%s %s %s %s %s size=%s %s %s %s offset=%s",
+                recorded_url.client_ip, recorded_url.status,
+                recorded_url.method, recorded_url.url.decode("utf-8"),
+                recorded_url.mimetype, recorded_url.size, payload_digest,
+                type_, filename, offset)
 
     def _final_tasks(self, recorded_url, records):
         if self.listeners: