mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
Merge pull request #41 from vbanos/cdx-dedup
Enable Deduplication using CDX server
This commit is contained in:
commit
8ead8182e1
@ -47,6 +47,7 @@ Usage
|
|||||||
[--stats-db-file STATS_DB_FILE] [-P PLAYBACK_PORT]
|
[--stats-db-file STATS_DB_FILE] [-P PLAYBACK_PORT]
|
||||||
[--playback-index-db-file PLAYBACK_INDEX_DB_FILE]
|
[--playback-index-db-file PLAYBACK_INDEX_DB_FILE]
|
||||||
[-j DEDUP_DB_FILE | --rethinkdb-servers RETHINKDB_SERVERS]
|
[-j DEDUP_DB_FILE | --rethinkdb-servers RETHINKDB_SERVERS]
|
||||||
|
[--cdxserver-dedup CDX_SERVER_URL]
|
||||||
[--rethinkdb-db RETHINKDB_DB] [--rethinkdb-big-table]
|
[--rethinkdb-db RETHINKDB_DB] [--rethinkdb-big-table]
|
||||||
[--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY]
|
[--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY]
|
||||||
[--plugin PLUGIN_CLASS] [--version] [-v] [--trace] [-q]
|
[--plugin PLUGIN_CLASS] [--version] [-v] [--trace] [-q]
|
||||||
@ -100,6 +101,9 @@ Usage
|
|||||||
persistent deduplication database file; empty
|
persistent deduplication database file; empty
|
||||||
string or /dev/null disables deduplication
|
string or /dev/null disables deduplication
|
||||||
(default: ./warcprox.sqlite)
|
(default: ./warcprox.sqlite)
|
||||||
|
--cdxserver-dedup CDX_SERVER_URL
|
||||||
|
use a CDX server for deduplication
|
||||||
|
(default: None)
|
||||||
--rethinkdb-servers RETHINKDB_SERVERS
|
--rethinkdb-servers RETHINKDB_SERVERS
|
||||||
rethinkdb servers, used for dedup and stats if
|
rethinkdb servers, used for dedup and stats if
|
||||||
specified; e.g.
|
specified; e.g.
|
||||||
|
3
setup.py
3
setup.py
@ -39,6 +39,7 @@ deps = [
|
|||||||
'certauth==1.1.6',
|
'certauth==1.1.6',
|
||||||
'warctools',
|
'warctools',
|
||||||
'urlcanon>=0.1.dev16',
|
'urlcanon>=0.1.dev16',
|
||||||
|
'urllib3',
|
||||||
'doublethink>=0.2.0.dev81',
|
'doublethink>=0.2.0.dev81',
|
||||||
'PySocks',
|
'PySocks',
|
||||||
'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu
|
'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu
|
||||||
@ -59,7 +60,7 @@ setuptools.setup(
|
|||||||
license='GPL',
|
license='GPL',
|
||||||
packages=['warcprox'],
|
packages=['warcprox'],
|
||||||
install_requires=deps,
|
install_requires=deps,
|
||||||
tests_require=['requests>=2.0.1', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
|
tests_require=['requests>=2.0.1', 'mock', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
|
||||||
cmdclass = {'test': PyTest},
|
cmdclass = {'test': PyTest},
|
||||||
test_suite='warcprox.tests',
|
test_suite='warcprox.tests',
|
||||||
entry_points={
|
entry_points={
|
||||||
|
@ -40,7 +40,7 @@ do
|
|||||||
&& (cd /warcprox && git diff HEAD) | patch -p1 \
|
&& (cd /warcprox && git diff HEAD) | patch -p1 \
|
||||||
&& virtualenv -p $python /tmp/venv \
|
&& virtualenv -p $python /tmp/venv \
|
||||||
&& source /tmp/venv/bin/activate \
|
&& source /tmp/venv/bin/activate \
|
||||||
&& pip --log-file /tmp/pip.log install . pytest requests warcio \
|
&& pip --log-file /tmp/pip.log install . pytest mock requests warcio \
|
||||||
&& py.test -v tests \
|
&& py.test -v tests \
|
||||||
&& py.test -v --rethinkdb-servers=localhost tests \
|
&& py.test -v --rethinkdb-servers=localhost tests \
|
||||||
&& py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests"
|
&& py.test -v --rethinkdb-servers=localhost --rethinkdb-big-table tests"
|
||||||
|
46
tests/test_dedup.py
Normal file
46
tests/test_dedup.py
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
import mock
|
||||||
|
from warcprox.dedup import CdxServerDedup
|
||||||
|
|
||||||
|
|
||||||
|
def test_cdx_dedup():
|
||||||
|
# Mock CDX Server responses to simulate found, not found and errors.
|
||||||
|
with mock.patch('warcprox.dedup.CdxServerDedup.http_pool.request') as request:
|
||||||
|
url = "http://example.com"
|
||||||
|
# not found case
|
||||||
|
result = mock.Mock()
|
||||||
|
result.status = 200
|
||||||
|
result.data = b'20170101020405 test'
|
||||||
|
request.return_value = result
|
||||||
|
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
|
||||||
|
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
|
||||||
|
url=url)
|
||||||
|
assert res is None
|
||||||
|
|
||||||
|
# found case
|
||||||
|
result = mock.Mock()
|
||||||
|
result.status = 200
|
||||||
|
result.data = b'20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
|
||||||
|
request.return_value = result
|
||||||
|
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
|
||||||
|
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
|
||||||
|
url=url)
|
||||||
|
assert res["date"] == b"2017-02-03T04:05:03Z"
|
||||||
|
|
||||||
|
# invalid CDX result status code
|
||||||
|
result = mock.Mock()
|
||||||
|
result.status = 400
|
||||||
|
result.data = b'20170101020405 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
|
||||||
|
request.return_value = result
|
||||||
|
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
|
||||||
|
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
|
||||||
|
url=url)
|
||||||
|
assert res is None
|
||||||
|
# invalid CDX result content
|
||||||
|
result = mock.Mock()
|
||||||
|
result.status = 200
|
||||||
|
result.data = b'InvalidExceptionResult'
|
||||||
|
request.return_value = result
|
||||||
|
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
|
||||||
|
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
|
||||||
|
url=url)
|
||||||
|
assert res is None
|
@ -220,7 +220,7 @@ class RethinkCapturesDedup:
|
|||||||
self.captures_db = captures_db
|
self.captures_db = captures_db
|
||||||
self.options = options
|
self.options = options
|
||||||
|
|
||||||
def lookup(self, digest_key, bucket="__unspecified__"):
|
def lookup(self, digest_key, bucket="__unspecified__", url=None):
|
||||||
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
|
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
|
||||||
algo, value_str = k.split(":")
|
algo, value_str = k.split(":")
|
||||||
if self.options.base32:
|
if self.options.base32:
|
||||||
|
@ -21,12 +21,17 @@ USA.
|
|||||||
|
|
||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
from hanzo import warctools
|
from hanzo import warctools
|
||||||
import warcprox
|
import warcprox
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
import urllib3
|
||||||
|
from urllib3.exceptions import HTTPError
|
||||||
|
|
||||||
|
urllib3.disable_warnings()
|
||||||
|
|
||||||
class DedupDb(object):
|
class DedupDb(object):
|
||||||
logger = logging.getLogger("warcprox.dedup.DedupDb")
|
logger = logging.getLogger("warcprox.dedup.DedupDb")
|
||||||
@ -72,7 +77,7 @@ class DedupDb(object):
|
|||||||
conn.close()
|
conn.close()
|
||||||
self.logger.debug('dedup db saved %s:%s', key, json_value)
|
self.logger.debug('dedup db saved %s:%s', key, json_value)
|
||||||
|
|
||||||
def lookup(self, digest_key, bucket=""):
|
def lookup(self, digest_key, bucket="", url=None):
|
||||||
result = None
|
result = None
|
||||||
key = digest_key.decode('utf-8') + '|' + bucket
|
key = digest_key.decode('utf-8') + '|' + bucket
|
||||||
conn = sqlite3.connect(self.file)
|
conn = sqlite3.connect(self.file)
|
||||||
@ -107,9 +112,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
|
|||||||
and recorded_url.response_recorder.payload_size() > 0):
|
and recorded_url.response_recorder.payload_size() > 0):
|
||||||
digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
|
digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
|
||||||
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
|
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
|
||||||
recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"])
|
recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"],
|
||||||
|
recorded_url.url)
|
||||||
else:
|
else:
|
||||||
recorded_url.dedup_info = dedup_db.lookup(digest_key)
|
recorded_url.dedup_info = dedup_db.lookup(digest_key,
|
||||||
|
url=recorded_url.url)
|
||||||
|
|
||||||
class RethinkDedupDb:
|
class RethinkDedupDb:
|
||||||
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
|
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
|
||||||
@ -154,7 +161,7 @@ class RethinkDedupDb:
|
|||||||
raise Exception("unexpected result %s saving %s", result, record)
|
raise Exception("unexpected result %s saving %s", result, record)
|
||||||
self.logger.debug('dedup db saved %s:%s', k, record)
|
self.logger.debug('dedup db saved %s:%s', k, record)
|
||||||
|
|
||||||
def lookup(self, digest_key, bucket=""):
|
def lookup(self, digest_key, bucket="", url=None):
|
||||||
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
|
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
|
||||||
k = "{}|{}".format(k, bucket)
|
k = "{}|{}".format(k, bucket)
|
||||||
result = self.rr.table(self.table).get(k).run()
|
result = self.rr.table(self.table).get(k).run()
|
||||||
@ -174,3 +181,66 @@ class RethinkDedupDb:
|
|||||||
else:
|
else:
|
||||||
self.save(digest_key, records[0])
|
self.save(digest_key, records[0])
|
||||||
|
|
||||||
|
|
||||||
|
class CdxServerDedup(object):
|
||||||
|
"""Query a CDX server to perform deduplication.
|
||||||
|
"""
|
||||||
|
logger = logging.getLogger("warcprox.dedup.CdxServerDedup")
|
||||||
|
http_pool = urllib3.PoolManager()
|
||||||
|
|
||||||
|
def __init__(self, cdx_url="https://web.archive.org/cdx/search",
|
||||||
|
options=warcprox.Options()):
|
||||||
|
self.cdx_url = cdx_url
|
||||||
|
self.options = options
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def save(self, digest_key, response_record, bucket=""):
|
||||||
|
"""Does not apply to CDX server, as it is obviously read-only.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
def lookup(self, digest_key, url):
|
||||||
|
"""Compare `sha1` with SHA1 hash of fetched content (note SHA1 must be
|
||||||
|
computed on the original content, after decoding Content-Encoding and
|
||||||
|
Transfer-Encoding, if any), if they match, write a revisit record.
|
||||||
|
|
||||||
|
Get only the last item (limit=-1) because Wayback Machine has special
|
||||||
|
performance optimisation to handle that. limit < 0 is very inefficient
|
||||||
|
in general. Maybe it could be configurable in the future.
|
||||||
|
|
||||||
|
:param digest_key: b'sha1:<KEY-VALUE>' (prefix is optional).
|
||||||
|
Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
|
||||||
|
:param url: Target URL string
|
||||||
|
Result must contain:
|
||||||
|
{"url": <URL>, "date": "%Y-%m-%dT%H:%M:%SZ"}
|
||||||
|
"""
|
||||||
|
u = url.decode("utf-8") if isinstance(url, bytes) else url
|
||||||
|
try:
|
||||||
|
result = self.http_pool.request('GET', self.cdx_url, fields=dict(
|
||||||
|
url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit",
|
||||||
|
limit=-1))
|
||||||
|
assert result.status == 200
|
||||||
|
if isinstance(digest_key, bytes):
|
||||||
|
dkey = digest_key
|
||||||
|
else:
|
||||||
|
dkey = digest_key.encode('utf-8')
|
||||||
|
dkey = dkey[5:] if dkey.startswith(b'sha1:') else dkey
|
||||||
|
line = result.data.strip()
|
||||||
|
if line:
|
||||||
|
(cdx_ts, cdx_digest) = line.split(b' ')
|
||||||
|
if cdx_digest == dkey:
|
||||||
|
dt = datetime.strptime(cdx_ts.decode('ascii'),
|
||||||
|
'%Y%m%d%H%M%S')
|
||||||
|
date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8')
|
||||||
|
return dict(url=url, date=date)
|
||||||
|
except (HTTPError, AssertionError, ValueError) as exc:
|
||||||
|
self.logger.error('CdxServerDedup request failed for url=%s %s',
|
||||||
|
url, exc)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def notify(self, recorded_url, records):
|
||||||
|
"""Since we don't save anything to CDX server, this does not apply.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
@ -106,6 +106,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
|
|||||||
group = arg_parser.add_mutually_exclusive_group()
|
group = arg_parser.add_mutually_exclusive_group()
|
||||||
group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
|
group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
|
||||||
default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
|
default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
|
||||||
|
group.add_argument('--cdxserver-dedup', dest='cdxserver_dedup',
|
||||||
|
help='use a CDX Server URL for deduplication; e.g. https://web.archive.org/cdx/search')
|
||||||
group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
|
group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
|
||||||
help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
|
help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
|
||||||
arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
|
arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
|
||||||
@ -189,6 +191,9 @@ def init_controller(args):
|
|||||||
else:
|
else:
|
||||||
dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options)
|
dedup_db = warcprox.dedup.RethinkDedupDb(rr, options=options)
|
||||||
listeners.append(dedup_db)
|
listeners.append(dedup_db)
|
||||||
|
elif args.cdxserver_dedup:
|
||||||
|
dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup)
|
||||||
|
listeners.append(dedup_db)
|
||||||
elif args.dedup_db_file in (None, '', '/dev/null'):
|
elif args.dedup_db_file in (None, '', '/dev/null'):
|
||||||
logging.info('deduplication disabled')
|
logging.info('deduplication disabled')
|
||||||
dedup_db = None
|
dedup_db = None
|
||||||
|
Loading…
x
Reference in New Issue
Block a user