From fc5f39ffed1b88bd1d5a809c13f779d9a2ead56b Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 13 Oct 2017 17:44:07 +0000 Subject: [PATCH 01/12] Add CDX Server based deduplication Add ``--cdxserver-dedup URL`` option. Create ``warcprox.dedup.CdxServerDedup`` class. Add dummy unit test (TODO) --- README.rst | 4 +++ setup.py | 1 + tests/test_dedup.py | 10 ++++++ warcprox/dedup.py | 86 +++++++++++++++++++++++++++++++++++++++++++-- warcprox/main.py | 5 +++ 5 files changed, 104 insertions(+), 2 deletions(-) create mode 100644 tests/test_dedup.py diff --git a/README.rst b/README.rst index b9c1c5f..8adcafa 100644 --- a/README.rst +++ b/README.rst @@ -47,6 +47,7 @@ Usage [--stats-db-file STATS_DB_FILE] [-P PLAYBACK_PORT] [--playback-index-db-file PLAYBACK_INDEX_DB_FILE] [-j DEDUP_DB_FILE | --rethinkdb-servers RETHINKDB_SERVERS] + [--cdxserver-dedup CDX_SERVER_URL] [--rethinkdb-db RETHINKDB_DB] [--rethinkdb-big-table] [--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY] [--plugin PLUGIN_CLASS] [--version] [-v] [--trace] [-q] @@ -100,6 +101,9 @@ Usage persistent deduplication database file; empty string or /dev/null disables deduplication (default: ./warcprox.sqlite) + --cdxserver-dedup CDX_SERVER_URL + use a CDX server for deduplication + (default: None) --rethinkdb-servers RETHINKDB_SERVERS rethinkdb servers, used for dedup and stats if specified; e.g. diff --git a/setup.py b/setup.py index 34d11b5..b9308e2 100755 --- a/setup.py +++ b/setup.py @@ -39,6 +39,7 @@ deps = [ 'certauth==1.1.6', 'warctools', 'urlcanon>=0.1.dev16', + 'urllib3', 'doublethink>=0.2.0.dev81', 'PySocks', 'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu diff --git a/tests/test_dedup.py b/tests/test_dedup.py new file mode 100644 index 0000000..7836d27 --- /dev/null +++ b/tests/test_dedup.py @@ -0,0 +1,10 @@ +import pytest +from warcprox.dedup import CdxServerDedup + + +def test_cdx(): + # TODO add mocking of CDX Server response + # TODO check found and not found cases + cdx_server = CdxServerDedup(cdx_url="https://web.archive.org/cdx/search/cdx") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + url="http://example.com") diff --git a/warcprox/dedup.py b/warcprox/dedup.py index fd1ada4..a3c89f7 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -21,12 +21,16 @@ USA. from __future__ import absolute_import +from datetime import datetime import logging import os import json from hanzo import warctools import warcprox import sqlite3 +import urllib3 + +urllib3.disable_warnings() class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -107,9 +111,16 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): and recorded_url.response_recorder.payload_size() > 0): digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: - recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"]) + if isinstance(dedup_db, CdxServerDedup): + recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"], + recorded_url) + else: + recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"]) else: - recorded_url.dedup_info = dedup_db.lookup(digest_key) + if isinstance(dedup_db, CdxServerDedup): + recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url) + else: + recorded_url.dedup_info = dedup_db.lookup(digest_key) class RethinkDedupDb: logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") @@ -174,3 +185,74 @@ class RethinkDedupDb: else: self.save(digest_key, records[0]) + +def _split_timestamp(timestamp): + """split `timestamp` into a tuple of 6 integers. + + :param timestamp: full-length timestamp. + :type timestamp: bytes + """ + return ( + int(timestamp[:-10]), + int(timestamp[-10:-8]), + int(timestamp[-8:-6]), + int(timestamp[-6:-4]), + int(timestamp[-4:-2]), + int(timestamp[-2:]) + ) + + +class CdxServerDedup(object): + """Query a CDX server to perform deduplication. + """ + logger = logging.getLogger("warcprox.dedup.CdxServerDedup") + + def __init__(self, cdx_url="https://web.archive.org/cdx/search/cdx", + options=warcprox.Options()): + self.http_pool = urllib3.PoolManager() + self.cdx_url = cdx_url + self.options = options + + def start(self): + pass + + def save(self, digest_key, response_record, bucket=""): + """Does not apply to CDX server, as it is obviously read-only. + """ + pass + + def lookup(self, digest_key, recorded_url): + """Compare `sha1` with SHA1 hash of fetched content (note SHA1 must be + computed on the original content, after decoding Content-Encoding and + Transfer-Encoding, if any), if they match, write a revisit record. + + :param digest_key: b'sha1:'. + Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' + :param recorded_url: RecordedUrl object + Result must contain: + {"url", "date": "%Y-%m-%dT%H:%M:%SZ", "id": "warc_id" if available} + """ + url = recorded_url.url + u = url.decode("utf-8") if isinstance(url, bytes) else url + try: + result = self.http_pool.request('GET', self.cdx_url, fields=dict( + url=u, fl="timestamp,digest", limit=-1)) + except urllib3.HTTPError as exc: + self.logger.error('CdxServerDedup request failed for url=%s %s', + url, exc) + if result.status == 200: + digest_key = digest_key[5:] # drop sha1: prefix + for line in result.data.split(b'\n'): + if line: + (cdx_ts, cdx_digest) = line.split(b' ') + if cdx_digest == digest_key: + dt = datetime(*_split_timestamp(cdx_ts.decode('ascii'))) + # TODO find out id + return dict(id=url, url=url, + date=dt.strftime('%Y-%m-%dT%H:%M:%SZ')) + return None + + def notify(self, recorded_url, records): + """Since we don't save anything to CDX server, this does not apply. + """ + pass diff --git a/warcprox/main.py b/warcprox/main.py index 7b7314b..2d0414b 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -106,6 +106,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): group = arg_parser.add_mutually_exclusive_group() group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') + group.add_argument('--cdxserver-dedup', dest='cdxserver_dedup', + help='use a CDX Server for deduplication') group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox', @@ -189,6 +191,9 @@ def init_controller(args): else: dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options) listeners.append(dedup_db) + elif args.cdxserver_dedup: + dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup) + listeners.append(dedup_db) elif args.dedup_db_file in (None, '', '/dev/null'): logging.info('deduplication disabled') dedup_db = None From 960dda4c319816cf9733367255e313e67c512e45 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 19 Oct 2017 22:11:22 +0000 Subject: [PATCH 02/12] Add CdxServerDedup unit tests and improve its exception handling Add multiple ``CdxServerDedup`` unit tests to simulate found, not found and invalid responses from the CDX server. Use a different file ``tests/test_dedup.py`` because we test the CdxServerDedup component individually and it belongs to the ``warcprox.dedup`` package. Add ``mock`` package to dev requirements. Rework the warcprox.dedup.CdxServerDedup class to have better exception handling. --- setup.py | 2 +- tests/test_dedup.py | 54 ++++++++++++++++++++++++++++++++++++++++----- warcprox/dedup.py | 26 +++++++++++++--------- 3 files changed, 64 insertions(+), 18 deletions(-) diff --git a/setup.py b/setup.py index b9308e2..228ece7 100755 --- a/setup.py +++ b/setup.py @@ -60,7 +60,7 @@ setuptools.setup( license='GPL', packages=['warcprox'], install_requires=deps, - tests_require=['requests>=2.0.1', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636 + tests_require=['requests>=2.0.1', 'mock', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636 cmdclass = {'test': PyTest}, test_suite='warcprox.tests', entry_points={ diff --git a/tests/test_dedup.py b/tests/test_dedup.py index 7836d27..e1b7482 100644 --- a/tests/test_dedup.py +++ b/tests/test_dedup.py @@ -1,10 +1,52 @@ +import mock import pytest from warcprox.dedup import CdxServerDedup -def test_cdx(): - # TODO add mocking of CDX Server response - # TODO check found and not found cases - cdx_server = CdxServerDedup(cdx_url="https://web.archive.org/cdx/search/cdx") - res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", - url="http://example.com") +def test_cdx_dedup(): + # Mock CDX Server responses to simulate found, not found and errors. + with mock.patch('warcprox.dedup.CdxServerDedup.http_pool.request') as request: + recorded_url = mock.Mock(); + recorded_url.url = "http://example.com" + # not found case + result = mock.Mock() + result.status = 200 + result.data = b'20170101020405 test' + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + recorded_url=recorded_url) + assert res is None + + # found in the 2nd CDX line + result = mock.Mock() + result.status = 200 + result.data = b"""\ +20170101020304 xxx +20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A +20160505050505 yyyyyyyyyyyyyyyyyyyyyy""" + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + recorded_url=recorded_url) + assert res["url"] == "http://example.com" + assert res["date"] == "2017-02-03T04:05:03Z" + + # invalid CDX result status code + result = mock.Mock() + result.status = 400 + result.data = b'20170101020405 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + recorded_url=recorded_url) + assert res is None + # invalid CDX result content + result = mock.Mock() + result.status = 200 + result.data = b'InvalidExceptionResult' + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + recorded_url=recorded_url) + assert res is None diff --git a/warcprox/dedup.py b/warcprox/dedup.py index a3c89f7..8aa9c16 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -29,6 +29,7 @@ from hanzo import warctools import warcprox import sqlite3 import urllib3 +from urllib3.exceptions import HTTPError urllib3.disable_warnings() @@ -206,10 +207,10 @@ class CdxServerDedup(object): """Query a CDX server to perform deduplication. """ logger = logging.getLogger("warcprox.dedup.CdxServerDedup") + http_pool = urllib3.PoolManager() def __init__(self, cdx_url="https://web.archive.org/cdx/search/cdx", options=warcprox.Options()): - self.http_pool = urllib3.PoolManager() self.cdx_url = cdx_url self.options = options @@ -226,30 +227,33 @@ class CdxServerDedup(object): computed on the original content, after decoding Content-Encoding and Transfer-Encoding, if any), if they match, write a revisit record. - :param digest_key: b'sha1:'. + :param digest_key: b'sha1:' (prefix is optional). Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' :param recorded_url: RecordedUrl object Result must contain: - {"url", "date": "%Y-%m-%dT%H:%M:%SZ", "id": "warc_id" if available} + {"url": , "date": "%Y-%m-%dT%H:%M:%SZ"} """ url = recorded_url.url u = url.decode("utf-8") if isinstance(url, bytes) else url try: result = self.http_pool.request('GET', self.cdx_url, fields=dict( url=u, fl="timestamp,digest", limit=-1)) - except urllib3.HTTPError as exc: - self.logger.error('CdxServerDedup request failed for url=%s %s', - url, exc) - if result.status == 200: - digest_key = digest_key[5:] # drop sha1: prefix + assert result.status == 200 + if isinstance(digest_key, bytes): + dkey = digest_key + else: + dkey = digest_key.encode('utf-8') + dkey = dkey[5:] if dkey.startswith(b'sha1:') else dkey for line in result.data.split(b'\n'): if line: (cdx_ts, cdx_digest) = line.split(b' ') - if cdx_digest == digest_key: + if cdx_digest == dkey: dt = datetime(*_split_timestamp(cdx_ts.decode('ascii'))) - # TODO find out id - return dict(id=url, url=url, + return dict(url=url, date=dt.strftime('%Y-%m-%dT%H:%M:%SZ')) + except (HTTPError, AssertionError, ValueError) as exc: + self.logger.error('CdxServerDedup request failed for url=%s %s', + url, exc) return None def notify(self, recorded_url, records): From 59e995ccdf05bcdff2094d6a223130dbf7bf5811 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 19 Oct 2017 22:22:14 +0000 Subject: [PATCH 03/12] Add mock pkg to run-tests.sh --- tests/run-tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/run-tests.sh b/tests/run-tests.sh index 334cfc2..80db2f8 100755 --- a/tests/run-tests.sh +++ b/tests/run-tests.sh @@ -40,7 +40,7 @@ do && (cd /warcprox && git diff HEAD) | patch -p1 \ && virtualenv -p $python /tmp/venv \ && source /tmp/venv/bin/activate \ - && pip --log-file /tmp/pip.log install . pytest requests warcio \ + && pip --log-file /tmp/pip.log install . pytest mock requests warcio \ && py.test -v tests \ && py.test -v --rethinkdb-servers=localhost tests \ && py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests" From a0821575b4c16673c4b4e1c831c549eba0f8fff8 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 19 Oct 2017 22:54:34 +0000 Subject: [PATCH 04/12] Fix bug with dedup_info date encoding --- warcprox/dedup.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 8aa9c16..6258860 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -209,7 +209,7 @@ class CdxServerDedup(object): logger = logging.getLogger("warcprox.dedup.CdxServerDedup") http_pool = urllib3.PoolManager() - def __init__(self, cdx_url="https://web.archive.org/cdx/search/cdx", + def __init__(self, cdx_url="https://web.archive.org/cdx/search", options=warcprox.Options()): self.cdx_url = cdx_url self.options = options @@ -237,7 +237,7 @@ class CdxServerDedup(object): u = url.decode("utf-8") if isinstance(url, bytes) else url try: result = self.http_pool.request('GET', self.cdx_url, fields=dict( - url=u, fl="timestamp,digest", limit=-1)) + url=u, fl="timestamp,digest", limit=-10)) assert result.status == 200 if isinstance(digest_key, bytes): dkey = digest_key @@ -249,8 +249,8 @@ class CdxServerDedup(object): (cdx_ts, cdx_digest) = line.split(b' ') if cdx_digest == dkey: dt = datetime(*_split_timestamp(cdx_ts.decode('ascii'))) - return dict(url=url, - date=dt.strftime('%Y-%m-%dT%H:%M:%SZ')) + date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8') + return dict(url=url, date=date) except (HTTPError, AssertionError, ValueError) as exc: self.logger.error('CdxServerDedup request failed for url=%s %s', url, exc) From bc3d0cb4f6680c73b93cb0221c7842839505f0d9 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 19 Oct 2017 22:57:33 +0000 Subject: [PATCH 05/12] Fix minor CdxServerDedup unit test --- tests/test_dedup.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_dedup.py b/tests/test_dedup.py index e1b7482..5a0ca3b 100644 --- a/tests/test_dedup.py +++ b/tests/test_dedup.py @@ -29,8 +29,7 @@ def test_cdx_dedup(): cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", recorded_url=recorded_url) - assert res["url"] == "http://example.com" - assert res["date"] == "2017-02-03T04:05:03Z" + assert res["date"] == b"2017-02-03T04:05:03Z" # invalid CDX result status code result = mock.Mock() From 202d664f3906716f15b52833a43a0e0c5eae9226 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 20 Oct 2017 20:00:02 +0000 Subject: [PATCH 06/12] Improve CdxServerDedup implementation Replace ``_split_timestamp`` with ``datetime.strptime`` in ``warcprox.dedup``. Remove ``isinstance()`` and add optional ``record_url`` in the rest of the dedup ``lookup`` methods. Make `--cdxserver-dedup` option help more explanatory. --- warcprox/dedup.py | 35 +++++++---------------------------- warcprox/main.py | 2 +- 2 files changed, 8 insertions(+), 29 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 6258860..41b9249 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -77,7 +77,7 @@ class DedupDb(object): conn.close() self.logger.debug('dedup db saved %s:%s', key, json_value) - def lookup(self, digest_key, bucket=""): + def lookup(self, digest_key, bucket="", recorded_url=None): result = None key = digest_key.decode('utf-8') + '|' + bucket conn = sqlite3.connect(self.file) @@ -112,16 +112,10 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): and recorded_url.response_recorder.payload_size() > 0): digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: - if isinstance(dedup_db, CdxServerDedup): - recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"], - recorded_url) - else: - recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"]) + recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"], + recorded_url) else: - if isinstance(dedup_db, CdxServerDedup): - recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url) - else: - recorded_url.dedup_info = dedup_db.lookup(digest_key) + recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url=recorded_url) class RethinkDedupDb: logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") @@ -166,7 +160,7 @@ class RethinkDedupDb: raise Exception("unexpected result %s saving %s", result, record) self.logger.debug('dedup db saved %s:%s', k, record) - def lookup(self, digest_key, bucket=""): + def lookup(self, digest_key, bucket="", recorded_url=None): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key k = "{}|{}".format(k, bucket) result = self.rr.table(self.table).get(k).run() @@ -187,22 +181,6 @@ class RethinkDedupDb: self.save(digest_key, records[0]) -def _split_timestamp(timestamp): - """split `timestamp` into a tuple of 6 integers. - - :param timestamp: full-length timestamp. - :type timestamp: bytes - """ - return ( - int(timestamp[:-10]), - int(timestamp[-10:-8]), - int(timestamp[-8:-6]), - int(timestamp[-6:-4]), - int(timestamp[-4:-2]), - int(timestamp[-2:]) - ) - - class CdxServerDedup(object): """Query a CDX server to perform deduplication. """ @@ -248,7 +226,8 @@ class CdxServerDedup(object): if line: (cdx_ts, cdx_digest) = line.split(b' ') if cdx_digest == dkey: - dt = datetime(*_split_timestamp(cdx_ts.decode('ascii'))) + dt = datetime.strptime(cdx_ts.decode('ascii'), + '%Y%m%d%H%M%S') date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8') return dict(url=url, date=date) except (HTTPError, AssertionError, ValueError) as exc: diff --git a/warcprox/main.py b/warcprox/main.py index 2d0414b..76e194a 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -107,7 +107,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') group.add_argument('--cdxserver-dedup', dest='cdxserver_dedup', - help='use a CDX Server for deduplication') + help='use a CDX Server URL for deduplication; e.g. https://web.archive.org/cdx/search') group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox', From f77aef91108c4398d56fb13aee885c236901e635 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Fri, 20 Oct 2017 21:59:43 +0000 Subject: [PATCH 07/12] Filter out warc/revisit records in CdxServerDedup --- warcprox/dedup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 41b9249..53b27c9 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -215,7 +215,8 @@ class CdxServerDedup(object): u = url.decode("utf-8") if isinstance(url, bytes) else url try: result = self.http_pool.request('GET', self.cdx_url, fields=dict( - url=u, fl="timestamp,digest", limit=-10)) + url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit", + limit=-10)) assert result.status == 200 if isinstance(digest_key, bytes): dkey = digest_key From 4fb44a7e9d4b1579b4b97d0804ca922cf219eead Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Sat, 21 Oct 2017 20:24:28 +0000 Subject: [PATCH 08/12] Pass url instead of recorded_url obj to dedup lookup methods --- tests/test_dedup.py | 12 +++++------- warcprox/dedup.py | 14 +++++++------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/tests/test_dedup.py b/tests/test_dedup.py index 5a0ca3b..591337e 100644 --- a/tests/test_dedup.py +++ b/tests/test_dedup.py @@ -1,13 +1,11 @@ import mock -import pytest from warcprox.dedup import CdxServerDedup def test_cdx_dedup(): # Mock CDX Server responses to simulate found, not found and errors. with mock.patch('warcprox.dedup.CdxServerDedup.http_pool.request') as request: - recorded_url = mock.Mock(); - recorded_url.url = "http://example.com" + url = "http://example.com" # not found case result = mock.Mock() result.status = 200 @@ -15,7 +13,7 @@ def test_cdx_dedup(): request.return_value = result cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", - recorded_url=recorded_url) + url=url) assert res is None # found in the 2nd CDX line @@ -28,7 +26,7 @@ def test_cdx_dedup(): request.return_value = result cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", - recorded_url=recorded_url) + url=url) assert res["date"] == b"2017-02-03T04:05:03Z" # invalid CDX result status code @@ -38,7 +36,7 @@ def test_cdx_dedup(): request.return_value = result cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", - recorded_url=recorded_url) + url=url) assert res is None # invalid CDX result content result = mock.Mock() @@ -47,5 +45,5 @@ def test_cdx_dedup(): request.return_value = result cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", - recorded_url=recorded_url) + url=url) assert res is None diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 53b27c9..1513946 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -77,7 +77,7 @@ class DedupDb(object): conn.close() self.logger.debug('dedup db saved %s:%s', key, json_value) - def lookup(self, digest_key, bucket="", recorded_url=None): + def lookup(self, digest_key, bucket="", url=None): result = None key = digest_key.decode('utf-8') + '|' + bucket conn = sqlite3.connect(self.file) @@ -113,9 +113,10 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"], - recorded_url) + recorded_url.url) else: - recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url=recorded_url) + recorded_url.dedup_info = dedup_db.lookup(digest_key, + url=recorded_url.url) class RethinkDedupDb: logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") @@ -160,7 +161,7 @@ class RethinkDedupDb: raise Exception("unexpected result %s saving %s", result, record) self.logger.debug('dedup db saved %s:%s', k, record) - def lookup(self, digest_key, bucket="", recorded_url=None): + def lookup(self, digest_key, bucket="", url=None): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key k = "{}|{}".format(k, bucket) result = self.rr.table(self.table).get(k).run() @@ -200,18 +201,17 @@ class CdxServerDedup(object): """ pass - def lookup(self, digest_key, recorded_url): + def lookup(self, digest_key, url): """Compare `sha1` with SHA1 hash of fetched content (note SHA1 must be computed on the original content, after decoding Content-Encoding and Transfer-Encoding, if any), if they match, write a revisit record. :param digest_key: b'sha1:' (prefix is optional). Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' - :param recorded_url: RecordedUrl object + :param url: Target URL string Result must contain: {"url": , "date": "%Y-%m-%dT%H:%M:%SZ"} """ - url = recorded_url.url u = url.decode("utf-8") if isinstance(url, bytes) else url try: result = self.http_pool.request('GET', self.cdx_url, fields=dict( From f6b1d6f40879642c754a20be5504574667e7bf06 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Sat, 21 Oct 2017 20:45:46 +0000 Subject: [PATCH 09/12] Update CdxServerDedup lookup algorithm Get only one item from CDX (``limit=-1``). Update unit tests --- tests/test_dedup.py | 7 ++----- warcprox/dedup.py | 18 +++++++++--------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/tests/test_dedup.py b/tests/test_dedup.py index 591337e..124efb5 100644 --- a/tests/test_dedup.py +++ b/tests/test_dedup.py @@ -16,13 +16,10 @@ def test_cdx_dedup(): url=url) assert res is None - # found in the 2nd CDX line + # found case result = mock.Mock() result.status = 200 - result.data = b"""\ -20170101020304 xxx -20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A -20160505050505 yyyyyyyyyyyyyyyyyyyyyy""" + result.data = b'20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' request.return_value = result cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 1513946..08bbf23 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -216,21 +216,21 @@ class CdxServerDedup(object): try: result = self.http_pool.request('GET', self.cdx_url, fields=dict( url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit", - limit=-10)) + limit=-1)) assert result.status == 200 if isinstance(digest_key, bytes): dkey = digest_key else: dkey = digest_key.encode('utf-8') dkey = dkey[5:] if dkey.startswith(b'sha1:') else dkey - for line in result.data.split(b'\n'): - if line: - (cdx_ts, cdx_digest) = line.split(b' ') - if cdx_digest == dkey: - dt = datetime.strptime(cdx_ts.decode('ascii'), - '%Y%m%d%H%M%S') - date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8') - return dict(url=url, date=date) + line = result.data.split(b'\n') + if line: + (cdx_ts, cdx_digest) = line[0].split(b' ') + if cdx_digest == dkey: + dt = datetime.strptime(cdx_ts.decode('ascii'), + '%Y%m%d%H%M%S') + date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8') + return dict(url=url, date=date) except (HTTPError, AssertionError, ValueError) as exc: self.logger.error('CdxServerDedup request failed for url=%s %s', url, exc) From 428203277298e2965dcd8ddb678334174040bc3d Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Mon, 23 Oct 2017 22:21:57 +0000 Subject: [PATCH 10/12] Drop unnecessary split for newline in CDX results --- warcprox/dedup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 08bbf23..46f3c40 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -223,9 +223,9 @@ class CdxServerDedup(object): else: dkey = digest_key.encode('utf-8') dkey = dkey[5:] if dkey.startswith(b'sha1:') else dkey - line = result.data.split(b'\n') + line = result.data.strip() if line: - (cdx_ts, cdx_digest) = line[0].split(b' ') + (cdx_ts, cdx_digest) = line.split(b' ') if cdx_digest == dkey: dt = datetime.strptime(cdx_ts.decode('ascii'), '%Y%m%d%H%M%S') From 6beb19dc16bb60fa228f271c9eb9de29db203c64 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Wed, 25 Oct 2017 20:28:56 +0000 Subject: [PATCH 11/12] Expand comment with limit=-1 explanation --- warcprox/dedup.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 46f3c40..e70f5f9 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -206,6 +206,10 @@ class CdxServerDedup(object): computed on the original content, after decoding Content-Encoding and Transfer-Encoding, if any), if they match, write a revisit record. + Get only the last item (limit=-1) because Wayback Machine has special + performance optimisation to handle that. limit < 0 is very inefficient + in general. Maybe it could be configurable in the future. + :param digest_key: b'sha1:' (prefix is optional). Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' :param url: Target URL string From 70ed4790b8697db6b9ddee4e3385b5ff5933b4a3 Mon Sep 17 00:00:00 2001 From: Vangelis Banos Date: Thu, 26 Oct 2017 18:18:15 +0000 Subject: [PATCH 12/12] Fix missing dummy url param in bigtable lookup method --- warcprox/bigtable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 387d05c..f3d897d 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -220,7 +220,7 @@ class RethinkCapturesDedup: self.captures_db = captures_db self.options = options - def lookup(self, digest_key, bucket="__unspecified__"): + def lookup(self, digest_key, bucket="__unspecified__", url=None): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key algo, value_str = k.split(":") if self.options.base32: