From d8d7435d773030ebc6c72b33e20e2085234af845 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sat, 22 Feb 2014 16:42:42 -0800 Subject: [PATCH] add zipnum location reloading support default to 10 min interval #17 --- pywb/cdx/cdxserver.py | 25 ++++++++-------- pywb/cdx/zipnum.py | 67 +++++++++++++++++++++++++++++++++++-------- 2 files changed, 68 insertions(+), 24 deletions(-) diff --git a/pywb/cdx/cdxserver.py b/pywb/cdx/cdxserver.py index ffd34e81..e6825956 100644 --- a/pywb/cdx/cdxserver.py +++ b/pywb/cdx/cdxserver.py @@ -6,6 +6,8 @@ from zipnum import ZipNumCluster from cdxobject import CDXObject, CaptureNotFoundException, CDXException from cdxdomainspecific import load_domain_specific_cdx_rules +from pywb.utils.loaders import is_http + from itertools import chain import logging import os @@ -82,7 +84,7 @@ class CDXServer(BaseCDXServer): def __init__(self, paths, **kwargs): super(CDXServer, self).__init__(**kwargs) - self.sources = create_cdx_sources(paths) + self.sources = create_cdx_sources(paths, kwargs.get('config')) def load_cdx(self, **params): # if key not set, assume 'url' is set and needs canonicalization @@ -154,8 +156,7 @@ def create_cdx_server(config, ds_rules_file=None): logging.debug('CDX Surt-Ordered? ' + str(surt_ordered)) - if (isinstance(paths, str) and - any(paths.startswith(x) for x in ['http://', 'https://'])): + if isinstance(paths, str) and is_http(paths): server_cls = RemoteCDXServer else: server_cls = CDXServer @@ -167,7 +168,7 @@ def create_cdx_server(config, ds_rules_file=None): #================================================================= -def create_cdx_sources(paths): +def create_cdx_sources(paths, config=None): sources = [] if not isinstance(paths, list): @@ -175,13 +176,13 @@ def create_cdx_sources(paths): for path in paths: if isinstance(path, CDXSource): - add_cdx_source(sources, path) + add_cdx_source(sources, path, config) elif isinstance(path, str): if os.path.isdir(path): for file in os.listdir(path): - add_cdx_source(sources, path + file) + add_cdx_source(sources, path + file, config) else: - add_cdx_source(sources, path) + add_cdx_source(sources, path, config) if len(sources) == 0: logging.exception('No CDX Sources Found from: ' + str(sources)) @@ -190,9 +191,9 @@ def create_cdx_sources(paths): #================================================================= -def add_cdx_source(sources, source): +def add_cdx_source(sources, source, config): if not isinstance(source, CDXSource): - source = create_cdx_source(source) + source = create_cdx_source(source, config) if not source: return @@ -201,15 +202,15 @@ def add_cdx_source(sources, source): #================================================================= -def create_cdx_source(filename): - if filename.startswith('http://') or filename.startswith('https://'): +def create_cdx_source(filename, config): + if is_http(filename): return RemoteCDXSource(filename) if filename.endswith('.cdx'): return CDXFile(filename) if filename.endswith('.summary'): - return ZipNumCluster(filename) + return ZipNumCluster(filename, config) return None #TODO: support zipnum diff --git a/pywb/cdx/zipnum.py b/pywb/cdx/zipnum.py index 91db9e15..a62da6fc 100644 --- a/pywb/cdx/zipnum.py +++ b/pywb/cdx/zipnum.py @@ -3,6 +3,7 @@ import collections import itertools import logging from cStringIO import StringIO +import datetime from cdxsource import CDXSource from cdxobject import IDXObject @@ -38,29 +39,72 @@ def readline_to_iter(stream): #================================================================= class ZipNumCluster(CDXSource): - def __init__(self, summary, loc=None): + DEFAULT_RELOAD_INTERVAL = 10 # in minutes + DEFAULT_MAX_BLOCKS = 50 + + def __init__(self, summary, config=None): + + loc = None + cookie_maker = None + self.max_blocks = self.DEFAULT_MAX_BLOCKS + reload_ival = self.DEFAULT_RELOAD_INTERVAL + + if config: + loc = config.get('zipnum_loc') + cookie_maker = config.get('cookie_maker') + + self.max_blocks = config.get('max_blocks', + self.max_blocks) + + reload_ival = config.get('reload_interval', reload_ival) + if not loc: splits = os.path.splitext(summary) loc = splits[0] + '.loc' self.summary = summary - self.loc = loc - self.loc_map = self.load_loc(loc) + self.loc_filename = loc - @staticmethod - def load_loc(loc_file): - loc_map = {} - with open(loc_file) as fh: + # initial loc map + self.loc_map = {} + self.load_loc() + + # reload interval + self.loc_update_time = datetime.datetime.now() + self.reload_interval = datetime.timedelta(minutes=reload_ival) + + self.blk_loader = BlockLoader(cookie_maker=cookie_maker) + + def load_loc(self): + logging.debug('Loading loc from: ' + self.loc_filename) + with open(self.loc_filename) as fh: for line in fh: parts = line.rstrip().split('\t') - loc_map[parts[0]] = parts[1:] + self.loc_map[parts[0]] = parts[1:] - return loc_map + @staticmethod + def reload_timed(timestamp, val, delta, func): + now = datetime.datetime.now() + if now - timestamp >= delta: + func() + return now + return None + + def reload_loc(self): + newtime = self.reload_timed(self.loc_update_time, + self.loc_map, + self.reload_interval, + self.load_loc) + + if newtime: + self.loc_update_time = newtime def lookup_loc(self, part): return self.loc_map[part] def load_cdx(self, params): + self.reload_loc() + reader = SeekableTextFileReader(self.summary) idx_iter = iter_range(reader, @@ -83,7 +127,6 @@ class ZipNumCluster(CDXSource): def idx_to_cdx(self, idx_iter, params): blocks = None - max_blocks = 10 ranges = [] for idx in idx_iter: @@ -91,7 +134,7 @@ class ZipNumCluster(CDXSource): if (blocks and blocks.part == idx['part'] and blocks.offset + blocks.length == idx['offset'] and - blocks.count < max_blocks): + blocks.count < self.max_blocks): blocks.length += idx['length'] blocks.count += 1 @@ -134,7 +177,7 @@ class ZipNumCluster(CDXSource): msg = 'Loading {b.count} blocks from {loc}:{b.offset}+{b.length}' logging.debug(msg.format(b=blocks, loc=location)) - reader = BlockLoader().load(location, blocks.offset, blocks.length) + reader = self.blk_loader.load(location, blocks.offset, blocks.length) def decompress_block(range_): decomp = gzip_decompressor()