From ae23011d845021f28625be400b6c3ae895afa0e9 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 8 Jan 2018 12:13:05 +0000 Subject: [PATCH 01/12] Configurable WARC filenames New ``--warc-filename`` CLI parameter with default value: ``'{prefix}-{timestamp17}-{serialno}-{randomtoken}'`` (the previous hard-coded WARC filename format). Use variables: ``{prefix}, {timestamp14}, {timestamp17}, {serialno}, {randomtoken}, {hostname}, {shorthostname}`` to define custom WARC filenames. --- warcprox/main.py | 3 +++ warcprox/writer.py | 37 +++++++++++++++++++++++++++++++++---- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/warcprox/main.py b/warcprox/main.py index 348dfbf..8bfc3c4 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -77,6 +77,9 @@ def _build_arg_parser(prog): help='where to store and load generated certificates') arg_parser.add_argument('-d', '--dir', dest='directory', default='./warcs', help='where to write warcs') + arg_parser.add_argument('--warc-filename', dest='warc_filename', + default='{prefix}-{timestamp17}-{serialno}-{randomtoken}', + help='define custom WARC filename with variables {prefix}, {timestamp14}, {timestamp17}, {serialno}, {randomtoken}, {hostname}, {shorthostname}') arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records') arg_parser.add_argument('--no-warc-open-suffix', dest='no_warc_open_suffix', diff --git a/warcprox/writer.py b/warcprox/writer.py index 7a1032a..23dbafb 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -28,6 +28,7 @@ import fcntl import time import warcprox import os +import socket import string import random import threading @@ -42,6 +43,8 @@ class WarcWriter: self._last_activity = time.time() self.gzip = options.gzip or False + self.warc_filename = options.warc_filename or \ + '{prefix}-{timestamp17}-{randomtoken}-{serialno}.warc' digest_algorithm = options.digest_algorithm or 'sha1' base32 = options.base32 
self.record_builder = warcprox.warc.WarcRecordBuilder( @@ -68,6 +71,10 @@ class WarcWriter: now = datetime.utcnow() return '{:%Y%m%d%H%M%S}{:03d}'.format(now, now.microsecond//1000) + def timestamp14(self): + now = datetime.utcnow() + return '{:%Y%m%d%H%M%S}'.format(now) + def close_writer(self): with self._lock: if self._fpath: @@ -86,8 +93,32 @@ class WarcWriter: self._fpath = None self._f = None + def serial(self): + return '{:05d}'.format(self._serial) + # h3 default - # ${prefix}-${timestamp17}-${randomtoken}-${serialno}.warc.gz" + def _warc_filename(self): + """WARC filename is configurable with CLI parameter --warc-filename. + Default: '{prefix}-{timestamp17}-{serialno}-{randomtoken}' + Available variables are: prefix, timestamp14, timestamp17, serialno, + randomtoken, hostname, shorthostname. + Extension ``.warc`` or ``.warc.gz`` is appended automatically. + """ + hostname = socket.getfqdn() + shorthostname = hostname.split(',')[0] + fname = self.warc_filename.format(prefix=self.prefix, + timestamp14=self.timestamp14(), + timestamp17=self.timestamp17(), + serialno=self.serial(), + randomtoken=self._randomtoken, + hostname=hostname, + shorthostname=shorthostname) + if self.gzip: + fname = fname + '.warc.gz' + else: + fname = fname + '.warc' + return fname + def _writer(self): with self._lock: if self._fpath and os.path.getsize( @@ -95,9 +126,7 @@ class WarcWriter: self.close_writer() if self._f == None: - self._f_finalname = '{}-{}-{:05d}-{}.warc{}'.format( - self.prefix, self.timestamp17(), self._serial, - self._randomtoken, '.gz' if self.gzip else '') + self._f_finalname = self._warc_filename() self._fpath = os.path.sep.join([ self.directory, self._f_finalname + self._f_open_suffix]) From ec86f2b3dfdef11e47869819c5f863d76e867521 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 9 Jan 2018 07:02:39 +0000 Subject: [PATCH 02/12] Fix warc_filename default value Remove redundant `.warc` --- warcprox/writer.py | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/warcprox/writer.py b/warcprox/writer.py index 23dbafb..44d21d3 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -44,7 +44,7 @@ class WarcWriter: self.gzip = options.gzip or False self.warc_filename = options.warc_filename or \ - '{prefix}-{timestamp17}-{randomtoken}-{serialno}.warc' + '{prefix}-{timestamp17}-{randomtoken}-{serialno}' digest_algorithm = options.digest_algorithm or 'sha1' base32 = options.base32 self.record_builder = warcprox.warc.WarcRecordBuilder( From d2ce61aec9afea6f72a2b4d0e9347bdb9a588afd Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 9 Jan 2018 12:54:42 +0000 Subject: [PATCH 03/12] Add WarcWriter warc_filename unit test Use custom ``warc_filename`` option and check that the created WARC filename follows the defined pattern. --- tests/test_writer.py | 24 ++++++++++++++++++++++++ warcprox/writer.py | 2 +- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/tests/test_writer.py b/tests/test_writer.py index 4474f82..9b6d53a 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -163,3 +163,27 @@ def test_special_dont_write_prefix(): wwt.stop.set() wwt.join() + +def test_warc_writer_filename(tmpdir): + """Test if WarcWriter is writing WARC files with custom filenames. 
+ """ + recorder = ProxyingRecorder(None, None, 'sha1', url='http://example.com') + recorded_url = RecordedUrl( + url='http://example.com', content_type='text/plain', status=200, + client_ip='127.0.0.2', request_data=b'abc', + response_recorder=recorder, remote_ip='127.0.0.3', + timestamp=datetime.utcnow()) + + dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) + wwriter = WarcWriter(Options(directory=dirname, prefix='foo', + warc_filename='{timestamp17}-{prefix}-{timestamp14}-{serialno}')) + wwriter.write_records(recorded_url) + warcs = [fn for fn in os.listdir(dirname)] + assert warcs + target_warc = os.path.join(dirname, warcs[0]) + assert target_warc + parts = warcs[0].split('-') + assert len(parts[0]) == 17 + assert parts[1] == 'foo' + assert len(parts[2]) == 14 + assert parts[3] == '00000.warc.open' diff --git a/warcprox/writer.py b/warcprox/writer.py index 44d21d3..56ff635 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -105,7 +105,7 @@ class WarcWriter: Extension ``.warc`` or ``.warc.gz`` is appended automatically. 
""" hostname = socket.getfqdn() - shorthostname = hostname.split(',')[0] + shorthostname = hostname.split('.')[0] fname = self.warc_filename.format(prefix=self.prefix, timestamp14=self.timestamp14(), timestamp17=self.timestamp17(), From 9d789cdae8b8b2e8c681813b99eead3314894fe9 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Wed, 10 Jan 2018 18:41:56 +0000 Subject: [PATCH 04/12] Fix writer unit test --- tests/test_writer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_writer.py b/tests/test_writer.py index 9b6d53a..7294ecc 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -176,13 +176,13 @@ def test_warc_writer_filename(tmpdir): dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) wwriter = WarcWriter(Options(directory=dirname, prefix='foo', - warc_filename='{timestamp17}-{prefix}-{timestamp14}-{serialno}')) + warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}')) wwriter.write_records(recorded_url) warcs = [fn for fn in os.listdir(dirname)] assert warcs target_warc = os.path.join(dirname, warcs[0]) assert target_warc - parts = warcs[0].split('-') + parts = warcs[0].split('_') assert len(parts[0]) == 17 assert parts[1] == 'foo' assert len(parts[2]) == 14 From deddd4f850598df68320baa3e4fa95f3dc317d1d Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Wed, 10 Jan 2018 18:52:59 +0000 Subject: [PATCH 05/12] Another fix for the unit test --- tests/test_writer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_writer.py b/tests/test_writer.py index 7294ecc..2ed4e1b 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -176,13 +176,13 @@ def test_warc_writer_filename(tmpdir): dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) wwriter = WarcWriter(Options(directory=dirname, prefix='foo', - warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}')) + warc_filename='{timestamp17}-{prefix}-{timestamp14}-{serialno}')) 
wwriter.write_records(recorded_url) warcs = [fn for fn in os.listdir(dirname)] assert warcs target_warc = os.path.join(dirname, warcs[0]) assert target_warc - parts = warcs[0].split('_') + parts = os.path.basename(warcs[0]).split('-') assert len(parts[0]) == 17 assert parts[1] == 'foo' assert len(parts[2]) == 14 From e737a30ec181730676c76da73f94645ffa9bd731 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Wed, 10 Jan 2018 19:29:22 +0000 Subject: [PATCH 06/12] fix github problem with unit test --- tests/test_writer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_writer.py b/tests/test_writer.py index 2ed4e1b..f9fb339 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -176,13 +176,13 @@ def test_warc_writer_filename(tmpdir): dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) wwriter = WarcWriter(Options(directory=dirname, prefix='foo', - warc_filename='{timestamp17}-{prefix}-{timestamp14}-{serialno}')) + warc_filename='{timestamp17}_{prefix}_{timestamp14}_{serialno}')) wwriter.write_records(recorded_url) warcs = [fn for fn in os.listdir(dirname)] assert warcs target_warc = os.path.join(dirname, warcs[0]) assert target_warc - parts = os.path.basename(warcs[0]).split('-') + parts = os.path.basename(warcs[0]).split('_') assert len(parts[0]) == 17 assert parts[1] == 'foo' assert len(parts[2]) == 14 From b2c47142de435a83553bbb0c77c9ba2d9419978e Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Wed, 10 Jan 2018 20:38:06 +0000 Subject: [PATCH 07/12] Change the writer unit test To be able to run in github. 
--- tests/test_writer.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tests/test_writer.py b/tests/test_writer.py index f9fb339..5bce6b7 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -24,6 +24,7 @@ import fcntl from multiprocessing import Process, Queue from datetime import datetime import pytest +import re from warcprox.mitmproxy import ProxyingRecorder from warcprox.warcproxy import RecordedUrl from warcprox.writer import WarcWriter @@ -180,10 +181,4 @@ def test_warc_writer_filename(tmpdir): wwriter.write_records(recorded_url) warcs = [fn for fn in os.listdir(dirname)] assert warcs - target_warc = os.path.join(dirname, warcs[0]) - assert target_warc - parts = os.path.basename(warcs[0]).split('_') - assert len(parts[0]) == 17 - assert parts[1] == 'foo' - assert len(parts[2]) == 14 - assert parts[3] == '00000.warc.open' + assert re.match('\d{17}_foo_\d{14}_00000.warc.open', warcs[0]) From 47ea3110bee3d3c7ff0e39218db7c6ed68288bc5 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Wed, 10 Jan 2018 20:55:31 +0000 Subject: [PATCH 08/12] Yet another unit test fix --- tests/test_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_writer.py b/tests/test_writer.py index 5bce6b7..9728ec9 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -181,4 +181,4 @@ def test_warc_writer_filename(tmpdir): wwriter.write_records(recorded_url) warcs = [fn for fn in os.listdir(dirname)] assert warcs - assert re.match('\d{17}_foo_\d{14}_00000.warc.open', warcs[0]) + assert re.search('\d{17}_foo_\d{14}_00000.warc.open', warcs[0]) From e59fed2b6f1a588996c20d2fdd650a6a9465c1b4 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 15 Jan 2018 17:43:34 +0000 Subject: [PATCH 09/12] Configurable CdxServerDedup urllib3 connection pool size urllib3 pool has default ``maxsize=1`` http://urllib3.readthedocs.io/en/latest/advanced-usage.html. 
We need to set a higher value because we get warnings like this: ``` 2018-01-15 20:04:10,044 18436 WARNING WarcWriterThread030(tid=18502) urllib3.connectionpool._put_conn(connectionpool.py:277) Connection pool is full, discarding connection: wwwb-dedup ``` We set value: ```cdxserver_maxsize = args.writer_threads or 200```. Note that the ideal would be to use this https://github.com/internetarchive/warcprox/blob/master/warcprox/main.py#L284 but it is initialized after dedup, there is a dependency and we cannot use it. --- warcprox/dedup.py | 4 ++-- warcprox/main.py | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index f21e1df..45b8142 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -185,12 +185,12 @@ class CdxServerDedup(object): """Query a CDX server to perform deduplication. """ logger = logging.getLogger("warcprox.dedup.CdxServerDedup") - http_pool = urllib3.PoolManager() def __init__(self, cdx_url="https://web.archive.org/cdx/search", - options=warcprox.Options()): + maxsize=200, options=warcprox.Options()): self.cdx_url = cdx_url self.options = options + self.http_pool = urllib3.PoolManager(maxsize=maxsize) def start(self): pass diff --git a/warcprox/main.py b/warcprox/main.py index 348dfbf..457ddb2 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -222,7 +222,9 @@ def init_controller(args): elif args.rethinkdb_trough_db_url: dedup_db = warcprox.dedup.TroughDedupDb(options) elif args.cdxserver_dedup: - dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup) + cdxserver_maxsize = args.writer_threads or 200 + dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup, + maxsize=cdxserver_maxsize) elif args.dedup_db_file in (None, '', '/dev/null'): logging.info('deduplication disabled') dedup_db = None From f73e625d6b227c9e5cdadef739ec6bd7c0b596f4 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 15 Jan 2018 20:17:22 +0000 Subject: [PATCH 10/12] Check
wwriter._fpath in unit test For some reason this test previously failed in github. Maybe it has to do with the temporary files I need to create there... in any case, I changed what we check and evaluate the ``wwriter._fpath`` for the correct filename format. --- tests/test_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_writer.py b/tests/test_writer.py index 9728ec9..61fe108 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -181,4 +181,4 @@ def test_warc_writer_filename(tmpdir): wwriter.write_records(recorded_url) warcs = [fn for fn in os.listdir(dirname)] assert warcs - assert re.search('\d{17}_foo_\d{14}_00000.warc.open', warcs[0]) + assert re.search('\d{17}_foo_\d{14}_00000.warc.open', wwriter._fpath) From 4a165e5f779275793ab720ba000d1463777556aa Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 15 Jan 2018 20:58:36 +0000 Subject: [PATCH 11/12] Update CdxServerDedup unit test To work correctly with the new way we init the ``CdxServerDedup.http_pool``. Use ``mock.MagicMock`` instead of ``mock.patch``. The unit test logic remains entirely the same. --- tests/test_dedup.py | 76 ++++++++++++++++++++++----------------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/tests/test_dedup.py b/tests/test_dedup.py index 124efb5..eea3ccd 100644 --- a/tests/test_dedup.py +++ b/tests/test_dedup.py @@ -4,43 +4,43 @@ from warcprox.dedup import CdxServerDedup def test_cdx_dedup(): # Mock CDX Server responses to simulate found, not found and errors.
- with mock.patch('warcprox.dedup.CdxServerDedup.http_pool.request') as request: - url = "http://example.com" - # not found case - result = mock.Mock() - result.status = 200 - result.data = b'20170101020405 test' - request.return_value = result - cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") - res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", - url=url) - assert res is None + url = "http://example.com" + # not found case + result = mock.Mock() + result.status = 200 + result.data = b'20170101020405 test' + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + cdx_server.http_pool.request = mock.MagicMock(return_value=result) + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + url=url) + assert res is None - # found case - result = mock.Mock() - result.status = 200 - result.data = b'20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' - request.return_value = result - cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") - res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", - url=url) - assert res["date"] == b"2017-02-03T04:05:03Z" + # found case + result = mock.Mock() + result.status = 200 + result.data = b'20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + cdx_server.http_pool.request = mock.MagicMock(return_value=result) + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + url=url) + assert res["date"] == b"2017-02-03T04:05:03Z" - # invalid CDX result status code - result = mock.Mock() - result.status = 400 - result.data = b'20170101020405 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' - request.return_value = result - cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") - res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", - url=url) - assert res is None - # invalid CDX result content - result = mock.Mock() - result.status = 200 - result.data = b'InvalidExceptionResult' - 
request.return_value = result - cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") - res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", - url=url) - assert res is None + # invalid CDX result status code + result = mock.Mock() + result.status = 400 + result.data = b'20170101020405 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + cdx_server.http_pool.request = mock.MagicMock(return_value=result) + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + url=url) + assert res is None + + # invalid CDX result content + result = mock.Mock() + result.status = 200 + result.data = b'InvalidExceptionResult' + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + cdx_server.http_pool.request = mock.MagicMock(return_value=result) + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + url=url) + assert res is None From b43ab751f07751c1005550d9ca129b47a5ec3876 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 15 Jan 2018 17:28:20 -0800 Subject: [PATCH 12/12] fix running_stats thing --- setup.py | 2 +- warcprox/main.py | 5 ++--- warcprox/warcproxy.py | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/setup.py b/setup.py index e90ac5f..242d5b6 100755 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ except: setuptools.setup( name='warcprox', - version='2.3.1b4.dev137', + version='2.3.1b4.dev138', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/main.py b/warcprox/main.py index 065bd63..1369204 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -215,8 +215,6 @@ def init_controller(args): exit(1) listeners = [] - running_stats = warcprox.stats.RunningStats() - listeners.append(running_stats) if args.rethinkdb_dedup_url: dedup_db = warcprox.dedup.RethinkDedupDb(options=options) @@ -254,7 +252,8 @@ def init_controller(args): proxy = 
warcprox.warcproxy.WarcProxy( ca=ca, recorded_url_q=recorded_url_q, stats_db=stats_db, - running_stats=running_stats, options=options) + options=options) + listeners.append(proxy.running_stats) if args.playback_port is not None: playback_index_db = warcprox.playback.PlaybackIndexDb( diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 6f7dd34..31b9b45 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -451,7 +451,7 @@ class WarcProxy(SingleThreadedWarcProxy, warcprox.mitmproxy.PooledMitmProxy): def __init__( self, ca=None, recorded_url_q=None, stats_db=None, - running_stats=None, options=warcprox.Options()): + options=warcprox.Options()): if options.max_threads: self.logger.info( "max_threads=%s set by command line option",