From 428a03689f976cb47fe7fb797b3e33779443c3ea Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Sat, 27 Jan 2018 15:38:44 +0000 Subject: [PATCH 01/18] Make remote server connection timeout configurable Default is 60 sec (the previously hard-coded value) and you can override it with --remote-server-timeout=XX --- warcprox/main.py | 4 ++++ warcprox/mitmproxy.py | 3 ++- warcprox/warcproxy.py | 3 +++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/warcprox/main.py b/warcprox/main.py index f663d0d..b79d933 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -162,6 +162,10 @@ def _build_arg_parser(prog='warcprox'): default=None, help=( 'host:port of tor socks proxy, used only to connect to ' '.onion sites')) + # Configurable connection timeout to target sites, default is 60 sec. + arg_parser.add_argument( + '--remote-server-timeout', dest='remote_server_timeout', type=float, + default=None, help=argparse.SUPPRESS) arg_parser.add_argument( '--crawl-log-dir', dest='crawl_log_dir', default=None, help=( 'if specified, write crawl log files in the specified ' diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 7792c5c..b25901b 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -205,6 +205,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): and records the bytes in transit as it proxies them. ''' logger = logging.getLogger("warcprox.mitmproxy.MitmProxyHandler") + _remote_server_timeout = 60 def __init__(self, request, client_address, server): threading.current_thread().name = 'MitmProxyHandler(tid={},started={},client={}:{})'.format(warcprox.gettid(), datetime.datetime.utcnow().isoformat(), client_address[0], client_address[1]) @@ -248,7 +249,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self._remote_server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # XXX what value should this timeout have? 
- self._remote_server_sock.settimeout(60) + self._remote_server_sock.settimeout(self._remote_server_timeout) self._remote_server_sock.connect((self.hostname, int(self.port))) # Wrap socket if SSL is required diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 7ae5ab4..6d8accb 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -397,6 +397,9 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): WarcProxyHandler.onion_tor_socks_proxy_host = options.onion_tor_socks_proxy WarcProxyHandler.onion_tor_socks_proxy_port = None + if options.remote_server_timeout: + WarcProxyHandler._remote_server_timeout = options.remote_server_timeout + http_server.HTTPServer.__init__( self, server_address, WarcProxyHandler, bind_and_activate=True) From 9474a7ae7faa7795d23ef51d4fc217cdb6888c3c Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 30 Jan 2018 07:03:58 +0000 Subject: [PATCH 02/18] Rename remote-server-timeout to socket-timeout Also apply it to both remote target and local proxy client connections. --- warcprox/main.py | 4 ++-- warcprox/mitmproxy.py | 7 +++---- warcprox/warcproxy.py | 4 ++-- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/warcprox/main.py b/warcprox/main.py index b79d933..8d16d3b 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -162,9 +162,9 @@ def _build_arg_parser(prog='warcprox'): default=None, help=( 'host:port of tor socks proxy, used only to connect to ' '.onion sites')) - # Configurable connection timeout to target sites, default is 60 sec. + # Configurable connection socket timeout, default is 60 sec. 
arg_parser.add_argument( - '--remote-server-timeout', dest='remote_server_timeout', type=float, + '--socket-timeout', dest='socket_timeout', type=float, default=None, help=argparse.SUPPRESS) arg_parser.add_argument( '--crawl-log-dir', dest='crawl_log_dir', default=None, help=( diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index b25901b..95d5b31 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -205,13 +205,13 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): and records the bytes in transit as it proxies them. ''' logger = logging.getLogger("warcprox.mitmproxy.MitmProxyHandler") - _remote_server_timeout = 60 + _socket_timeout = 60 def __init__(self, request, client_address, server): threading.current_thread().name = 'MitmProxyHandler(tid={},started={},client={}:{})'.format(warcprox.gettid(), datetime.datetime.utcnow().isoformat(), client_address[0], client_address[1]) self.is_connect = False self._headers_buffer = [] - request.settimeout(60) # XXX what value should this have? + request.settimeout(self._socket_timeout) http_server.BaseHTTPRequestHandler.__init__(self, request, client_address, server) def _determine_host_port(self): @@ -248,8 +248,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self._remote_server_sock = socket.socket() self._remote_server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - # XXX what value should this timeout have? 
- self._remote_server_sock.settimeout(self._remote_server_timeout) + self._remote_server_sock.settimeout(self._socket_timeout) self._remote_server_sock.connect((self.hostname, int(self.port))) # Wrap socket if SSL is required diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 6d8accb..5b36300 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -397,8 +397,8 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object): WarcProxyHandler.onion_tor_socks_proxy_host = options.onion_tor_socks_proxy WarcProxyHandler.onion_tor_socks_proxy_port = None - if options.remote_server_timeout: - WarcProxyHandler._remote_server_timeout = options.remote_server_timeout + if options.socket_timeout: + WarcProxyHandler._socket_timeout = options.socket_timeout http_server.HTTPServer.__init__( self, server_address, WarcProxyHandler, bind_and_activate=True) From 8d1df04797382236d1072a7092dbc9b1158fc9ae Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 30 Jan 2018 23:02:10 +0000 Subject: [PATCH 03/18] Add socket-timeout unit test Add socket-timeout=4 in ``warcprox_`` test fixture. Create test URL `/slow-url` which returns after 6 sec. Trying to access the target URL raises a ``socket.timeout`` and returns HTTP status 502. The new ``--socket-timeout`` option does not hurt any other test using the ``warcprox_`` fixture because they are too fast anyway. --- tests/test_warcprox.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 886902f..0e0b298 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -249,6 +249,14 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler): elif self.path == '/empty-response': headers = b'' payload = b'' + elif self.path == '/slow-response': + time.sleep(6) + headers = (b'HTTP/1.1 200 OK\r\n' + + b'Content-Type: text/plain\r\n' + + b'\r\n') + payload = b'Test.' 
+ actual_headers = (b'Content-Type: text/plain\r\n' + + b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n') else: payload = b'404 Not Found\n' headers = (b'HTTP/1.1 404 Not Found\r\n' @@ -356,7 +364,8 @@ def warcprox_(request): '--port=0', '--playback-port=0', '--onion-tor-socks-proxy=localhost:9050', - '--crawl-log-dir=crawl-logs'] + '--crawl-log-dir=crawl-logs', + '--socket-timeout=4'] if request.config.getoption('--rethinkdb-dedup-url'): argv.append('--rethinkdb-dedup-url=%s' % request.config.getoption('--rethinkdb-dedup-url')) # test these here only @@ -1711,6 +1720,16 @@ def test_long_warcprox_meta( with pytest.raises(StopIteration): next(rec_iter) +def test_socket_timeout_response( + warcprox_, http_daemon, https_daemon, archiving_proxies, + playback_proxies): + """Response will timeout because we use --socket-timeout=4 whereas the + target URL will return after 6 sec. + """ + url = 'http://localhost:%s/slow-response' % http_daemon.server_port + response = requests.get(url, proxies=archiving_proxies, verify=False) + assert response.status_code == 502 + def test_empty_response( warcprox_, http_daemon, https_daemon, archiving_proxies, playback_proxies): From 322512dab6ca52dbaeef47ffa81a59a9484162d4 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 2 Feb 2018 14:00:30 -0800 Subject: [PATCH 04/18] bump version number after latest pull request --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 9b61523..36ee6bd 100755 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b1.dev144', + version='2.4b1.dev145', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 2ceedd3fd2965e157bcd330cdc64d75212b3dff8 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 7 Feb 2018 15:43:01 -0800 Subject: [PATCH 05/18] 2.4b1 for pypi --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/setup.py b/setup.py index 36ee6bd..2a184c4 100755 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b1.dev145', + version='2.4b1', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From e68be9354d879724c504ea218a8210b1006c10f7 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 7 Feb 2018 15:48:19 -0800 Subject: [PATCH 06/18] back to dev version number --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 2a184c4..5f3a594 100755 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b1', + version='2.4b2.dev146', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From d6fdc07f384936c511df20ae3234a088d7fc8a29 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 5 Feb 2018 10:38:48 +0000 Subject: [PATCH 07/18] Implement WarcWriterMultiThread --- warcprox/writerthread.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 632ea2c..6347e05 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -30,6 +30,7 @@ except ImportError: import logging import time import warcprox +from concurrent import futures class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor): logger = logging.getLogger("warcprox.writerthread.WarcWriterThread") @@ -93,3 +94,35 @@ class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor): def _shutdown(self): self.writer_pool.close_writers() + +class WarcWriterMultiThread(WarcWriterThread): + logger = logging.getLogger("warcprox.writerthread.WarcWriterMultiThread") + + def __init__(self, options=warcprox.Options()): + warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) + self.pool = 
futures.ThreadPoolExecutor(max_workers=10) + self.batch = set() + self.options = options + self.writer_pool = warcprox.writer.WarcWriterPool(options) + self.method_filter = set(method.upper() for method in self.options.method_filter or []) + + def _get_process_put(self): + recorded_url = self.inq.get(block=True, timeout=0.5) + self.batch.add(recorded_url) + self.pool.submit(self._process_url, recorded_url) + + def _process_url(self, recorded_url): + try: + records = [] + if self._should_archive(recorded_url): + records = self.writer_pool.write_records(recorded_url) + recorded_url.warc_records = records + self._log(recorded_url, records) + # try to release resources in a timely fashion + if recorded_url.response_recorder and recorded_url.response_recorder.tempfile: + recorded_url.response_recorder.tempfile.close() + finally: + self.batch.remove(recorded_url) + if self.outq: + self.outq.put(recorded_url) + self.writer_pool.maybe_idle_rollover() From e6f6993baf8d0c64bf9d9e6bf76f7d3d4ffeb5ec Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 5 Feb 2018 10:41:37 +0000 Subject: [PATCH 08/18] Implement MultiWarcWriter --- warcprox/writer.py | 100 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 99 insertions(+), 1 deletion(-) diff --git a/warcprox/writer.py b/warcprox/writer.py index 3fd6c7d..7b3414f 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -189,11 +189,109 @@ class WarcWriter: self._f_finalname, time.time() - self._last_activity) self.close_writer() +class MultiWarcWriter(WarcWriter): + logger = logging.getLogger("warcprox.writer.MultiWarcWriter") + + def __init__(self, options=warcprox.Options()): + super().__init__(options) + self._f = [None] * 3 + self._fpath = [None] * 3 + self._f_finalname = [None] * 3 + self._lock = [threading.RLock()] * 3 + + def _writer(self, curr): + with self._lock[curr]: + if self._fpath[curr] and os.path.getsize( + self._fpath[curr]) > self.rollover_size: + self.close_writer() + + if self._f[curr] == 
None: + self._f_finalname[curr] = self._warc_filename() + self._fpath[curr] = os.path.sep.join([ + self.directory, self._f_finalname[curr] + self._f_open_suffix]) + + self._f[curr] = open(self._fpath[curr], 'wb') + # if no '.open' suffix is used for WARC, acquire an exclusive + # file lock. + if self._f_open_suffix == '': + try: + fcntl.lockf(self._f[curr], fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError as exc: + self.logger.error('could not lock file %s (%s)', + self._fpath, exc) + + warcinfo_record = self.record_builder.build_warcinfo_record( + self._f_finalname[curr]) + self.logger.debug( + 'warcinfo_record.headers=%s', warcinfo_record.headers) + warcinfo_record.write_to(self._f[curr], gzip=self.gzip) + + self._serial += 1 + + return self._f[curr] + + def write_records(self, recorded_url): + """Returns tuple of records written, which are instances of + hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and + "offset" attributes.""" + records = self.record_builder.build_warc_records(recorded_url) + curr = random.choice([0, 1, 2]) + + with self._lock[curr]: + writer = self._writer(curr) + + for record in records: + offset = writer.tell() + record.write_to(writer, gzip=self.gzip) + record.offset = offset + record.length = writer.tell() - offset + record.warc_filename = self._f_finalname[curr] + self.logger.debug( + 'wrote warc record: warc_type=%s content_length=%s ' + 'url=%s warc=%s offset=%d', + record.get_header(warctools.WarcRecord.TYPE), + record.get_header(warctools.WarcRecord.CONTENT_LENGTH), + record.get_header(warctools.WarcRecord.URL), + self._fpath[curr], record.offset) + + self._f[curr].flush() + self._last_activity = time.time() + + return records + + def maybe_idle_rollover(self): + for curr in range(0, 3): + with self._lock[curr]: + if (self._fpath[curr] is not None + and self.rollover_idle_time is not None + and self.rollover_idle_time > 0 + and time.time() - self._last_activity > self.rollover_idle_time): + self.logger.info( + 
'rolling over %s after %s seconds idle', + self._f_finalname[curr], time.time() - self._last_activity) + self.close_writer(curr) + def close_writer(self, curr): + with self._lock[curr]: + if self._fpath[curr]: + self.logger.info('closing %s', self._f_finalname[curr]) + if self._f_open_suffix == '': + try: + fcntl.lockf(self._f[curr], fcntl.LOCK_UN) + except IOError as exc: + self.logger.error('could not unlock file %s (%s)', + self._fpath[curr], exc) + self._f[curr].close() + finalpath = os.path.sep.join( + [self.directory, self._f_finalname[curr]]) + os.rename(self._fpath[curr], finalpath) + class WarcWriterPool: logger = logging.getLogger("warcprox.writer.WarcWriterPool") def __init__(self, options=warcprox.Options()): - self.default_warc_writer = WarcWriter(options=options) + # self.default_warc_writer = WarcWriter(options=options) + self.default_warc_writer = MultiWarcWriter(options=options) self.warc_writers = {} # {prefix:WarcWriter} self.options = options self._lock = threading.RLock() From d2bdc9e213bff061b09183f871920c9163ac467d Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 5 Feb 2018 13:05:43 +0000 Subject: [PATCH 09/18] Set number of threads using --writer-threads cli option When the option is not set, use existing single threaded writer architecture. If available, load ``WarcWriterMultiThread`` with pool size equal to ``--writer-threads``.
--- warcprox/controller.py | 5 +++- warcprox/writer.py | 50 +++++++++++++++++++++++++++++----------- warcprox/writerthread.py | 2 +- 3 files changed, 42 insertions(+), 15 deletions(-) diff --git a/warcprox/controller.py b/warcprox/controller.py index 85fa42d..b33b42e 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -69,7 +69,10 @@ class Factory: @staticmethod def warc_writer(options): - return warcprox.writerthread.WarcWriterThread(options) + if options.writer_threads: + return warcprox.writerthread.WarcWriterMultiThread(options) + else: + return warcprox.writerthread.WarcWriterThread(options) @staticmethod def playback_proxy(ca, options): diff --git a/warcprox/writer.py b/warcprox/writer.py index 7b3414f..7779c6f 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -32,6 +32,11 @@ import socket import string import random import threading +try: + import queue +except ImportError: + import Queue as queue + class WarcWriter: logger = logging.getLogger('warcprox.writer.WarcWriter') @@ -194,16 +199,20 @@ class MultiWarcWriter(WarcWriter): def __init__(self, options=warcprox.Options()): super().__init__(options) - self._f = [None] * 3 - self._fpath = [None] * 3 - self._f_finalname = [None] * 3 - self._lock = [threading.RLock()] * 3 + self._thread_num = options.writer_threads + self._f = [None] * self._thread_num + self._fpath = [None] * self._thread_num + self._f_finalname = [None] * self._thread_num + self._lock = [threading.RLock()] * self._thread_num + self._available_threads = queue.Queue() + for i in range(self._thread_num): + self._available_threads.put(i) def _writer(self, curr): with self._lock[curr]: if self._fpath[curr] and os.path.getsize( self._fpath[curr]) > self.rollover_size: - self.close_writer() + self.close_writer(curr) if self._f[curr] == None: self._f_finalname[curr] = self._warc_filename() @@ -235,8 +244,9 @@ class MultiWarcWriter(WarcWriter): hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and "offset" 
attributes.""" records = self.record_builder.build_warc_records(recorded_url) - curr = random.choice([0, 1, 2]) - + curr = self._available_threads.get() + # we could also remove that lock?? The queue guaranties that no two + # threads have the same curr open. with self._lock[curr]: writer = self._writer(curr) @@ -256,11 +266,11 @@ class MultiWarcWriter(WarcWriter): self._f[curr].flush() self._last_activity = time.time() - + self._available_threads.put(curr) return records def maybe_idle_rollover(self): - for curr in range(0, 3): + for curr in range(0, self._thread_num): with self._lock[curr]: if (self._fpath[curr] is not None and self.rollover_idle_time is not None @@ -271,7 +281,15 @@ class MultiWarcWriter(WarcWriter): self._f_finalname[curr], time.time() - self._last_activity) self.close_writer(curr) - def close_writer(self, curr): + def close_writer(self, curr=None): + """When this method is invoked without any argument (program termination) + close all writer. + """ + if not curr: + for curr in range(0, self._thread_num): + self.close_writer(curr) + return + with self._lock[curr]: if self._fpath[curr]: self.logger.info('closing %s', self._f_finalname[curr]) @@ -290,8 +308,10 @@ class WarcWriterPool: logger = logging.getLogger("warcprox.writer.WarcWriterPool") def __init__(self, options=warcprox.Options()): - # self.default_warc_writer = WarcWriter(options=options) - self.default_warc_writer = MultiWarcWriter(options=options) + if options.writer_threads: + self.default_warc_writer = MultiWarcWriter(options=options) + else: + self.default_warc_writer = WarcWriter(options=options) self.warc_writers = {} # {prefix:WarcWriter} self.options = options self._lock = threading.RLock() @@ -306,7 +326,11 @@ class WarcWriterPool: options.prefix = recorded_url.warcprox_meta["warc-prefix"] with self._lock: if not options.prefix in self.warc_writers: - self.warc_writers[options.prefix] = WarcWriter( + if self.options.writer_threads: + self.warc_writers[options.prefix] = 
MultiWarcWriter( + options=options) + else: + self.warc_writers[options.prefix] = WarcWriter( options=options) w = self.warc_writers[options.prefix] return w diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 6347e05..c78e8c0 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -100,7 +100,7 @@ class WarcWriterMultiThread(WarcWriterThread): def __init__(self, options=warcprox.Options()): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) - self.pool = futures.ThreadPoolExecutor(max_workers=10) + self.pool = futures.ThreadPoolExecutor(max_workers=options.writer_threads) self.batch = set() self.options = options self.writer_pool = warcprox.writer.WarcWriterPool(options) From fd8119051705c2ea8f15e7e163c970db4e972879 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 5 Feb 2018 17:22:09 -0800 Subject: [PATCH 10/18] refactor the multithreaded warc writing main functional change is that only as many warc files are created as are needed to keep up with the throughput --- tests/test_warcprox.py | 45 +++-- tests/test_writer.py | 16 +- warcprox/__init__.py | 8 + warcprox/controller.py | 18 +- warcprox/writer.py | 365 +++++++++++++++------------------ warcprox/writerthread.py | 72 +++----- 6 files changed, 218 insertions(+), 306 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 0e0b298..da36df3 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -767,10 +767,12 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 4) # close the warc - assert
warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"] + writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"] + warc = writer._available_warcs.queue[0] + warc_path = os.path.join(warc.directory, warc.finalname) + assert not os.path.exists(warc_path) + warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets"].close_writer() assert os.path.exists(warc_path) # read the warc @@ -1389,20 +1391,16 @@ def test_controller_with_defaults(): assert controller.proxy.server_port == 8000 assert controller.proxy.running_stats assert not controller.proxy.stats_db - wwt = controller.warc_writer_thread - assert wwt - assert wwt.inq - assert wwt.outq - assert wwt.writer_pool - assert wwt.writer_pool.default_warc_writer - assert wwt.writer_pool.default_warc_writer.directory == './warcs' - assert wwt.writer_pool.default_warc_writer.rollover_idle_time is None - assert wwt.writer_pool.default_warc_writer.rollover_size == 1000000000 - assert wwt.writer_pool.default_warc_writer.prefix == 'warcprox' - assert wwt.writer_pool.default_warc_writer.gzip is False - assert wwt.writer_pool.default_warc_writer.record_builder - assert not wwt.writer_pool.default_warc_writer.record_builder.base32 - assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' + wwp = controller.warc_writer_processor + assert wwp + assert wwp.inq + assert wwp.outq + assert wwp.writer_pool + assert wwp.writer_pool.default_warc_writer + assert wwp.writer_pool.default_warc_writer.gzip is False + assert wwp.writer_pool.default_warc_writer.record_builder + assert not wwp.writer_pool.default_warc_writer.record_builder.base32 + assert wwp.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' def test_load_plugin(): options = warcprox.Options(port=0, plugins=[ @@ -1482,7 +1480,7 @@ def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback assert response.status_code == 200 assert not 
'via' in playback_response - warc = warcprox_.warc_writer_thread.writer_pool.default_warc_writer._fpath + warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path with open(warc, 'rb') as f: for record in warcio.archiveiterator.ArchiveIterator(f): if record.rec_headers.get_header('warc-target-uri') == url: @@ -1700,10 +1698,11 @@ def test_long_warcprox_meta( wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1) # check that warcprox-meta was parsed and honored ("warc-prefix" param) - assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"] - writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"] - warc_path = os.path.join(writer.directory, writer._f_finalname) - warcprox_.warc_writer_thread.writer_pool.warc_writers["test_long_warcprox_meta"].close_writer() + assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"] + writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"] + warc = writer._available_warcs.queue[0] + warc_path = os.path.join(warc.directory, warc.finalname) + warcprox_.warc_writer_processor.writer_pool.warc_writers["test_long_warcprox_meta"].close_writer() assert os.path.exists(warc_path) # read the warc diff --git a/tests/test_writer.py b/tests/test_writer.py index 0a18c33..fb39378 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -61,7 +61,8 @@ def test_warc_writer_locking(tmpdir): timestamp=datetime.utcnow()) dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) - wwriter = WarcWriter(Options(directory=dirname, no_warc_open_suffix=True)) + wwriter = WarcWriter(Options( + directory=dirname, no_warc_open_suffix=True, writer_threads=1)) wwriter.write_records(recorded_url) warcs = [fn for fn in os.listdir(dirname) if fn.endswith('.warc')] assert warcs @@ -93,7 +94,8 @@ def test_special_dont_write_prefix(): logging.debug('cd %s', tmpdir) 
os.chdir(tmpdir) - wwt = warcprox.writerthread.WarcWriterThread(Options(prefix='-')) + wwt = warcprox.writerthread.WarcWriterProcessor( + Options(prefix='-', writer_threads=1)) wwt.inq = warcprox.TimestampedQueue(maxsize=1) wwt.outq = warcprox.TimestampedQueue(maxsize=1) try: @@ -126,7 +128,8 @@ def test_special_dont_write_prefix(): wwt.stop.set() wwt.join() - wwt = warcprox.writerthread.WarcWriterThread() + wwt = warcprox.writerthread.WarcWriterProcessor( + Options(writer_threads=1)) wwt.inq = warcprox.TimestampedQueue(maxsize=1) wwt.outq = warcprox.TimestampedQueue(maxsize=1) try: @@ -172,8 +175,11 @@ def test_warc_writer_filename(tmpdir): dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) wwriter = WarcWriter(Options(directory=dirname, prefix='foo', - warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}')) + warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}', + writer_threads=1)) wwriter.write_records(recorded_url) warcs = [fn for fn in os.listdir(dirname)] assert warcs - assert re.search('\d{17}_foo_\d{14}_00000.warc.open', wwriter._fpath) + assert re.search( + r'\d{17}_foo_\d{14}_00000.warc.open', + wwriter._available_warcs.queue[0].path) diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 39b31ea..33af61a 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -237,6 +237,14 @@ class ListenerPostfetchProcessor(BaseStandardPostfetchProcessor): self.logger.error( '%s raised exception', listener.stop, exc_info=True) +def timestamp17(): + now = datetime.datetime.utcnow() + return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000) + +def timestamp14(): + now = datetime.datetime.utcnow() + return '{:%Y%m%d%H%M%S}'.format(now) + # monkey-patch log levels TRACE and NOTICE TRACE = 5 def _logger_trace(self, msg, *args, **kwargs): diff --git a/warcprox/controller.py b/warcprox/controller.py index b33b42e..30446c3 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -57,7 +57,6 @@ class 
Factory: @staticmethod def stats_processor(options): - # return warcprox.stats.StatsProcessor(options) if options.rethinkdb_stats_url: stats_processor = warcprox.stats.RethinkStatsProcessor(options) elif options.stats_db_file in (None, '', '/dev/null'): @@ -68,11 +67,8 @@ class Factory: return stats_processor @staticmethod - def warc_writer(options): - if options.writer_threads: - return warcprox.writerthread.WarcWriterMultiThread(options) - else: - return warcprox.writerthread.WarcWriterThread(options) + def warc_writer_processor(options): + return warcprox.writerthread.WarcWriterProcessor(options) @staticmethod def playback_proxy(ca, options): @@ -145,6 +141,12 @@ class WarcproxController(object): self.playback_proxy = Factory.playback_proxy( self.proxy.ca, self.options) + # default number of warc writer threads = sqrt(proxy.max_threads) + # pulled out of thin air because it strikes me as reasonable + # 1=>1 2=>1 5=>2 10=>3 50=>7 100=>10 200=>14 500=>22 1000=>32 2000=>45 + if not self.options.writer_threads: + self.options.writer_threads = int(self.proxy.max_threads ** 0.5) + self.build_postfetch_chain(self.proxy.recorded_url_q) self.service_registry = Factory.service_registry(options) @@ -184,8 +186,8 @@ class WarcproxController(object): if self.dedup_db: self._postfetch_chain.append(self.dedup_db.loader()) - self.warc_writer_thread = Factory.warc_writer(self.options) - self._postfetch_chain.append(self.warc_writer_thread) + self.warc_writer_processor = Factory.warc_writer_processor(self.options) + self._postfetch_chain.append(self.warc_writer_processor) if self.dedup_db: self._postfetch_chain.append(self.dedup_db.storer()) diff --git a/warcprox/writer.py b/warcprox/writer.py index 7779c6f..dbabafb 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -22,296 +22,220 @@ USA. 
from __future__ import absolute_import import logging -from datetime import datetime from hanzo import warctools import fcntl import time import warcprox import os import socket -import string import random import threading try: import queue except ImportError: import Queue as queue +import contextlib +class _OneWritableWarc: + logger = logging.getLogger('warcprox.writer._OneWritableWarc') -class WarcWriter: - logger = logging.getLogger('warcprox.writer.WarcWriter') - - def __init__(self, options=warcprox.Options()): - + ''' + Utility class used by WarcWriter + ''' + def __init__(self, options=warcprox.Options(), randomtoken='0'): + self.f = None + self.path = None + self.finalname = None + self.gzip = options.gzip or False + self.prefix = options.prefix or 'warcprox' + self.open_suffix = '' if options.no_warc_open_suffix else '.open' + self.randomtoken = randomtoken self.rollover_size = options.rollover_size or 1000000000 self.rollover_idle_time = options.rollover_idle_time or None - self._last_activity = time.time() - - self.gzip = options.gzip or False - self.warc_filename = options.warc_filename or \ - '{prefix}-{timestamp17}-{randomtoken}-{serialno}' - digest_algorithm = options.digest_algorithm or 'sha1' - base32 = options.base32 - self.record_builder = warcprox.warc.WarcRecordBuilder( - digest_algorithm=digest_algorithm, base32=base32) - - # warc path and filename stuff self.directory = options.directory or './warcs' - self.prefix = options.prefix or 'warcprox' - - self._f = None - self._fpath = None - self._f_finalname = None - self._f_open_suffix = '' if options.no_warc_open_suffix else '.open' - self._serial = 0 - self._lock = threading.RLock() - - self._randomtoken = "".join(random.Random().sample(string.digits + string.ascii_lowercase, 8)) - - if not os.path.exists(self.directory): - self.logger.info("warc destination directory {} doesn't exist, creating it".format(self.directory)) - os.mkdir(self.directory) - - def timestamp17(self): - now = 
datetime.utcnow() - return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000) - - def timestamp14(self): - now = datetime.utcnow() - return '{:%Y%m%d%H%M%S}'.format(now) - - def close_writer(self): - with self._lock: - if self._fpath: - self.logger.info('closing %s', self._f_finalname) - if self._f_open_suffix == '': - try: - fcntl.lockf(self._f, fcntl.LOCK_UN) - except IOError as exc: - self.logger.error('could not unlock file %s (%s)', - self._fpath, exc) - self._f.close() - finalpath = os.path.sep.join( - [self.directory, self._f_finalname]) - os.rename(self._fpath, finalpath) - - self._fpath = None - self._f = None - - def serial(self): - return '{:05d}'.format(self._serial) + self.filename_template = options.warc_filename or \ + '{prefix}-{timestamp17}-{randomtoken}-{serialno}' + self.last_activity = time.time() # h3 default - def _warc_filename(self): + def next_filename(self, serial): """WARC filename is configurable with CLI parameter --warc-filename. - Default: '{prefix}-{timestamp17}-{serialno}-{randomtoken}' + Default: '{prefix}-{timestamp17}-{randomtoken}-{serialno}' Available variables are: prefix, timestamp14, timestamp17, serialno, randomtoken, hostname, shorthostname. Extension ``.warc`` or ``.warc.gz`` is appended automatically. 
""" hostname = socket.getfqdn() shorthostname = hostname.split('.')[0] - fname = self.warc_filename.format(prefix=self.prefix, - timestamp14=self.timestamp14(), - timestamp17=self.timestamp17(), - serialno=self.serial(), - randomtoken=self._randomtoken, - hostname=hostname, - shorthostname=shorthostname) + fname = self.filename_template.format( + prefix=self.prefix, timestamp14=warcprox.timestamp14(), + timestamp17=warcprox.timestamp17(), + serialno='{:05d}'.format(serial), + randomtoken=self.randomtoken, hostname=hostname, + shorthostname=shorthostname) if self.gzip: fname = fname + '.warc.gz' else: fname = fname + '.warc' return fname - def _writer(self): - with self._lock: - if self._fpath and os.path.getsize( - self._fpath) > self.rollover_size: - self.close_writer() + def open(self, serial): + if not os.path.exists(self.directory): + self.logger.info( + "warc destination directory %s doesn't exist, creating it", + self.directory) + os.mkdir(self.directory) - if self._f == None: - self._f_finalname = self._warc_filename() - self._fpath = os.path.sep.join([ - self.directory, self._f_finalname + self._f_open_suffix]) + self.finalname = self.next_filename(serial) + self.path = os.path.sep.join( + [self.directory, self.finalname + self.open_suffix]) - self._f = open(self._fpath, 'wb') - # if no '.open' suffix is used for WARC, acquire an exclusive - # file lock. - if self._f_open_suffix == '': - try: - fcntl.lockf(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) - except IOError as exc: - self.logger.error('could not lock file %s (%s)', - self._fpath, exc) + self.f = open(self.path, 'wb') + # if no '.open' suffix is used for WARC, acquire an exclusive + # file lock. 
+ if self.open_suffix == '': + try: + fcntl.lockf(self.f, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError as exc: + self.logger.error( + 'could not lock file %s (%s)', self.path, exc) + return self.f - warcinfo_record = self.record_builder.build_warcinfo_record( - self._f_finalname) - self.logger.debug( - 'warcinfo_record.headers=%s', warcinfo_record.headers) - warcinfo_record.write_to(self._f, gzip=self.gzip) + def close(self): + if self.path: + self.logger.trace('closing %s', self.finalname) + if self.open_suffix == '': + try: + fcntl.lockf(self.f, fcntl.LOCK_UN) + except IOError as exc: + self.logger.error( + 'could not unlock file %s (%s)', self.path, exc) + self.f.close() + finalpath = os.path.sep.join( + [self.directory, self.finalname]) + os.rename(self.path, finalpath) - self._serial += 1 - - return self._f - - def write_records(self, recorded_url): - """Returns tuple of records written, which are instances of - hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and - "offset" attributes.""" - records = self.record_builder.build_warc_records(recorded_url) - - with self._lock: - writer = self._writer() - - for record in records: - offset = writer.tell() - record.write_to(writer, gzip=self.gzip) - record.offset = offset - record.length = writer.tell() - offset - record.warc_filename = self._f_finalname - self.logger.debug( - 'wrote warc record: warc_type=%s content_length=%s ' - 'url=%s warc=%s offset=%d', - record.get_header(warctools.WarcRecord.TYPE), - record.get_header(warctools.WarcRecord.CONTENT_LENGTH), - record.get_header(warctools.WarcRecord.URL), - self._fpath, record.offset) - - self._f.flush() - self._last_activity = time.time() - - return records + self.path = None + self.f = None def maybe_idle_rollover(self): - with self._lock: - if (self._fpath is not None - and self.rollover_idle_time is not None - and self.rollover_idle_time > 0 - and time.time() - self._last_activity > self.rollover_idle_time): - self.logger.info( - 'rolling 
over %s after %s seconds idle', - self._f_finalname, time.time() - self._last_activity) - self.close_writer() + if (self.path and self.rollover_idle_time + and self.rollover_idle_time > 0 + and time.time() - self.last_activity > self.rollover_idle_time): + self.logger.info( + 'rolling over %s after %0.1f seconds idle', + self.finalname, time.time() - self.last_activity) + self.close() -class MultiWarcWriter(WarcWriter): - logger = logging.getLogger("warcprox.writer.MultiWarcWriter") + def maybe_size_rollover(self): + if self.path and os.path.getsize(self.path) > self.rollover_size: + self.logger.info( + 'rolling over %s because it has reached %s bytes in size', + self.finalname, os.path.getsize(self.path)) + self.close() + +class WarcWriter: + logger = logging.getLogger('warcprox.writer.WarcWriter') def __init__(self, options=warcprox.Options()): - super().__init__(options) - self._thread_num = options.writer_threads - self._f = [None] * self._thread_num - self._fpath = [None] * self._thread_num - self._f_finalname = [None] * self._thread_num - self._lock = [threading.RLock()] * self._thread_num - self._available_threads = queue.Queue() - for i in range(self._thread_num): - self._available_threads.put(i) + self.options = options - def _writer(self, curr): - with self._lock[curr]: - if self._fpath[curr] and os.path.getsize( - self._fpath[curr]) > self.rollover_size: - self.close_writer(curr) + self.gzip = options.gzip or False + self.record_builder = warcprox.warc.WarcRecordBuilder( + digest_algorithm=options.digest_algorithm or 'sha1', + base32=options.base32) - if self._f[curr] == None: - self._f_finalname[curr] = self._warc_filename() - self._fpath[curr] = os.path.sep.join([ - self.directory, self._f_finalname[curr] + self._f_open_suffix]) + self._available_warcs = queue.Queue() + self._warc_count = 0 + self._warc_count_lock = threading.Lock() - self._f[curr] = open(self._fpath[curr], 'wb') - # if no '.open' suffix is used for WARC, acquire an exclusive - # file 
lock. - if self._f_open_suffix == '': - try: - fcntl.lockf(self._f[curr], fcntl.LOCK_EX | fcntl.LOCK_NB) - except IOError as exc: - self.logger.error('could not lock file %s (%s)', - self._fpath, exc) + self._serial = 0 + self._serial_lock = threading.Lock() - warcinfo_record = self.record_builder.build_warcinfo_record( - self._f_finalname[curr]) - self.logger.debug( - 'warcinfo_record.headers=%s', warcinfo_record.headers) - warcinfo_record.write_to(self._f[curr], gzip=self.gzip) + self._randomtoken = ''.join( + random.sample('abcdefghijklmnopqrstuvwxyz0123456789', 8)) + def _bespeak_warc(self): + try: + return self._available_warcs.get(block=False) + except queue.Empty: + with self._warc_count_lock: + if self._warc_count < self.options.writer_threads: + self._warc_count += 1 + return _OneWritableWarc(self.options, self._randomtoken) + # else we're maxed out, wait for one to free up + return self._available_warcs.get(block=True) + + @contextlib.contextmanager + def _warc(self): + warc = self._bespeak_warc() + + warc.maybe_size_rollover() + + # lazy file open + if warc.f == None: + with self._serial_lock: + serial = self._serial self._serial += 1 + warc.open(serial) + warcinfo = self.record_builder.build_warcinfo_record(warc.finalname) + self.logger.debug('warcinfo.headers=%s', warcinfo.headers) + warcinfo.write_to(warc.f, gzip=self.gzip) - return self._f[curr] + yield warc + + # __exit__() + warc.f.flush() + warc.last_activity = time.time() + self._available_warcs.put(warc) def write_records(self, recorded_url): """Returns tuple of records written, which are instances of hanzo.warctools.warc.WarcRecord, decorated with "warc_filename" and "offset" attributes.""" records = self.record_builder.build_warc_records(recorded_url) - curr = self._available_threads.get() - # we could also remove that lock?? The queue guaranties that no two - # threads have the same curr open. 
- with self._lock[curr]: - writer = self._writer(curr) + with self._warc() as warc: for record in records: - offset = writer.tell() - record.write_to(writer, gzip=self.gzip) + offset = warc.f.tell() + record.write_to(warc.f, gzip=self.gzip) record.offset = offset - record.length = writer.tell() - offset - record.warc_filename = self._f_finalname[curr] + record.length = warc.f.tell() - offset + record.warc_filename = warc.finalname self.logger.debug( 'wrote warc record: warc_type=%s content_length=%s ' 'url=%s warc=%s offset=%d', record.get_header(warctools.WarcRecord.TYPE), record.get_header(warctools.WarcRecord.CONTENT_LENGTH), record.get_header(warctools.WarcRecord.URL), - self._fpath[curr], record.offset) + warc.path, record.offset) - self._f[curr].flush() - self._last_activity = time.time() - self._available_threads.put(curr) return records def maybe_idle_rollover(self): - for curr in range(0, self._thread_num): - with self._lock[curr]: - if (self._fpath[curr] is not None - and self.rollover_idle_time is not None - and self.rollover_idle_time > 0 - and time.time() - self._last_activity > self.rollover_idle_time): - self.logger.info( - 'rolling over %s after %s seconds idle', - self._f_finalname[curr], time.time() - self._last_activity) - self.close_writer(curr) + warcs = [] + while True: + try: + warc = self._available_warcs.get(block=False) + warcs.append(warc) + except queue.Empty: + break + for warc in warcs: + warc.maybe_idle_rollover() + self._available_warcs.put(warc) - def close_writer(self, curr=None): - """When this method is invoked without any argument (program termination) - close all writer. 
- """ - if not curr: - for curr in range(0, self._thread_num): - self.close_writer(curr) - return - - with self._lock[curr]: - if self._fpath[curr]: - self.logger.info('closing %s', self._f_finalname[curr]) - if self._f_open_suffix == '': - try: - fcntl.lockf(self._f[curr], fcntl.LOCK_UN) - except IOError as exc: - self.logger.error('could not unlock file %s (%s)', - self._fpath[curr], exc) - self._f[curr].close() - finalpath = os.path.sep.join( - [self.directory, self._f_finalname[curr]]) - os.rename(self._fpath[curr], finalpath) + def close_writer(self): + while self._warc_count > 0: + with self._warc_count_lock: + warc = self._available_warcs.get() + warc.close() + self._warc_count -= 1 class WarcWriterPool: logger = logging.getLogger("warcprox.writer.WarcWriterPool") def __init__(self, options=warcprox.Options()): - if options.writer_threads: - self.default_warc_writer = MultiWarcWriter(options=options) - else: - self.default_warc_writer = WarcWriter(options=options) + self.default_warc_writer = WarcWriter(options) self.warc_writers = {} # {prefix:WarcWriter} self.options = options self._lock = threading.RLock() @@ -326,12 +250,7 @@ class WarcWriterPool: options.prefix = recorded_url.warcprox_meta["warc-prefix"] with self._lock: if not options.prefix in self.warc_writers: - if self.options.writer_threads: - self.warc_writers[options.prefix] = MultiWarcWriter( - options=options) - else: - self.warc_writers[options.prefix] = WarcWriter( - options=options) + self.warc_writers[options.prefix] = WarcWriter(options) w = self.warc_writers[options.prefix] return w diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index c78e8c0..1010161 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -32,32 +32,42 @@ import time import warcprox from concurrent import futures -class WarcWriterThread(warcprox.BaseStandardPostfetchProcessor): - logger = logging.getLogger("warcprox.writerthread.WarcWriterThread") +class 
WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): + logger = logging.getLogger("warcprox.writerthread.WarcWriterProcessor") _ALWAYS_ACCEPT = {'WARCPROX_WRITE_RECORD'} def __init__(self, options=warcprox.Options()): warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) - self.options = options self.writer_pool = warcprox.writer.WarcWriterPool(options) self.method_filter = set(method.upper() for method in self.options.method_filter or []) + self.pool = futures.ThreadPoolExecutor(max_workers=options.writer_threads or 1) + self.batch = set() def _get_process_put(self): - try: - warcprox.BaseStandardPostfetchProcessor._get_process_put(self) - finally: - self.writer_pool.maybe_idle_rollover() + recorded_url = self.inq.get(block=True, timeout=0.5) + self.batch.add(recorded_url) + self.pool.submit(self._process_url, recorded_url) def _process_url(self, recorded_url): - records = [] - if self._should_archive(recorded_url): - records = self.writer_pool.write_records(recorded_url) - recorded_url.warc_records = records - self._log(recorded_url, records) - # try to release resources in a timely fashion - if recorded_url.response_recorder and recorded_url.response_recorder.tempfile: - recorded_url.response_recorder.tempfile.close() + try: + records = [] + if self._should_archive(recorded_url): + records = self.writer_pool.write_records(recorded_url) + recorded_url.warc_records = records + self._log(recorded_url, records) + # try to release resources in a timely fashion + if recorded_url.response_recorder and recorded_url.response_recorder.tempfile: + recorded_url.response_recorder.tempfile.close() + except: + logging.error( + 'caught exception processing %s', recorded_url.url, + exc_info=True) + finally: + self.batch.remove(recorded_url) + if self.outq: + self.outq.put(recorded_url) + self.writer_pool.maybe_idle_rollover() def _filter_accepts(self, recorded_url): if not self.method_filter: @@ -94,35 +104,3 @@ class 
WarcWriterThread(warcprox.BaseStandardPostfetchProcessor): def _shutdown(self): self.writer_pool.close_writers() - -class WarcWriterMultiThread(WarcWriterThread): - logger = logging.getLogger("warcprox.writerthread.WarcWriterMultiThread") - - def __init__(self, options=warcprox.Options()): - warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options) - self.pool = futures.ThreadPoolExecutor(max_workers=options.writer_threads) - self.batch = set() - self.options = options - self.writer_pool = warcprox.writer.WarcWriterPool(options) - self.method_filter = set(method.upper() for method in self.options.method_filter or []) - - def _get_process_put(self): - recorded_url = self.inq.get(block=True, timeout=0.5) - self.batch.add(recorded_url) - self.pool.submit(self._process_url, recorded_url) - - def _process_url(self, recorded_url): - try: - records = [] - if self._should_archive(recorded_url): - records = self.writer_pool.write_records(recorded_url) - recorded_url.warc_records = records - self._log(recorded_url, records) - # try to release resources in a timely fashion - if recorded_url.response_recorder and recorded_url.response_recorder.tempfile: - recorded_url.response_recorder.tempfile.close() - finally: - self.batch.remove(recorded_url) - if self.outq: - self.outq.put(recorded_url) - self.writer_pool.maybe_idle_rollover() From 688e53d889b5d39bc1eca37f45b6a75394ee6b7c Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 7 Feb 2018 15:49:35 -0800 Subject: [PATCH 11/18] bump version number after pull request --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 5f3a594..26056c2 100755 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev146', + version='2.4b2.dev147', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From b2a1f15bf6999b727fcb9f4f29f2b74c65c49c5c Mon Sep 17 
00:00:00 2001 From: Noah Levitt Date: Wed, 7 Feb 2018 16:06:46 -0800 Subject: [PATCH 12/18] clean up test infrastructure - fix crufty, broken test in setup.py - include tests in sdist tarball for pypi --- MANIFEST.in | 1 + setup.cfg | 6 ++++++ setup.py | 19 +++---------------- 3 files changed, 10 insertions(+), 16 deletions(-) create mode 100644 MANIFEST.in create mode 100644 setup.cfg diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..1276f2e --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +recursive-include tests *.py *.sh Dockerfile diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..df07c14 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,6 @@ +[aliases] +test=pytest + +[tool:pytest] +addopts=-v +testpaths=tests diff --git a/setup.py b/setup.py index 26056c2..83e8187 100755 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ ''' setup.py - setuptools installation configuration for warcprox -Copyright (C) 2013-2016 Internet Archive +Copyright (C) 2013-2018 Internet Archive This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -22,18 +22,6 @@ USA. 
import sys import setuptools -import setuptools.command.test - -class PyTest(setuptools.command.test.test): - def finalize_options(self): - setuptools.command.test.test.finalize_options(self) - self.test_args = [] - self.test_suite = True - def run_tests(self): - # import here, because outside the eggs aren't loaded - import pytest - errno = pytest.main(self.test_args) - sys.exit(errno) deps = [ 'certauth==1.1.6', @@ -52,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev147', + version='2.4b2.dev148', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', @@ -61,9 +49,8 @@ setuptools.setup( license='GPL', packages=['warcprox'], install_requires=deps, + setup_requires=['pytest-runner'], tests_require=['mock', 'pytest', 'warcio'], - cmdclass = {'test': PyTest}, - test_suite='warcprox.tests', entry_points={ 'console_scripts': [ 'warcprox=warcprox.main:main', From 0d8fe4a38fa46aa07eb4ff6ca0b86f6ffb0297a1 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 8 Feb 2018 22:24:20 +0000 Subject: [PATCH 13/18] Disable retries and set timeout=2.0 for CDX Dedup server Its better to skip CDX server dedup than slow down when its unresponsive. Also increase pool size from 50 to 200. --- warcprox/dedup.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 8b27874..8d63f96 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -206,9 +206,14 @@ class CdxServerDedup(DedupDb): def __init__(self, cdx_url="https://web.archive.org/cdx/search", maxsize=200, options=warcprox.Options()): + """Initialize cdx server connection pool and related parameters. + Use low timeout value and no retries to avoid blocking warcprox + operation by a slow CDX server. 
+ """ self.cdx_url = cdx_url self.options = options - self.http_pool = urllib3.PoolManager(maxsize=maxsize) + self.http_pool = urllib3.PoolManager(maxsize=maxsize, retries=0, + timeout=2.0) if options.cdxserver_dedup_cookies: self.cookies = options.cdxserver_dedup_cookies @@ -271,7 +276,7 @@ class CdxServerDedup(DedupDb): class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor): def __init__(self, cdx_dedup, options=warcprox.Options()): warcprox.BaseBatchPostfetchProcessor.__init__(self, options) - self.pool = futures.ThreadPoolExecutor(max_workers=50) + self.pool = futures.ThreadPoolExecutor(max_workers=200) self.batch = set() self.cdx_dedup = cdx_dedup From 6d6f2c9aa0c7a2bf2aa54f7d74e25e072135fae4 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 12 Feb 2018 11:42:35 -0800 Subject: [PATCH 14/18] fix sqlite3 string escaping --- setup.py | 2 +- warcprox/trough.py | 18 +++++++----------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/setup.py b/setup.py index 83e8187..66e8a07 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ except: setuptools.setup( name='warcprox', - version='2.4b2.dev148', + version='2.4b2.dev149', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/trough.py b/warcprox/trough.py index 2848bbe..4128e50 100644 --- a/warcprox/trough.py +++ b/warcprox/trough.py @@ -103,17 +103,13 @@ class TroughClient(object): elif isinstance(x, bool): return int(x) elif isinstance(x, str) or isinstance(x, bytes): - # py3: repr(u'abc') => 'abc' - # repr(b'abc') => b'abc' - # py2: repr(u'abc') => u'abc' - # repr(b'abc') => 'abc' - # Repr gives us a prefix we don't want in different situations - # depending on whether this is py2 or py3. Chop it off either way. 
- r = repr(x) - if r[:1] == "'": - return r + # the only character that needs escaped in sqlite string literals + # is single-quote, which is escaped as two single-quotes + if isinstance(x, bytes): + s = x.decode('utf-8') else: - return r[1:] + s = x + return "'" + s.replace("'", "''") + "'" elif isinstance(x, (int, float)): return x else: @@ -196,7 +192,7 @@ class TroughClient(object): response.status_code, response.reason, response.text, write_url, sql) return - self.logger.debug('posted %r to %s', sql, write_url) + self.logger.debug('posted to %s: %r', write_url, sql) def read(self, segment_id, sql_tmpl, values=()): read_url = self.read_url(segment_id) From d5bf49e44810e1b62bc3f40054f8c87461fe4185 Mon Sep 17 00:00:00 2001 From: Barbara Miller Date: Wed, 14 Feb 2018 15:48:21 -0800 Subject: [PATCH 15/18] add do_not_archive check to should_archive --- warcprox/writerthread.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 1010161..27c5eea 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -81,8 +81,12 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): if recorded_url.warcprox_meta and 'warc-prefix' in recorded_url.warcprox_meta else self.options.prefix) + do_not_archive = (recorded_url.do_not_archive + if recorded_url.do_not_archive + else False) # special warc name prefix '-' means "don't archive" - return prefix != '-' and self._filter_accepts(recorded_url) + return prefix != '-' and (not do_not_archive) and + self._filter_accepts(recorded_url) def _log(self, recorded_url, records): try: From b3f08359e872f46c028ca5649d79a8e25ff81be6 Mon Sep 17 00:00:00 2001 From: Barbara Miller Date: Wed, 14 Feb 2018 17:55:09 -0800 Subject: [PATCH 16/18] add CHAIN_POSITION support --- warcprox/controller.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/warcprox/controller.py b/warcprox/controller.py index 30446c3..644fdec 100644 --- 
a/warcprox/controller.py +++ b/warcprox/controller.py @@ -212,6 +212,8 @@ class WarcproxController(object): self._postfetch_chain.append( warcprox.ListenerPostfetchProcessor( plugin, self.options)) + elif hasattr(plugin, 'CHAIN_POSITION') and plugin.CHAIN_POSITION == 'early': + self._postfetch_chain.insert(0, plugin) # or insert early but later than 0? else: self._postfetch_chain.append(plugin) From 1e432fb54e830e9780e5224fb07fa4cb5bbe9cf1 Mon Sep 17 00:00:00 2001 From: Barbara Miller Date: Thu, 15 Feb 2018 13:55:42 -0800 Subject: [PATCH 17/18] no indent errors, please --- warcprox/writerthread.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 27c5eea..854319c 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -85,8 +85,8 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor): if recorded_url.do_not_archive else False) # special warc name prefix '-' means "don't archive" - return prefix != '-' and (not do_not_archive) and - self._filter_accepts(recorded_url) + return (prefix != '-' and (not do_not_archive) + and self._filter_accepts(recorded_url)) def _log(self, recorded_url, records): try: From e37693df5aac5af0c1ae2dcd6aaede0f421042d1 Mon Sep 17 00:00:00 2001 From: Barbara Miller Date: Thu, 15 Feb 2018 13:56:14 -0800 Subject: [PATCH 18/18] add do_not_archive to class --- warcprox/warcproxy.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 5b36300..e55b295 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -330,7 +330,7 @@ class RecordedUrl: warcprox_meta=None, content_type=None, custom_type=None, status=None, size=None, client_ip=None, method=None, timestamp=None, host=None, duration=None, referer=None, - payload_digest=None, warc_records=None): + payload_digest=None, warc_records=None, do_not_archive=False): # XXX should test what happens with non-ascii url 
(when does # url-encoding happen?) if type(url) is not bytes: @@ -370,6 +370,7 @@ class RecordedUrl: self.referer = referer self.payload_digest = payload_digest self.warc_records = warc_records + self.do_not_archive = do_not_archive # inherit from object so that multiple inheritance from this class works # properly in python 2