Compare commits


151 Commits

Author SHA1 Message Date
Barbara Miller
369e8a4657
Merge pull request #209 from TheTechRobo/patch-1
Document `compressed_blocks` in api.rst
2024-12-13 14:04:05 -08:00
TheTechRobo
66ad775188
Document compressed_blocks in api.rst
This was introduced in #177 for brozzler, but isn't documented anywhere.
2024-12-13 16:45:09 -05:00
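For reference, a minimal sketch of producing the value that api.rst documents for `compressed_blocks` (zlib-compress the JSON `blocks` list, then base64-encode; the example blocks are the ones from api.rst):

```
import base64, json, zlib

blocks = [{"ssurt": "com,example,//http:/"},
          {"domain": "malware.us", "substring": "wp-login.php?action=logout"}]
compressed = base64.b64encode(zlib.compress(json.dumps(blocks).encode())).decode()
warcprox_meta = json.dumps({"compressed_blocks": compressed})

# warcprox's side reverses it (zlib decompression was added in #177):
assert json.loads(zlib.decompress(base64.b64decode(compressed))) == blocks
```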
Barbara Miller
fbed60ff38
bump version to 2.6.1 2024-12-05 17:52:02 -08:00
Barbara Miller
d6b9058e3b
Merge pull request #207 from vbanos/certauth-speedup
Do not generate an RSA private key for every https connection to a new host

Thank you, @vbanos!
2024-12-05 17:49:46 -08:00
vbanos
bfe18aeaf1 Do not generate an RSA private key for every https connection
We can reuse the RSA private key we create or load on
`CertificateAuthority.__init__`. There is no need to create another one
for each host we connect to.

`rsa.generate_private_key` is a very slow function.
2024-12-05 16:28:08 +01:00
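A minimal sketch of the idea, assuming the `cryptography` API (class shape simplified, not warcprox's exact code):

```
from cryptography.hazmat.primitives.asymmetric import rsa

class CertificateAuthority:
    def __init__(self):
        # generated (or loaded from disk) once; rsa.generate_private_key is slow
        self._key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    def key_for_host(self, host):
        # reuse the key from __init__ instead of generating a fresh one per host
        return self._key
```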
Barbara Miller
6028e523f3
Merge pull request #206 from internetarchive/trough_dep
update extras trough dependency for pypi
2024-11-05 19:15:07 -08:00
Barbara Miller
7ce00f001c update extras trough dependency for pypi 2024-11-05 19:11:55 -08:00
Barbara Miller
0e565889e1
Merge pull request #205 from internetarchive/for_pypi
updates for pypi update v.2.6.0
2024-11-05 18:11:37 -08:00
Barbara Miller
01832c3cc5 for pypi v.2.6.0 2024-11-05 18:05:51 -08:00
Barbara Miller
ef774f5f29
Merge pull request #204 from galgeek/doublethink_up
update doublethink dependency
2024-10-31 11:29:36 -07:00
Barbara Miller
c3ce3b160a update doublethink dependency 2024-10-31 11:10:47 -07:00
Barbara Miller
14d2a0c005
Merge pull request #201 from vbanos/pyopenssl-cryptography
Upgrade cryptography dependency to >=39,<40
2024-07-28 10:15:35 -07:00
Vangelis Banos
aef8ca7012 Upgrade cryptography dependency to >=39,<40
warcprox crashes with the following error when using
`cryptography==35.0.0`.

```
ValueError: Valid PEM but no BEGIN CERTIFICATE/END CERTIFICATE delimiters. Are you sure this is a certificate?
Traceback (most recent call last):
  File "/opt/spn2/bin/warcprox", line 8, in <module>
    sys.exit(main())
  File "/opt/spn2/lib/python3.8/site-packages/warcprox/main.py", line 330, in main
    controller = warcprox.controller.WarcproxController(options)
  File "/opt/spn2/lib/python3.8/site-packages/warcprox/controller.py", line 145, in __init__
    self.proxy = warcprox.warcproxy.WarcProxy(
  File "/opt/spn2/lib/python3.8/site-packages/warcprox/warcproxy.py", line 561, in __init__
    SingleThreadedWarcProxy.__init__(
  File "/opt/spn2/lib/python3.8/site-packages/warcprox/warcproxy.py", line 509, in __init__
    warcprox.mitmproxy.SingleThreadedMitmProxy.__init__(
  File "/opt/spn2/lib/python3.8/site-packages/warcprox/mitmproxy.py", line 861, in __init__
    self.ca = CertificateAuthority(
  File "/opt/spn2/lib/python3.8/site-packages/warcprox/certauth.py", line 69, in __init__
    self.cert, self.key = self.read_pem(ca_file)
  File "/opt/spn2/lib/python3.8/site-packages/warcprox/certauth.py", line 210, in read_pem
    cert = x509.load_pem_x509_certificate(f.read(), default_backend())
  File "/opt/spn2/lib/python3.8/site-packages/cryptography/x509/base.py", line 436, in load_pem_x509_certificate
    return rust_x509.load_pem_x509_certificate(data)
ValueError: Valid PEM but no BEGIN CERTIFICATE/END CERTIFICATE delimiters. Are you sure this is a certificate?
```
2024-07-28 10:01:01 +00:00
Barbara Miller
701b659510
Merge pull request #200 from vbanos/pyopenssl-cryptography
Thank you, @vbanos!

Replace PyOpenSSL with cryptography
2024-07-27 09:09:29 -07:00
Vangelis Banos
10d36cc943 Replace PyOpenSSL with cryptography
PyOpenSSL is deprecated. We replace it with `cryptography` following
their recommendation at: https://pypi.org/project/pyOpenSSL/

We drop the `pyopenssl` dependency.
2024-07-26 13:04:15 +00:00
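Illustrative of the kind of substitution involved (a sketch, not the actual warcprox diff):

```
# before (pyOpenSSL, deprecated):
#   from OpenSSL import crypto
#   cert = crypto.load_certificate(crypto.FILETYPE_PEM, pem_bytes)
# after (cryptography):
from cryptography import x509
from cryptography.hazmat.primitives import serialization

with open("warcprox-ca.pem", "rb") as f:   # path is illustrative
    pem_bytes = f.read()
cert = x509.load_pem_x509_certificate(pem_bytes)
key = serialization.load_pem_private_key(pem_bytes, password=None)
```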
Barbara Miller
a65b8b82b9
bump version 2024-07-24 17:10:27 -07:00
Barbara Miller
6756ba60fa
Merge pull request #199 from vbanos/add-certauth
Create warcprox.certauth and drop certauth dependency
2024-07-24 17:09:19 -07:00
Vangelis Banos
2068c037ea Create warcprox.certauth and drop certauth dependency
Copy certauth.py and test_certauth.py from `certauth==1.1.6`
b526eb2bfd

Change only imports.

Drop unused imports.

Update setup.py: drop `certauth` and add `pyopenssl`.
2024-07-09 11:56:06 +00:00
Barbara Miller
f00ca5c336
Update copyright 2024-06-04 11:48:25 -07:00
Barbara Miller
c0ea6ef00f
bump version 2024-06-04 11:46:59 -07:00
Barbara Miller
f7d4286b54
Merge pull request #198 from vbanos/subdir-prefix
New option --subdir-prefix
2024-06-04 11:46:07 -07:00
Vangelis Banos
56e0b17dc9 New option --subdir-prefix
Save WARCs in subdirectories equal to the current value of Warcprox-Meta['warc-prefix'].
E.g. if warc-prefix=='spn2' and --dir=/warcs, save them in /warcs/spn2/.
2024-06-03 21:21:19 +00:00
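A sketch of the resulting path logic (function and flag names here are illustrative):

```
import os

def warc_dir(base_dir, warc_prefix, subdir_prefix=False):
    # with --subdir-prefix, WARCs for warc-prefix 'spn2' land under /warcs/spn2/
    return os.path.join(base_dir, warc_prefix) if subdir_prefix else base_dir

assert warc_dir('/warcs', 'spn2', subdir_prefix=True) == '/warcs/spn2'
```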
Barbara Miller
af52dec469
bump version 2023-10-17 09:19:56 -07:00
Barbara Miller
848c089afa
Merge pull request #194 from vbanos/socksproxy
Thank you, @vbanos!
2023-10-17 09:18:11 -07:00
Vangelis Banos
9fd5a22502 fix typo 2023-10-17 06:12:28 +00:00
Vangelis Banos
3d653e023c Add SOCKS proxy options
Add options `--socks-proxy`, `--socks-proxy-username`,
`--socks-proxy-password`.

If enabled, all traffic is routed through the SOCKS proxy.
2023-10-16 18:33:42 +00:00
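Under the hood this kind of routing is typically done with PySocks, which warcprox already depends on; a rough sketch (host, port and credentials are placeholders, not warcprox's exact wiring):

```
import socket
import socks  # PySocks

socks.set_default_proxy(
    socks.SOCKS5, '127.0.0.1', 1080,
    username='myuser', password='mypass')
socket.socket = socks.socksocket  # all new sockets now go through the SOCKS proxy
```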
Barbara Miller
4cb8e0d5dc
Merge pull request #192 from internetarchive/Py311
updates for 3.11 (and back to 3.8)
@vbanos and @avdempsey have agreed this PR is ok to merge
2023-09-27 12:03:26 -07:00
Barbara Miller
a20ad226cb
update version to 2.5, for Python version updates 2023-09-27 11:58:39 -07:00
Barbara Miller
bc0da12c48
bump version for Py311 2023-09-20 10:57:54 -07:00
Barbara Miller
8f0039de02 internetarchive/doublethink.git@Py311 2023-09-19 13:57:34 -07:00
Barbara Miller
c620d7dd19 use galgeek for now 2023-09-13 18:03:38 -07:00
Barbara Miller
4fbf523a3e get doublethink from github.com/internetarchive 2023-09-12 16:05:23 -07:00
Barbara Miller
3b5d9d8ef0 update rethinkdb import 2023-09-12 14:39:09 -07:00
Barbara Miller
5e779af2e9 trough and doublethink updates 2023-09-11 17:38:10 -07:00
Barbara Miller
a90c9c3dd4 trough 0.20 maybe 2023-09-11 17:01:02 -07:00
Barbara Miller
99a825c055 initial commit, trying trough branch jammy+focal 2023-09-11 16:40:39 -07:00
Barbara Miller
c01d58df78
Merge pull request #189 from vbanos/idna-update
Thank you, @vbanos!
2023-07-11 14:13:47 -07:00
Vangelis Banos
6eb2bd1265 Drop idna==2.10 version lock
There is no need to use such an old `idna` version.
The latest works with py35+ and all tests pass.
Newer `idna` supports the latest Unicode standard and latest python
versions.
https://github.com/kjd/idna/blob/master/HISTORY.rst
2023-07-09 10:02:13 +00:00
Barbara Miller
d864ea91ee
Merge pull request #187 from vbanos/cryptography-limit
Thanks, @vbanos!
2023-06-22 08:55:33 -07:00
Vangelis Banos
83c109bc9b Change cryptography version limit to >=2.3,<40 2023-06-22 12:22:24 +00:00
Vangelis Banos
1cc08233d6 Limit dependency version cryptography>=2.3,<=39.0.0
cryptography 41.0.0 crashes warcprox with the following exception:
```
File "/opt/spn2/lib/python3.8/site-packages/warcprox/main.py", line 317, in main
  cryptography.hazmat.backends.openssl.backend.activate_builtin_random()
AttributeError: 'Backend' object has no attribute 'activate_builtin_random'
```

Also, cryptography==40.0.0 isn't OK because when I try to use it I get:
```
pyopenssl 23.2.0 requires cryptography!=40.0.0,!=40.0.1,<42,>=38.0.0, but you have cryptography 40.0.0 which is incompatible.
```

So, the version should be <=39.0.0
2023-06-18 09:09:07 +00:00
Barbara Miller
ca02c22ff7
Merge pull request #180 from cclauss/patch-1
Thanks, @cclauss!
2023-04-12 11:45:41 -07:00
Barbara Miller
1fd3b2c7a1
update readme — rm travis 2023-04-12 11:44:01 -07:00
Christian Clauss
ba14480a2d
Delete .travis.yml 2023-04-12 11:37:56 +02:00
Barbara Miller
50a4f35e5f
Merge pull request #177 from internetarchive/blocks-shrink
@adam-miller ok'd this elsewhere
2022-08-05 15:44:05 -07:00
Barbara Miller
9973d28de9 bump version 2022-08-04 17:28:33 -07:00
Barbara Miller
ee9e375560 zlib decompression 2022-08-04 11:14:33 -07:00
Barbara Miller
c008c2eca7
bump version 2022-07-01 14:18:17 -07:00
Barbara Miller
7958921053
Merge pull request #175 from vbanos/random-tls-fingerprint
Thanks, @vbanos!
2022-07-01 14:16:05 -07:00
Vangelis Banos
329fef31a8 Randomize TLS fingerprint
Create a random TLS fingerprint per HTTPS connection to avoid TLS
fingerprinting.
2022-07-01 17:39:49 +00:00
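One way to vary the fingerprint per connection is to shuffle the offered cipher suites; a rough sketch (cipher list and approach are illustrative, not warcprox's exact code):

```
import random
import ssl

CIPHERS = [
    "ECDHE-RSA-AES128-GCM-SHA256",
    "ECDHE-RSA-AES256-GCM-SHA384",
    "ECDHE-RSA-CHACHA20-POLY1305",
    "AES256-GCM-SHA384",
]

def random_tls_context():
    ctx = ssl.create_default_context()
    shuffled = random.sample(CIPHERS, len(CIPHERS))
    ctx.set_ciphers(":".join(shuffled))  # different order, different ClientHello
    return ctx
```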
Barbara Miller
d253ea85c3
Merge pull request #173 from internetarchive/increase_batch_sec
tune MIN_BATCH_SEC, MAX_BATCH_SEC for fewer dedup errors
2022-06-24 11:13:18 -07:00
Barbara Miller
8418fe10ba add explanatory comment 2022-06-24 11:07:35 -07:00
Adam Miller
fcd9b2b3bd
Merge pull request #172 from internetarchive/adds-canonicalization-tests
Adding url canonicalization tests and handling of edge cases to reduc…
2022-04-27 09:57:03 -07:00
Adam Miller
731cfe80cc Adding url canonicalization tests and handling of edge cases to reduce log noise 2022-04-26 23:48:54 +00:00
Adam Miller
9521042a23
Merge pull request #171 from internetarchive/adds-hop-path-logging
Adds hop path logging
2022-04-26 12:11:11 -07:00
Adam Miller
daa925db17
Bump version 2022-04-26 09:55:48 -07:00
Adam Miller
d96dd5d842 Adjust rfc3986 package version for deployment across more versions 2022-04-21 18:37:27 +00:00
Adam Miller
1e3d22aba4 Better handle non-ascii urls for crawl log hop info 2022-04-20 22:48:28 +00:00
Adam Miller
5ae1291e37 Refactor of hop path referer logic 2022-03-24 21:40:55 +00:00
Barbara Miller
05daafa19e increase MIN_BATCH_SEC, MAX_BATCH_SEC 2022-03-03 18:46:20 -08:00
Adam Miller
ade2373711 Fixing referer on request with null hop path 2022-03-04 02:01:55 +00:00
Adam Miller
3a234d0cec Refactor hop_path metadata 2022-03-03 00:18:16 +00:00
Adam Miller
366ed5155f Merge branch 'master' into adds-hop-path-logging 2022-02-09 18:18:32 +00:00
Barbara Miller
c027659001
Merge pull request #167 from galgeek/WT-31
fix logging buglet iii
2021-12-29 12:14:56 -08:00
Barbara Miller
9e8ea5bb45 fix logging buglet iii 2021-12-29 12:06:18 -08:00
Barbara Miller
bc3d1e6d00 fix logging buglet ii 2021-12-29 11:55:39 -08:00
Barbara Miller
6b372e2f3f
Merge pull request #166 from galgeek/WT-31
fix logging buglet
2021-12-29 11:04:03 -08:00
Barbara Miller
5d8fbf7038 fix logging buglet 2021-12-29 10:25:04 -08:00
Barbara Miller
a969430b37
Merge pull request #163 from internetarchive/idna2_10
idna==2.10
2021-12-28 13:50:23 -08:00
Barbara Miller
aeecb6515f
bump version 2021-12-28 11:58:30 -08:00
Adam Miller
e1eddb8fa7
Merge pull request #165 from galgeek/WT-31
in-batch dedup
2021-12-28 11:52:41 -08:00
Barbara Miller
d7aec77597 faster, likely 2021-12-16 18:36:00 -08:00
Barbara Miller
bcaf293081 better logging 2021-12-09 12:19:45 -08:00
Barbara Miller
7d4c8dcb4e recorded_url.do_not_archive = True 2021-12-08 11:04:09 -08:00
Barbara Miller
da089e0a92 bytes not str 2021-12-06 20:33:16 -08:00
Barbara Miller
3eeccd0016 more hash_plus_url 2021-12-06 19:43:27 -08:00
Barbara Miller
5e5a74f204 str, not object 2021-12-06 19:33:10 -08:00
Barbara Miller
b67f1ad0f3 add logging 2021-12-06 17:29:27 -08:00
Barbara Miller
e6a1a7dd7e increase trough dedup batch window 2021-12-06 17:29:02 -08:00
Barbara Miller
e744075913 python 3.5 version, mostly 2021-12-02 11:46:39 -08:00
Barbara Miller
1476bfec8c discard batch hash+url match 2021-12-02 11:17:59 -08:00
Adam Miller
b57ec9c589 Check warcprox meta headers for hop information necessary to record a hop path if provided 2021-08-31 17:09:06 +00:00
Barbara Miller
e61099ff5f idna==2.10 2021-04-27 10:26:45 -07:00
Barbara Miller
0e23a31a31
Merge pull request #161 from internetarchive/fixes-malformed-crawl-log-lines
Checking for content type header consisting of only empty spaces and r…
2021-04-21 15:31:17 -07:00
Adam Miller
7f406b7942 Trying to fix tests that only fail during ci 2021-04-01 00:01:47 +00:00
Adam Miller
5f1c8c75fa Add test cases for space in content type header and exception messages 2021-03-31 23:22:04 +00:00
Adam Miller
e0732ffaf4 Checking for content type header consisting of only empty spaces and removing spaces from exception messages in json section 2021-03-29 22:22:19 +00:00
Adam Miller
b8057825d8
Merge pull request #158 from galgeek/failed_url.timestamp
set failed_url.timestamp
2020-09-30 14:49:17 -07:00
Barbara Miller
e2e2c02802 set failed_url.timestamp 2020-09-30 11:47:17 -07:00
jkafader
f19ead0058
Merge pull request #145 from internetarchive/adds-logging-for-failed-connections
Adds logging for failed connections
2020-09-23 12:22:12 -07:00
Adam Miller
36784de174 Merge branch 'master' into adds-logging-for-failed-connections 2020-09-23 19:18:41 +00:00
Barbara Miller
ce1f32dc41
Merge pull request #154 from internetarchive/galgeek-version-update
bump version
2020-08-18 09:30:28 -07:00
Barbara Miller
ae11daedc1
bump version 2020-08-18 09:29:57 -07:00
Barbara Miller
456698fe06
Merge pull request #153 from vbanos/should-dedup-impr
Thanks, @vbanos!
2020-08-17 14:04:49 -07:00
Barbara Miller
d90367f21f
Merge pull request #152 from cclauss/patch-1
Thank you, @cclauss!
2020-08-15 08:49:59 -07:00
Vangelis Banos
8078ee7af9 DedupableMixin.should_dedup() improvement
When a recorded URL has `recorded_url.do_not_archive = True`, it is not
written to WARC. This is checked in
`WarcWriterProcessor._should_archive`.
We shouldn't waste time on deduping something that is not going to be
written to WARC anyway.
2020-08-15 09:17:39 +00:00
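A minimal sketch of the check, assuming the attribute names from the message above (the rest of `should_dedup` is elided):

```
class DedupableMixin:
    def should_dedup(self, recorded_url):
        # skip dedup work for captures that will never be written to WARC
        if getattr(recorded_url, 'do_not_archive', False):
            return False
        ...  # existing size/content-type checks go here
        return True
```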
Christian Clauss
c649355285
setup.py: Add Python 3.8 2020-08-06 17:58:00 +02:00
Christian Clauss
21351094ec
Travis CI: Add Python 3.8 to testing 2020-08-06 17:27:15 +02:00
Adam Miller
edeae3b21a Expanding logging to handle DNS failures, print error message to crawl log info, and report cached connection errors. 2020-07-22 21:36:39 +00:00
Noah Levitt
b34419543f Oops! 2020-05-06 14:52:32 -07:00
Noah Levitt
5e397e9bca Elide unnecessary params 2020-05-06 14:28:00 -07:00
Noah Levitt
d0b21f5dc4 Undo accidentally committed code 2020-05-06 14:27:34 -07:00
Noah Levitt
36711c0148 try to fix .travis.yml 2020-05-06 14:19:19 -07:00
Noah Levitt
a5e9c27223 Share code, handle exception during CONNECT 2020-05-06 09:54:17 -07:00
Noah Levitt
de9219e646 require more recent urllib3
to avoid this error: https://github.com/internetarchive/warcprox/issues/148

2020-01-28 14:42:44,851 2023 ERROR MitmProxyHandler(tid=2037,started=2020-01-28T20:42:44.834551,client=127.0.0.1:49100) warcprox.warcprox.WarcProxyHandler.do_COMMAND(mitmproxy.py:442) problem processing request 'GET / HTTP/1.1': TypeError("connection_from_host() got an unexpected keyword argument 'pool_kwargs'",)
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/warcprox/mitmproxy.py", line 413, in do_COMMAND
    self._connect_to_remote_server()
  File "/usr/local/lib/python3.5/dist-packages/warcprox/warcproxy.py", line 189, in _connect_to_remote_server
    return warcprox.mitmproxy.MitmProxyHandler._connect_to_remote_server(self)
  File "/usr/local/lib/python3.5/dist-packages/warcprox/mitmproxy.py", line 277, in _connect_to_remote_server
    pool_kwargs={'maxsize': 12, 'timeout': self._socket_timeout})
TypeError: connection_from_host() got an unexpected keyword argument 'pool_kwargs'
2020-02-06 10:10:53 -08:00
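The failing call, roughly as the traceback shows it (a sketch; `pool_kwargs` is only accepted by newer urllib3 releases, hence the tightened requirement):

```
import urllib3

pool_manager = urllib3.PoolManager()
# on old urllib3 this raises TypeError: connection_from_host() got an
# unexpected keyword argument 'pool_kwargs'
conn_pool = pool_manager.connection_from_host(
    'example.com', 443, scheme='https',
    pool_kwargs={'maxsize': 12, 'timeout': 30})
```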
Noah Levitt
5c15582be5
Merge pull request #147 from nlevitt/fix-travis-jan2020
tests need trough
2020-01-08 14:29:16 -08:00
Noah Levitt
47731c61c1 tests need trough 2020-01-08 14:05:04 -08:00
Noah Levitt
90fba01514 make trough dependency optional 2020-01-08 13:37:01 -08:00
Noah Levitt
a8cd53bfe4 bump version, trough dep version 2020-01-08 13:24:00 -08:00
Noah Levitt
ee6bc151e1
Merge pull request #146 from vbanos/warc-filename-port
Add port to custom WARC filename vars
2020-01-08 13:22:50 -08:00
Vangelis Banos
ca0197330d Add port to custom WARC filename vars 2020-01-08 21:19:48 +00:00
Noah Levitt
469b41773a fix logging config which trough interfered with 2020-01-07 15:19:03 -08:00
Noah Levitt
91fcc054c4 bump version after merge 2020-01-07 14:42:40 -08:00
Noah Levitt
3f5251ed60
Merge pull request #144 from nlevitt/trough-dedup-schema
change trough dedup `date` type to varchar
2020-01-07 14:41:45 -08:00
Noah Levitt
f54e1b37c7 bump version after merge 2020-01-07 14:40:58 -08:00
Noah Levitt
47ec5d7644
Merge pull request #143 from nlevitt/use-trough-lib
use trough.client instead of warcprox.trough
2020-01-07 14:40:41 -08:00
Adam Miller
4ceebe1fa9 Moving more variables from RecordedUrl to RequiredUrl 2020-01-04 01:41:28 +00:00
Adam Miller
e88a88f247 Refactor failed requests into new class. 2020-01-03 20:43:47 +00:00
Adam Miller
f9c9443d2f Beginning modifications to pass along a dummy RecordedUrl on connection timeout for logging 2019-12-11 01:54:11 +00:00
Noah Levitt
ac959c6db5 change trough dedup date type to varchar
This is a backwards-compatible change whose purpose is to clarify the
existing usage.

In sqlite (and therefore trough), the datatypes of columns are just
suggestions. In fact the values can have any type. See
https://sqlite.org/datatype3.html. `datetime` isn't even a real sqlite
type.

Warcprox stores a string formatted like '2019-11-19T01:23:45Z' in that
field. When it pulls it out of the database and writes a revisit record,
it sticks the raw value in the `WARC-Date` header of that record.
Warcprox never parses the string value.

Since we use the raw textual value of the field, it makes sense to use a
textual datatype to store it.
2019-11-19 13:33:59 -08:00
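Illustratively, the change amounts to declaring the column as a textual type (a sketch, not the exact trough dedup schema):

```
# column names are illustrative; only the `date` type changes
old_schema = ("create table dedup (digest_key varchar(100) primary key, "
              "url varchar(2100), date datetime, id varchar(100))")
new_schema = ("create table dedup (digest_key varchar(100) primary key, "
              "url varchar(2100), date varchar(100), id varchar(100))")
```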
Noah Levitt
ad652b407c trough uses py3.5+ async syntax
so don't test 3.4; also we know warcprox requires py3 now so don't test
py2
2019-11-19 11:58:56 -08:00
Noah Levitt
fe19bb268f use trough.client instead of warcprox.trough
less redundant code!
trough.client was based off of warcprox.trough but has been improved
since then
2019-11-19 11:45:14 -08:00
Noah Levitt
f77c152037 bump version after merge 2019-09-26 11:49:07 -07:00
Noah Levitt
22d786f72e
Merge pull request #142 from vbanos/fix-close-rename
Another exception when trying to close a WARC file
2019-09-26 11:20:27 -07:00
Vangelis Banos
52e83632dd Another exception when trying to close a WARC file
Recently, we found and fixed a problem when closing a WARC file.
https://github.com/internetarchive/warcprox/pull/140

After using the updated warcprox in production, we got another exception
in the same method, right after that point.

```
ERROR:root:caught exception processing
b'https://abs.twimg.com/favicons/favicon.ico'
Traceback (most recent call last):
  File "/opt/spn2/lib/python3.5/site-packages/warcprox/writerthread.py",
line 78, in _process_url
    records = self.writer_pool.write_records(recorded_url)
  File "/opt/spn2/lib/python3.5/site-packages/warcprox/writer.py", line
227, in write_records
    return self._writer(recorded_url).write_records(recorded_url)
  File "/opt/spn2/lib/python3.5/site-packages/warcprox/writer.py", line
139, in write_records
    offset = self.f.tell()
ValueError: I/O operation on closed file
ERROR:warcprox.writer.WarcWriter:could not unlock file
/1/liveweb/warcs/liveweb-20190923194044-wwwb-spn14.us.archive.org.warc.gz
(I/O operation on closed file)
CRITICAL:warcprox.writerthread.WarcWriterProcessor:WarcWriterProcessor(tid=6228)
will try to continue after unexpected error
Traceback (most recent call last):
  File "/opt/spn2/lib/python3.5/site-packages/warcprox/__init__.py",
line 140, in _run
    self._get_process_put()
  File "/opt/spn2/lib/python3.5/site-packages/warcprox/writerthread.py",
line 60, in _get_process_put
    self.writer_pool.maybe_idle_rollover()
  File "/opt/spn2/lib/python3.5/site-packages/warcprox/writer.py", line
233, in maybe_idle_rollover
    w.maybe_idle_rollover()
  File "/opt/spn2/lib/python3.5/site-packages/warcprox/writer.py", line
188, in maybe_idle_rollover
    self.close()
  File "/opt/spn2/lib/python3.5/site-packages/warcprox/writer.py", line
176, in close
    os.rename(self.path, finalpath)
FileNotFoundError: [Errno 2] No such file or directory:
'/1/liveweb/warcs/liveweb-20190923194044-wwwb-spn14.us.archive.org.warc.gz'
->
'/1/liveweb/warcs/liveweb-20190923194044-wwwb-spn14.us.archive.org.warc.gz'
```

We don't have a WARC file and our code tries to run `os.rename` on a
file that doesn't exist. We add exception handling for that case as
well.

I should have foreseen that when doing the previous fix :(
2019-09-26 17:34:31 +00:00
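A sketch of the added handling (paths and logging simplified):

```
import logging
import os

def finalize_warc(path, finalpath):
    try:
        os.rename(path, finalpath)
    except FileNotFoundError:
        # the file may already be gone; log it and keep the writer thread alive
        logging.warning('could not rename %s to %s (no such file)', path, finalpath)
```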
Noah Levitt
1f852f5f36 bump version after merges 2019-09-23 11:55:00 -07:00
Noah Levitt
a34b7be431
Merge pull request #141 from nlevitt/fix-tests
try to fix test failing due to url-encoding
2019-09-23 11:54:30 -07:00
Noah Levitt
d1b52f8d80 try to fix test failing due to url-encoding
https://travis-ci.org/internetarchive/warcprox/jobs/588557539
test_domain_data_soft_limit
not sure what changed, maybe the requests library, though i can't
reproduce locally, but explicitly decoding should fix the problem
2019-09-23 11:16:48 -07:00
Noah Levitt
da9c4b0b4e
Merge pull request #138 from vbanos/increase-connection-pool-size
Increase remote_connection_pool maxsize
2019-09-23 10:09:05 -07:00
Noah Levitt
af0fe2892c
Merge pull request #140 from vbanos/fix-writer-problem
Handle ValueError when trying to close WARC file
2019-09-23 10:08:36 -07:00
Vangelis Banos
a09901dcef Use "except Exception" to catch all exception types 2019-09-21 09:43:27 +00:00
Vangelis Banos
407e890258 Set connection pool maxsize=6 2019-09-21 09:29:19 +00:00
Noah Levitt
8460a670b2
Merge pull request #139 from vbanos/dedup-impr
Skip cdx dedup for volatile URLs with session params
2019-09-20 14:20:54 -07:00
Vangelis Banos
6536516375 Handle ValueError when trying to close WARC file
We get a lot of the following error in production and warcprox becomes
totally unresponsive when this happens.
```
CRITICAL:warcprox.writerthread.WarcWriterProcessor:WarcWriterProcessor(tid=16646) will try to continue after unexpected error
Traceback (most recent call last):
  File "/opt/spn2/lib/python3.5/site-packages/warcprox/__init__.py", line 140, in _run
    self._get_process_put()
  File "/opt/spn2/lib/python3.5/site-packages/warcprox/writerthread.py", line 60, in _get_process_put
    self.writer_pool.maybe_idle_rollover()
  File "/opt/spn2/lib/python3.5/site-packages/warcprox/writer.py", line 233, in maybe_idle_rollover
    w.maybe_idle_rollover()
  File "/opt/spn2/lib/python3.5/site-packages/warcprox/writer.py", line 188, in maybe_idle_rollover
    self.close()
  File "/opt/spn2/lib/python3.5/site-packages/warcprox/writer.py", line 169, in close
    fcntl.lockf(self.f, fcntl.LOCK_UN)
ValueError: I/O operation on closed file
```

Current code handles `IOError`. We also need to handle `ValueError` to address this.
2019-09-20 12:49:09 +00:00
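A sketch of the broadened exception handling (simplified from the code paths in the traceback above):

```
import fcntl
import logging

def unlock_file(f):
    try:
        fcntl.lockf(f, fcntl.LOCK_UN)
    except (IOError, ValueError) as exc:
        # the file object may already be closed; don't let that kill the writer
        logging.error('could not unlock file %s (%s)', getattr(f, 'name', f), exc)
```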
Vangelis Banos
8f20fc014e Skip cdx dedup for volatile URLs with session params
A lot of cdx dedup requests fail. Checking production logs, we see that
we try to dedup URLs that are certainly volatile and session-specific.
We can skip them to reduce cdx dedup load. We won't find any matches
anyway since they contain session-specific vars.

We suggest skipping cdx dedup for URLs that include `JSESSIONID=`,
`session=` or `sess=`. These are common session URL params; there could
be many, many more.

Example URLs:
```
/session/683/urii8zej/xhr_streaming?JSESSIONID=dv0jkbk2-8xm9t9tf-7wp8lx0m-x4vb22ys

https://tw.popin.cc/popin_discovery/recommend?mode=new&url=https%3A%2F%2Fwww.nownews.com%2Fcat%2Fpolitics%2Fmilitary%2F&&device=pc&media=www.nownews.com&extra=other&agency=cnplus&topn=100&ad=100&r_category=all&country=tw&redirect=false&infinite=nownews&infinite_domain=m.nownews.com&piuid=43757d2474f09288b8410a9f2a40acf1&info=eyJ1c2VyX3RkX29zIjoib3RoZXIiLCJ1c2VyX3RkX29zX3ZlcnNpb24iOiIwLjAuMCIsInVzZXJfdGRfYnJvd3NlciI6IkNocm9tZSIsInVzZXJfdGRfYnJvd3Nlcl92ZXJzaW9uIjoiNzQuMC4zNzI5IiwidXNlcl90ZF9zY3JlZW4iOiIxNjAweDEwMDAiLCJ1c2VyX3RkX3ZpZXdwb3J0IjoiMTEwMHg3ODQiLCJ1c2VyX3RkX3VzZXJfYWdlbnQiOiJNb3ppbGxhLzUuMCAoWDExOyBMaW51eCB4ODZfNjQpIEFwcGxlV2ViS2l0LzUzNy4zNiAoS0hUTUwsIGxpa2UgR2Vja28pIFVidW50dSBDaHJvbWl1bS83NC4wLjM3MjkuMTY5IENocm9tZS83NC4wLjM3MjkuMTY5IFNhZmFyaS81MzcuMzYiLCJ1c2VyX3RkX3JlZmVycmVyIjoiIiwidXNlcl90ZF9wYXRoIjoiL2NhdC9wb2xpdGljcy9taWxpdGFyeS8iLCJ1c2VyX3RkX2NoYXJzZXQiOiJ1dGYtOCIsInVzZXJfdGRfbGFuZ3VhZ2UiOiJlbi11cyIsInVzZXJfdGRfY29sb3IiOiIyNC1iaXQiLCJ1c2VyX3RkX3RpdGxlIjoiJUU4JUJCJThEJUU2JUFEJUE2JTIwJTdDJTIwTk9XbmV3cyUyMCVFNCVCQiU4QSVFNiU5NyVBNSVFNiU5NiVCMCVFOCU4MSU5RSIsInVzZXJfdGRfdXJsIjoiaHR0cHM6Ly93d3cubm93bmV3cy5jb20vY2F0L3BvbGl0aWNzL21pbGl0YXJ5LyIsInVzZXJfdGRfcGxhdGZvcm0iOiJMaW51eCB4ODZfNjQiLCJ1c2VyX3RkX2hvc3QiOiJ3d3cubm93bmV3cy5jb20iLCJ1c2VyX2RldmljZSI6InBjIiwidXNlcl90aW1lIjoxNTYyMDAxMzkyNzY2fQ==&session=13927861b5403&callback=_p6_8e102dd0c975

http://c.statcounter.com/text.php?sc_project=4092884&java=1&security=10fe3b6b&u1=915B47A927524F10185B2F074074BDCB&sc_random=0.017686960888044556&jg=310&rr=1.1.1.1.1.1.1.1.1&resolution=1600&h=1000&camefrom=&u=http%3A//buchlatech.blogspot.com/search/label/prototype&t=Buchla%20Tech%3A%20prototype&rcat=d&rdomo=d&rdomg=310&bb=0&sc_snum=1&sess=cfa820&p=0&text=2
```
2019-09-20 06:31:15 +00:00
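A minimal sketch of such a skip rule (the parameter list is illustrative; the commit names `JSESSIONID=`, `session=` and `sess=`):

```
SESSION_PARAM_MARKERS = ('jsessionid=', 'session=', 'sess=')

def skip_cdx_dedup(url):
    # volatile, session-specific URLs will never produce a dedup hit anyway
    return any(marker in url.lower() for marker in SESSION_PARAM_MARKERS)

assert skip_cdx_dedup('http://example.com/x?JSESSIONID=dv0jkbk2')
assert not skip_cdx_dedup('http://example.com/static/logo.png')
```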
Vangelis Banos
84a46e4323 Increase remote_connection_pool maxsize
We noticed a lot of log entries like this in production:
```
WARNING:urllib3.connectionpool:Connection pool is full, discarding
connection: static.xx.fbcdn.net
```
This happens because we use a `PoolManager` and create a number of pools
(param `num_pools`), but each pool can hold only one connection by default
(`maxsize` defaults to 1).

`urllib3` docs say: `maxsize` – Number of connections to save that can be
reused. More than 1 is useful in multithreaded situations.
Ref:
https://urllib3.readthedocs.io/en/1.2.1/pools.html#urllib3.connectionpool.HTTPConnectionPool

I suggest using `maxsize=10` and re-evaluating after some time whether it
is big enough.

This improvement will boost performance as we'll reuse more connections
to remote hosts.
2019-09-20 05:55:51 +00:00
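A sketch of the tuning (the `num_pools` value is illustrative; the merged change ended up with `maxsize=6`, per the "Set connection pool maxsize=6" commit above):

```
import urllib3

# each per-host pool keeps up to `maxsize` reusable connections; the default of 1
# caused "Connection pool is full, discarding connection" under concurrency
remote_connection_pool = urllib3.PoolManager(num_pools=50, maxsize=6)
```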
Noah Levitt
88a7f79a7e bump version 2019-09-13 10:58:16 -07:00
Noah Levitt
a8cd219da7 add missing import
fixes this problem:

Traceback (most recent call last):
  File "/opt/warcprox-ve3/lib/python3.5/site-packages/warcprox/main.py", line 330, in main
    controller.run_until_shutdown()
  File "/opt/warcprox-ve3/lib/python3.5/site-packages/warcprox/controller.py", line 449, in run_until_shutdown
    os.kill(os.getpid(), 9)
NameError: name 'os' is not defined
2019-09-13 10:57:28 -07:00
Noah Levitt
2b408b3af0 avoid this problem
2019-09-13 17:15:40,659 594 CRITICAL MainThread warcprox.controller.WarcproxController.run_until_shutdown(controller.py:447) graceful shutdown failed
Traceback (most recent call last):
  File "/opt/warcprox-ve3/lib/python3.5/site-packages/warcprox/controller.py", line 445, in run_until_shutdown
    self.shutdown()
  File "/opt/warcprox-ve3/lib/python3.5/site-packages/warcprox/controller.py", line 371, in shutdown
    self.proxy.server_close()
  File "/opt/warcprox-ve3/lib/python3.5/site-packages/warcprox/warcproxy.py", line 503, in server_close
    warcprox.mitmproxy.PooledMitmProxy.server_close(self)
  File "/opt/warcprox-ve3/lib/python3.5/site-packages/warcprox/mitmproxy.py", line 754, in server_close
    for sock in self.remote_server_socks:
RuntimeError: Set changed size during iteration
2019-09-13 10:56:58 -07:00
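A sketch of the fix implied by the traceback: iterate over a snapshot of the socket set so it can shrink while sockets are being closed (class shape simplified):

```
class PooledMitmProxy:
    def __init__(self):
        self.remote_server_socks = set()

    def server_close(self):
        # copy first; other threads may add or remove sockets during shutdown
        for sock in list(self.remote_server_socks):
            sock.close()
            self.remote_server_socks.discard(sock)
```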
Noah Levitt
1aa6b0c5d6 log remote host/ip/port on SSLError 2019-08-16 18:31:35 +00:00
Noah Levitt
fce1c3d722 requests/urllib3 version conflict from april must
be obsolete by now...
2019-07-26 14:03:36 -07:00
Noah Levitt
932001c921 bump version after merge 2019-06-20 14:57:36 -07:00
Noah Levitt
a4253d5425
Merge pull request #133 from galgeek/dedup-fixes
handle multiple dedup-buckets, rw or ro (and dedup brozzler test crawls against collection seed)
2019-06-20 14:57:20 -07:00
Barbara Miller
48d96fbc79 fix link 2019-06-20 14:54:44 -07:00
Barbara Miller
c0fcf59c86 rm test not matching use case 2019-06-14 13:34:47 -07:00
Barbara Miller
79aab697e2 more tests 2019-06-14 12:42:25 -07:00
Barbara Miller
51c4f6d622 test_dedup_buckets_multiple 2019-06-13 17:57:29 -07:00
Barbara Miller
8c52bd8442 docs updates 2019-06-13 17:18:51 -07:00
Barbara Miller
d133565061 continue support for _singular_ dedup-bucket 2019-06-04 14:53:06 -07:00
Barbara Miller
6ee7ab36a2 fix tests too 2019-05-31 17:36:13 -07:00
Barbara Miller
957bd079e8 WIP (untested): handle multiple dedup-buckets, rw or ro 2019-05-30 19:27:46 -07:00
23 changed files with 1150 additions and 500 deletions


@ -1,69 +0,0 @@
sudo: required
dist: xenial
language: python
python:
- 3.7
- 3.6
- 3.5
- 3.4
- 2.7
- pypy
- pypy3.5
- nightly
matrix:
allow_failures:
- python: nightly
- python: 2.7
- python: pypy
addons:
apt:
packages:
- tor
services:
- docker
before_install:
- sudo service docker restart ; sleep 10 # https://github.com/travis-ci/travis-ci/issues/4778
- docker network create --driver=bridge trough
- docker run --detach --network=trough --hostname=rethinkdb --name=rethinkdb --publish=28015:28015 rethinkdb
- docker run --detach --network=trough --hostname=hadoop --name=hadoop chalimartines/cdh5-pseudo-distributed
- docker run --detach --network=trough --hostname=trough --name=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3.6 bash /run-trough.sh
- cat /etc/hosts
- echo | sudo tee -a /etc/hosts # travis-ci default doesn't end with a newline 🙄
- echo 127.0.0.1 rethinkdb | sudo tee -a /etc/hosts
- echo 127.0.0.1 hadoop | sudo tee -a /etc/hosts
- echo 127.0.0.1 trough | sudo tee -a /etc/hosts
- cat /etc/hosts
- ping -c2 trough
install:
- pip install . pytest requests warcio mock
before_script:
- docker exec trough bash -c 'while ! test -e /tmp/trough-read.out ; do sleep 0.5 ; done' || true
- docker logs --timestamps --details trough
- ps ww -fHe
- docker ps
script:
- py.test -v --tb=native tests
- py.test -v --tb=native --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests
- py.test -v --tb=native --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests
- py.test -v --tb=native --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests
after_script:
- ps ww -fHe
- docker exec trough cat /tmp/trough-write.out
- docker exec trough cat /tmp/trough-segment-manager-server.out
- docker exec trough cat /tmp/trough-segment-manager-local.out
- docker exec trough cat /tmp/trough-sync-server.out
- docker exec trough cat /tmp/trough-sync-local.out
- docker exec trough cat /tmp/trough-read.out
notifications:
slack:
secure: UJzNe+kEJ8QhNxrdqObroisJAO2ipr+Sr2+u1e2euQdIkacyX+nZ88jSk6uDKniAemSfFDI8Ty5a7++2wSbE//Hr3jOSNOJMZLzockafzvIYrq9bP7V97j1gQ4u7liWd19VBnbf0pULuwEfy/n5PdOBR/TiPrgMuYjfZseV+alo=
secure: S1SK52178uywcWLMO4S5POdjMv1MQjR061CKprjVn2d8x5RBbg8QZtumA6Xt+pByvJzh8vk+ITHCN57tcdi51yL6Z0QauXwxwzTsZmjrhxWOybAO2uOHliqQSDgxKcbXIqJKg7Yv19eLQYWDVJVGuwlMfVBS0hOHtTTpVuLuGuc=


@ -1,7 +1,5 @@
Warcprox - WARC writing MITM HTTP/S proxy
*****************************************
.. image:: https://travis-ci.org/internetarchive/warcprox.svg?branch=master
:target: https://travis-ci.org/internetarchive/warcprox
Warcprox is an HTTP proxy designed for web archiving applications. When used in
parallel with `brozzler <https://github.com/internetarchive/brozzler>`_ it
@ -89,12 +87,13 @@ for deduplication works similarly to deduplication by `Heritrix
4. If not found,
a. Write ``response`` record with full payload
b. Store new entry in deduplication database
b. Store new entry in deduplication database (can be disabled, see
`Warcprox-Meta HTTP request header <api.rst#warcprox-meta-http-request-header>`_)
The deduplication database is partitioned into different "buckets". URLs are
deduplicated only against other captures in the same bucket. If specified, the
``dedup-bucket`` field of the `Warcprox-Meta HTTP request header
<api.rst#warcprox-meta-http-request-header>`_ determines the bucket. Otherwise,
``dedup-buckets`` field of the `Warcprox-Meta HTTP request header
<api.rst#warcprox-meta-http-request-header>`_ determines the bucket(s). Otherwise,
the default bucket is used.
Deduplication can be disabled entirely by starting warcprox with the argument

0
__init__.py Normal file

26
api.rst

@ -137,14 +137,16 @@ Example::
Warcprox-Meta: {"warc-prefix": "special-warc"}
``dedup-bucket`` (string)
``dedup-buckets`` (string)
~~~~~~~~~~~~~~~~~~~~~~~~~
Specifies the deduplication bucket. For more information about deduplication
Specifies the deduplication bucket(s). For more information about deduplication
see `<README.rst#deduplication>`_.
Example::
Examples::
Warcprox-Meta: {"dedup-bucket":"my-dedup-bucket"}
Warcprox-Meta: {"dedup-buckets":{"my-dedup-bucket":"rw"}}
Warcprox-Meta: {"dedup-buckets":{"my-dedup-bucket":"rw", "my-read-only-dedup-bucket": "ro"}}
``blocks`` (list)
~~~~~~~~~~~~~~~~~
@ -184,6 +186,22 @@ to evaluate the block rules. In particular, this circumstance prevails when the
browser controlled by brozzler is requesting images, javascript, css, and so
on, embedded in a page.
``compressed_blocks`` (string)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If the ``blocks`` header is large, it may be useful or necessary to compress it.
``compressed_blocks`` is a string containing a zlib and base64-encoded
``blocks`` list. If both ``blocks`` and ``compressed_blocks`` are provided,
warcprox will use the value of ``compressed_blocks``, however this behavior
is not guaranteed.
Example::
Warcprox-Meta: {"compressed_blocks": "eJwVykEKgCAQQNGryKwt90F0kGgxlZSgzuCMFIR3r7b//fkBkVoUBgMbJetvTBy9de5U5cFBs+aBnRKG/D8J44XF91XAGpC6ipaQj58u7iIdIfd88oSbBsrjF6gqtOUFJ5YjwQ=="}
Is equivalent to::
{"blocks": [{"ssurt": "com,example,//http:/"}, {"domain": "malware.us", "substring": "wp-login.php?action=logout"}]}
``stats`` (dictionary)
~~~~~~~~~~~~~~~~~~~~~~
``stats`` is a dictionary with only one field understood by warcprox,

28
pyproject.toml Normal file

@ -0,0 +1,28 @@
[project]
name = "warcprox"
authors = [
{ name="Noah Levitt", email="nlevitt@archive.org" },
]
maintainers = [
{ name="Vangelis Banos", email="vangelis@archive.org" },
{ name="Adam Miller", email="adam@archive.org" },
{ name="Barbara Miller", email="barbara@archive.org" },
{ name="Alex Dempsey", email="avdempsey@archive.org" },
]
description = "WARC writing MITM HTTP/S proxy"
readme = "README.rst"
requires-python = ">=3.8"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License",
"Operating System :: OS Independent",
]
dynamic = [ "version", "license", "scripts", "dependencies", "optional-dependencies" ]
[project.urls]
Homepage = "https://github.com/internetarchive/warcprox"
Issues = "https://github.com/internetarchive/warcprox/issues"
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"


@ -2,7 +2,7 @@
'''
setup.py - setuptools installation configuration for warcprox
Copyright (C) 2013-2019 Internet Archive
Copyright (C) 2013-2024 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -24,17 +24,17 @@ import sys
import setuptools
deps = [
'certauth==1.1.6',
'warctools>=4.10.0',
'urlcanon>=0.3.0',
'doublethink>=0.2.0.dev87',
'urllib3>=1.14,<1.25',
'doublethink==0.4.9',
'urllib3>=1.23',
'requests>=2.0.1',
'PySocks>=1.6.8',
'cryptography>=2.3',
'idna>=2.5',
'cryptography>=39,<40',
'idna',
'PyYAML>=5.1',
'cachetools',
'rfc3986>=1.5.0',
]
try:
import concurrent.futures
@ -43,7 +43,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.4.15',
version='2.6.1',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',
@ -52,6 +52,8 @@ setuptools.setup(
license='GPL',
packages=['warcprox'],
install_requires=deps,
# preferred trough 'trough @ git+https://github.com/internetarchive/trough.git@jammy_focal'
extras_require={'trough': 'trough'},
setup_requires=['pytest-runner'],
tests_require=['mock', 'pytest', 'warcio'],
entry_points={
@ -66,13 +68,12 @@ setuptools.setup(
'Development Status :: 5 - Production/Stable',
'Environment :: Console',
'License :: OSI Approved :: GNU General Public License (GPL)',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Topic :: Internet :: Proxy Servers',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: System :: Archiving',
])


@ -19,7 +19,7 @@
# USA.
#
FROM phusion/baseimage
FROM ubuntu:focal-20220404
MAINTAINER Noah Levitt <nlevitt@archive.org>
# see https://github.com/stuartpb/rethinkdb-dockerfiles/blob/master/trusty/2.1.3/Dockerfile
@ -28,10 +28,11 @@ MAINTAINER Noah Levitt <nlevitt@archive.org>
ENV LANG=C.UTF-8
RUN apt-get update && apt-get --auto-remove -y dist-upgrade
RUN apt-get install -y ca-certificates curl gnupg wget
# Add the RethinkDB repository and public key
RUN curl -s https://download.rethinkdb.com/apt/pubkey.gpg | apt-key add - \
&& echo "deb http://download.rethinkdb.com/apt xenial main" > /etc/apt/sources.list.d/rethinkdb.list \
RUN curl -Ss https://download.rethinkdb.com/repository/raw/pubkey.gpg | apt-key add -
RUN echo "deb https://download.rethinkdb.com/repository/ubuntu-focal focal main" > /etc/apt/sources.list.d/rethinkdb.list \
&& apt-get update && apt-get -y install rethinkdb
RUN mkdir -vp /etc/service/rethinkdb \
@ -57,25 +58,54 @@ RUN mkdir -vp /etc/service/tor \
&& chmod a+x /etc/service/tor/run
# hadoop hdfs for trough
RUN curl -s https://archive.cloudera.com/cdh5/ubuntu/xenial/amd64/cdh/archive.key | apt-key add - \
&& . /etc/lsb-release \
&& echo "deb [arch=amd64] http://archive.cloudera.com/cdh5/ubuntu/$DISTRIB_CODENAME/amd64/cdh $DISTRIB_CODENAME-cdh5 contrib" >> /etc/apt/sources.list.d/cloudera.list
RUN apt-get update
RUN apt-get install -y openjdk-8-jdk hadoop-conf-pseudo
ARG DEBIAN_FRONTEND=noninteractive
ENV TZ=Etc/UTC
RUN apt-get install -y openjdk-8-jdk openssh-server
RUN su hdfs -c 'hdfs namenode -format'
# set java home
ENV JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
RUN mv -v /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/core-site.xml.orig \
&& cat /etc/hadoop/conf/core-site.xml.orig | sed 's,localhost:8020,0.0.0.0:8020,' > /etc/hadoop/conf/core-site.xml
# setup ssh with no passphrase
RUN ssh-keygen -t rsa -f $HOME/.ssh/id_rsa -P "" \
&& cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys
RUN mv -v /etc/hadoop/conf/hdfs-site.xml /etc/hadoop/conf/hdfs-site.xml.orig \
&& cat /etc/hadoop/conf/hdfs-site.xml.orig | sed 's,^</configuration>$, <property>\n <name>dfs.permissions.enabled</name>\n <value>false</value>\n </property>\n</configuration>,' > /etc/hadoop/conf/hdfs-site.xml
RUN wget -O /hadoop-2.7.3.tar.gz -q https://archive.apache.org/dist/hadoop/common/hadoop-2.7.3/hadoop-2.7.3.tar.gz \
&& tar xfz hadoop-2.7.3.tar.gz \
&& mv /hadoop-2.7.3 /usr/local/hadoop \
&& rm /hadoop-2.7.3.tar.gz
# hadoop environment variables
ENV HADOOP_HOME=/usr/local/hadoop
ENV PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
RUN echo '#!/bin/bash\nservice hadoop-hdfs-namenode start\nservice hadoop-hdfs-datanode start' > /etc/my_init.d/50_start_hdfs.sh \
&& chmod a+x /etc/my_init.d/50_start_hdfs.sh
# hadoop-store
RUN mkdir -p $HADOOP_HOME/hdfs/namenode \
&& mkdir -p $HADOOP_HOME/hdfs/datanode
RUN apt-get install -y libsqlite3-dev
# Temporary files: http://refspecs.linuxfoundation.org/FHS_3.0/fhs/ch03s18.html
COPY config/ /tmp/
RUN mv /tmp/ssh_config $HOME/.ssh/config \
&& mv /tmp/hadoop-env.sh $HADOOP_HOME/etc/hadoop/hadoop-env.sh \
&& mv /tmp/core-site.xml $HADOOP_HOME/etc/hadoop/core-site.xml \
&& mv /tmp/hdfs-site.xml $HADOOP_HOME/etc/hadoop/hdfs-site.xml \
&& mv /tmp/mapred-site.xml $HADOOP_HOME/etc/hadoop/mapred-site.xml.template \
&& cp $HADOOP_HOME/etc/hadoop/mapred-site.xml.template $HADOOP_HOME/etc/hadoop/mapred-site.xml \
&& mv /tmp/yarn-site.xml $HADOOP_HOME/etc/hadoop/yarn-site.xml
# Add startup script
ADD config/hadoop-services.sh $HADOOP_HOME/hadoop-services.sh
# set permissions
RUN chmod 744 -R $HADOOP_HOME
# format namenode
RUN $HADOOP_HOME/bin/hdfs namenode -format
# run hadoop services
#ENTRYPOINT $HADOOP_HOME/hadoop-services.sh; bash
RUN apt-get install -y libsqlite3-dev build-essential
# trough itself
RUN virtualenv -p python3 /opt/trough-ve3 \
@ -107,3 +137,4 @@ RUN mkdir -vp /etc/service/trough-segment-manager-server \
&& echo '#!/bin/bash\nvenv=/opt/trough-ve3\nsource $venv/bin/activate\nsleep 5\npython -c $"import doublethink ; from trough.settings import settings ; rr = doublethink.Rethinker(settings[\"RETHINKDB_HOSTS\"]) ; rr.db(\"trough_configuration\").wait().run()"\nexec uwsgi --venv=$venv --http :6111 --master --processes=2 --harakiri=7200 --http-timeout=7200 --max-requests=50000 --vacuum --die-on-term --mount /=trough.wsgi.segment_manager:server >>/tmp/trough-segment-manager-server.out 2>&1' > /etc/service/trough-segment-manager-server/run \
&& chmod a+x /etc/service/trough-segment-manager-server/run
RUN apt-get install -y daemontools daemontools-run


@ -31,15 +31,18 @@ script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
docker build -t internetarchive/warcprox-tests $script_dir
docker run --rm --volume="$script_dir/..:/warcprox" internetarchive/warcprox-tests /sbin/my_init -- \
docker run --rm --volume="$script_dir/..:/warcprox" internetarchive/warcprox-tests \
bash -x -c "cd /tmp && git clone /warcprox && cd /tmp/warcprox \
&& (cd /warcprox && git diff HEAD) | patch -p1 \
&& virtualenv -p python3 /tmp/venv \
&& source /tmp/venv/bin/activate \
&& pip --log-file /tmp/pip.log install . pytest mock requests warcio \
&& py.test -v tests \
&& py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests \
&& pip --log-file /tmp/pip.log install . pytest mock requests warcio trough \
&& py.test -v tests; \
svscan /etc/service & \
sleep 10; \
py.test -v --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests \
&& py.test -v --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests \
&& /usr/local/hadoop/hadoop-services.sh \
&& py.test -v --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests \
"

89
tests/test_certauth.py Normal file

@ -0,0 +1,89 @@
import os
import shutil
from warcprox.certauth import main, CertificateAuthority
import tempfile
from OpenSSL import crypto
import datetime
import time
def setup_module():
global TEST_CA_DIR
TEST_CA_DIR = tempfile.mkdtemp()
global TEST_CA_ROOT
TEST_CA_ROOT = os.path.join(TEST_CA_DIR, 'certauth_test_ca.pem')
def teardown_module():
shutil.rmtree(TEST_CA_DIR)
assert not os.path.isdir(TEST_CA_DIR)
assert not os.path.isfile(TEST_CA_ROOT)
def test_create_root():
ret = main([TEST_CA_ROOT, '-c', 'Test Root Cert'])
assert ret == 0
def test_create_host_cert():
ret = main([TEST_CA_ROOT, '-d', TEST_CA_DIR, '-n', 'example.com'])
assert ret == 0
certfile = os.path.join(TEST_CA_DIR, 'example.com.pem')
assert os.path.isfile(certfile)
def test_create_wildcard_host_cert_force_overwrite():
ret = main([TEST_CA_ROOT, '-d', TEST_CA_DIR, '--hostname', 'example.com', '-w', '-f'])
assert ret == 0
certfile = os.path.join(TEST_CA_DIR, 'example.com.pem')
assert os.path.isfile(certfile)
def test_explicit_wildcard():
ca = CertificateAuthority(TEST_CA_ROOT, TEST_CA_DIR, 'Test CA')
filename = ca.get_wildcard_cert('test.example.proxy')
certfile = os.path.join(TEST_CA_DIR, 'example.proxy.pem')
assert filename == certfile
assert os.path.isfile(certfile)
os.remove(certfile)
def test_create_already_exists():
ret = main([TEST_CA_ROOT, '-d', TEST_CA_DIR, '-n', 'example.com', '-w'])
assert ret == 1
certfile = os.path.join(TEST_CA_DIR, 'example.com.pem')
assert os.path.isfile(certfile)
# remove now
os.remove(certfile)
def test_create_root_already_exists():
ret = main([TEST_CA_ROOT])
# not created, already exists
assert ret == 1
# remove now
os.remove(TEST_CA_ROOT)
def test_create_root_subdir():
# create a new cert in a subdirectory
subdir = os.path.join(TEST_CA_DIR, 'subdir')
ca_file = os.path.join(subdir, 'certauth_test_ca.pem')
ca = CertificateAuthority(ca_file, subdir, 'Test CA',
cert_not_before=-60 * 60,
cert_not_after=60 * 60 * 24 * 3)
assert os.path.isdir(subdir)
assert os.path.isfile(ca_file)
buff = ca.get_root_PKCS12()
assert len(buff) > 0
expected_not_before = datetime.datetime.utcnow() - datetime.timedelta(seconds=60 * 60)
expected_not_after = datetime.datetime.utcnow() + datetime.timedelta(seconds=60 * 60 * 24 * 3)
cert = crypto.load_pkcs12(buff).get_certificate()
actual_not_before = datetime.datetime.strptime(
cert.get_notBefore().decode('ascii'), '%Y%m%d%H%M%SZ')
actual_not_after = datetime.datetime.strptime(
cert.get_notAfter().decode('ascii'), '%Y%m%d%H%M%SZ')
time.mktime(expected_not_before.utctimetuple())
assert abs((time.mktime(actual_not_before.utctimetuple()) - time.mktime(expected_not_before.utctimetuple()))) < 10
assert abs((time.mktime(actual_not_after.utctimetuple()) - time.mktime(expected_not_after.utctimetuple()))) < 10


@ -52,6 +52,7 @@ import mock
import email.message
import socketserver
from concurrent import futures
import urllib.parse
try:
import http.server as http_server
@ -67,6 +68,7 @@ import certauth.certauth
import warcprox
import warcprox.main
import warcprox.crawl_log as crawl_log
try:
import http.client as http_client
@ -175,8 +177,10 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
def build_response(self):
m = re.match(r'^/([^/]+)/([^/]+)$', self.path)
if m is not None:
special_header = 'warcprox-test-header: {}!'.format(m.group(1)).encode('utf-8')
payload = 'I am the warcprox test payload! {}!\n'.format(10*m.group(2)).encode('utf-8')
seg1 = urllib.parse.unquote(m.group(1))
seg2 = urllib.parse.unquote(m.group(2))
special_header = 'warcprox-test-header: {}!'.format(seg1).encode('utf-8')
payload = 'I am the warcprox test payload! {}!\n'.format(10*seg2).encode('utf-8')
headers = (b'HTTP/1.1 200 OK\r\n'
+ b'Content-Type: text/plain\r\n'
+ special_header + b'\r\n'
@ -290,6 +294,12 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
payload = chunkify(
b'Server closes connection when client expects next chunk')
payload = payload[:-7]
elif self.path == '/space_in_content_type':
payload = b'test'
headers = (b'HTTP/1.1 200 OK\r\n'
+ b'Content-Type: \r\n'
+ b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n'
+ b'\r\n')
else:
payload = b'404 Not Found\n'
headers = (b'HTTP/1.1 404 Not Found\r\n'
@ -790,7 +800,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port)
# archive url1 bucket_a
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_a"})}
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_a":"rw"}})}
response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!'
@ -816,7 +826,7 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
assert dedup_lookup is None
# archive url2 bucket_b
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-bucket":"bucket_b"})}
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","dedup-buckets":{"bucket_b":""}})}
response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!'
@ -916,6 +926,71 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
finally:
fh.close()
def test_dedup_buckets_readonly(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
urls_before = warcprox_.proxy.running_stats.urls
url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port)
# archive url1
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets_readonly",
"dedup-buckets":{"bucket_1":"rw", "bucket_2":"ro"}})
}
response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!'
assert response.content == b'I am the warcprox test payload! llllllllll!\n'
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 1)
# check url1 in dedup db bucket_1 (rw)
# logging.info('looking up sha1:bc3fac8847c9412f49d955e626fb58a76befbf81 in bucket_1')
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_1")
assert dedup_lookup
assert dedup_lookup['url'] == url1.encode('ascii')
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
record_id = dedup_lookup['id']
dedup_date = dedup_lookup['date']
# check url1 not in dedup db bucket_2 (ro)
dedup_lookup = warcprox_.dedup_db.lookup(
b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_2")
assert dedup_lookup is None
# close the warc
assert warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_readonly"]
writer = warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_readonly"]
warc_path = os.path.join(writer.directory, writer.finalname)
assert not os.path.exists(warc_path)
warcprox_.warc_writer_processor.writer_pool.warc_writers["test_dedup_buckets_readonly"].close()
assert os.path.exists(warc_path)
# read the warc
fh = warctools.ArchiveRecord.open_archive(warc_path)
record_iter = fh.read_records(limit=None, offsets=True)
try:
(offset, record, errors) = next(record_iter)
assert record.type == b'warcinfo'
# url1 bucket_1
(offset, record, errors) = next(record_iter)
assert record.type == b'response'
assert record.url == url1.encode('ascii')
# check for duplicate warc record headers
assert Counter(h[0] for h in record.headers).most_common(1)[0][1] == 1
assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\nI am the warcprox test payload! llllllllll!\n'
(offset, record, errors) = next(record_iter)
assert record.type == b'request'
# that's all folks
assert next(record_iter)[1] == None
assert next(record_iter, None) == None
finally:
fh.close()
def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls
revisits_before = warcprox_.proxy.stats_db.value(
@ -928,7 +1003,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin
http_daemon.server_port, i)
headers = {"Warcprox-Meta": json.dumps({
"warc-prefix":"test_dedup_buckets",
"dedup-bucket":"bucket_%s" % i})}
"dedup-buckets":{"bucket_%s" % i:"rw"}})}
pool.submit(
requests.get, url, proxies=archiving_proxies, verify=False,
headers=headers)
@ -944,7 +1019,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin
http_daemon.server_port, -i - 1)
headers = {"Warcprox-Meta": json.dumps({
"warc-prefix":"test_dedup_buckets",
"dedup-bucket":"bucket_%s" % i})}
"dedup-buckets":{"bucket_%s" % i:"rw"}})}
pool.submit(
requests.get, url, proxies=archiving_proxies, verify=False,
headers=headers)
@ -959,7 +1034,7 @@ def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archivin
http_daemon.server_port, i)
headers = {"Warcprox-Meta": json.dumps({
"warc-prefix":"test_dedup_buckets",
"dedup-bucket":"bucket_%s" % i})}
"dedup-buckets":{"bucket_%s" % i:"rw"}})}
pool.submit(
requests.get, url, proxies=archiving_proxies, verify=False,
headers=headers)
@ -1286,7 +1361,7 @@ def test_domain_data_soft_limit(
warcprox_.proxy.remote_connection_pool.clear()
# novel, pushes stats over the limit
url = 'https://muh.XN--Zz-2Ka.locALHOst:{}/z/~'.format(https_daemon.server_port)
url = 'https://muh.XN--Zz-2Ka.locALHOst:{}/z/%7E'.format(https_daemon.server_port)
response = requests.get(
url, proxies=archiving_proxies, headers=headers, stream=True,
verify=False)
@ -1413,7 +1488,7 @@ def test_missing_content_length(archiving_proxies, http_daemon, https_daemon, wa
assert not 'content-length' in response.headers
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2)
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 2, timeout=20)
def test_limit_large_resource(archiving_proxies, http_daemon, warcprox_):
"""We try to load a 300k response but we use --max-resource-size=200000 in
@ -1500,7 +1575,7 @@ def test_dedup_ok_flag(
assert dedup_lookup is None
# archive with dedup_ok:False
request_meta = {'dedup-bucket':'test_dedup_ok_flag','dedup-ok':False}
request_meta = {'dedup-buckets':{'test_dedup_ok_flag':''},'dedup-ok':False}
headers = {'Warcprox-Meta': json.dumps(request_meta)}
response = requests.get(
url, proxies=archiving_proxies, headers=headers, verify=False)
@ -1518,7 +1593,7 @@ def test_dedup_ok_flag(
assert dedup_lookup is None
# archive without dedup_ok:False
request_meta = {'dedup-bucket':'test_dedup_ok_flag'}
request_meta = {'dedup-buckets':{'test_dedup_ok_flag':''}}
headers = {'Warcprox-Meta': json.dumps(request_meta)}
response = requests.get(
url, proxies=archiving_proxies, headers=headers, verify=False)
@ -1924,6 +1999,155 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
assert extra_info['contentSize'] == 38
assert extra_info['method'] == 'WARCPROX_WRITE_RECORD'
#Empty spae for Content Type
url = 'http://localhost:%s/space_in_content_type' % http_daemon.server_port
headers = {'Warcprox-Meta': json.dumps({'warc-prefix': 'test_crawl_log_5'})}
response = requests.get(url, proxies=archiving_proxies, headers=headers)
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 6)
file = os.path.join(
warcprox_.options.crawl_log_dir,
'test_crawl_log_5-%s-%s.log' % (hostname, port))
assert os.path.exists(file)
crawl_log_5 = open(file, 'rb').read()
assert re.match(br'\A2[^\n]+\n\Z', crawl_log_5)
assert crawl_log_5[24:31] == b' 200 '
assert crawl_log_5[31:42] == b' 4 '
fields = crawl_log_5.split()
assert len(fields) == 13
assert fields[3].endswith(b'/space_in_content_type')
assert fields[4] == b'-'
assert fields[5] == b'-'
assert fields[6] == b'-'
assert fields[7] == b'-'
assert re.match(br'^\d{17}[+]\d{3}', fields[8])
assert fields[9] == b'sha1:a94a8fe5ccb19ba61c4c0873d391e987982fbbd3'
assert fields[10] == b'-'
assert fields[11] == b'-'
extra_info = json.loads(fields[12].decode('utf-8'))
assert set(extra_info.keys()) == {
'contentSize', 'warcFilename', 'warcFileOffset'}
assert extra_info['contentSize'] == 59
#Fetch Exception
url = 'http://localhost-doesnt-exist:%s/connection-error' % http_daemon.server_port
headers = {'Warcprox-Meta': json.dumps({'warc-prefix': 'test_crawl_log_6'})}
response = requests.get(url, proxies=archiving_proxies, headers=headers)
#Verify the connection is cleaned up properly after the exception
url = 'http://localhost:%s/b/aa' % http_daemon.server_port
response = requests.get(url, proxies=archiving_proxies)
assert response.status_code == 200
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 7)
file = os.path.join(
warcprox_.options.crawl_log_dir,
'test_crawl_log_6-%s-%s.log' % (hostname, port))
assert os.path.exists(file)
crawl_log_6 = open(file, 'rb').read()
assert re.match(br'\A2[^\n]+\n\Z', crawl_log_6)
#seems to vary depending on the environment
assert crawl_log_6[24:31] == b' -6 ' or crawl_log_6[24:31] == b' -2 '
assert crawl_log_6[31:42] == b' 0 '
fields = crawl_log_6.split()
assert len(fields) == 13
assert fields[3].endswith(b'/connection-error')
assert fields[4] == b'-'
assert fields[5] == b'-'
assert fields[6] == b'-'
assert fields[7] == b'-'
assert fields[8] == b'-'
assert fields[9] == b'-'
assert fields[10] == b'-'
assert fields[11] == b'-'
extra_info = json.loads(fields[12].decode('utf-8'))
assert set(extra_info.keys()) == {'exception'}
#Test the same bad server to check for -404
url = 'http://localhost-doesnt-exist:%s/connection-error' % http_daemon.server_port
headers = {'Warcprox-Meta': json.dumps({'warc-prefix': 'test_crawl_log_7'})}
response = requests.get(url, proxies=archiving_proxies, headers=headers)
#Verify the connection is cleaned up properly after the exception
url = 'http://localhost:%s/b/aa' % http_daemon.server_port
response = requests.get(url, proxies=archiving_proxies)
assert response.status_code == 200
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 8)
file = os.path.join(
warcprox_.options.crawl_log_dir,
'test_crawl_log_7-%s-%s.log' % (hostname, port))
assert os.path.exists(file)
crawl_log_7 = open(file, 'rb').read()
assert re.match(br'\A2[^\n]+\n\Z', crawl_log_7)
assert crawl_log_7[24:31] == b' -404 '
assert crawl_log_7[31:42] == b' 0 '
fields = crawl_log_7.split()
assert len(fields) == 13
assert fields[3].endswith(b'/connection-error')
assert fields[4] == b'-'
assert fields[5] == b'-'
assert fields[6] == b'-'
assert fields[7] == b'-'
assert fields[8] == b'-'
assert fields[9] == b'-'
assert fields[10] == b'-'
assert fields[11] == b'-'
extra_info = json.loads(fields[12].decode('utf-8'))
assert set(extra_info.keys()) == {'exception'}
#Verify non-ascii urls are encoded properly
url = 'http://localhost:%s/b/¶-non-ascii' % http_daemon.server_port
headers = {
"Warcprox-Meta": json.dumps({"warc-prefix":"test_crawl_log_8",
"metadata":{'seed': 'http://example.com/¶-non-ascii', 'hop_path': 'L', 'brozzled_url': 'http://localhost:%s/b/¶-non-ascii' % http_daemon.server_port, 'hop_via_url': 'http://чунджа.kz/b/¶-non-ascii'}}),
}
response = requests.get(url, proxies=archiving_proxies, headers=headers)
assert response.status_code == 200
# wait for postfetch chain
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 9)
file = os.path.join(
warcprox_.options.crawl_log_dir,
'test_crawl_log_8-%s-%s.log' % (hostname, port))
assert os.path.exists(file)
crawl_log_8 = open(file, 'rb').read()
assert re.match(br'\A2[^\n]+\n\Z', crawl_log_8)
assert crawl_log_8[24:31] == b' 200 '
assert crawl_log_8[31:42] == b' 154 '
fields = crawl_log_8.split()
assert len(fields) == 13
assert fields[3].endswith(b'/b/%C2%B6-non-ascii')
assert fields[4] == b'L'
assert fields[5].endswith(b'http://xn--80ahg0a3ax.kz/b/%C2%B6-non-ascii')
assert fields[6] == b'text/plain'
assert fields[7] == b'-'
assert re.match(br'^\d{17}[+]\d{3}', fields[8])
assert fields[9] == b'sha1:cdd841ea7c5e46fde3fba56b2e45e4df5aeec439'
assert fields[10].endswith('/¶-non-ascii'.encode('utf-8'))
assert fields[11] == b'-'
extra_info = json.loads(fields[12].decode('utf-8'))
def test_crawl_log_canonicalization():
assert crawl_log.canonicalize_url(None) is None
assert crawl_log.canonicalize_url("") is ''
assert crawl_log.canonicalize_url("-") == '-'
assert crawl_log.canonicalize_url("http://чунджа.kz/b/¶-non-ascii") == "http://xn--80ahg0a3ax.kz/b/%C2%B6-non-ascii"
assert crawl_log.canonicalize_url("Not a URL") == "Not a URL"
def test_long_warcprox_meta(
warcprox_, http_daemon, archiving_proxies, playback_proxies):
urls_before = warcprox_.proxy.running_stats.urls
@ -2064,24 +2288,6 @@ def test_payload_digest(warcprox_, http_daemon):
req, prox_rec_res = mitm.do_GET()
assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_GZIP_SHA1
def test_trough_segment_promotion(warcprox_):
if not warcprox_.options.rethinkdb_trough_db_url:
return
cli = warcprox.trough.TroughClient(
warcprox_.options.rethinkdb_trough_db_url, 3)
promoted = []
def mock(segment_id):
promoted.append(segment_id)
cli.promote = mock
cli.register_schema('default', 'create table foo (bar varchar(100))')
cli.write('my_seg', 'insert into foo (bar) values ("boof")')
assert promoted == []
time.sleep(3)
assert promoted == ['my_seg']
promoted = []
time.sleep(3)
assert promoted == []
def test_dedup_min_text_size(http_daemon, warcprox_, archiving_proxies):
"""We use options --dedup-min-text-size=3 --dedup-min-binary-size=5 and we
try to download content smaller than these limits to make sure that it is

View File

@ -1,7 +1,7 @@
"""
warcprox/__init__.py - warcprox package main file, contains some utility code
Copyright (C) 2013-2019 Internet Archive
Copyright (C) 2013-2021 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -175,8 +175,10 @@ class BaseStandardPostfetchProcessor(BasePostfetchProcessor):
class BaseBatchPostfetchProcessor(BasePostfetchProcessor):
MAX_BATCH_SIZE = 500
MAX_BATCH_SEC = 10
MIN_BATCH_SEC = 2.0
MAX_BATCH_SEC = 60
MIN_BATCH_SEC = 30
# these updated batch seconds values have resulted in fewer reported dedup
# errors and otherwise have worked well in qa
def _get_process_put(self):
batch = []

View File

@ -33,7 +33,7 @@ import hashlib
import threading
import datetime
import doublethink
import rethinkdb as r
from rethinkdb import RethinkDB; r = RethinkDB()
from warcprox.dedup import DedupableMixin
class RethinkCaptures:
@ -157,8 +157,11 @@ class RethinkCaptures:
sha1base32 = base64.b32encode(digest.digest()).decode("utf-8")
if (recorded_url.warcprox_meta
and "dedup-bucket" in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta["dedup-bucket"]
and "dedup-buckets" in recorded_url.warcprox_meta):
for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == 'ro':
# maybe this is the right thing to do here? or should we return an entry for each? or ?
break
else:
bucket = "__unspecified__"

warcprox/certauth.py Normal file
View File

@ -0,0 +1,278 @@
import logging
import os
import random
from argparse import ArgumentParser
from datetime import datetime, timedelta
import threading
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
# =================================================================
# Valid for 3 years from now
# Max validity is 39 months:
# https://casecurity.org/2015/02/19/ssl-certificate-validity-periods-limited-to-39-months-starting-in-april/
CERT_NOT_AFTER = 3 * 365 * 24 * 60 * 60
CERTS_DIR = './ca/certs/'
CERT_NAME = 'certauth sample CA'
DEF_HASH_FUNC = hashes.SHA256()
# =================================================================
class CertificateAuthority(object):
"""
Utility class for signing individual certificate
with a root cert.
Static generate_ca_root() method for creating the root cert
All certs saved on filesystem. Individual certs are stored
in specified certs_dir and reused if previously created.
"""
def __init__(self, ca_file, certs_dir, ca_name,
overwrite=False,
cert_not_before=0,
cert_not_after=CERT_NOT_AFTER):
assert(ca_file)
self.ca_file = ca_file
assert(certs_dir)
self.certs_dir = certs_dir
assert(ca_name)
self.ca_name = ca_name
self._file_created = False
self.cert_not_before = cert_not_before
self.cert_not_after = cert_not_after
if not os.path.exists(certs_dir):
os.makedirs(certs_dir)
# if file doesn't exist or overwrite is true
# create new root cert
if (overwrite or not os.path.isfile(ca_file)):
self.cert, self.key = self.generate_ca_root(ca_file, ca_name)
self._file_created = True
# read previously created root cert
else:
self.cert, self.key = self.read_pem(ca_file)
self._lock = threading.Lock()
def cert_for_host(self, host, overwrite=False, wildcard=False):
with self._lock:
host_filename = os.path.join(self.certs_dir, host) + '.pem'
if not overwrite and os.path.exists(host_filename):
self._file_created = False
return host_filename
self.generate_host_cert(host, self.cert, self.key, host_filename,
wildcard)
self._file_created = True
return host_filename
def get_wildcard_cert(self, cert_host):
host_parts = cert_host.split('.', 1)
if len(host_parts) == 2 and '.' in host_parts[1]:
cert_host = host_parts[1]
certfile = self.cert_for_host(cert_host,
wildcard=True)
return certfile
def get_root_PKCS12(self):
return serialization.pkcs12.serialize_key_and_certificates(
name=b"root",
key=self.key,
cert=self.cert,
cas=None,
encryption_algorithm=serialization.NoEncryption()
)
def _make_cert(self, certname):
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, certname),
])
cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
self.key.public_key()
).serial_number(
random.randint(0, 2**64 - 1)
).not_valid_before(
datetime.utcnow()
).not_valid_after(
datetime.utcnow() + timedelta(seconds=self.cert_not_after)
).add_extension(
x509.BasicConstraints(ca=True, path_length=0), critical=True,
).add_extension(
x509.KeyUsage(key_cert_sign=True, crl_sign=True, digital_signature=False,
content_commitment=False, key_encipherment=False,
data_encipherment=False, key_agreement=False, encipher_only=False,
decipher_only=False), critical=True
).add_extension(
x509.SubjectKeyIdentifier.from_public_key(self.key.public_key()), critical=False
).sign(self.key, DEF_HASH_FUNC, default_backend())
return cert
def generate_ca_root(self, ca_file, ca_name, hash_func=DEF_HASH_FUNC):
# Generate key
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
# Generate cert
self.key = key
cert = self._make_cert(ca_name)
# Write cert + key
self.write_pem(ca_file, cert, key)
return cert, key
def generate_host_cert(self, host, root_cert, root_key, host_filename,
wildcard=False, hash_func=DEF_HASH_FUNC):
host = host.encode('utf-8')
# Generate CSR
csr = x509.CertificateSigningRequestBuilder().subject_name(
x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, host.decode('utf-8')),
])
).sign(self.key, hash_func, default_backend())
# Generate Cert
cert_builder = x509.CertificateBuilder().subject_name(
csr.subject
).issuer_name(
root_cert.subject
).public_key(
csr.public_key()
).serial_number(
random.randint(0, 2**64 - 1)
).not_valid_before(
datetime.utcnow()
).not_valid_after(
datetime.utcnow() + timedelta(seconds=self.cert_not_after)
)
if wildcard:
cert_builder = cert_builder.add_extension(
x509.SubjectAlternativeName([
x509.DNSName(host.decode('utf-8')),
x509.DNSName('*.' + host.decode('utf-8')),
]),
critical=False,
)
cert = cert_builder.sign(root_key, hash_func, default_backend())
# Write cert + key
self.write_pem(host_filename, cert, self.key)
return cert, self.key
def write_pem(self, filename, cert, key):
with open(filename, 'wb+') as f:
f.write(key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
))
f.write(cert.public_bytes(serialization.Encoding.PEM))
def read_pem(self, filename):
with open(filename, 'rb') as f:
cert = x509.load_pem_x509_certificate(f.read(), default_backend())
f.seek(0)
key = serialization.load_pem_private_key(f.read(), password=None, backend=default_backend())
return cert, key
# =================================================================
def main(args=None):
parser = ArgumentParser(description='Certificate Authority Cert Maker Tools')
parser.add_argument('root_ca_cert',
help='Path to existing or new root CA file')
parser.add_argument('-c', '--certname', action='store', default=CERT_NAME,
help='Name for root certificate')
parser.add_argument('-n', '--hostname',
help='Hostname certificate to create')
parser.add_argument('-d', '--certs-dir', default=CERTS_DIR,
help='Directory for host certificates')
parser.add_argument('-f', '--force', action='store_true',
help='Overwrite certificates if they already exist')
parser.add_argument('-w', '--wildcard_cert', action='store_true',
help='add wildcard SAN to host: *.<host>, <host>')
r = parser.parse_args(args=args)
certs_dir = r.certs_dir
wildcard = r.wildcard_cert
root_cert = r.root_ca_cert
hostname = r.hostname
if not hostname:
overwrite = r.force
else:
overwrite = False
ca = CertificateAuthority(ca_file=root_cert,
certs_dir=r.certs_dir,
ca_name=r.certname,
overwrite=overwrite)
# Just creating the root cert
if not hostname:
if ca._file_created:
print('Created new root cert: "' + root_cert + '"')
return 0
else:
print('Root cert "' + root_cert +
'" already exists,' + ' use -f to overwrite')
return 1
# Sign a certificate for a given host
overwrite = r.force
host_filename = ca.cert_for_host(hostname,
overwrite, wildcard)
if ca._file_created:
print('Created new cert "' + hostname +
'" signed by root cert ' +
root_cert)
return 0
else:
print('Cert for "' + hostname + '" already exists,' +
' use -f to overwrite')
return 1
if __name__ == "__main__": #pragma: no cover
main()
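For reference, a minimal sketch of using the now-vendored `CertificateAuthority` directly (file paths and the CA name below are illustrative, not warcprox defaults):

```python
from warcprox.certauth import CertificateAuthority

# Illustrative paths and CA name.
ca = CertificateAuthority(
    ca_file='./warcprox-ca.pem',   # root cert + key in one PEM; created if missing
    certs_dir='./ca/certs',        # per-host leaf certs are cached and reused here
    ca_name='warcprox example CA')

# Sign (or reuse) a leaf certificate for a host; returns the path to its PEM file.
host_pem = ca.cert_for_host('example.com')

# Or request a wildcard cert covering the parent domain and '*.' subdomains.
wildcard_pem = ca.get_wildcard_cert('www.example.com')
print(host_pem, wildcard_pem)
```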

View File

@ -31,12 +31,12 @@ import sys
import gc
import datetime
import warcprox
import certauth
import functools
import doublethink
import importlib
import queue
import socket
import os
class Factory:
@staticmethod
@ -110,7 +110,7 @@ class Factory:
assert hasattr(plugin, 'notify') ^ hasattr(plugin, '_startup')
return plugin
except Exception as e:
logging.fatal('problem with plugin class %r: %s', qualname, e)
logging.fatal('problem with plugin class %r', qualname, exc_info=1)
sys.exit(1)
@staticmethod

View File

@ -25,6 +25,8 @@ import json
import os
import warcprox
import socket
import rfc3986
from urllib3.exceptions import TimeoutError, HTTPError, NewConnectionError, MaxRetryError
class CrawlLogger(object):
def __init__(self, dir_, options=warcprox.Options()):
@ -40,7 +42,12 @@ class CrawlLogger(object):
def notify(self, recorded_url, records):
# 2017-08-03T21:45:24.496Z 200 2189 https://autismcouncil.wisconsin.gov/robots.txt P https://autismcouncil.wisconsin.gov/ text/plain #001 20170803214523617+365 sha1:PBS2CEF7B4OSEXZZF3QE2XN2VHYCPNPX https://autismcouncil.wisconsin.gov/ duplicate:digest {"warcFileOffset":942,"contentSize":2495,"warcFilename":"ARCHIVEIT-2159-TEST-JOB319150-20170803214522386-00000.warc.gz"}
now = datetime.datetime.utcnow()
extra_info = {'contentSize': recorded_url.size,}
status = self.get_artificial_status(recorded_url)
extra_info = {'contentSize': recorded_url.size,} if recorded_url.size is not None and recorded_url.size > 0 else {}
if hasattr(recorded_url, 'exception') and recorded_url.exception is not None:
extra_info['exception'] = str(recorded_url.exception).replace(" ", "_")
if(hasattr(recorded_url, 'message') and recorded_url.message is not None):
extra_info['exceptionMessage'] = str(recorded_url.message).replace(" ", "_")
if records:
extra_info['warcFilename'] = records[0].warc_filename
extra_info['warcFileOffset'] = records[0].offset
@ -51,23 +58,50 @@ class CrawlLogger(object):
payload_digest = warcprox.digest_str(
recorded_url.payload_digest,
self.options.base32)
else:
elif records is not None and len(records) > 0:
# WARCPROX_WRITE_RECORD request
content_length = int(records[0].get_header(b'Content-Length'))
payload_digest = records[0].get_header(b'WARC-Payload-Digest')
else:
content_length = 0
payload_digest = '-'
logging.info('warcprox_meta %s' , recorded_url.warcprox_meta)
hop_path = recorded_url.warcprox_meta.get('metadata', {}).get('hop_path')
# Per the HTTP spec, URLs on the wire are percent-encoded into plain ASCII. Since we compare against those, the URLs sent in the Warcprox-Meta JSON blob need to be encoded the same way.
brozzled_url = canonicalize_url(recorded_url.warcprox_meta.get('metadata', {}).get('brozzled_url'))
hop_via_url = canonicalize_url(recorded_url.warcprox_meta.get('metadata', {}).get('hop_via_url'))
if hop_path is None and brozzled_url is None and hop_via_url is None:
#No hop info headers provided
hop_path = "-"
via_url = recorded_url.referer or '-'
else:
if hop_path is None:
hop_path = "-"
if hop_via_url is None:
hop_via_url = "-"
#Prefer referer header. Otherwise use provided via_url
via_url = recorded_url.referer or hop_via_url if hop_path != "-" else "-"
logging.info('brozzled_url:%s recorded_url:%s' , brozzled_url, recorded_url.url)
if brozzled_url != recorded_url.url.decode('ascii') and "brozzled_url" in recorded_url.warcprox_meta.get('metadata', {}).keys():
#Requested page is not the Brozzled url, thus we are an embed or redirect.
via_url = brozzled_url
hop_path = "B" if hop_path == "-" else "".join([hop_path,"B"])
fields = [
'{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000),
'% 5s' % recorded_url.status,
'% 5s' % status,
'% 10s' % content_length,
recorded_url.url,
'-', # hop path
recorded_url.referer or '-',
recorded_url.mimetype or '-',
hop_path,
via_url,
recorded_url.mimetype if recorded_url.mimetype is not None and recorded_url.mimetype.strip() else '-',
'-',
'{:%Y%m%d%H%M%S}{:03d}+{:03d}'.format(
recorded_url.timestamp,
recorded_url.timestamp.microsecond//1000,
recorded_url.duration.microseconds//1000),
recorded_url.duration.microseconds//1000) if (recorded_url.timestamp is not None and recorded_url.duration is not None) else '-',
payload_digest,
recorded_url.warcprox_meta.get('metadata', {}).get('seed', '-'),
'duplicate:digest' if records and records[0].type == b'revisit' else '-',
@ -80,7 +114,6 @@ class CrawlLogger(object):
except:
pass
line = b' '.join(fields) + b'\n'
prefix = recorded_url.warcprox_meta.get('warc-prefix', 'crawl')
filename = '%s-%s-%s.log' % (
prefix, self.hostname, self.options.server_port)
@ -89,3 +122,43 @@ class CrawlLogger(object):
with open(crawl_log_path, 'ab') as f:
f.write(line)
def get_artificial_status(self, recorded_url):
# urllib3 does not raise a distinct exception type for DNS errors, so we must parse them from the exception string.
# Unfortunately, the errors are reported differently on different systems.
# https://stackoverflow.com/questions/40145631
if hasattr(recorded_url, 'exception') and isinstance(recorded_url.exception, (MaxRetryError, )):
return '-8'
elif hasattr(recorded_url, 'exception') and isinstance(recorded_url.exception, (NewConnectionError, )):
exception_string=str(recorded_url.exception)
if ("[Errno 11001] getaddrinfo failed" in exception_string or # Windows
"[Errno -2] Name or service not known" in exception_string or # Linux
"[Errno -3] Temporary failure in name resolution" in exception_string or # Linux
"[Errno 8] nodename nor servname " in exception_string): # OS X
return '-6' # DNS Failure
else:
return '-2' # Other Connection Failure
elif hasattr(recorded_url, 'exception') and isinstance(recorded_url.exception, (socket.timeout, TimeoutError, )):
return '-2' # Connection Timeout
elif isinstance(recorded_url, warcprox.warcproxy.FailedUrl):
# synthetic status, used when some other status (such as connection-lost)
# is considered by policy the same as a document-not-found
# Cached failures result in FailedUrl with no Exception
return '-404'
else:
return recorded_url.status
def canonicalize_url(url):
#URL needs to be split out to separately encode the hostname from the rest of the path.
#hostname will be idna encoded (punycode)
#The rest of the URL will be urlencoded, but browsers only encode "unsafe" and not "reserved" characters, so ignore the reserved chars.
if url is None or url == '-' or url == '':
return url
try:
parsed_url=rfc3986.urlparse(url)
encoded_url=parsed_url.copy_with(host=parsed_url.host.encode('idna'))
return encoded_url.unsplit()
except (TypeError, ValueError, AttributeError) as e:
logging.warning("URL Canonicalization failure. Returning raw url: rfc3986 %s - %s", url, e)
return url
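For reference, each line `CrawlLogger` writes has 13 whitespace-separated fields ending in a JSON extra-info blob, as the tests above assert. A minimal parsing sketch, reusing the sample line from the comment in `notify()`:

```python
import json

# Sample line copied from the notify() comment above: timestamp, status,
# content length, url, hop path, via url, mimetype, thread, fetch timestamp
# + duration, payload digest, seed, annotations, extra-info JSON.
line = (
    '2017-08-03T21:45:24.496Z 200 2189 https://autismcouncil.wisconsin.gov/robots.txt '
    'P https://autismcouncil.wisconsin.gov/ text/plain #001 20170803214523617+365 '
    'sha1:PBS2CEF7B4OSEXZZF3QE2XN2VHYCPNPX https://autismcouncil.wisconsin.gov/ duplicate:digest '
    '{"warcFileOffset":942,"contentSize":2495,"warcFilename":"ARCHIVEIT-2159-TEST-JOB319150-20170803214522386-00000.warc.gz"}')

fields = line.split()
assert len(fields) == 13
extra_info = json.loads(fields[12])
print(fields[1], fields[3], extra_info['contentSize'])  # 200 .../robots.txt 2495
```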

View File

@ -1,7 +1,7 @@
'''
warcprox/dedup.py - identical payload digest deduplication using sqlite db
Copyright (C) 2013-2018 Internet Archive
Copyright (C) 2013-2021 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -26,7 +26,6 @@ import os
import json
from hanzo import warctools
import warcprox
import warcprox.trough
import sqlite3
import doublethink
import datetime
@ -47,11 +46,15 @@ class DedupableMixin(object):
def should_dedup(self, recorded_url):
"""Check if we should try to run dedup on resource based on payload
size compared with min text/binary dedup size options.
When we use option --dedup-only-with-bucket, `dedup-bucket` is required
When we use option --dedup-only-with-bucket, `dedup-buckets` is required
in Warcprox-Meta to perform dedup.
If recorded_url.do_not_archive is True, we skip dedup. This record will
not be written to WARC anyway.
Return Boolean.
"""
if self.dedup_only_with_bucket and "dedup-bucket" not in recorded_url.warcprox_meta:
if recorded_url.do_not_archive:
return False
if self.dedup_only_with_bucket and "dedup-buckets" not in recorded_url.warcprox_meta:
return False
if recorded_url.is_text():
return recorded_url.response_recorder.payload_size() > self.min_text_size
@ -65,14 +68,19 @@ class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
self.dedup_db = dedup_db
def _process_url(self, recorded_url):
if isinstance(recorded_url, warcprox.warcproxy.FailedUrl):
return
if (recorded_url.response_recorder
and recorded_url.payload_digest
and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
recorded_url.dedup_info = self.dedup_db.lookup(
digest_key, recorded_url.warcprox_meta["dedup-bucket"],
recorded_url.url)
if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta:
for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
recorded_url.dedup_info = self.dedup_db.lookup(
digest_key, bucket, recorded_url.url)
if recorded_url.dedup_info:
# we found an existing capture
break
else:
recorded_url.dedup_info = self.dedup_db.lookup(
digest_key, url=recorded_url.url)
@ -148,10 +156,12 @@ class DedupDb(DedupableMixin):
and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
self.save(
digest_key, records[0],
bucket=recorded_url.warcprox_meta["dedup-bucket"])
if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta:
for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == "ro":
self.save(
digest_key, records[0],
bucket=bucket)
else:
self.save(digest_key, records[0])
@ -213,8 +223,10 @@ class RethinkDedupDb(DedupDb, DedupableMixin):
and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "dedup-bucket" in recorded_url.warcprox_meta:
self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["dedup-bucket"])
if recorded_url.warcprox_meta and "dedup-buckets" in recorded_url.warcprox_meta:
for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == 'ro':
self.save(digest_key, records[0], bucket=bucket)
else:
self.save(digest_key, records[0])
@ -259,6 +271,9 @@ class CdxServerDedup(DedupDb):
performance optimisation to handle that. limit < 0 is very inefficient
in general. Maybe it could be configurable in the future.
Skip dedup for URLs with session params. These URLs are certainly
unique and highly volatile, we cannot dedup them.
:param digest_key: b'sha1:<KEY-VALUE>' (prefix is optional).
Example: b'sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A'
:param url: Target URL string
@ -267,6 +282,8 @@ class CdxServerDedup(DedupDb):
"""
u = url.decode("utf-8") if isinstance(url, bytes) else url
try:
if any(s in u for s in ('JSESSIONID=', 'session=', 'sess=')):
return None
result = self.http_pool.request('GET', self.cdx_url, fields=dict(
url=u, fl="timestamp,digest", filter="!mimetype:warc/revisit",
limit=-1))
@ -347,11 +364,12 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
and recorded_url.warc_records[0].type == b'response'
and self.trough_dedup_db.should_dedup(recorded_url)):
if (recorded_url.warcprox_meta
and 'dedup-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['dedup-bucket']
and 'dedup-buckets' in recorded_url.warcprox_meta):
for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == 'ro':
buckets[bucket].append(recorded_url)
else:
bucket = '__unspecified__'
buckets[bucket].append(recorded_url)
buckets['__unspecified__'].append(recorded_url)
return buckets
def _process_batch(self, batch):
@ -366,6 +384,9 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
self.trough_dedup_db.batch_save,
buckets[bucket], bucket)
fs[future] = bucket
logging.debug(
'storing dedup info for %s urls '
'in bucket %s', len(buckets[bucket]), bucket)
# wait for results
try:
@ -394,21 +415,32 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
'''
buckets = collections.defaultdict(list)
discards = []
# for duplicate checks, see https://webarchive.jira.com/browse/WT-31
hash_plus_urls = set()
for recorded_url in batch:
if not recorded_url.payload_digest:
discards.append('n/a')
continue
payload_hash = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32)
hash_plus_url = b''.join((payload_hash, recorded_url.url))
if (recorded_url.response_recorder
and recorded_url.payload_digest
and hash_plus_url not in hash_plus_urls
and self.trough_dedup_db.should_dedup(recorded_url)):
hash_plus_urls.add(hash_plus_url)
if (recorded_url.warcprox_meta
and 'dedup-bucket' in recorded_url.warcprox_meta):
bucket = recorded_url.warcprox_meta['dedup-bucket']
and 'dedup-buckets' in recorded_url.warcprox_meta):
for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
buckets[bucket].append(recorded_url)
else:
bucket = '__unspecified__'
buckets[bucket].append(recorded_url)
buckets['__unspecified__'].append(recorded_url)
else:
discards.append(
warcprox.digest_str(
recorded_url.payload_digest, self.options.base32)
if recorded_url.payload_digest else 'n/a')
if hash_plus_url in hash_plus_urls:
self.logger.debug(
'discarding duplicate and setting do_not_archive for %s, hash %s',
recorded_url.url, payload_hash)
recorded_url.do_not_archive = True
discards.append(payload_hash)
self.logger.debug(
'len(batch)=%s len(discards)=%s buckets=%s',
len(batch), len(discards),
@ -487,16 +519,24 @@ class TroughDedupDb(DedupDb, DedupableMixin):
SCHEMA_SQL = ('create table dedup (\n'
' digest_key varchar(100) primary key,\n'
' url varchar(2100) not null,\n'
' date datetime not null,\n'
' date varchar(100) not null,\n'
' id varchar(100));\n') # warc record id
WRITE_SQL_TMPL = ('insert or ignore into dedup\n'
'(digest_key, url, date, id)\n'
'values (%s, %s, %s, %s);')
def __init__(self, options=warcprox.Options()):
try:
import trough.client
except ImportError as e:
logging.critical(
'%s: %s\n\nYou might need to run "pip install '
'warcprox[trough]".', type(e).__name__, e)
sys.exit(1)
DedupableMixin.__init__(self, options)
self.options = options
self._trough_cli = warcprox.trough.TroughClient(
self._trough_cli = trough.client.TroughClient(
options.rethinkdb_trough_db_url, promotion_interval=60*60)
def loader(self, *args, **kwargs):
@ -518,9 +558,13 @@ class TroughDedupDb(DedupDb, DedupableMixin):
record_id = response_record.get_header(warctools.WarcRecord.ID)
url = response_record.get_header(warctools.WarcRecord.URL)
warc_date = response_record.get_header(warctools.WarcRecord.DATE)
self._trough_cli.write(
bucket, self.WRITE_SQL_TMPL,
(digest_key, url, warc_date, record_id), self.SCHEMA_ID)
try:
self._trough_cli.write(
bucket, self.WRITE_SQL_TMPL,
(digest_key, url, warc_date, record_id), self.SCHEMA_ID)
except:
self.logger.warning(
'problem posting dedup data to trough', exc_info=True)
def batch_save(self, batch, bucket='__unspecified__'):
sql_tmpl = ('insert or ignore into dedup\n'
@ -535,12 +579,22 @@ class TroughDedupDb(DedupDb, DedupableMixin):
recorded_url.url,
recorded_url.warc_records[0].date,
recorded_url.warc_records[0].id,])
self._trough_cli.write(bucket, sql_tmpl, values, self.SCHEMA_ID)
try:
self._trough_cli.write(bucket, sql_tmpl, values, self.SCHEMA_ID)
except:
self.logger.warning(
'problem posting dedup data to trough', exc_info=True)
def lookup(self, digest_key, bucket='__unspecified__', url=None):
results = self._trough_cli.read(
bucket, 'select * from dedup where digest_key=%s;',
(digest_key,))
try:
results = self._trough_cli.read(
bucket, 'select * from dedup where digest_key=%s;',
(digest_key,))
except:
self.logger.warning(
'problem reading dedup data from trough', exc_info=True)
return None
if results:
assert len(results) == 1 # sanity check (digest_key is primary key)
result = results[0]
@ -557,7 +611,14 @@ class TroughDedupDb(DedupDb, DedupableMixin):
'''Returns [{'digest_key': ..., 'url': ..., 'date': ...}, ...]'''
sql_tmpl = 'select * from dedup where digest_key in (%s)' % (
','.join('%s' for i in range(len(digest_keys))))
results = self._trough_cli.read(bucket, sql_tmpl, digest_keys)
try:
results = self._trough_cli.read(bucket, sql_tmpl, digest_keys)
except:
self.logger.warning(
'problem reading dedup data from trough', exc_info=True)
results = None
if results is None:
return []
self.logger.debug(
@ -576,9 +637,11 @@ class TroughDedupDb(DedupDb, DedupableMixin):
and self.should_dedup(recorded_url)):
digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and 'dedup-bucket' in recorded_url.warcprox_meta:
self.save(
digest_key, records[0],
bucket=recorded_url.warcprox_meta['dedup-bucket'])
if recorded_url.warcprox_meta and 'dedup-buckets' in recorded_url.warcprox_meta:
for bucket, bucket_mode in recorded_url.warcprox_meta["dedup-buckets"].items():
if not bucket_mode == 'ro':
self.save(
digest_key, records[0],
bucket=bucket)
else:
self.save(digest_key, records[0])

View File

@ -39,7 +39,6 @@ import socket
import traceback
import signal
import threading
import certauth.certauth
import yaml
import warcprox
import doublethink
@ -91,9 +90,11 @@ def _build_arg_parser(prog='warcprox', show_hidden=False):
help='where to store and load generated certificates')
arg_parser.add_argument('-d', '--dir', dest='directory',
default='./warcs', help='where to write warcs')
arg_parser.add_argument('--subdir-prefix', dest='subdir_prefix', action='store_true',
help='write warcs to --dir subdir equal to the current warc-prefix'),
arg_parser.add_argument('--warc-filename', dest='warc_filename',
default='{prefix}-{timestamp17}-{serialno}-{randomtoken}',
help='define custom WARC filename with variables {prefix}, {timestamp14}, {timestamp17}, {serialno}, {randomtoken}, {hostname}, {shorthostname}')
help='define custom WARC filename with variables {prefix}, {timestamp14}, {timestamp17}, {serialno}, {randomtoken}, {hostname}, {shorthostname}, {port}')
arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
help='write gzip-compressed warc records')
hidden.add_argument(
@ -207,6 +208,15 @@ def _build_arg_parser(prog='warcprox', show_hidden=False):
default=None, help=(
'host:port of tor socks proxy, used only to connect to '
'.onion sites'))
arg_parser.add_argument(
'--socks-proxy', dest='socks_proxy',
default=None, help='host:port of socks proxy, used for all traffic if activated')
arg_parser.add_argument(
'--socks-proxy-username', dest='socks_proxy_username',
default=None, help='optional socks proxy username')
arg_parser.add_argument(
'--socks-proxy-password', dest='socks_proxy_password',
default=None, help='optional socks proxy password')
hidden.add_argument(
'--socket-timeout', dest='socket_timeout', type=float, default=60,
help=suppress(
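For reference, a sketch of launching warcprox with the new SOCKS options (the proxy address and credentials are placeholders; the username/password flags are optional):

```python
import subprocess

# Placeholder SOCKS5 proxy address and credentials.
subprocess.run([
    'warcprox', '--dir', './warcs',
    '--socks-proxy', '127.0.0.1:1080',
    '--socks-proxy-username', 'user',
    '--socks-proxy-password', 'secret',
])
```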
@ -302,6 +312,7 @@ def main(argv=None):
else:
loglevel = logging.INFO
logging.root.handlers = []
logging.basicConfig(
stream=sys.stdout, level=loglevel, format=(
'%(asctime)s %(process)d %(levelname)s %(threadName)s '

View File

@ -64,6 +64,7 @@ import ssl
import warcprox
import threading
import datetime
import random
import socks
import tempfile
import hashlib
@ -78,11 +79,12 @@ import collections
import cProfile
from urllib3 import PoolManager
from urllib3.util import is_connection_dropped
from urllib3.exceptions import TimeoutError, HTTPError
from urllib3.exceptions import TimeoutError, HTTPError, NewConnectionError
import doublethink
from cachetools import TTLCache
from threading import RLock
from certauth.certauth import CertificateAuthority
from .certauth import CertificateAuthority
class ProxyingRecorder(object):
"""
@ -220,6 +222,28 @@ def via_header_value(orig, request_version):
via = via + '%s %s' % (request_version, 'warcprox')
return via
# Reference and detailed description of cipher selection:
# https://github.com/urllib3/urllib3/blob/f070ec2e6f6c545f40d9196e5246df10c72e48e1/src/urllib3/util/ssl_.py#L170
SSL_CIPHERS = [
"ECDHE+AESGCM",
"ECDHE+CHACHA20",
"DH+AESGCM",
"ECDH+AES",
"DH+AES",
"RSA+AESGCM",
"RSA+AES",
"!aNULL",
"!eNULL",
"!MD5",
"!DSS",
"!AESCCM",
"DHE+AESGCM",
"DHE+CHACHA20",
"ECDH+AESGCM",
]
class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
'''
An http proxy implementation of BaseHTTPRequestHandler, that acts as a
@ -233,6 +257,10 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
_tmp_file_max_memory_size = 512 * 1024
onion_tor_socks_proxy_host = None
onion_tor_socks_proxy_port = None
socks_proxy_host = None
socks_proxy_port = None
socks_proxy_username = None
socks_proxy_password = None
def __init__(self, request, client_address, server):
threading.current_thread().name = 'MitmProxyHandler(tid={},started={},client={}:{})'.format(warcprox.gettid(), datetime.datetime.utcnow().isoformat(), client_address[0], client_address[1])
@ -276,6 +304,8 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
host=self.hostname, port=int(self.port), scheme='http',
pool_kwargs={'maxsize': 12, 'timeout': self._socket_timeout})
remote_ip = None
self._remote_server_conn = self._conn_pool._get_conn()
if is_connection_dropped(self._remote_server_conn):
if self.onion_tor_socks_proxy_host and self.hostname.endswith('.onion'):
@ -289,8 +319,21 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
port=self.onion_tor_socks_proxy_port, rdns=True)
self._remote_server_conn.sock.settimeout(self._socket_timeout)
self._remote_server_conn.sock.connect((self.hostname, int(self.port)))
elif self.socks_proxy_host and self.socks_proxy_port:
self.logger.info(
"using socks proxy at %s:%s to connect to %s",
self.socks_proxy_host, self.socks_proxy_port, self.hostname)
self._remote_server_conn.sock = socks.socksocket()
self._remote_server_conn.sock.set_proxy(
socks.SOCKS5, addr=self.socks_proxy_host,
port=self.socks_proxy_port, rdns=True,
username=self.socks_proxy_username,
password=self.socks_proxy_password)
self._remote_server_conn.sock.settimeout(self._socket_timeout)
self._remote_server_conn.sock.connect((self.hostname, int(self.port)))
else:
self._remote_server_conn.connect()
remote_ip = self._remote_server_conn.sock.getpeername()[0]
# Wrap socket if SSL is required
if self.is_connect:
@ -298,6 +341,9 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# randomize TLS fingerprint to evade anti-web-bot systems
random.shuffle(SSL_CIPHERS)
context.set_ciphers(":".join(SSL_CIPHERS))
self._remote_server_conn.sock = context.wrap_socket(
self._remote_server_conn.sock,
server_hostname=self.hostname)
@ -312,6 +358,11 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
"consider upgrading to python 2.7.9+ or 3.4+",
self.hostname)
raise
except ssl.SSLError as e:
self.logger.error(
'error connecting to %s (%s) port %s: %s',
self.hostname, remote_ip, self.port, e)
raise
return self._remote_server_conn.sock
def _transition_to_ssl(self):
@ -351,7 +402,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.logger.error(
"problem handling %r: %r", self.requestline, e)
if type(e) is socket.timeout:
self.send_error(504, str(e))
self.send_error(504, str(e), exception=e)
else:
self.send_error(500, str(e))
except Exception as f:
@ -399,7 +450,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
cached = self.server.bad_hostnames_ports.get(hostname_port)
if cached:
self.logger.info('Cannot connect to %s (cache)', hostname_port)
self.send_error(cached)
self.send_error(cached, exception=Exception('Cached Failed Connection'))
return
# Connect to destination
self._connect_to_remote_server()
@ -432,7 +483,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.logger.error(
"problem processing request %r: %r",
self.requestline, e, exc_info=True)
self.send_error(response_code)
self.send_error(response_code, exception=e)
return
try:
@ -450,7 +501,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.send_error(502)
return
def send_error(self, code, message=None, explain=None):
def send_error(self, code, message=None, explain=None, exception=None):
# BaseHTTPRequestHandler.send_response_only() in http/server.py
# does this:
# if not hasattr(self, '_headers_buffer'):
@ -481,6 +532,33 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
self.server.unregister_remote_server_sock(
self._remote_server_conn.sock)
def _swallow_hop_by_hop_headers(self):
'''
Swallow headers that don't make sense to forward on, i.e.
most hop-by-hop headers.
http://tools.ietf.org/html/rfc2616#section-13.5.
'''
# self.headers is an email.message.Message, which is case-insensitive
# and doesn't throw KeyError in __delitem__
for key in (
'Warcprox-Meta', 'Connection', 'Proxy-Connection', 'Keep-Alive',
'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade'):
del self.headers[key]
def _build_request(self):
req_str = '{} {} {}\r\n'.format(
self.command, self.path, self.request_version)
# Add headers to the request
# XXX in at least python3.3 str(self.headers) uses \n not \r\n :(
req_str += '\r\n'.join(
'{}: {}'.format(k,v) for (k,v) in self.headers.items())
req = req_str.encode('latin1') + b'\r\n\r\n'
return req
def _inner_proxy_request(self, extra_response_headers={}):
'''
Sends the request to the remote server, then uses a ProxyingRecorder to
@ -492,29 +570,11 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
It may contain extra HTTP headers such as ``Warcprox-Meta`` which
are written in the WARC record for this request.
'''
# Build request
req_str = '{} {} {}\r\n'.format(
self.command, self.path, self.request_version)
# Swallow headers that don't make sense to forward on, i.e. most
# hop-by-hop headers. http://tools.ietf.org/html/rfc2616#section-13.5.
# self.headers is an email.message.Message, which is case-insensitive
# and doesn't throw KeyError in __delitem__
for key in (
'Connection', 'Proxy-Connection', 'Keep-Alive',
'Proxy-Authenticate', 'Proxy-Authorization', 'Upgrade'):
del self.headers[key]
self._swallow_hop_by_hop_headers()
self.headers['Via'] = via_header_value(
self.headers.get('Via'),
self.request_version.replace('HTTP/', ''))
# Add headers to the request
# XXX in at least python3.3 str(self.headers) uses \n not \r\n :(
req_str += '\r\n'.join(
'{}: {}'.format(k,v) for (k,v) in self.headers.items())
req = req_str.encode('latin1') + b'\r\n\r\n'
req = self._build_request()
# Append message body if present to the request
if 'Content-Length' in self.headers:
@ -540,7 +600,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
try:
buf = prox_rec_res.read(65536)
except http_client.IncompleteRead as e:
self.logger.warn('%s from %s', e, self.url)
self.logger.warning('%s from %s', e, self.url)
buf = e.partial
if (self._max_resource_size and
@ -751,7 +811,7 @@ class PooledMitmProxy(PooledMixIn, MitmProxy):
Abort active connections to remote servers to achieve prompt shutdown.
'''
self.shutting_down = True
for sock in self.remote_server_socks:
for sock in list(self.remote_server_socks):
self.shutdown_request(sock)
class SingleThreadedMitmProxy(http_server.HTTPServer):
@ -769,7 +829,7 @@ class SingleThreadedMitmProxy(http_server.HTTPServer):
self.bad_hostnames_ports_lock = RLock()
self.remote_connection_pool = PoolManager(
num_pools=max((options.max_threads or 0) // 6, 400))
num_pools=max((options.max_threads or 0) // 6, 400), maxsize=6)
if options.onion_tor_socks_proxy:
try:
@ -779,6 +839,14 @@ class SingleThreadedMitmProxy(http_server.HTTPServer):
except ValueError:
MitmProxyHandlerClass.onion_tor_socks_proxy_host = options.onion_tor_socks_proxy
MitmProxyHandlerClass.onion_tor_socks_proxy_port = None
if options.socks_proxy:
host, port = options.socks_proxy.split(':')
MitmProxyHandlerClass.socks_proxy_host = host
MitmProxyHandlerClass.socks_proxy_port = int(port)
if options.socks_proxy_username:
MitmProxyHandlerClass.socks_proxy_username = options.socks_proxy_username
if options.socks_proxy_password:
MitmProxyHandlerClass.socks_proxy_password = options.socks_proxy_password
if options.socket_timeout:
MitmProxyHandlerClass._socket_timeout = options.socket_timeout

View File

@ -29,7 +29,7 @@ import doublethink
import json
import logging
import os
import rethinkdb as r
from rethinkdb import RethinkDB; r = RethinkDB()
import sqlite3
import threading
import time
@ -162,6 +162,8 @@ class StatsProcessor(warcprox.BaseBatchPostfetchProcessor):
def _tally_batch(self, batch):
batch_buckets = {}
for recorded_url in batch:
if isinstance(recorded_url, warcprox.warcproxy.FailedUrl):
continue
for bucket in self.buckets(recorded_url):
bucket_stats = batch_buckets.get(bucket)
if not bucket_stats:
@ -297,6 +299,8 @@ class RunningStats:
(self.first_snap_time - 120 + i * 10, 0, 0))
def notify(self, recorded_url, records):
if isinstance(recorded_url, warcprox.warcproxy.FailedUrl):
return
with self._lock:
self.urls += 1
if records:

View File

@ -1,246 +0,0 @@
'''
warcprox/trough.py - trough client code
Copyright (C) 2017 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
'''
from __future__ import absolute_import
import logging
import os
import json
import requests
import doublethink
import rethinkdb as r
import datetime
import threading
import time
class TroughClient(object):
logger = logging.getLogger("warcprox.trough.TroughClient")
def __init__(self, rethinkdb_trough_db_url, promotion_interval=None):
'''
TroughClient constructor
Args:
rethinkdb_trough_db_url: url with schema rethinkdb:// pointing to
trough configuration database
promotion_interval: if specified, `TroughClient` will spawn a
thread that "promotes" (pushed to hdfs) "dirty" trough segments
(segments that have received writes) periodically, sleeping for
`promotion_interval` seconds between cycles (default None)
'''
parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url)
self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database)
self.svcreg = doublethink.ServiceRegistry(self.rr)
self._write_url_cache = {}
self._read_url_cache = {}
self._dirty_segments = set()
self._dirty_segments_lock = threading.RLock()
self.promotion_interval = promotion_interval
self._promoter_thread = None
if promotion_interval:
self._promoter_thread = threading.Thread(
target=self._promotrix, name='TroughClient-promoter')
self._promoter_thread.setDaemon(True)
self._promoter_thread.start()
def _promotrix(self):
while True:
time.sleep(self.promotion_interval)
try:
with self._dirty_segments_lock:
dirty_segments = list(self._dirty_segments)
self._dirty_segments.clear()
logging.info(
'promoting %s trough segments', len(dirty_segments))
for segment_id in dirty_segments:
try:
self.promote(segment_id)
except:
logging.error(
'problem promoting segment %s', segment_id,
exc_info=True)
except:
logging.error(
'caught exception doing segment promotion',
exc_info=True)
def promote(self, segment_id):
url = os.path.join(self.segment_manager_url(), 'promote')
payload_dict = {'segment': segment_id}
response = requests.post(url, json=payload_dict, timeout=21600)
if response.status_code != 200:
raise Exception(
'Received %s: %r in response to POST %s with data %s' % (
response.status_code, response.text, url,
json.dumps(payload_dict)))
@staticmethod
def sql_value(x):
if x is None:
return 'null'
elif isinstance(x, datetime.datetime):
return 'datetime(%r)' % x.isoformat()
elif isinstance(x, bool):
return int(x)
elif isinstance(x, str) or isinstance(x, bytes):
# the only character that needs escaped in sqlite string literals
# is single-quote, which is escaped as two single-quotes
if isinstance(x, bytes):
s = x.decode('utf-8')
else:
s = x
return "'" + s.replace("'", "''") + "'"
elif isinstance(x, (int, float)):
return x
else:
raise Exception(
"don't know how to make an sql value from %r (%r)" % (
x, type(x)))
def segment_manager_url(self):
master_node = self.svcreg.unique_service('trough-sync-master')
assert master_node
return master_node['url']
def write_url_nocache(self, segment_id, schema_id='default'):
provision_url = os.path.join(self.segment_manager_url(), 'provision')
payload_dict = {'segment': segment_id, 'schema': schema_id}
response = requests.post(provision_url, json=payload_dict, timeout=600)
if response.status_code != 200:
raise Exception(
'Received %s: %r in response to POST %s with data %s' % (
response.status_code, response.text, provision_url,
json.dumps(payload_dict)))
result_dict = response.json()
# assert result_dict['schema'] == schema_id # previously provisioned?
return result_dict['write_url']
def read_url_nocache(self, segment_id):
reql = self.rr.table('services').get_all(
segment_id, index='segment').filter(
{'role':'trough-read'}).filter(
lambda svc: r.now().sub(
svc['last_heartbeat']).lt(svc['ttl'])
).order_by('load')
self.logger.debug('querying rethinkdb: %r', reql)
results = reql.run()
if results:
return results[0]['url']
else:
return None
def write_url(self, segment_id, schema_id='default'):
if not segment_id in self._write_url_cache:
self._write_url_cache[segment_id] = self.write_url_nocache(
segment_id, schema_id)
self.logger.info(
'segment %r write url is %r', segment_id,
self._write_url_cache[segment_id])
return self._write_url_cache[segment_id]
def read_url(self, segment_id):
if not self._read_url_cache.get(segment_id):
self._read_url_cache[segment_id] = self.read_url_nocache(segment_id)
self.logger.info(
'segment %r read url is %r', segment_id,
self._read_url_cache[segment_id])
return self._read_url_cache[segment_id]
def write(self, segment_id, sql_tmpl, values=(), schema_id='default'):
write_url = self.write_url(segment_id, schema_id)
sql = sql_tmpl % tuple(self.sql_value(v) for v in values)
sql_bytes = sql.encode('utf-8')
try:
response = requests.post(
write_url, sql_bytes, timeout=600,
headers={'content-type': 'application/sql;charset=utf-8'})
if response.status_code != 200:
raise Exception(
'Received %s: %r in response to POST %s with data %r' % (
response.status_code, response.text, write_url, sql))
if segment_id not in self._dirty_segments:
with self._dirty_segments_lock:
self._dirty_segments.add(segment_id)
except:
self._write_url_cache.pop(segment_id, None)
self.logger.error(
'problem with trough write url %r', write_url,
exc_info=True)
return
if response.status_code != 200:
self._write_url_cache.pop(segment_id, None)
self.logger.warning(
'unexpected response %r %r %r from %r to sql=%r',
response.status_code, response.reason, response.text,
write_url, sql)
return
self.logger.debug('posted to %s: %r', write_url, sql)
def read(self, segment_id, sql_tmpl, values=()):
read_url = self.read_url(segment_id)
if not read_url:
return None
sql = sql_tmpl % tuple(self.sql_value(v) for v in values)
sql_bytes = sql.encode('utf-8')
try:
response = requests.post(
read_url, sql_bytes, timeout=600,
headers={'content-type': 'application/sql;charset=utf-8'})
except:
self._read_url_cache.pop(segment_id, None)
self.logger.error(
'problem with trough read url %r', read_url, exc_info=True)
return None
if response.status_code != 200:
self._read_url_cache.pop(segment_id, None)
self.logger.warn(
'unexpected response %r %r %r from %r to sql=%r',
response.status_code, response.reason, response.text,
read_url, sql)
return None
self.logger.trace(
'got %r from posting query %r to %r', response.text, sql,
read_url)
results = json.loads(response.text)
return results
def schema_exists(self, schema_id):
url = os.path.join(self.segment_manager_url(), 'schema', schema_id)
response = requests.get(url, timeout=60)
if response.status_code == 200:
return True
elif response.status_code == 404:
return False
else:
response.raise_for_status()
def register_schema(self, schema_id, sql):
url = os.path.join(
self.segment_manager_url(), 'schema', schema_id, 'sql')
response = requests.put(url, sql, timeout=600)
if response.status_code not in (201, 204):
raise Exception(
'Received %s: %r in response to PUT %r with data %r' % (
response.status_code, response.text, sql, url))

View File

@ -2,7 +2,7 @@
warcprox/warcproxy.py - recording proxy, extends mitmproxy to record traffic,
enqueue info on the recorded url queue
Copyright (C) 2013-2018 Internet Archive
Copyright (C) 2013-2022 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -46,6 +46,8 @@ import tempfile
import hashlib
import doublethink
import re
import zlib
import base64
class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
'''
@ -175,6 +177,13 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
warcprox_meta = json.loads(self.headers['Warcprox-Meta'])
self._security_check(warcprox_meta)
self._enforce_limits(warcprox_meta)
if 'compressed_blocks' in warcprox_meta:
# b64decode and decompress
blocks_decompressed = zlib.decompress(base64.b64decode(warcprox_meta['compressed_blocks']))
# decode() and json.loads
warcprox_meta['blocks'] = json.loads(blocks_decompressed.decode())
# delete compressed_blocks (just in case?)
del warcprox_meta['compressed_blocks']
self._enforce_blocks(warcprox_meta)
def _connect_to_remote_server(self):
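For reference, the client side of `compressed_blocks` is just the inverse of the decoding above; a sketch (the block rules themselves are hypothetical examples):

```python
import base64
import json
import zlib

# Hypothetical block rules; a 'blocks' list that would otherwise make the
# Warcprox-Meta header very large can be sent compressed instead.
blocks = [
    {'domain': 'bad.example.com'},
    {'url_match': 'STRING_MATCH', 'value': '/nocrawl/'},
]

compressed_blocks = base64.b64encode(
    zlib.compress(json.dumps(blocks).encode('utf-8'))).decode('ascii')

warcprox_meta = {'warc-prefix': 'my-crawl', 'compressed_blocks': compressed_blocks}
# warcprox reverses this: base64-decodes, zlib-decompresses, json-loads the
# result into warcprox_meta['blocks'], then enforces those rules.
```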
@ -188,16 +197,21 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
self._enforce_limits_and_blocks()
return warcprox.mitmproxy.MitmProxyHandler._connect_to_remote_server(self)
def _proxy_request(self):
warcprox_meta = None
def _parse_warcprox_meta(self):
'''
:return: Warcprox-Meta request header value as a dictionary, or None
'''
raw_warcprox_meta = self.headers.get('Warcprox-Meta')
self.logger.trace(
'request for %s Warcprox-Meta header: %s', self.url,
raw_warcprox_meta)
'request for %s Warcprox-Meta header: %s', self.url,
raw_warcprox_meta)
if raw_warcprox_meta:
warcprox_meta = json.loads(raw_warcprox_meta)
del self.headers['Warcprox-Meta']
return json.loads(raw_warcprox_meta)
else:
return None
def _proxy_request(self):
warcprox_meta = self._parse_warcprox_meta()
remote_ip = self._remote_server_conn.sock.getpeername()[0]
timestamp = doublethink.utcnow()
extra_response_headers = {}
@ -344,16 +358,43 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
self.logger.error("uncaught exception in do_WARCPROX_WRITE_RECORD", exc_info=True)
raise
def send_error(self, code, message=None, explain=None, exception=None):
super().send_error(code, message=message, explain=explain, exception=exception)
# If error happens during CONNECT handling and before the inner request, self.url
# is unset, and self.path is something like 'example.com:443'
urlish = self.url or self.path
warcprox_meta = self._parse_warcprox_meta()
self._swallow_hop_by_hop_headers()
request_data = self._build_request()
failed_url = FailedUrl(
url=urlish,
request_data=request_data,
warcprox_meta=warcprox_meta,
status=code,
client_ip=self.client_address[0],
method=self.command,
timestamp=doublethink.utcnow(),
host=self.hostname,
duration=None,
referer=self.headers.get('referer'),
do_not_archive=True,
message=message,
exception=exception)
self.server.recorded_url_q.put(failed_url)
def log_message(self, fmt, *args):
# logging better handled elsewhere?
pass
RE_MIMETYPE = re.compile(r'[;\s]')
class RecordedUrl:
logger = logging.getLogger("warcprox.warcproxy.RecordedUrl")
def __init__(self, url, request_data, response_recorder, remote_ip,
class RequestedUrl:
logger = logging.getLogger("warcprox.warcproxy.RequestedUrl")
def __init__(self, url, request_data, response_recorder=None, remote_ip=None,
warcprox_meta=None, content_type=None, custom_type=None,
status=None, size=None, client_ip=None, method=None,
timestamp=None, host=None, duration=None, referer=None,
@ -366,19 +407,20 @@ class RecordedUrl:
else:
self.url = url
if type(remote_ip) is not bytes:
self.remote_ip = remote_ip.encode('ascii')
else:
self.remote_ip = remote_ip
self.request_data = request_data
self.response_recorder = response_recorder
if warcprox_meta:
if 'captures-bucket' in warcprox_meta:
# backward compatibility
warcprox_meta['dedup-bucket'] = warcprox_meta['captures-bucket']
warcprox_meta['dedup-buckets'] = {}
warcprox_meta['dedup-buckets'][warcprox_meta['captures-bucket']] = 'rw'
del warcprox_meta['captures-bucket']
if 'dedup-bucket' in warcprox_meta:
# more backwards compatibility
warcprox_meta['dedup-buckets'] = {}
warcprox_meta['dedup-buckets'][warcprox_meta['dedup-bucket']] = 'rw'
del warcprox_meta['dedup-bucket']
self.warcprox_meta = warcprox_meta
else:
self.warcprox_meta = {}
@ -404,6 +446,43 @@ class RecordedUrl:
self.warc_records = warc_records
self.do_not_archive = do_not_archive
class FailedUrl(RequestedUrl):
logger = logging.getLogger("warcprox.warcproxy.FailedUrl")
def __init__(self, url, request_data, warcprox_meta=None, status=None,
client_ip=None, method=None, timestamp=None, host=None, duration=None,
referer=None, do_not_archive=True, message=None, exception=None):
super().__init__(url, request_data, warcprox_meta=warcprox_meta,
status=status, client_ip=client_ip, method=method,
timestamp=timestamp, host=host, duration=duration,
referer=referer, do_not_archive=do_not_archive)
self.message = message
self.exception = exception
class RecordedUrl(RequestedUrl):
logger = logging.getLogger("warcprox.warcproxy.RecordedUrl")
def __init__(self, url, request_data, response_recorder, remote_ip,
warcprox_meta=None, content_type=None, custom_type=None,
status=None, size=None, client_ip=None, method=None,
timestamp=None, host=None, duration=None, referer=None,
payload_digest=None, truncated=None, warc_records=None,
do_not_archive=False):
super().__init__(url, request_data, response_recorder=response_recorder,
warcprox_meta=warcprox_meta, content_type=content_type,
custom_type=custom_type, status=status, size=size, client_ip=client_ip,
method=method, timestamp=timestamp, host=host, duration=duration,
referer=referer, payload_digest=payload_digest, truncated=truncated,
warc_records=warc_records, do_not_archive=do_not_archive)
if type(remote_ip) is not bytes:
self.remote_ip = remote_ip.encode('ascii')
else:
self.remote_ip = remote_ip
def is_text(self):
"""Ref: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Complete_list_of_MIME_types
Alternative method: try to decode('ascii') first N bytes to make sure

View File

@ -51,10 +51,14 @@ class WarcWriter:
self.finalname = None
self.gzip = options.gzip or False
self.prefix = options.prefix or 'warcprox'
self.port = options.port or 8000
self.open_suffix = '' if options.no_warc_open_suffix else '.open'
self.rollover_size = options.rollover_size or 1000000000
self.rollover_idle_time = options.rollover_idle_time or None
self.directory = options.directory or './warcs'
if options.subdir_prefix and options.prefix:
self.directory = os.path.sep.join([options.directory, options.prefix]) or './warcs'
else:
self.directory = options.directory or './warcs'
self.filename_template = options.warc_filename or \
'{prefix}-{timestamp17}-{randomtoken}-{serialno}'
self.last_activity = time.time()
@ -67,7 +71,7 @@ class WarcWriter:
"""WARC filename is configurable with CLI parameter --warc-filename.
Default: '{prefix}-{timestamp17}-{randomtoken}-{serialno}'
Available variables are: prefix, timestamp14, timestamp17, serialno,
randomtoken, hostname, shorthostname.
randomtoken, hostname, shorthostname, port.
Extension ``.warc`` or ``.warc.gz`` is appended automatically.
"""
hostname = socket.getfqdn()
@ -77,7 +81,7 @@ class WarcWriter:
timestamp17=warcprox.timestamp17(),
serialno='{:05d}'.format(serial),
randomtoken=self.randomtoken, hostname=hostname,
shorthostname=shorthostname)
shorthostname=shorthostname, port=self.port)
if self.gzip:
fname = fname + '.warc.gz'
else:
@ -167,14 +171,17 @@ class WarcWriter:
if self.open_suffix == '':
try:
fcntl.lockf(self.f, fcntl.LOCK_UN)
except IOError as exc:
except Exception as exc:
self.logger.error(
'could not unlock file %s (%s)', self.path, exc)
self.f.close()
finalpath = os.path.sep.join(
[self.directory, self.finalname])
os.rename(self.path, finalpath)
try:
self.f.close()
finalpath = os.path.sep.join(
[self.directory, self.finalname])
os.rename(self.path, finalpath)
except Exception as exc:
self.logger.error(
'could not close and rename file %s (%s)', self.path, exc)
self.path = None
self.f = None
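For reference, a sketch of how the filename template expands with the new `{port}` variable, mirroring the `str.format` call in `_warc_filename()` above (all values are made up; unused keyword arguments are simply ignored by `str.format`):

```python
# Illustrative values only.
template = '{prefix}-{timestamp17}-{serialno}-{randomtoken}-{port}'

fname = template.format(
    prefix='my-crawl',
    timestamp14='20241213140405',
    timestamp17='20241213140405000',
    serialno='{:05d}'.format(0),
    randomtoken='3xk9d2',
    hostname='crawler01.example.org',
    shorthostname='crawler01',
    port=8000)

print(fname + '.warc.gz')  # my-crawl-20241213140405000-00000-3xk9d2-8000.warc.gz
```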

View File

@ -72,6 +72,8 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
self.close_prefix_reqs.put(prefix)
def _process_url(self, recorded_url):
if isinstance(recorded_url, warcprox.warcproxy.FailedUrl):
return
try:
records = []
if self._should_archive(recorded_url):