From 733642551da989fdfc227e16d7ab75871060efb7 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sun, 22 Mar 2015 17:55:38 -0700 Subject: [PATCH] manager: support autoindexing! (#91) wb-manager autoindex will use watchdog library to detect creation/updates to any warc/arc in specified collection or across all and update autoindex cdx cdx indexing: add --dir-root option to specify custom relative root dir for filenames used in cdx --- pywb/manager/autoindex.py | 44 ++++++++ pywb/manager/manager.py | 173 +++++++++++++++++++++----------- pywb/warc/cdxindexer.py | 71 +++++++++---- pywb/warc/test/test_indexing.py | 12 +++ tests/test_auto_colls.py | 55 ++++++++++ 5 files changed, 274 insertions(+), 81 deletions(-) create mode 100644 pywb/manager/autoindex.py diff --git a/pywb/manager/autoindex.py b/pywb/manager/autoindex.py new file mode 100644 index 00000000..0c182e5d --- /dev/null +++ b/pywb/manager/autoindex.py @@ -0,0 +1,44 @@ +import sys +import time +from watchdog.observers import Observer +from watchdog.events import RegexMatchingEventHandler + + +#============================================================================= +EXT_REGEX = '.*\.w?arc(\.gz)?$' + +keep_running = True + +#============================================================================= +class CDXAutoIndexer(RegexMatchingEventHandler): + def __init__(self, updater, path): + super(CDXAutoIndexer, self).__init__(regexes=[EXT_REGEX], + ignore_directories=True) + self.updater = updater + self.cdx_path = path + + def on_created(self, event): + self.updater(event.src_path) + + def on_modified(self, event): + self.updater(event.src_path) + + def do_watch(self, sleep_time=1): + observer = Observer() + observer.schedule(self, self.cdx_path, recursive=True) + observer.start() + + try: + while keep_running: + time.sleep(sleep_time) + except KeyboardInterrupt: # pragma: no cover + observer.stop() + observer.join() + + +#============================================================================= +if 
__name__ == "__main__": + w = Watcher(sys.argv[1] if len(sys.argv) > 1 else '.') + def p(x): + print(x) + w.run(p) diff --git a/pywb/manager/manager.py b/pywb/manager/manager.py index fb2f0240..b40bb6e1 100644 --- a/pywb/manager/manager.py +++ b/pywb/manager/manager.py @@ -2,19 +2,18 @@ import os import shutil import sys import logging - -from pywb.utils.loaders import load_yaml_config -from pywb.utils.timeutils import timestamp20_now -from pywb.warc.cdxindexer import main as cdxindexer_main -from pywb.webapp.pywb_init import DEFAULT_CONFIG -from migrate import MigrateCDX +import heapq +import yaml from distutils.util import strtobool from pkg_resources import resource_string from argparse import ArgumentParser, RawTextHelpFormatter -import heapq -import yaml + +from pywb.utils.loaders import load_yaml_config +from pywb.utils.timeutils import timestamp20_now + +from pywb import DEFAULT_CONFIG #============================================================================= @@ -32,25 +31,32 @@ It may be used via cmdline to setup and maintain the directory structure expected by pywb """ DEF_INDEX_FILE = 'index.cdxj' + AUTO_INDEX_FILE = 'autoindex.cdxj' - def __init__(self, coll_name, root_dir='collections', must_exist=True): - self.root_dir = root_dir - self.default_config = load_yaml_config('pywb/default_config.yaml') - self.coll_name = coll_name + def __init__(self, coll_name, colls_dir='collections', must_exist=True): + self.default_config = load_yaml_config(DEFAULT_CONFIG) - self.coll_dir = os.path.join(self.root_dir, coll_name) + self.colls_dir = colls_dir + + self._set_coll_dirs(coll_name) - self.warc_dir = self._get_dir('archive_paths') - self.cdx_dir = self._get_dir('index_paths') - self.static_dir = self._get_dir('static_path') - self.templates_dir = self._get_dir('templates_dir') if must_exist: self._assert_coll_exists() + def _set_coll_dirs(self, coll_name): + self.coll_name = coll_name + self.curr_coll_dir = os.path.join(self.colls_dir, coll_name) + + 
self.archive_dir = self._get_dir('archive_paths') + + self.indexes_dir = self._get_dir('index_paths') + self.static_dir = self._get_dir('static_path') + self.templates_dir = self._get_dir('templates_dir') + def list_colls(self): print('Collections:') - for d in os.listdir(self.root_dir): - if os.path.isdir(os.path.join(self.root_dir, d)): + for d in os.listdir(self.colls_dir): + if os.path.isdir(os.path.join(self.colls_dir, d)): print('- ' + d) def _get_root_dir(self, name): @@ -58,7 +64,7 @@ directory structure expected by pywb self.default_config['paths'][name]) def _get_dir(self, name): - return os.path.join(self.coll_dir, + return os.path.join(self.curr_coll_dir, self.default_config['paths'][name]) def _create_dir(self, dirname): @@ -68,11 +74,11 @@ directory structure expected by pywb logging.info('Created Dir: ' + dirname) def add_collection(self): - os.makedirs(self.coll_dir) - logging.info('Created directory: ' + self.coll_dir) + os.makedirs(self.curr_coll_dir) + logging.info('Created directory: ' + self.curr_coll_dir) - self._create_dir(self.warc_dir) - self._create_dir(self.cdx_dir) + self._create_dir(self.archive_dir) + self._create_dir(self.indexes_dir) self._create_dir(self.static_dir) self._create_dir(self.templates_dir) @@ -80,65 +86,71 @@ directory structure expected by pywb self._create_dir(self._get_root_dir('templates_dir')) def _assert_coll_exists(self): - if not os.path.isdir(self.coll_dir): + if not os.path.isdir(self.curr_coll_dir): raise IOError('Collection {0} does not exist'. format(self.coll_name)) def add_warcs(self, warcs): - if not os.path.isdir(self.warc_dir): + if not os.path.isdir(self.archive_dir): raise IOError('Directory {0} does not exist'. 
- format(self.warc_dir)) + format(self.archive_dir)) full_paths = [] for filename in warcs: - shutil.copy2(filename, self.warc_dir) - full_paths.append(os.path.join(self.warc_dir, filename)) - logging.info('Copied ' + filename + ' to ' + self.warc_dir) + shutil.copy2(filename, self.archive_dir) + full_paths.append(os.path.join(self.archive_dir, filename)) + logging.info('Copied ' + filename + ' to ' + self.archive_dir) - self._index_merge_warcs(full_paths) + self._index_merge_warcs(full_paths, self.DEF_INDEX_FILE) def reindex(self): - cdx_file = os.path.join(self.cdx_dir, self.DEF_INDEX_FILE) - logging.info('Indexing ' + self.warc_dir + ' to ' + cdx_file) - self._cdx_index(cdx_file, [self.warc_dir]) + cdx_file = os.path.join(self.indexes_dir, self.DEF_INDEX_FILE) + logging.info('Indexing ' + self.archive_dir + ' to ' + cdx_file) + self._cdx_index(cdx_file, [self.archive_dir]) - def _cdx_index(self, out, input_): - def_args = ['-p', '-j', '-s', '-r'] - def_args.append(out) - def_args.extend(input_) - cdxindexer_main(def_args) + def _cdx_index(self, out, input_, rel_root=None): + from pywb.warc.cdxindexer import write_multi_cdx_index - def index_merge(self, filelist): + options = dict(append_post=True, + cdxj=True, + sort=True, + recurse=True, + rel_root=rel_root) + + write_multi_cdx_index(out, input_, **options) + + def index_merge(self, filelist, index_file): wrongdir = 'Skipping {0}, must be in {1} archive directory' notfound = 'Skipping {0}, file not found' filtered_warcs = [] - # Check that warcs are actually in warcs dir - abs_warc_dir = os.path.abspath(self.warc_dir) + # Check that warcs are actually in archive dir + abs_archive_dir = os.path.abspath(self.archive_dir) for f in filelist: abs_filepath = os.path.abspath(f) - prefix = os.path.commonprefix([abs_warc_dir, abs_filepath]) + prefix = os.path.commonprefix([abs_archive_dir, abs_filepath]) - if prefix != abs_warc_dir: - raise IOError(wrongdir.format(abs_filepath, abs_warc_dir)) + if prefix != 
abs_archive_dir: + raise IOError(wrongdir.format(abs_filepath, abs_archive_dir)) elif not os.path.isfile(abs_filepath): raise IOError(notfound.format(f)) else: - filtered_warcs.append(abs_filepath.split(prefix)[1]) + filtered_warcs.append(abs_filepath) - self._index_merge_warcs(filtered_warcs) + self._index_merge_warcs(filtered_warcs, index_file, abs_archive_dir) - def _index_merge_warcs(self, new_warcs): - cdx_file = os.path.join(self.cdx_dir, self.DEF_INDEX_FILE) - - # no existing file, just reindex all - if not os.path.isfile(cdx_file): - return self.reindex() + def _index_merge_warcs(self, new_warcs, index_file, rel_root=None): + cdx_file = os.path.join(self.indexes_dir, index_file) temp_file = cdx_file + '.tmp.' + timestamp20_now() - self._cdx_index(temp_file, new_warcs) + self._cdx_index(temp_file, new_warcs, rel_root) + + # no existing file, so just make it the new file + if not os.path.isfile(cdx_file): + shutil.move(temp_file, cdx_file) + return merged_file = temp_file + '.merged' @@ -157,7 +169,7 @@ directory structure expected by pywb os.remove(temp_file) def set_metadata(self, namevalue_pairs): - metadata_yaml = os.path.join(self.coll_dir, 'metadata.yaml') + metadata_yaml = os.path.join(self.curr_coll_dir, 'metadata.yaml') metadata = None if os.path.isfile(metadata_yaml): with open(metadata_yaml) as fh: @@ -280,6 +292,8 @@ directory structure expected by pywb print('Removed template file "{0}"'.format(full_path)) def migrate_cdxj(self, path, force=False): + from migrate import MigrateCDX + migrate = MigrateCDX(path) count = migrate.count_cdx() if count == 0: @@ -299,6 +313,30 @@ directory structure expected by pywb migrate.convert_to_cdxj() + def autoindex(self): + from autoindex import CDXAutoIndexer + + if self.coll_name: + any_coll = False + path = self.archive_dir + else: + path = self.colls_dir + any_coll = True + + def do_index(warc): + if any_coll: + coll_name = warc.split(self.colls_dir + os.path.sep)[-1].split('/')[0] + if coll_name != 
self.coll_name: + self._set_coll_dirs(coll_name) + + print('Auto-Indexing: ' + warc) + self.index_merge([warc], self.AUTO_INDEX_FILE) + print('Done.. Waiting for file updates') + + + indexer = CDXAutoIndexer(do_index, path) + indexer.do_watch() + #============================================================================= def main(args=None): @@ -360,7 +398,7 @@ Create manage file based web archive collections # Index warcs def do_index(r): m = CollectionsManager(r.coll_name) - m.index_merge(r.files) + m.index_merge(r.files, m.DEF_INDEX_FILE) indexwarcs_help = 'Index specified ARC/WARC files in the collection' indexwarcs = subparsers.add_parser('index', help=indexwarcs_help) @@ -390,7 +428,7 @@ Create manage file based web archive collections m.list_templates() template_help = 'Add default html template for customization' - template = subparsers.add_parser('template') + template = subparsers.add_parser('template', help=template_help) template.add_argument('coll_name', nargs='?', default='') template.add_argument('-f', '--force', action='store_true') template.add_argument('--add') @@ -398,14 +436,26 @@ Create manage file based web archive collections template.add_argument('--list', action='store_true') template.set_defaults(func=do_add_template) + # Migrate CDX def do_migrate(r): m = CollectionsManager('', must_exist=False) m.migrate_cdxj(r.path, r.force) - template = subparsers.add_parser('migrate') - template.add_argument('path', default='./', nargs='?') - template.add_argument('-f', '--force', action='store_true') - template.set_defaults(func=do_migrate) + migrate_help = 'Convert any existing archive indexes to new json format' + migrate = subparsers.add_parser('migrate', help=migrate_help) + migrate.add_argument('path', default='./', nargs='?') + migrate.add_argument('-f', '--force', action='store_true') + migrate.set_defaults(func=do_migrate) + + # Auto Index + def do_autoindex(r): + m = CollectionsManager(r.coll_name, must_exist=False) + m.autoindex() + + 
autoindex_help = 'Automatically index any change archive files' + autoindex = subparsers.add_parser('autoindex', help=autoindex_help) + autoindex.add_argument('coll_name', nargs='?', default='') + autoindex.set_defaults(func=do_autoindex) r = parser.parse_args(args=args) r.func(r) @@ -416,6 +466,7 @@ def main_wrap_exc(): #pragma: no cover try: main() except Exception as e: + raise print('Error: ' + str(e)) sys.exit(2) diff --git a/pywb/warc/cdxindexer.py b/pywb/warc/cdxindexer.py index cf889e74..4c8aa377 100644 --- a/pywb/warc/cdxindexer.py +++ b/pywb/warc/cdxindexer.py @@ -144,22 +144,32 @@ ALLOWED_EXT = ('.arc', '.arc.gz', '.warc', '.warc.gz') #================================================================= -def iter_file_or_dir(inputs, recursive=True): +def iter_file_or_dir(inputs, recursive=True, rel_root=None): for input_ in inputs: if not os.path.isdir(input_): - yield input_, os.path.basename(input_) + if not rel_root: + filename = os.path.basename(input_) + else: + filename = os.path.relpath(input_, rel_root) + + yield input_, filename elif not recursive: for filename in os.listdir(input_): if filename.endswith(ALLOWED_EXT): - yield os.path.join(input_, filename), filename + full_path = os.path.join(input_, filename) + if rel_root: + filename = os.path.relpath(full_path, rel_root) + yield full_path, filename else: for root, dirs, files in os.walk(input_): for filename in files: if filename.endswith(ALLOWED_EXT): full_path = os.path.join(root, filename) - rel_path = os.path.relpath(full_path, input_) + if not rel_root: + rel_root = input_ + rel_path = os.path.relpath(full_path, rel_root) rel_path = rel_path.replace(os.path.sep, '/') yield full_path, rel_path @@ -181,10 +191,10 @@ def cdx_filename(filename): #================================================================= def get_cdx_writer_cls(options): - writer_cls = options.get('writer_cls') if options.get('minimal'): options['cdxj'] = True + writer_cls = options.get('writer_cls') if writer_cls: if not 
options.get('writer_add_mixin'): return writer_cls @@ -209,10 +219,13 @@ def get_cdx_writer_cls(options): #================================================================= def write_multi_cdx_index(output, inputs, **options): recurse = options.get('recurse', False) + rel_root = options.get('rel_root') # write one cdx per dir if output != '-' and os.path.isdir(output): - for fullpath, filename in iter_file_or_dir(inputs, recurse): + for fullpath, filename in iter_file_or_dir(inputs, + recurse, + rel_root): outpath = cdx_filename(filename) outpath = os.path.join(output, outpath) @@ -234,7 +247,9 @@ def write_multi_cdx_index(output, inputs, **options): record_iter = DefaultRecordIter(**options) with writer_cls(outfile) as writer: - for fullpath, filename in iter_file_or_dir(inputs, recurse): + for fullpath, filename in iter_file_or_dir(inputs, + recurse, + rel_root): with open(fullpath, 'rb') as infile: entry_iter = record_iter(infile) @@ -282,7 +297,7 @@ Some examples: """.format(os.path.basename(sys.argv[0])) sort_help = """ -sort the output to each file before writing to create a total ordering +Sort the output to each file before writing to create a total ordering """ unsurt_help = """ @@ -296,8 +311,8 @@ Use older 9-field cdx format, default is 11-cdx field """ minimal_json_help = """ CDX JSON output, but with minimal fields only, available w/o parsing -http record. The fields are: -canonicalized url, timestamp, original url, digest, archive offset, archive length +http record. The fields are: canonicalized url, timestamp, +original url, digest, archive offset, archive length and archive filename. mimetype is included to indicate warc/revisit only. 
This option skips record parsing and will not work with @@ -305,30 +320,42 @@ POST append (-p) option """ json_help = """ -Output CDX JSON format per line, with url timestamp first, followed by json dict -for all other fields: +Output CDX JSON format per line, with url timestamp first, +followed by a json dict for all other fields: url timestamp { ... } """ - output_help = """output file or directory. + output_help = """ +Output file or directory. - If directory, each input file is written to a seperate output file with a .cdx extension - If output is '-', output is written to stdout """ - input_help = """input file or directory + input_help = """ +Input file or directory. - If directory, all archive files from that directory are read """ - allrecords_help = """include all records. + allrecords_help = """ +Include All records. currently includes the 'request' records in addition to all -response records""" +response records +""" - post_append_help = """for POST requests, append -form query to url key. (Only applies to form url encoded posts)""" + post_append_help = """ +For POST requests, append form query to url key. 
+(Only applies to form url encoded posts) +""" - recurse_dirs_help = """recurse through all subdirectories -if input is a directory""" + recurse_dirs_help = """ +Recurse through all subdirectories if the input is a directory +""" + + dir_root_help = """ +Make CDX filenames relative to specified root directory, +instead of current working directory +""" parser = ArgumentParser(description=description, epilog=epilog, @@ -350,6 +377,9 @@ if input is a directory""" action='store_true', help=recurse_dirs_help) + parser.add_argument('-d', '--dir-root', + help=dir_root_help) + parser.add_argument('-u', '--unsurt', action='store_true', help=unsurt_help) @@ -378,6 +408,7 @@ if input is a directory""" include_all=cmd.allrecords, append_post=cmd.postappend, recurse=cmd.recurse, + rel_root=cmd.dir_root, cdx09=cmd.cdx09, cdxj=cmd.cdxj, minimal=cmd.minimal_cdxj) diff --git a/pywb/warc/test/test_indexing.py b/pywb/warc/test/test_indexing.py index 25498760..de64e513 100644 --- a/pywb/warc/test/test_indexing.py +++ b/pywb/warc/test/test_indexing.py @@ -162,6 +162,18 @@ com,example)/?example=1 20140103030321 http://example.com?example=1 text/html 20 org,iana)/domains/example 20140128051539 http://www.iana.org/domains/example text/html 302 JZ622UA23G5ZU6Y3XAKH4LINONUEICEG - - 577 2907 example.warc.gz Total: 4 +# test custom root dir for cdx filenames, single warc +>>> cli_lines(['--dir-root', get_test_dir() + 'other/', TEST_WARC_DIR + 'example.warc.gz']) +com,example)/?example=1 20140103030321 http://example.com?example=1 text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1043 333 ../warcs/example.warc.gz +org,iana)/domains/example 20140128051539 http://www.iana.org/domains/example text/html 302 JZ622UA23G5ZU6Y3XAKH4LINONUEICEG - - 577 2907 ../warcs/example.warc.gz +Total: 4 + +# test custom root dir for cdx filenames, dir input +>>> cli_lines(['--sort', '--dir-root', get_test_dir() + 'other/', TEST_WARC_DIR]) +com,example)/ 20130729195151 http://test@example.com/ warc/revisit - 
B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 591 355 ../warcs/example-url-agnostic-revisit.warc.gz +org,iana,example)/ 20130702195402 http://example.iana.org/ text/html 200 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A - - 1001 353 ../warcs/example-url-agnostic-orig.warc.gz +Total: 206 + # test writing to temp dir, also use unicode filename >>> cli_lines_with_dir(unicode(TEST_WARC_DIR + 'example.warc.gz')) example.cdx diff --git a/tests/test_auto_colls.py b/tests/test_auto_colls.py index cae4d6b0..31e8fa89 100644 --- a/tests/test_auto_colls.py +++ b/tests/test_auto_colls.py @@ -5,6 +5,9 @@ import sys import webtest +import time +import threading + from io import BytesIO from pywb.webapp.pywb_init import create_wb_router @@ -22,7 +25,9 @@ from mock import patch #============================================================================= ARCHIVE_DIR = 'archive' INDEX_DIR = 'indexes' + INDEX_FILE = 'index.cdxj' +AUTOINDEX_FILE = 'autoindex.cdxj' #============================================================================= @@ -432,6 +437,56 @@ class TestManagedColls(object): # Nothing else to migrate main(['migrate', migrate_dir]) + def test_auto_index(self): + main(['init', 'auto']) + auto_dir = os.path.join(self.root_dir, 'collections', 'auto') + archive_dir = os.path.join(auto_dir, ARCHIVE_DIR) + + archive_sub_dir = os.path.join(archive_dir, 'sub') + os.makedirs(archive_sub_dir) + + def do_copy(): + try: + time.sleep(1) + shutil.copy(self._get_sample_warc('example.warc.gz'), archive_dir) + shutil.copy(self._get_sample_warc('example-extra.warc'), archive_sub_dir) + time.sleep(1) + finally: + import pywb.manager.autoindex + pywb.manager.autoindex.keep_running = False + + thread = threading.Thread(target=do_copy) + thread.daemon = True + thread.start() + + main(['autoindex']) + + index_file = os.path.join(auto_dir, INDEX_DIR, AUTOINDEX_FILE) + assert os.path.isfile(index_file) + + with open(index_file) as fh: + index = fh.read() + + assert '"example.warc.gz' in index + assert 
'"sub/example-extra.warc' in index, index + + mtime = os.path.getmtime(index_file) + + # Update + import pywb.manager.autoindex + pywb.manager.autoindex.keep_running = True + + os.remove(index_file) + + thread = threading.Thread(target=do_copy) + thread.daemon = True + thread.start() + + main(['autoindex', 'auto']) + + # assert file was updated + assert os.path.getmtime(index_file) > mtime + def test_err_template_remove(self): """ Test various error conditions for templates: invalid template name, no collection for collection template