diff --git a/README.rst b/README.rst index b9c1c5f..8adcafa 100644 --- a/README.rst +++ b/README.rst @@ -47,6 +47,7 @@ Usage [--stats-db-file STATS_DB_FILE] [-P PLAYBACK_PORT] [--playback-index-db-file PLAYBACK_INDEX_DB_FILE] [-j DEDUP_DB_FILE | --rethinkdb-servers RETHINKDB_SERVERS] + [--cdxserver-dedup CDX_SERVER_URL] [--rethinkdb-db RETHINKDB_DB] [--rethinkdb-big-table] [--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY] [--plugin PLUGIN_CLASS] [--version] [-v] [--trace] [-q] @@ -100,6 +101,9 @@ Usage persistent deduplication database file; empty string or /dev/null disables deduplication (default: ./warcprox.sqlite) + --cdxserver-dedup CDX_SERVER_URL + use a CDX server for deduplication + (default: None) --rethinkdb-servers RETHINKDB_SERVERS rethinkdb servers, used for dedup and stats if specified; e.g. diff --git a/setup.py b/setup.py index f867054..3de51f7 100755 --- a/setup.py +++ b/setup.py @@ -39,6 +39,7 @@ deps = [ 'certauth==1.1.6', 'warctools', 'urlcanon>=0.1.dev16', + 'urllib3', 'doublethink>=0.2.0.dev81', 'PySocks', 'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu @@ -59,7 +60,7 @@ setuptools.setup( license='GPL', packages=['warcprox'], install_requires=deps, - tests_require=['requests>=2.0.1', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636 + tests_require=['requests>=2.0.1', 'mock', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636 cmdclass = {'test': PyTest}, test_suite='warcprox.tests', entry_points={ diff --git a/tests/run-tests.sh b/tests/run-tests.sh index 334cfc2..80db2f8 100755 --- a/tests/run-tests.sh +++ b/tests/run-tests.sh @@ -40,7 +40,7 @@ do && (cd /warcprox && git diff HEAD) | patch -p1 \ && virtualenv -p $python /tmp/venv \ && source /tmp/venv/bin/activate \ - && pip --log-file /tmp/pip.log install . pytest requests warcio \ + && pip --log-file /tmp/pip.log install . pytest mock requests warcio \ && py.test -v tests \ && py.test -v --rethinkdb-servers=localhost tests \ && py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests" diff --git a/tests/test_dedup.py b/tests/test_dedup.py new file mode 100644 index 0000000..124efb5 --- /dev/null +++ b/tests/test_dedup.py @@ -0,0 +1,46 @@ +import mock +from warcprox.dedup import CdxServerDedup + + +def test_cdx_dedup(): + # Mock CDX Server responses to simulate found, not found and errors. + with mock.patch('warcprox.dedup.CdxServerDedup.http_pool.request') as request: + url = "http://example.com" + # not found case + result = mock.Mock() + result.status = 200 + result.data = b'20170101020405 test' + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + url=url) + assert res is None + + # found case + result = mock.Mock() + result.status = 200 + result.data = b'20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + url=url) + assert res["date"] == b"2017-02-03T04:05:03Z" + + # invalid CDX result status code + result = mock.Mock() + result.status = 400 + result.data = b'20170101020405 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + url=url) + assert res is None + # invalid CDX result content + result = mock.Mock() + result.status = 200 + result.data = b'InvalidExceptionResult' + request.return_value = result + cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url") + res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A", + url=url) + assert res is None diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 387d05c..f3d897d 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -220,7 +220,7 @@ class RethinkCapturesDedup: self.captures_db = captures_db self.options = options - def lookup(self, digest_key, bucket="__unspecified__"): + def lookup(self, digest_key, bucket="__unspecified__", url=None): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key algo, value_str = k.split(":") if self.options.base32: diff --git a/warcprox/dedup.py b/warcprox/dedup.py index fd1ada4..e70f5f9 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -21,12 +21,17 @@ USA. from __future__ import absolute_import +from datetime import datetime import logging import os import json from hanzo import warctools import warcprox import sqlite3 +import urllib3 +from urllib3.exceptions import HTTPError + +urllib3.disable_warnings() class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -72,7 +77,7 @@ class DedupDb(object): conn.close() self.logger.debug('dedup db saved %s:%s', key, json_value) - def lookup(self, digest_key, bucket=""): + def lookup(self, digest_key, bucket="", url=None): result = None key = digest_key.decode('utf-8') + '|' + bucket conn = sqlite3.connect(self.file) @@ -107,9 +112,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): and recorded_url.response_recorder.payload_size() > 0): digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: - recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"]) + recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"], + recorded_url.url) else: - recorded_url.dedup_info = dedup_db.lookup(digest_key) + recorded_url.dedup_info = dedup_db.lookup(digest_key, + url=recorded_url.url) class RethinkDedupDb: logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") @@ -154,7 +161,7 @@ class RethinkDedupDb: raise Exception("unexpected result %s saving %s", result, record) self.logger.debug('dedup db saved %s:%s', k, record) - def lookup(self, digest_key, bucket=""): + def lookup(self, digest_key, bucket="", url=None): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key k = "{}|{}".format(k, bucket) result = self.rr.table(self.table).get(k).run() @@ -174,3 +181,66 @@ class RethinkDedupDb: else: self.save(digest_key, records[0]) + +class CdxServerDedup(object): + """Query a CDX server to perform deduplication. + """ + logger = logging.getLogger("warcprox.dedup.CdxServerDedup") + http_pool = urllib3.PoolManager() + + def __init__(self, cdx_url="https://web.archive.org/cdx/search", + options=warcprox.Options()): + self.cdx_url = cdx_url + self.options = options + + def start(self): + pass + + def save(self, digest_key, response_record, bucket=""): + """Does not apply to CDX server, as it is obviously read-only. + """ + pass + + def lookup(self, digest_key, url): + """Compare `sha1` with SHA1 hash of fetched content (note SHA1 must be + computed on the original content, after decoding Content-Encoding and + Transfer-Encoding, if any), if they match, write a revisit record. + + Get only the last item (limit=-1) because Wayback Machine has special + performance optimisation to handle that. limit < 0 is very inefficient + in general. Maybe it could be configurable in the future. + + :param digest_key: b'sha1:' (prefix is optional). + Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A' + :param url: Target URL string + Result must contain: + {"url": , "date": "%Y-%m-%dT%H:%M:%SZ"} + """ + u = url.decode("utf-8") if isinstance(url, bytes) else url + try: + result = self.http_pool.request('GET', self.cdx_url, fields=dict( + url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit", + limit=-1)) + assert result.status == 200 + if isinstance(digest_key, bytes): + dkey = digest_key + else: + dkey = digest_key.encode('utf-8') + dkey = dkey[5:] if dkey.startswith(b'sha1:') else dkey + line = result.data.strip() + if line: + (cdx_ts, cdx_digest) = line.split(b' ') + if cdx_digest == dkey: + dt = datetime.strptime(cdx_ts.decode('ascii'), + '%Y%m%d%H%M%S') + date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8') + return dict(url=url, date=date) + except (HTTPError, AssertionError, ValueError) as exc: + self.logger.error('CdxServerDedup request failed for url=%s %s', + url, exc) + return None + + def notify(self, recorded_url, records): + """Since we don't save anything to CDX server, this does not apply. + """ + pass diff --git a/warcprox/main.py b/warcprox/main.py index 7b7314b..76e194a 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -106,6 +106,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): group = arg_parser.add_mutually_exclusive_group() group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file', default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication') + group.add_argument('--cdxserver-dedup', dest='cdxserver_dedup', + help='use a CDX Server URL for deduplication; e.g. https://web.archive.org/cdx/search') group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox', @@ -189,6 +191,9 @@ def init_controller(args): else: dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options) listeners.append(dedup_db) + elif args.cdxserver_dedup: + dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup) + listeners.append(dedup_db) elif args.dedup_db_file in (None, '', '/dev/null'): logging.info('deduplication disabled') dedup_db = None