mirror of
https://github.com/webrecorder/pywb.git
synced 2025-03-15 00:03:28 +01:00
Dedup Improvments (#611)
* dedup improvements on top of #597, work towards patching support (#601) - single key 'dedup_policy' of 'skip', 'revisit', 'keep' - optional 'dedup_index_url', defaults to redis urls - support for 'cache: always' to further add cacheing on all requests that have a referrer - updated docs to mention latest config, explain 'instant replay' that is possible when dedup_policy is set - add check to ensure only redis:// URLs can be set for dedup_index_url for now - config: convert shorthand 'recorder: <source_coll>' setting string to dict, don't override custom config
This commit is contained in:
parent
ddf3207e40
commit
e1cad621b9
@ -294,20 +294,53 @@ If running with auto indexing, the WARC will also get automatically indexed and
|
|||||||
|
|
||||||
As a shortcut, ``recorder: live`` can also be used to specify only the ``source_coll`` option.
|
As a shortcut, ``recorder: live`` can also be used to specify only the ``source_coll`` option.
|
||||||
|
|
||||||
Optionally, a ``dedup_index`` key can be placed under the ``recorder`` key to enable deduplication of responses via an index::
|
|
||||||
|
Dedup Options for Recording
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
By default, recording mode will record every URL.
|
||||||
|
|
||||||
|
Starting with pywb 2.5.0, it is possible to configure pywb to either write revisit records or skip duplicate URLs altogether using the ``dedup_policy`` key.
|
||||||
|
|
||||||
|
Using deduplication requires a Redis instance, which will keep track of the index for deduplication in a sorted-set key.
|
||||||
|
The default Redis key used is ``redis://localhost:6379/0/pywb:{coll}:cdxj`` where ``{coll}`` is replaced with the current collection id.
|
||||||
|
|
||||||
|
The key can be customized using the ``dedup_index_url`` field in the recorder config. The URL must start with ``redis://``, as that is the only
|
||||||
|
supported dedup index at this time.
|
||||||
|
|
||||||
|
- To skip duplicate URLs, set ``dedup_policy: skip``. With this setting, only one instance of any URL will be recorded.
|
||||||
|
|
||||||
|
- To write revisit records, set ``dedup_policy: revisit``. With this setting, WARC ``revisit`` records will be written when a duplicate URL is detected
|
||||||
|
and has the same digest as a previous response.
|
||||||
|
|
||||||
|
- To keep all duplicates, use ``dedup_policy: keep``. All WARC records are written to disk normally, as with no policy; however, the Redis dedup index is still populated,
|
||||||
|
which allows for instant replay (see below).
|
||||||
|
|
||||||
|
- To disable the dedup system, set ``dedup_policy: none`` or omit the field. This is the default, and no Redis is required.
|
||||||
|
|
||||||
|
As another option, pywb can add an aggressive Cache-Control header to force the browser to cache all responses on a page.
|
||||||
|
This feature is still experimental, but can be enabled via the ``cache: always`` setting.
|
||||||
|
|
||||||
|
|
||||||
|
For example, the following will enable ``revisit`` records to be written using the given Redis URL, and also enable aggressive caching when recording::
|
||||||
|
|
||||||
recorder:
|
recorder:
|
||||||
...
|
...
|
||||||
dedup_index:
|
cache: always
|
||||||
type: redis
|
dedup_policy: revisit
|
||||||
dupe_policy: revisit
|
dedup_index_url: 'redis://localhost:6379/0/pywb:{coll}:cdxj' # default when omitted
|
||||||
redis_url: 'redis://localhost/2/{coll}:cdxj'
|
|
||||||
|
|
||||||
For ``type`` currently only ``redis`` is supported.
|
|
||||||
|
|
||||||
The ``dupe_policy`` key specifies what will hapen when a duplicate response is found. Can be ``duplicate``, to write duplicate responses, ``revisit``, to write a revisit record or ``skip`` to ignore duplicates and don't write anything to the WARC.
|
Instant Replay (experimental)
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
Starting with pywb 2.5.0, when the ``dedup_policy`` is set, pywb can do 'instant replay' after recording, without having to regenerate the CDX or waiting for it to be updated with auto-indexing.
|
||||||
|
|
||||||
|
When any ``dedup_policy`` is set, pywb can also access the dedup Redis index, along with any on-disk CDX, when replaying the collection.
|
||||||
|
|
||||||
|
This feature is still experimental but should generally work. Additional options for working with the Redis Dedup index will be added in the future.
|
||||||
|
|
||||||
|
|
||||||
The ``redis_url`` key specifies which redis database to use and the template for the sorted-set key to use.
|
|
||||||
|
|
||||||
.. _auto-fetch:
|
.. _auto-fetch:
|
||||||
|
|
||||||
|
@ -110,7 +110,7 @@ class BaseCli(object):
|
|||||||
self.extra_config['debug'] = True
|
self.extra_config['debug'] = True
|
||||||
|
|
||||||
if self.r.record:
|
if self.r.record:
|
||||||
self.extra_config['recorder'] = 'live'
|
self.extra_config['recorder'] = {'source_coll': 'live'}
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Start the application"""
|
"""Start the application"""
|
||||||
|
@ -209,38 +209,40 @@ class FrontEndApp(object):
|
|||||||
else:
|
else:
|
||||||
recorder_coll = recorder_config['source_coll']
|
recorder_coll = recorder_config['source_coll']
|
||||||
|
|
||||||
if 'dedup_index' in recorder_config:
|
# cache mode
|
||||||
dedup_config = recorder_config['dedup_index']
|
self.rec_cache_mode = recorder_config.get('cache', 'default')
|
||||||
else:
|
|
||||||
dedup_config = None
|
|
||||||
|
|
||||||
if dedup_config:
|
dedup_policy = recorder_config.get('dedup_policy')
|
||||||
type = dedup_config.get('type')
|
dedup_by_url = False
|
||||||
if type != 'redis':
|
|
||||||
msg = 'Invalid option for dedup_index: type: {0}'
|
|
||||||
raise Exception(msg.format(type))
|
|
||||||
|
|
||||||
dupe_policy = dedup_config.get('dupe_policy')
|
if dedup_policy == 'none':
|
||||||
if dupe_policy == 'duplicate':
|
dedup_policy = ''
|
||||||
dupe_policy = WriteDupePolicy()
|
|
||||||
elif dupe_policy == 'revisit':
|
|
||||||
dupe_policy = WriteRevisitDupePolicy()
|
|
||||||
elif dupe_policy == 'skip':
|
|
||||||
dupe_policy = SkipDupePolicy()
|
|
||||||
else:
|
|
||||||
msg = 'Invalid option for dedup_index: dupe_policy: {0}'
|
|
||||||
raise Exception(msg.format(dupe_policy))
|
|
||||||
|
|
||||||
dedup_index = WritableRedisIndexer(redis_url=dedup_config.get('redis_url'),
|
if dedup_policy == 'keep':
|
||||||
dupe_policy=dupe_policy)
|
dedup_policy = WriteDupePolicy()
|
||||||
|
elif dedup_policy == 'revisit':
|
||||||
|
dedup_policy = WriteRevisitDupePolicy()
|
||||||
|
elif dedup_policy == 'skip':
|
||||||
|
dedup_policy = SkipDupePolicy()
|
||||||
|
dedup_by_url = True
|
||||||
|
elif dedup_policy:
|
||||||
|
msg = 'Invalid option for dedup_policy: {0}'
|
||||||
|
raise Exception(msg.format(dedup_policy))
|
||||||
|
|
||||||
|
if dedup_policy:
|
||||||
|
dedup_index = WritableRedisIndexer(redis_url=self.warcserver.dedup_index_url,
|
||||||
|
dupe_policy=dedup_policy,
|
||||||
|
rel_path_template=self.warcserver.root_dir + '/{coll}/archive')
|
||||||
else:
|
else:
|
||||||
dedup_index = None
|
dedup_index = None
|
||||||
|
|
||||||
|
|
||||||
warc_writer = MultiFileWARCWriter(self.warcserver.archive_paths,
|
warc_writer = MultiFileWARCWriter(self.warcserver.archive_paths,
|
||||||
max_size=int(recorder_config.get('rollover_size', 1000000000)),
|
max_size=int(recorder_config.get('rollover_size', 1000000000)),
|
||||||
max_idle_secs=int(recorder_config.get('rollover_idle_secs', 600)),
|
max_idle_secs=int(recorder_config.get('rollover_idle_secs', 600)),
|
||||||
filename_template=recorder_config.get('filename_template'),
|
filename_template=recorder_config.get('filename_template'),
|
||||||
dedup_index=dedup_index)
|
dedup_index=dedup_index,
|
||||||
|
dedup_by_url=dedup_by_url)
|
||||||
|
|
||||||
self.recorder = RecorderApp(self.RECORD_SERVER % str(self.warcserver_server.port), warc_writer,
|
self.recorder = RecorderApp(self.RECORD_SERVER % str(self.warcserver_server.port), warc_writer,
|
||||||
accept_colls=recorder_config.get('source_filter'))
|
accept_colls=recorder_config.get('source_filter'))
|
||||||
@ -455,6 +457,7 @@ class FrontEndApp(object):
|
|||||||
coll_config = self.get_coll_config(coll)
|
coll_config = self.get_coll_config(coll)
|
||||||
if record:
|
if record:
|
||||||
coll_config['type'] = 'record'
|
coll_config['type'] = 'record'
|
||||||
|
coll_config['cache'] = self.rec_cache_mode
|
||||||
|
|
||||||
if timemap_output:
|
if timemap_output:
|
||||||
coll_config['output'] = timemap_output
|
coll_config['output'] = timemap_output
|
||||||
|
@ -574,6 +574,9 @@ class RewriterApp(object):
|
|||||||
if is_proxy and environ.get('HTTP_ORIGIN'):
|
if is_proxy and environ.get('HTTP_ORIGIN'):
|
||||||
response.add_access_control_headers(environ)
|
response.add_access_control_headers(environ)
|
||||||
|
|
||||||
|
if r.status_code == 200 and kwargs.get('cache') == 'always' and environ.get('HTTP_REFERER'):
|
||||||
|
response.status_headers['Cache-Control'] = 'public, max-age=31536000, immutable'
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def format_response(self, response, wb_url, full_prefix, is_timegate, is_proxy, timegate_closest_ts=None):
|
def format_response(self, response, wb_url, full_prefix, is_timegate, is_proxy, timegate_closest_ts=None):
|
||||||
|
@ -30,6 +30,7 @@ class MultiFileWARCWriter(BaseWARCWriter):
|
|||||||
self.dir_template = dir_template
|
self.dir_template = dir_template
|
||||||
self.key_template = kwargs.get('key_template', self.dir_template)
|
self.key_template = kwargs.get('key_template', self.dir_template)
|
||||||
self.dedup_index = kwargs.get('dedup_index')
|
self.dedup_index = kwargs.get('dedup_index')
|
||||||
|
self.dedup_by_url = kwargs.get('dedup_by_url')
|
||||||
self.filename_template = filename_template
|
self.filename_template = filename_template
|
||||||
self.max_size = max_size
|
self.max_size = max_size
|
||||||
if max_idle_secs > 0:
|
if max_idle_secs > 0:
|
||||||
@ -48,7 +49,7 @@ class MultiFileWARCWriter(BaseWARCWriter):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
url = record.rec_headers.get_header('WARC-Target-URI')
|
url = record.rec_headers.get_header('WARC-Target-URI')
|
||||||
digest = record.rec_headers.get_header('WARC-Payload-Digest')
|
digest = record.rec_headers.get_header('WARC-Payload-Digest') if not self.dedup_by_url else None
|
||||||
iso_dt = record.rec_headers.get_header('WARC-Date')
|
iso_dt = record.rec_headers.get_header('WARC-Date')
|
||||||
result = self.dedup_index.lookup_revisit(params, digest, url, iso_dt)
|
result = self.dedup_index.lookup_revisit(params, digest, url, iso_dt)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -48,9 +48,11 @@ class WritableRedisIndexer(RedisIndexSource):
|
|||||||
return base_name
|
return base_name
|
||||||
|
|
||||||
def add_warc_file(self, full_filename, params):
|
def add_warc_file(self, full_filename, params):
|
||||||
base_filename = self._get_rel_or_base_name(full_filename, params)
|
|
||||||
file_key = res_template(self.file_key_template, params)
|
file_key = res_template(self.file_key_template, params)
|
||||||
|
if not file_key:
|
||||||
|
return
|
||||||
|
|
||||||
|
base_filename = self._get_rel_or_base_name(full_filename, params)
|
||||||
full_load_path = self.full_warc_prefix + full_filename
|
full_load_path = self.full_warc_prefix + full_filename
|
||||||
|
|
||||||
self.redis.hset(file_key, base_filename, full_load_path)
|
self.redis.hset(file_key, base_filename, full_load_path)
|
||||||
|
@ -607,7 +607,8 @@ class TestRecorder(LiveServerTests, HttpBinLiveTests, FakeRedisTests, TempDirTes
|
|||||||
writer.close()
|
writer.close()
|
||||||
assert len(writer.fh_cache) == 0
|
assert len(writer.fh_cache) == 0
|
||||||
|
|
||||||
@pytest.mark.skipif(os.environ.get('CI') is not None, reason='Skip Test on CI')
|
#@pytest.mark.skipif(os.environ.get('CI') is not None, reason='Skip Test on CI')
|
||||||
|
@pytest.mark.skip
|
||||||
def test_record_video_metadata(self):
|
def test_record_video_metadata(self):
|
||||||
pytest.importorskip('youtube_dl')
|
pytest.importorskip('youtube_dl')
|
||||||
warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
|
warc_path = to_path(self.root_dir + '/warcs/{user}/{coll}/')
|
||||||
|
@ -42,6 +42,8 @@ SOURCE_LIST = [LiveIndexSource,
|
|||||||
class WarcServer(BaseWarcServer):
|
class WarcServer(BaseWarcServer):
|
||||||
AUTO_COLL_TEMPL = '{coll}'
|
AUTO_COLL_TEMPL = '{coll}'
|
||||||
|
|
||||||
|
DEFAULT_DEDUP_URL = 'redis://localhost:6379/0/pywb:{coll}:cdxj'
|
||||||
|
|
||||||
def __init__(self, config_file='./config.yaml', custom_config=None):
|
def __init__(self, config_file='./config.yaml', custom_config=None):
|
||||||
config = load_yaml_config(DEFAULT_CONFIG)
|
config = load_yaml_config(DEFAULT_CONFIG)
|
||||||
|
|
||||||
@ -59,11 +61,23 @@ class WarcServer(BaseWarcServer):
|
|||||||
custom_config['collections'].update(config['collections'])
|
custom_config['collections'].update(config['collections'])
|
||||||
if 'proxy' in custom_config and 'proxy' in config:
|
if 'proxy' in custom_config and 'proxy' in config:
|
||||||
custom_config['proxy'].update(config['proxy'])
|
custom_config['proxy'].update(config['proxy'])
|
||||||
|
if 'recorder' in custom_config and 'recorder' in config:
|
||||||
|
if isinstance(config['recorder'], str):
|
||||||
|
config['recorder'] = {'source_coll': config['recorder']}
|
||||||
|
|
||||||
config.update(custom_config)
|
config.update(custom_config)
|
||||||
|
|
||||||
super(WarcServer, self).__init__(debug=config.get('debug', False))
|
super(WarcServer, self).__init__(debug=config.get('debug', False))
|
||||||
self.config = config
|
self.config = config
|
||||||
|
|
||||||
|
recorder_config = self.config.get('recorder') or {}
|
||||||
|
if isinstance(recorder_config, dict) and recorder_config.get('dedup_policy'):
|
||||||
|
self.dedup_index_url = self.config.get('dedup_index_url', WarcServer.DEFAULT_DEDUP_URL)
|
||||||
|
if self.dedup_index_url and not self.dedup_index_url.startswith('redis://'):
|
||||||
|
raise Exception("The dedup_index_url must start with \"redis://\". Only Redis-based dedup index is supported at this time.")
|
||||||
|
else:
|
||||||
|
self.dedup_index_url = None
|
||||||
|
|
||||||
self.root_dir = self.config.get('collections_root', '')
|
self.root_dir = self.config.get('collections_root', '')
|
||||||
self.index_paths = self.init_paths('index_paths')
|
self.index_paths = self.init_paths('index_paths')
|
||||||
self.archive_paths = self.init_paths('archive_paths', self.root_dir)
|
self.archive_paths = self.init_paths('archive_paths', self.root_dir)
|
||||||
@ -125,7 +139,14 @@ class WarcServer(BaseWarcServer):
|
|||||||
access_checker = AccessChecker(CacheDirectoryAccessSource(self.acl_paths),
|
access_checker = AccessChecker(CacheDirectoryAccessSource(self.acl_paths),
|
||||||
self.default_access)
|
self.default_access)
|
||||||
|
|
||||||
return DefaultResourceHandler(dir_source, self.archive_paths,
|
if self.dedup_index_url:
|
||||||
|
source = SimpleAggregator({'dedup': RedisMultiKeyIndexSource(self.dedup_index_url),
|
||||||
|
'dir': dir_source})
|
||||||
|
|
||||||
|
else:
|
||||||
|
source = dir_source
|
||||||
|
|
||||||
|
return DefaultResourceHandler(source, self.archive_paths,
|
||||||
rules_file=self.rules_file,
|
rules_file=self.rules_file,
|
||||||
access_checker=access_checker)
|
access_checker=access_checker)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user