From c9f1feb3dbd53ecc5562de2ed651c5df8505ea83 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 26 Oct 2017 19:44:22 +0000 Subject: [PATCH 1/7] Add hidden --no-warc-open-suffix CLI option By default warcprox adds `.open` suffix in open WARC files. Using this option we disable that. The option does not appear on the program help. --- warcprox/main.py | 2 ++ warcprox/writer.py | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/warcprox/main.py b/warcprox/main.py index 76e194a..d5a6e3f 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -78,6 +78,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): default='./warcs', help='where to write warcs') arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records') + arg_parser.add_argument('--no-warc-open-suffix', dest='no_warc_open_suffix', + default=False, action='store_true', help=argparse.SUPPRESS) arg_parser.add_argument('-n', '--prefix', dest='prefix', default='WARCPROX', help='WARC filename prefix') arg_parser.add_argument( diff --git a/warcprox/writer.py b/warcprox/writer.py index cf8d72d..419fd77 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -53,6 +53,7 @@ class WarcWriter: self._f = None self._fpath = None self._f_finalname = None + self._f_finalname_suffix = '' if options.no_warc_open_suffix else '.open' self._serial = 0 self._lock = threading.RLock() @@ -91,7 +92,7 @@ class WarcWriter: self.prefix, self.timestamp17(), self._serial, self._randomtoken, '.gz' if self.gzip else '') self._fpath = os.path.sep.join([ - self.directory, self._f_finalname + '.open']) + self.directory, self._f_finalname + self._f_finalname_suffix]) self._f = open(self._fpath, 'wb') From 975f2479a8caa5f5a8e0e0328b56c956b6563bbb Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 26 Oct 2017 21:58:31 +0000 Subject: [PATCH 2/7] Acquire and exclusive file lock when not using .open WARC suffix --- warcprox/writer.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/warcprox/writer.py b/warcprox/writer.py index 419fd77..7e7ff11 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -24,6 +24,7 @@ from __future__ import absolute_import import logging from datetime import datetime from hanzo import warctools +import fcntl import time import warcprox import os @@ -71,6 +72,8 @@ class WarcWriter: with self._lock: if self._fpath: self.logger.info('closing %s', self._f_finalname) + if self._f_finalname_suffix == '': + fcntl.flock(self._f, fcntl.LOCK_UN) self._f.close() finalpath = os.path.sep.join( [self.directory, self._f_finalname]) @@ -95,6 +98,10 @@ class WarcWriter: self.directory, self._f_finalname + self._f_finalname_suffix]) self._f = open(self._fpath, 'wb') + # if no '.open' suffix is used for WARC, acquire an exclusive + # file lock. + if self._f_finalname_suffix == '': + fcntl.flock(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) warcinfo_record = self.record_builder.build_warcinfo_record( self._f_finalname) From 5871a1bae267eb0fb2f4a2a4492e78840c7d8283 Mon Sep 17 00:00:00 2001 From: vbanos Date: Fri, 27 Oct 2017 16:22:16 +0300 Subject: [PATCH 3/7] Rename writer var and add exception handling Rename ``self._f_finalname_suffix`` to ``self._f_open_suffix``. Add exception handling for file locking operations. --- warcprox/writer.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/warcprox/writer.py b/warcprox/writer.py index 7e7ff11..a3e24c6 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -54,7 +54,7 @@ class WarcWriter: self._f = None self._fpath = None self._f_finalname = None - self._f_finalname_suffix = '' if options.no_warc_open_suffix else '.open' + self._f_open_suffix = '' if options.no_warc_open_suffix else '.open' self._serial = 0 self._lock = threading.RLock() @@ -72,8 +72,12 @@ class WarcWriter: with self._lock: if self._fpath: self.logger.info('closing %s', self._f_finalname) - if self._f_finalname_suffix == '': - fcntl.flock(self._f, fcntl.LOCK_UN) + if self._f_open_suffix == '': + try: + fcntl.flock(self._f, fcntl.LOCK_UN) + except IOError as exc: + self.logger.error('could not unlock file %s (%s)', + self._fpath, exc) self._f.close() finalpath = os.path.sep.join( [self.directory, self._f_finalname]) @@ -95,13 +99,17 @@ class WarcWriter: self.prefix, self.timestamp17(), self._serial, self._randomtoken, '.gz' if self.gzip else '') self._fpath = os.path.sep.join([ - self.directory, self._f_finalname + self._f_finalname_suffix]) + self.directory, self._f_finalname + self._f_open_suffix]) self._f = open(self._fpath, 'wb') # if no '.open' suffix is used for WARC, acquire an exclusive # file lock. - if self._f_finalname_suffix == '': - fcntl.flock(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) + if self._f_open_suffix == '': + try: + fcntl.flock(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError as exc: + self.logger.error('could not lock file %s (%s)', + self._fpath, exc) warcinfo_record = self.record_builder.build_warcinfo_record( self._f_finalname) From 3132856912c2d734b387da5fc42c51533eb6fcc9 Mon Sep 17 00:00:00 2001 From: vbanos Date: Sat, 28 Oct 2017 14:36:16 +0300 Subject: [PATCH 4/7] Test WarcWriter file locking when no_warc_open_suffix=True Add unit test for ``WarcWriter`` which open a different process and tries to lock the WARC file created by ``WarcWriter`` to check that locking works. --- tests/test_writer.py | 59 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 tests/test_writer.py diff --git a/tests/test_writer.py b/tests/test_writer.py new file mode 100644 index 0000000..fa85616 --- /dev/null +++ b/tests/test_writer.py @@ -0,0 +1,59 @@ +import os +import fcntl +from multiprocessing import Process, Queue +from datetime import datetime +import pytest +from warcprox.mitmproxy import ProxyingRecorder +from warcprox.warcproxy import RecordedUrl +from warcprox.writer import WarcWriter +from warcprox import Options + +recorder = ProxyingRecorder(None, None, 'sha1', url='http://example.com') + +recorded_url = RecordedUrl(url='http://example.com', content_type='text/plain', + status=200, client_ip='5.5.5.5', + request_data=b'abc', + response_recorder=recorder, + remote_ip='6.6.6.6', + timestamp=datetime.utcnow()) + + +def lock_file(queue, filename): + """Try to lock file and return 1 if successful, else return 0. + It is necessary to run this method in a different process to test locking. + """ + try: + fi = open(filename, 'ab') + fcntl.flock(fi, fcntl.LOCK_EX | fcntl.LOCK_NB) + fi.close() + queue.put('1') + except IOError: + queue.put('0') + + +@pytest.mark.parametrize("no_warc_open_suffix,lock_result", [ + (True, '0'), + (False, '1')]) +def test_warc_writer_locking(tmpdir, no_warc_open_suffix, lock_result): + """Test if WarcWriter is locking WARC files. + When we don't have the .open suffix, WarcWriter locks the file and the + external process trying to ``lock_file`` fails (result=0). + """ + dirname = os.path.dirname(tmpdir.mkdir('test-warc-writer')) + wwriter = WarcWriter(Options(directory=dirname, + no_warc_open_suffix=no_warc_open_suffix)) + wwriter.write_records(recorded_url) + + if no_warc_open_suffix: + suffix = '.warc' + else: + suffix = '.warc.open' + warcs = [fn for fn in os.listdir(dirname) if fn.endswith(suffix)] + assert warcs + target_warc = os.path.join(dirname, warcs[0]) + # launch another process and try to lock WARC file + queue = Queue() + p = Process(target=lock_file, args=(queue, target_warc)) + p.start() + p.join() + assert queue.get() == lock_result From eda3da1db7b5fac263ff48d7e8eba1a7fefcf3fd Mon Sep 17 00:00:00 2001 From: vbanos Date: Sat, 28 Oct 2017 15:32:04 +0300 Subject: [PATCH 5/7] Unit test fix for Python2 compatibility --- tests/test_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_writer.py b/tests/test_writer.py index fa85616..8aedc7d 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -39,7 +39,7 @@ def test_warc_writer_locking(tmpdir, no_warc_open_suffix, lock_result): When we don't have the .open suffix, WarcWriter locks the file and the external process trying to ``lock_file`` fails (result=0). """ - dirname = os.path.dirname(tmpdir.mkdir('test-warc-writer')) + dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) wwriter = WarcWriter(Options(directory=dirname, no_warc_open_suffix=no_warc_open_suffix)) wwriter.write_records(recorded_url) From 25c0accc3cd7820945b7f033304096e7b56b714a Mon Sep 17 00:00:00 2001 From: vbanos Date: Sat, 28 Oct 2017 21:13:23 +0300 Subject: [PATCH 6/7] Swap fcntl.flock with fcntl.lockf MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On Linux, `fcntl.flock` is implemented with `flock(2)`, and `fcntl.lockf` is implemented with `fcntl(2)` — they are not compatible. Java `lock()` appears to be `fcntl(2)`. So, other Java programs working with these files work correctly only with `fcntl.lockf`. `warcprox` MUST use `fcntl.lockf` --- tests/test_writer.py | 2 +- warcprox/writer.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_writer.py b/tests/test_writer.py index 8aedc7d..444909f 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -24,7 +24,7 @@ def lock_file(queue, filename): """ try: fi = open(filename, 'ab') - fcntl.flock(fi, fcntl.LOCK_EX | fcntl.LOCK_NB) + fcntl.lockf(fi, fcntl.LOCK_EX | fcntl.LOCK_NB) fi.close() queue.put('1') except IOError: diff --git a/warcprox/writer.py b/warcprox/writer.py index a3e24c6..7a1032a 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -74,7 +74,7 @@ class WarcWriter: self.logger.info('closing %s', self._f_finalname) if self._f_open_suffix == '': try: - fcntl.flock(self._f, fcntl.LOCK_UN) + fcntl.lockf(self._f, fcntl.LOCK_UN) except IOError as exc: self.logger.error('could not unlock file %s (%s)', self._fpath, exc) @@ -106,7 +106,7 @@ class WarcWriter: # file lock. if self._f_open_suffix == '': try: - fcntl.flock(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) + fcntl.lockf(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError as exc: self.logger.error('could not lock file %s (%s)', self._fpath, exc) From c087cc7a2eb47e091f2ba42af6688d7fdb75bced Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Wed, 1 Nov 2017 17:50:46 +0000 Subject: [PATCH 7/7] Improve test_writer tests Check also that locking succeeds after the writer closes the WARC file. Remove parametrize from ``test_warc_writer_locking``, test only for the ``no_warc_open_suffix=True`` option. Change `1` to `OBTAINED LOCK` and `0` to `FAILED TO OBTAIN LOCK` in ``lock_file`` method. --- tests/test_writer.py | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/tests/test_writer.py b/tests/test_writer.py index 444909f..9ce0e13 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -11,10 +11,10 @@ from warcprox import Options recorder = ProxyingRecorder(None, None, 'sha1', url='http://example.com') recorded_url = RecordedUrl(url='http://example.com', content_type='text/plain', - status=200, client_ip='5.5.5.5', + status=200, client_ip='127.0.0.2', request_data=b'abc', response_recorder=recorder, - remote_ip='6.6.6.6', + remote_ip='127.0.0.3', timestamp=datetime.utcnow()) @@ -26,29 +26,20 @@ def lock_file(queue, filename): fi = open(filename, 'ab') fcntl.lockf(fi, fcntl.LOCK_EX | fcntl.LOCK_NB) fi.close() - queue.put('1') + queue.put('OBTAINED LOCK') except IOError: - queue.put('0') + queue.put('FAILED TO OBTAIN LOCK') -@pytest.mark.parametrize("no_warc_open_suffix,lock_result", [ - (True, '0'), - (False, '1')]) -def test_warc_writer_locking(tmpdir, no_warc_open_suffix, lock_result): +def test_warc_writer_locking(tmpdir): """Test if WarcWriter is locking WARC files. When we don't have the .open suffix, WarcWriter locks the file and the external process trying to ``lock_file`` fails (result=0). """ dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) - wwriter = WarcWriter(Options(directory=dirname, - no_warc_open_suffix=no_warc_open_suffix)) + wwriter = WarcWriter(Options(directory=dirname, no_warc_open_suffix=True)) wwriter.write_records(recorded_url) - - if no_warc_open_suffix: - suffix = '.warc' - else: - suffix = '.warc.open' - warcs = [fn for fn in os.listdir(dirname) if fn.endswith(suffix)] + warcs = [fn for fn in os.listdir(dirname) if fn.endswith('.warc')] assert warcs target_warc = os.path.join(dirname, warcs[0]) # launch another process and try to lock WARC file @@ -56,4 +47,11 @@ def test_warc_writer_locking(tmpdir, no_warc_open_suffix, lock_result): p = Process(target=lock_file, args=(queue, target_warc)) p.start() p.join() - assert queue.get() == lock_result + assert queue.get() == 'FAILED TO OBTAIN LOCK' + wwriter.close_writer() + + # locking must succeed after writer has closed the WARC file. + p = Process(target=lock_file, args=(queue, target_warc)) + p.start() + p.join() + assert queue.get() == 'OBTAINED LOCK'