From fc5f39ffed1b88bd1d5a809c13f779d9a2ead56b Mon Sep 17 00:00:00 2001
From: Vangelis Banos
Date: Fri, 13 Oct 2017 17:44:07 +0000
Subject: [PATCH] Add CDX Server based deduplication

Add ``--cdxserver-dedup URL`` option.
Create ``warcprox.dedup.CdxServerDedup`` class.
Add unit tests that run against a stubbed CDX server response.
---
 README.rst          |   4 +++
 setup.py            |   1 +
 tests/test_dedup.py |  52 +++++++++++++++++++
 warcprox/dedup.py   | 103 +++++++++++++++++++++++++++++++++++++-
 warcprox/main.py    |   5 +++
 5 files changed, 164 insertions(+), 1 deletion(-)
 create mode 100644 tests/test_dedup.py

diff --git a/README.rst b/README.rst
index b9c1c5f..8adcafa 100644
--- a/README.rst
+++ b/README.rst
@@ -47,6 +47,7 @@ Usage
                     [--stats-db-file STATS_DB_FILE] [-P PLAYBACK_PORT]
                     [--playback-index-db-file PLAYBACK_INDEX_DB_FILE]
                     [-j DEDUP_DB_FILE | --rethinkdb-servers RETHINKDB_SERVERS]
+                    [--cdxserver-dedup CDX_SERVER_URL]
                     [--rethinkdb-db RETHINKDB_DB] [--rethinkdb-big-table]
                     [--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY]
                     [--plugin PLUGIN_CLASS] [--version] [-v] [--trace] [-q]
@@ -100,6 +101,9 @@ Usage
                             persistent deduplication database file; empty
                             string or /dev/null disables deduplication
                             (default: ./warcprox.sqlite)
+      --cdxserver-dedup CDX_SERVER_URL
+                            use a CDX server for deduplication
+                            (default: None)
       --rethinkdb-servers RETHINKDB_SERVERS
                             rethinkdb servers, used for dedup and stats if
                             specified; e.g.
diff --git a/setup.py b/setup.py
index 34d11b5..b9308e2 100755
--- a/setup.py
+++ b/setup.py
@@ -39,6 +39,7 @@ deps = [
     'certauth==1.1.6',
     'warctools',
     'urlcanon>=0.1.dev16',
+    'urllib3',
     'doublethink>=0.2.0.dev81',
     'PySocks',
     'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu
diff --git a/tests/test_dedup.py b/tests/test_dedup.py
new file mode 100644
index 0000000..7836d27
--- /dev/null
+++ b/tests/test_dedup.py
@@ -0,0 +1,52 @@
+from warcprox.dedup import CdxServerDedup
+
+
+class StubResponse(object):
+    """Stands in for a urllib3 response: only `status` and `data`."""
+    def __init__(self, status, data):
+        self.status = status
+        self.data = data
+
+
+class StubPool(object):
+    """Stands in for the urllib3 pool so no test touches the network."""
+    def __init__(self, response):
+        self.response = response
+
+    def request(self, method, url, fields=None):
+        return self.response
+
+
+class StubRecordedUrl(object):
+    """Carries the one attribute CdxServerDedup.lookup reads."""
+    url = b"http://example.com/"
+
+
+DIGEST = b"B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A"
+
+
+def cdx_lookup(status, data, digest_key=b"sha1:" + DIGEST):
+    cdx_server = CdxServerDedup(cdx_url="http://cdx.invalid/cdx")
+    cdx_server.http_pool = StubPool(StubResponse(status, data))
+    return cdx_server.lookup(digest_key, StubRecordedUrl())
+
+
+def test_cdx_found():
+    res = cdx_lookup(200, b"20171013174407 " + DIGEST + b"\n")
+    assert res == dict(id=StubRecordedUrl.url, url=StubRecordedUrl.url,
+                       date="2017-10-13T17:44:07Z")
+
+
+def test_cdx_found_with_str_digest_key():
+    res = cdx_lookup(200, b"20171013174407 " + DIGEST + b"\n",
+                     digest_key="sha1:" + DIGEST.decode("ascii"))
+    assert res["date"] == "2017-10-13T17:44:07Z"
+
+
+def test_cdx_not_found():
+    assert cdx_lookup(200, b"") is None
+    assert cdx_lookup(200, b"20171013174407 " + b"A" * 32 + b"\n") is None
+
+
+def test_cdx_server_error():
+    assert cdx_lookup(503, b"") is None
diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index fd1ada4..a3c89f7 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -21,12 +21,16 @@ USA.
 
 from __future__ import absolute_import
 
+from datetime import datetime
 import logging
 import os
 import json
 from hanzo import warctools
 import warcprox
 import sqlite3
+import urllib3
+
+urllib3.disable_warnings()
 
 class DedupDb(object):
     logger = logging.getLogger("warcprox.dedup.DedupDb")
@@ -107,9 +111,12 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
             and recorded_url.response_recorder.payload_size() > 0):
         digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
-        if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
+        if isinstance(dedup_db, CdxServerDedup):
+            # CdxServerDedup.lookup takes the RecordedUrl, never a bucket
+            recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url)
+        elif recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
             recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"])
         else:
             recorded_url.dedup_info = dedup_db.lookup(digest_key)
 
 class RethinkDedupDb:
     logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
@@ -174,3 +181,97 @@ class RethinkDedupDb:
             else:
                 self.save(digest_key, records[0])
 
+
+def _split_timestamp(timestamp):
+    """Split a CDX timestamp into a tuple of 6 integers.
+
+    Fields are sliced from the right, so the year is whatever precedes
+    the last ten characters.
+
+    :param timestamp: full-length timestamp, e.g. '20171013174407'.
+    :type timestamp: str or bytes
+    :returns: (year, month, day, hour, minute, second)
+    :raises ValueError: if a field is empty or not numeric, e.g. when
+        `timestamp` is shorter than 11 characters.
+    """
+    return (
+        int(timestamp[:-10]),
+        int(timestamp[-10:-8]),
+        int(timestamp[-8:-6]),
+        int(timestamp[-6:-4]),
+        int(timestamp[-4:-2]),
+        int(timestamp[-2:])
+    )
+
+
+class CdxServerDedup(object):
+    """Query a CDX server to perform deduplication.
+
+    The CDX server is only ever read: `save` and `notify` are no-ops.
+    """
+    logger = logging.getLogger("warcprox.dedup.CdxServerDedup")
+
+    def __init__(self, cdx_url="https://web.archive.org/cdx/search/cdx",
+                 options=warcprox.Options()):
+        self.http_pool = urllib3.PoolManager()
+        self.cdx_url = cdx_url
+        self.options = options
+
+    def start(self):
+        pass
+
+    def save(self, digest_key, response_record, bucket=""):
+        """Does not apply to CDX server, as it is obviously read-only.
+        """
+        pass
+
+    def lookup(self, digest_key, recorded_url):
+        """Look up a previous capture of the url with the same digest.
+
+        Queries the CDX server for `recorded_url.url` and compares the
+        digest of each returned row with `digest_key`.
+
+        :param digest_key: payload digest, bytes or str, with or without
+            the 'sha1:' prefix.
+            Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
+        :param recorded_url: RecordedUrl object; only `url` is read.
+        :returns: {"id", "url", "date": "%Y-%m-%dT%H:%M:%SZ"} for the
+            first matching row, or None if nothing matches or the CDX
+            server could not be queried.
+        """
+        url = recorded_url.url
+        u = url.decode("utf-8") if isinstance(url, bytes) else url
+        try:
+            result = self.http_pool.request('GET', self.cdx_url, fields=dict(
+                url=u, fl="timestamp,digest", limit=-1))
+        except urllib3.exceptions.HTTPError as exc:
+            self.logger.error('CdxServerDedup request failed for url=%s %s',
+                              url, exc)
+            # no response to inspect; treat as "no duplicate found"
+            return None
+        if result.status != 200:
+            self.logger.error('CdxServerDedup request failed for url=%s '
+                              'status=%s', url, result.status)
+            return None
+        if not isinstance(digest_key, bytes):
+            digest_key = digest_key.encode('ascii')
+        if digest_key.startswith(b'sha1:'):
+            digest_key = digest_key[5:]
+        for line in result.data.split(b'\n'):
+            cols = line.split()
+            # skip blank or malformed rows instead of failing the lookup
+            if len(cols) != 2 or cols[1] != digest_key:
+                continue
+            try:
+                dt = datetime(*_split_timestamp(cols[0].decode('ascii')))
+            except ValueError:
+                continue
+            # TODO find out id
+            return dict(id=url, url=url,
+                        date=dt.strftime('%Y-%m-%dT%H:%M:%SZ'))
+        return None
+
+    def notify(self, recorded_url, records):
+        """Since we don't save anything to CDX server, this does not apply.
+        """
+        pass
diff --git a/warcprox/main.py b/warcprox/main.py
index 7b7314b..2d0414b 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -106,6 +106,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
     group = arg_parser.add_mutually_exclusive_group()
     group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
             default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
+    group.add_argument('--cdxserver-dedup', dest='cdxserver_dedup',
+            help='use a CDX Server for deduplication')
     group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
             help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
     arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
@@ -189,6 +191,9 @@ def init_controller(args):
         else:
             dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options)
         listeners.append(dedup_db)
+    elif args.cdxserver_dedup:
+        dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup)
+        listeners.append(dedup_db)
     elif args.dedup_db_file in (None, '', '/dev/null'):
         logging.info('deduplication disabled')
         dedup_db = None