mirror of https://github.com/webrecorder/pywb.git

warc & recorder refactor: split BaseWARCWriter from MultiFileWARCWriter, moving them to warc/warcwriter.py and recorder/multifilewarcwriter.py respectively

split indexing functionality out of the base warc iterator, moving it to archiveindexer.py
Ilya Kreymer 2017-03-01 14:18:44 -08:00
parent 3faa55906a
commit 1213466afb
9 changed files with 654 additions and 625 deletions
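
In short, the classes end up importable from these new locations (as reflected in the import changes throughout the diffs below):

# Post-refactor import locations, per the diffs in this commit:
from pywb.warc.warcwriter import BaseWARCWriter, BufferWARCWriter
from pywb.recorder.multifilewarcwriter import MultiFileWARCWriter, PerRecordWARCWriter
from pywb.warc.archiveindexer import DefaultRecordParser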

pywb/recorder/multifilewarcwriter.py Normal file

@ -0,0 +1,269 @@
import base64
import datetime
import os
import shutil
import traceback
import portalocker
from pywb.utils.timeutils import timestamp20_now
from pywb.webagg.utils import res_template
from pywb.warc.warcwriter import BaseWARCWriter
# ============================================================================
class MultiFileWARCWriter(BaseWARCWriter):
FILE_TEMPLATE = 'rec-{timestamp}-{hostname}.warc.gz'
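# '{timestamp}' and '{hostname}' (and optionally '{random}') are substituted
# by get_new_filename() below via res_template()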
def __init__(self, dir_template, filename_template=None, max_size=0,
max_idle_secs=1800, *args, **kwargs):
super(MultiFileWARCWriter, self).__init__(*args, **kwargs)
if not filename_template:
dir_template, filename_template = os.path.split(dir_template)
dir_template += os.path.sep
if not filename_template:
filename_template = self.FILE_TEMPLATE
self.dir_template = dir_template
self.key_template = kwargs.get('key_template', self.dir_template)
self.dedup_index = kwargs.get('dedup_index')
self.filename_template = filename_template
self.max_size = max_size
if max_idle_secs > 0:
self.max_idle_time = datetime.timedelta(seconds=max_idle_secs)
else:
self.max_idle_time = None
self.fh_cache = {}
def write_req_resp(self, req, resp, params):
url = resp.rec_headers.get_header('WARC-Target-URI')
dt = resp.rec_headers.get_header('WARC-Date')
#req.rec_headers['Content-Type'] = req.content_type
req.rec_headers.replace_header('WARC-Target-URI', url)
req.rec_headers.replace_header('WARC-Date', dt)
resp_id = resp.rec_headers.get_header('WARC-Record-ID')
if resp_id:
req.rec_headers.add_header('WARC-Concurrent-To', resp_id)
resp = self._check_revisit(resp, params)
if not resp:
print('Skipping due to dedup')
return
self._do_write_req_resp(req, resp, params)
def _check_revisit(self, record, params):
if not self.dedup_index:
return record
try:
url = record.rec_headers.get_header('WARC-Target-URI')
digest = record.rec_headers.get_header('WARC-Payload-Digest')
iso_dt = record.rec_headers.get_header('WARC-Date')
result = self.dedup_index.lookup_revisit(params, digest, url, iso_dt)
except Exception as e:
traceback.print_exc()
result = None
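# lookup_revisit() is expected to return either 'skip' (drop the record
# entirely) or a ('revisit', refers-to-uri, refers-to-date) tuple (rewrite
# the response as a WARC revisit record), as handled below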
if result == 'skip':
return None
if isinstance(result, tuple) and result[0] == 'revisit':
record.rec_headers.replace_header('WARC-Type', 'revisit')
record.rec_headers.add_header('WARC-Profile', self.REVISIT_PROFILE)
record.rec_headers.add_header('WARC-Refers-To-Target-URI', result[1])
record.rec_headers.add_header('WARC-Refers-To-Date', result[2])
return record
def get_new_filename(self, dir_, params):
timestamp = timestamp20_now()
randstr = base64.b32encode(os.urandom(5)).decode('utf-8')
filename = dir_ + res_template(self.filename_template, params,
hostname=self.hostname,
timestamp=timestamp,
random=randstr)
return filename
def allow_new_file(self, filename, params):
return True
def _open_file(self, filename, params):
path, name = os.path.split(filename)
try:
os.makedirs(path)
except:
pass
fh = open(filename, 'a+b')
if self.dedup_index:
self.dedup_index.add_warc_file(filename, params)
return fh
def _close_file(self, fh):
try:
portalocker.lock(fh, portalocker.LOCK_UN)
fh.close()
except Exception as e:
print(e)
def get_dir_key(self, params):
return res_template(self.key_template, params)
def close_key(self, dir_key):
if isinstance(dir_key, dict):
dir_key = self.get_dir_key(dir_key)
result = self.fh_cache.pop(dir_key, None)
if not result:
return
out, filename = result
self._close_file(out)
return filename
def close_file(self, match_filename):
for dir_key, out, filename in self.iter_open_files():
if filename == match_filename:
return self.close_key(dir_key)
def _is_write_resp(self, resp, params):
return True
def _is_write_req(self, req, params):
return True
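# _is_write_req()/_is_write_resp() are override hooks: subclasses can return
# False to skip writing individual records (e.g. to apply skip filters)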
def write_record(self, record, params=None):
params = params or {}
self._do_write_req_resp(None, record, params)
def _do_write_req_resp(self, req, resp, params):
def write_callback(out, filename):
#url = resp.rec_headers.get_header('WARC-Target-URI')
#print('Writing req/resp {0} to {1} '.format(url, filename))
if resp and self._is_write_resp(resp, params):
self._write_warc_record(out, resp)
if req and self._is_write_req(req, params):
self._write_warc_record(out, req)
return self._write_to_file(params, write_callback)
def write_stream_to_file(self, params, stream):
def write_callback(out, filename):
#print('Writing stream to {0}'.format(filename))
shutil.copyfileobj(stream, out)
return self._write_to_file(params, write_callback)
def _write_to_file(self, params, write_callback):
full_dir = res_template(self.dir_template, params)
dir_key = self.get_dir_key(params)
result = self.fh_cache.get(dir_key)
close_file = False
if result:
out, filename = result
is_new = False
else:
filename = self.get_new_filename(full_dir, params)
if not self.allow_new_file(filename, params):
return False
out = self._open_file(filename, params)
is_new = True
try:
start = out.tell()
write_callback(out, filename)
out.flush()
new_size = out.tell()
out.seek(start)
if self.dedup_index:
self.dedup_index.add_urls_to_index(out, params,
filename,
new_size - start)
return True
except Exception as e:
traceback.print_exc()
close_file = True
return False
finally:
# check for rollover
if self.max_size and new_size > self.max_size:
close_file = True
if close_file:
self._close_file(out)
if not is_new:
self.fh_cache.pop(dir_key, None)
elif is_new:
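# take an exclusive, non-blocking lock on the newly opened file;
# portalocker raises immediately if another process already holds it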
portalocker.lock(out, portalocker.LOCK_EX | portalocker.LOCK_NB)
self.fh_cache[dir_key] = (out, filename)
def iter_open_files(self):
for n, v in list(self.fh_cache.items()):
out, filename = v
yield n, out, filename
def close(self):
for dir_key, out, filename in self.iter_open_files():
self._close_file(out)
self.fh_cache = {}
def close_idle_files(self):
if not self.max_idle_time:
return
now = datetime.datetime.now()
for dir_key, out, filename in self.iter_open_files():
try:
mtime = os.path.getmtime(filename)
except:
self.close_key(dir_key)
return
mtime = datetime.datetime.fromtimestamp(mtime)
if (now - mtime) > self.max_idle_time:
print('Closing idle ' + filename)
self.close_key(dir_key)
# ============================================================================
class PerRecordWARCWriter(MultiFileWARCWriter):
def __init__(self, *args, **kwargs):
kwargs['max_size'] = 1
super(PerRecordWARCWriter, self).__init__(*args, **kwargs)
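
A quick usage sketch of the writer above (the './warcs/' directory and the warcinfo dict are illustrative, not part of the commit):

# Minimal sketch: write a warcinfo record through the multi-file writer.
writer = MultiFileWARCWriter('./warcs/')  # no filename_template: falls back to FILE_TEMPLATE
record = writer.create_warcinfo_record('example.warc.gz', {'software': 'pywb'})
writer.write_record(record)  # routes through _write_to_file(), locking one WARC per dir key
writer.close()               # releases locks and closes all cached file handles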

View File

@ -3,7 +3,7 @@ from gevent import monkey; monkey.patch_all()
from pywb.recorder.recorderapp import RecorderApp
from pywb.recorder.redisindexer import WritableRedisIndexer
-from pywb.recorder.warcwriter import MultiFileWARCWriter
+from pywb.recorder.multifilewarcwriter import MultiFileWARCWriter
from pywb.recorder.filters import SkipDupePolicy
import atexit

View File

@ -13,7 +13,7 @@ from fakeredis import FakeStrictRedis
from pywb.recorder.recorderapp import RecorderApp
from pywb.recorder.redisindexer import WritableRedisIndexer
-from pywb.recorder.warcwriter import PerRecordWARCWriter, MultiFileWARCWriter, SimpleTempWARCWriter
+from pywb.recorder.multifilewarcwriter import PerRecordWARCWriter, MultiFileWARCWriter
from pywb.recorder.filters import ExcludeSpecificHeaders
from pywb.recorder.filters import SkipDupePolicy, WriteDupePolicy, WriteRevisitDupePolicy

pywb/utils/timeutils.py

@ -7,7 +7,7 @@ import re
import time
import datetime
import calendar
-from six.moves import map
from email.utils import parsedate, formatdate
#=================================================================
@ -37,7 +37,7 @@ def iso_date_to_datetime(string):
if nums[-1] == '':
nums = nums[:-1]
-the_datetime = datetime.datetime(*map(int, nums))
+the_datetime = datetime.datetime(*(int(num) for num in nums))
return the_datetime
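# behavior is unchanged by the genexpr rewrite, e.g.:
# iso_date_to_datetime('2013-12-26T10:11:12Z')
#   -> datetime.datetime(2013, 12, 26, 10, 11, 12)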

pywb/warc/archiveindexer.py Normal file

@ -0,0 +1,342 @@
from pywb.utils.timeutils import iso_date_to_timestamp
from pywb.utils.canonicalize import canonicalize
from pywb.utils.loaders import extract_post_query, append_post_query
from pywb.warc.archiveiterator import ArchiveIterator
import hashlib
import base64
import six
import re
import sys
try: # pragma: no cover
from collections import OrderedDict
except ImportError: # pragma: no cover
from ordereddict import OrderedDict
#=================================================================
class ArchiveIndexEntryMixin(object):
MIME_RE = re.compile('[; ]')
def __init__(self):
super(ArchiveIndexEntryMixin, self).__init__()
self.reset_entry()
def reset_entry(self):
self['urlkey'] = ''
self['metadata'] = ''
self.buffer = None
self.record = None
def extract_mime(self, mime, def_mime='unk'):
""" Utility function to extract mimetype only
from a full content type, removing charset settings
"""
self['mime'] = def_mime
if mime:
self['mime'] = self.MIME_RE.split(mime, 1)[0]
self['_content_type'] = mime
def extract_status(self, status_headers):
""" Extract status code only from status line
"""
self['status'] = status_headers.get_statuscode()
if not self['status']:
self['status'] = '-'
elif self['status'] == '204' and 'Error' in status_headers.statusline:
self['status'] = '-'
def set_rec_info(self, offset, length):
self['length'] = str(length)
self['offset'] = str(offset)
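# merge_request_data() folds a paired request record's data (POST/PUT body
# query, referer) into this response entry; join_request_records() below
# performs the actual request/response pairing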
def merge_request_data(self, other, options):
surt_ordered = options.get('surt_ordered', True)
if other.record.rec_type != 'request':
return False
# two requests, not correct
if self.record.rec_type == 'request':
return False
# merge POST/PUT body query
post_query = other.get('_post_query')
if post_query:
url = append_post_query(self['url'], post_query)
self['urlkey'] = canonicalize(url, surt_ordered)
other['urlkey'] = self['urlkey']
referer = other.record.status_headers.get_header('referer')
if referer:
self['_referer'] = referer
return True
#=================================================================
class DefaultRecordParser(object):
def __init__(self, **options):
self.options = options
self.entry_cache = {}
self.digester = None
self.buff = None
def _create_index_entry(self, rec_type):
try:
entry = self.entry_cache[rec_type]
entry.reset_entry()
except:
if self.options.get('cdxj'):
entry = OrderedArchiveIndexEntry()
else:
entry = ArchiveIndexEntry()
# don't reuse when using append post
# entry may be cached
if not self.options.get('append_post'):
self.entry_cache[rec_type] = entry
return entry
def begin_payload(self, compute_digest, entry):
if compute_digest:
self.digester = hashlib.sha1()
else:
self.digester = None
self.entry = entry
entry.buffer = self.create_payload_buffer(entry)
def handle_payload(self, buff):
if self.digester:
self.digester.update(buff)
if self.entry and self.entry.buffer:
self.entry.buffer.write(buff)
def end_payload(self, entry):
if self.digester:
entry['digest'] = base64.b32encode(self.digester.digest()).decode('ascii')
self.entry = None
def create_payload_buffer(self, entry):
return None
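# returning None means payloads are digested but never buffered; subclasses
# can return a writable buffer here to capture payload bytes as they stream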
def create_record_iter(self, raw_iter):
append_post = self.options.get('append_post')
include_all = self.options.get('include_all')
surt_ordered = self.options.get('surt_ordered', True)
minimal = self.options.get('minimal')
if append_post and minimal:
raise Exception('Sorry, minimal index option and ' +
'append POST options can not be used together')
for record in raw_iter:
entry = None
if not include_all and not minimal and (record.status_headers.get_statuscode() == '-'):
continue
if record.rec_type == 'arc_header':
continue
if record.format == 'warc':
if (record.rec_type in ('request', 'warcinfo') and
not include_all and
not append_post):
continue
elif (not include_all and
record.content_type == 'application/warc-fields'):
continue
entry = self.parse_warc_record(record)
elif record.format == 'arc':
entry = self.parse_arc_record(record)
if not entry:
continue
if entry.get('url') and not entry.get('urlkey'):
entry['urlkey'] = canonicalize(entry['url'], surt_ordered)
compute_digest = False
if (entry.get('digest', '-') == '-' and
record.rec_type not in ('revisit', 'request', 'warcinfo')):
compute_digest = True
elif not minimal and record.rec_type == 'request' and append_post:
method = record.status_headers.protocol
len_ = record.status_headers.get_header('Content-Length')
post_query = extract_post_query(method,
entry.get('_content_type'),
len_,
record.stream)
entry['_post_query'] = post_query
entry.record = record
self.begin_payload(compute_digest, entry)
raw_iter.read_to_end(record, self.handle_payload)
entry.set_rec_info(*raw_iter.member_info)
self.end_payload(entry)
yield entry
def join_request_records(self, entry_iter):
prev_entry = None
for entry in entry_iter:
if not prev_entry:
prev_entry = entry
continue
# check for url match
if (entry['url'] != prev_entry['url']):
pass
# check for concurrency also
elif (entry.record.rec_headers.get_header('WARC-Concurrent-To') !=
prev_entry.record.rec_headers.get_header('WARC-Record-ID')):
pass
elif (entry.merge_request_data(prev_entry, self.options) or
prev_entry.merge_request_data(entry, self.options)):
yield prev_entry
yield entry
prev_entry = None
continue
yield prev_entry
prev_entry = entry
if prev_entry:
yield prev_entry
#=================================================================
def parse_warc_record(self, record):
""" Parse warc record
"""
entry = self._create_index_entry(record.rec_type)
if record.rec_type == 'warcinfo':
entry['url'] = record.rec_headers.get_header('WARC-Filename')
entry['urlkey'] = entry['url']
entry['_warcinfo'] = record.stream.read(record.length)
return entry
entry['url'] = record.rec_headers.get_header('WARC-Target-Uri')
# timestamp
entry['timestamp'] = iso_date_to_timestamp(record.rec_headers.
get_header('WARC-Date'))
# mime
if record.rec_type == 'revisit':
entry['mime'] = 'warc/revisit'
elif self.options.get('minimal'):
entry['mime'] = '-'
else:
def_mime = '-' if record.rec_type == 'request' else 'unk'
entry.extract_mime(record.status_headers.
get_header('Content-Type'),
def_mime)
# status -- only for response records (by convention):
if record.rec_type == 'response' and not self.options.get('minimal'):
entry.extract_status(record.status_headers)
else:
entry['status'] = '-'
# digest
digest = record.rec_headers.get_header('WARC-Payload-Digest')
entry['digest'] = digest
if digest and digest.startswith('sha1:'):
entry['digest'] = digest[len('sha1:'):]
elif not entry.get('digest'):
entry['digest'] = '-'
# optional json metadata, if present
metadata = record.rec_headers.get_header('WARC-Json-Metadata')
if metadata:
entry['metadata'] = metadata
return entry
#=================================================================
def parse_arc_record(self, record):
""" Parse arc record
"""
url = record.rec_headers.get_header('uri')
url = url.replace('\r', '%0D')
url = url.replace('\n', '%0A')
# replace formfeed
url = url.replace('\x0c', '%0C')
# replace nulls
url = url.replace('\x00', '%00')
entry = self._create_index_entry(record.rec_type)
entry['url'] = url
# timestamp
entry['timestamp'] = record.rec_headers.get_header('archive-date')
if len(entry['timestamp']) > 14:
entry['timestamp'] = entry['timestamp'][:14]
if not self.options.get('minimal'):
# mime
entry.extract_mime(record.rec_headers.get_header('content-type'))
# status
entry.extract_status(record.status_headers)
# digest
entry['digest'] = '-'
return entry
def __call__(self, fh):
aiter = ArchiveIterator(fh, self.options.get('minimal', False),
self.options.get('verify_http', False),
self.options.get('arc2warc', False))
entry_iter = self.create_record_iter(aiter)
if self.options.get('append_post'):
entry_iter = self.join_request_records(entry_iter)
for entry in entry_iter:
if (entry.record.rec_type in ('request', 'warcinfo') and
not self.options.get('include_all')):
continue
yield entry
def open(self, filename):
with open(filename, 'rb') as fh:
for entry in self(fh):
yield entry
class ArchiveIndexEntry(ArchiveIndexEntryMixin, dict):
pass
class OrderedArchiveIndexEntry(ArchiveIndexEntryMixin, OrderedDict):
pass
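
A minimal sketch of driving the parser above over a local WARC (the filename and option values are illustrative):

# Index a WARC file into CDX-style entries using the relocated parser.
parser = DefaultRecordParser(include_all=False, surt_ordered=True)
for entry in parser.open('example.warc.gz'):
    # each entry is an ArchiveIndexEntry (a dict) with the fields set above
    print(entry['urlkey'], entry['timestamp'], entry['url'], entry['status'])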

pywb/warc/archiveiterator.py

@ -1,22 +1,10 @@
-from pywb.utils.timeutils import iso_date_to_timestamp
from pywb.utils.bufferedreaders import DecompressingBufferedReader
-from pywb.utils.canonicalize import canonicalize
-from pywb.utils.loaders import extract_post_query, append_post_query
from pywb.warc.recordloader import ArcWarcRecordLoader
-import hashlib
-import base64
import six
-import re
-import sys
-try: # pragma: no cover
-from collections import OrderedDict
-except ImportError: # pragma: no cover
-from ordereddict import OrderedDict
# ============================================================================
BUFF_SIZE = 16384
@ -243,326 +231,3 @@ class ArchiveIterator(six.Iterator):
return record
#=================================================================
class ArchiveIndexEntryMixin(object):
MIME_RE = re.compile('[; ]')
def __init__(self):
super(ArchiveIndexEntryMixin, self).__init__()
self.reset_entry()
def reset_entry(self):
self['urlkey'] = ''
self['metadata'] = ''
self.buffer = None
self.record = None
def extract_mime(self, mime, def_mime='unk'):
""" Utility function to extract mimetype only
from a full content type, removing charset settings
"""
self['mime'] = def_mime
if mime:
self['mime'] = self.MIME_RE.split(mime, 1)[0]
self['_content_type'] = mime
def extract_status(self, status_headers):
""" Extract status code only from status line
"""
self['status'] = status_headers.get_statuscode()
if not self['status']:
self['status'] = '-'
elif self['status'] == '204' and 'Error' in status_headers.statusline:
self['status'] = '-'
def set_rec_info(self, offset, length):
self['length'] = str(length)
self['offset'] = str(offset)
def merge_request_data(self, other, options):
surt_ordered = options.get('surt_ordered', True)
if other.record.rec_type != 'request':
return False
# two requests, not correct
if self.record.rec_type == 'request':
return False
# merge POST/PUT body query
post_query = other.get('_post_query')
if post_query:
url = append_post_query(self['url'], post_query)
self['urlkey'] = canonicalize(url, surt_ordered)
other['urlkey'] = self['urlkey']
referer = other.record.status_headers.get_header('referer')
if referer:
self['_referer'] = referer
return True
#=================================================================
class DefaultRecordParser(object):
def __init__(self, **options):
self.options = options
self.entry_cache = {}
self.digester = None
self.buff = None
def _create_index_entry(self, rec_type):
try:
entry = self.entry_cache[rec_type]
entry.reset_entry()
except:
if self.options.get('cdxj'):
entry = OrderedArchiveIndexEntry()
else:
entry = ArchiveIndexEntry()
# don't reuse when using append post
# entry may be cached
if not self.options.get('append_post'):
self.entry_cache[rec_type] = entry
return entry
def begin_payload(self, compute_digest, entry):
if compute_digest:
self.digester = hashlib.sha1()
else:
self.digester = None
self.entry = entry
entry.buffer = self.create_payload_buffer(entry)
def handle_payload(self, buff):
if self.digester:
self.digester.update(buff)
if self.entry and self.entry.buffer:
self.entry.buffer.write(buff)
def end_payload(self, entry):
if self.digester:
entry['digest'] = base64.b32encode(self.digester.digest()).decode('ascii')
self.entry = None
def create_payload_buffer(self, entry):
return None
def create_record_iter(self, raw_iter):
append_post = self.options.get('append_post')
include_all = self.options.get('include_all')
surt_ordered = self.options.get('surt_ordered', True)
minimal = self.options.get('minimal')
if append_post and minimal:
raise Exception('Sorry, minimal index option and ' +
'append POST options can not be used together')
for record in raw_iter:
entry = None
if not include_all and not minimal and (record.status_headers.get_statuscode() == '-'):
continue
if record.rec_type == 'arc_header':
continue
if record.format == 'warc':
if (record.rec_type in ('request', 'warcinfo') and
not include_all and
not append_post):
continue
elif (not include_all and
record.content_type == 'application/warc-fields'):
continue
entry = self.parse_warc_record(record)
elif record.format == 'arc':
entry = self.parse_arc_record(record)
if not entry:
continue
if entry.get('url') and not entry.get('urlkey'):
entry['urlkey'] = canonicalize(entry['url'], surt_ordered)
compute_digest = False
if (entry.get('digest', '-') == '-' and
record.rec_type not in ('revisit', 'request', 'warcinfo')):
compute_digest = True
elif not minimal and record.rec_type == 'request' and append_post:
method = record.status_headers.protocol
len_ = record.status_headers.get_header('Content-Length')
post_query = extract_post_query(method,
entry.get('_content_type'),
len_,
record.stream)
entry['_post_query'] = post_query
entry.record = record
self.begin_payload(compute_digest, entry)
raw_iter.read_to_end(record, self.handle_payload)
entry.set_rec_info(*raw_iter.member_info)
self.end_payload(entry)
yield entry
def join_request_records(self, entry_iter):
prev_entry = None
for entry in entry_iter:
if not prev_entry:
prev_entry = entry
continue
# check for url match
if (entry['url'] != prev_entry['url']):
pass
# check for concurrency also
elif (entry.record.rec_headers.get_header('WARC-Concurrent-To') !=
prev_entry.record.rec_headers.get_header('WARC-Record-ID')):
pass
elif (entry.merge_request_data(prev_entry, self.options) or
prev_entry.merge_request_data(entry, self.options)):
yield prev_entry
yield entry
prev_entry = None
continue
yield prev_entry
prev_entry = entry
if prev_entry:
yield prev_entry
#=================================================================
def parse_warc_record(self, record):
""" Parse warc record
"""
entry = self._create_index_entry(record.rec_type)
if record.rec_type == 'warcinfo':
entry['url'] = record.rec_headers.get_header('WARC-Filename')
entry['urlkey'] = entry['url']
entry['_warcinfo'] = record.stream.read(record.length)
return entry
entry['url'] = record.rec_headers.get_header('WARC-Target-Uri')
# timestamp
entry['timestamp'] = iso_date_to_timestamp(record.rec_headers.
get_header('WARC-Date'))
# mime
if record.rec_type == 'revisit':
entry['mime'] = 'warc/revisit'
elif self.options.get('minimal'):
entry['mime'] = '-'
else:
def_mime = '-' if record.rec_type == 'request' else 'unk'
entry.extract_mime(record.status_headers.
get_header('Content-Type'),
def_mime)
# status -- only for response records (by convention):
if record.rec_type == 'response' and not self.options.get('minimal'):
entry.extract_status(record.status_headers)
else:
entry['status'] = '-'
# digest
digest = record.rec_headers.get_header('WARC-Payload-Digest')
entry['digest'] = digest
if digest and digest.startswith('sha1:'):
entry['digest'] = digest[len('sha1:'):]
elif not entry.get('digest'):
entry['digest'] = '-'
# optional json metadata, if present
metadata = record.rec_headers.get_header('WARC-Json-Metadata')
if metadata:
entry['metadata'] = metadata
return entry
#=================================================================
def parse_arc_record(self, record):
""" Parse arc record
"""
url = record.rec_headers.get_header('uri')
url = url.replace('\r', '%0D')
url = url.replace('\n', '%0A')
# replace formfeed
url = url.replace('\x0c', '%0C')
# replace nulls
url = url.replace('\x00', '%00')
entry = self._create_index_entry(record.rec_type)
entry['url'] = url
# timestamp
entry['timestamp'] = record.rec_headers.get_header('archive-date')
if len(entry['timestamp']) > 14:
entry['timestamp'] = entry['timestamp'][:14]
if not self.options.get('minimal'):
# mime
entry.extract_mime(record.rec_headers.get_header('content-type'))
# status
entry.extract_status(record.status_headers)
# digest
entry['digest'] = '-'
return entry
def __call__(self, fh):
aiter = ArchiveIterator(fh, self.options.get('minimal', False),
self.options.get('verify_http', False),
self.options.get('arc2warc', False))
entry_iter = self.create_record_iter(aiter)
if self.options.get('append_post'):
entry_iter = self.join_request_records(entry_iter)
for entry in entry_iter:
if (entry.record.rec_type in ('request', 'warcinfo') and
not self.options.get('include_all')):
continue
yield entry
def open(self, filename):
with open(filename, 'rb') as fh:
for entry in self(fh):
yield entry
class ArchiveIndexEntry(ArchiveIndexEntryMixin, dict):
pass
class OrderedArchiveIndexEntry(ArchiveIndexEntryMixin, OrderedDict):
pass

pywb/warc/cdxindexer.py

@ -31,7 +31,7 @@ from bisect import insort
from six import StringIO
-from pywb.warc.archiveiterator import DefaultRecordParser
+from pywb.warc.archiveindexer import DefaultRecordParser
import codecs
import six

View File

@ -1,5 +1,5 @@
from pywb.utils.statusandheaders import StatusAndHeaders
-from pywb.recorder.warcwriter import SimpleTempWARCWriter
+from pywb.warc.warcwriter import BufferWARCWriter
from pywb.warc.recordloader import ArcWarcRecordLoader
from pywb.warc.archiveiterator import ArchiveIterator
@ -9,7 +9,7 @@ import json
# ============================================================================
-class FixedTestWARCWriter(SimpleTempWARCWriter):
+class FixedTestWARCWriter(BufferWARCWriter):
@classmethod
def _make_warc_id(cls, id_=None):
return '<urn:uuid:12345678-feb0-11e6-8f83-68a86d1772ce>'
@ -36,7 +36,7 @@ class TestWarcWriter(object):
record = simplewriter.create_warcinfo_record('testfile.warc.gz', params)
simplewriter.write_record(record)
-buff = simplewriter.get_buffer()
+buff = simplewriter.get_contents()
assert isinstance(buff, bytes)
buff = BytesIO(buff)
@ -71,7 +71,7 @@ json-metadata: {"foo": "bar"}\r\n\
\r\n\
'
-assert simplewriter.get_buffer().decode('utf-8') == warcinfo_record
+assert simplewriter.get_contents().decode('utf-8') == warcinfo_record
def test_generate_response(self):
headers_list = [('Content-Type', 'text/plain; charset="UTF-8"'),
@ -93,7 +93,7 @@ json-metadata: {"foo": "bar"}\r\n\
writer.write_record(record)
-buff = writer.get_buffer()
+buff = writer.get_contents()
self._validate_record_content_len(BytesIO(buff))

pywb/recorder/warcwriter.py → pywb/warc/warcwriter.py

@ -4,31 +4,24 @@ import base64
import hashlib
import datetime
import zlib
import sys
import os
import six
import shutil
import traceback
from socket import gethostname
from io import BytesIO
import portalocker
from pywb.utils.loaders import LimitReader, to_native_str
from pywb.utils.bufferedreaders import BufferedReader
from pywb.utils.timeutils import timestamp20_now, datetime_to_iso_date
from pywb.utils.loaders import to_native_str
from pywb.utils.timeutils import datetime_to_iso_date
from pywb.utils.statusandheaders import StatusAndHeadersParser, StatusAndHeaders
from pywb.warc.recordloader import ArcWarcRecord
from pywb.warc.recordloader import ArcWarcRecordLoader
from pywb.webagg.utils import res_template, BUFF_SIZE
# ============================================================================
class BaseWARCWriter(object):
BUFF_SIZE = 16384
WARC_RECORDS = {'warcinfo': 'application/warc-fields',
'response': 'application/http; msgtype=response',
'revisit': 'application/http; msgtype=response',
@ -38,25 +31,20 @@ class BaseWARCWriter(object):
REVISIT_PROFILE = 'http://netpreserve.org/warc/1.0/revisit/uri-agnostic-identical-payload-digest'
FILE_TEMPLATE = 'rec-{timestamp}-{hostname}.warc.gz'
WARC_VERSION = 'WARC/1.0'
-def __init__(self, gzip=True, dedup_index=None,
-header_filter=None, *args, **kwargs):
+def __init__(self, gzip=True, header_filter=None, *args, **kwargs):
self.gzip = gzip
-self.dedup_index = dedup_index
self.header_filter = header_filter
self.hostname = gethostname()
self.parser = StatusAndHeadersParser([], verify=False)
self.warc_version = kwargs.get('warc_version', self.WARC_VERSION)
-@staticmethod
-def _iter_stream(stream):
+@classmethod
+def _iter_stream(cls, stream):
while True:
-buf = stream.read(BUFF_SIZE)
+buf = stream.read(cls.BUFF_SIZE)
if not buf:
return
@ -94,25 +82,6 @@ class BaseWARCWriter(object):
buff = record.status_headers.to_bytes(exclude_list)
record.status_headers.headers_buff = buff
def write_req_resp(self, req, resp, params):
url = resp.rec_headers.get_header('WARC-Target-URI')
dt = resp.rec_headers.get_header('WARC-Date')
#req.rec_headers['Content-Type'] = req.content_type
req.rec_headers.replace_header('WARC-Target-URI', url)
req.rec_headers.replace_header('WARC-Date', dt)
resp_id = resp.rec_headers.get_header('WARC-Record-ID')
if resp_id:
req.rec_headers.add_header('WARC-Concurrent-To', resp_id)
resp = self._check_revisit(resp, params)
if not resp:
print('Skipping due to dedup')
return
self._do_write_req_resp(req, resp, params)
def create_warcinfo_record(self, filename, info):
warc_headers = StatusAndHeaders(self.warc_version, [])
warc_headers.add_header('WARC-Type', 'warcinfo')
@ -182,31 +151,6 @@ class BaseWARCWriter(object):
return record
def _check_revisit(self, record, params):
if not self.dedup_index:
return record
try:
url = record.rec_headers.get_header('WARC-Target-URI')
digest = record.rec_headers.get_header('WARC-Payload-Digest')
iso_dt = record.rec_headers.get_header('WARC-Date')
result = self.dedup_index.lookup_revisit(params, digest, url, iso_dt)
except Exception as e:
traceback.print_exc()
result = None
if result == 'skip':
return None
if isinstance(result, tuple) and result[0] == 'revisit':
record.rec_headers.replace_header('WARC-Type', 'revisit')
record.rec_headers.add_header('WARC-Profile', self.REVISIT_PROFILE)
record.rec_headers.add_header('WARC-Refers-To-Target-URI', result[1])
record.rec_headers.add_header('WARC-Refers-To-Date', result[2])
return record
def _write_warc_record(self, out, record, adjust_cl=True):
if self.gzip:
out = GzippingWrapper(out)
@ -321,231 +265,40 @@ class Digester(object):
# ============================================================================
class MultiFileWARCWriter(BaseWARCWriter):
def __init__(self, dir_template, filename_template=None, max_size=0,
max_idle_secs=1800, *args, **kwargs):
super(MultiFileWARCWriter, self).__init__(*args, **kwargs)
if not filename_template:
dir_template, filename_template = os.path.split(dir_template)
dir_template += os.path.sep
if not filename_template:
filename_template = self.FILE_TEMPLATE
self.dir_template = dir_template
self.key_template = kwargs.get('key_template', self.dir_template)
self.filename_template = filename_template
self.max_size = max_size
if max_idle_secs > 0:
self.max_idle_time = datetime.timedelta(seconds=max_idle_secs)
else:
self.max_idle_time = None
self.fh_cache = {}
def get_new_filename(self, dir_, params):
timestamp = timestamp20_now()
randstr = base64.b32encode(os.urandom(5)).decode('utf-8')
filename = dir_ + res_template(self.filename_template, params,
hostname=self.hostname,
timestamp=timestamp,
random=randstr)
return filename
def allow_new_file(self, filename, params):
return True
def _open_file(self, filename, params):
path, name = os.path.split(filename)
try:
os.makedirs(path)
except:
pass
fh = open(filename, 'a+b')
if self.dedup_index:
self.dedup_index.add_warc_file(filename, params)
return fh
def _close_file(self, fh):
try:
portalocker.lock(fh, portalocker.LOCK_UN)
fh.close()
except Exception as e:
print(e)
def get_dir_key(self, params):
return res_template(self.key_template, params)
def close_key(self, dir_key):
if isinstance(dir_key, dict):
dir_key = self.get_dir_key(dir_key)
result = self.fh_cache.pop(dir_key, None)
if not result:
return
out, filename = result
self._close_file(out)
return filename
def close_file(self, match_filename):
for dir_key, out, filename in self.iter_open_files():
if filename == match_filename:
return self.close_key(dir_key)
def _is_write_resp(self, resp, params):
return True
def _is_write_req(self, req, params):
return True
def write_record(self, record, params=None):
params = params or {}
self._do_write_req_resp(None, record, params)
def _do_write_req_resp(self, req, resp, params):
def write_callback(out, filename):
#url = resp.rec_headers.get_header('WARC-Target-URI')
#print('Writing req/resp {0} to {1} '.format(url, filename))
if resp and self._is_write_resp(resp, params):
self._write_warc_record(out, resp)
if req and self._is_write_req(req, params):
self._write_warc_record(out, req)
return self._write_to_file(params, write_callback)
def write_stream_to_file(self, params, stream):
def write_callback(out, filename):
#print('Writing stream to {0}'.format(filename))
shutil.copyfileobj(stream, out)
return self._write_to_file(params, write_callback)
def _write_to_file(self, params, write_callback):
full_dir = res_template(self.dir_template, params)
dir_key = self.get_dir_key(params)
result = self.fh_cache.get(dir_key)
close_file = False
if result:
out, filename = result
is_new = False
else:
filename = self.get_new_filename(full_dir, params)
if not self.allow_new_file(filename, params):
return False
out = self._open_file(filename, params)
is_new = True
try:
start = out.tell()
write_callback(out, filename)
out.flush()
new_size = out.tell()
out.seek(start)
if self.dedup_index:
self.dedup_index.add_urls_to_index(out, params,
filename,
new_size - start)
return True
except Exception as e:
traceback.print_exc()
close_file = True
return False
finally:
# check for rollover
if self.max_size and new_size > self.max_size:
close_file = True
if close_file:
self._close_file(out)
if not is_new:
self.fh_cache.pop(dir_key, None)
elif is_new:
portalocker.lock(out, portalocker.LOCK_EX | portalocker.LOCK_NB)
self.fh_cache[dir_key] = (out, filename)
def iter_open_files(self):
for n, v in list(self.fh_cache.items()):
out, filename = v
yield n, out, filename
def close(self):
for dir_key, out, filename in self.iter_open_files():
self._close_file(out)
self.fh_cache = {}
def close_idle_files(self):
if not self.max_idle_time:
return
now = datetime.datetime.now()
for dir_key, out, filename in self.iter_open_files():
try:
mtime = os.path.getmtime(filename)
except:
self.close_key(dir_key)
return
mtime = datetime.datetime.fromtimestamp(mtime)
if (now - mtime) > self.max_idle_time:
print('Closing idle ' + filename)
self.close_key(dir_key)
# ============================================================================
-class PerRecordWARCWriter(MultiFileWARCWriter):
-def __init__(self, *args, **kwargs):
-kwargs['max_size'] = 1
-super(PerRecordWARCWriter, self).__init__(*args, **kwargs)
-# ============================================================================
-class SimpleTempWARCWriter(BaseWARCWriter):
-def __init__(self, *args, **kwargs):
-super(SimpleTempWARCWriter, self).__init__(*args, **kwargs)
+class BufferWARCWriter(BaseWARCWriter):
+def __init__(self, *args, **kwargs):
+super(BufferWARCWriter, self).__init__(*args, **kwargs)
self.out = self._create_buffer()
def _create_buffer(self):
return tempfile.SpooledTemporaryFile(max_size=512*1024)
def _do_write_req_resp(self, req, resp, params):
self._write_warc_record(self.out, resp)
self._write_warc_record(self.out, req)
-def write_record(self, record, params=None):
+def write_record(self, record):
self._write_warc_record(self.out, record)
-def get_buffer(self):
+def get_contents(self):
pos = self.out.tell()
self.out.seek(0)
buff = self.out.read()
self.out.seek(pos)
return buff
# ============================================================================
class FileWARCWriter(BufferWARCWriter):
def __init__(self, *args, **kwargs):
file_or_buff = None
if len(args) > 0:
file_or_buff = args[0]
else:
file_or_buff = kwargs.get('file')
if isinstance(file_or_buff, str):
self.out = open(file_or_buff, 'a+b')
elif hasattr(file_or_buff, 'read'):
self.out = file_or_buff
else:
raise Exception('file must be a readable or valid filename')
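
And a minimal sketch of the renamed in-memory writer (the record contents are illustrative):

# BufferWARCWriter collects records in a spooled temp buffer instead of files.
writer = BufferWARCWriter(gzip=False)
record = writer.create_warcinfo_record('example.warc.gz', {'software': 'pywb'})
writer.write_record(record)       # note: the new signature drops the params argument
contents = writer.get_contents()  # all bytes written so far (get_buffer() renamed)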