Merge branch 'master' into trough-dedup

* master:
  Update docstring
  Move Warcprox-Meta header construction to warcproxy
  Improve test_writer tests
  Replace timestamp parameter with more generic request/response syntax
  Return capture timestamp
  Swap fcntl.flock with fcntl.lockf
  Unit test fix for Python2 compatibility
  Test WarcWriter file locking when no_warc_open_suffix=True
  Rename writer var and add exception handling
  Acquire an exclusive file lock when not using .open WARC suffix
  Add hidden --no-warc-open-suffix CLI option
  Fix missing dummy url param in bigtable lookup method
  back to dev version number
  version 2.2 for pypi to address https://github.com/internetarchive/warcprox/issues/42
  Expand comment with limit=-1 explanation
  Drop unnecessary split for newline in CDX results
  fix benchmarks (update command line args)
  Update CdxServerDedup lookup algorithm
  Pass url instead of recorded_url obj to dedup lookup methods
  Filter out warc/revisit records in CdxServerDedup
  Improve CdxServerDedup implementation
  Fix minor CdxServerDedup unit test
  Fix bug with dedup_info date encoding
  Add mock pkg to run-tests.sh
  Add CdxServerDedup unit tests and improve its exception handling
  Add CDX Server based deduplication
  cryptography lib version 2.1.1 is causing problems
  Revert changes to test_warcprox.py
  Revert changes to bigtable and dedup
  Revert warc to previous behavior
  Update unit test
  Replace invalid warcfilename variable in playback
  Stop using WarcRecord.REFERS_TO header and use payload_digest instead
Noah Levitt 2017-11-02 16:34:52 -07:00
commit ed49eea4d5
15 changed files with 334 additions and 95 deletions

View File

@@ -47,6 +47,7 @@ Usage
[--stats-db-file STATS_DB_FILE] [-P PLAYBACK_PORT]
[--playback-index-db-file PLAYBACK_INDEX_DB_FILE]
[-j DEDUP_DB_FILE | --rethinkdb-servers RETHINKDB_SERVERS]
[--cdxserver-dedup CDX_SERVER_URL]
[--rethinkdb-db RETHINKDB_DB] [--rethinkdb-big-table]
[--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY]
[--plugin PLUGIN_CLASS] [--version] [-v] [--trace] [-q]
@@ -100,6 +101,9 @@ Usage
persistent deduplication database file; empty
string or /dev/null disables deduplication
(default: ./warcprox.sqlite)
--cdxserver-dedup CDX_SERVER_URL
use a CDX server for deduplication
(default: None)
--rethinkdb-servers RETHINKDB_SERVERS
rethinkdb servers, used for dedup and stats if
specified; e.g.

View File

@@ -163,78 +163,87 @@ Benchmarking code uses asyncio/aiohttp and requires python 3.5 or later.
arg_parser = argparse.ArgumentParser(
prog=prog, description=desc,
formatter_class=warcprox.main.BetterArgumentDefaultsHelpFormatter)
arg_parser.add_argument(
'-z', '--gzip', dest='gzip', action='store_true',
### these warcprox options are not configurable for the benchmarks
# arg_parser.add_argument('-p', '--port', dest='port', default='8000',
# type=int, help='port to listen on')
# arg_parser.add_argument('-b', '--address', dest='address',
# default='localhost', help='address to listen on')
# arg_parser.add_argument('-c', '--cacert', dest='cacert',
# default='./{0}-warcprox-ca.pem'.format(socket.gethostname()),
# help='CA certificate file; if file does not exist, it will be created')
# arg_parser.add_argument('--certs-dir', dest='certs_dir',
# default='./{0}-warcprox-ca'.format(socket.gethostname()),
# help='where to store and load generated certificates')
# arg_parser.add_argument('-d', '--dir', dest='directory',
# default='./warcs', help='where to write warcs')
arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
help='write gzip-compressed warc records')
arg_parser.add_argument('-n', '--prefix', dest='prefix',
default='WARCPROX', help='WARC filename prefix')
arg_parser.add_argument(
'-s', '--size', dest='size', default=1000*1000*1000, type=int,
help='WARC file rollover size threshold in bytes')
arg_parser.add_argument(
'--rollover-idle-time', dest='rollover_idle_time', default=None,
type=int, help=(
'WARC file rollover idle time threshold in seconds (so that '
"Friday's last open WARC doesn't sit there all weekend "
'waiting for more data)'))
'-s', '--size', dest='rollover_size', default=1000*1000*1000,
type=int, help='WARC file rollover size threshold in bytes')
arg_parser.add_argument('--rollover-idle-time',
dest='rollover_idle_time', default=None, type=int,
help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)")
try:
hash_algos = hashlib.algorithms_guaranteed
except AttributeError:
hash_algos = hashlib.algorithms
arg_parser.add_argument(
'-g', '--digest-algorithm', dest='digest_algorithm',
default='sha1', help='digest algorithm, one of %s' % hash_algos)
arg_parser.add_argument('-g', '--digest-algorithm', dest='digest_algorithm',
default='sha1', help='digest algorithm, one of {}'.format(', '.join(hash_algos)))
arg_parser.add_argument('--base32', dest='base32', action='store_true',
default=False, help='write digests in Base32 instead of hex')
arg_parser.add_argument(
'--method-filter', metavar='HTTP_METHOD',
action='append', help=(
'only record requests with the given http method(s) (can be '
'used more than once)'))
arg_parser.add_argument(
'--stats-db-file', dest='stats_db_file',
default=os.path.join(tmpdir, 'stats.db'), help=(
'persistent statistics database file; empty string or '
'/dev/null disables statistics tracking'))
arg_parser.add_argument('--method-filter', metavar='HTTP_METHOD',
action='append', help='only record requests with the given http method(s) (can be used more than once)')
arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
default='./warcprox.sqlite', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
type=int, default=None, help='port to listen on for instant playback')
arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
default='./warcprox-playback-index.db',
help='playback index database file (only used if --playback-port is specified)')
group = arg_parser.add_mutually_exclusive_group()
group.add_argument(
'-j', '--dedup-db-file', dest='dedup_db_file',
default=os.path.join(tmpdir, 'dedup.db'), help=(
'persistent deduplication database file; empty string or '
'/dev/null disables deduplication'))
group.add_argument(
'--rethinkdb-servers', dest='rethinkdb_servers', help=(
'rethinkdb servers, used for dedup and stats if specified; '
'e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org'))
# arg_parser.add_argument(
# '--rethinkdb-db', dest='rethinkdb_db', default='warcprox', help=(
# 'rethinkdb database name (ignored unless --rethinkdb-servers '
# 'is specified)'))
group.add_argument('-j', '--dedup-db-file', dest='dedup_db_file',
default='./warcprox.sqlite', help='persistent deduplication database file; empty string or /dev/null disables deduplication')
group.add_argument('--rethinkdb-servers', dest='rethinkdb_servers',
help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default='warcprox',
help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)')
arg_parser.add_argument('--rethinkdb-big-table',
dest='rethinkdb_big_table', action='store_true', default=False,
help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
arg_parser.add_argument(
'--rethinkdb-big-table', dest='rethinkdb_big_table',
action='store_true', default=False, help=(
'use a big rethinkdb table called "captures", instead of a '
'small table called "dedup"; table is suitable for use as '
'index for playback (ignored unless --rethinkdb-servers is '
'specified)'))
'--rethinkdb-big-table-name', dest='rethinkdb_big_table_name',
default='captures', help=argparse.SUPPRESS)
arg_parser.add_argument('--queue-size', dest='queue_size', type=int,
default=500, help=argparse.SUPPRESS)
arg_parser.add_argument('--max-threads', dest='max_threads', type=int,
help=argparse.SUPPRESS)
arg_parser.add_argument('--profile', action='store_true', default=False,
help=argparse.SUPPRESS)
arg_parser.add_argument(
'--queue-size', dest='queue_size', type=int, default=1, help=(
'max size of the queue of urls waiting to be processed by '
'the warc writer thread'))
'--onion-tor-socks-proxy', dest='onion_tor_socks_proxy',
default=None, help=(
'host:port of tor socks proxy, used only to connect to '
'.onion sites'))
arg_parser.add_argument(
'--max-threads', dest='max_threads', type=int, help=(
'number of proxy server threads (if not specified, chosen based '
'on system resource limits)'))
arg_parser.add_argument(
'--version', action='version',
version='warcprox %s' % warcprox.__version__)
arg_parser.add_argument(
'-v', '--verbose', dest='verbose', action='store_true',
help='verbose logging')
arg_parser.add_argument(
'--trace', dest='trace', action='store_true',
help='trace-level logging')
arg_parser.add_argument(
'--profile', dest='profile', action='store_true', default=False,
help='profile the warc writer thread')
'--plugin', metavar='PLUGIN_CLASS', dest='plugins',
action='append', help=(
'Qualified name of plugin class, e.g. "mypkg.mymod.MyClass". '
'May be used multiple times to register multiple plugins. '
'Plugin classes are loaded from the regular python module '
'search path. They will be instantiated with no arguments and '
'must have a method `notify(self, recorded_url, records)` '
'which will be called for each url, after warc records have '
'been written.'))
arg_parser.add_argument('--version', action='version',
version="warcprox {}".format(warcprox.__version__))
arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
arg_parser.add_argument('--trace', dest='trace', action='store_true')
arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
arg_parser.add_argument(
'--requests', dest='requests', type=int, default=200,
help='number of urls to fetch')
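
The --plugin help above fully specifies the plugin contract; a minimal hedged sketch of a conforming class (the class name and its body are hypothetical):

    # Hypothetical plugin: instantiated with no arguments; notify() is
    # called for each url after its warc records have been written.
    import logging

    class CountingPlugin(object):
        def __init__(self):
            self.count = 0

        def notify(self, recorded_url, records):
            self.count += 1
            logging.info(
                '%s urls archived so far (latest: %s)',
                self.count, recorded_url.url)

It would be registered with --plugin mypkg.mymod.CountingPlugin (module path hypothetical).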

View File

@@ -40,6 +40,7 @@ deps = [
'warctools',
'urlcanon>=0.1.dev16',
'doublethink>=0.2.0.dev87',
'urllib3',
'PySocks',
'cryptography!=2.1.1', # 2.1.1 installation is failing on ubuntu
]
@@ -50,7 +51,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.2b1.dev105',
version='2.2.1b2.dev107',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',
@@ -59,7 +60,7 @@ setuptools.setup(
license='GPL',
packages=['warcprox'],
install_requires=deps,
tests_require=['requests>=2.0.1', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
tests_require=['requests>=2.0.1', 'mock', 'pytest', 'warcio'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
cmdclass = {'test': PyTest},
test_suite='warcprox.tests',
entry_points={

View File

@@ -38,7 +38,7 @@ do
&& (cd /warcprox && git diff HEAD) | patch -p1 \
&& virtualenv -p $python /tmp/venv \
&& source /tmp/venv/bin/activate \
&& pip --log-file /tmp/pip.log install . pytest requests warcio \
&& pip --log-file /tmp/pip.log install . pytest mock requests warcio \
&& py.test -v tests \
&& py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests \
&& py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests \

tests/test_dedup.py Normal file
View File

@@ -0,0 +1,46 @@
import mock
from warcprox.dedup import CdxServerDedup
def test_cdx_dedup():
# Mock CDX Server responses to simulate found, not found and errors.
with mock.patch('warcprox.dedup.CdxServerDedup.http_pool.request') as request:
url = "http://example.com"
# not found case
result = mock.Mock()
result.status = 200
result.data = b'20170101020405 test'
request.return_value = result
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res is None
# found case
result = mock.Mock()
result.status = 200
result.data = b'20170203040503 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
request.return_value = result
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res["date"] == b"2017-02-03T04:05:03Z"
# invalid CDX result status code
result = mock.Mock()
result.status = 400
result.data = b'20170101020405 B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
request.return_value = result
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res is None
# invalid CDX result content
result = mock.Mock()
result.status = 200
result.data = b'InvalidExceptionResult'
request.return_value = result
cdx_server = CdxServerDedup(cdx_url="dummy-cdx-server-url")
res = cdx_server.lookup(digest_key="B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
url=url)
assert res is None

View File

@@ -570,6 +570,22 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
assert response.headers["content-type"] == "text/plain;charset=utf-8"
assert response.raw.data == b"request rejected by warcprox: reached limit test_limits_bucket/total/urls=10\n"
def test_return_capture_timestamp(http_daemon, warcprox_, archiving_proxies):
url = 'http://localhost:{}/i/j'.format(http_daemon.server_port)
request_meta = {"accept": ["capture-metadata"]}
headers = {"Warcprox-Meta": json.dumps(request_meta)}
response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
assert response.status_code == 200
assert response.headers['Warcprox-Meta']
data = json.loads(response.headers['Warcprox-Meta'])
assert data['capture-metadata']
try:
dt = datetime.datetime.strptime(data['capture-metadata']['timestamp'],
'%Y-%m-%dT%H:%M:%SZ')
assert dt
except ValueError:
pytest.fail('Invalid capture-metadata timestamp format %s' % data['capture-metadata']['timestamp'])
def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port)
url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port)

tests/test_writer.py Normal file
View File

@@ -0,0 +1,57 @@
import os
import fcntl
from multiprocessing import Process, Queue
from datetime import datetime
import pytest
from warcprox.mitmproxy import ProxyingRecorder
from warcprox.warcproxy import RecordedUrl
from warcprox.writer import WarcWriter
from warcprox import Options
recorder = ProxyingRecorder(None, None, 'sha1', url='http://example.com')
recorded_url = RecordedUrl(url='http://example.com', content_type='text/plain',
status=200, client_ip='127.0.0.2',
request_data=b'abc',
response_recorder=recorder,
remote_ip='127.0.0.3',
timestamp=datetime.utcnow())
def lock_file(queue, filename):
"""Try to lock file and report the result ('OBTAINED LOCK' or
'FAILED TO OBTAIN LOCK') via the queue.
It is necessary to run this method in a different process to test locking.
"""
try:
fi = open(filename, 'ab')
fcntl.lockf(fi, fcntl.LOCK_EX | fcntl.LOCK_NB)
fi.close()
queue.put('OBTAINED LOCK')
except IOError:
queue.put('FAILED TO OBTAIN LOCK')
def test_warc_writer_locking(tmpdir):
"""Test if WarcWriter is locking WARC files.
When we don't have the .open suffix, WarcWriter locks the file and the
external process trying to ``lock_file`` fails (result=0).
"""
dirname = os.path.dirname(str(tmpdir.mkdir('test-warc-writer')))
wwriter = WarcWriter(Options(directory=dirname, no_warc_open_suffix=True))
wwriter.write_records(recorded_url)
warcs = [fn for fn in os.listdir(dirname) if fn.endswith('.warc')]
assert warcs
target_warc = os.path.join(dirname, warcs[0])
# launch another process and try to lock WARC file
queue = Queue()
p = Process(target=lock_file, args=(queue, target_warc))
p.start()
p.join()
assert queue.get() == 'FAILED TO OBTAIN LOCK'
wwriter.close_writer()
# locking must succeed after writer has closed the WARC file.
p = Process(target=lock_file, args=(queue, target_warc))
p.start()
p.join()
assert queue.get() == 'OBTAINED LOCK'

View File

@@ -221,7 +221,7 @@ class RethinkCapturesDedup:
self.captures_db = RethinkCaptures(options=options)
self.options = options
def lookup(self, digest_key, bucket="__unspecified__"):
def lookup(self, digest_key, bucket="__unspecified__", url=None):
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
algo, value_str = k.split(":")
if self.options.base32:

View File

@@ -31,6 +31,10 @@ import requests
import doublethink
import rethinkdb as r
import datetime
import urllib3
from urllib3.exceptions import HTTPError
urllib3.disable_warnings()
class DedupDb(object):
logger = logging.getLogger("warcprox.dedup.DedupDb")
@@ -76,7 +80,7 @@ class DedupDb(object):
conn.close()
self.logger.debug('dedup db saved %s:%s', key, json_value)
def lookup(self, digest_key, bucket=""):
def lookup(self, digest_key, bucket="", url=None):
result = None
key = digest_key.decode('utf-8') + '|' + bucket
conn = sqlite3.connect(self.file)
@@ -111,9 +115,11 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
and recorded_url.response_recorder.payload_size() > 0):
digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"])
recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"],
recorded_url.url)
else:
recorded_url.dedup_info = dedup_db.lookup(digest_key)
recorded_url.dedup_info = dedup_db.lookup(digest_key,
url=recorded_url.url)
class RethinkDedupDb:
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
@@ -157,7 +163,7 @@ class RethinkDedupDb:
raise Exception("unexpected result %s saving %s", result, record)
self.logger.debug('dedup db saved %s:%s', k, record)
def lookup(self, digest_key, bucket=""):
def lookup(self, digest_key, bucket="", url=None):
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
k = "{}|{}".format(k, bucket)
result = self.rr.table(self.table).get(k).run()
@@ -177,6 +183,69 @@ class RethinkDedupDb:
else:
self.save(digest_key, records[0])
class CdxServerDedup(object):
"""Query a CDX server to perform deduplication.
"""
logger = logging.getLogger("warcprox.dedup.CdxServerDedup")
http_pool = urllib3.PoolManager()
def __init__(self, cdx_url="https://web.archive.org/cdx/search",
options=warcprox.Options()):
self.cdx_url = cdx_url
self.options = options
def start(self):
pass
def save(self, digest_key, response_record, bucket=""):
"""Does not apply to CDX server, as it is obviously read-only.
"""
pass
def lookup(self, digest_key, url):
"""Compare `sha1` with SHA1 hash of fetched content (note SHA1 must be
computed on the original content, after decoding Content-Encoding and
Transfer-Encoding, if any), if they match, write a revisit record.
Get only the last item (limit=-1) because Wayback Machine has special
performance optimisation to handle that. limit < 0 is very inefficient
in general. Maybe it could be configurable in the future.
:param digest_key: b'sha1:<KEY-VALUE>' (prefix is optional).
Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
:param url: Target URL string
Result must contain:
{"url": <URL>, "date": "%Y-%m-%dT%H:%M:%SZ"}
"""
u = url.decode("utf-8") if isinstance(url, bytes) else url
try:
result = self.http_pool.request('GET', self.cdx_url, fields=dict(
url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit",
limit=-1))
assert result.status == 200
if isinstance(digest_key, bytes):
dkey = digest_key
else:
dkey = digest_key.encode('utf-8')
dkey = dkey[5:] if dkey.startswith(b'sha1:') else dkey
line = result.data.strip()
if line:
(cdx_ts, cdx_digest) = line.split(b' ')
if cdx_digest == dkey:
dt = datetime.datetime.strptime(
cdx_ts.decode('ascii'), '%Y%m%d%H%M%S')
date = dt.strftime('%Y-%m-%dT%H:%M:%SZ').encode('utf-8')
return dict(url=url, date=date)
except (HTTPError, AssertionError, ValueError) as exc:
self.logger.error('CdxServerDedup request failed for url=%s %s',
url, exc)
return None
def notify(self, recorded_url, records):
"""Since we don't save anything to CDX server, this does not apply.
"""
pass
class TroughDedupDb(object):
'''
https://github.com/jkafader/trough
@@ -265,7 +334,7 @@ class TroughDedupDb(object):
'unexpected response %r %r %r to sql=%r',
response.status_code, response.reason, response.text, sql)
def lookup(self, digest_key, bucket='__unspecified__'):
def lookup(self, digest_key, bucket='__unspecified__', url=None):
read_url = self._read_url(bucket)
if not read_url:
return None
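
To illustrate the new class above, a hedged sketch of a CdxServerDedup lookup per its docstring (the URL and digest values mirror tests/test_dedup.py and are illustrative):

    # Illustrative use of CdxServerDedup; values mirror tests/test_dedup.py.
    from warcprox.dedup import CdxServerDedup

    dedup = CdxServerDedup(cdx_url='https://web.archive.org/cdx/search')
    info = dedup.lookup(
        digest_key=b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A',
        url='http://example.com')
    # CDX hit:  {'url': 'http://example.com', 'date': b'2017-02-03T04:05:03Z'}
    # miss or CDX server error: None (errors are logged, not raised)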

View File

@@ -80,6 +80,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
default='./warcs', help='where to write warcs')
arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
help='write gzip-compressed warc records')
arg_parser.add_argument('--no-warc-open-suffix', dest='no_warc_open_suffix',
default=False, action='store_true', help=argparse.SUPPRESS)
arg_parser.add_argument('-n', '--prefix', dest='prefix',
default='WARCPROX', help='WARC filename prefix')
arg_parser.add_argument(
@@ -133,6 +135,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
'🐷   url pointing to trough configuration rethinkdb database, '
'e.g. rethinkdb://db0.foo.org,db1.foo.org:38015'
'/trough_configuration'))
group.add_argument('--cdxserver-dedup', dest='cdxserver_dedup',
help='use a CDX Server URL for deduplication; e.g. https://web.archive.org/cdx/search')
arg_parser.add_argument(
'--rethinkdb-services-url', dest='rethinkdb_services_url', help=(
'rethinkdb service registry table url; if provided, warcprox '
@@ -205,6 +209,8 @@ def init_controller(args):
dedup_db = warcprox.bigtable.RethinkCapturesDedup(options=options)
elif args.rethinkdb_trough_db_url:
dedup_db = warcprox.dedup.TroughDedupDb(options)
elif args.cdxserver_dedup:
dedup_db = warcprox.dedup.CdxServerDedup(cdx_url=args.cdxserver_dedup)
elif args.dedup_db_file in (None, '', '/dev/null'):
logging.info('deduplication disabled')
dedup_db = None

View File

@@ -45,6 +45,7 @@ try:
http_client._MAXLINE = 4194304 # 4 MiB
except ImportError:
import httplib as http_client
import json
import socket
import logging
import ssl
@@ -163,13 +164,17 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
self.fp, proxy_client, digest_algorithm, url=url)
self.fp = self.recorder
def begin(self):
def begin(self, extra_response_headers={}):
http_client.HTTPResponse.begin(self) # reads status line, headers
status_and_headers = 'HTTP/1.1 {} {}\r\n'.format(
self.status, self.reason)
self.msg['Via'] = via_header_value(
self.msg.get('Via'), '%0.1f' % (self.version / 10.0))
if extra_response_headers:
for header, value in extra_response_headers.items():
self.msg[header] = value
for k,v in self.msg.items():
if k.lower() not in (
'connection', 'proxy-connection', 'keep-alive',
@@ -361,12 +366,16 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.logger.error("exception proxying request", exc_info=True)
raise
def _proxy_request(self):
def _proxy_request(self, extra_response_headers={}):
'''
Sends the request to the remote server, then uses a ProxyingRecorder to
read the response and send it to the proxy client, while recording the
bytes in transit. Returns a tuple (request, response) where request is
the raw request bytes, and response is a ProxyingRecorder.
:param extra_response_headers: generated on warcprox._proxy_request.
It may contain extra HTTP headers such as ``Warcprox-Meta`` which
are written in the WARC record for this request.
'''
# Build request
req_str = '{} {} {}\r\n'.format(
@@ -407,7 +416,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self._remote_server_sock, proxy_client=self.connection,
digest_algorithm=self.server.digest_algorithm,
url=self.url, method=self.command)
prox_rec_res.begin()
prox_rec_res.begin(extra_response_headers=extra_response_headers)
buf = prox_rec_res.read(8192)
while buf != b'':

View File

@@ -120,9 +120,12 @@ class PlaybackProxyHandler(MitmProxyHandler):
def _send_headers_and_refd_payload(
self, headers, refers_to, refers_to_target_uri, refers_to_date):
self, headers, refers_to_target_uri, refers_to_date, payload_digest):
"""Parameters:
"""
location = self.server.playback_index_db.lookup_exact(
refers_to_target_uri, refers_to_date, record_id=refers_to)
refers_to_target_uri, refers_to_date, payload_digest)
self.logger.debug('loading http payload from {}'.format(location))
fh = self._open_warc_at_offset(location['f'], location['o'])
@@ -131,7 +134,7 @@ class PlaybackProxyHandler(MitmProxyHandler):
pass
if errors:
raise Exception('warc errors at {}:{} -- {}'.format(warcfilename, offset, errors))
raise Exception('warc errors at {}:{} -- {}'.format(location['f'], offset, errors))
warc_type = record.get_header(warctools.WarcRecord.TYPE)
if warc_type != warctools.WarcRecord.RESPONSE:
@@ -177,20 +180,19 @@ class PlaybackProxyHandler(MitmProxyHandler):
if warc_profile != warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST:
raise Exception('unknown revisit record profile {}'.format(warc_profile))
refers_to = record.get_header(
warctools.WarcRecord.REFERS_TO).decode('latin1')
refers_to_target_uri = record.get_header(
warctools.WarcRecord.REFERS_TO_TARGET_URI).decode(
'latin1')
refers_to_date = record.get_header(
warctools.WarcRecord.REFERS_TO_DATE).decode('latin1')
payload_digest = record.get_header(
warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1')
self.logger.debug(
'revisit record references %s:%s capture of %s',
refers_to_date, refers_to, refers_to_target_uri)
refers_to_date, payload_digest, refers_to_target_uri)
return self._send_headers_and_refd_payload(
record.content[1], refers_to, refers_to_target_uri,
refers_to_date)
record.content[1], refers_to_target_uri, refers_to_date,
payload_digest)
else:
# send it back raw, whatever it is
@@ -264,12 +266,12 @@ class PlaybackIndexDb(object):
# XXX canonicalize url?
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
date_str = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
record_id_str = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
payload_digest_str = response_record.get_header(warctools.WarcRecord.PAYLOAD_DIGEST).decode('latin1')
# there could be two visits of same url in the same second, and WARC-Date is
# prescribed as YYYY-MM-DDThh:mm:ssZ, so we have to handle it :-\
# url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...}
# url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'d':payload_digest},record2,...],date2:[{...}],...}
with self._lock:
conn = sqlite3.connect(self.file)
@@ -283,10 +285,10 @@ class PlaybackIndexDb(object):
if date_str in py_value:
py_value[date_str].append(
{'f':warcfile, 'o':offset, 'i':record_id_str})
{'f': warcfile, 'o': offset, 'd': payload_digest_str})
else:
py_value[date_str] = [
{'f':warcfile, 'o':offset, 'i':record_id_str}]
{'f': warcfile, 'o': offset, 'd': payload_digest_str}]
json_value = json.dumps(py_value, separators=(',',':'))
@@ -314,11 +316,11 @@ class PlaybackIndexDb(object):
latest_date = max(py_value)
result = py_value[latest_date][0]
result['i'] = result['i'].encode('ascii')
result['d'] = result['d'].encode('ascii')
return latest_date, result
# in python3 params are bytes
def lookup_exact(self, url, warc_date, record_id):
def lookup_exact(self, url, warc_date, payload_digest):
conn = sqlite3.connect(self.file)
cursor = conn.execute(
'select value from playback where url = ?', (url,))
@@ -334,14 +336,13 @@ class PlaybackIndexDb(object):
if warc_date in py_value:
for record in py_value[warc_date]:
if record['i'] == record_id:
if record['d'] == payload_digest:
self.logger.debug(
"found exact match for (%r,%r,%r)",
warc_date, record_id, url)
record['i'] = record['i']
warc_date, payload_digest, url)
record['d'] = record['d']
return record
else:
self.logger.info(
"match not found for (%r,%r,%r)", warc_date, record_id, url)
"match not found for (%r,%r,%r)", warc_date, payload_digest, url)
return None
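
Per the comments in the hunks above, playback index entries now carry the payload digest under 'd' where the record id was previously stored under 'i'; a hypothetical entry for one url (all values illustrative):

    # Hypothetical playback index value: WARC-Date -> list of records with
    # 'f' warc file, 'o' response offset, 'q' request offset, 'd' payload digest.
    py_value = {
        '2017-11-02T23:34:52Z': [
            {'f': 'WARCPROX-20171102233452000-00000-aaaa.warc.gz',
             'o': 1234, 'q': 5678,
             'd': 'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'},
        ],
    }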

View File

@@ -50,7 +50,7 @@ class WarcRecordBuilder:
url=recorded_url.url, warc_date=warc_date,
data=response_header_block,
warc_type=warctools.WarcRecord.REVISIT,
refers_to=recorded_url.dedup_info['id'],
refers_to=recorded_url.dedup_info.get('id'),
refers_to_target_uri=recorded_url.dedup_info['url'],
refers_to_date=recorded_url.dedup_info['date'],
payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32),

View File

@@ -179,9 +179,14 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
remote_ip = self._remote_server_sock.getpeername()[0]
timestamp = datetime.datetime.utcnow()
extra_response_headers = {}
if warcprox_meta and 'accept' in warcprox_meta and \
'capture-metadata' in warcprox_meta['accept']:
rmeta = {'capture-metadata': {'timestamp': timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')}}
extra_response_headers['Warcprox-Meta'] = json.dumps(rmeta, separators=',:')
req, prox_rec_res = warcprox.mitmproxy.MitmProxyHandler._proxy_request(
self)
self, extra_response_headers=extra_response_headers)
content_type = None
try:
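
The capture-metadata handling above is exercised by test_return_capture_timestamp; a hedged client-side sketch of the same flow (the proxy address is an example):

    # Illustrative client request through warcprox asking for capture metadata.
    import json
    import requests

    headers = {'Warcprox-Meta': json.dumps({'accept': ['capture-metadata']})}
    proxies = {'http': 'http://localhost:8000'}  # example proxy address
    response = requests.get(
        'http://example.com/', proxies=proxies, headers=headers, stream=True)
    meta = json.loads(response.headers['Warcprox-Meta'])
    # e.g. {'capture-metadata': {'timestamp': '2017-11-02T23:34:52Z'}}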

View File

@@ -24,6 +24,7 @@ from __future__ import absolute_import
import logging
from datetime import datetime
from hanzo import warctools
import fcntl
import time
import warcprox
import os
@@ -53,6 +54,7 @@ class WarcWriter:
self._f = None
self._fpath = None
self._f_finalname = None
self._f_open_suffix = '' if options.no_warc_open_suffix else '.open'
self._serial = 0
self._lock = threading.RLock()
@@ -70,6 +72,12 @@ class WarcWriter:
with self._lock:
if self._fpath:
self.logger.info('closing %s', self._f_finalname)
if self._f_open_suffix == '':
try:
fcntl.lockf(self._f, fcntl.LOCK_UN)
except IOError as exc:
self.logger.error('could not unlock file %s (%s)',
self._fpath, exc)
self._f.close()
finalpath = os.path.sep.join(
[self.directory, self._f_finalname])
@@ -91,9 +99,17 @@ class WarcWriter:
self.prefix, self.timestamp17(), self._serial,
self._randomtoken, '.gz' if self.gzip else '')
self._fpath = os.path.sep.join([
self.directory, self._f_finalname + '.open'])
self.directory, self._f_finalname + self._f_open_suffix])
self._f = open(self._fpath, 'wb')
# if no '.open' suffix is used for WARC, acquire an exclusive
# file lock.
if self._f_open_suffix == '':
try:
fcntl.lockf(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError as exc:
self.logger.error('could not lock file %s (%s)',
self._fpath, exc)
warcinfo_record = self.record_builder.build_warcinfo_record(
self._f_finalname)
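
With the writer now holding an exclusive lockf lock on suffix-less WARCs, an external process can probe whether a file is still being written the same way tests/test_writer.py does; a hedged sketch (POSIX only, helper name hypothetical):

    # Hypothetical helper: True while warcprox holds the exclusive lock on a
    # .warc written with --no-warc-open-suffix, False once it is closed.
    import fcntl

    def warc_still_being_written(path):
        with open(path, 'ab') as f:
            try:
                fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError:
                return True   # writer still holds the lock
            fcntl.lockf(f, fcntl.LOCK_UN)
            return False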