diff --git a/tests/test_writer.py b/tests/test_writer.py
new file mode 100644
index 0000000..9ce0e13
--- /dev/null
+++ b/tests/test_writer.py
@@ -0,0 +1,65 @@
+import os
+import fcntl
+from multiprocessing import Process, Queue
+from datetime import datetime
+import pytest
+from warcprox.mitmproxy import ProxyingRecorder
+from warcprox.warcproxy import RecordedUrl
+from warcprox.writer import WarcWriter
+from warcprox import Options
+
+recorder = ProxyingRecorder(None, None, 'sha1', url='http://example.com')
+
+recorded_url = RecordedUrl(url='http://example.com', content_type='text/plain',
+                           status=200, client_ip='127.0.0.2',
+                           request_data=b'abc',
+                           response_recorder=recorder,
+                           remote_ip='127.0.0.3',
+                           timestamp=datetime.utcnow())
+
+
+def lock_file(queue, filename):
+    """Try to take an exclusive, non-blocking lock on ``filename``.
+
+    Puts 'OBTAINED LOCK' on ``queue`` when the lock is acquired and
+    'FAILED TO OBTAIN LOCK' when opening or locking raises IOError.
+    It is necessary to run this function in a different process to test
+    locking.
+    """
+    try:
+        # the context manager closes the file (and with it releases the
+        # lock) on both the success and the failure path.
+        with open(filename, 'ab') as fi:
+            fcntl.lockf(fi, fcntl.LOCK_EX | fcntl.LOCK_NB)
+        queue.put('OBTAINED LOCK')
+    except IOError:
+        queue.put('FAILED TO OBTAIN LOCK')
+
+
+def test_warc_writer_locking(tmpdir):
+    """Test if WarcWriter is locking WARC files.
+
+    When we don't have the .open suffix, WarcWriter locks the file and the
+    external process running ``lock_file`` reports 'FAILED TO OBTAIN LOCK'.
+    Once the writer is closed, the same attempt reports 'OBTAINED LOCK'.
+    """
+    # write into the freshly created directory itself, not its parent
+    dirname = str(tmpdir.mkdir('test-warc-writer'))
+    wwriter = WarcWriter(Options(directory=dirname, no_warc_open_suffix=True))
+    wwriter.write_records(recorded_url)
+    warcs = [fn for fn in os.listdir(dirname) if fn.endswith('.warc')]
+    assert warcs
+    target_warc = os.path.join(dirname, warcs[0])
+    # launch another process and try to lock WARC file
+    queue = Queue()
+    p = Process(target=lock_file, args=(queue, target_warc))
+    p.start()
+    p.join()
+    assert queue.get() == 'FAILED TO OBTAIN LOCK'
+    wwriter.close_writer()
+
+    # locking must succeed after writer has closed the WARC file.
+    p = Process(target=lock_file, args=(queue, target_warc))
+    p.start()
+    p.join()
+    assert queue.get() == 'OBTAINED LOCK'
diff --git a/warcprox/main.py b/warcprox/main.py
index 76e194a..d5a6e3f 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -78,6 +78,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
             default='./warcs', help='where to write warcs')
     arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
             help='write gzip-compressed warc records')
+    arg_parser.add_argument('--no-warc-open-suffix', dest='no_warc_open_suffix',
+            default=False, action='store_true', help=argparse.SUPPRESS)
     arg_parser.add_argument('-n', '--prefix', dest='prefix',
             default='WARCPROX', help='WARC filename prefix')
     arg_parser.add_argument(
diff --git a/warcprox/writer.py b/warcprox/writer.py
index cf8d72d..7a1032a 100644
--- a/warcprox/writer.py
+++ b/warcprox/writer.py
@@ -24,6 +24,7 @@ from __future__ import absolute_import
 import logging
 from datetime import datetime
 from hanzo import warctools
+import fcntl
 import time
 import warcprox
 import os
@@ -53,6 +54,7 @@ class WarcWriter:
         self._f = None
         self._fpath = None
         self._f_finalname = None
+        self._f_open_suffix = '' if options.no_warc_open_suffix else '.open'
         self._serial = 0
         self._lock = threading.RLock()
 
@@ -70,6 +72,12 @@ class WarcWriter:
         with self._lock:
             if self._fpath:
                 self.logger.info('closing %s', self._f_finalname)
+                if self._f_open_suffix == '':
+                    try:
+                        fcntl.lockf(self._f, fcntl.LOCK_UN)
+                    except IOError as exc:
+                        self.logger.error('could not unlock file %s (%s)',
+                                          self._fpath, exc)
                 self._f.close()
                 finalpath = os.path.sep.join(
                         [self.directory, self._f_finalname])
@@ -91,9 +99,17 @@ class WarcWriter:
                         self.prefix, self.timestamp17(), self._serial,
                         self._randomtoken, '.gz' if self.gzip else '')
                 self._fpath = os.path.sep.join([
-                    self.directory, self._f_finalname + '.open'])
+                    self.directory, self._f_finalname + self._f_open_suffix])
 
                 self._f = open(self._fpath, 'wb')
+                # if no '.open' suffix is used for WARC, acquire an exclusive
+                # file lock.
+                if self._f_open_suffix == '':
+                    try:
+                        fcntl.lockf(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB)
+                    except IOError as exc:
+                        self.logger.error('could not lock file %s (%s)',
+                                          self._fpath, exc)
 
                 warcinfo_record = self.record_builder.build_warcinfo_record(
                         self._f_finalname)