New --blackout-period option to skip writing redundant revisits to WARC

Add option `--blackout-period` (default=0)

When the option is set and the record is a duplicate (revisit record),
check the datetime in `dedup_info`; if it falls inside the
`blackout_period`, skip writing the record to WARC.
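
For example, to skip revisits whose original capture is less than a
day old (flags other than `--blackout-period` are illustrative):

    warcprox --cacert ./warcprox-ca.pem --dedup-db-file ./dedup.db \
        --blackout-period 86400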

Add some unit tests.

This is an improved implementation based on @nlevitt's comments here:
https://github.com/internetarchive/warcprox/pull/92
Author: Vangelis Banos
Date:   2018-07-21 11:20:49 +00:00
Parent: ec7a0bf569
Commit: 2c2c1d008a

3 changed files with 67 additions and 3 deletions


@@ -22,7 +22,7 @@ USA.
 import os
 import fcntl
 from multiprocessing import Process, Queue
-from datetime import datetime
+from datetime import datetime, timedelta
 import pytest
 import re
 from warcprox.mitmproxy import ProxyingRecorder
@@ -129,7 +129,7 @@ def test_special_dont_write_prefix():
     wwt.join()
     wwt = warcprox.writerthread.WarcWriterProcessor(
-            Options(writer_threads=1))
+            Options(writer_threads=1, blackout_period=60, prefix='foo'))
     wwt.inq = warcprox.TimestampedQueue(maxsize=1)
     wwt.outq = warcprox.TimestampedQueue(maxsize=1)
     try:
@@ -158,6 +158,42 @@ def test_special_dont_write_prefix():
         recorded_url = wwt.outq.get(timeout=10)
         assert not recorded_url.warc_records
         assert wwt.outq.empty()
+
+        # test the blackout_period option: the first revisit record is
+        # written because its original capture is outside the
+        # blackout_period (60); the second is skipped because it is inside
+        recorder = ProxyingRecorder(io.BytesIO(b'test1'), None)
+        recorder.read()
+        old = datetime.utcnow() - timedelta(0, 3600)
+        ru = RecordedUrl(
+                url='http://example.com/yes', content_type='text/plain',
+                status=200, client_ip='127.0.0.2', request_data=b'abc',
+                response_recorder=recorder, remote_ip='127.0.0.3',
+                timestamp=datetime.utcnow(),
+                payload_digest=recorder.block_digest)
+        ru.dedup_info = dict(
+                id=b'1', url=b'http://example.com/dup',
+                date=old.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8'))
+        wwt.inq.put(ru)
+        recorded_url = wwt.outq.get(timeout=10)
+        assert recorded_url.warc_records
+
+        recorder = ProxyingRecorder(io.BytesIO(b'test2'), None)
+        recorder.read()
+        recent = datetime.utcnow() - timedelta(0, 5)
+        ru = RecordedUrl(
+                url='http://example.com/yes', content_type='text/plain',
+                status=200, client_ip='127.0.0.2', request_data=b'abc',
+                response_recorder=recorder, remote_ip='127.0.0.3',
+                timestamp=datetime.utcnow(),
+                payload_digest=recorder.block_digest)
+        ru.dedup_info = dict(
+                id=b'2', url=b'http://example.com/dup',
+                date=recent.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8'))
+        wwt.inq.put(ru)
+        recorded_url = wwt.outq.get(timeout=10)
+        assert not recorded_url.warc_records
+        assert wwt.outq.empty()
     finally:
         wwt.stop.set()
         wwt.join()
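
The `date` value in `dedup_info` uses the same `%Y-%m-%dT%H:%M:%SZ`
format that `_in_blackout()` parses below; a standalone round-trip
illustration (not part of the commit):

    from datetime import datetime, timedelta

    old = datetime.utcnow() - timedelta(seconds=3600)
    encoded = old.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8')
    decoded = datetime.strptime(encoded.decode('utf-8'), '%Y-%m-%dT%H:%M:%SZ')
    # an hour-old capture is outside a 60-second blackout period
    assert (datetime.utcnow() - decoded).total_seconds() > 60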


@@ -158,6 +158,9 @@ def _build_arg_parser(prog='warcprox'):
     # Warcprox-Meta HTTP header. By default, we dedup all requests.
     arg_parser.add_argument('--dedup-only-with-bucket', dest='dedup_only_with_bucket',
             action='store_true', default=False, help=argparse.SUPPRESS)
+    arg_parser.add_argument('--blackout-period', dest='blackout_period',
+            type=int, default=0,
+            help='skip writing a revisit record if it is too close to the original capture')
     arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
             default=500, help=argparse.SUPPRESS)
     arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
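
With the default of 0, the blackout check is effectively disabled,
since `self.blackout_period` is falsy. A quick sanity check against the
parser (a sketch, assuming the parser accepts an empty argv as warcprox
normally does):

    args = _build_arg_parser().parse_args(['--blackout-period', '60'])
    assert args.blackout_period == 60
    args = _build_arg_parser().parse_args([])
    assert args.blackout_period == 0  # default: blackout check disabled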


@@ -31,6 +31,7 @@ import logging
 import time
 import warcprox
 from concurrent import futures
+from datetime import datetime
 import threading

 class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
@@ -52,6 +53,7 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
                 max_workers=options.writer_threads or 1,
                 max_queued=10 * (options.writer_threads or 1))
         self.batch = set()
+        self.blackout_period = options.blackout_period or 0

     def _startup(self):
         self.logger.info('%s warc writer threads', self.pool._max_workers)
@@ -112,9 +114,32 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
         prefix = (recorded_url.warcprox_meta['warc-prefix']
                 if recorded_url.warcprox_meta
                 and 'warc-prefix' in recorded_url.warcprox_meta
                 else self.options.prefix)
         # special warc name prefix '-' means "don't archive"
         return (prefix != '-' and not recorded_url.do_not_archive
-                and self._filter_accepts(recorded_url))
+                and self._filter_accepts(recorded_url)
+                and not self._in_blackout(recorded_url))
+
+    def _in_blackout(self, recorded_url):
+        """If --blackout-period=N (sec) is set, check whether the duplicate
+        (revisit) record's datetime is within N seconds of the original
+        capture; if it is, don't write the record to WARC. The aim is to
+        avoid writing unnecessary `revisit` records.
+
+        Returns bool.
+        """
+        if self.blackout_period and hasattr(recorded_url, 'dedup_info') \
+                and recorded_url.dedup_info:
+            dedup_date = recorded_url.dedup_info.get('date')
+            if dedup_date:
+                try:
+                    dt = datetime.strptime(dedup_date.decode('utf-8'),
+                                           '%Y-%m-%dT%H:%M:%SZ')
+                    return (datetime.utcnow() - dt).total_seconds() \
+                            <= self.blackout_period
+                except ValueError:
+                    return False
+        return False

     def _log(self, recorded_url, records):
         try:
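
The decision `_in_blackout()` makes reduces to a single timestamp
comparison; a standalone sketch with hypothetical values (not part of
the commit):

    from datetime import datetime

    blackout_period = 60  # seconds, from --blackout-period
    dedup_date = b'2018-07-21T11:20:00Z'  # hypothetical dedup_info['date']
    dt = datetime.strptime(dedup_date.decode('utf-8'), '%Y-%m-%dT%H:%M:%SZ')
    # skip writing the revisit record only if the original capture
    # happened within the last blackout_period seconds
    skip = (datetime.utcnow() - dt).total_seconds() <= blackout_period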