diff --git a/pywb/recorder/multifilewarcwriter.py b/pywb/recorder/multifilewarcwriter.py
new file mode 100644
index 00000000..673ac042
--- /dev/null
+++ b/pywb/recorder/multifilewarcwriter.py
@@ -0,0 +1,269 @@
+import base64
+import datetime
+import os
+import shutil
+
+import traceback
+
+import portalocker
+
+from pywb.utils.timeutils import timestamp20_now
+
+from pywb.webagg.utils import res_template
+
+from pywb.warc.warcwriter import BaseWARCWriter
+
+
+# ============================================================================
+class MultiFileWARCWriter(BaseWARCWriter):
+    FILE_TEMPLATE = 'rec-{timestamp}-{hostname}.warc.gz'
+
+    def __init__(self, dir_template, filename_template=None, max_size=0,
+                 max_idle_secs=1800, *args, **kwargs):
+        super(MultiFileWARCWriter, self).__init__(*args, **kwargs)
+
+        if not filename_template:
+            dir_template, filename_template = os.path.split(dir_template)
+            dir_template += os.path.sep
+
+        if not filename_template:
+            filename_template = self.FILE_TEMPLATE
+
+        self.dir_template = dir_template
+        self.key_template = kwargs.get('key_template', self.dir_template)
+        self.dedup_index = kwargs.get('dedup_index')
+        self.filename_template = filename_template
+        self.max_size = max_size
+        if max_idle_secs > 0:
+            self.max_idle_time = datetime.timedelta(seconds=max_idle_secs)
+        else:
+            self.max_idle_time = None
+
+        self.fh_cache = {}
+
+    def write_req_resp(self, req, resp, params):
+        url = resp.rec_headers.get_header('WARC-Target-URI')
+        dt = resp.rec_headers.get_header('WARC-Date')
+
+        #req.rec_headers['Content-Type'] = req.content_type
+        req.rec_headers.replace_header('WARC-Target-URI', url)
+        req.rec_headers.replace_header('WARC-Date', dt)
+
+        resp_id = resp.rec_headers.get_header('WARC-Record-ID')
+        if resp_id:
+            req.rec_headers.add_header('WARC-Concurrent-To', resp_id)
+
+        resp = self._check_revisit(resp, params)
+        if not resp:
+            print('Skipping due to dedup')
+            return
+
+        self._do_write_req_resp(req, resp, params)
+
+    def _check_revisit(self, record, params):
+        if not self.dedup_index:
+            return record
+
+        try:
+            url = record.rec_headers.get_header('WARC-Target-URI')
+            digest = record.rec_headers.get_header('WARC-Payload-Digest')
+            iso_dt = record.rec_headers.get_header('WARC-Date')
+            result = self.dedup_index.lookup_revisit(params, digest, url, iso_dt)
+        except Exception as e:
+            traceback.print_exc()
+            result = None
+
+        if result == 'skip':
+            return None
+
+        if isinstance(result, tuple) and result[0] == 'revisit':
+            record.rec_headers.replace_header('WARC-Type', 'revisit')
+            record.rec_headers.add_header('WARC-Profile', self.REVISIT_PROFILE)
+
+            record.rec_headers.add_header('WARC-Refers-To-Target-URI', result[1])
+            record.rec_headers.add_header('WARC-Refers-To-Date', result[2])
+
+        return record
+
+    def get_new_filename(self, dir_, params):
+        timestamp = timestamp20_now()
+
+        randstr = base64.b32encode(os.urandom(5)).decode('utf-8')
+
+        filename = dir_ + res_template(self.filename_template, params,
+                                       hostname=self.hostname,
+                                       timestamp=timestamp,
+                                       random=randstr)
+
+        return filename
+
+    def allow_new_file(self, filename, params):
+        return True
+
+    def _open_file(self, filename, params):
+        path, name = os.path.split(filename)
+
+        try:
+            os.makedirs(path)
+        except:
+            pass
+
+        fh = open(filename, 'a+b')
+
+        if self.dedup_index:
+            self.dedup_index.add_warc_file(filename, params)
+
+        return fh
+
+    def _close_file(self, fh):
+        try:
+            portalocker.lock(fh, portalocker.LOCK_UN)
+            fh.close()
+        except Exception as e:
+            print(e)
+
+    def get_dir_key(self, params):
+        return res_template(self.key_template, params)
+
+    def close_key(self, dir_key):
+        if isinstance(dir_key, dict):
+            dir_key = self.get_dir_key(dir_key)
+
+        result = self.fh_cache.pop(dir_key, None)
+        if not result:
+            return
+
+        out, filename = result
+        self._close_file(out)
+        return filename
+
+    def close_file(self, match_filename):
+        for dir_key, out, filename in self.iter_open_files():
+            if filename == match_filename:
+                return self.close_key(dir_key)
+
+    def _is_write_resp(self, resp, params):
+        return True
+
+    def _is_write_req(self, req, params):
+        return True
+
+    def write_record(self, record, params=None):
+        params = params or {}
+        self._do_write_req_resp(None, record, params)
+
+    def _do_write_req_resp(self, req, resp, params):
+        def write_callback(out, filename):
+            #url = resp.rec_headers.get_header('WARC-Target-URI')
+            #print('Writing req/resp {0} to {1} '.format(url, filename))
+
+            if resp and self._is_write_resp(resp, params):
+                self._write_warc_record(out, resp)
+
+            if req and self._is_write_req(req, params):
+                self._write_warc_record(out, req)
+
+        return self._write_to_file(params, write_callback)
+
+    def write_stream_to_file(self, params, stream):
+        def write_callback(out, filename):
+            #print('Writing stream to {0}'.format(filename))
+            shutil.copyfileobj(stream, out)
+
+        return self._write_to_file(params, write_callback)
+
+    def _write_to_file(self, params, write_callback):
+        full_dir = res_template(self.dir_template, params)
+        dir_key = self.get_dir_key(params)
+
+        result = self.fh_cache.get(dir_key)
+
+        close_file = False
+
+        if result:
+            out, filename = result
+            is_new = False
+        else:
+            filename = self.get_new_filename(full_dir, params)
+
+            if not self.allow_new_file(filename, params):
+                return False
+
+            out = self._open_file(filename, params)
+
+            is_new = True
+
+        try:
+            start = out.tell()
+
+            write_callback(out, filename)
+
+            out.flush()
+
+            new_size = out.tell()
+
+            out.seek(start)
+
+            if self.dedup_index:
+                self.dedup_index.add_urls_to_index(out, params,
+                                                   filename,
+                                                   new_size - start)
+
+            return True
+
+        except Exception as e:
+            traceback.print_exc()
+            close_file = True
+            return False
+
+        finally:
+            # check for rollover; new_size is only set if the write succeeded
+            if not close_file and self.max_size and new_size > self.max_size:
+                close_file = True
+
+            if close_file:
+                self._close_file(out)
+                if not is_new:
+                    self.fh_cache.pop(dir_key, None)
+
+            elif is_new:
+                portalocker.lock(out, portalocker.LOCK_EX | portalocker.LOCK_NB)
+                self.fh_cache[dir_key] = (out, filename)
+
+    def iter_open_files(self):
+        for n, v in list(self.fh_cache.items()):
+            out, filename = v
+            yield n, out, filename
+
+    def close(self):
+        for dir_key, out, filename in self.iter_open_files():
+            self._close_file(out)
+
+        self.fh_cache = {}
+
+    def close_idle_files(self):
+        if not self.max_idle_time:
+            return
+
+        now = datetime.datetime.now()
+
+        for dir_key, out, filename in self.iter_open_files():
+            try:
+                mtime = os.path.getmtime(filename)
+            except:
+                # file is gone; close this key but keep checking the rest
+                self.close_key(dir_key)
+                continue
+
+            mtime = datetime.datetime.fromtimestamp(mtime)
+
+            if (now - mtime) > self.max_idle_time:
+                print('Closing idle ' + filename)
+                self.close_key(dir_key)
+
+
+# ============================================================================
+class PerRecordWARCWriter(MultiFileWARCWriter):
+    def __init__(self, *args, **kwargs):
+        kwargs['max_size'] = 1
+        super(PerRecordWARCWriter, self).__init__(*args, **kwargs)
+
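A minimal usage sketch for the relocated writer, separate from the patch itself: MultiFileWARCWriter keeps one open WARC per resolved directory key and rolls the file over once it exceeds max_size. The './warcs/{user}/{coll}/' template, the params values, and the warcinfo fields below are illustrative assumptions, not values required by the API:

    from pywb.recorder.multifilewarcwriter import MultiFileWARCWriter

    # one open WARC per resolved directory key; rollover past max_size
    writer = MultiFileWARCWriter('./warcs/{user}/{coll}/',
                                 max_size=100 * 1024 * 1024,
                                 max_idle_secs=1800)

    params = {'user': 'demo', 'coll': 'test'}
    record = writer.create_warcinfo_record('demo.warc.gz', {'software': 'pywb'})
    writer.write_record(record, params=params)

    writer.close_idle_files()  # close anything idle longer than max_idle_secs
    writer.close()

PerRecordWARCWriter is the same writer with max_size=1, so every record triggers a rollover and lands in its own WARC file.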
diff --git a/pywb/recorder/test/simplerec.py b/pywb/recorder/test/simplerec.py
index e7665c6b..8cfeddb5 100644
--- a/pywb/recorder/test/simplerec.py
+++ b/pywb/recorder/test/simplerec.py
@@ -3,7 +3,7 @@ from gevent import monkey; monkey.patch_all()
 
 from pywb.recorder.recorderapp import RecorderApp
 from pywb.recorder.redisindexer import WritableRedisIndexer
-from pywb.recorder.warcwriter import MultiFileWARCWriter
+from pywb.recorder.multifilewarcwriter import MultiFileWARCWriter
 from pywb.recorder.filters import SkipDupePolicy
 
 import atexit
diff --git a/pywb/recorder/test/test_recorder.py b/pywb/recorder/test/test_recorder.py
index 122c57ba..8a32ec59 100644
--- a/pywb/recorder/test/test_recorder.py
+++ b/pywb/recorder/test/test_recorder.py
@@ -13,7 +13,7 @@ from fakeredis import FakeStrictRedis
 
 from pywb.recorder.recorderapp import RecorderApp
 from pywb.recorder.redisindexer import WritableRedisIndexer
-from pywb.recorder.warcwriter import PerRecordWARCWriter, MultiFileWARCWriter, SimpleTempWARCWriter
+from pywb.recorder.multifilewarcwriter import PerRecordWARCWriter, MultiFileWARCWriter
 from pywb.recorder.filters import ExcludeSpecificHeaders
 from pywb.recorder.filters import SkipDupePolicy, WriteDupePolicy, WriteRevisitDupePolicy
 
diff --git a/pywb/utils/timeutils.py b/pywb/utils/timeutils.py
index 3c5cd9f3..93cd2c2f 100644
--- a/pywb/utils/timeutils.py
+++ b/pywb/utils/timeutils.py
@@ -7,7 +7,7 @@ import re
 import time
 import datetime
 import calendar
-from six.moves import map
+
 from email.utils import parsedate, formatdate
 
 #=================================================================
@@ -37,7 +37,7 @@ def iso_date_to_datetime(string):
     if nums[-1] == '':
         nums = nums[:-1]
 
-    the_datetime = datetime.datetime(*map(int, nums))
+    the_datetime = datetime.datetime(*(int(num) for num in nums))
 
     return the_datetime
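The generator-expression change in timeutils.py is behavior-preserving (it drops the last dependency on six.moves in this module). A quick check, mirroring the example already documented for iso_date_to_datetime:

    import datetime
    from pywb.utils.timeutils import iso_date_to_datetime

    assert iso_date_to_datetime('2013-12-26T10:11:12Z') == \
           datetime.datetime(2013, 12, 26, 10, 11, 12)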
diff --git a/pywb/warc/archiveindexer.py b/pywb/warc/archiveindexer.py
new file mode 100644
index 00000000..e5e8cfe5
--- /dev/null
+++ b/pywb/warc/archiveindexer.py
@@ -0,0 +1,342 @@
+from pywb.utils.timeutils import iso_date_to_timestamp
+from pywb.utils.canonicalize import canonicalize
+from pywb.utils.loaders import extract_post_query, append_post_query
+
+from pywb.warc.archiveiterator import ArchiveIterator
+
+import hashlib
+import base64
+import six
+
+import re
+import sys
+
+try:  # pragma: no cover
+    from collections import OrderedDict
+except ImportError:  # pragma: no cover
+    from ordereddict import OrderedDict
+
+
+#=================================================================
+class ArchiveIndexEntryMixin(object):
+    MIME_RE = re.compile('[; ]')
+
+    def __init__(self):
+        super(ArchiveIndexEntryMixin, self).__init__()
+        self.reset_entry()
+
+    def reset_entry(self):
+        self['urlkey'] = ''
+        self['metadata'] = ''
+        self.buffer = None
+        self.record = None
+
+
+    def extract_mime(self, mime, def_mime='unk'):
+        """ Utility function to extract mimetype only
+        from a full content type, removing charset settings
+        """
+        self['mime'] = def_mime
+        if mime:
+            self['mime'] = self.MIME_RE.split(mime, 1)[0]
+            self['_content_type'] = mime
+
+    def extract_status(self, status_headers):
+        """ Extract status code only from status line
+        """
+        self['status'] = status_headers.get_statuscode()
+        if not self['status']:
+            self['status'] = '-'
+        elif self['status'] == '204' and 'Error' in status_headers.statusline:
+            self['status'] = '-'
+
+    def set_rec_info(self, offset, length):
+        self['length'] = str(length)
+        self['offset'] = str(offset)
+
+    def merge_request_data(self, other, options):
+        surt_ordered = options.get('surt_ordered', True)
+
+        if other.record.rec_type != 'request':
+            return False
+
+        # two requests, not correct
+        if self.record.rec_type == 'request':
+            return False
+
+        # merge POST/PUT body query
+        post_query = other.get('_post_query')
+        if post_query:
+            url = append_post_query(self['url'], post_query)
+            self['urlkey'] = canonicalize(url, surt_ordered)
+            other['urlkey'] = self['urlkey']
+
+        referer = other.record.status_headers.get_header('referer')
+        if referer:
+            self['_referer'] = referer
+
+        return True
+
+
+#=================================================================
+class DefaultRecordParser(object):
+    def __init__(self, **options):
+        self.options = options
+        self.entry_cache = {}
+        self.digester = None
+        self.buff = None
+
+    def _create_index_entry(self, rec_type):
+        try:
+            entry = self.entry_cache[rec_type]
+            entry.reset_entry()
+        except:
+            if self.options.get('cdxj'):
+                entry = OrderedArchiveIndexEntry()
+            else:
+                entry = ArchiveIndexEntry()
+
+        # don't reuse when using append post
+        # entry may be cached
+        if not self.options.get('append_post'):
+            self.entry_cache[rec_type] = entry
+
+        return entry
+
+    def begin_payload(self, compute_digest, entry):
+        if compute_digest:
+            self.digester = hashlib.sha1()
+        else:
+            self.digester = None
+
+        self.entry = entry
+        entry.buffer = self.create_payload_buffer(entry)
+
+    def handle_payload(self, buff):
+        if self.digester:
+            self.digester.update(buff)
+
+        if self.entry and self.entry.buffer:
+            self.entry.buffer.write(buff)
+
+    def end_payload(self, entry):
+        if self.digester:
+            entry['digest'] = base64.b32encode(self.digester.digest()).decode('ascii')
+
+        self.entry = None
+
+    def create_payload_buffer(self, entry):
+        return None
+
+    def create_record_iter(self, raw_iter):
+        append_post = self.options.get('append_post')
+        include_all = self.options.get('include_all')
+        surt_ordered = self.options.get('surt_ordered', True)
+        minimal = self.options.get('minimal')
+
+        if append_post and minimal:
+            raise Exception('Sorry, minimal index option and ' +
+                            'append POST options can not be used together')
+
+        for record in raw_iter:
+            entry = None
+
+            if not include_all and not minimal and (record.status_headers.get_statuscode() == '-'):
+                continue
+
+            if record.rec_type == 'arc_header':
+                continue
+
+            if record.format == 'warc':
+                if (record.rec_type in ('request', 'warcinfo') and
+                    not include_all and
+                    not append_post):
+                    continue
+
+                elif (not include_all and
+                      record.content_type == 'application/warc-fields'):
+                    continue
+
+                entry = self.parse_warc_record(record)
+            elif record.format == 'arc':
+                entry = self.parse_arc_record(record)
+
+            if not entry:
+                continue
+
+            if entry.get('url') and not entry.get('urlkey'):
+                entry['urlkey'] = canonicalize(entry['url'], surt_ordered)
+
+            compute_digest = False
+
+            if (entry.get('digest', '-') == '-' and
+                record.rec_type not in ('revisit', 'request', 'warcinfo')):
+
+                compute_digest = True
+
+            elif not minimal and record.rec_type == 'request' and append_post:
+                method = record.status_headers.protocol
+                len_ = record.status_headers.get_header('Content-Length')
+
+                post_query = extract_post_query(method,
+                                                entry.get('_content_type'),
+                                                len_,
+                                                record.stream)
+
+                entry['_post_query'] = post_query
+
+            entry.record = record
+
+            self.begin_payload(compute_digest, entry)
+            raw_iter.read_to_end(record, self.handle_payload)
+
+            entry.set_rec_info(*raw_iter.member_info)
+            self.end_payload(entry)
+
+            yield entry
+    def join_request_records(self, entry_iter):
+        prev_entry = None
+
+        for entry in entry_iter:
+            if not prev_entry:
+                prev_entry = entry
+                continue
+
+            # check for url match
+            if (entry['url'] != prev_entry['url']):
+                pass
+
+            # check for concurrency also
+            elif (entry.record.rec_headers.get_header('WARC-Concurrent-To') !=
+                  prev_entry.record.rec_headers.get_header('WARC-Record-ID')):
+                pass
+
+            elif (entry.merge_request_data(prev_entry, self.options) or
+                  prev_entry.merge_request_data(entry, self.options)):
+                yield prev_entry
+                yield entry
+                prev_entry = None
+                continue
+
+            yield prev_entry
+            prev_entry = entry
+
+        if prev_entry:
+            yield prev_entry
+
+
+    #=================================================================
+    def parse_warc_record(self, record):
+        """ Parse warc record
+        """
+
+        entry = self._create_index_entry(record.rec_type)
+
+        if record.rec_type == 'warcinfo':
+            entry['url'] = record.rec_headers.get_header('WARC-Filename')
+            entry['urlkey'] = entry['url']
+            entry['_warcinfo'] = record.stream.read(record.length)
+            return entry
+
+        entry['url'] = record.rec_headers.get_header('WARC-Target-Uri')
+
+        # timestamp
+        entry['timestamp'] = iso_date_to_timestamp(record.rec_headers.
+                                                   get_header('WARC-Date'))
+
+        # mime
+        if record.rec_type == 'revisit':
+            entry['mime'] = 'warc/revisit'
+        elif self.options.get('minimal'):
+            entry['mime'] = '-'
+        else:
+            def_mime = '-' if record.rec_type == 'request' else 'unk'
+            entry.extract_mime(record.status_headers.
+                               get_header('Content-Type'),
+                               def_mime)
+
+        # status -- only for response records (by convention):
+        if record.rec_type == 'response' and not self.options.get('minimal'):
+            entry.extract_status(record.status_headers)
+        else:
+            entry['status'] = '-'
+
+        # digest
+        digest = record.rec_headers.get_header('WARC-Payload-Digest')
+        entry['digest'] = digest
+        if digest and digest.startswith('sha1:'):
+            entry['digest'] = digest[len('sha1:'):]
+
+        elif not entry.get('digest'):
+            entry['digest'] = '-'
+
+        # optional json metadata, if present
+        metadata = record.rec_headers.get_header('WARC-Json-Metadata')
+        if metadata:
+            entry['metadata'] = metadata
+
+        return entry
+
+
+    #=================================================================
+    def parse_arc_record(self, record):
+        """ Parse arc record
+        """
+        url = record.rec_headers.get_header('uri')
+        url = url.replace('\r', '%0D')
+        url = url.replace('\n', '%0A')
+        # replace formfeed
+        url = url.replace('\x0c', '%0C')
+        # replace nulls
+        url = url.replace('\x00', '%00')
+
+        entry = self._create_index_entry(record.rec_type)
+        entry['url'] = url
+
+        # timestamp
+        entry['timestamp'] = record.rec_headers.get_header('archive-date')
+        if len(entry['timestamp']) > 14:
+            entry['timestamp'] = entry['timestamp'][:14]
+
+        if not self.options.get('minimal'):
+            # mime
+            entry.extract_mime(record.rec_headers.get_header('content-type'))
+
+            # status
+            entry.extract_status(record.status_headers)
+
+        # digest
+        entry['digest'] = '-'
+
+        return entry
+
+    def __call__(self, fh):
+        aiter = ArchiveIterator(fh, self.options.get('minimal', False),
+                                self.options.get('verify_http', False),
+                                self.options.get('arc2warc', False))
+
+        entry_iter = self.create_record_iter(aiter)
+
+        if self.options.get('append_post'):
+            entry_iter = self.join_request_records(entry_iter)
+
+        for entry in entry_iter:
+            if (entry.record.rec_type in ('request', 'warcinfo') and
+                not self.options.get('include_all')):
+                continue
+
+            yield entry
+
+    def open(self, filename):
+        with open(filename, 'rb') as fh:
+            for entry in self(fh):
+                yield entry
+
+
+class ArchiveIndexEntry(ArchiveIndexEntryMixin, dict):
+    pass
+
+
+class OrderedArchiveIndexEntry(ArchiveIndexEntryMixin, OrderedDict):
+    pass
+
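A sketch of the relocated indexer's intended use: DefaultRecordParser.open() yields one dict-like index entry per record, with the fields set in parse_warc_record/parse_arc_record above. 'example.warc.gz' is a placeholder path:

    from pywb.warc.archiveindexer import DefaultRecordParser

    parser = DefaultRecordParser(include_all=False, surt_ordered=True)

    # each entry is an ArchiveIndexEntry (a dict subclass)
    for entry in parser.open('example.warc.gz'):
        print(entry['urlkey'], entry['timestamp'], entry['url'],
              entry['mime'], entry['status'], entry['digest'],
              entry['offset'], entry['length'])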
diff --git a/pywb/warc/archiveiterator.py b/pywb/warc/archiveiterator.py
index c4785d3a..69296e5f 100644
--- a/pywb/warc/archiveiterator.py
+++ b/pywb/warc/archiveiterator.py
@@ -1,22 +1,10 @@
-from pywb.utils.timeutils import iso_date_to_timestamp
 from pywb.utils.bufferedreaders import DecompressingBufferedReader
-from pywb.utils.canonicalize import canonicalize
-from pywb.utils.loaders import extract_post_query, append_post_query
 
 from pywb.warc.recordloader import ArcWarcRecordLoader
 
-import hashlib
-import base64
 import six
-
-import re
 import sys
 
-try:  # pragma: no cover
-    from collections import OrderedDict
-except ImportError:  # pragma: no cover
-    from ordereddict import OrderedDict
-
 
 # ============================================================================
 BUFF_SIZE = 16384
@@ -243,326 +231,3 @@ class ArchiveIterator(six.Iterator):
 
         return record
 
-
-#=================================================================
-class ArchiveIndexEntryMixin(object):
-    MIME_RE = re.compile('[; ]')
-
-    def __init__(self):
-        super(ArchiveIndexEntryMixin, self).__init__()
-        self.reset_entry()
-
-    def reset_entry(self):
-        self['urlkey'] = ''
-        self['metadata'] = ''
-        self.buffer = None
-        self.record = None
-
-
-    def extract_mime(self, mime, def_mime='unk'):
-        """ Utility function to extract mimetype only
-        from a full content type, removing charset settings
-        """
-        self['mime'] = def_mime
-        if mime:
-            self['mime'] = self.MIME_RE.split(mime, 1)[0]
-            self['_content_type'] = mime
-
-    def extract_status(self, status_headers):
-        """ Extract status code only from status line
-        """
-        self['status'] = status_headers.get_statuscode()
-        if not self['status']:
-            self['status'] = '-'
-        elif self['status'] == '204' and 'Error' in status_headers.statusline:
-            self['status'] = '-'
-
-    def set_rec_info(self, offset, length):
-        self['length'] = str(length)
-        self['offset'] = str(offset)
-
-    def merge_request_data(self, other, options):
-        surt_ordered = options.get('surt_ordered', True)
-
-        if other.record.rec_type != 'request':
-            return False
-
-        # two requests, not correct
-        if self.record.rec_type == 'request':
-            return False
-
-        # merge POST/PUT body query
-        post_query = other.get('_post_query')
-        if post_query:
-            url = append_post_query(self['url'], post_query)
-            self['urlkey'] = canonicalize(url, surt_ordered)
-            other['urlkey'] = self['urlkey']
-
-        referer = other.record.status_headers.get_header('referer')
-        if referer:
-            self['_referer'] = referer
-
-        return True
-
-
-#=================================================================
-class DefaultRecordParser(object):
-    def __init__(self, **options):
-        self.options = options
-        self.entry_cache = {}
-        self.digester = None
-        self.buff = None
-
-    def _create_index_entry(self, rec_type):
-        try:
-            entry = self.entry_cache[rec_type]
-            entry.reset_entry()
-        except:
-            if self.options.get('cdxj'):
-                entry = OrderedArchiveIndexEntry()
-            else:
-                entry = ArchiveIndexEntry()
-
-        # don't reuse when using append post
-        # entry may be cached
-        if not self.options.get('append_post'):
-            self.entry_cache[rec_type] = entry
-
-        return entry
-
-    def begin_payload(self, compute_digest, entry):
-        if compute_digest:
-            self.digester = hashlib.sha1()
-        else:
-            self.digester = None
-
-        self.entry = entry
-        entry.buffer = self.create_payload_buffer(entry)
-
-    def handle_payload(self, buff):
-        if self.digester:
-            self.digester.update(buff)
-
-        if self.entry and self.entry.buffer:
-            self.entry.buffer.write(buff)
-
-    def end_payload(self, entry):
-        if self.digester:
-            entry['digest'] = base64.b32encode(self.digester.digest()).decode('ascii')
-
-        self.entry = None
-
-    def create_payload_buffer(self, entry):
-        return None
-
-    def create_record_iter(self, raw_iter):
-        append_post = self.options.get('append_post')
-        include_all = self.options.get('include_all')
-        surt_ordered = self.options.get('surt_ordered', True)
-        minimal = self.options.get('minimal')
-
-        if append_post and minimal:
-            raise Exception('Sorry, minimal index option and ' +
-                            'append POST options can not be used together')
-
-        for record in raw_iter:
-            entry = None
-
-            if not include_all and not minimal and (record.status_headers.get_statuscode() == '-'):
-                continue
-
-            if record.rec_type == 'arc_header':
-                continue
-
-            if record.format == 'warc':
-                if (record.rec_type in ('request', 'warcinfo') and
-                    not include_all and
-                    not append_post):
-                    continue
-
-                elif (not include_all and
-                      record.content_type == 'application/warc-fields'):
-                    continue
-
-                entry = self.parse_warc_record(record)
-            elif record.format == 'arc':
-                entry = self.parse_arc_record(record)
-
-            if not entry:
-                continue
-
-            if entry.get('url') and not entry.get('urlkey'):
-                entry['urlkey'] = canonicalize(entry['url'], surt_ordered)
-
-            compute_digest = False
-
-            if (entry.get('digest', '-') == '-' and
-                record.rec_type not in ('revisit', 'request', 'warcinfo')):
-
-                compute_digest = True
-
-            elif not minimal and record.rec_type == 'request' and append_post:
-                method = record.status_headers.protocol
-                len_ = record.status_headers.get_header('Content-Length')
-
-                post_query = extract_post_query(method,
-                                                entry.get('_content_type'),
-                                                len_,
-                                                record.stream)
-
-                entry['_post_query'] = post_query
-
-            entry.record = record
-
-            self.begin_payload(compute_digest, entry)
-            raw_iter.read_to_end(record, self.handle_payload)
-
-            entry.set_rec_info(*raw_iter.member_info)
-            self.end_payload(entry)
-
-            yield entry
-
-    def join_request_records(self, entry_iter):
-        prev_entry = None
-
-        for entry in entry_iter:
-            if not prev_entry:
-                prev_entry = entry
-                continue
-
-            # check for url match
-            if (entry['url'] != prev_entry['url']):
-                pass
-
-            # check for concurrency also
-            elif (entry.record.rec_headers.get_header('WARC-Concurrent-To') !=
-                  prev_entry.record.rec_headers.get_header('WARC-Record-ID')):
-                pass
-
-            elif (entry.merge_request_data(prev_entry, self.options) or
-                  prev_entry.merge_request_data(entry, self.options)):
-                yield prev_entry
-                yield entry
-                prev_entry = None
-                continue
-
-            yield prev_entry
-            prev_entry = entry
-
-        if prev_entry:
-            yield prev_entry
-
-
-    #=================================================================
-    def parse_warc_record(self, record):
-        """ Parse warc record
-        """
-
-        entry = self._create_index_entry(record.rec_type)
-
-        if record.rec_type == 'warcinfo':
-            entry['url'] = record.rec_headers.get_header('WARC-Filename')
-            entry['urlkey'] = entry['url']
-            entry['_warcinfo'] = record.stream.read(record.length)
-            return entry
-
-        entry['url'] = record.rec_headers.get_header('WARC-Target-Uri')
-
-        # timestamp
-        entry['timestamp'] = iso_date_to_timestamp(record.rec_headers.
-                                                   get_header('WARC-Date'))
-
-        # mime
-        if record.rec_type == 'revisit':
-            entry['mime'] = 'warc/revisit'
-        elif self.options.get('minimal'):
-            entry['mime'] = '-'
-        else:
-            def_mime = '-' if record.rec_type == 'request' else 'unk'
-            entry.extract_mime(record.status_headers.
-                               get_header('Content-Type'),
-                               def_mime)
-
-        # status -- only for response records (by convention):
-        if record.rec_type == 'response' and not self.options.get('minimal'):
-            entry.extract_status(record.status_headers)
-        else:
-            entry['status'] = '-'
-
-        # digest
-        digest = record.rec_headers.get_header('WARC-Payload-Digest')
-        entry['digest'] = digest
-        if digest and digest.startswith('sha1:'):
-            entry['digest'] = digest[len('sha1:'):]
-
-        elif not entry.get('digest'):
-            entry['digest'] = '-'
-
-        # optional json metadata, if present
-        metadata = record.rec_headers.get_header('WARC-Json-Metadata')
-        if metadata:
-            entry['metadata'] = metadata
-
-        return entry
-
-
-    #=================================================================
-    def parse_arc_record(self, record):
-        """ Parse arc record
-        """
-        url = record.rec_headers.get_header('uri')
-        url = url.replace('\r', '%0D')
-        url = url.replace('\n', '%0A')
-        # replace formfeed
-        url = url.replace('\x0c', '%0C')
-        # replace nulls
-        url = url.replace('\x00', '%00')
-
-        entry = self._create_index_entry(record.rec_type)
-        entry['url'] = url
-
-        # timestamp
-        entry['timestamp'] = record.rec_headers.get_header('archive-date')
-        if len(entry['timestamp']) > 14:
-            entry['timestamp'] = entry['timestamp'][:14]
-
-        if not self.options.get('minimal'):
-            # mime
-            entry.extract_mime(record.rec_headers.get_header('content-type'))
-
-            # status
-            entry.extract_status(record.status_headers)
-
-        # digest
-        entry['digest'] = '-'
-
-        return entry
-
-    def __call__(self, fh):
-        aiter = ArchiveIterator(fh, self.options.get('minimal', False),
-                                self.options.get('verify_http', False),
-                                self.options.get('arc2warc', False))
-
-        entry_iter = self.create_record_iter(aiter)
-
-        if self.options.get('append_post'):
-            entry_iter = self.join_request_records(entry_iter)
-
-        for entry in entry_iter:
-            if (entry.record.rec_type in ('request', 'warcinfo') and
-                not self.options.get('include_all')):
-                continue
-
-            yield entry
-
-    def open(self, filename):
-        with open(filename, 'rb') as fh:
-            for entry in self(fh):
-                yield entry
-
-
-class ArchiveIndexEntry(ArchiveIndexEntryMixin, dict):
-    pass
-
-class OrderedArchiveIndexEntry(ArchiveIndexEntryMixin, OrderedDict):
-    pass
-
-
diff --git a/pywb/warc/cdxindexer.py b/pywb/warc/cdxindexer.py
index 13e7ba26..fd633eed 100644
--- a/pywb/warc/cdxindexer.py
+++ b/pywb/warc/cdxindexer.py
@@ -31,7 +31,7 @@ from bisect import insort
 
 from six import StringIO
 
-from pywb.warc.archiveiterator import DefaultRecordParser
+from pywb.warc.archiveindexer import DefaultRecordParser
 
 import codecs
 import six
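The cdxindexer change is import-only, so CDX output is unchanged. For reference, a sketch of driving it through the module's write_cdx_index helper; the signature write_cdx_index(outfile, infile, filename, **options), the binary output buffer, and 'example.warc.gz' are assumptions for illustration:

    from io import BytesIO
    from pywb.warc.cdxindexer import write_cdx_index

    out = BytesIO()
    with open('example.warc.gz', 'rb') as fh:
        write_cdx_index(out, fh, 'example.warc.gz')

    print(out.getvalue().decode('utf-8'))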
diff --git a/pywb/recorder/test/test_writer.py b/pywb/warc/test/test_writer.py
similarity index 93%
rename from pywb/recorder/test/test_writer.py
rename to pywb/warc/test/test_writer.py
index 40bf1e97..dd593575 100644
--- a/pywb/recorder/test/test_writer.py
+++ b/pywb/warc/test/test_writer.py
@@ -1,5 +1,5 @@
 from pywb.utils.statusandheaders import StatusAndHeaders
-from pywb.recorder.warcwriter import SimpleTempWARCWriter
+from pywb.warc.warcwriter import BufferWARCWriter
 from pywb.warc.recordloader import ArcWarcRecordLoader
 from pywb.warc.archiveiterator import ArchiveIterator
 
@@ -9,7 +9,7 @@ import json
 
 
 # ============================================================================
-class FixedTestWARCWriter(SimpleTempWARCWriter):
+class FixedTestWARCWriter(BufferWARCWriter):
     @classmethod
     def _make_warc_id(cls, id_=None):
         return ''
@@ -36,7 +36,7 @@ class TestWarcWriter(object):
         record = simplewriter.create_warcinfo_record('testfile.warc.gz', params)
 
         simplewriter.write_record(record)
 
-        buff = simplewriter.get_buffer()
+        buff = simplewriter.get_contents()
 
         assert isinstance(buff, bytes)
 
         buff = BytesIO(buff)
@@ -71,7 +71,7 @@ json-metadata: {"foo": "bar"}\r\n\
 \r\n\
 '
 
-        assert simplewriter.get_buffer().decode('utf-8') == warcinfo_record
+        assert simplewriter.get_contents().decode('utf-8') == warcinfo_record
 
     def test_generate_response(self):
         headers_list = [('Content-Type', 'text/plain; charset="UTF-8"'),
@@ -93,7 +93,7 @@ json-metadata: {"foo": "bar"}\r\n\
 
         writer.write_record(record)
 
-        buff = writer.get_buffer()
+        buff = writer.get_contents()
 
         self._validate_record_content_len(BytesIO(buff))
 
diff --git a/pywb/recorder/warcwriter.py b/pywb/warc/warcwriter.py
similarity index 51%
rename from pywb/recorder/warcwriter.py
rename to pywb/warc/warcwriter.py
index 6c9eca8a..e98d46d3 100644
--- a/pywb/recorder/warcwriter.py
+++ b/pywb/warc/warcwriter.py
@@ -4,31 +4,24 @@ import base64
 import hashlib
 import datetime
 import zlib
-import sys
-import os
 import six
-import shutil
-
-import traceback
 
 from socket import gethostname
 from io import BytesIO
 
-import portalocker
-
-from pywb.utils.loaders import LimitReader, to_native_str
-from pywb.utils.bufferedreaders import BufferedReader
-from pywb.utils.timeutils import timestamp20_now, datetime_to_iso_date
+from pywb.utils.loaders import to_native_str
+from pywb.utils.timeutils import datetime_to_iso_date
 
 from pywb.utils.statusandheaders import StatusAndHeadersParser, StatusAndHeaders
+
 from pywb.warc.recordloader import ArcWarcRecord
 from pywb.warc.recordloader import ArcWarcRecordLoader
 
-from pywb.webagg.utils import res_template, BUFF_SIZE
-
 
 # ============================================================================
 class BaseWARCWriter(object):
+    BUFF_SIZE = 16384
+
     WARC_RECORDS = {'warcinfo': 'application/warc-fields',
                     'response': 'application/http; msgtype=response',
                     'revisit': 'application/http; msgtype=response',
@@ -38,25 +31,20 @@ class BaseWARCWriter(object):
 
     REVISIT_PROFILE = 'http://netpreserve.org/warc/1.0/revisit/uri-agnostic-identical-payload-digest'
 
-    FILE_TEMPLATE = 'rec-{timestamp}-{hostname}.warc.gz'
-
     WARC_VERSION = 'WARC/1.0'
 
-    def __init__(self, gzip=True, dedup_index=None,
-                 header_filter=None, *args, **kwargs):
-
+    def __init__(self, gzip=True, header_filter=None, *args, **kwargs):
         self.gzip = gzip
-        self.dedup_index = dedup_index
         self.header_filter = header_filter
         self.hostname = gethostname()
 
         self.parser = StatusAndHeadersParser([], verify=False)
 
         self.warc_version = kwargs.get('warc_version', self.WARC_VERSION)
 
-    @staticmethod
-    def _iter_stream(stream):
+    @classmethod
+    def _iter_stream(cls, stream):
         while True:
-            buf = stream.read(BUFF_SIZE)
+            buf = stream.read(cls.BUFF_SIZE)
             if not buf:
                 return
 
@@ -94,25 +82,6 @@ class BaseWARCWriter(object):
         buff = record.status_headers.to_bytes(exclude_list)
         record.status_headers.headers_buff = buff
 
-    def write_req_resp(self, req, resp, params):
-        url = resp.rec_headers.get_header('WARC-Target-URI')
-        dt = resp.rec_headers.get_header('WARC-Date')
-
-        #req.rec_headers['Content-Type'] = req.content_type
-        req.rec_headers.replace_header('WARC-Target-URI', url)
-        req.rec_headers.replace_header('WARC-Date', dt)
-
-        resp_id = resp.rec_headers.get_header('WARC-Record-ID')
-        if resp_id:
-            req.rec_headers.add_header('WARC-Concurrent-To', resp_id)
-
-        resp = self._check_revisit(resp, params)
-        if not resp:
-            print('Skipping due to dedup')
-            return
-
-        self._do_write_req_resp(req, resp, params)
-
     def create_warcinfo_record(self, filename, info):
         warc_headers = StatusAndHeaders(self.warc_version, [])
         warc_headers.add_header('WARC-Type', 'warcinfo')
@@ -182,31 +151,6 @@ class BaseWARCWriter(object):
 
         return record
 
-    def _check_revisit(self, record, params):
-        if not self.dedup_index:
-            return record
-
-        try:
-            url = record.rec_headers.get_header('WARC-Target-URI')
-            digest = record.rec_headers.get_header('WARC-Payload-Digest')
-            iso_dt = record.rec_headers.get_header('WARC-Date')
-            result = self.dedup_index.lookup_revisit(params, digest, url, iso_dt)
-        except Exception as e:
-            traceback.print_exc()
-            result = None
-
-        if result == 'skip':
-            return None
-
-        if isinstance(result, tuple) and result[0] == 'revisit':
-            record.rec_headers.replace_header('WARC-Type', 'revisit')
-            record.rec_headers.add_header('WARC-Profile', self.REVISIT_PROFILE)
-
-            record.rec_headers.add_header('WARC-Refers-To-Target-URI', result[1])
-            record.rec_headers.add_header('WARC-Refers-To-Date', result[2])
-
-        return record
-
     def _write_warc_record(self, out, record, adjust_cl=True):
         if self.gzip:
             out = GzippingWrapper(out)
@@ -321,231 +265,40 @@ class Digester(object):
 
 
 # ============================================================================
-class MultiFileWARCWriter(BaseWARCWriter):
-    def __init__(self, dir_template, filename_template=None, max_size=0,
-                 max_idle_secs=1800, *args, **kwargs):
-        super(MultiFileWARCWriter, self).__init__(*args, **kwargs)
-
-        if not filename_template:
-            dir_template, filename_template = os.path.split(dir_template)
-            dir_template += os.path.sep
-
-        if not filename_template:
-            filename_template = self.FILE_TEMPLATE
-
-        self.dir_template = dir_template
-        self.key_template = kwargs.get('key_template', self.dir_template)
-        self.filename_template = filename_template
-        self.max_size = max_size
-        if max_idle_secs > 0:
-            self.max_idle_time = datetime.timedelta(seconds=max_idle_secs)
-        else:
-            self.max_idle_time = None
-
-        self.fh_cache = {}
-
-    def get_new_filename(self, dir_, params):
-        timestamp = timestamp20_now()
-
-        randstr = base64.b32encode(os.urandom(5)).decode('utf-8')
-
-        filename = dir_ + res_template(self.filename_template, params,
-                                       hostname=self.hostname,
-                                       timestamp=timestamp,
-                                       random=randstr)
-
-        return filename
-
-    def allow_new_file(self, filename, params):
-        return True
-
-    def _open_file(self, filename, params):
-        path, name = os.path.split(filename)
-
-        try:
-            os.makedirs(path)
-        except:
-            pass
-
-        fh = open(filename, 'a+b')
-
-        if self.dedup_index:
-            self.dedup_index.add_warc_file(filename, params)
-
-        return fh
-
-    def _close_file(self, fh):
-        try:
-            portalocker.lock(fh, portalocker.LOCK_UN)
-            fh.close()
-        except Exception as e:
-            print(e)
-
-    def get_dir_key(self, params):
-        return res_template(self.key_template, params)
-
-    def close_key(self, dir_key):
-        if isinstance(dir_key, dict):
-            dir_key = self.get_dir_key(dir_key)
-
-        result = self.fh_cache.pop(dir_key, None)
-        if not result:
-            return
-
-        out, filename = result
-        self._close_file(out)
-        return filename
-
-    def close_file(self, match_filename):
-        for dir_key, out, filename in self.iter_open_files():
-            if filename == match_filename:
-                return self.close_key(dir_key)
-
-    def _is_write_resp(self, resp, params):
-        return True
-
-    def _is_write_req(self, req, params):
-        return True
-
-    def write_record(self, record, params=None):
-        params = params or {}
-        self._do_write_req_resp(None, record, params)
-
-    def _do_write_req_resp(self, req, resp, params):
-        def write_callback(out, filename):
-            #url = resp.rec_headers.get_header('WARC-Target-URI')
-            #print('Writing req/resp {0} to {1} '.format(url, filename))
-
-            if resp and self._is_write_resp(resp, params):
-                self._write_warc_record(out, resp)
-
-            if req and self._is_write_req(req, params):
-                self._write_warc_record(out, req)
-
-        return self._write_to_file(params, write_callback)
-
-    def write_stream_to_file(self, params, stream):
-        def write_callback(out, filename):
-            #print('Writing stream to {0}'.format(filename))
-            shutil.copyfileobj(stream, out)
-
-        return self._write_to_file(params, write_callback)
-
-    def _write_to_file(self, params, write_callback):
-        full_dir = res_template(self.dir_template, params)
-        dir_key = self.get_dir_key(params)
-
-        result = self.fh_cache.get(dir_key)
-
-        close_file = False
-
-        if result:
-            out, filename = result
-            is_new = False
-        else:
-            filename = self.get_new_filename(full_dir, params)
-
-            if not self.allow_new_file(filename, params):
-                return False
-
-            out = self._open_file(filename, params)
-
-            is_new = True
-
-        try:
-            start = out.tell()
-
-            write_callback(out, filename)
-
-            out.flush()
-
-            new_size = out.tell()
-
-            out.seek(start)
-
-            if self.dedup_index:
-                self.dedup_index.add_urls_to_index(out, params,
-                                                   filename,
-                                                   new_size - start)
-
-            return True
-
-        except Exception as e:
-            traceback.print_exc()
-            close_file = True
-            return False
-
-        finally:
-            # check for rollover
-            if self.max_size and new_size > self.max_size:
-                close_file = True
-
-            if close_file:
-                self._close_file(out)
-                if not is_new:
-                    self.fh_cache.pop(dir_key, None)
-
-            elif is_new:
-                portalocker.lock(out, portalocker.LOCK_EX | portalocker.LOCK_NB)
-                self.fh_cache[dir_key] = (out, filename)
-
-    def iter_open_files(self):
-        for n, v in list(self.fh_cache.items()):
-            out, filename = v
-            yield n, out, filename
-
-    def close(self):
-        for dir_key, out, filename in self.iter_open_files():
-            self._close_file(out)
-
-        self.fh_cache = {}
-
-    def close_idle_files(self):
-        if not self.max_idle_time:
-            return
-
-        now = datetime.datetime.now()
-
-        for dir_key, out, filename in self.iter_open_files():
-            try:
-                mtime = os.path.getmtime(filename)
-            except:
-                self.close_key(dir_key)
-                return
-
-            mtime = datetime.datetime.fromtimestamp(mtime)
-
-            if (now - mtime) > self.max_idle_time:
-                print('Closing idle ' + filename)
-                self.close_key(dir_key)
-
-
-# ============================================================================
-class PerRecordWARCWriter(MultiFileWARCWriter):
+class BufferWARCWriter(BaseWARCWriter):
     def __init__(self, *args, **kwargs):
-        kwargs['max_size'] = 1
-        super(PerRecordWARCWriter, self).__init__(*args, **kwargs)
-
-
-# ============================================================================
-class SimpleTempWARCWriter(BaseWARCWriter):
-    def __init__(self, *args, **kwargs):
-        super(SimpleTempWARCWriter, self).__init__(*args, **kwargs)
+        super(BufferWARCWriter, self).__init__(*args, **kwargs)
         self.out = self._create_buffer()
 
     def _create_buffer(self):
         return tempfile.SpooledTemporaryFile(max_size=512*1024)
 
-    def _do_write_req_resp(self, req, resp, params):
-        self._write_warc_record(self.out, resp)
-        self._write_warc_record(self.out, req)
-
-    def write_record(self, record, params=None):
+    def write_record(self, record):
         self._write_warc_record(self.out, record)
 
-    def get_buffer(self):
+    def get_contents(self):
         pos = self.out.tell()
         self.out.seek(0)
        buff = self.out.read()
         self.out.seek(pos)
         return buff
+
+
+# ============================================================================
+class FileWARCWriter(BufferWARCWriter):
+    def __init__(self, *args, **kwargs):
+        # init gzip/header-filter/parser state (and a default buffer)
+        super(FileWARCWriter, self).__init__(**kwargs)
+
+        if len(args) > 0:
+            file_or_buff = args[0]
+        else:
+            file_or_buff = kwargs.get('file')
+
+        if isinstance(file_or_buff, str):
+            self.out = open(file_or_buff, 'a+b')
+        elif hasattr(file_or_buff, 'write'):
+            self.out = file_or_buff
+        else:
+            raise Exception('file must be a writable stream or a valid filename')
+
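Finally, a sketch of the renamed BufferWARCWriter (formerly SimpleTempWARCWriter); get_contents() replaces the old get_buffer(), as the test updates above reflect. The warcinfo fields and the leading-bytes check are illustrative assumptions:

    from pywb.warc.warcwriter import BufferWARCWriter

    writer = BufferWARCWriter(gzip=False)
    record = writer.create_warcinfo_record('test.warc.gz', {'software': 'pywb'})
    writer.write_record(record)

    data = writer.get_contents()
    # uncompressed output should begin with the WARC version line
    assert data.startswith(b'WARC/1.0')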