diff --git a/tests/test_writer.py b/tests/test_writer.py
index 126932a..2675393 100644
--- a/tests/test_writer.py
+++ b/tests/test_writer.py
@@ -22,7 +22,7 @@ USA.
 import os
 import fcntl
 from multiprocessing import Process, Queue
-from datetime import datetime
+from datetime import datetime, timedelta
 import pytest
 import re
 from warcprox.mitmproxy import ProxyingRecorder
@@ -129,7 +129,7 @@ def test_special_dont_write_prefix():
     wwt.join()
 
     wwt = warcprox.writerthread.WarcWriterProcessor(
-            Options(writer_threads=1))
+            Options(writer_threads=1, blackout_period=60, prefix='foo'))
     wwt.inq = warcprox.TimestampedQueue(maxsize=1)
     wwt.outq = warcprox.TimestampedQueue(maxsize=1)
     try:
@@ -158,6 +158,42 @@ def test_special_dont_write_prefix():
         recorded_url = wwt.outq.get(timeout=10)
         assert not recorded_url.warc_records
         assert wwt.outq.empty()
+
+        # test blackout_period option. Write first revisit record because
+        # it's outside the blackout_period (60). Do not write the second
+        # because it's inside the blackout_period.
+        recorder = ProxyingRecorder(io.BytesIO(b'test1'), None)
+        recorder.read()
+        old = datetime.utcnow() - timedelta(0, 3600)
+        ru = RecordedUrl(
+                url='http://example.com/yes',
+                # content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE,
+                content_type='text/plain',
+                status=200, client_ip='127.0.0.2', request_data=b'abc',
+                response_recorder=recorder, remote_ip='127.0.0.3',
+                timestamp=datetime.utcnow(),
+                payload_digest=recorder.block_digest)
+        ru.dedup_info = dict(id=b'1', url=b'http://example.com/dup',
+                date=old.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8'))
+        wwt.inq.put(ru)
+        recorded_url = wwt.outq.get(timeout=10)
+        recorder = ProxyingRecorder(io.BytesIO(b'test2'), None)
+        recorder.read()
+        recent = datetime.utcnow() - timedelta(0, 5)
+        ru = RecordedUrl(
+                url='http://example.com/yes', content_type='text/plain',
+                status=200, client_ip='127.0.0.2', request_data=b'abc',
+                response_recorder=recorder, remote_ip='127.0.0.3',
+                timestamp=datetime.utcnow(),
+                payload_digest=recorder.block_digest)
+        ru.dedup_info = dict(id=b'2', url=b'http://example.com/dup',
+                date=recent.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8'))
+        wwt.inq.put(ru)
+        assert recorded_url.warc_records
+        recorded_url = wwt.outq.get(timeout=10)
+        assert not recorded_url.warc_records
+        assert wwt.outq.empty()
+
     finally:
         wwt.stop.set()
         wwt.join()
diff --git a/warcprox/main.py b/warcprox/main.py
index 5f45a13..2ba996c 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -158,6 +158,9 @@ def _build_arg_parser(prog='warcprox'):
     # Warcprox-Meta HTTP header. By default, we dedup all requests.
     arg_parser.add_argument('--dedup-only-with-bucket', dest='dedup_only_with_bucket',
             action='store_true', default=False, help=argparse.SUPPRESS)
+    arg_parser.add_argument('--blackout-period', dest='blackout_period',
+            type=int, default=0,
+            help="skip writing a revisit record if it's too close to the original capture")
     arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
             default=500, help=argparse.SUPPRESS)
     arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py
index ef0bd2d..83f4485 100644
--- a/warcprox/writerthread.py
+++ b/warcprox/writerthread.py
@@ -31,6 +31,7 @@ import logging
 import time
 import warcprox
 from concurrent import futures
+from datetime import datetime
 import threading
 
 class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
@@ -52,6 +53,7 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
                 max_workers=options.writer_threads or 1,
                 max_queued=10 * (options.writer_threads or 1))
         self.batch = set()
+        self.blackout_period = options.blackout_period or 0
 
     def _startup(self):
         self.logger.info('%s warc writer threads', self.pool._max_workers)
@@ -112,9 +114,28 @@
                 if recorded_url.warcprox_meta
                 and 'warc-prefix' in recorded_url.warcprox_meta
                 else self.options.prefix)
         # special warc name prefix '-' means "don't archive"
         return (prefix != '-' and not recorded_url.do_not_archive
-                and self._filter_accepts(recorded_url))
+                and self._filter_accepts(recorded_url)
+                and not self._in_blackout(recorded_url))
+
+    def _in_blackout(self, recorded_url):
+        """If --blackout-period=N (sec) is set, check if duplicate record
+        datetime is close to the original. If yes, we don't write it to WARC.
+        The aim is to avoid having unnecessary `revisit` records.
+        Return Boolean
+        """
+        if self.blackout_period and hasattr(recorded_url, "dedup_info") and \
+                recorded_url.dedup_info:
+            dedup_date = recorded_url.dedup_info.get('date')
+            if dedup_date:
+                try:
+                    dt = datetime.strptime(dedup_date.decode('utf-8'),
+                                           '%Y-%m-%dT%H:%M:%SZ')
+                    return (datetime.utcnow() - dt).total_seconds() <= self.blackout_period
+                except ValueError:
+                    return False
+        return False
 
     def _log(self, recorded_url, records):
         try: