Merge pull request #99 from vbanos/blackout_period

New --blackout-period option to skip writing redundant revisits to WARC
Noah Levitt 2018-08-03 17:27:42 -07:00, committed by GitHub
commit 0031091d4f
3 changed files with 62 additions and 3 deletions
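
In short: with --blackout-period=N (seconds), a revisit whose original capture is less than N seconds old is skipped entirely rather than written to the WARC as a revisit record. A minimal usage sketch under that reading (the one-day value, and passing blackout_period as a keyword to warcprox.Options, are illustrative assumptions, not part of this diff):

    import warcprox

    # Hypothetical configuration: dedup as usual, but skip writing revisit
    # records whose original capture is less than one day old.
    options = warcprox.Options(writer_threads=1, blackout_period=86400)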

View File

@@ -22,7 +22,7 @@ USA.
 import os
 import fcntl
 from multiprocessing import Process, Queue
-from datetime import datetime
+from datetime import datetime, timedelta
 import pytest
 import re
 from warcprox.mitmproxy import ProxyingRecorder
@@ -129,7 +129,7 @@ def test_special_dont_write_prefix():
     wwt.join()
     wwt = warcprox.writerthread.WarcWriterProcessor(
-            Options(writer_threads=1))
+            Options(writer_threads=1, blackout_period=60, prefix='foo'))
     wwt.inq = warcprox.TimestampedQueue(maxsize=1)
     wwt.outq = warcprox.TimestampedQueue(maxsize=1)
     try:
@@ -158,6 +158,41 @@ def test_special_dont_write_prefix():
         recorded_url = wwt.outq.get(timeout=10)
         assert not recorded_url.warc_records
         assert wwt.outq.empty()
+
+        # test blackout_period option: write the first revisit record
+        # because it's outside the blackout_period (60), but do not write
+        # the second because it's inside the blackout_period
+        recorder = ProxyingRecorder(io.BytesIO(b'test1'), None)
+        recorder.read()
+        old = datetime.utcnow() - timedelta(0, 3600)
+        ru = RecordedUrl(
+                url='http://example.com/dup',
+                content_type='text/plain',
+                status=200, client_ip='127.0.0.2', request_data=b'abc',
+                response_recorder=recorder, remote_ip='127.0.0.3',
+                timestamp=datetime.utcnow(),
+                payload_digest=recorder.block_digest)
+        ru.dedup_info = dict(id=b'1', url=b'http://example.com/dup',
+                date=old.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8'))
+        wwt.inq.put(ru)
+        recorded_url = wwt.outq.get(timeout=10)
+
+        recorder = ProxyingRecorder(io.BytesIO(b'test2'), None)
+        recorder.read()
+        recent = datetime.utcnow() - timedelta(0, 5)
+        ru = RecordedUrl(
+                url='http://example.com/dup', content_type='text/plain',
+                status=200, client_ip='127.0.0.2', request_data=b'abc',
+                response_recorder=recorder, remote_ip='127.0.0.3',
+                timestamp=datetime.utcnow(),
+                payload_digest=recorder.block_digest)
+        ru.dedup_info = dict(id=b'2', url=b'http://example.com/dup',
+                date=recent.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8'))
+        wwt.inq.put(ru)
+        assert recorded_url.warc_records
+        recorded_url = wwt.outq.get(timeout=10)
+        assert not recorded_url.warc_records
+        assert wwt.outq.empty()
     finally:
         wwt.stop.set()
         wwt.join()
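
Aside: the test stores dedup_info['date'] using the same UTC format string that the writer later parses. A standalone round-trip sketch of that format (illustration only, not part of the diff):

    from datetime import datetime

    # Round-trip the timestamp format shared by the test and _in_blackout.
    fmt = '%Y-%m-%dT%H:%M:%SZ'
    now = datetime.utcnow()
    encoded = now.strftime(fmt).encode('utf-8')                # as stored in dedup_info
    decoded = datetime.strptime(encoded.decode('utf-8'), fmt)  # as parsed by the writer
    assert abs((now - decoded).total_seconds()) < 1            # sub-second precision is dropped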

View File

@@ -158,6 +158,9 @@ def _build_arg_parser(prog='warcprox'):
     # Warcprox-Meta HTTP header. By default, we dedup all requests.
     arg_parser.add_argument('--dedup-only-with-bucket', dest='dedup_only_with_bucket',
             action='store_true', default=False, help=argparse.SUPPRESS)
+    arg_parser.add_argument('--blackout-period', dest='blackout_period',
+            type=int, default=0,
+            help='skip writing a revisit record if it is too close to the original capture')
     arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
             default=500, help=argparse.SUPPRESS)
     arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
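
For reference, a self-contained sketch of how the new flag parses, mirroring the add_argument call above on a throwaway parser (not the real warcprox parser):

    import argparse

    # Mirror of the new option, for demonstration.
    arg_parser = argparse.ArgumentParser(prog='warcprox')
    arg_parser.add_argument('--blackout-period', dest='blackout_period',
            type=int, default=0,
            help='skip writing a revisit record if it is too close to the original capture')

    args = arg_parser.parse_args(['--blackout-period', '60'])
    assert args.blackout_period == 60  # seconds; the default of 0 disables the check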

View File

@@ -31,6 +31,7 @@ import logging
 import time
 import warcprox
 from concurrent import futures
+from datetime import datetime
 import threading

 class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
@@ -52,6 +53,7 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
                 max_workers=options.writer_threads or 1,
                 max_queued=10 * (options.writer_threads or 1))
         self.batch = set()
+        self.blackout_period = options.blackout_period or 0

     def _startup(self):
         self.logger.info('%s warc writer threads', self.pool._max_workers)
@@ -114,7 +116,26 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
                 else self.options.prefix)
         # special warc name prefix '-' means "don't archive"
         return (prefix != '-' and not recorded_url.do_not_archive
-                and self._filter_accepts(recorded_url))
+                and self._filter_accepts(recorded_url)
+                and not self._in_blackout(recorded_url))
+
+    def _in_blackout(self, recorded_url):
+        """If --blackout-period=N (sec) is set, check whether this record
+        is a duplicate whose original capture is less than N seconds old.
+        If so, skip writing it to WARC, to avoid unnecessary `revisit`
+        records. Returns a boolean.
+        """
+        if self.blackout_period and hasattr(recorded_url, "dedup_info") and \
+                recorded_url.dedup_info:
+            dedup_date = recorded_url.dedup_info.get('date')
+            if dedup_date and recorded_url.dedup_info.get('url') == recorded_url.url:
+                try:
+                    dt = datetime.strptime(dedup_date.decode('utf-8'),
+                            '%Y-%m-%dT%H:%M:%SZ')
+                    return (datetime.utcnow() - dt).total_seconds() <= self.blackout_period
+                except ValueError:
+                    return False
+        return False

     def _log(self, recorded_url, records):
         # 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"}
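
To make the decision logic concrete, here is a standalone sketch of the same check (illustration only; the real implementation is WarcWriterProcessor._in_blackout above, and the 60-second period is just an example value):

    from datetime import datetime, timedelta

    def in_blackout(dedup_date, blackout_period):
        # dedup_date: UTC timestamp bytes such as b'2018-08-03T17:27:42Z',
        # as found in recorded_url.dedup_info['date']
        dt = datetime.strptime(dedup_date.decode('utf-8'), '%Y-%m-%dT%H:%M:%SZ')
        return (datetime.utcnow() - dt).total_seconds() <= blackout_period

    fmt = '%Y-%m-%dT%H:%M:%SZ'
    recent = (datetime.utcnow() - timedelta(seconds=5)).strftime(fmt).encode('utf-8')
    old = (datetime.utcnow() - timedelta(seconds=3600)).strftime(fmt).encode('utf-8')
    assert in_blackout(recent, 60)      # original capture 5 s old: skip the revisit
    assert not in_blackout(old, 60)     # original capture 1 h old: write the revisit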