mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
Merge branch 'master' into qa
* master:
log exception and continue 🤞 if schema reg fails
log stack trace in case batch postprocessor raises
bump dev version after merge
more edits
more little edits
explain a bit about mitm
little edits
describe the last two remaining fields
fixlets
more progress on documenting "limits"
add another "wait" to fix failing test
fix bug in limits enforcement
docs still in progress
new checks exposing bug in limits enforcement
working on "limits" and "soft-limits"
explain warcprox-meta "blocks"
starting to explain some warcprox-meta fields
short sectioni on stats
barely starting to flesh out warcprox-meta section
explain deduplication
starting to talk about warcprox-meta
fix failure message in test_return_capture_timestamp
double the backticks
stubby api docs
rename README.rst -> readme.rst
add some debug logging in BatchTroughLoader
just one should_dedup() for trough dedup
only run tests in py3
fix trough deployment in Dockerfile
fix test_dedup_min_text_size failure?
rewrite test_dedup_min_size() to account for
we want to save all captures to the big "captures"
default values for dedup_min_text_size et al
This commit is contained in:
commit
a3f5313850
186
README.rst
186
README.rst
@ -1,186 +0,0 @@
|
||||
warcprox - WARC writing MITM HTTP/S proxy
|
||||
-----------------------------------------
|
||||
.. image:: https://travis-ci.org/internetarchive/warcprox.svg?branch=master
|
||||
:target: https://travis-ci.org/internetarchive/warcprox
|
||||
|
||||
Based on the excellent and simple pymiproxy by Nadeem Douba.
|
||||
https://github.com/allfro/pymiproxy
|
||||
|
||||
Install
|
||||
~~~~~~~
|
||||
|
||||
Warcprox runs on python 3.4+.
|
||||
|
||||
To install latest release run:
|
||||
|
||||
::
|
||||
|
||||
# apt-get install libffi-dev libssl-dev
|
||||
pip install warcprox
|
||||
|
||||
You can also install the latest bleeding edge code:
|
||||
|
||||
::
|
||||
|
||||
pip install git+https://github.com/internetarchive/warcprox.git
|
||||
|
||||
|
||||
Trusting the CA cert
|
||||
~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
For best results while browsing through warcprox, you need to add the CA
|
||||
cert as a trusted cert in your browser. If you don't do that, you will
|
||||
get the warning when you visit each new site. But worse, any embedded
|
||||
https content on a different server will simply fail to load, because
|
||||
the browser will reject the certificate without telling you.
|
||||
|
||||
Plugins
|
||||
~~~~~~~
|
||||
|
||||
Warcprox supports a limited notion of plugins by way of the `--plugin` command
|
||||
line argument. Plugin classes are loaded from the regular python module search
|
||||
path. They will be instantiated with one argument, a `warcprox.Options`, which
|
||||
holds the values of all the command line arguments. Legacy plugins with
|
||||
constructors that take no arguments are also supported. Plugins should either
|
||||
have a method `notify(self, recorded_url, records)` or should subclass
|
||||
`warcprox.BasePostfetchProcessor`. More than one plugin can be configured by
|
||||
specifying `--plugin` multiples times.
|
||||
|
||||
`A minimal example <https://github.com/internetarchive/warcprox/blob/318405e795ac0ab8760988a1a482cf0a17697148/warcprox/__init__.py#L165>`__
|
||||
|
||||
Usage
|
||||
~~~~~
|
||||
|
||||
::
|
||||
|
||||
usage: warcprox [-h] [-p PORT] [-b ADDRESS] [-c CACERT]
|
||||
[--certs-dir CERTS_DIR] [-d DIRECTORY]
|
||||
[--warc-filename WARC_FILENAME] [-z] [-n PREFIX]
|
||||
[-s ROLLOVER_SIZE]
|
||||
[--rollover-idle-time ROLLOVER_IDLE_TIME]
|
||||
[-g DIGEST_ALGORITHM] [--base32]
|
||||
[--method-filter HTTP_METHOD]
|
||||
[--stats-db-file STATS_DB_FILE | --rethinkdb-stats-url RETHINKDB_STATS_URL]
|
||||
[-P PLAYBACK_PORT]
|
||||
[-j DEDUP_DB_FILE | --rethinkdb-dedup-url RETHINKDB_DEDUP_URL | --rethinkdb-big-table-url RETHINKDB_BIG_TABLE_URL | --rethinkdb-trough-db-url RETHINKDB_TROUGH_DB_URL | --cdxserver-dedup CDXSERVER_DEDUP]
|
||||
[--rethinkdb-services-url RETHINKDB_SERVICES_URL]
|
||||
[--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY]
|
||||
[--crawl-log-dir CRAWL_LOG_DIR] [--plugin PLUGIN_CLASS]
|
||||
[--version] [-v] [--trace] [-q]
|
||||
|
||||
warcprox - WARC writing MITM HTTP/S proxy
|
||||
|
||||
optional arguments:
|
||||
-h, --help show this help message and exit
|
||||
-p PORT, --port PORT port to listen on (default: 8000)
|
||||
-b ADDRESS, --address ADDRESS
|
||||
address to listen on (default: localhost)
|
||||
-c CACERT, --cacert CACERT
|
||||
CA certificate file; if file does not exist, it
|
||||
will be created (default:
|
||||
./ayutla.monkeybrains.net-warcprox-ca.pem)
|
||||
--certs-dir CERTS_DIR
|
||||
where to store and load generated certificates
|
||||
(default: ./ayutla.monkeybrains.net-warcprox-ca)
|
||||
-d DIRECTORY, --dir DIRECTORY
|
||||
where to write warcs (default: ./warcs)
|
||||
--warc-filename WARC_FILENAME
|
||||
define custom WARC filename with variables
|
||||
{prefix}, {timestamp14}, {timestamp17},
|
||||
{serialno}, {randomtoken}, {hostname},
|
||||
{shorthostname} (default:
|
||||
{prefix}-{timestamp17}-{serialno}-{randomtoken})
|
||||
-z, --gzip write gzip-compressed warc records
|
||||
-n PREFIX, --prefix PREFIX
|
||||
default WARC filename prefix (default: WARCPROX)
|
||||
-s ROLLOVER_SIZE, --size ROLLOVER_SIZE
|
||||
WARC file rollover size threshold in bytes
|
||||
(default: 1000000000)
|
||||
--rollover-idle-time ROLLOVER_IDLE_TIME
|
||||
WARC file rollover idle time threshold in seconds
|
||||
(so that Friday's last open WARC doesn't sit there
|
||||
all weekend waiting for more data) (default: None)
|
||||
-g DIGEST_ALGORITHM, --digest-algorithm DIGEST_ALGORITHM
|
||||
digest algorithm, one of sha384, sha224, md5,
|
||||
sha256, sha512, sha1 (default: sha1)
|
||||
--base32 write digests in Base32 instead of hex
|
||||
--method-filter HTTP_METHOD
|
||||
only record requests with the given http method(s)
|
||||
(can be used more than once) (default: None)
|
||||
--stats-db-file STATS_DB_FILE
|
||||
persistent statistics database file; empty string
|
||||
or /dev/null disables statistics tracking
|
||||
(default: ./warcprox.sqlite)
|
||||
--rethinkdb-stats-url RETHINKDB_STATS_URL
|
||||
rethinkdb stats table url, e.g. rethinkdb://db0.fo
|
||||
o.org,db1.foo.org:38015/my_warcprox_db/my_stats_ta
|
||||
ble (default: None)
|
||||
-P PLAYBACK_PORT, --playback-port PLAYBACK_PORT
|
||||
port to listen on for instant playback (default:
|
||||
None)
|
||||
-j DEDUP_DB_FILE, --dedup-db-file DEDUP_DB_FILE
|
||||
persistent deduplication database file; empty
|
||||
string or /dev/null disables deduplication
|
||||
(default: ./warcprox.sqlite)
|
||||
--rethinkdb-dedup-url RETHINKDB_DEDUP_URL
|
||||
rethinkdb dedup url, e.g. rethinkdb://db0.foo.org,
|
||||
db1.foo.org:38015/my_warcprox_db/my_dedup_table
|
||||
(default: None)
|
||||
--rethinkdb-big-table-url RETHINKDB_BIG_TABLE_URL
|
||||
rethinkdb big table url (table will be populated
|
||||
with various capture information and is suitable
|
||||
for use as index for playback), e.g. rethinkdb://d
|
||||
b0.foo.org,db1.foo.org:38015/my_warcprox_db/captur
|
||||
es (default: None)
|
||||
--rethinkdb-trough-db-url RETHINKDB_TROUGH_DB_URL
|
||||
🐷 url pointing to trough configuration rethinkdb
|
||||
database, e.g. rethinkdb://db0.foo.org,db1.foo.org
|
||||
:38015/trough_configuration (default: None)
|
||||
--cdxserver-dedup CDXSERVER_DEDUP
|
||||
use a CDX Server URL for deduplication; e.g.
|
||||
https://web.archive.org/cdx/search (default: None)
|
||||
--rethinkdb-services-url RETHINKDB_SERVICES_URL
|
||||
rethinkdb service registry table url; if provided,
|
||||
warcprox will create and heartbeat entry for
|
||||
itself (default: None)
|
||||
--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY
|
||||
host:port of tor socks proxy, used only to connect
|
||||
to .onion sites (default: None)
|
||||
--crawl-log-dir CRAWL_LOG_DIR
|
||||
if specified, write crawl log files in the
|
||||
specified directory; one crawl log is written per
|
||||
warc filename prefix; crawl log format mimics
|
||||
heritrix (default: None)
|
||||
--plugin PLUGIN_CLASS
|
||||
Qualified name of plugin class, e.g.
|
||||
"mypkg.mymod.MyClass". May be used multiple times
|
||||
to register multiple plugins. See README.rst for
|
||||
more information. (default: None)
|
||||
--version show program's version number and exit
|
||||
-v, --verbose
|
||||
--trace
|
||||
-q, --quiet
|
||||
|
||||
License
|
||||
~~~~~~~
|
||||
|
||||
Warcprox is a derivative work of pymiproxy, which is GPL. Thus warcprox is also
|
||||
GPL.
|
||||
|
||||
* Copyright (C) 2012 Cygnos Corporation
|
||||
* Copyright (C) 2013-2018 Internet Archive
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
as published by the Free Software Foundation; either version 2
|
||||
of the License, or (at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
|
320
api.rst
Normal file
320
api.rst
Normal file
@ -0,0 +1,320 @@
|
||||
warcprox API
|
||||
************
|
||||
|
||||
Means of interacting with warcprox over http, aside from simply proxying urls.
|
||||
|
||||
.. contents::
|
||||
|
||||
``/status`` url
|
||||
===============
|
||||
|
||||
If warcprox is running at localhost:8000, http://localhost:8000/status returns
|
||||
a json blob with a bunch of status info. For example:
|
||||
|
||||
::
|
||||
|
||||
$ curl -sS http://localhost:8000/status
|
||||
{
|
||||
"rates_5min": {
|
||||
"warc_bytes_per_sec": 0.0,
|
||||
"urls_per_sec": 0.0,
|
||||
"actual_elapsed": 277.2983281612396
|
||||
},
|
||||
"version": "2.4b2.dev174",
|
||||
"load": 0.0,
|
||||
"seconds_behind": 0.0,
|
||||
"threads": 100,
|
||||
"warc_bytes_written": 0,
|
||||
"port": 8000,
|
||||
"postfetch_chain": [
|
||||
{
|
||||
"queued_urls": 0,
|
||||
"processor": "SkipFacebookCaptchas"
|
||||
},
|
||||
{
|
||||
"queued_urls": 0,
|
||||
"processor": "BatchTroughLoader"
|
||||
},
|
||||
{
|
||||
"queued_urls": 0,
|
||||
"processor": "WarcWriterProcessor"
|
||||
},
|
||||
{
|
||||
"queued_urls": 0,
|
||||
"processor": "BatchTroughStorer"
|
||||
},
|
||||
{
|
||||
"queued_urls": 0,
|
||||
"processor": "RethinkStatsProcessor"
|
||||
},
|
||||
{
|
||||
"queued_urls": 0,
|
||||
"processor": "CrawlLogger"
|
||||
},
|
||||
{
|
||||
"queued_urls": 0,
|
||||
"processor": "TroughFeed"
|
||||
},
|
||||
{
|
||||
"queued_urls": 0,
|
||||
"processor": "RunningStats"
|
||||
}
|
||||
],
|
||||
"queue_max_size": 500,
|
||||
"role": "warcprox",
|
||||
"queued_urls": 0,
|
||||
"active_requests": 1,
|
||||
"host": "wbgrp-svc405.us.archive.org",
|
||||
"rates_15min": {
|
||||
"warc_bytes_per_sec": 0.0,
|
||||
"urls_per_sec": 0.0,
|
||||
"actual_elapsed": 876.9885368347168
|
||||
},
|
||||
"unaccepted_requests": 0,
|
||||
"urls_processed": 0,
|
||||
"pid": 18841,
|
||||
"address": "127.0.0.1",
|
||||
"rates_1min": {
|
||||
"warc_bytes_per_sec": 0.0,
|
||||
"urls_per_sec": 0.0,
|
||||
"actual_elapsed": 54.92501664161682
|
||||
},
|
||||
"start_time": 1526690353.4060142
|
||||
}
|
||||
|
||||
``WARCPROX_WRITE_RECORD`` http method
|
||||
=====================================
|
||||
|
||||
To make warcprox write an arbitrary warc record you can send it a special
|
||||
request with http method ``WARCPROX_WRITE_RECORD``. The http request must
|
||||
include the headers ``WARC-Type``, ``Content-Type``, and ``Content-Length``.
|
||||
Warcprox will use these to populate the warc record. For example::
|
||||
|
||||
$ ncat --crlf 127.0.0.1 8000 <<EOF
|
||||
> WARCPROX_WRITE_RECORD special://url/some?thing HTTP/1.1
|
||||
> WARC-Type: resource
|
||||
> Content-type: text/plain;charset=utf-8
|
||||
> Content-length: 29
|
||||
>
|
||||
> i am a warc record payload!
|
||||
> EOF
|
||||
HTTP/1.0 204 OK
|
||||
Server: BaseHTTP/0.6 Python/3.6.3
|
||||
Date: Tue, 22 May 2018 19:21:02 GMT
|
||||
|
||||
On success warcprox responds with http status 204. For the request above
|
||||
warcprox will write a warc record that looks like this::
|
||||
|
||||
WARC/1.0
|
||||
WARC-Type: resource
|
||||
WARC-Record-ID: <urn:uuid:d0e10852-b18c-4037-a99e-f41915fec5b5>
|
||||
WARC-Date: 2018-05-21T23:33:31Z
|
||||
WARC-Target-URI: special://url/some?thing
|
||||
WARC-Block-Digest: sha1:a282cfe127ab8d51b315ff3d31de18614979d0df
|
||||
WARC-Payload-Digest: sha1:a282cfe127ab8d51b315ff3d31de18614979d0df
|
||||
Content-Type: text/plain;charset=utf-8
|
||||
Content-Length: 29
|
||||
|
||||
i am a warc record payload!
|
||||
|
||||
``Warcprox-Meta`` http request header
|
||||
=====================================
|
||||
|
||||
``Warcprox-Meta`` is a special http request header that can be used to pass
|
||||
configuration information and metadata with each proxy request to warcprox. The
|
||||
value is a json blob. There are several fields understood by warcprox, and
|
||||
arbitrary additional fields can be included. If warcprox doesn't recognize a
|
||||
field it simply ignores it. Custom fields may be useful for custom warcprox
|
||||
plugins (see `<readme.rst#plugins>`_).
|
||||
|
||||
Warcprox strips the ``warcprox-meta`` header out before sending the request to
|
||||
remote server, and does not write it in the warc request record.
|
||||
|
||||
Brozzler knows about ``warcprox-meta``. For information on configuring
|
||||
it in brozzler, see
|
||||
https://github.com/internetarchive/brozzler/blob/master/job-conf.rst#warcprox-meta.
|
||||
``Warcprox-Meta`` is often a very important part of brozzler job configuration.
|
||||
It is the way url and data limits on jobs, seeds, and hosts are implemented,
|
||||
among other things.
|
||||
|
||||
Warcprox-Meta fields
|
||||
--------------------
|
||||
|
||||
``warc-prefix`` (string)
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Specifies a warc filename prefix. Warcprox will write the warc record for this
|
||||
capture, if any, to a warc named accordingly.
|
||||
|
||||
Example::
|
||||
|
||||
Warcprox-Meta: {"warc-prefix": "special-warc"}
|
||||
|
||||
``dedup-bucket`` (string)
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Specifies the deduplication bucket. For more information about deduplication
|
||||
see `<readme.rst#deduplication>`_.
|
||||
|
||||
Example::
|
||||
|
||||
Warcprox-Meta: {"dedup-bucket":"my-dedup-bucket"}
|
||||
|
||||
``blocks`` (list)
|
||||
~~~~~~~~~~~~~~~~~
|
||||
List of url match rules. Url match rules are somewhat described at
|
||||
https://github.com/internetarchive/brozzler/blob/master/job-conf.rst#scoping
|
||||
and https://github.com/iipc/urlcanon/blob/e2ab3524e/python/urlcanon/rules.py#L70.
|
||||
(TODO: write a better doc and link to it)
|
||||
|
||||
Example::
|
||||
|
||||
Warcprox-Meta: {"blocks": [{"ssurt": "com,example,//http:/"}, {"domain": "malware.us", "substring": "wp-login.php?action=logout"}]}
|
||||
|
||||
If any of the rules match the url being requested, warcprox aborts normal
|
||||
processing and responds with a http ``403``. The http response includes
|
||||
a ``Warcprox-Meta`` response header with one field, ``blocked-by-rule``,
|
||||
which reproduces the value of the match rule that resulted in the block. The
|
||||
presence of the ``warcprox-meta`` response header can be used by the client to
|
||||
distinguish this type of a response from a 403 from the remote site.
|
||||
|
||||
An example::
|
||||
|
||||
$ curl -iksS --proxy localhost:8000 --header 'Warcprox-Meta: {"blocks": [{"ssurt": "com,example,//http:/"}, {"domain": "malware.us", "substring": "wp-login.php?action=logout"}]}' http://example.com/foo
|
||||
HTTP/1.0 403 Forbidden
|
||||
Server: BaseHTTP/0.6 Python/3.6.3
|
||||
Date: Fri, 25 May 2018 22:46:42 GMT
|
||||
Content-Type: text/plain;charset=utf-8
|
||||
Connection: close
|
||||
Content-Length: 111
|
||||
Warcprox-Meta: {"blocked-by-rule":{"ssurt":"com,example,//http:/"}}
|
||||
|
||||
request rejected by warcprox: blocked by rule found in Warcprox-Meta header: {"ssurt": "com,example,//http:/"}
|
||||
|
||||
You might be wondering why ``blocks`` is necessary. Why would the warcprox
|
||||
client make a request that it should already know will be blocked by the proxy?
|
||||
The answer is that the request may be initiated somewhere where it's difficult
|
||||
to evaluate the block rules. In particular, this circumstance prevails when the
|
||||
browser controlled by brozzler is requesting images, javascript, css, and so
|
||||
on, embedded in a page.
|
||||
|
||||
``stats`` (dictionary)
|
||||
~~~~~~~~~~~~~~~~~~~~~~
|
||||
``stats`` is a dictionary with only one field understood by warcprox,
|
||||
``buckets``. The value of ``buckets`` is a list of strings and/or
|
||||
dictionaries. A string signifies the name of the bucket; a dictionary is
|
||||
expected to have at least an item with key ``bucket`` whose value is the name
|
||||
of the bucket. The other currently recognized key is ``tally-domains``, which
|
||||
if supplied should be a list of domains. This instructs warcprox to
|
||||
additionally tally substats of the given bucket by domain.
|
||||
|
||||
See `<readme.rst#statistics>`_ for more information on statistics kept by
|
||||
warcprox.
|
||||
|
||||
Examples::
|
||||
|
||||
Warcprox-Meta: {"stats":{"buckets":["my-stats-bucket","all-the-stats"]}}
|
||||
Warcprox-Meta: {"stats":{"buckets":["bucket1",{"bucket":"bucket2","tally-domains":["foo.bar.com","192.168.10.20"}]}}
|
||||
|
||||
Domain stats are stored in the stats table under the key
|
||||
``"bucket2:foo.bar.com"`` for the latter example. See the following two
|
||||
sections for more examples. The ``soft-limits`` section has an example of a
|
||||
limit on a domain specified in ``tally-domains``.
|
||||
|
||||
``limits`` (dictionary)
|
||||
~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Specifies quantitative limits for warcprox to enforce. The structure of the
|
||||
dictionary is ``{stats_key: numerical_limit, ...}`` where stats key has the
|
||||
format ``"bucket/sub-bucket/statistic"``. See `readme.rst#statistics`_ for
|
||||
further explanation of what "bucket", "sub-bucket", and "statistic" mean here.
|
||||
|
||||
If processing a request would result in exceeding a limit, warcprox aborts
|
||||
normal processing and responds with a http ``420 Reached Limit``. The http
|
||||
response includes a ``Warcprox-Meta`` response header with the complete set
|
||||
of statistics for the bucket whose limit has been reached.
|
||||
|
||||
Example::
|
||||
|
||||
Warcprox-Meta: {"stats": {"buckets": ["test_limits_bucket"]}, "limits": {"test_limits_bucket/total/urls": 10}}
|
||||
|
||||
::
|
||||
|
||||
$ curl -iksS --proxy localhost:8000 --header 'Warcprox-Meta: {"stats": {"buckets": ["test_limits_bucket"]}, "limits": {"test_limits_bucket/total/urls": 10}}' http://example.com/foo
|
||||
HTTP/1.0 420 Reached limit
|
||||
Server: BaseHTTP/0.6 Python/3.6.3
|
||||
Date: Fri, 25 May 2018 23:08:32 GMT
|
||||
Content-Type: text/plain;charset=utf-8
|
||||
Connection: close
|
||||
Content-Length: 77
|
||||
Warcprox-Meta: {"stats":{"test_limits_bucket":{"bucket":"test_limits_bucket","total":{"urls":10,"wire_bytes":15840},"new":{"urls":0,"wire_bytes":0},"revisit":{"urls":10,"wire_bytes":15840}}},"reached-limit":{"test_limits_bucket/total/urls":10}}
|
||||
|
||||
request rejected by warcprox: reached limit test_limits_bucket/total/urls=10
|
||||
|
||||
``soft-limits`` (dictionary)
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
From warcprox's perspective ``soft-limits`` work almost exactly the same way
|
||||
as ``limits``. The only difference is that when a soft limit is hit, warcprox
|
||||
response with an http ``430 Reached soft limit`` instead of http ``420``.
|
||||
|
||||
Warcprox clients might treat a ``430`` very differently from a ``420``. From
|
||||
brozzler's perspective, for instance, ``soft-limits`` are very different from
|
||||
``limits``. When brozzler receives a ``420`` from warcprox because a ``limit``
|
||||
has been reached, this means that crawling for that seed is finished, and
|
||||
brozzler sets about finalizing the crawl of that seed. On the other hand,
|
||||
brozzler blissfully ignores ``430`` responses, because soft limits only apply
|
||||
to a particular bucket (like a domain), and don't have any effect on crawling
|
||||
of urls that don't fall in that bucket.
|
||||
|
||||
Example::
|
||||
|
||||
Warcprox-Meta: {"stats": {"buckets": [{"bucket": "test_domain_doc_limit_bucket", "tally-domains": ["foo.localhost"]}]}, "soft-limits": {"test_domain_doc_limit_bucket:foo.localhost/total/urls": 10}}
|
||||
|
||||
::
|
||||
|
||||
$ curl -iksS --proxy localhost:8000 --header 'Warcprox-Meta: {"stats": {"buckets": ["test_limits_bucket"]}, "soft-limits": {"test_limits_bucket/total/urls": 10}}' http://example.com/foo
|
||||
HTTP/1.0 430 Reached soft limit
|
||||
Server: BaseHTTP/0.6 Python/3.6.3
|
||||
Date: Fri, 25 May 2018 23:12:06 GMT
|
||||
Content-Type: text/plain;charset=utf-8
|
||||
Connection: close
|
||||
Content-Length: 82
|
||||
Warcprox-Meta: {"stats":{"test_limits_bucket":{"bucket":"test_limits_bucket","total":{"urls":10,"wire_bytes":15840},"new":{"urls":0,"wire_bytes":0},"revisit":{"urls":10,"wire_bytes":15840}}},"reached-soft-limit":{"test_limits_bucket/total/urls":10}}
|
||||
|
||||
request rejected by warcprox: reached soft limit test_limits_bucket/total/urls=10
|
||||
|
||||
``metadata`` (dictionary)
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
An arbitrary dictionary. Warcprox mostly ignores this. The one exception is
|
||||
that if it has a ``seed`` entry and crawl logs are enabled via the
|
||||
``--crawl-log-dir`` command line option, the value of ``seed`` is written to
|
||||
the crawl log as the 11th field on the line, simulating heritrix's "source
|
||||
tag".
|
||||
|
||||
Example::
|
||||
|
||||
Warcprox-Meta: {"metadata": {"seed": "http://example.com/seed", "description": "here's some information about this crawl job. blah blah"}
|
||||
|
||||
``accept`` (list)
|
||||
~~~~~~~~~~~~~~~~~
|
||||
Specifies fields that the client would like to receive in the ``Warcprox-Meta``
|
||||
response header. Only one value is currently understood,
|
||||
``capture-metadata``.
|
||||
|
||||
Example::
|
||||
|
||||
Warcprox-Meta: {"accept": ["capture-metadata"]}
|
||||
|
||||
The response will include a ``Warcprox-Meta`` response header with one field
|
||||
also called ``captured-metadata``. Currently warcprox reports one piece of
|
||||
capture medata, ``timestamp``, which represents the time fetch began for the
|
||||
resource and matches the ``WARC-Date`` written to the warc record. For
|
||||
example::
|
||||
|
||||
Warcprox-Meta: {"capture-metadata":{"timestamp":"2018-05-30T00:22:49Z"}}
|
||||
|
||||
``Warcprox-Meta`` http response header
|
||||
======================================
|
||||
In some cases warcprox will add a ``Warcprox-Meta`` header to the http response
|
||||
that it sends to the client. As with the request header, the value is a json
|
||||
blob. It is only included if something in the ``warcprox-meta`` request header
|
||||
calls for it. Those cases are described above in the `Warcprox-Meta http
|
||||
request header`_ section.
|
||||
|
173
readme.rst
Normal file
173
readme.rst
Normal file
@ -0,0 +1,173 @@
|
||||
Warcprox - WARC writing MITM HTTP/S proxy
|
||||
*****************************************
|
||||
.. image:: https://travis-ci.org/internetarchive/warcprox.svg?branch=master
|
||||
:target: https://travis-ci.org/internetarchive/warcprox
|
||||
|
||||
Warcprox is a tool for archiving the web. It is an http proxy that stores its
|
||||
traffic to disk in `WARC
|
||||
<https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/>`_
|
||||
format. Warcprox captures encrypted https traffic by using the
|
||||
`"man-in-the-middle" <https://en.wikipedia.org/wiki/Man-in-the-middle_attack>`_
|
||||
technique (see the `Man-in-the-middle`_ section for more info).
|
||||
|
||||
The web pages that warcprox stores in WARC files can be played back using
|
||||
software like `OpenWayback <https://github.com/iipc/openwayback>`_ or `pywb
|
||||
<https://github.com/webrecorder/pywb>`_. Warcprox has been developed in
|
||||
parallel with `brozzler <https://github.com/internetarchive/brozzler>`_ and
|
||||
together they make a comprehensive modern distributed archival web crawling
|
||||
system.
|
||||
|
||||
Warcprox was originally based on the excellent and simple pymiproxy by Nadeem
|
||||
Douba. https://github.com/allfro/pymiproxy
|
||||
|
||||
.. contents::
|
||||
|
||||
Getting started
|
||||
===============
|
||||
Warcprox runs on python 3.4+.
|
||||
|
||||
To install latest release run::
|
||||
|
||||
# apt-get install libffi-dev libssl-dev
|
||||
pip install warcprox
|
||||
|
||||
You can also install the latest bleeding edge code::
|
||||
|
||||
pip install git+https://github.com/internetarchive/warcprox.git
|
||||
|
||||
To start warcprox run::
|
||||
|
||||
warcprox
|
||||
|
||||
Try ``warcprox --help`` for documentation on command line options.
|
||||
|
||||
Man-in-the-middle
|
||||
=================
|
||||
Normally, http proxies can't read https traffic, because it's encrypted. The
|
||||
browser uses the http ``CONNECT`` method to establish a tunnel through the
|
||||
proxy, and the proxy merely routes raw bytes between the client and server.
|
||||
Since the bytes are encrypted, the proxy can't make sense of the information
|
||||
it's proxying. This nonsensical encrypted data would not be very useful to
|
||||
archive.
|
||||
|
||||
In order to capture https traffic, warcprox acts as a "man-in-the-middle"
|
||||
(MITM). When it receives a ``CONNECT`` directive from a client, it generates a
|
||||
public key certificate for the requested site, presents to the client, and
|
||||
proceeds to establish an encrypted connection with the client. Then it makes a
|
||||
separate, normal https connection to the remote site. It decrypts, archives,
|
||||
and re-encrypts traffic in both directions.
|
||||
|
||||
Although "man-in-the-middle" is often paired with "attack", there is nothing
|
||||
malicious about what warcprox is doing. If you configure an instance of
|
||||
warcprox as your browser's http proxy, you will see lots of certificate
|
||||
warnings, since none of the certificates will be signed by trusted authorities.
|
||||
To use warcprox effectively the client needs to disable certificate
|
||||
verification, or add the CA cert generated by warcprox as a trusted authority.
|
||||
(If you do this in your browser, make sure you undo it when you're done using
|
||||
warcprox!)
|
||||
|
||||
API
|
||||
===
|
||||
For interacting with a running instance of warcprox.
|
||||
|
||||
* ``/status`` url
|
||||
* ``WARCPROX_WRITE_RECORD`` http method
|
||||
* ``Warcprox-Meta`` http request header and response header
|
||||
|
||||
See `<api.rst>`_.
|
||||
|
||||
Deduplication
|
||||
=============
|
||||
Warcprox avoids archiving redundant content by "deduplicating" it. The process
|
||||
for deduplication works similarly to heritrix and other web archiving tools.
|
||||
|
||||
1. while fetching url, calculate payload content digest (typically sha1)
|
||||
2. look up digest in deduplication database (warcprox supports a few different
|
||||
ones)
|
||||
3. if found, write warc ``revisit`` record referencing the url and capture time
|
||||
of the previous capture
|
||||
4. else (if not found),
|
||||
|
||||
a. write warc ``response`` record with full payload
|
||||
b. store entry in deduplication database
|
||||
|
||||
The dedup database is partitioned into different "buckets". Urls are
|
||||
deduplicated only against other captures in the same bucket. If specified, the
|
||||
``dedup-bucket`` field of the ``Warcprox-Meta`` http request header determines
|
||||
the bucket, otherwise the default bucket is used.
|
||||
|
||||
Deduplication can be disabled entirely by starting warcprox with the argument
|
||||
``--dedup-db-file=/dev/null``.
|
||||
|
||||
Statistics
|
||||
==========
|
||||
Warcprox keeps some crawl statistics and stores them in sqlite or rethinkdb.
|
||||
These are consulted for enforcing ``limits`` and ``soft-limits`` (see
|
||||
`<api.rst#warcprox-meta-fields>`_), and can also be consulted by other
|
||||
processes outside of warcprox, for reporting etc.
|
||||
|
||||
Statistics are grouped by "bucket". Every capture is counted as part of the
|
||||
``__all__`` bucket. Other buckets can be specified in the ``Warcprox-Meta``
|
||||
request header. The fallback bucket in case none is specified is called
|
||||
``__unspecified__``.
|
||||
|
||||
Within each bucket are three sub-buckets:
|
||||
|
||||
* ``new`` - tallies captures for which a complete record (usually a ``response``
|
||||
record) was written to warc
|
||||
* ``revisit`` - tallies captures for which a ``revisit`` record was written to
|
||||
warc
|
||||
* ``total`` - includes all urls processed, even those not written to warc (so the
|
||||
numbers may be greater than new + revisit)
|
||||
|
||||
Within each of these sub-buckets we keep two statistics:
|
||||
|
||||
* ``urls`` - simple count of urls
|
||||
* ``wire_bytes`` - sum of bytes received over the wire, including http headers,
|
||||
from the remote server for each url
|
||||
|
||||
For historical reasons, in sqlite, the default store, statistics are kept as
|
||||
json blobs::
|
||||
|
||||
sqlite> select * from buckets_of_stats;
|
||||
bucket stats
|
||||
--------------- ---------------------------------------------------------------------------------------------
|
||||
__unspecified__ {"bucket":"__unspecified__","total":{"urls":37,"wire_bytes":1502781},"new":{"urls":15,"wire_bytes":1179906},"revisit":{"urls":22,"wire_bytes":322875}}
|
||||
__all__ {"bucket":"__all__","total":{"urls":37,"wire_bytes":1502781},"new":{"urls":15,"wire_bytes":1179906},"revisit":{"urls":22,"wire_bytes":322875}}
|
||||
|
||||
Plugins
|
||||
=======
|
||||
Warcprox supports a limited notion of plugins by way of the ``--plugin``
|
||||
command line argument. Plugin classes are loaded from the regular python module
|
||||
search path. They will be instantiated with one argument, a
|
||||
``warcprox.Options``, which holds the values of all the command line arguments.
|
||||
Legacy plugins with constructors that take no arguments are also supported.
|
||||
Plugins should either have a method ``notify(self, recorded_url, records)`` or
|
||||
should subclass ``warcprox.BasePostfetchProcessor``. More than one plugin can
|
||||
be configured by specifying ``--plugin`` multiples times.
|
||||
|
||||
`A minimal example <https://github.com/internetarchive/warcprox/blob/318405e795ac0ab8760988a1a482cf0a17697148/warcprox/__init__.py#L165>`__
|
||||
|
||||
License
|
||||
=======
|
||||
|
||||
Warcprox is a derivative work of pymiproxy, which is GPL. Thus warcprox is also
|
||||
GPL.
|
||||
|
||||
* Copyright (C) 2012 Cygnos Corporation
|
||||
* Copyright (C) 2013-2018 Internet Archive
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
as published by the Free Software Foundation; either version 2
|
||||
of the License, or (at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
|
4
setup.py
4
setup.py
@ -40,12 +40,12 @@ except:
|
||||
|
||||
setuptools.setup(
|
||||
name='warcprox',
|
||||
version='2.4b2.dev170',
|
||||
version='2.4b2.dev176',
|
||||
description='WARC writing MITM HTTP/S proxy',
|
||||
url='https://github.com/internetarchive/warcprox',
|
||||
author='Noah Levitt',
|
||||
author_email='nlevitt@archive.org',
|
||||
long_description=open('README.rst').read(),
|
||||
long_description=open('readme.rst').read(),
|
||||
license='GPL',
|
||||
packages=['warcprox'],
|
||||
install_requires=deps,
|
||||
|
@ -75,11 +75,13 @@ RUN mv -v /etc/hadoop/conf/hdfs-site.xml /etc/hadoop/conf/hdfs-site.xml.orig \
|
||||
RUN echo '#!/bin/bash\nservice hadoop-hdfs-namenode start\nservice hadoop-hdfs-datanode start' > /etc/my_init.d/50_start_hdfs.sh \
|
||||
&& chmod a+x /etc/my_init.d/50_start_hdfs.sh
|
||||
|
||||
RUN apt-get install -y libsqlite3-dev
|
||||
|
||||
# trough itself
|
||||
RUN virtualenv -p python3 /opt/trough-ve3 \
|
||||
&& . /opt/trough-ve3/bin/activate \
|
||||
&& pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string \
|
||||
&& pip install git+https://github.com/nlevitt/trough.git@toward-warcprox-dedup
|
||||
&& pip install git+https://github.com/internetarchive/trough.git
|
||||
|
||||
RUN mkdir -vp /etc/service/trough-sync-local \
|
||||
&& echo "#!/bin/bash\nsource /opt/trough-ve3/bin/activate\nexec sync.py >>/tmp/trough-sync-local.out 2>&1" > /etc/service/trough-sync-local/run \
|
||||
@ -97,11 +99,11 @@ RUN mkdir -vp /etc/service/trough-write \
|
||||
&& echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6222 --master --processes=2 --harakiri=240 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/writer.py >>/tmp/trough-write.out 2>&1' > /etc/service/trough-write/run \
|
||||
&& chmod a+x /etc/service/trough-write/run
|
||||
|
||||
RUN mkdir -vp /etc/service/trough-write-provisioner-local \
|
||||
&& echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6112 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/write_provisioner_local.py >>/tmp/trough-write-provisioner-local.out 2>&1' > /etc/service/trough-write-provisioner-local/run \
|
||||
&& chmod a+x /etc/service/trough-write-provisioner-local/run
|
||||
RUN mkdir -vp /etc/service/trough-segment-manager-local \
|
||||
&& echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6112 --master --processes=2 --harakiri=7200 --http-timeout=7200 --max-requests=50000 --vacuum --die-on-term --mount /=trough.wsgi.segment_manager:local >>/tmp/trough-segment-manager-local.out 2>&1' > /etc/service/trough-segment-manager-local/run \
|
||||
&& chmod a+x /etc/service/trough-segment-manager-local/run
|
||||
|
||||
RUN mkdir -vp /etc/service/trough-write-provisioner-server \
|
||||
&& echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6111 --master --processes=2 --harakiri=20 --max-requests=50000 --vacuum --die-on-term --wsgi-file $venv/bin/write_provisioner_server.py >>/tmp/trough-write-provisioner-server.out 2>&1' > /etc/service/trough-write-provisioner-server/run \
|
||||
&& chmod a+x /etc/service/trough-write-provisioner-server/run
|
||||
RUN mkdir -vp /etc/service/trough-segment-manager-server \
|
||||
&& echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6111 --master --processes=2 --harakiri=7200 --http-timeout=7200 --max-requests=50000 --vacuum --die-on-term --mount /=trough.wsgi.segment_manager:server >>/tmp/trough-segment-manager-server.out 2>&1' > /etc/service/trough-segment-manager-server/run \
|
||||
&& chmod a+x /etc/service/trough-segment-manager-server/run
|
||||
|
||||
|
@ -31,18 +31,15 @@ script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
|
||||
docker build -t internetarchive/warcprox-tests $script_dir
|
||||
|
||||
for python in python3 python2.7
|
||||
do
|
||||
docker run --rm --volume="$script_dir/..:/warcprox" internetarchive/warcprox-tests /sbin/my_init -- \
|
||||
bash -x -c "cd /tmp && git clone /warcprox && cd /tmp/warcprox \
|
||||
&& (cd /warcprox && git diff HEAD) | patch -p1 \
|
||||
&& virtualenv -p $python /tmp/venv \
|
||||
&& source /tmp/venv/bin/activate \
|
||||
&& pip --log-file /tmp/pip.log install . pytest mock requests warcio \
|
||||
&& py.test -v tests \
|
||||
&& py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests \
|
||||
&& py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests \
|
||||
&& py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests \
|
||||
"
|
||||
done
|
||||
docker run --rm --volume="$script_dir/..:/warcprox" internetarchive/warcprox-tests /sbin/my_init -- \
|
||||
bash -x -c "cd /tmp && git clone /warcprox && cd /tmp/warcprox \
|
||||
&& (cd /warcprox && git diff HEAD) | patch -p1 \
|
||||
&& virtualenv -p python3 /tmp/venv \
|
||||
&& source /tmp/venv/bin/activate \
|
||||
&& pip --log-file /tmp/pip.log install . pytest mock requests warcio \
|
||||
&& py.test -v tests \
|
||||
&& py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests \
|
||||
&& py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests \
|
||||
&& py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests \
|
||||
"
|
||||
|
||||
|
@ -4,7 +4,7 @@
|
||||
#
|
||||
|
||||
pip install git+https://github.com/jkafader/snakebite@feature/python3-version-string
|
||||
pip install git+https://github.com/internetarchive/trough.git@toward-warcprox-dedup
|
||||
pip install git+https://github.com/internetarchive/trough.git
|
||||
|
||||
mkdir /etc/trough
|
||||
|
||||
|
@ -709,6 +709,7 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
|
||||
# wait for postfetch chain
|
||||
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 10)
|
||||
|
||||
# next fetch hits the limit
|
||||
response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
|
||||
assert response.status_code == 420
|
||||
assert response.reason == "Reached limit"
|
||||
@ -717,6 +718,17 @@ def test_limits(http_daemon, warcprox_, archiving_proxies):
|
||||
assert response.headers["content-type"] == "text/plain;charset=utf-8"
|
||||
assert response.raw.data == b"request rejected by warcprox: reached limit test_limits_bucket/total/urls=10\n"
|
||||
|
||||
# make sure limit doesn't get applied to a different stats bucket
|
||||
request_meta = {"stats":{"buckets":["no_limits_bucket"]},"limits":{"test_limits_bucket/total/urls":10}}
|
||||
headers = {"Warcprox-Meta": json.dumps(request_meta)}
|
||||
response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
|
||||
assert response.status_code == 200
|
||||
assert response.headers['warcprox-test-header'] == 'i!'
|
||||
assert response.content == b'I am the warcprox test payload! jjjjjjjjjj!\n'
|
||||
|
||||
# wait for postfetch chain
|
||||
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 11)
|
||||
|
||||
def test_return_capture_timestamp(http_daemon, warcprox_, archiving_proxies):
|
||||
urls_before = warcprox_.proxy.running_stats.urls
|
||||
|
||||
@ -726,14 +738,16 @@ def test_return_capture_timestamp(http_daemon, warcprox_, archiving_proxies):
|
||||
response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
|
||||
assert response.status_code == 200
|
||||
assert response.headers['Warcprox-Meta']
|
||||
data = json.loads(response.headers['Warcprox-Meta'])
|
||||
assert data['capture-metadata']
|
||||
response_meta = json.loads(response.headers['Warcprox-Meta'])
|
||||
assert response_meta['capture-metadata']
|
||||
try:
|
||||
dt = datetime.datetime.strptime(data['capture-metadata']['timestamp'],
|
||||
dt = datetime.datetime.strptime(response_meta['capture-metadata']['timestamp'],
|
||||
'%Y-%m-%dT%H:%M:%SZ')
|
||||
assert dt
|
||||
except ValueError:
|
||||
pytest.fail('Invalid capture-timestamp format %s', data['capture-timestamp'])
|
||||
pytest.fail(
|
||||
'Invalid http response warcprox-meta["capture-metadata"]["timestamp"]: %r',
|
||||
meta['capture-metadata']['timestamp'])
|
||||
|
||||
# wait for postfetch chain (or subsequent test could fail)
|
||||
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
|
||||
@ -997,6 +1011,7 @@ def test_domain_doc_soft_limit(
|
||||
http_daemon, https_daemon, warcprox_, archiving_proxies):
|
||||
urls_before = warcprox_.proxy.running_stats.urls
|
||||
|
||||
# ** comment is obsolete (server is multithreaded) but still useful **
|
||||
# we need to clear the connection pool here because
|
||||
# - connection pool already may already have an open connection localhost
|
||||
# - we're about to make a connection to foo.localhost
|
||||
@ -1132,6 +1147,23 @@ def test_domain_doc_soft_limit(
|
||||
assert response.headers["content-type"] == "text/plain;charset=utf-8"
|
||||
assert response.raw.data == b"request rejected by warcprox: reached soft limit test_domain_doc_limit_bucket:foo.localhost/total/urls=10\n"
|
||||
|
||||
# make sure soft limit doesn't get applied to a different stats bucket
|
||||
request_meta = {
|
||||
"stats": {"buckets": [{"bucket":"no_limit_bucket","tally-domains":["foo.localhost"]}]},
|
||||
"soft-limits": {"test_domain_doc_limit_bucket:foo.localhost/total/urls":10},
|
||||
}
|
||||
headers = {"Warcprox-Meta": json.dumps(request_meta)}
|
||||
url = 'http://zuh.foo.localhost:{}/o/p'.format(http_daemon.server_port)
|
||||
response = requests.get(
|
||||
url, proxies=archiving_proxies, headers=headers, stream=True,
|
||||
verify=False)
|
||||
assert response.status_code == 200
|
||||
assert response.headers['warcprox-test-header'] == 'o!'
|
||||
assert response.content == b'I am the warcprox test payload! pppppppppp!\n'
|
||||
|
||||
# wait for postfetch chain
|
||||
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 22)
|
||||
|
||||
def test_domain_data_soft_limit(
|
||||
http_daemon, https_daemon, warcprox_, archiving_proxies):
|
||||
urls_before = warcprox_.proxy.running_stats.urls
|
||||
@ -1226,6 +1258,22 @@ def test_domain_data_soft_limit(
|
||||
### assert response.headers["content-type"] == "text/plain;charset=utf-8"
|
||||
### assert response.raw.data == b"request rejected by warcprox: reached soft limit test_domain_data_limit_bucket:xn--zz-2ka.localhost/new/wire_bytes=200\n"
|
||||
|
||||
# make sure soft limit doesn't get applied to a different stats bucket
|
||||
request_meta = {
|
||||
"stats": {"buckets": [{"bucket":"no_limit_bucket","tally-domains":['ÞzZ.LOCALhost']}]},
|
||||
"soft-limits": {"test_domain_data_limit_bucket:ÞZZ.localhost/new/wire_bytes":200},
|
||||
}
|
||||
headers = {"Warcprox-Meta": json.dumps(request_meta)}
|
||||
url = 'http://ÞZz.localhost:{}/y/z'.format(http_daemon.server_port)
|
||||
response = requests.get(
|
||||
url, proxies=archiving_proxies, headers=headers, stream=True)
|
||||
assert response.status_code == 200
|
||||
assert response.headers['warcprox-test-header'] == 'y!'
|
||||
assert response.content == b'I am the warcprox test payload! zzzzzzzzzz!\n'
|
||||
|
||||
# wait for postfetch chain
|
||||
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 5)
|
||||
|
||||
# XXX this test relies on a tor proxy running at localhost:9050 with a working
|
||||
# connection to the internet, and relies on a third party site (facebook) being
|
||||
# up and behaving a certain way
|
||||
@ -1838,6 +1886,8 @@ def test_socket_timeout_response(
|
||||
"""Response will timeout because we use --socket-timeout=4 whereas the
|
||||
target URL will return after 6 sec.
|
||||
"""
|
||||
urls_before = warcprox_.proxy.running_stats.urls
|
||||
|
||||
url = 'http://localhost:%s/slow-response' % http_daemon.server_port
|
||||
response = requests.get(url, proxies=archiving_proxies, verify=False)
|
||||
assert response.status_code == 502
|
||||
@ -1850,6 +1900,8 @@ def test_socket_timeout_response(
|
||||
assert response.status_code == 404
|
||||
assert response.content == b'404 Not Found\n'
|
||||
|
||||
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
|
||||
|
||||
def test_empty_response(
|
||||
warcprox_, http_daemon, https_daemon, archiving_proxies,
|
||||
playback_proxies):
|
||||
@ -1941,7 +1993,7 @@ def test_trough_segment_promotion(warcprox_):
|
||||
time.sleep(3)
|
||||
assert promoted == []
|
||||
|
||||
def test_dedup_min_size(http_daemon, warcprox_, archiving_proxies, playback_proxies):
|
||||
def test_dedup_min_text_size(http_daemon, warcprox_, archiving_proxies):
|
||||
"""We use options --dedup-min-text-size=3 --dedup-min-binary-size=5 and we
|
||||
try to download content smaller than these limits to make sure that it is
|
||||
not deduplicated. We create the digest_str with the following code:
|
||||
@ -1951,36 +2003,155 @@ def test_dedup_min_size(http_daemon, warcprox_, archiving_proxies, playback_prox
|
||||
warcprox.digest_str(payload_digest)
|
||||
```
|
||||
"""
|
||||
urls_before = warcprox_.proxy.running_stats.urls
|
||||
|
||||
# start a fresh warc
|
||||
warcprox_.warc_writer_processor.writer_pool.close_writers()
|
||||
|
||||
# fetch small text
|
||||
url = 'http://localhost:%s/text-2bytes' % http_daemon.server_port
|
||||
response = requests.get(
|
||||
url, proxies=archiving_proxies, verify=False, timeout=10)
|
||||
assert len(response.content) == 2
|
||||
# wait for postfetch chain
|
||||
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
|
||||
# check no dedup was saved (except RethinkCapturesDedup which always saves)
|
||||
dedup_lookup = warcprox_.dedup_db.lookup(
|
||||
b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37')
|
||||
assert dedup_lookup is None
|
||||
time.sleep(3)
|
||||
if not isinstance(warcprox_.dedup_db, warcprox.bigtable.RethinkCapturesDedup):
|
||||
assert dedup_lookup is None
|
||||
|
||||
# fetch again saving dedup info so that we can test dedup info ignored
|
||||
orig_should_dedup = warcprox_.dedup_db.should_dedup
|
||||
warcprox_.dedup_db.should_dedup = lambda *args, **kwargs: True
|
||||
try:
|
||||
response = requests.get(
|
||||
url, proxies=archiving_proxies, verify=False, timeout=10)
|
||||
assert len(response.content) == 2
|
||||
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
|
||||
# check dedup was saved
|
||||
dedup_lookup = warcprox_.dedup_db.lookup(
|
||||
b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37')
|
||||
assert dedup_lookup
|
||||
finally:
|
||||
warcprox_.dedup_db.should_dedup = orig_should_dedup
|
||||
else:
|
||||
assert dedup_lookup
|
||||
|
||||
# fetch again and check that it was not deduped
|
||||
urls_before = warcprox_.proxy.running_stats.urls
|
||||
response = requests.get(
|
||||
url, proxies=archiving_proxies, verify=False, timeout=10)
|
||||
dedup_lookup = warcprox_.dedup_db.lookup(
|
||||
b'sha1:e0c9035898dd52fc65c41454cec9c4d2611bfb37')
|
||||
# This would return dedup data if payload_size > dedup-min-text-size
|
||||
assert dedup_lookup is None
|
||||
assert len(response.content) == 2
|
||||
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
|
||||
|
||||
# check that response records were written
|
||||
warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path
|
||||
with open(warc, 'rb') as f:
|
||||
rec_iter = iter(warcio.archiveiterator.ArchiveIterator(f))
|
||||
record = next(rec_iter)
|
||||
assert record.rec_type == 'warcinfo'
|
||||
record = next(rec_iter)
|
||||
assert record.rec_type == 'response'
|
||||
assert record.rec_headers.get_header('warc-target-uri') == url
|
||||
record = next(rec_iter)
|
||||
assert record.rec_type == 'request'
|
||||
assert record.rec_headers.get_header('warc-target-uri') == url
|
||||
record = next(rec_iter)
|
||||
assert record.rec_type == 'response'
|
||||
assert record.rec_headers.get_header('warc-target-uri') == url
|
||||
record = next(rec_iter)
|
||||
assert record.rec_type == 'request'
|
||||
assert record.rec_headers.get_header('warc-target-uri') == url
|
||||
if not isinstance(warcprox_.dedup_db, warcprox.bigtable.RethinkCapturesDedup):
|
||||
record = next(rec_iter)
|
||||
assert record.rec_type == 'response'
|
||||
assert record.rec_headers.get_header('warc-target-uri') == url
|
||||
record = next(rec_iter)
|
||||
assert record.rec_type == 'request'
|
||||
assert record.rec_headers.get_header('warc-target-uri') == url
|
||||
with pytest.raises(StopIteration):
|
||||
next(rec_iter)
|
||||
|
||||
def test_dedup_min_binary_size(http_daemon, warcprox_, archiving_proxies):
|
||||
"""We use options --dedup-min-text-size=3 --dedup-min-binary-size=5 and we
|
||||
try to download content smaller than these limits to make sure that it is
|
||||
not deduplicated. We create the digest_str with the following code:
|
||||
```
|
||||
payload_digest = hashlib.new('sha1')
|
||||
payload_digest.update(b'aa')
|
||||
warcprox.digest_str(payload_digest)
|
||||
```
|
||||
"""
|
||||
urls_before = warcprox_.proxy.running_stats.urls
|
||||
|
||||
# start a fresh warc
|
||||
warcprox_.warc_writer_processor.writer_pool.close_writers()
|
||||
|
||||
# fetch small binary
|
||||
url = 'http://localhost:%s/binary-4bytes' % http_daemon.server_port
|
||||
response = requests.get(
|
||||
url, proxies=archiving_proxies, verify=False, timeout=10)
|
||||
assert len(response.content) == 4
|
||||
# wait for postfetch chain
|
||||
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
|
||||
# check no dedup was saved (except RethinkCapturesDedup which always saves)
|
||||
dedup_lookup = warcprox_.dedup_db.lookup(
|
||||
b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79')
|
||||
assert dedup_lookup is None
|
||||
time.sleep(3)
|
||||
if not isinstance(warcprox_.dedup_db, warcprox.bigtable.RethinkCapturesDedup):
|
||||
assert dedup_lookup is None
|
||||
|
||||
# fetch again saving dedup info so that we can test dedup info ignored
|
||||
orig_should_dedup = warcprox_.dedup_db.should_dedup
|
||||
warcprox_.dedup_db.should_dedup = lambda *args, **kwargs: True
|
||||
try:
|
||||
response = requests.get(
|
||||
url, proxies=archiving_proxies, verify=False, timeout=10)
|
||||
assert len(response.content) == 4
|
||||
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
|
||||
# check dedup was saved
|
||||
dedup_lookup = warcprox_.dedup_db.lookup(
|
||||
b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79')
|
||||
assert dedup_lookup
|
||||
finally:
|
||||
warcprox_.dedup_db.should_dedup = orig_should_dedup
|
||||
else:
|
||||
assert dedup_lookup
|
||||
|
||||
# fetch again and check that it was not deduped
|
||||
urls_before = warcprox_.proxy.running_stats.urls
|
||||
response = requests.get(
|
||||
url, proxies=archiving_proxies, verify=False, timeout=10)
|
||||
dedup_lookup = warcprox_.dedup_db.lookup(
|
||||
b'sha1:70c881d4a26984ddce795f6f71817c9cf4480e79')
|
||||
# This would return dedup data if payload_size > dedup-min-binary-size
|
||||
assert dedup_lookup is None
|
||||
assert len(response.content) == 4
|
||||
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
|
||||
|
||||
# check that response records were written
|
||||
warc = warcprox_.warc_writer_processor.writer_pool.default_warc_writer._available_warcs.queue[0].path
|
||||
with open(warc, 'rb') as f:
|
||||
rec_iter = iter(warcio.archiveiterator.ArchiveIterator(f))
|
||||
record = next(rec_iter)
|
||||
assert record.rec_type == 'warcinfo'
|
||||
record = next(rec_iter)
|
||||
assert record.rec_type == 'response'
|
||||
assert record.rec_headers.get_header('warc-target-uri') == url
|
||||
record = next(rec_iter)
|
||||
assert record.rec_type == 'request'
|
||||
assert record.rec_headers.get_header('warc-target-uri') == url
|
||||
record = next(rec_iter)
|
||||
assert record.rec_type == 'response'
|
||||
assert record.rec_headers.get_header('warc-target-uri') == url
|
||||
record = next(rec_iter)
|
||||
assert record.rec_type == 'request'
|
||||
assert record.rec_headers.get_header('warc-target-uri') == url
|
||||
if not isinstance(warcprox_.dedup_db, warcprox.bigtable.RethinkCapturesDedup):
|
||||
record = next(rec_iter)
|
||||
assert record.rec_type == 'response'
|
||||
assert record.rec_headers.get_header('warc-target-uri') == url
|
||||
record = next(rec_iter)
|
||||
assert record.rec_type == 'request'
|
||||
assert record.rec_headers.get_header('warc-target-uri') == url
|
||||
with pytest.raises(StopIteration):
|
||||
next(rec_iter)
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main()
|
||||
|
@ -122,14 +122,19 @@ class BasePostfetchProcessor(threading.Thread):
|
||||
self.profiler = None
|
||||
|
||||
def run(self):
|
||||
if self.options.profile:
|
||||
import cProfile
|
||||
self.profiler = cProfile.Profile()
|
||||
self.profiler.enable()
|
||||
self._run()
|
||||
self.profiler.disable()
|
||||
else:
|
||||
self._run()
|
||||
try:
|
||||
if self.options.profile:
|
||||
import cProfile
|
||||
self.profiler = cProfile.Profile()
|
||||
self.profiler.enable()
|
||||
self._run()
|
||||
self.profiler.disable()
|
||||
else:
|
||||
self._run()
|
||||
except:
|
||||
self.logger.critical(
|
||||
'%s dying due to uncaught exception',
|
||||
self.name, exc_info=True)
|
||||
|
||||
def _get_process_put(self):
|
||||
'''
|
||||
|
@ -253,6 +253,4 @@ class RethinkCapturesDedup(warcprox.dedup.DedupDb, DedupableMixin):
|
||||
self.captures_db.close()
|
||||
|
||||
def notify(self, recorded_url, records):
|
||||
if (records and records[0].type == b'response'
|
||||
and self.should_dedup(recorded_url)):
|
||||
self.captures_db.notify(recorded_url, records)
|
||||
self.captures_db.notify(recorded_url, records)
|
||||
|
@ -39,9 +39,9 @@ urllib3.disable_warnings()
|
||||
|
||||
class DedupableMixin(object):
|
||||
def __init__(self, options=warcprox.Options()):
|
||||
self.min_text_size = options.dedup_min_text_size
|
||||
self.min_binary_size = options.dedup_min_binary_size
|
||||
self.dedup_only_with_bucket = options.dedup_only_with_bucket
|
||||
self.min_text_size = options.dedup_min_text_size or 0
|
||||
self.min_binary_size = options.dedup_min_binary_size or 0
|
||||
self.dedup_only_with_bucket = options.dedup_only_with_bucket or False
|
||||
|
||||
def should_dedup(self, recorded_url):
|
||||
"""Check if we should try to run dedup on resource based on payload
|
||||
@ -326,10 +326,9 @@ class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin)
|
||||
if self.outq:
|
||||
self.outq.put(recorded_url)
|
||||
|
||||
class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor, DedupableMixin):
|
||||
class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
|
||||
def __init__(self, trough_dedup_db, options=warcprox.Options()):
|
||||
warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
|
||||
DedupableMixin.__init__(self, options)
|
||||
self.trough_dedup_db = trough_dedup_db
|
||||
|
||||
def _filter_and_bucketize(self, batch):
|
||||
@ -341,7 +340,7 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor, DedupableMixin):
|
||||
for recorded_url in batch:
|
||||
if (recorded_url.warc_records
|
||||
and recorded_url.warc_records[0].type == b'response'
|
||||
and self.should_dedup(recorded_url)):
|
||||
and self.trough_dedup_db.should_dedup(recorded_url)):
|
||||
if (recorded_url.warcprox_meta
|
||||
and 'dedup-bucket' in recorded_url.warcprox_meta):
|
||||
bucket = recorded_url.warcprox_meta['dedup-bucket']
|
||||
@ -373,10 +372,11 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor, DedupableMixin):
|
||||
logging.warn(
|
||||
'timed out saving dedup info to trough', exc_info=True)
|
||||
|
||||
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin):
|
||||
class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
|
||||
logger = logging.getLogger("warcprox.dedup.BatchTroughLoader")
|
||||
|
||||
def __init__(self, trough_dedup_db, options=warcprox.Options()):
|
||||
warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
|
||||
DedupableMixin.__init__(self, options)
|
||||
self.trough_dedup_db = trough_dedup_db
|
||||
|
||||
def _startup(self):
|
||||
@ -388,16 +388,24 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin):
|
||||
be looked up.
|
||||
'''
|
||||
buckets = collections.defaultdict(list)
|
||||
discards = []
|
||||
for recorded_url in batch:
|
||||
if (recorded_url.response_recorder
|
||||
and recorded_url.payload_digest
|
||||
and self.should_dedup(recorded_url)):
|
||||
and self.trough_dedup_db.should_dedup(recorded_url)):
|
||||
if (recorded_url.warcprox_meta
|
||||
and 'dedup-bucket' in recorded_url.warcprox_meta):
|
||||
bucket = recorded_url.warcprox_meta['dedup-bucket']
|
||||
else:
|
||||
bucket = '__unspecified__'
|
||||
buckets[bucket].append(recorded_url)
|
||||
else:
|
||||
discards.append(
|
||||
warcprox.digest_str(
|
||||
recorded_url.payload_digest, self.options.base32)
|
||||
if recorded_url.payload_digest else 'n/a')
|
||||
self.logger.debug(
|
||||
'filtered out digests (not loading dedup): %r', discards)
|
||||
return buckets
|
||||
|
||||
def _build_key_index(self, batch):
|
||||
@ -445,10 +453,19 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, DedupableMixin):
|
||||
'problem looking up dedup info for %s urls '
|
||||
'in bucket %s', len(buckets[bucket]), bucket,
|
||||
exc_info=True)
|
||||
|
||||
if self.logger.isEnabledFor(logging.DEBUG):
|
||||
dups = sorted([e['digest_key'] for e in future.result()])
|
||||
novel = sorted([
|
||||
k for k in key_index.keys() if k not in dups])
|
||||
self.logger.debug(
|
||||
'bucket %s: dups=%r novel=%r',
|
||||
bucket, dups, novel)
|
||||
|
||||
except futures.TimeoutError as e:
|
||||
# the remaining threads actually keep running in this case,
|
||||
# there's no way to stop them, but that should be harmless
|
||||
logging.warn(
|
||||
self.logger.warn(
|
||||
'timed out loading dedup info from trough', exc_info=True)
|
||||
|
||||
class TroughDedupDb(DedupDb, DedupableMixin):
|
||||
@ -480,7 +497,13 @@ class TroughDedupDb(DedupDb, DedupableMixin):
|
||||
return BatchTroughStorer(self, self.options)
|
||||
|
||||
def start(self):
|
||||
self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL)
|
||||
try:
|
||||
self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL)
|
||||
except Exception as e:
|
||||
# can happen. hopefully someone else has registered it
|
||||
self.logger.critical(
|
||||
'will try to continue after problem registering schema %s',
|
||||
self.SCHEMA_ID, exc_info=True)
|
||||
|
||||
def save(self, digest_key, response_record, bucket='__unspecified__'):
|
||||
record_id = response_record.get_header(warctools.WarcRecord.ID)
|
||||
|
@ -193,7 +193,7 @@ def _build_arg_parser(prog='warcprox'):
|
||||
action='append', help=(
|
||||
'Qualified name of plugin class, e.g. "mypkg.mymod.MyClass". '
|
||||
'May be used multiple times to register multiple plugins. '
|
||||
'See README.rst for more information.'))
|
||||
'See readme.rst for more information.'))
|
||||
arg_parser.add_argument('--version', action='version',
|
||||
version="warcprox {}".format(warcprox.__version__))
|
||||
arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
|
||||
|
@ -53,6 +53,53 @@ def _empty_bucket(bucket):
|
||||
},
|
||||
}
|
||||
|
||||
def unravel_buckets(url, warcprox_meta):
|
||||
'''
|
||||
Unravels bucket definitions in Warcprox-Meta header. Each bucket
|
||||
definition can either be a string, which signifies the name of the
|
||||
bucket, or a dict. If a dict it is expected to have at least an item
|
||||
with key 'bucket' whose value is the name of the bucket. The other
|
||||
currently recognized item is 'tally-domains', which if supplied should
|
||||
be a list of domains. This instructs warcprox to additionally tally
|
||||
substats of the given bucket by domain. Host stats are stored in the
|
||||
stats table under the key '{parent-bucket}:{domain(normalized)}'.
|
||||
|
||||
Returns:
|
||||
list of strings
|
||||
|
||||
Example Warcprox-Meta header (a real one will likely have other
|
||||
sections besides 'stats'):
|
||||
|
||||
Warcprox-Meta: {"stats":{"buckets":["bucket1",{"bucket":"bucket2","tally-domains":["foo.bar.com","192.168.10.20"}]}}
|
||||
|
||||
In this case the return value would be
|
||||
["bucket1","bucket2","bucket2:foo.bar.com","bucket2:192.168.10.20"]
|
||||
'''
|
||||
buckets = ["__all__"]
|
||||
if (warcprox_meta and "stats" in warcprox_meta
|
||||
and "buckets" in warcprox_meta["stats"]):
|
||||
for bucket in warcprox_meta["stats"]["buckets"]:
|
||||
if isinstance(bucket, dict):
|
||||
if not 'bucket' in bucket:
|
||||
self.logger.warn(
|
||||
'ignoring invalid stats bucket in '
|
||||
'warcprox-meta header %s', bucket)
|
||||
continue
|
||||
buckets.append(bucket['bucket'])
|
||||
if bucket.get('tally-domains'):
|
||||
canon_url = urlcanon.semantic(url)
|
||||
for domain in bucket['tally-domains']:
|
||||
domain = urlcanon.normalize_host(domain).decode('ascii')
|
||||
if urlcanon.url_matches_domain(canon_url, domain):
|
||||
buckets.append(
|
||||
'%s:%s' % (bucket['bucket'], domain))
|
||||
else:
|
||||
buckets.append(bucket)
|
||||
else:
|
||||
buckets.append("__unspecified__")
|
||||
|
||||
return buckets
|
||||
|
||||
class StatsProcessor(warcprox.BaseBatchPostfetchProcessor):
|
||||
logger = logging.getLogger("warcprox.stats.StatsProcessor")
|
||||
|
||||
@ -153,46 +200,7 @@ class StatsProcessor(warcprox.BaseBatchPostfetchProcessor):
|
||||
return None
|
||||
|
||||
def buckets(self, recorded_url):
|
||||
'''
|
||||
Unravels bucket definitions in Warcprox-Meta header. Each bucket
|
||||
definition can either be a string, which signifies the name of the
|
||||
bucket, or a dict. If a dict it is expected to have at least an item
|
||||
with key 'bucket' whose value is the name of the bucket. The other
|
||||
currently recognized item is 'tally-domains', which if supplied should
|
||||
be a list of domains. This instructs warcprox to additionally tally
|
||||
substats of the given bucket by domain. Host stats are stored in the
|
||||
stats table under the key '{parent-bucket}:{domain(normalized)}'.
|
||||
|
||||
Example Warcprox-Meta header (a real one will likely have other
|
||||
sections besides 'stats'):
|
||||
|
||||
Warcprox-Meta: {'stats':{'buckets':['bucket1',{'bucket':'bucket2','tally-domains':['foo.bar.com','192.168.10.20'}]}}
|
||||
'''
|
||||
buckets = ["__all__"]
|
||||
if (recorded_url.warcprox_meta
|
||||
and "stats" in recorded_url.warcprox_meta
|
||||
and "buckets" in recorded_url.warcprox_meta["stats"]):
|
||||
for bucket in recorded_url.warcprox_meta["stats"]["buckets"]:
|
||||
if isinstance(bucket, dict):
|
||||
if not 'bucket' in bucket:
|
||||
self.logger.warn(
|
||||
'ignoring invalid stats bucket in '
|
||||
'warcprox-meta header %s', bucket)
|
||||
continue
|
||||
buckets.append(bucket['bucket'])
|
||||
if bucket.get('tally-domains'):
|
||||
url = urlcanon.semantic(recorded_url.url)
|
||||
for domain in bucket['tally-domains']:
|
||||
domain = urlcanon.normalize_host(domain).decode('ascii')
|
||||
if urlcanon.url_matches_domain(url, domain):
|
||||
buckets.append(
|
||||
'%s:%s' % (bucket['bucket'], domain))
|
||||
else:
|
||||
buckets.append(bucket)
|
||||
else:
|
||||
buckets.append("__unspecified__")
|
||||
|
||||
return buckets
|
||||
return unravel_buckets(recorded_url.url, recorded_url.warcprox_meta)
|
||||
|
||||
class RethinkStatsProcessor(StatsProcessor):
|
||||
logger = logging.getLogger("warcprox.stats.RethinkStatsProcessor")
|
||||
@ -301,11 +309,9 @@ class RunningStats:
|
||||
need_ten_sec_snap = (now - self.ten_sec_snaps[0][0]) // 10 > (self.ten_sec_snaps[-1][0] - self.ten_sec_snaps[0][0]) // 10
|
||||
if need_minute_snap:
|
||||
self.minute_snaps.append((now, self.urls, self.warc_bytes))
|
||||
logging.debug('added minute snap %r', self.minute_snaps[-1])
|
||||
if need_ten_sec_snap:
|
||||
self.ten_sec_snaps.popleft()
|
||||
self.ten_sec_snaps.append((now, self.urls, self.warc_bytes))
|
||||
logging.trace('rotated in ten second snap %r', self.ten_sec_snaps[-1])
|
||||
|
||||
def _closest_ten_sec_snap(self, t):
|
||||
# it's a deque so iterating over it is faster than indexed lookup
|
||||
|
@ -72,13 +72,13 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
||||
block_rule = urlcanon.MatchRule(**rule)
|
||||
if block_rule.applies(url):
|
||||
body = ("request rejected by warcprox: blocked by "
|
||||
"rule found in Warcprox-Meta header: %s"
|
||||
% rule).encode("utf-8")
|
||||
"rule found in Warcprox-Meta header: %s\n"
|
||||
% json.dumps(rule)).encode("utf-8")
|
||||
self.send_response(403, "Forbidden")
|
||||
self.send_header("Content-Type", "text/plain;charset=utf-8")
|
||||
self.send_header("Connection", "close")
|
||||
self.send_header("Content-Length", len(body))
|
||||
response_meta = {"blocked-by-rule":rule}
|
||||
response_meta = {"blocked-by-rule": rule}
|
||||
self.send_header(
|
||||
"Warcprox-Meta",
|
||||
json.dumps(response_meta, separators=(",",":")))
|
||||
@ -92,26 +92,26 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
||||
self.client_address[0], self.command,
|
||||
self.url, rule))
|
||||
|
||||
def _enforce_limit(self, limit_key, limit_value, soft=False):
|
||||
def _enforce_limit(self, buckets, limit_key, limit_value, soft=False):
|
||||
if not self.server.stats_db:
|
||||
return
|
||||
bucket0, bucket1, bucket2 = limit_key.rsplit("/", 2)
|
||||
_limit_key = limit_key
|
||||
|
||||
# if limit_key looks like 'job1:foo.com/total/urls' then we only want
|
||||
# to apply this rule if the requested url is within domain
|
||||
bucket0_fields = bucket0.split(':')
|
||||
if len(bucket0_fields) == 2:
|
||||
domain = urlcanon.normalize_host(bucket0_fields[1])
|
||||
if not urlcanon.host_matches_domain(self.hostname, domain):
|
||||
return # else host matches, go ahead and enforce the limit
|
||||
bucket0 = '%s:%s' % (bucket0_fields[0], domain.decode('ascii'))
|
||||
_limit_key = '%s/%s/%s' % (bucket0, bucket1, bucket2)
|
||||
# parse limit key
|
||||
bucket0, bucket1, bucket2 = limit_key.rsplit("/", 2)
|
||||
# normalize domain if part of bucket
|
||||
if ":" in bucket0:
|
||||
b, raw_domain = bucket0.split(":", 1)
|
||||
domain = urlcanon.normalize_host(raw_domain).decode("ascii")
|
||||
bucket0 = "%s:%s" % (b, domain)
|
||||
limit_key = "%s/%s/%s" % (bucket0, bucket1, bucket2)
|
||||
|
||||
if not bucket0 in buckets:
|
||||
return
|
||||
|
||||
value = self.server.stats_db.value(bucket0, bucket1, bucket2)
|
||||
if value and limit_value and limit_value > 0 and value >= limit_value:
|
||||
body = ("request rejected by warcprox: reached %s %s=%s\n" % (
|
||||
"soft limit" if soft else "limit", _limit_key,
|
||||
"soft limit" if soft else "limit", limit_key,
|
||||
limit_value)).encode("utf-8")
|
||||
if soft:
|
||||
self.send_response(430, "Reached soft limit")
|
||||
@ -124,12 +124,11 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
||||
"stats": {bucket0:self.server.stats_db.value(bucket0)}
|
||||
}
|
||||
if soft:
|
||||
response_meta["reached-soft-limit"] = {_limit_key:limit_value}
|
||||
response_meta["reached-soft-limit"] = {limit_key:limit_value}
|
||||
else:
|
||||
response_meta["reached-limit"] = {_limit_key:limit_value}
|
||||
response_meta["reached-limit"] = {limit_key:limit_value}
|
||||
self.send_header(
|
||||
"Warcprox-Meta",
|
||||
json.dumps(response_meta, separators=(",",":")))
|
||||
"Warcprox-Meta", json.dumps(response_meta, separators=",:"))
|
||||
self.end_headers()
|
||||
if self.command != "HEAD":
|
||||
self.wfile.write(body)
|
||||
@ -139,7 +138,7 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
||||
self.client_address[0], 430 if soft else 420,
|
||||
self.command, self.url,
|
||||
"soft limit" if soft else "limit",
|
||||
_limit_key, limit_value))
|
||||
limit_key, limit_value))
|
||||
|
||||
def _enforce_limits(self, warcprox_meta):
|
||||
"""
|
||||
@ -147,14 +146,15 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
|
||||
warcprox.RequestBlockedByRule if a limit specified in warcprox_meta is
|
||||
reached.
|
||||
"""
|
||||
buckets = warcprox.stats.unravel_buckets(self.url, warcprox_meta)
|
||||
if warcprox_meta and "limits" in warcprox_meta:
|
||||
for item in warcprox_meta["limits"].items():
|
||||
limit_key, limit_value = item
|
||||
self._enforce_limit(limit_key, limit_value, soft=False)
|
||||
self._enforce_limit(buckets, limit_key, limit_value, soft=False)
|
||||
if warcprox_meta and "soft-limits" in warcprox_meta:
|
||||
for item in warcprox_meta["soft-limits"].items():
|
||||
limit_key, limit_value = item
|
||||
self._enforce_limit(limit_key, limit_value, soft=True)
|
||||
self._enforce_limit(buckets, limit_key, limit_value, soft=True)
|
||||
|
||||
def _security_check(self, warcprox_meta):
|
||||
'''
|
||||
|
Loading…
x
Reference in New Issue
Block a user