From 940af4e888df9bfa5b738a6133dfc013a452a682 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20J=C3=BCrgens?= Date: Fri, 18 Aug 2017 15:52:34 +0200 Subject: [PATCH 01/48] fix zero-indexing of warc_writer_threads so they can be disabled via empty list --- warcprox/controller.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/warcprox/controller.py b/warcprox/controller.py index 00f5eb7..b3f3eaf 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -176,7 +176,7 @@ class WarcproxController(object): assert(all( wwt.dedup_db is self.warc_writer_threads[0].dedup_db for wwt in self.warc_writer_threads)) - if self.warc_writer_threads[0].dedup_db: + if any((t.dedup_db for t in self.warc_writer_threads)): self.warc_writer_threads[0].dedup_db.start() for wwt in self.warc_writer_threads: @@ -211,7 +211,7 @@ class WarcproxController(object): if self.proxy.stats_db: self.proxy.stats_db.stop() - if self.warc_writer_threads[0].dedup_db: + if any((t.dedup_db for t in self.warc_writer_threads)): self.warc_writer_threads[0].dedup_db.close() self.proxy_thread.join() From 1bca9d03242ea3ba65dd3a48e4e523a9fea2512a Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 18 Sep 2017 14:45:16 -0700 Subject: [PATCH 02/48] don't use http.client.HTTPResponse.getheader() to get the content-type header, because it can return a comma-delimited string --- setup.py | 2 +- warcprox/warcproxy.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 20fceb2..01ef253 100755 --- a/setup.py +++ b/setup.py @@ -49,7 +49,7 @@ except: setuptools.setup( name='warcprox', - version='2.2b1.dev98', + version='2.2b1.dev99', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index b6c96d6..e49d44d 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -192,7 +192,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): warcprox_meta=warcprox_meta, status=prox_rec_res.status, size=prox_rec_res.recorder.len, client_ip=self.client_address[0], - content_type=prox_rec_res.getheader("Content-Type"), + content_type=prox_rec_res.headers.get("Content-Type"), method=self.command, timestamp=timestamp, host=self.hostname, duration=datetime.datetime.utcnow()-timestamp) self.server.recorded_url_q.put(recorded_url) From 8bfda9f4b364498f809a546acfcbd8a9b5ce8cfe Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 20 Sep 2017 11:03:36 -0700 Subject: [PATCH 03/48] fix python2 tests --- setup.py | 2 +- warcprox/warcproxy.py | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 01ef253..25a2e49 100755 --- a/setup.py +++ b/setup.py @@ -49,7 +49,7 @@ except: setuptools.setup( name='warcprox', - version='2.2b1.dev99', + version='2.2b1.dev100', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index e49d44d..9af6fe6 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -186,14 +186,22 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): req, prox_rec_res = warcprox.mitmproxy.MitmProxyHandler._proxy_request( self) + content_type = None + try: + content_type = prox_rec_res.headers.get('content-type') + except AttributeError: # py2 + raw = prox_rec_res.msg.getrawheader('content-type') + if raw: + content_type = raw.strip() + recorded_url = 
RecordedUrl( url=self.url, request_data=req, response_recorder=prox_rec_res.recorder, remote_ip=remote_ip, warcprox_meta=warcprox_meta, status=prox_rec_res.status, size=prox_rec_res.recorder.len, client_ip=self.client_address[0], - content_type=prox_rec_res.headers.get("Content-Type"), - method=self.command, timestamp=timestamp, host=self.hostname, + content_type=content_type, method=self.command, + timestamp=timestamp, host=self.hostname, duration=datetime.datetime.utcnow()-timestamp) self.server.recorded_url_q.put(recorded_url) From b1819c51b9478a7c4d1f06afb9530c17ad002bfe Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Sun, 24 Sep 2017 10:51:29 +0000 Subject: [PATCH 04/48] Add missing packages from setup.py, add tox config. Add missing `requests` and `warcio` packages. They are used in unit tests but they were not included in `setup.py`. Add `tox` configuration in order to be able to run unit tests for py27, py34 and py35 with 1 command. --- setup.py | 2 ++ tox.ini | 9 +++++++++ 2 files changed, 11 insertions(+) create mode 100644 tox.ini diff --git a/setup.py b/setup.py index 25a2e49..36984b0 100755 --- a/setup.py +++ b/setup.py @@ -41,6 +41,8 @@ deps = [ 'urlcanon>=0.1.dev16', 'doublethink>=0.2.0.dev81', 'PySocks', + 'warcio', + 'requests' ] try: import concurrent.futures diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..bec9629 --- /dev/null +++ b/tox.ini @@ -0,0 +1,9 @@ +[tox] +envlist=py27,py34,py35 +[testenv] +deps= + pytest + pytest-xdist + requests + mock +commands=py.test -n 4 From 66b4c35322031da6361e9251528964cb5b71e664 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Sun, 24 Sep 2017 11:15:30 +0000 Subject: [PATCH 05/48] Remove unused imports --- warcprox/bigtable.py | 2 -- warcprox/dedup.py | 1 - warcprox/main.py | 1 - warcprox/stats.py | 1 - warcprox/warc.py | 1 - warcprox/warcproxy.py | 3 --- warcprox/writer.py | 1 - warcprox/writerthread.py | 5 ----- 8 files changed, 15 deletions(-) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 94cf9c9..1064fc8 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -25,8 +25,6 @@ USA. 
from __future__ import absolute_import import logging -from hanzo import warctools -import random import warcprox import base64 import urlcanon diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 78c5434..71637a3 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -26,7 +26,6 @@ import os import json from hanzo import warctools import warcprox -import random import sqlite3 class DedupDb(object): diff --git a/warcprox/main.py b/warcprox/main.py index b4dc2e5..7b7314b 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -39,7 +39,6 @@ import signal import threading import certauth.certauth import warcprox -import re import doublethink import cryptography.hazmat.backends.openssl import importlib diff --git a/warcprox/stats.py b/warcprox/stats.py index 52a5b47..55693a2 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -25,7 +25,6 @@ import logging import os import json from hanzo import warctools -import random import warcprox import threading import rethinkdb as r diff --git a/warcprox/warc.py b/warcprox/warc.py index fbc2a33..51b1c35 100644 --- a/warcprox/warc.py +++ b/warcprox/warc.py @@ -27,7 +27,6 @@ import hashlib import socket import hanzo.httptools from hanzo import warctools -import warcprox import datetime class WarcRecordBuilder: diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 9af6fe6..06983ed 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -35,15 +35,12 @@ try: except ImportError: import Queue as queue import logging -import re -import traceback import json import socket from hanzo import warctools from certauth.certauth import CertificateAuthority import warcprox import datetime -import ipaddress import urlcanon import os diff --git a/warcprox/writer.py b/warcprox/writer.py index 0c503bf..cf8d72d 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -27,7 +27,6 @@ from hanzo import warctools import time import warcprox import os -import socket import string import random import threading diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index 0e73b97..f8cdd7a 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -29,13 +29,8 @@ except ImportError: import logging import threading -import os -import hashlib import time -import socket -import base64 from datetime import datetime -import hanzo.httptools from hanzo import warctools import warcprox import cProfile From d035147e3ea8c04598f61c3ffe3f68332530f69b Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Sun, 24 Sep 2017 13:36:12 +0000 Subject: [PATCH 06/48] Remove redundant close method from DedupDb and RethinkDedupDb I'm trying to implement another DedupDb interface and I looked into the use of each method. The ``close`` method of ``dedup.DedupDb`` and ``deup.RethinkDedupDb`` is empty. It is also invoked from ``controller``. Since it doesn't do anything and it won't in the foreseeable future, let's remove it. 
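For context, this leaves an alternative dedup implementation needing only the methods that are actually exercised by the controller, ``decorate_with_dedup_info`` and the warc writer thread's listener notifications. As far as I can tell from those call sites, the shape is roughly (a sketch, not a formal interface)::

    class SomeDedupDb(object):
        def start(self):
            pass  # called once by the controller at startup

        def save(self, digest_key, response_record, bucket=""):
            pass  # index the newly written response record under digest_key

        def lookup(self, digest_key, bucket=""):
            return None  # or a dict with 'url', 'date' (and currently 'id')

        def notify(self, recorded_url, records):
            pass  # listener hook called after records are written; typically calls save()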
--- warcprox/controller.py | 2 -- warcprox/dedup.py | 6 ------ 2 files changed, 8 deletions(-) diff --git a/warcprox/controller.py b/warcprox/controller.py index b3f3eaf..42f71de 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -211,8 +211,6 @@ class WarcproxController(object): if self.proxy.stats_db: self.proxy.stats_db.stop() - if any((t.dedup_db for t in self.warc_writer_threads)): - self.warc_writer_threads[0].dedup_db.close() self.proxy_thread.join() if self.playback_proxy is not None: diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 78c5434..7a7a025 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -58,9 +58,6 @@ class DedupDb(object): def stop(self): pass - def close(self): - pass - def sync(self): pass @@ -154,9 +151,6 @@ class RethinkDedupDb: def stop(self): pass - def close(self): - pass - def sync(self): pass From eb266f198df6531d19cbaa94cee458e6da4a131b Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Sun, 24 Sep 2017 13:44:13 +0000 Subject: [PATCH 07/48] Remove redundant stop() & sync() dedup methods Similarly with my previous commits, these methods do nothing. I think that the reason they are here is because the author uses the same style in other places in the code (e.g. ``warcprox.stats.StatsDb``). Similar methods exist there. --- warcprox/dedup.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 7a7a025..59c6040 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -55,12 +55,6 @@ class DedupDb(object): conn.commit() conn.close() - def stop(self): - pass - - def sync(self): - pass - def save(self, digest_key, response_record, bucket=""): record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') @@ -148,12 +142,6 @@ class RethinkDedupDb: def start(self): pass - def stop(self): - pass - - def sync(self): - pass - def save(self, digest_key, response_record, bucket=""): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key k = "{}|{}".format(k, bucket) From faae23d764f5e581076a7ac4441e5c20e6eb2cf1 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 27 Sep 2017 17:29:55 -0700 Subject: [PATCH 08/48] allow very long request header lines, to support large warcprox-meta header values --- setup.py | 2 +- tests/test_warcprox.py | 38 ++++++++++++++++++++++++++++++++++++++ warcprox/mitmproxy.py | 6 ++++++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 25a2e49..0824491 100755 --- a/setup.py +++ b/setup.py @@ -49,7 +49,7 @@ except: setuptools.setup( name='warcprox', - version='2.2b1.dev100', + version='2.2b1.dev101', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index dd80a86..8b44974 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -1429,6 +1429,44 @@ def test_via_response_header(warcprox_, http_daemon, archiving_proxies, playback elif record.rec_type == 'request': assert record.http_headers.get_header('via') == '1.1 warcprox' +def test_long_warcprox_meta( + warcprox_, http_daemon, archiving_proxies, playback_proxies): + url = 'http://localhost:%s/b/g' % http_daemon.server_port + + # create a very long warcprox-meta header + headers = {'Warcprox-Meta': json.dumps({ + 'x':'y'*1000000, 'warc-prefix': 'test_long_warcprox_meta'})} + response = requests.get( + url, 
proxies=archiving_proxies, headers=headers, verify=False) + assert response.status_code == 200 + + # wait for writer thread to process + time.sleep(0.5) + while not all(wwt.idle for wwt in warcprox_.warc_writer_threads): + time.sleep(0.5) + time.sleep(0.5) + + # check that warcprox-meta was parsed and honored ("warc-prefix" param) + assert warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"] + writer = warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"] + warc_path = os.path.join(writer.directory, writer._f_finalname) + warcprox_.warc_writer_threads[0].writer_pool.warc_writers["test_long_warcprox_meta"].close_writer() + assert os.path.exists(warc_path) + + # read the warc + with open(warc_path, 'rb') as f: + rec_iter = iter(warcio.archiveiterator.ArchiveIterator(f)) + record = next(rec_iter) + assert record.rec_type == 'warcinfo' + record = next(rec_iter) + assert record.rec_type == 'response' + assert record.rec_headers.get_header('warc-target-uri') == url + record = next(rec_iter) + assert record.rec_type == 'request' + assert record.rec_headers.get_header('warc-target-uri') == url + with pytest.raises(StopIteration): + next(rec_iter) + if __name__ == '__main__': pytest.main() diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 6297dcc..914fb52 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -37,6 +37,12 @@ except ImportError: import urlparse as urllib_parse try: import http.client as http_client + # In python3 http.client.parse_headers() enforces http_client._MAXLINE + # as max length of an http header line, but we want to support very + # long warcprox-meta headers, so we tweak it here. Python2 doesn't seem + # to enforce any limit. Multiline headers could be an option but it + # turns out those are illegal as of RFC 7230. Plus, this is easier. 
+ http_client._MAXLINE = 4194304 # 4 MiB except ImportError: import httplib as http_client import socket From 51a2178cbd8fe9d534880e63cfab5d17659d48a1 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 28 Sep 2017 20:35:47 +0000 Subject: [PATCH 09/48] Remove tox.ini, move warcio to test_requires --- setup.py | 6 ++---- tox.ini | 9 --------- 2 files changed, 2 insertions(+), 13 deletions(-) delete mode 100644 tox.ini diff --git a/setup.py b/setup.py index 36984b0..994bf98 100755 --- a/setup.py +++ b/setup.py @@ -40,9 +40,7 @@ deps = [ 'warctools', 'urlcanon>=0.1.dev16', 'doublethink>=0.2.0.dev81', - 'PySocks', - 'warcio', - 'requests' + 'PySocks' ] try: import concurrent.futures @@ -60,7 +58,7 @@ setuptools.setup( license='GPL', packages=['warcprox'], install_requires=deps, - tests_require=['requests>=2.0.1', 'pytest'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636 + tests_require=['requests>=2.0.1', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636 cmdclass = {'test': PyTest}, test_suite='warcprox.tests', entry_points={ diff --git a/tox.ini b/tox.ini deleted file mode 100644 index bec9629..0000000 --- a/tox.ini +++ /dev/null @@ -1,9 +0,0 @@ -[tox] -envlist=py27,py34,py35 -[testenv] -deps= - pytest - pytest-xdist - requests - mock -commands=py.test -n 4 From 6fd687f2b67eadba8059bee581270d8d740d59c4 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 28 Sep 2017 20:37:15 +0000 Subject: [PATCH 10/48] Add missing "," in deps --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 994bf98..b63df90 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ deps = [ 'warctools', 'urlcanon>=0.1.dev16', 'doublethink>=0.2.0.dev81', - 'PySocks' + 'PySocks', ] try: import concurrent.futures From be6fe83c569eccbda32c266a6f8df607973c082f Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 28 Sep 2017 14:37:30 -0700 Subject: [PATCH 11/48] bump dev version number after merging pull requests --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 57322c2..e4bd5e8 100755 --- a/setup.py +++ b/setup.py @@ -49,7 +49,7 @@ except: setuptools.setup( name='warcprox', - version='2.2b1.dev101', + version='2.2b1.dev102', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 4e7d8fa9173650793c41603c9b733907dbe5c168 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 29 Sep 2017 06:36:37 +0000 Subject: [PATCH 12/48] Remove deleted ``close`` method call from test. 
--- tests/test_warcprox.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index dd80a86..40bdc24 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -274,12 +274,10 @@ def rethink_dedup_db(request, rethinkdb_servers, captures_db): ddb = warcprox.dedup.RethinkDedupDb(rr) def fin(): - if rethinkdb_servers: - ddb.close() - if not captures_db: - logging.info('dropping rethinkdb database {}'.format(db)) - result = ddb.rr.db_drop(db).run() - logging.info("result=%s", result) + if rethinkdb_servers and not captures_db: + logging.info('dropping rethinkdb database {}'.format(db)) + result = ddb.rr.db_drop(db).run() + logging.info("result=%s", result) request.addfinalizer(fin) return ddb From 908988c4f05a264b820e1406eecc0387da8f1583 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 6 Oct 2017 16:57:39 -0700 Subject: [PATCH 13/48] wait for rethinkdb indexes to be ready --- setup.py | 2 +- warcprox/bigtable.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index e4bd5e8..f5f7f71 100755 --- a/setup.py +++ b/setup.py @@ -49,7 +49,7 @@ except: setuptools.setup( name='warcprox', - version='2.2b1.dev102', + version='2.2b1.dev103', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 1064fc8..387d05c 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -113,6 +113,7 @@ class RethinkCaptures: [r.row["abbr_canon_surt"], r.row["timestamp"]]).run() self.rr.table(self.table).index_create("sha1_warc_type", [ r.row["sha1base32"], r.row["warc_type"], r.row["bucket"]]).run() + self.rr.table(self.table).index_wait().run() def find_response_by_digest(self, algo, raw_digest, bucket="__unspecified__"): if algo != "sha1": From 0cc68dd428bcaf0aa0bc4e9f51be346d498cc3ba Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 6 Oct 2017 16:58:27 -0700 Subject: [PATCH 14/48] avoid TypeError: 'NoneType' object is not iterable exception at shutdown --- setup.py | 2 +- warcprox/writerthread.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index f5f7f71..766f1b6 100755 --- a/setup.py +++ b/setup.py @@ -49,7 +49,7 @@ except: setuptools.setup( name='warcprox', - version='2.2b1.dev103', + version='2.2b1.dev104', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index f8cdd7a..e422a65 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -41,7 +41,7 @@ class WarcWriterThread(threading.Thread): def __init__( self, name='WarcWriterThread', recorded_url_q=None, - writer_pool=None, dedup_db=None, listeners=None, + writer_pool=None, dedup_db=None, listeners=[], options=warcprox.Options()): """recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl.""" threading.Thread.__init__(self, name=name) From 9b8043d3a2b268b6708251811223da3c22b1d1ae Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 6 Oct 2017 17:00:35 -0700 Subject: [PATCH 15/48] greatly simplify automated test setup by reusing initialization code from the command line executable; this also has the benefit of testing that initialization code --- setup.py | 2 +- tests/test_warcprox.py | 200 ++++++++--------------------------------- 2 files changed, 40 insertions(+), 162 deletions(-) diff --git a/setup.py 
b/setup.py index 766f1b6..89f37dc 100755 --- a/setup.py +++ b/setup.py @@ -49,7 +49,7 @@ except: setuptools.setup( name='warcprox', - version='2.2b1.dev104', + version='2.2b1.dev105', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 216b4af..b24a5c8 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -60,6 +60,7 @@ except ImportError: import certauth.certauth import warcprox +import warcprox.main try: import http.client as http_client @@ -241,166 +242,44 @@ def https_daemon(request, cert): return https_daemon @pytest.fixture(scope="module") -def captures_db(request, rethinkdb_servers, rethinkdb_big_table): - captures_db = None +def warcprox_(request, rethinkdb_servers, rethinkdb_big_table): + orig_dir = os.getcwd() + work_dir = tempfile.mkdtemp() + logging.info('changing to working directory %r', work_dir) + os.chdir(work_dir) + + argv = ['warcprox', + '--method-filter=GET', + '--method-filter=POST', + '--port=0', + '--playback-port=0', + '--onion-tor-socks-proxy=localhost:9050'] if rethinkdb_servers: - servers = rethinkdb_servers.split(",") - if rethinkdb_big_table: - db = 'warcprox_test_captures_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - rr = doublethink.Rethinker(servers, db) - captures_db = warcprox.bigtable.RethinkCaptures(rr) - captures_db.start() + rethinkdb_db = 'warcprox_test_%s' % ''.join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) + argv.append('--rethinkdb-servers=%s' % rethinkdb_servers) + argv.append('--rethinkdb-db=%s' % rethinkdb_db) + if rethinkdb_big_table: + argv.append('--rethinkdb-big-table') - def fin(): - if captures_db: - captures_db.close() - # logging.info('dropping rethinkdb database {}'.format(db)) - # result = captures_db.rr.db_drop(db).run() - # logging.info("result=%s", result) - request.addfinalizer(fin) + args = warcprox.main.parse_args(argv) + warcprox_ = warcprox.main.init_controller(args) - return captures_db - -@pytest.fixture(scope="module") -def rethink_dedup_db(request, rethinkdb_servers, captures_db): - ddb = None - if rethinkdb_servers: - if captures_db: - ddb = warcprox.bigtable.RethinkCapturesDedup(captures_db) - else: - servers = rethinkdb_servers.split(",") - db = 'warcprox_test_dedup_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - rr = doublethink.Rethinker(servers, db) - ddb = warcprox.dedup.RethinkDedupDb(rr) - - def fin(): - if rethinkdb_servers and not captures_db: - logging.info('dropping rethinkdb database {}'.format(db)) - result = ddb.rr.db_drop(db).run() - logging.info("result=%s", result) - request.addfinalizer(fin) - - return ddb - -@pytest.fixture(scope="module") -def dedup_db(request, rethink_dedup_db): - dedup_db_file = None - ddb = rethink_dedup_db - if not ddb: - f = tempfile.NamedTemporaryFile(prefix='warcprox-test-dedup-', suffix='.db', delete=False) - f.close() - dedup_db_file = f.name - ddb = warcprox.dedup.DedupDb(dedup_db_file) - - def fin(): - if dedup_db_file: - logging.info('deleting file {}'.format(dedup_db_file)) - os.unlink(dedup_db_file) - request.addfinalizer(fin) - - return ddb - -@pytest.fixture(scope="module") -def stats_db(request, rethinkdb_servers): - if rethinkdb_servers: - servers = rethinkdb_servers.split(",") - db = 'warcprox_test_stats_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - rr = doublethink.Rethinker(servers, db) - sdb = warcprox.stats.RethinkStatsDb(rr) 
- sdb.start() - else: - f = tempfile.NamedTemporaryFile(prefix='warcprox-test-stats-', suffix='.db', delete=False) - f.close() - stats_db_file = f.name - sdb = warcprox.stats.StatsDb(stats_db_file) - - def fin(): - sdb.close() - if rethinkdb_servers: - logging.info('dropping rethinkdb database {}'.format(db)) - result = sdb.rr.db_drop(db).run() - logging.info("result=%s", result) - # else: - # logging.info('deleting file {}'.format(stats_db_file)) - # os.unlink(stats_db_file) - request.addfinalizer(fin) - - return sdb - -@pytest.fixture(scope="module") -def service_registry(request, rethinkdb_servers): - if rethinkdb_servers: - servers = rethinkdb_servers.split(",") - db = 'warcprox_test_services_' + "".join(random.sample("abcdefghijklmnopqrstuvwxyz0123456789_",8)) - rr = doublethink.Rethinker(servers, db) - - def fin(): - logging.info('dropping rethinkdb database {}'.format(db)) - result = rr.db_drop(db).run() - logging.info("result=%s", result) - request.addfinalizer(fin) - - return doublethink.ServiceRegistry(rr) - else: - return None - -@pytest.fixture(scope="module") -def warcprox_(request, captures_db, dedup_db, stats_db, service_registry): - f = tempfile.NamedTemporaryFile(prefix='warcprox-test-ca-', suffix='.pem', delete=True) - f.close() # delete it, or CertificateAuthority will try to read it - ca_file = f.name - ca_dir = tempfile.mkdtemp(prefix='warcprox-test-', suffix='-ca') - ca = certauth.certauth.CertificateAuthority(ca_file, ca_dir, 'warcprox-test') - - recorded_url_q = warcprox.TimestampedQueue() - - options = warcprox.Options(port=0, playback_port=0, - onion_tor_socks_proxy='localhost:9050') - proxy = warcprox.warcproxy.WarcProxy(ca=ca, recorded_url_q=recorded_url_q, - stats_db=stats_db, options=options) - options.port = proxy.server_port - - options.directory = tempfile.mkdtemp(prefix='warcprox-test-warcs-') - - f = tempfile.NamedTemporaryFile(prefix='warcprox-test-playback-index-', suffix='.db', delete=False) - f.close() - playback_index_db_file = f.name - playback_index_db = warcprox.playback.PlaybackIndexDb(playback_index_db_file) - playback_proxy = warcprox.playback.PlaybackProxy(ca=ca, - playback_index_db=playback_index_db, options=options) - options.playback_proxy = playback_proxy.server_port - - options.method_filter = ['GET','POST'] - - writer_pool = warcprox.writer.WarcWriterPool(options) - warc_writer_threads = [ - warcprox.writerthread.WarcWriterThread( - recorded_url_q=recorded_url_q, writer_pool=writer_pool, - dedup_db=dedup_db, listeners=[ - captures_db or dedup_db, playback_index_db, stats_db], - options=options) - for i in range(int(proxy.max_threads ** 0.5))] - - warcprox_ = warcprox.controller.WarcproxController( - proxy=proxy, warc_writer_threads=warc_writer_threads, - playback_proxy=playback_proxy, service_registry=service_registry, - options=options) logging.info('starting warcprox') - warcprox_thread = threading.Thread(name='WarcproxThread', - target=warcprox_.run_until_shutdown) + warcprox_thread = threading.Thread( + name='WarcproxThread', target=warcprox_.run_until_shutdown) warcprox_thread.start() def fin(): - logging.info('stopping warcprox') warcprox_.stop.set() warcprox_thread.join() - for f in (ca_file, ca_dir, options.directory, playback_index_db_file): - if os.path.isdir(f): - logging.info('deleting directory {}'.format(f)) - shutil.rmtree(f) - else: - logging.info('deleting file {}'.format(f)) - os.unlink(f) + if rethinkdb_servers: + logging.info('dropping rethinkdb database %r', rethinkdb_db) + rr = 
doublethink.Rethinker(rethinkdb_servers) + result = rr.db_drop(rethinkdb_db).run() + logging.info('deleting working directory %r', work_dir) + os.chdir(orig_dir) + shutil.rmtree(work_dir) + request.addfinalizer(fin) return warcprox_ @@ -1226,9 +1105,8 @@ def test_method_filter( assert response.content == payload def test_dedup_ok_flag( - https_daemon, http_daemon, warcprox_, archiving_proxies, - rethinkdb_big_table): - if not rethinkdb_big_table: + https_daemon, http_daemon, warcprox_, archiving_proxies): + if not warcprox_.options.rethinkdb_big_table: # this feature is n/a unless using rethinkdb big table return @@ -1312,11 +1190,11 @@ def test_status_api(warcprox_): assert status['pid'] == os.getpid() assert status['threads'] == warcprox_.proxy.pool._max_workers -def test_svcreg_status(warcprox_, service_registry): - if service_registry: +def test_svcreg_status(warcprox_): + if warcprox_.service_registry: start = time.time() while time.time() - start < 15: - status = service_registry.available_service('warcprox') + status = warcprox_.service_registry.available_service('warcprox') if status: break time.sleep(0.5) @@ -1373,11 +1251,11 @@ def test_controller_with_defaults(): assert not wwt.writer_pool.default_warc_writer.record_builder.base32 assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1' -def test_choose_a_port_for_me(service_registry): +def test_choose_a_port_for_me(warcprox_): options = warcprox.Options() options.port = 0 controller = warcprox.controller.WarcproxController( - service_registry=service_registry, options=options) + service_registry=warcprox_.service_registry, options=options) assert controller.proxy.server_port != 0 assert controller.proxy.server_port != 8000 assert controller.proxy.server_address == ( @@ -1393,12 +1271,12 @@ def test_choose_a_port_for_me(service_registry): status = json.loads(response.content.decode('ascii')) assert status['port'] == controller.proxy.server_port - if service_registry: + if warcprox_.service_registry: # check that service registry entry lists the correct port start = time.time() ports = [] while time.time() - start < 30: - svcs = service_registry.available_services('warcprox') + svcs = warcprox_.service_registry.available_services('warcprox') ports = [svc['port'] for svc in svcs] if controller.proxy.server_port in ports: break From bd23e37dc027ceeee9b09038623170a8020d67df Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 13 Oct 2017 19:27:15 +0000 Subject: [PATCH 16/48] Stop using WarcRecord.REFERS_TO header and use payload_digest instead Stop adding WarcRecord.REFERS_TO when building WARC record. Methods ``warc.WarcRecordBuilder._build_response_principal_record`` and ``warc.WarcRecordBuilder.build_warc_record``. Replace ``record_id`` (WarcRecord.REFERS_TO) with payload_digest in ``playback``. Playback database has ``{'f': warcfile, 'o': offset, 'd': payload_digest}`` instead of ``'i': record_id``. Make all ``dedup`` classes return only `url` and `date`. Drop `id`. 
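To illustrate the new playback index shape, a value stored under a url key now looks roughly like this (warc file name, offset and digest below are made up)::

    {
        "2017-10-13T19:27:15Z": [
            {"f": "warcprox-20171013192715-00000.warc.gz",  # warc file
             "o": 4321,                                      # offset of the response record
             "d": "sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A"}   # payload digest
        ]
    }

and a revisit is resolved by calling ``lookup_exact(refers_to_target_uri, refers_to_date, payload_digest)`` and matching on ``'d'``, instead of matching the old ``WARC-Refers-To`` record id.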
--- warcprox/bigtable.py | 2 -- warcprox/dedup.py | 7 ++----- warcprox/playback.py | 39 ++++++++++++++++++++------------------- warcprox/warc.py | 7 ++----- 4 files changed, 24 insertions(+), 31 deletions(-) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 387d05c..c3a9bd8 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -233,8 +233,6 @@ class RethinkCapturesDedup: "url": entry["url"].encode("utf-8"), "date": entry["timestamp"].strftime("%Y-%m-%dT%H:%M:%SZ").encode("utf-8"), } - if "warc_id" in entry: - dedup_info["id"] = entry["warc_id"].encode("utf-8") return dedup_info else: return None diff --git a/warcprox/dedup.py b/warcprox/dedup.py index fd1ada4..79be80f 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -55,13 +55,12 @@ class DedupDb(object): conn.close() def save(self, digest_key, response_record, bucket=""): - record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') key = digest_key.decode('utf-8') + "|" + bucket - py_value = {'id':record_id, 'url':url, 'date':date} + py_value = {'url':url, 'date':date} json_value = json.dumps(py_value, separators=(',',':')) conn = sqlite3.connect(self.file) @@ -81,7 +80,6 @@ class DedupDb(object): conn.close() if result_tuple: result = json.loads(result_tuple[0]) - result['id'] = result['id'].encode('latin1') result['url'] = result['url'].encode('latin1') result['date'] = result['date'].encode('latin1') self.logger.debug('dedup db lookup of key=%s returning %s', key, result) @@ -144,10 +142,9 @@ class RethinkDedupDb: def save(self, digest_key, response_record, bucket=""): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key k = "{}|{}".format(k, bucket) - record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') - record = {'key':k,'url':url,'date':date,'id':record_id} + record = {'key': k, 'url': url, 'date': date} result = self.rr.table(self.table).insert( record, conflict="replace").run() if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: diff --git a/warcprox/playback.py b/warcprox/playback.py index 663e10a..af4639f 100644 --- a/warcprox/playback.py +++ b/warcprox/playback.py @@ -120,9 +120,12 @@ class PlaybackProxyHandler(MitmProxyHandler): def _send_headers_and_refd_payload( - self, headers, refers_to, refers_to_target_uri, refers_to_date): + self, headers, refers_to_target_uri, refers_to_date, payload_digest): + """Parameters: + + """ location = self.server.playback_index_db.lookup_exact( - refers_to_target_uri, refers_to_date, record_id=refers_to) + refers_to_target_uri, refers_to_date, payload_digest) self.logger.debug('loading http payload from {}'.format(location)) fh = self._open_warc_at_offset(location['f'], location['o']) @@ -177,20 +180,19 @@ class PlaybackProxyHandler(MitmProxyHandler): if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST: raise Exception('unknown revisit record profile {}'.format(warc_profile)) - refers_to = record.get_header( - warctools.WarcRecord.REFERS_TO).decode('latin1') refers_to_target_uri = record.get_header( warctools.WarcRecord.REFERS_TO_TARGET_URI).decode( 'latin1') refers_to_date = record.get_header( 
warctools.WarcRecord.REFERS_TO_DATE).decode('latin1') - + payload_digest = record.get_header( + warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1') self.logger.debug( 'revisit record references %s:%s capture of %s', - refers_to_date, refers_to, refers_to_target_uri) + refers_to_date, payload_digest, refers_to_target_uri) return self._send_headers_and_refd_payload( - record.content[1], refers_to, refers_to_target_uri, - refers_to_date) + record.content[1], refers_to_target_uri, refers_to_date, + payload_digest) else: # send it back raw, whatever it is @@ -264,12 +266,12 @@ class PlaybackIndexDb(object): # XXX canonicalize url? url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') - record_id_str = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') + payload_digest_str = response_record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1') # there could be two visits of same url in the same second, and WARC-Date is # prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\ - # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...} + # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'d':payload_digest},record2,...],date2:[{...}],...} with self._lock: conn = sqlite3.connect(self.file) @@ -283,10 +285,10 @@ class PlaybackIndexDb(object): if date_str in py_value: py_value[date_str].append( - {'f':warcfile, 'o':offset, 'i':record_id_str}) + {'f': warcfile, 'o': offset, 'd': payload_digest_str}) else: py_value[date_str] = [ - {'f':warcfile, 'o':offset, 'i':record_id_str}] + {'f': warcfile, 'o': offset, 'd': payload_digest_str}] json_value = json.dumps(py_value, separators=(',',':')) @@ -314,11 +316,11 @@ class PlaybackIndexDb(object): latest_date = max(py_value) result = py_value[latest_date][0] - result['i'] = result['i'].encode('ascii') + result['d'] = result['d'].encode('ascii') return latest_date, result # in python3 params are bytes - def lookup_exact(self, url, warc_date, record_id): + def lookup_exact(self, url, warc_date, payload_digest): conn = sqlite3.connect(self.file) cursor = conn.execute( 'select value from playback where url = ?', (url,)) @@ -334,14 +336,13 @@ class PlaybackIndexDb(object): if warc_date in py_value: for record in py_value[warc_date]: - if record['i'] == record_id: + if record['d'] == payload_digest: self.logger.debug( "found exact match for (%r,%r,%r)", - warc_date, record_id, url) - record['i'] = record['i'] + warc_date, payload_digest, url) + record['d'] = record['d'] return record else: self.logger.info( - "match not found for (%r,%r,%r)", warc_date, record_id, url) + "match not found for (%r,%r,%r)", warc_date, payload_digest, url) return None - diff --git a/warcprox/warc.py b/warcprox/warc.py index 51b1c35..53e049f 100644 --- a/warcprox/warc.py +++ b/warcprox/warc.py @@ -50,7 +50,6 @@ class WarcRecordBuilder: url=recorded_url.url, warc_date=warc_date, data=response_header_block, warc_type=warctools.WarcRecord.REVISIT, - refers_to=recorded_url.dedup_info['id'], refers_to_target_uri=recorded_url.dedup_info['url'], refers_to_date=recorded_url.dedup_info['date'], payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32), @@ -87,8 +86,8 @@ class WarcRecordBuilder: def build_warc_record(self, url, warc_date=None, recorder=None, data=None, concurrent_to=None, warc_type=None, content_type=None, 
remote_ip=None, - profile=None, refers_to=None, refers_to_target_uri=None, - refers_to_date=None, payload_digest=None): + profile=None, refers_to_target_uri=None, refers_to_date=None, + payload_digest=None): if warc_date is None: warc_date = warctools.warc.warc_datetime_str(datetime.datetime.utcnow()) @@ -105,8 +104,6 @@ class WarcRecordBuilder: headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip)) if profile is not None: headers.append((warctools.WarcRecord.PROFILE, profile)) - if refers_to is not None: - headers.append((warctools.WarcRecord.REFERS_TO, refers_to)) if refers_to_target_uri is not None: headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri)) if refers_to_date is not None: From f7240a33d797dd53a76cc3aacc8b699f970bb540 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 13 Oct 2017 19:42:41 +0000 Subject: [PATCH 17/48] Replace invalid warcfilename variable in playback A warcfilename variable which does not exists is used here. Replace it with the current variable for filename. --- warcprox/playback.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warcprox/playback.py b/warcprox/playback.py index 663e10a..7a9fead 100644 --- a/warcprox/playback.py +++ b/warcprox/playback.py @@ -131,7 +131,7 @@ class PlaybackProxyHandler(MitmProxyHandler): pass if errors: - raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors)) + raise Exception('warc errors at {}:{} -- {}'.format(location['f'], offset, errors)) warc_type = record.get_header(warctools.WarcRecord.TYPE) if warc_type != warctools.WarcRecord.RESPONSE: From ad8ba43c3d49fd2450dd4fd22d56d1962b8d0c57 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 13 Oct 2017 20:38:04 +0000 Subject: [PATCH 18/48] Update unit test --- tests/test_warcprox.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index b24a5c8..26e3d3f 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -406,13 +406,11 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) time.sleep(0.5) # check in dedup db - # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} + # {u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup['url'] == url.encode('ascii') - assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) - record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] # need revisit to have a later timestamp than original, else playing @@ -435,7 +433,6 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup['url'] == url.encode('ascii') - assert dedup_lookup['id'] == record_id assert dedup_lookup['date'] == dedup_date # test playback @@ -479,13 +476,11 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie time.sleep(0.5) # check in dedup db - # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} + # {u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( 
b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup['url'] == url.encode('ascii') - assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) - record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] # need revisit to have a later timestamp than original, else playing @@ -508,7 +503,6 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup['url'] == url.encode('ascii') - assert dedup_lookup['id'] == record_id assert dedup_lookup['date'] == dedup_date # test playback @@ -576,9 +570,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a") assert dedup_lookup['url'] == url1.encode('ascii') - assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) - record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] # check url1 not in dedup db bucket_b @@ -603,9 +595,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b") assert dedup_lookup['url'] == url2.encode('ascii') - assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) - record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] # archive url2 bucket_a From 424f236126e3078f8b92569fe19d9ebfc99a31f2 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 13 Oct 2017 22:04:56 +0000 Subject: [PATCH 19/48] Revert warc to previous behavior If record_id is available, write it to REFERS_TO header. 
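To make the intent concrete, the revisit records we write end up with headers along these lines (values below are made up)::

    WARC-Type: revisit
    WARC-Refers-To: <urn:uuid:...>          # only present when dedup_info has an 'id'
    WARC-Refers-To-Target-URI: http://example.com/a/b
    WARC-Refers-To-Date: 2017-02-03T04:05:03Z
    WARC-Payload-Digest: sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A

``refers_to`` is looked up with ``recorded_url.dedup_info.get('id')``, so dedup entries that only carry ``url`` and ``date`` still work; they simply produce a revisit record without ``WARC-Refers-To``.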
--- warcprox/warc.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/warcprox/warc.py b/warcprox/warc.py index 53e049f..de0ec06 100644 --- a/warcprox/warc.py +++ b/warcprox/warc.py @@ -50,6 +50,7 @@ class WarcRecordBuilder: url=recorded_url.url, warc_date=warc_date, data=response_header_block, warc_type=warctools.WarcRecord.REVISIT, + refers_to=recorded_url.dedup_info.get('id'), refers_to_target_uri=recorded_url.dedup_info['url'], refers_to_date=recorded_url.dedup_info['date'], payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32), @@ -86,8 +87,8 @@ class WarcRecordBuilder: def build_warc_record(self, url, warc_date=None, recorder=None, data=None, concurrent_to=None, warc_type=None, content_type=None, remote_ip=None, - profile=None, refers_to_target_uri=None, refers_to_date=None, - payload_digest=None): + profile=None, refers_to=None, refers_to_target_uri=None, + refers_to_date=None, payload_digest=None): if warc_date is None: warc_date = warctools.warc.warc_datetime_str(datetime.datetime.utcnow()) @@ -104,6 +105,8 @@ class WarcRecordBuilder: headers.append((warctools.WarcRecord.IP_ADDRESS, remote_ip)) if profile is not None: headers.append((warctools.WarcRecord.PROFILE, profile)) + if refers_to is not None: + headers.append((warctools.WarcRecord.REFERS_TO, refers_to)) if refers_to_target_uri is not None: headers.append((warctools.WarcRecord.REFERS_TO_TARGET_URI, refers_to_target_uri)) if refers_to_date is not None: From 97e52b8f7b4c64b32371788219f30d59265c0647 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 16 Oct 2017 02:28:09 +0000 Subject: [PATCH 20/48] Revert changes to bigtable and dedup --- warcprox/bigtable.py | 2 ++ warcprox/dedup.py | 7 +++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index c3a9bd8..387d05c 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -233,6 +233,8 @@ class RethinkCapturesDedup: "url": entry["url"].encode("utf-8"), "date": entry["timestamp"].strftime("%Y-%m-%dT%H:%M:%SZ").encode("utf-8"), } + if "warc_id" in entry: + dedup_info["id"] = entry["warc_id"].encode("utf-8") return dedup_info else: return None diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 79be80f..fd1ada4 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -55,12 +55,13 @@ class DedupDb(object): conn.close() def save(self, digest_key, response_record, bucket=""): + record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') key = digest_key.decode('utf-8') + "|" + bucket - py_value = {'url':url, 'date':date} + py_value = {'id':record_id, 'url':url, 'date':date} json_value = json.dumps(py_value, separators=(',',':')) conn = sqlite3.connect(self.file) @@ -80,6 +81,7 @@ class DedupDb(object): conn.close() if result_tuple: result = json.loads(result_tuple[0]) + result['id'] = result['id'].encode('latin1') result['url'] = result['url'].encode('latin1') result['date'] = result['date'].encode('latin1') self.logger.debug('dedup db lookup of key=%s returning %s', key, result) @@ -142,9 +144,10 @@ class RethinkDedupDb: def save(self, digest_key, response_record, bucket=""): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key k = "{}|{}".format(k, bucket) + record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') url = 
response_record.get_header(warctools.WarcRecord.URL).decode('latin1') date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') - record = {'key': k, 'url': url, 'date': date} + record = {'key':k,'url':url,'date':date,'id':record_id} result = self.rr.table(self.table).insert( record, conflict="replace").run() if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: From 9ce3132510361c0790dc070a0f8eaa402da24db1 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 16 Oct 2017 02:41:43 +0000 Subject: [PATCH 21/48] Revert changes to test_warcprox.py --- tests/test_warcprox.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 26e3d3f..b24a5c8 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -406,11 +406,13 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) time.sleep(0.5) # check in dedup db - # {u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} + # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup['url'] == url.encode('ascii') + assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) + record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] # need revisit to have a later timestamp than original, else playing @@ -433,6 +435,7 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') assert dedup_lookup['url'] == url.encode('ascii') + assert dedup_lookup['id'] == record_id assert dedup_lookup['date'] == dedup_date # test playback @@ -476,11 +479,13 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie time.sleep(0.5) # check in dedup db - # {u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} + # {u'id': u'', u'url': u'https://localhost:62841/c/d', u'date': u'2013-11-22T00:14:37Z'} dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup['url'] == url.encode('ascii') + assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) + record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] # need revisit to have a later timestamp than original, else playing @@ -503,6 +508,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') assert dedup_lookup['url'] == url.encode('ascii') + assert dedup_lookup['id'] == record_id assert dedup_lookup['date'] == dedup_date # test playback @@ -570,7 +576,9 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a") assert dedup_lookup['url'] == url1.encode('ascii') + assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) + record_id = dedup_lookup['id'] dedup_date = 
dedup_lookup['date'] # check url1 not in dedup db bucket_b @@ -595,7 +603,9 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, dedup_lookup = warcprox_.warc_writer_threads[0].dedup_db.lookup( b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b") assert dedup_lookup['url'] == url2.encode('ascii') + assert re.match(br'^$', dedup_lookup['id']) assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) + record_id = dedup_lookup['id'] dedup_date = dedup_lookup['date'] # archive url2 bucket_a From 5ed47b387183b9ba4fd529edaba948310f389aed Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 16 Oct 2017 11:37:49 -0700 Subject: [PATCH 22/48] cryptography lib version 2.1.1 is causing problems --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 89f37dc..34d11b5 100755 --- a/setup.py +++ b/setup.py @@ -41,6 +41,7 @@ deps = [ 'urlcanon>=0.1.dev16', 'doublethink>=0.2.0.dev81', 'PySocks', + 'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu ] try: import concurrent.futures @@ -49,7 +50,7 @@ except: setuptools.setup( name='warcprox', - version='2.2b1.dev105', + version='2.2b1.dev106', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From fc5f39ffed1b88bd1d5a809c13f779d9a2ead56b Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 13 Oct 2017 17:44:07 +0000 Subject: [PATCH 23/48] Add CDX Server based deduplication Add ``--cdxserver-dedup URL`` option. Create ``warcprox.dedup.CdxServerDedup`` class. Add dummy unit test (TODO) --- README.rst | 4 +++ setup.py | 1 + tests/test_dedup.py | 10 ++++++ warcprox/dedup.py | 86 +++++++++++++++++++++++++++++++++++++++++++-- warcprox/main.py | 5 +++ 5 files changed, 104 insertions(+), 2 deletions(-) create mode 100644 tests/test_dedup.py diff --git a/README.rst b/README.rst index b9c1c5f..8adcafa 100644 --- a/README.rst +++ b/README.rst @@ -47,6 +47,7 @@ Usage [--stats-db-file STATS_DB_FILE] [-P PLAYBACK_PORT] [--playback-index-db-file PLAYBACK_INDEX_DB_FILE] [-j DEDUP_DB_FILE | --rethinkdb-servers RETHINKDB_SERVERS] + [--cdxserver-dedup CDX_SERVER_URL] [--rethinkdb-db RETHINKDB_DB] [--rethinkdb-big-table] [--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY] [--plugin PLUGIN_CLASS] [--version] [-v] [--trace] [-q] @@ -100,6 +101,9 @@ Usage persistent deduplication database file; empty string or /dev/null disables deduplication (default: ./warcprox.sqlite) + --cdxserver-dedup CDX_SERVER_URL + use a CDX server for deduplication + (default: None) --rethinkdb-servers RETHINKDB_SERVERS rethinkdb servers, used for dedup and stats if specified; e.g. 
diff --git a/setup.py b/setup.py index 34d11b5..b9308e2 100755 --- a/setup.py +++ b/setup.py @@ -39,6 +39,7 @@ deps = [ 'certauth==1.1.6', 'warctools', 'urlcanon>=0.1.dev16', + 'urllib3', 'doublethink>=0.2.0.dev81', 'PySocks', 'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu diff --git a/tests/test_dedup.py b/tests/test_dedup.py new file mode 100644 index 0000000..7836d27 --- /dev/null +++ b/tests/test_dedup.py @@ -0,0 +1,10 @@ +import pytest +from warcprox.dedup import CdxServerDedup + + +def test_cdx(): + # TODO add mocking of CDX Server response + # TODO check found and not found cases + cdx_server = CdxServerDedup(cdx_url="https://web.archive.org/cdx/search/cdx") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + url="http://example.com") diff --git a/warcprox/dedup.py b/warcprox/dedup.py index fd1ada4..a3c89f7 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -21,12 +21,16 @@ USA. from __future__ import absolute_import +from datetime import datetime import logging import os import json from hanzo import warctools import warcprox import sqlite3 +import urllib3 + +urllib3.disable_warnings() class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -107,9 +111,16 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): and recorded_url.response_recorder.payload_size() > 0): digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: - recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"]) + if isinstance(dedup_db, CdxServerDedup): + recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"], + recorded_url) + else: + recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"]) else: - recorded_url.dedup_info = dedup_db.lookup(digest_key) + if isinstance(dedup_db, CdxServerDedup): + recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url) + else: + recorded_url.dedup_info = dedup_db.lookup(digest_key) class RethinkDedupDb: logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") @@ -174,3 +185,74 @@ class RethinkDedupDb: else: self.save(digest_key, records[0]) + +def _split_timestamp(timestamp): + """split `timestamp` into a tuple of 6 integers. + + :param timestamp: full-length timestamp. + :type timestamp: bytes + """ + return ( + int(timestamp[:-10]), + int(timestamp[-10:-8]), + int(timestamp[-8:-6]), + int(timestamp[-6:-4]), + int(timestamp[-4:-2]), + int(timestamp[-2:]) + ) + + +class CdxServerDedup(object): + """Query a CDX server to perform deduplication. + """ + logger = logging.getLogger("warcprox.dedup.CdxServerDedup") + + def __init__(self, cdx_url="https://web.archive.org/cdx/search/cdx", + options=warcprox.Options()): + self.http_pool = urllib3.PoolManager() + self.cdx_url = cdx_url + self.options = options + + def start(self): + pass + + def save(self, digest_key, response_record, bucket=""): + """Does not apply to CDX server, as it is obviously read-only. + """ + pass + + def lookup(self, digest_key, recorded_url): + """Compare `sha1` with SHA1 hash of fetched content (note SHA1 must be + computed on the original content, after decoding Content-Encoding and + Transfer-Encoding, if any), if they match, write a revisit record. + + :param digest_key: b'sha1:'. 
+ Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' + :param recorded_url: RecordedUrl object + Result must contain: + {"url", "date": "%Y-%m-%dT%H:%M:%SZ", "id": "warc_id" if available} + """ + url = recorded_url.url + u = url.decode("utf-8") if isinstance(url, bytes) else url + try: + result = self.http_pool.request('GET', self.cdx_url, fields=dict( + url=u, fl="timestamp,digest", limit=-1)) + except urllib3.HTTPError as exc: + self.logger.error('CdxServerDedup request failed for url=%s %s', + url, exc) + if result.status == 200: + digest_key = digest_key[5:] # drop sha1: prefix + for line in result.data.split(b'\n'): + if line: + (cdx_ts, cdx_digest) = line.split(b' ') + if cdx_digest == digest_key: + dt = datetime(*_split_timestamp(cdx_ts.decode('ascii'))) + # TODO find out id + return dict(id=url, url=url, + date=dt.strftime('%Y-%m-%dT%H:%M:%SZ')) + return None + + def notify(self, recorded_url, records): + """Since we don't save anything to CDX server, this does not apply. + """ + pass diff --git a/warcprox/main.py b/warcprox/main.py index 7b7314b..2d0414b 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -106,6 +106,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): group = arg_parser.add_mutually_exclusive_group() group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') + group.add_argument('--cdxserver-dedup', dest='cdxserver_dedup', + help='use a CDX Server for deduplication') group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox', @@ -189,6 +191,9 @@ def init_controller(args): else: dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options) listeners.append(dedup_db) + elif args.cdxserver_dedup: + dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup) + listeners.append(dedup_db) elif args.dedup_db_file in (None, '', '/dev/null'): logging.info('deduplication disabled') dedup_db = None From 960dda4c319816cf9733367255e313e67c512e45 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 19 Oct 2017 22:11:22 +0000 Subject: [PATCH 24/48] Add CdxServerDedup unit tests and improve its exception handling Add multiple ``CdxServerDedup`` unit tests to simulate found, not found and invalid responses from the CDX server. Use a different file ``tests/test_dedup.py`` because we test the CdxServerDedup component individually and it belongs to the ``warcprox.dedup`` package. Add ``mock`` package to dev requirements. Rework the warcprox.dedup.CdxServerDedup class to have better exception handling. 
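One implementation detail that may not be obvious from the diff below: ``urllib3.PoolManager()`` moves from ``__init__`` to a class attribute, which is what lets the tests stub the HTTP layer by dotted path, e.g.::

    with mock.patch('warcprox.dedup.CdxServerDedup.http_pool.request') as request:
        request.return_value = mock.Mock(
            status=200, data=b'20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A')

``mock.patch`` resolves that target through the class, so ``http_pool`` has to exist on the class itself rather than on each instance. A side effect is that all ``CdxServerDedup`` instances share a single connection pool. The full tests are in the diff below.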
--- setup.py | 2 +- tests/test_dedup.py | 54 ++++++++++++++++++++++++++++++++++++++++----- warcprox/dedup.py | 26 +++++++++++++--------- 3 files changed, 64 insertions(+), 18 deletions(-) diff --git a/setup.py b/setup.py index b9308e2..228ece7 100755 --- a/setup.py +++ b/setup.py @@ -60,7 +60,7 @@ setuptools.setup( license='GPL', packages=['warcprox'], install_requires=deps, - tests_require=['requests>=2.0.1', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636 + tests_require=['requests>=2.0.1', 'mock', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636 cmdclass = {'test': PyTest}, test_suite='warcprox.tests', entry_points={ diff --git a/tests/test_dedup.py b/tests/test_dedup.py index 7836d27..e1b7482 100644 --- a/tests/test_dedup.py +++ b/tests/test_dedup.py @@ -1,10 +1,52 @@ +import mock import pytest from warcprox.dedup import CdxServerDedup -def test_cdx(): - # TODO add mocking of CDX Server response - # TODO check found and not found cases - cdx_server = CdxServerDedup(cdx_url="https://web.archive.org/cdx/search/cdx") - res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", - url="http://example.com") +def test_cdx_dedup(): + # Mock CDX Server responses to simulate found, not found and errors. + with mock.patch('warcprox.dedup.CdxServerDedup.http_pool.request') as request: + recorded_url = mock.Mock(); + recorded_url.url = "http://example.com" + # not found case + result = mock.Mock() + result.status = 200 + result.data = b'20170101020405 test' + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + recorded_url=recorded_url) + assert res is None + + # found in the 2nd CDX line + result = mock.Mock() + result.status = 200 + result.data = b"""\ +20170101020304 xxx +20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A +20160505050505 yyyyyyyyyyyyyyyyyyyyyy""" + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + recorded_url=recorded_url) + assert res["url"] == "http://example.com" + assert res["date"] == "2017-02-03T04:05:03Z" + + # invalid CDX result status code + result = mock.Mock() + result.status = 400 + result.data = b'20170101020405 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + recorded_url=recorded_url) + assert res is None + # invalid CDX result content + result = mock.Mock() + result.status = 200 + result.data = b'InvalidExceptionResult' + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + recorded_url=recorded_url) + assert res is None diff --git a/warcprox/dedup.py b/warcprox/dedup.py index a3c89f7..8aa9c16 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -29,6 +29,7 @@ from hanzo import warctools import warcprox import sqlite3 import urllib3 +from urllib3.exceptions import HTTPError urllib3.disable_warnings() @@ -206,10 +207,10 @@ class CdxServerDedup(object): """Query a CDX server to perform deduplication. 
""" logger = logging.getLogger("warcprox.dedup.CdxServerDedup") + http_pool = urllib3.PoolManager() def __init__(self, cdx_url="https://web.archive.org/cdx/search/cdx", options=warcprox.Options()): - self.http_pool = urllib3.PoolManager() self.cdx_url = cdx_url self.options = options @@ -226,30 +227,33 @@ class CdxServerDedup(object): computed on the original content, after decoding Content-Encoding and Transfer-Encoding, if any), if they match, write a revisit record. - :param digest_key: b'sha1:'. + :param digest_key: b'sha1:' (prefix is optional). Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' :param recorded_url: RecordedUrl object Result must contain: - {"url", "date": "%Y-%m-%dT%H:%M:%SZ", "id": "warc_id" if available} + {"url": , "date": "%Y-%m-%dT%H:%M:%SZ"} """ url = recorded_url.url u = url.decode("utf-8") if isinstance(url, bytes) else url try: result = self.http_pool.request('GET', self.cdx_url, fields=dict( url=u, fl="timestamp,digest", limit=-1)) - except urllib3.HTTPError as exc: - self.logger.error('CdxServerDedup request failed for url=%s %s', - url, exc) - if result.status == 200: - digest_key = digest_key[5:] # drop sha1: prefix + assert result.status == 200 + if isinstance(digest_key, bytes): + dkey = digest_key + else: + dkey = digest_key.encode('utf-8') + dkey = dkey[5:] if dkey.startswith(b'sha1:') else dkey for line in result.data.split(b'\n'): if line: (cdx_ts, cdx_digest) = line.split(b' ') - if cdx_digest == digest_key: + if cdx_digest == dkey: dt = datetime(*_split_timestamp(cdx_ts.decode('ascii'))) - # TODO find out id - return dict(id=url, url=url, + return dict(url=url, date=dt.strftime('%Y-%m-%dT%H:%M:%SZ')) + except (HTTPError, AssertionError, ValueError) as exc: + self.logger.error('CdxServerDedup request failed for url=%s %s', + url, exc) return None def notify(self, recorded_url, records): From 59e995ccdf05bcdff2094d6a223130dbf7bf5811 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 19 Oct 2017 22:22:14 +0000 Subject: [PATCH 25/48] Add mock pkg to run-tests.sh --- tests/run-tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/run-tests.sh b/tests/run-tests.sh index 334cfc2..80db2f8 100755 --- a/tests/run-tests.sh +++ b/tests/run-tests.sh @@ -40,7 +40,7 @@ do && (cd /warcprox && git diff HEAD) | patch -p1 \ && virtualenv -p $python /tmp/venv \ && source /tmp/venv/bin/activate \ - && pip --log-file /tmp/pip.log install . pytest requests warcio \ + && pip --log-file /tmp/pip.log install . 
pytest mock requests warcio \ && py.test -v tests \ && py.test -v --rethinkdb-servers=localhost tests \ && py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests" From a0821575b4c16673c4b4e1c831c549eba0f8fff8 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 19 Oct 2017 22:54:34 +0000 Subject: [PATCH 26/48] Fix bug with dedup_info date encoding --- warcprox/dedup.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 8aa9c16..6258860 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -209,7 +209,7 @@ class CdxServerDedup(object): logger = logging.getLogger("warcprox.dedup.CdxServerDedup") http_pool = urllib3.PoolManager() - def __init__(self, cdx_url="https://web.archive.org/cdx/search/cdx", + def __init__(self, cdx_url="https://web.archive.org/cdx/search", options=warcprox.Options()): self.cdx_url = cdx_url self.options = options @@ -237,7 +237,7 @@ class CdxServerDedup(object): u = url.decode("utf-8") if isinstance(url, bytes) else url try: result = self.http_pool.request('GET', self.cdx_url, fields=dict( - url=u, fl="timestamp,digest", limit=-1)) + url=u, fl="timestamp,digest", limit=-10)) assert result.status == 200 if isinstance(digest_key, bytes): dkey = digest_key @@ -249,8 +249,8 @@ class CdxServerDedup(object): (cdx_ts, cdx_digest) = line.split(b' ') if cdx_digest == dkey: dt = datetime(*_split_timestamp(cdx_ts.decode('ascii'))) - return dict(url=url, - date=dt.strftime('%Y-%m-%dT%H:%M:%SZ')) + date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8') + return dict(url=url, date=date) except (HTTPError, AssertionError, ValueError) as exc: self.logger.error('CdxServerDedup request failed for url=%s %s', url, exc) From bc3d0cb4f6680c73b93cb0221c7842839505f0d9 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 19 Oct 2017 22:57:33 +0000 Subject: [PATCH 27/48] Fix minor CdxServerDedup unit test --- tests/test_dedup.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_dedup.py b/tests/test_dedup.py index e1b7482..5a0ca3b 100644 --- a/tests/test_dedup.py +++ b/tests/test_dedup.py @@ -29,8 +29,7 @@ def test_cdx_dedup(): cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", recorded_url=recorded_url) - assert res["url"] == "http://example.com" - assert res["date"] == "2017-02-03T04:05:03Z" + assert res["date"] == b"2017-02-03T04:05:03Z" # invalid CDX result status code result = mock.Mock() From 202d664f3906716f15b52833a43a0e0c5eae9226 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 20 Oct 2017 20:00:02 +0000 Subject: [PATCH 28/48] Improve CdxServerDedup implementation Replace ``_split_timestamp`` with ``datetime.strptime`` in ``warcprox.dedup``. Remove ``isinstance()`` and add optional ``record_url`` in the rest of the dedup ``lookup`` methods. Make `--cdxserver-dedup` option help more explanatory. 
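[Editorial aside: for reference, the timestamp handling this change switches to, as a standalone sketch assuming a well-formed 14-digit CDX timestamp.]::

    from datetime import datetime

    cdx_ts = b"20170203040503"
    dt = datetime.strptime(cdx_ts.decode("ascii"), "%Y%m%d%H%M%S")
    print(dt.strftime("%Y-%m-%dT%H:%M:%SZ"))  # 2017-02-03T04:05:03Z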
--- warcprox/dedup.py | 35 +++++++---------------------------- warcprox/main.py | 2 +- 2 files changed, 8 insertions(+), 29 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 6258860..41b9249 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -77,7 +77,7 @@ class DedupDb(object): conn.close() self.logger.debug('dedup db saved %s:%s', key, json_value) - def lookup(self, digest_key, bucket=""): + def lookup(self, digest_key, bucket="", recorded_url=None): result = None key = digest_key.decode('utf-8') + '|' + bucket conn = sqlite3.connect(self.file) @@ -112,16 +112,10 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): and recorded_url.response_recorder.payload_size() > 0): digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: - if isinstance(dedup_db, CdxServerDedup): - recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"], - recorded_url) - else: - recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"]) + recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"], + recorded_url) else: - if isinstance(dedup_db, CdxServerDedup): - recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url) - else: - recorded_url.dedup_info = dedup_db.lookup(digest_key) + recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url=recorded_url) class RethinkDedupDb: logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") @@ -166,7 +160,7 @@ class RethinkDedupDb: raise Exception("unexpected result %s saving %s", result, record) self.logger.debug('dedup db saved %s:%s', k, record) - def lookup(self, digest_key, bucket=""): + def lookup(self, digest_key, bucket="", recorded_url=None): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key k = "{}|{}".format(k, bucket) result = self.rr.table(self.table).get(k).run() @@ -187,22 +181,6 @@ class RethinkDedupDb: self.save(digest_key, records[0]) -def _split_timestamp(timestamp): - """split `timestamp` into a tuple of 6 integers. - - :param timestamp: full-length timestamp. - :type timestamp: bytes - """ - return ( - int(timestamp[:-10]), - int(timestamp[-10:-8]), - int(timestamp[-8:-6]), - int(timestamp[-6:-4]), - int(timestamp[-4:-2]), - int(timestamp[-2:]) - ) - - class CdxServerDedup(object): """Query a CDX server to perform deduplication. """ @@ -248,7 +226,8 @@ class CdxServerDedup(object): if line: (cdx_ts, cdx_digest) = line.split(b' ') if cdx_digest == dkey: - dt = datetime(*_split_timestamp(cdx_ts.decode('ascii'))) + dt = datetime.strptime(cdx_ts.decode('ascii'), + '%Y%m%d%H%M%S') date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8') return dict(url=url, date=date) except (HTTPError, AssertionError, ValueError) as exc: diff --git a/warcprox/main.py b/warcprox/main.py index 2d0414b..76e194a 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -107,7 +107,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') group.add_argument('--cdxserver-dedup', dest='cdxserver_dedup', - help='use a CDX Server for deduplication') + help='use a CDX Server URL for deduplication; e.g. 
https://web.archive.org/cdx/search') group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox', From f77aef91108c4398d56fb13aee885c236901e635 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 20 Oct 2017 21:59:43 +0000 Subject: [PATCH 29/48] Filter out warc/revisit records in CdxServerDedup --- warcprox/dedup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 41b9249..53b27c9 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -215,7 +215,8 @@ class CdxServerDedup(object): u = url.decode("utf-8") if isinstance(url, bytes) else url try: result = self.http_pool.request('GET', self.cdx_url, fields=dict( - url=u, fl="timestamp,digest", limit=-10)) + url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit", + limit=-10)) assert result.status == 200 if isinstance(digest_key, bytes): dkey = digest_key From 4fb44a7e9d4b1579b4b97d0804ca922cf219eead Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Sat, 21 Oct 2017 20:24:28 +0000 Subject: [PATCH 30/48] Pass url instead of recorded_url obj to dedup lookup methods --- tests/test_dedup.py | 12 +++++------- warcprox/dedup.py | 14 +++++++------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/tests/test_dedup.py b/tests/test_dedup.py index 5a0ca3b..591337e 100644 --- a/tests/test_dedup.py +++ b/tests/test_dedup.py @@ -1,13 +1,11 @@ import mock -import pytest from warcprox.dedup import CdxServerDedup def test_cdx_dedup(): # Mock CDX Server responses to simulate found, not found and errors. with mock.patch('warcprox.dedup.CdxServerDedup.http_pool.request') as request: - recorded_url = mock.Mock(); - recorded_url.url = "http://example.com" + url = "http://example.com" # not found case result = mock.Mock() result.status = 200 @@ -15,7 +13,7 @@ def test_cdx_dedup(): request.return_value = result cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", - recorded_url=recorded_url) + url=url) assert res is None # found in the 2nd CDX line @@ -28,7 +26,7 @@ def test_cdx_dedup(): request.return_value = result cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", - recorded_url=recorded_url) + url=url) assert res["date"] == b"2017-02-03T04:05:03Z" # invalid CDX result status code @@ -38,7 +36,7 @@ def test_cdx_dedup(): request.return_value = result cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", - recorded_url=recorded_url) + url=url) assert res is None # invalid CDX result content result = mock.Mock() @@ -47,5 +45,5 @@ def test_cdx_dedup(): request.return_value = result cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", - recorded_url=recorded_url) + url=url) assert res is None diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 53b27c9..1513946 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -77,7 +77,7 @@ class DedupDb(object): conn.close() self.logger.debug('dedup db saved %s:%s', key, json_value) - def lookup(self, digest_key, bucket="", recorded_url=None): + def lookup(self, digest_key, bucket="", url=None): result = None key = 
digest_key.decode('utf-8') + '|' + bucket conn = sqlite3.connect(self.file) @@ -113,9 +113,10 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"], - recorded_url) + recorded_url.url) else: - recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url=recorded_url) + recorded_url.dedup_info = dedup_db.lookup(digest_key, + url=recorded_url.url) class RethinkDedupDb: logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") @@ -160,7 +161,7 @@ class RethinkDedupDb: raise Exception("unexpected result %s saving %s", result, record) self.logger.debug('dedup db saved %s:%s', k, record) - def lookup(self, digest_key, bucket="", recorded_url=None): + def lookup(self, digest_key, bucket="", url=None): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key k = "{}|{}".format(k, bucket) result = self.rr.table(self.table).get(k).run() @@ -200,18 +201,17 @@ class CdxServerDedup(object): """ pass - def lookup(self, digest_key, recorded_url): + def lookup(self, digest_key, url): """Compare `sha1` with SHA1 hash of fetched content (note SHA1 must be computed on the original content, after decoding Content-Encoding and Transfer-Encoding, if any), if they match, write a revisit record. :param digest_key: b'sha1:' (prefix is optional). Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' - :param recorded_url: RecordedUrl object + :param url: Target URL string Result must contain: {"url": , "date": "%Y-%m-%dT%H:%M:%SZ"} """ - url = recorded_url.url u = url.decode("utf-8") if isinstance(url, bytes) else url try: result = self.http_pool.request('GET', self.cdx_url, fields=dict( From f6b1d6f40879642c754a20be5504574667e7bf06 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Sat, 21 Oct 2017 20:45:46 +0000 Subject: [PATCH 31/48] Update CdxServerDedup lookup algorithm Get only one item from CDX (``limit=-1``). 
Update unit tests --- tests/test_dedup.py | 7 ++----- warcprox/dedup.py | 18 +++++++++--------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/tests/test_dedup.py b/tests/test_dedup.py index 591337e..124efb5 100644 --- a/tests/test_dedup.py +++ b/tests/test_dedup.py @@ -16,13 +16,10 @@ def test_cdx_dedup(): url=url) assert res is None - # found in the 2nd CDX line + # found case result = mock.Mock() result.status = 200 - result.data = b"""\ -20170101020304 xxx -20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A -20160505050505 yyyyyyyyyyyyyyyyyyyyyy""" + result.data = b'20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' request.return_value = result cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 1513946..08bbf23 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -216,21 +216,21 @@ class CdxServerDedup(object): try: result = self.http_pool.request('GET', self.cdx_url, fields=dict( url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit", - limit=-10)) + limit=-1)) assert result.status == 200 if isinstance(digest_key, bytes): dkey = digest_key else: dkey = digest_key.encode('utf-8') dkey = dkey[5:] if dkey.startswith(b'sha1:') else dkey - for line in result.data.split(b'\n'): - if line: - (cdx_ts, cdx_digest) = line.split(b' ') - if cdx_digest == dkey: - dt = datetime.strptime(cdx_ts.decode('ascii'), - '%Y%m%d%H%M%S') - date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8') - return dict(url=url, date=date) + line = result.data.split(b'\n') + if line: + (cdx_ts, cdx_digest) = line[0].split(b' ') + if cdx_digest == dkey: + dt = datetime.strptime(cdx_ts.decode('ascii'), + '%Y%m%d%H%M%S') + date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8') + return dict(url=url, date=date) except (HTTPError, AssertionError, ValueError) as exc: self.logger.error('CdxServerDedup request failed for url=%s %s', url, exc) From e538637b65fd87a1bda8c38b8b63abb374a11ed1 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 23 Oct 2017 12:49:32 -0700 Subject: [PATCH 32/48] fix benchmarks (update command line args) --- benchmarks/run-benchmarks.py | 129 +++++++++++++++++++---------------- 1 file changed, 69 insertions(+), 60 deletions(-) diff --git a/benchmarks/run-benchmarks.py b/benchmarks/run-benchmarks.py index f595f8b..a05db59 100755 --- a/benchmarks/run-benchmarks.py +++ b/benchmarks/run-benchmarks.py @@ -163,78 +163,87 @@ Benchmarking code uses asyncio/aiohttp and requires python 3.5 or later. 
arg_parser = argparse.ArgumentParser( prog=prog, description=desc, formatter_class=warcprox.main.BetterArgumentDefaultsHelpFormatter) - arg_parser.add_argument( - '-z', '--gzip', dest='gzip', action='store_true', + + ### these warcprox options are not configurable for the benchmarks + # arg_parser.add_argument('-p', '--port', dest='port', default='8000', + # type=int, help='port to listen on') + # arg_parser.add_argument('-b', '--address', dest='address', + # default='localhost', help='address to listen on') + # arg_parser.add_argument('-c', '--cacert', dest='cacert', + # default='./{0}-warcprox-ca.pem'.format(socket.gethostname()), + # help='CA certificate file; if file does not exist, it will be created') + # arg_parser.add_argument('--certs-dir', dest='certs_dir', + # default='./{0}-warcprox-ca'.format(socket.gethostname()), + # help='where to store and load generated certificates') + # arg_parser.add_argument('-d', '--dir', dest='directory', + # default='./warcs', help='where to write warcs') + + arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records') + arg_parser.add_argument('-n', '--prefix', dest='prefix', + default='WARCPROX', help='WARC filename prefix') arg_parser.add_argument( - '-s', '--size', dest='size', default=1000*1000*1000, type=int, - help='WARC file rollover size threshold in bytes') - arg_parser.add_argument( - '--rollover-idle-time', dest='rollover_idle_time', default=None, - type=int, help=( - 'WARC file rollover idle time threshold in seconds (so that ' - "Friday's last open WARC doesn't sit there all weekend " - 'waiting for more data)')) + '-s', '--size', dest='rollover_size', default=1000*1000*1000, + type=int, help='WARC file rollover size threshold in bytes') + arg_parser.add_argument('--rollover-idle-time', + dest='rollover_idle_time', default=None, type=int, + help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)") try: hash_algos = hashlib.algorithms_guaranteed except AttributeError: hash_algos = hashlib.algorithms - arg_parser.add_argument( - '-g', '--digest-algorithm', dest='digest_algorithm', - default='sha1', help='digest algorithm, one of %s' % hash_algos) + arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm', + default='sha1', help='digest algorithm, one of {}'.format(', '.join(hash_algos))) arg_parser.add_argument('--base32', dest='base32', action='store_true', default=False, help='write digests in Base32 instead of hex') - arg_parser.add_argument( - '--method-filter', metavar='HTTP_METHOD', - action='append', help=( - 'only record requests with the given http method(s) (can be ' - 'used more than once)')) - arg_parser.add_argument( - '--stats-db-file', dest='stats_db_file', - default=os.path.join(tmpdir, 'stats.db'), help=( - 'persistent statistics database file; empty string or ' - '/dev/null disables statistics tracking')) + arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD', + action='append', help='only record requests with the given http method(s) (can be used more than once)') + arg_parser.add_argument('--stats-db-file', dest='stats_db_file', + default='./warcprox.sqlite', help='persistent statistics database file; empty string or /dev/null disables statistics tracking') + arg_parser.add_argument('-P', '--playback-port', dest='playback_port', + type=int, default=None, help='port to listen on for instant playback') + 
arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', + default='./warcprox-playback-index.db', + help='playback index database file (only used if --playback-port is specified)') group = arg_parser.add_mutually_exclusive_group() - group.add_argument( - '-j', '--dedup-db-file', dest='dedup_db_file', - default=os.path.join(tmpdir, 'dedup.db'), help=( - 'persistent deduplication database file; empty string or ' - '/dev/null disables deduplication')) - group.add_argument( - '--rethinkdb-servers', dest='rethinkdb_servers', help=( - 'rethinkdb servers, used for dedup and stats if specified; ' - 'e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')) - # arg_parser.add_argument( - # '--rethinkdb-db', dest='rethinkdb_db', default='warcprox', help=( - # 'rethinkdb database name (ignored unless --rethinkdb-servers ' - # 'is specified)')) + group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', + default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') + group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', + help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') + arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox', + help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)') + arg_parser.add_argument('--rethinkdb-big-table', + dest='rethinkdb_big_table', action='store_true', default=False, + help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)') arg_parser.add_argument( - '--rethinkdb-big-table', dest='rethinkdb_big_table', - action='store_true', default=False, help=( - 'use a big rethinkdb table called "captures", instead of a ' - 'small table called "dedup"; table is suitable for use as ' - 'index for playback (ignored unless --rethinkdb-servers is ' - 'specified)')) + '--rethinkdb-big-table-name', dest='rethinkdb_big_table_name', + default='captures', help=argparse.SUPPRESS) + arg_parser.add_argument('--queue-size', dest='queue_size', type=int, + default=500, help=argparse.SUPPRESS) + arg_parser.add_argument('--max-threads', dest='max_threads', type=int, + help=argparse.SUPPRESS) + arg_parser.add_argument('--profile', action='store_true', default=False, + help=argparse.SUPPRESS) arg_parser.add_argument( - '--queue-size', dest='queue_size', type=int, default=1, help=( - 'max size of the queue of urls waiting to be processed by ' - 'the warc writer thread')) + '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy', + default=None, help=( + 'host:port of tor socks proxy, used only to connect to ' + '.onion sites')) arg_parser.add_argument( - '--max-threads', dest='max_threads', type=int, help=( - 'number of proxy server threads (if not specified, chosen based ' - 'on system resource limits')) - arg_parser.add_argument( - '--version', action='version', - version='warcprox %s' % warcprox.__version__) - arg_parser.add_argument( - '-v', '--verbose', dest='verbose', action='store_true', - help='verbose logging') - arg_parser.add_argument( - '--trace', dest='trace', action='store_true', - help='trace-level logging') - arg_parser.add_argument( - '--profile', dest='profile', action='store_true', default=False, - help='profile the warc writer thread') + '--plugin', metavar='PLUGIN_CLASS', dest='plugins', + action='append', help=( + 'Qualified name 
of plugin class, e.g. "mypkg.mymod.MyClass". ' + 'May be used multiple times to register multiple plugins. ' + 'Plugin classes are loaded from the regular python module ' + 'search path. They will be instantiated with no arguments and ' + 'must have a method `notify(self, recorded_url, records)` ' + 'which will be called for each url, after warc records have ' + 'been written.')) + arg_parser.add_argument('--version', action='version', + version="warcprox {}".format(warcprox.__version__)) + arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') + arg_parser.add_argument('--trace', dest='trace', action='store_true') + arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true') arg_parser.add_argument( '--requests', dest='requests', type=int, default=200, help='number of urls to fetch') From 428203277298e2965dcd8ddb678334174040bc3d Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 23 Oct 2017 22:21:57 +0000 Subject: [PATCH 33/48] Drop unnecessary split for newline in CDX results --- warcprox/dedup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 08bbf23..46f3c40 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -223,9 +223,9 @@ class CdxServerDedup(object): else: dkey = digest_key.encode('utf-8') dkey = dkey[5:] if dkey.startswith(b'sha1:') else dkey - line = result.data.split(b'\n') + line = result.data.strip() if line: - (cdx_ts, cdx_digest) = line[0].split(b' ') + (cdx_ts, cdx_digest) = line.split(b' ') if cdx_digest == dkey: dt = datetime.strptime(cdx_ts.decode('ascii'), '%Y%m%d%H%M%S') From 6beb19dc16bb60fa228f271c9eb9de29db203c64 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Wed, 25 Oct 2017 20:28:56 +0000 Subject: [PATCH 34/48] Expand comment with limit=-1 explanation --- warcprox/dedup.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 46f3c40..e70f5f9 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -206,6 +206,10 @@ class CdxServerDedup(object): computed on the original content, after decoding Content-Encoding and Transfer-Encoding, if any), if they match, write a revisit record. + Get only the last item (limit=-1) because Wayback Machine has special + performance optimisation to handle that. limit < 0 is very inefficient + in general. Maybe it could be configurable in the future. + :param digest_key: b'sha1:' (prefix is optional). 
Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' :param url: Target URL string From 37cd9457e7cbf939faf256be3057b8d87260f5a0 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 26 Oct 2017 09:56:44 -0700 Subject: [PATCH 35/48] version 2.2 for pypi to address https://github.com/internetarchive/warcprox/issues/42 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 34d11b5..ed1e2ef 100755 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ except: setuptools.setup( name='warcprox', - version='2.2b1.dev106', + version='2.2', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 7e1633d9b4ee2d29c52f9013f3b19a7de82def77 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 26 Oct 2017 10:02:35 -0700 Subject: [PATCH 36/48] back to dev version number --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index ed1e2ef..f867054 100755 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ except: setuptools.setup( name='warcprox', - version='2.2', + version='2.2.1b2.dev107', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', From 70ed4790b8697db6b9ddee4e3385b5ff5933b4a3 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 26 Oct 2017 18:18:15 +0000 Subject: [PATCH 37/48] Fix missing dummy url param in bigtable lookup method --- warcprox/bigtable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 387d05c..f3d897d 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -220,7 +220,7 @@ class RethinkCapturesDedup: self.captures_db = captures_db self.options = options - def lookup(self, digest_key, bucket="__unspecified__"): + def lookup(self, digest_key, bucket="__unspecified__", url=None): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key algo, value_str = k.split(":") if self.options.base32: From c9f1feb3dbd53ecc5562de2ed651c5df8505ea83 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 26 Oct 2017 19:44:22 +0000 Subject: [PATCH 38/48] Add hidden --no-warc-open-suffix CLI option By default warcprox adds `.open` suffix in open WARC files. Using this option we disable that. The option does not appear on the program help. 
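[Editorial aside: hedged usage note, other options elided. The flag is passed on the command line like any other switch; it is simply omitted from ``--help`` output.]::

    warcprox --no-warc-open-suffix

With the flag, the WARC currently being written is created under its final name from the start instead of carrying a trailing ``.open`` suffix until it is closed.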
--- warcprox/main.py | 2 ++ warcprox/writer.py | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/warcprox/main.py b/warcprox/main.py index 76e194a..d5a6e3f 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -78,6 +78,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): default='./warcs', help='where to write warcs') arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true', help='write gzip-compressed warc records') + arg_parser.add_argument('--no-warc-open-suffix', dest='no_warc_open_suffix', + default=False, action='store_true', help=argparse.SUPPRESS) arg_parser.add_argument('-n', '--prefix', dest='prefix', default='WARCPROX', help='WARC filename prefix') arg_parser.add_argument( diff --git a/warcprox/writer.py b/warcprox/writer.py index cf8d72d..419fd77 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -53,6 +53,7 @@ class WarcWriter: self._f = None self._fpath = None self._f_finalname = None + self._f_finalname_suffix = '' if options.no_warc_open_suffix else '.open' self._serial = 0 self._lock = threading.RLock() @@ -91,7 +92,7 @@ class WarcWriter: self.prefix, self.timestamp17(), self._serial, self._randomtoken, '.gz' if self.gzip else '') self._fpath = os.path.sep.join([ - self.directory, self._f_finalname + '.open']) + self.directory, self._f_finalname + self._f_finalname_suffix]) self._f = open(self._fpath, 'wb') From 975f2479a8caa5f5a8e0e0328b56c956b6563bbb Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 26 Oct 2017 21:58:31 +0000 Subject: [PATCH 39/48] Acquire and exclusive file lock when not using .open WARC suffix --- warcprox/writer.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/warcprox/writer.py b/warcprox/writer.py index 419fd77..7e7ff11 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -24,6 +24,7 @@ from __future__ import absolute_import import logging from datetime import datetime from hanzo import warctools +import fcntl import time import warcprox import os @@ -71,6 +72,8 @@ class WarcWriter: with self._lock: if self._fpath: self.logger.info('closing %s', self._f_finalname) + if self._f_finalname_suffix == '': + fcntl.flock(self._f, fcntl.LOCK_UN) self._f.close() finalpath = os.path.sep.join( [self.directory, self._f_finalname]) @@ -95,6 +98,10 @@ class WarcWriter: self.directory, self._f_finalname + self._f_finalname_suffix]) self._f = open(self._fpath, 'wb') + # if no '.open' suffix is used for WARC, acquire an exclusive + # file lock. + if self._f_finalname_suffix == '': + fcntl.flock(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) warcinfo_record = self.record_builder.build_warcinfo_record( self._f_finalname) From 5871a1bae267eb0fb2f4a2a4492e78840c7d8283 Mon Sep 17 00:00:00 2001 From: vbanos Date: Fri, 27 Oct 2017 16:22:16 +0300 Subject: [PATCH 40/48] Rename writer var and add exception handling Rename ``self._f_finalname_suffix`` to ``self._f_open_suffix``. Add exception handling for file locking operations. 
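[Editorial aside: why ``LOCK_NB`` plus the ``IOError`` handler matters — a second process that hits an already-locked WARC fails immediately instead of blocking. Standalone sketch; the path is illustrative.]::

    import fcntl

    with open('/tmp/example.warc', 'ab') as f:
        try:
            fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
            print('obtained lock')
        except IOError:
            print('already locked by another process')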
--- warcprox/writer.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/warcprox/writer.py b/warcprox/writer.py index 7e7ff11..a3e24c6 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -54,7 +54,7 @@ class WarcWriter: self._f = None self._fpath = None self._f_finalname = None - self._f_finalname_suffix = '' if options.no_warc_open_suffix else '.open' + self._f_open_suffix = '' if options.no_warc_open_suffix else '.open' self._serial = 0 self._lock = threading.RLock() @@ -72,8 +72,12 @@ class WarcWriter: with self._lock: if self._fpath: self.logger.info('closing %s', self._f_finalname) - if self._f_finalname_suffix == '': - fcntl.flock(self._f, fcntl.LOCK_UN) + if self._f_open_suffix == '': + try: + fcntl.flock(self._f, fcntl.LOCK_UN) + except IOError as exc: + self.logger.error('could not unlock file %s (%s)', + self._fpath, exc) self._f.close() finalpath = os.path.sep.join( [self.directory, self._f_finalname]) @@ -95,13 +99,17 @@ class WarcWriter: self.prefix, self.timestamp17(), self._serial, self._randomtoken, '.gz' if self.gzip else '') self._fpath = os.path.sep.join([ - self.directory, self._f_finalname + self._f_finalname_suffix]) + self.directory, self._f_finalname + self._f_open_suffix]) self._f = open(self._fpath, 'wb') # if no '.open' suffix is used for WARC, acquire an exclusive # file lock. - if self._f_finalname_suffix == '': - fcntl.flock(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) + if self._f_open_suffix == '': + try: + fcntl.flock(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError as exc: + self.logger.error('could not lock file %s (%s)', + self._fpath, exc) warcinfo_record = self.record_builder.build_warcinfo_record( self._f_finalname) From 3132856912c2d734b387da5fc42c51533eb6fcc9 Mon Sep 17 00:00:00 2001 From: vbanos Date: Sat, 28 Oct 2017 14:36:16 +0300 Subject: [PATCH 41/48] Test WarcWriter file locking when no_warc_open_suffix=True Add unit test for ``WarcWriter`` which open a different process and tries to lock the WARC file created by ``WarcWriter`` to check that locking works. --- tests/test_writer.py | 59 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 tests/test_writer.py diff --git a/tests/test_writer.py b/tests/test_writer.py new file mode 100644 index 0000000..fa85616 --- /dev/null +++ b/tests/test_writer.py @@ -0,0 +1,59 @@ +import os +import fcntl +from multiprocessing import Process, Queue +from datetime import datetime +import pytest +from warcprox.mitmproxy import ProxyingRecorder +from warcprox.warcproxy import RecordedUrl +from warcprox.writer import WarcWriter +from warcprox import Options + +recorder = ProxyingRecorder(None, None, 'sha1', url='http://example.com') + +recorded_url = RecordedUrl(url='http://example.com', content_type='text/plain', + status=200, client_ip='5.5.5.5', + request_data=b'abc', + response_recorder=recorder, + remote_ip='6.6.6.6', + timestamp=datetime.utcnow()) + + +def lock_file(queue, filename): + """Try to lock file and return 1 if successful, else return 0. + It is necessary to run this method in a different process to test locking. + """ + try: + fi = open(filename, 'ab') + fcntl.flock(fi, fcntl.LOCK_EX | fcntl.LOCK_NB) + fi.close() + queue.put('1') + except IOError: + queue.put('0') + + +@pytest.mark.parametrize("no_warc_open_suffix,lock_result", [ + (True, '0'), + (False, '1')]) +def test_warc_writer_locking(tmpdir, no_warc_open_suffix, lock_result): + """Test if WarcWriter is locking WARC files. 
+ When we don't have the .open suffix, WarcWriter locks the file and the + external process trying to ``lock_file`` fails (result=0). + """ + dirname = os.path.dirname(tmpdir.mkdir('test-warc-writer')) + wwriter = WarcWriter(Options(directory=dirname, + no_warc_open_suffix=no_warc_open_suffix)) + wwriter.write_records(recorded_url) + + if no_warc_open_suffix: + suffix = '.warc' + else: + suffix = '.warc.open' + warcs = [fn for fn in os.listdir(dirname) if fn.endswith(suffix)] + assert warcs + target_warc = os.path.join(dirname, warcs[0]) + # launch another process and try to lock WARC file + queue = Queue() + p = Process(target=lock_file, args=(queue, target_warc)) + p.start() + p.join() + assert queue.get() == lock_result From eda3da1db7b5fac263ff48d7e8eba1a7fefcf3fd Mon Sep 17 00:00:00 2001 From: vbanos Date: Sat, 28 Oct 2017 15:32:04 +0300 Subject: [PATCH 42/48] Unit test fix for Python2 compatibility --- tests/test_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_writer.py b/tests/test_writer.py index fa85616..8aedc7d 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -39,7 +39,7 @@ def test_warc_writer_locking(tmpdir, no_warc_open_suffix, lock_result): When we don't have the .open suffix, WarcWriter locks the file and the external process trying to ``lock_file`` fails (result=0). """ - dirname = os.path.dirname(tmpdir.mkdir('test-warc-writer')) + dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) wwriter = WarcWriter(Options(directory=dirname, no_warc_open_suffix=no_warc_open_suffix)) wwriter.write_records(recorded_url) From 25c0accc3cd7820945b7f033304096e7b56b714a Mon Sep 17 00:00:00 2001 From: vbanos Date: Sat, 28 Oct 2017 21:13:23 +0300 Subject: [PATCH 43/48] Swap fcntl.flock with fcntl.lockf MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On Linux, `fcntl.flock` is implemented with `flock(2)`, and `fcntl.lockf` is implemented with `fcntl(2)` — they are not compatible. Java `lock()` appears to be `fcntl(2)`. So, other Java programs working with these files work correctly only with `fcntl.lockf`. `warcprox` MUST use `fcntl.lockf` --- tests/test_writer.py | 2 +- warcprox/writer.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_writer.py b/tests/test_writer.py index 8aedc7d..444909f 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -24,7 +24,7 @@ def lock_file(queue, filename): """ try: fi = open(filename, 'ab') - fcntl.flock(fi, fcntl.LOCK_EX | fcntl.LOCK_NB) + fcntl.lockf(fi, fcntl.LOCK_EX | fcntl.LOCK_NB) fi.close() queue.put('1') except IOError: diff --git a/warcprox/writer.py b/warcprox/writer.py index a3e24c6..7a1032a 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -74,7 +74,7 @@ class WarcWriter: self.logger.info('closing %s', self._f_finalname) if self._f_open_suffix == '': try: - fcntl.flock(self._f, fcntl.LOCK_UN) + fcntl.lockf(self._f, fcntl.LOCK_UN) except IOError as exc: self.logger.error('could not unlock file %s (%s)', self._fpath, exc) @@ -106,7 +106,7 @@ class WarcWriter: # file lock. 
if self._f_open_suffix == '': try: - fcntl.flock(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) + fcntl.lockf(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError as exc: self.logger.error('could not lock file %s (%s)', self._fpath, exc) From 3d9a22b6c7855f60dea5f2f772432d548a6e6fd4 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Sun, 29 Oct 2017 18:48:08 +0000 Subject: [PATCH 44/48] Return capture timestamp When client request has HTTP header ``Warcprox-Meta": {"return-capture-timestamp": 1}``, add to the response the WARC record timestamp in the following HTTP header: ``Warcprox-Meta: {"capture-timestamp": '%Y-%m-%d %H:%M:%S"}``. Add unit test. --- tests/test_warcprox.py | 15 +++++++++++++++ warcprox/mitmproxy.py | 14 +++++++++++--- warcprox/warcproxy.py | 7 ++++++- 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index b24a5c8..22d4597 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -555,6 +555,21 @@ def test_limits(http_daemon, warcprox_, archiving_proxies): assert response.headers["content-type"] == "text/plain;charset=utf-8" assert response.raw.data == b"request rejected by warcprox: reached limit test_limits_bucket/total/urls=10\n" +def test_return_capture_timestamp(http_daemon, warcprox_, archiving_proxies): + url = 'http://localhost:{}/i/j'.format(http_daemon.server_port) + request_meta = {"return-capture-timestamp": 1} + headers = {"Warcprox-Meta": json.dumps(request_meta)} + response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True) + assert response.status_code == 200 + assert response.headers['Warcprox-Meta'] + data = json.loads(response.headers['Warcprox-Meta']) + assert data['capture-timestamp'] + try: + dt = datetime.datetime.strptime(data['capture-timestamp'], '%Y-%m-%d %H:%M:%S') + assert dt + except ValueError: + pytest.fail('Invalid capture-timestamp format %s', data['capture-timestamp']) + def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies): url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port) url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index 914fb52..e60c07e 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -45,6 +45,7 @@ try: http_client._MAXLINE = 4194304 # 4 MiB except ImportError: import httplib as http_client +import json import socket import logging import ssl @@ -163,13 +164,17 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): self.fp, proxy_client, digest_algorithm, url=url) self.fp = self.recorder - def begin(self): + def begin(self, timestamp=None): http_client.HTTPResponse.begin(self) # reads status line, headers status_and_headers = 'HTTP/1.1 {} {}\r\n'.format( self.status, self.reason) self.msg['Via'] = via_header_value( self.msg.get('Via'), '%0.1f' % (self.version / 10.0)) + if timestamp: + rmeta = {"capture-timestamp": timestamp.strftime('%Y-%m-%d %H:%M:%S')} + self.msg['Warcprox-Meta'] = json.dumps(rmeta, separators=',:') + for k,v in self.msg.items(): if k.lower() not in ( 'connection', 'proxy-connection', 'keep-alive', @@ -361,12 +366,15 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.logger.error("exception proxying request", exc_info=True) raise - def _proxy_request(self): + def _proxy_request(self, timestamp=None): ''' Sends the request to the remote server, then uses a ProxyingRecorder to read the response and send it to the proxy client, while 
recording the bytes in transit. Returns a tuple (request, response) where request is the raw request bytes, and response is a ProxyingRecorder. + + :param timestamp: generated on warcprox._proxy_request. It is the + timestamp written in the WARC record for this request. ''' # Build request req_str = '{} {} {}\r\n'.format( @@ -407,7 +415,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self._remote_server_sock, proxy_client=self.connection, digest_algorithm=self.server.digest_algorithm, url=self.url, method=self.command) - prox_rec_res.begin() + prox_rec_res.begin(timestamp=timestamp) buf = prox_rec_res.read(8192) while buf != b'': diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 06983ed..48dc5cd 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -180,8 +180,13 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): remote_ip = self._remote_server_sock.getpeername()[0] timestamp = datetime.datetime.utcnow() + if warcprox_meta and 'return-capture-timestamp' in warcprox_meta: + return_timestamp = timestamp + else: + return_timestamp = None + req, prox_rec_res = warcprox.mitmproxy.MitmProxyHandler._proxy_request( - self) + self, timestamp=return_timestamp) content_type = None try: From 56f0118374495f397de1f40b96b426a5c9789d44 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Tue, 31 Oct 2017 10:49:10 +0000 Subject: [PATCH 45/48] Replace timestamp parameter with more generic request/response syntax Replace timestamp parameter with more generic extra_response_headers={} When request has --header ``Warcprox-Meta: {\"accept\":[\"capture-metadata\"]}"`` Response has the following header: ``Warcprox-Meta: {"capture-metadata":{"timestamp":"2017-10-31T10:47:50Z"}}`` Update unit test --- tests/test_warcprox.py | 7 ++++--- warcprox/mitmproxy.py | 10 +++++----- warcprox/warcproxy.py | 11 +++++------ 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py index 22d4597..1752b94 100755 --- a/tests/test_warcprox.py +++ b/tests/test_warcprox.py @@ -557,15 +557,16 @@ def test_limits(http_daemon, warcprox_, archiving_proxies): def test_return_capture_timestamp(http_daemon, warcprox_, archiving_proxies): url = 'http://localhost:{}/i/j'.format(http_daemon.server_port) - request_meta = {"return-capture-timestamp": 1} + request_meta = {"accept": ["capture-metadata"]} headers = {"Warcprox-Meta": json.dumps(request_meta)} response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True) assert response.status_code == 200 assert response.headers['Warcprox-Meta'] data = json.loads(response.headers['Warcprox-Meta']) - assert data['capture-timestamp'] + assert data['capture-metadata'] try: - dt = datetime.datetime.strptime(data['capture-timestamp'], '%Y-%m-%d %H:%M:%S') + dt = datetime.datetime.strptime(data['capture-metadata']['timestamp'], + '%Y-%m-%dT%H:%M:%SZ') assert dt except ValueError: pytest.fail('Invalid capture-timestamp format %s', data['capture-timestamp']) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index e60c07e..e2cc321 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -164,15 +164,15 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): self.fp, proxy_client, digest_algorithm, url=url) self.fp = self.recorder - def begin(self, timestamp=None): + def begin(self, extra_response_headers={}): http_client.HTTPResponse.begin(self) # reads status line, headers status_and_headers = 'HTTP/1.1 {} {}\r\n'.format( self.status, self.reason) 
self.msg['Via'] = via_header_value( self.msg.get('Via'), '%0.1f' % (self.version / 10.0)) - if timestamp: - rmeta = {"capture-timestamp": timestamp.strftime('%Y-%m-%d %H:%M:%S')} + if extra_response_headers: + rmeta = {"capture-metadata": extra_response_headers} self.msg['Warcprox-Meta'] = json.dumps(rmeta, separators=',:') for k,v in self.msg.items(): @@ -366,7 +366,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self.logger.error("exception proxying request", exc_info=True) raise - def _proxy_request(self, timestamp=None): + def _proxy_request(self, extra_response_headers={}): ''' Sends the request to the remote server, then uses a ProxyingRecorder to read the response and send it to the proxy client, while recording the @@ -415,7 +415,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): self._remote_server_sock, proxy_client=self.connection, digest_algorithm=self.server.digest_algorithm, url=self.url, method=self.command) - prox_rec_res.begin(timestamp=timestamp) + prox_rec_res.begin(extra_response_headers=extra_response_headers) buf = prox_rec_res.read(8192) while buf != b'': diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 48dc5cd..ec613ab 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -179,14 +179,13 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): remote_ip = self._remote_server_sock.getpeername()[0] timestamp = datetime.datetime.utcnow() - - if warcprox_meta and 'return-capture-timestamp' in warcprox_meta: - return_timestamp = timestamp - else: - return_timestamp = None + extra_response_headers = {} + if warcprox_meta and 'accept' in warcprox_meta and \ + 'capture-metadata' in warcprox_meta['accept']: + extra_response_headers['timestamp'] = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') req, prox_rec_res = warcprox.mitmproxy.MitmProxyHandler._proxy_request( - self, timestamp=return_timestamp) + self, extra_response_headers=extra_response_headers) content_type = None try: From c087cc7a2eb47e091f2ba42af6688d7fdb75bced Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Wed, 1 Nov 2017 17:50:46 +0000 Subject: [PATCH 46/48] Improve test_writer tests Check also that locking succeeds after the writer closes the WARC file. Remove parametrize from ``test_warc_writer_locking``, test only for the ``no_warc_open_suffix=True`` option. Change `1` to `OBTAINED LOCK` and `0` to `FAILED TO OBTAIN LOCK` in ``lock_file`` method. 
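[Editorial aside: the helper must run in a separate process because POSIX record locks held by a process do not conflict with further lock attempts from that same process, so an in-process ``lockf`` check would always appear to succeed. Minimal sketch of the pattern, independent of warcprox; the path is illustrative.]::

    import fcntl
    from multiprocessing import Process, Queue

    def try_lock(queue, filename):
        with open(filename, 'ab') as f:
            try:
                fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                queue.put('OBTAINED LOCK')
            except IOError:
                queue.put('FAILED TO OBTAIN LOCK')

    if __name__ == '__main__':
        queue = Queue()
        p = Process(target=try_lock, args=(queue, '/tmp/some.warc'))
        p.start()
        p.join()
        print(queue.get())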
--- tests/test_writer.py | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/tests/test_writer.py b/tests/test_writer.py index 444909f..9ce0e13 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -11,10 +11,10 @@ from warcprox import Options recorder = ProxyingRecorder(None, None, 'sha1', url='http://example.com') recorded_url = RecordedUrl(url='http://example.com', content_type='text/plain', - status=200, client_ip='5.5.5.5', + status=200, client_ip='127.0.0.2', request_data=b'abc', response_recorder=recorder, - remote_ip='6.6.6.6', + remote_ip='127.0.0.3', timestamp=datetime.utcnow()) @@ -26,29 +26,20 @@ def lock_file(queue, filename): fi = open(filename, 'ab') fcntl.lockf(fi, fcntl.LOCK_EX | fcntl.LOCK_NB) fi.close() - queue.put('1') + queue.put('OBTAINED LOCK') except IOError: - queue.put('0') + queue.put('FAILED TO OBTAIN LOCK') -@pytest.mark.parametrize("no_warc_open_suffix,lock_result", [ - (True, '0'), - (False, '1')]) -def test_warc_writer_locking(tmpdir, no_warc_open_suffix, lock_result): +def test_warc_writer_locking(tmpdir): """Test if WarcWriter is locking WARC files. When we don't have the .open suffix, WarcWriter locks the file and the external process trying to ``lock_file`` fails (result=0). """ dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer'))) - wwriter = WarcWriter(Options(directory=dirname, - no_warc_open_suffix=no_warc_open_suffix)) + wwriter = WarcWriter(Options(directory=dirname, no_warc_open_suffix=True)) wwriter.write_records(recorded_url) - - if no_warc_open_suffix: - suffix = '.warc' - else: - suffix = '.warc.open' - warcs = [fn for fn in os.listdir(dirname) if fn.endswith(suffix)] + warcs = [fn for fn in os.listdir(dirname) if fn.endswith('.warc')] assert warcs target_warc = os.path.join(dirname, warcs[0]) # launch another process and try to lock WARC file @@ -56,4 +47,11 @@ def test_warc_writer_locking(tmpdir, no_warc_open_suffix, lock_result): p = Process(target=lock_file, args=(queue, target_warc)) p.start() p.join() - assert queue.get() == lock_result + assert queue.get() == 'FAILED TO OBTAIN LOCK' + wwriter.close_writer() + + # locking must succeed after writer has closed the WARC file. 
+ p = Process(target=lock_file, args=(queue, target_warc)) + p.start() + p.join() + assert queue.get() == 'OBTAINED LOCK' From ca3121102ef3e67ef33b0e0ad1d6424fcaa11b31 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 2 Nov 2017 08:24:28 +0000 Subject: [PATCH 47/48] Move Warcprox-Meta header construction to warcproxy --- warcprox/mitmproxy.py | 4 ++-- warcprox/warcproxy.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index e2cc321..f6ea742 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -172,8 +172,8 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse): self.msg['Via'] = via_header_value( self.msg.get('Via'), '%0.1f' % (self.version / 10.0)) if extra_response_headers: - rmeta = {"capture-metadata": extra_response_headers} - self.msg['Warcprox-Meta'] = json.dumps(rmeta, separators=',:') + for header, value in extra_response_headers.items(): + self.msg[header] = value for k,v in self.msg.items(): if k.lower() not in ( diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index ec613ab..d37e588 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -182,7 +182,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler): extra_response_headers = {} if warcprox_meta and 'accept' in warcprox_meta and \ 'capture-metadata' in warcprox_meta['accept']: - extra_response_headers['timestamp'] = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') + rmeta = {'capture-metadata': {'timestamp': timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')}} + extra_response_headers['Warcprox-Meta'] = json.dumps(rmeta, separators=',:') req, prox_rec_res = warcprox.mitmproxy.MitmProxyHandler._proxy_request( self, extra_response_headers=extra_response_headers) From d174e736be08dd075df80543d54f3f4a65bd3722 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 2 Nov 2017 19:43:45 +0000 Subject: [PATCH 48/48] Update docstring --- warcprox/mitmproxy.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/warcprox/mitmproxy.py b/warcprox/mitmproxy.py index f6ea742..b14cddf 100644 --- a/warcprox/mitmproxy.py +++ b/warcprox/mitmproxy.py @@ -373,8 +373,9 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler): bytes in transit. Returns a tuple (request, response) where request is the raw request bytes, and response is a ProxyingRecorder. - :param timestamp: generated on warcprox._proxy_request. It is the - timestamp written in the WARC record for this request. + :param extra_response_headers: generated on warcprox._proxy_request. + It may contain extra HTTP headers such as ``Warcprox-Meta`` which + are written in the WARC record for this request. ''' # Build request req_str = '{} {} {}\r\n'.format(