diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..1276f2e --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +recursive-include tests *.py *.sh Dockerfile diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..df07c14 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,6 @@ +[aliases] +test=pytest + +[tool:pytest] +addopts=-v +testpaths=tests diff --git a/setup.py b/setup.py index 9b61523..66e8a07 100755 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ ''' setup.py - setuptools installation configuration for warcprox -Copyright (C) 2013-2016 Internet Archive +Copyright (C) 2013-2018 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -22,18 +22,6 @@ USA. import sys import setuptools -import setuptools.command.test - -class PyTest(setuptools.command.test.test): - def finalize_options(self): - setuptools.command.test.test.finalize_options(self) - self.test_args = [] - self.test_suite = True - def run_tests(self): - # import here, because outside the eggs aren't loaded - import pytest - errno = pytest.main(self.test_args) - sys.exit(errno) deps = [ 'certauth==1.1.6', @@ -52,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b1.dev144', + version='2.4b2.dev149', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', @@ -61,9 +49,8 @@ setuptools.setup( license='GPL', packages=['warcprox'], install_requires=deps, + setup_requires=['pytest-runner'], tests_require=['mock', 'pytest', 'warcio'], - cmdclass = {'test': PyTest}, - test_suite='warcprox.tests', entry_points={ 'console_scripts': [ 'warcprox=warcprox.main:main', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 886902f..da36df3 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -249,6 +249,14 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): elif self.path == 
'/empty-response': headers = b'' payload = b'' + elif self.path == '/slow-response': + time.sleep(6) + headers = (b'HTTP/1.1 200 OK\r\n' + + b'Content-Type: text/plain\r\n' + + b'\r\n') + payload = b'Test.' + actual_headers = (b'Content-Type: text/plain\r\n' + + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n') else: payload = b'404 Not Found\n' headers = (b'HTTP/1.1 404 Not Found\r\n' @@ -356,7 +364,8 @@ def warcprox_(request): '--port=0', '--playback-port=0', '--onion-tor-socks-proxy=localhost:9050', - '--crawl-log-dir=crawl-logs'] + '--crawl-log-dir=crawl-logs', + '--socket-timeout=4'] if request.config.getoption('--rethinkdb-dedup-url'): argv.append('--rethinkdb-dedup-url=%s' % request.config.getoption('--rethinkdb-dedup-url')) # test these here only @@ -758,10 +767,12 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4) # close the warc - assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"] - writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"] - warc_path = os.path.join(writer.directory, writer._f_finalname) - warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"].close_writer() + assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"] + writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"] + warc = writer._available_warcs.queue[0] + warc_path = os.path.join(warc.directory, warc.finalname) + assert not os.path.exists(warc_path) + warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"].close_writer() assert os.path.exists(warc_path) # read the warc @@ -1380,20 +1391,16 @@ def test_controller_with_defaults(): assert controller.proxy.server_port == 8000 assert controller.proxy.running_stats assert not controller.proxy.stats_db - wwt = controller.warc_writer_thread - assert wwt - 
assert wwt.inq - assert wwt.outq - assert wwt.writer_pool - assert wwt.writer_pool.default_warc_writer - assert wwt.writer_pool.default_warc_writer.directory == './warcs' - assert wwt.writer_pool.default_warc_writer.rollover_idle_time is None - assert wwt.writer_pool.default_warc_writer.rollover_size == 1000000000 - assert wwt.writer_pool.default_warc_writer.prefix == 'warcprox' - assert wwt.writer_pool.default_warc_writer.gzip is False - assert wwt.writer_pool.default_warc_writer.record_builder - assert not wwt.writer_pool.default_warc_writer.record_builder.base32 - assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' + wwp = controller.warc_writer_processor + assert wwp + assert wwp.inq + assert wwp.outq + assert wwp.writer_pool + assert wwp.writer_pool.default_warc_writer + assert wwp.writer_pool.default_warc_writer.gzip is False + assert wwp.writer_pool.default_warc_writer.record_builder + assert not wwp.writer_pool.default_warc_writer.record_builder.base32 + assert wwp.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' def test_load_plugin(): options = warcprox.Options(port=0, plugins=[ @@ -1473,7 +1480,7 @@ def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback assert response.status_code == 200 assert not 'via' in playback_response - warc = warcprox_.warc_writer_thread.writer_pool.default_warc_writer._fpath + warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path with open(warc, 'rb') as f: for record in warcio.archiveiterator.ArchiveIterator(f): if record.rec_headers.get_header('warc-target-uri') == url: @@ -1691,10 +1698,11 @@ def test_long_warcprox_meta( wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) # check that warcprox-meta was parsed and honored ("warc-prefix" param) - assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"] - writer = 
warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"] - warc_path = os.path.join(writer.directory, writer._f_finalname) - warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"].close_writer() + assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"] + writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"] + warc = writer._available_warcs.queue[0] + warc_path = os.path.join(warc.directory, warc.finalname) + warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"].close_writer() assert os.path.exists(warc_path) # read the warc @@ -1711,6 +1719,16 @@ def test_long_warcprox_meta( with pytest.raises(StopIteration): next(rec_iter) +def test_socket_timeout_response( + warcprox_, http_daemon, https_daemon, archiving_proxies, + playback_proxies): + """Response will timeout because we use --socket-timeout=4 whereas the + target URL will return after 6 sec. 
+ """ + url = 'http://localhost:%s/slow-response' % http_daemon.server_port + response = requests.get(url, proxies=archiving_proxies, verify=False) + assert response.status_code == 502 + def test_empty_response( warcprox_, http_daemon, https_daemon, archiving_proxies, playback_proxies): diff --git a/tests/test_writer.py b/tests/test_writer.py index 0a18c33..fb39378 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -61,7 +61,8 @@ def test_warc_writer_locking(tmpdir): timestamp=datetime.utcnow()) dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) - wwriter = WarcWriter(Options(directory=dirname, no_warc_open_suffix=True)) + wwriter = WarcWriter(Options( + directory=dirname, no_warc_open_suffix=True, writer_threads=1)) wwriter.write_records(recorded_url) warcs = [fn for fn in os.listdir(dirname) if fn.endswith('.warc')] assert warcs @@ -93,7 +94,8 @@ def test_special_dont_write_prefix(): logging.debug('cd %s', tmpdir) os.chdir(tmpdir) - wwt = warcprox.writerthread.WarcWriterThread(Options(prefix='-')) + wwt = warcprox.writerthread.WarcWriterProcessor( + Options(prefix='-', writer_threads=1)) wwt.inq = warcprox.TimestampedQueue(maxsize=1) wwt.outq = warcprox.TimestampedQueue(maxsize=1) try: @@ -126,7 +128,8 @@ def test_special_dont_write_prefix(): wwt.stop.set() wwt.join() - wwt = warcprox.writerthread.WarcWriterThread() + wwt = warcprox.writerthread.WarcWriterProcessor( + Options(writer_threads=1)) wwt.inq = warcprox.TimestampedQueue(maxsize=1) wwt.outq = warcprox.TimestampedQueue(maxsize=1) try: @@ -172,8 +175,11 @@ def test_warc_writer_filename(tmpdir): dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) wwriter = WarcWriter(Options(directory=dirname, prefix='foo', - warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}')) + warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}', + writer_threads=1)) wwriter.write_records(recorded_url) warcs = [fn for fn in os.listdir(dirname)] assert warcs - assert 
re.search('\d{17}_foo_\d{14}_00000.warc.open', wwriter._fpath) + assert re.search( + r'\d{17}_foo_\d{14}_00000.warc.open', + wwriter._available_warcs.queue[0].path) diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 39b31ea..33af61a 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -237,6 +237,14 @@ class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor): self.logger.error( '%s raised exception', listener.stop, exc_info=True) +def timestamp17(): + now = datetime.datetime.utcnow() + return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000) + +def timestamp14(): + now = datetime.datetime.utcnow() + return '{:%Y%m%d%H%M%S}'.format(now) + # monkey-patch log levels TRACE and NOTICE TRACE = 5 def _logger_trace(self, msg, *args, **kwargs): diff --git a/warcprox/controller.py b/warcprox/controller.py index 85fa42d..644fdec 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -57,7 +57,6 @@ class Factory: @staticmethod def stats_processor(options): - # return warcprox.stats.StatsProcessor(options) if options.rethinkdb_stats_url: stats_processor = warcprox.stats.RethinkStatsProcessor(options) elif options.stats_db_file in (None, '', '/dev/null'): @@ -68,8 +67,8 @@ class Factory: return stats_processor @staticmethod - def warc_writer(options): - return warcprox.writerthread.WarcWriterThread(options) + def warc_writer_processor(options): + return warcprox.writerthread.WarcWriterProcessor(options) @staticmethod def playback_proxy(ca, options): @@ -142,6 +141,12 @@ class WarcproxController(object): self.playback_proxy = Factory.playback_proxy( self.proxy.ca, self.options) + # default number of warc writer threads = sqrt(proxy.max_threads) + # pulled out of thin air because it strikes me as reasonable + # 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45 + if not self.options.writer_threads: + self.options.writer_threads = int(self.proxy.max_threads ** 0.5) + 
self.build_postfetch_chain(self.proxy.recorded_url_q) self.service_registry = Factory.service_registry(options) @@ -181,8 +186,8 @@ class WarcproxController(object): if self.dedup_db: self._postfetch_chain.append(self.dedup_db.loader()) - self.warc_writer_thread = Factory.warc_writer(self.options) - self._postfetch_chain.append(self.warc_writer_thread) + self.warc_writer_processor = Factory.warc_writer_processor(self.options) + self._postfetch_chain.append(self.warc_writer_processor) if self.dedup_db: self._postfetch_chain.append(self.dedup_db.storer()) @@ -207,6 +212,8 @@ class WarcproxController(object): self._postfetch_chain.append( warcprox.ListenerPostfetchProcessor( plugin, self.options)) + elif hasattr(plugin, 'CHAIN_POSITION') and plugin.CHAIN_POSITION == 'early': + self._postfetch_chain.insert(0, plugin) # or insert early but later than 0? else: self._postfetch_chain.append(plugin) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 8b27874..8d63f96 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -206,9 +206,14 @@ class CdxServerDedup(DedupDb): def __init__(self, cdx_url="https://web.archive.org/cdx/search", maxsize=200, options=warcprox.Options()): + """Initialize cdx server connection pool and related parameters. + Use low timeout value and no retries to avoid blocking warcprox + operation by a slow CDX server. 
+ """ self.cdx_url = cdx_url self.options = options - self.http_pool = urllib3.PoolManager(maxsize=maxsize) + self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0, + timeout=2.0) if options.cdxserver_dedup_cookies: self.cookies = options.cdxserver_dedup_cookies @@ -271,7 +276,7 @@ class CdxServerDedup(DedupDb): class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor): def __init__(self, cdx_dedup, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) - self.pool = futures.ThreadPoolExecutor(max_workers=50) + self.pool = futures.ThreadPoolExecutor(max_workers=200) self.batch = set() self.cdx_dedup = cdx_dedup diff --git a/warcprox/main.py b/warcprox/main.py index f663d0d..8d16d3b 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -162,6 +162,10 @@ def _build_arg_parser(prog='warcprox'): default=None, help=( 'host:port of tor socks proxy, used only to connect to ' '.onion sites')) + # Configurable connection socket timeout, default is 60 sec. + arg_parser.add_argument( + '--socket-timeout', dest='socket_timeout', type=float, + default=None, help=argparse.SUPPRESS) arg_parser.add_argument( '--crawl-log-dir', dest='crawl_log_dir', default=None, help=( 'if specified, write crawl log files in the specified ' diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 7792c5c..95d5b31 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -205,12 +205,13 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): and records the bytes in transit as it proxies them. 
''' logger = logging.getLogger("warcprox.mitmproxy.MitmProxyHandler") + _socket_timeout = 60 def __init__(self, request, client_address, server): threading.current_thread().name = 'MitmProxyHandler(tid={},started={},client={}:{})'.format(warcprox.gettid(), datetime.datetime.utcnow().isoformat(), client_address[0], client_address[1]) self.is_connect = False self._headers_buffer = [] - request.settimeout(60) # XXX what value should this have? + request.settimeout(self._socket_timeout) http_server.BaseHTTPRequestHandler.__init__(self, request, client_address, server) def _determine_host_port(self): @@ -247,8 +248,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self._remote_server_sock = socket.socket() self._remote_server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - # XXX what value should this timeout have? - self._remote_server_sock.settimeout(60) + self._remote_server_sock.settimeout(self._socket_timeout) self._remote_server_sock.connect((self.hostname, int(self.port))) # Wrap socket if SSL is required diff --git a/warcprox/trough.py b/warcprox/trough.py index 2848bbe..4128e50 100644 --- a/warcprox/trough.py +++ b/warcprox/trough.py @@ -103,17 +103,13 @@ class TroughClient(object): elif isinstance(x, bool): return int(x) elif isinstance(x, str) or isinstance(x, bytes): - # py3: repr(u'abc') => 'abc' - # repr(b'abc') => b'abc' - # py2: repr(u'abc') => u'abc' - # repr(b'abc') => 'abc' - # Repr gives us a prefix we don't want in different situations - # depending on whether this is py2 or py3. Chop it off either way. 
- r = repr(x) - if r[:1] == "'": - return r + # the only character that needs to be escaped in sqlite string literals + # is single-quote, which is escaped as two single-quotes + if isinstance(x, bytes): + s = x.decode('utf-8') else: - return r[1:] + s = x + return "'" + s.replace("'", "''") + "'" elif isinstance(x, (int, float)): return x else: @@ -196,7 +192,7 @@ class TroughClient(object): response.status_code, response.reason, response.text, write_url, sql) return - self.logger.debug('posted %r to %s', sql, write_url) + self.logger.debug('posted to %s: %r', write_url, sql) def read(self, segment_id, sql_tmpl, values=()): read_url = self.read_url(segment_id) diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 7ae5ab4..e55b295 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -330,7 +330,7 @@ class RecordedUrl: warcprox_meta=None, content_type=None, custom_type=None, status=None, size=None, client_ip=None, method=None, timestamp=None, host=None, duration=None, referer=None, - payload_digest=None, warc_records=None): + payload_digest=None, warc_records=None, do_not_archive=False): # XXX should test what happens with non-ascii url (when does # url-encoding happen?) 
if type(url) is not bytes: @@ -370,6 +370,7 @@ class RecordedUrl: self.referer = referer self.payload_digest = payload_digest self.warc_records = warc_records + self.do_not_archive = do_not_archive # inherit from object so that multiple inheritance from this class works # properly in python 2 @@ -397,6 +398,9 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): WarcProxyHandler.onion_tor_socks_proxy_host = options.onion_tor_socks_proxy WarcProxyHandler.onion_tor_socks_proxy_port = None + if options.socket_timeout: + WarcProxyHandler._socket_timeout = options.socket_timeout + http_server.HTTPServer.__init__( self, server_address, WarcProxyHandler, bind_and_activate=True) diff --git a/warcprox/writer.py b/warcprox/writer.py index 3fd6c7d..dbabafb 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -22,133 +22,172 @@ USA. from __future__ import absolute_import import logging -from datetime import datetime from hanzo import warctools import fcntl import time import warcprox import os import socket -import string import random import threading +try: + import queue +except ImportError: + import Queue as queue +import contextlib -class WarcWriter: - logger = logging.getLogger('warcprox.writer.WarcWriter') - - def __init__(self, options=warcprox.Options()): +class _OneWritableWarc: + logger = logging.getLogger('warcprox.writer._OneWritableWarc') + ''' + Utility class used by WarcWriter + ''' + def __init__(self, options=warcprox.Options(), randomtoken='0'): + self.f = None + self.path = None + self.finalname = None + self.gzip = options.gzip or False + self.prefix = options.prefix or 'warcprox' + self.open_suffix = '' if options.no_warc_open_suffix else '.open' + self.randomtoken = randomtoken self.rollover_size = options.rollover_size or 1000000000 self.rollover_idle_time = options.rollover_idle_time or None - self._last_activity = time.time() - - self.gzip = options.gzip or False - self.warc_filename = options.warc_filename or \ - 
'{prefix}-{timestamp17}-{randomtoken}-{serialno}' - digest_algorithm = options.digest_algorithm or 'sha1' - base32 = options.base32 - self.record_builder = warcprox.warc.WarcRecordBuilder( - digest_algorithm=digest_algorithm, base32=base32) - - # warc path and filename stuff self.directory = options.directory or './warcs' - self.prefix = options.prefix or 'warcprox' - - self._f = None - self._fpath = None - self._f_finalname = None - self._f_open_suffix = '' if options.no_warc_open_suffix else '.open' - self._serial = 0 - self._lock = threading.RLock() - - self._randomtoken = "".join(random.Random().sample(string.digits + string.ascii_lowercase, 8)) - - if not os.path.exists(self.directory): - self.logger.info("warc destination directory {} doesn't exist, creating it".format(self.directory)) - os.mkdir(self.directory) - - def timestamp17(self): - now = datetime.utcnow() - return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000) - - def timestamp14(self): - now = datetime.utcnow() - return '{:%Y%m%d%H%M%S}'.format(now) - - def close_writer(self): - with self._lock: - if self._fpath: - self.logger.info('closing %s', self._f_finalname) - if self._f_open_suffix == '': - try: - fcntl.lockf(self._f, fcntl.LOCK_UN) - except IOError as exc: - self.logger.error('could not unlock file %s (%s)', - self._fpath, exc) - self._f.close() - finalpath = os.path.sep.join( - [self.directory, self._f_finalname]) - os.rename(self._fpath, finalpath) - - self._fpath = None - self._f = None - - def serial(self): - return '{:05d}'.format(self._serial) + self.filename_template = options.warc_filename or \ + '{prefix}-{timestamp17}-{randomtoken}-{serialno}' + self.last_activity = time.time() # h3 default - def _warc_filename(self): + def next_filename(self, serial): """WARC filename is configurable with CLI parameter --warc-filename. 
- Default: '{prefix}-{timestamp17}-{serialno}-{randomtoken}' + Default: '{prefix}-{timestamp17}-{randomtoken}-{serialno}' Available variables are: prefix, timestamp14, timestamp17, serialno, randomtoken, hostname, shorthostname. Extension ``.warc`` or ``.warc.gz`` is appended automatically. """ hostname = socket.getfqdn() shorthostname = hostname.split('.')[0] - fname = self.warc_filename.format(prefix=self.prefix, - timestamp14=self.timestamp14(), - timestamp17=self.timestamp17(), - serialno=self.serial(), - randomtoken=self._randomtoken, - hostname=hostname, - shorthostname=shorthostname) + fname = self.filename_template.format( + prefix=self.prefix, timestamp14=warcprox.timestamp14(), + timestamp17=warcprox.timestamp17(), + serialno='{:05d}'.format(serial), + randomtoken=self.randomtoken, hostname=hostname, + shorthostname=shorthostname) if self.gzip: fname = fname + '.warc.gz' else: fname = fname + '.warc' return fname - def _writer(self): - with self._lock: - if self._fpath and os.path.getsize( - self._fpath) > self.rollover_size: - self.close_writer() + def open(self, serial): + if not os.path.exists(self.directory): + self.logger.info( + "warc destination directory %s doesn't exist, creating it", + self.directory) + os.mkdir(self.directory) - if self._f == None: - self._f_finalname = self._warc_filename() - self._fpath = os.path.sep.join([ - self.directory, self._f_finalname + self._f_open_suffix]) + self.finalname = self.next_filename(serial) + self.path = os.path.sep.join( + [self.directory, self.finalname + self.open_suffix]) - self._f = open(self._fpath, 'wb') - # if no '.open' suffix is used for WARC, acquire an exclusive - # file lock. - if self._f_open_suffix == '': - try: - fcntl.lockf(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) - except IOError as exc: - self.logger.error('could not lock file %s (%s)', - self._fpath, exc) + self.f = open(self.path, 'wb') + # if no '.open' suffix is used for WARC, acquire an exclusive + # file lock. 
+ if self.open_suffix == '': + try: + fcntl.lockf(self.f, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError as exc: + self.logger.error( + 'could not lock file %s (%s)', self.path, exc) + return self.f - warcinfo_record = self.record_builder.build_warcinfo_record( - self._f_finalname) - self.logger.debug( - 'warcinfo_record.headers=%s', warcinfo_record.headers) - warcinfo_record.write_to(self._f, gzip=self.gzip) + def close(self): + if self.path: + self.logger.trace('closing %s', self.finalname) + if self.open_suffix == '': + try: + fcntl.lockf(self.f, fcntl.LOCK_UN) + except IOError as exc: + self.logger.error( + 'could not unlock file %s (%s)', self.path, exc) + self.f.close() + finalpath = os.path.sep.join( + [self.directory, self.finalname]) + os.rename(self.path, finalpath) + self.path = None + self.f = None + + def maybe_idle_rollover(self): + if (self.path and self.rollover_idle_time + and self.rollover_idle_time > 0 + and time.time() - self.last_activity > self.rollover_idle_time): + self.logger.info( + 'rolling over %s after %0.1f seconds idle', + self.finalname, time.time() - self.last_activity) + self.close() + + def maybe_size_rollover(self): + if self.path and os.path.getsize(self.path) > self.rollover_size: + self.logger.info( + 'rolling over %s because it has reached %s bytes in size', + self.finalname, os.path.getsize(self.path)) + self.close() + +class WarcWriter: + logger = logging.getLogger('warcprox.writer.WarcWriter') + + def __init__(self, options=warcprox.Options()): + self.options = options + + self.gzip = options.gzip or False + self.record_builder = warcprox.warc.WarcRecordBuilder( + digest_algorithm=options.digest_algorithm or 'sha1', + base32=options.base32) + + self._available_warcs = queue.Queue() + self._warc_count = 0 + self._warc_count_lock = threading.Lock() + + self._serial = 0 + self._serial_lock = threading.Lock() + + self._randomtoken = ''.join( + random.sample('abcdefghijklmnopqrstuvwxyz0123456789', 8)) + + def 
_bespeak_warc(self): + try: + return self._available_warcs.get(block=False) + except queue.Empty: + with self._warc_count_lock: + if self._warc_count < self.options.writer_threads: + self._warc_count += 1 + return _OneWritableWarc(self.options, self._randomtoken) + # else we're maxed out, wait for one to free up + return self._available_warcs.get(block=True) + + @contextlib.contextmanager + def _warc(self): + warc = self._bespeak_warc() + + warc.maybe_size_rollover() + + # lazy file open + if warc.f == None: + with self._serial_lock: + serial = self._serial self._serial += 1 + warc.open(serial) + warcinfo = self.record_builder.build_warcinfo_record(warc.finalname) + self.logger.debug('warcinfo.headers=%s', warcinfo.headers) + warcinfo.write_to(warc.f, gzip=self.gzip) - return self._f + yield warc + + # __exit__() + warc.f.flush() + warc.last_activity = time.time() + self._available_warcs.put(warc) def write_records(self, recorded_url): """Returns tuple of records written, which are instances of @@ -156,44 +195,47 @@ class WarcWriter: "offset" attributes.""" records = self.record_builder.build_warc_records(recorded_url) - with self._lock: - writer = self._writer() - + with self._warc() as warc: for record in records: - offset = writer.tell() - record.write_to(writer, gzip=self.gzip) + offset = warc.f.tell() + record.write_to(warc.f, gzip=self.gzip) record.offset = offset - record.length = writer.tell() - offset - record.warc_filename = self._f_finalname + record.length = warc.f.tell() - offset + record.warc_filename = warc.finalname self.logger.debug( 'wrote warc record: warc_type=%s content_length=%s ' 'url=%s warc=%s offset=%d', record.get_header(warctools.WarcRecord.TYPE), record.get_header(warctools.WarcRecord.CONTENT_LENGTH), record.get_header(warctools.WarcRecord.URL), - self._fpath, record.offset) - - self._f.flush() - self._last_activity = time.time() + warc.path, record.offset) return records def maybe_idle_rollover(self): - with self._lock: - if 
(self._fpath is not None - and self.rollover_idle_time is not None - and self.rollover_idle_time > 0 - and time.time() - self._last_activity > self.rollover_idle_time): - self.logger.info( - 'rolling over %s after %s seconds idle', - self._f_finalname, time.time() - self._last_activity) - self.close_writer() + warcs = [] + while True: + try: + warc = self._available_warcs.get(block=False) + warcs.append(warc) + except queue.Empty: + break + for warc in warcs: + warc.maybe_idle_rollover() + self._available_warcs.put(warc) + + def close_writer(self): + while self._warc_count > 0: + with self._warc_count_lock: + warc = self._available_warcs.get() + warc.close() + self._warc_count -= 1 class WarcWriterPool: logger = logging.getLogger("warcprox.writer.WarcWriterPool") def __init__(self, options=warcprox.Options()): - self.default_warc_writer = WarcWriter(options=options) + self.default_warc_writer = WarcWriter(options) self.warc_writers = {} # {prefix:WarcWriter} self.options = options self._lock = threading.RLock() @@ -208,8 +250,7 @@ class WarcWriterPool: options.prefix = recorded_url.warcprox_meta["warc-prefix"] with self._lock: if not options.prefix in self.warc_writers: - self.warc_writers[options.prefix] = WarcWriter( - options=options) + self.warc_writers[options.prefix] = WarcWriter(options) w = self.warc_writers[options.prefix] return w diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 632ea2c..854319c 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -30,33 +30,44 @@ except ImportError: import logging import time import warcprox +from concurrent import futures -class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor): - logger = logging.getLogger("warcprox.writerthread.WarcWriterThread") +class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): + logger = logging.getLogger("warcprox.writerthread.WarcWriterProcessor") _ALWAYS_ACCEPT = {'WARCPROX_WRITE_RECORD'} def __init__(self, 
options=warcprox.Options()): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) - self.options = options self.writer_pool = warcprox.writer.WarcWriterPool(options) self.method_filter = set(method.upper() for method in self.options.method_filter or []) + self.pool = futures.ThreadPoolExecutor(max_workers=options.writer_threads or 1) + self.batch = set() def _get_process_put(self): - try: - warcprox.BaseStandardPostfetchProcessor._get_process_put(self) - finally: - self.writer_pool.maybe_idle_rollover() + recorded_url = self.inq.get(block=True, timeout=0.5) + self.batch.add(recorded_url) + self.pool.submit(self._process_url, recorded_url) def _process_url(self, recorded_url): - records = [] - if self._should_archive(recorded_url): - records = self.writer_pool.write_records(recorded_url) - recorded_url.warc_records = records - self._log(recorded_url, records) - # try to release resources in a timely fashion - if recorded_url.response_recorder and recorded_url.response_recorder.tempfile: - recorded_url.response_recorder.tempfile.close() + try: + records = [] + if self._should_archive(recorded_url): + records = self.writer_pool.write_records(recorded_url) + recorded_url.warc_records = records + self._log(recorded_url, records) + # try to release resources in a timely fashion + if recorded_url.response_recorder and recorded_url.response_recorder.tempfile: + recorded_url.response_recorder.tempfile.close() + except: + logging.error( + 'caught exception processing %s', recorded_url.url, + exc_info=True) + finally: + self.batch.remove(recorded_url) + if self.outq: + self.outq.put(recorded_url) + self.writer_pool.maybe_idle_rollover() def _filter_accepts(self, recorded_url): if not self.method_filter: @@ -70,8 +81,12 @@ class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor): if recorded_url.warcprox_meta and 'warc-prefix' in recorded_url.warcprox_meta else self.options.prefix) + do_not_archive = (recorded_url.do_not_archive + if 
recorded_url.do_not_archive + else False) # special warc name prefix '-' means "don't archive" - return prefix != '-' and self._filter_accepts(recorded_url) + return (prefix != '-' and (not do_not_archive) + and self._filter_accepts(recorded_url)) def _log(self, recorded_url, records): try: