Merge branch 'trough-dedup' into qa

* trough-dedup:
  py2 fix
  automatic segment promotion every hour
  move trough client into separate module
  pypy and pypy3 are passing at the moment, so why not :)
  more cleanly separate trough client code from the rest of TroughDedup
  update payload_digest reference in trough dedup for changes in commit 3a0f6e0947
  hopefully fix test failing occasionally apparently due to race condition by checking that the file we're waiting for has some content
  fix payload digest by pulling calculation up one level where content has already been transfer-decoded
  new failing test for correct calculation of payload digest
  missed a spot handling case of no warc records written
  eh, don't prefix sqlite filenames with 'warcprox-trough-'; logging tweaks
  not gonna bother figuring out why pypy regex is not matching https://travis-ci.org/internetarchive/warcprox/jobs/299864258#L615
  fix failing test just committed, which involves running "listeners" for all urls, including those not archived; make adjustments accordingly
  make test_crawl_log expect HEAD request to be logged
  fix crawl log handling of WARCPROX_WRITE_RECORD request
  modify test_crawl_log to expect crawl log to honor --base32 setting and add tests of WARCPROX_WRITE_RECORD request and HEAD request (not written to warc)
  bump dev version number
  add --crawl-log-dir option to fix failing test
Noah Levitt 2017-11-15 17:29:53 -08:00
commit bf0f27c364
16 changed files with 610 additions and 244 deletions
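The thread running through most of these changes is where the payload digest gets computed: it now covers the RFC 2616 "entity body", that is, the response body after transfer-decoding (chunked framing removed) but before content-decoding (gzip left in place). That is why, in the tests below, the -gzip and -ce-gzip paths expect the same digest. A standalone sketch of the distinction, using made-up helpers rather than warcprox's own classes:

    import gzip
    import hashlib
    import io

    def chunkify(buf, chunk_size=13):
        # wrap buf in HTTP/1.1 chunked transfer-encoding (same idea as the
        # chunkify() test helper further down)
        out, i = b'', 0
        while i < len(buf):
            chunk = buf[i:i+chunk_size]
            out += ('%x\r\n' % len(chunk)).encode('ascii') + chunk + b'\r\n'
            i += chunk_size
        return out + b'0\r\n\r\n'

    def dechunk(body):
        # undo chunked transfer-encoding: "<hexlen>\r\n<chunk>\r\n" until a 0 chunk
        out, f = b'', io.BytesIO(body)
        while True:
            size = int(f.readline().strip(), 16)
            if size == 0:
                return out
            out += f.read(size)
            f.read(2)  # consume the \r\n after each chunk

    content = b'I am the content body.\n'
    entity_body = gzip.compress(content)   # after Content-Encoding: gzip
    message_body = chunkify(entity_body)   # after Transfer-Encoding: chunked

    # payload digest: transfer-decoded but NOT content-decoded
    payload_digest = hashlib.sha1(dechunk(message_body))
    assert payload_digest.digest() == hashlib.sha1(entity_body).digest()
    assert payload_digest.digest() != hashlib.sha1(content).digest()

The block digest, by contrast, is still computed by ProxyingRecorder (see the mitmproxy hunks below) over the raw bytes as they are read off the wire.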

View File

@ -13,7 +13,6 @@ python:
matrix:
allow_failures:
- python: pypy3
- python: nightly
- python: 3.7-dev
@ -40,7 +39,7 @@ before_install:
- ping -c2 trough
install:
- pip install . pytest requests warcio
- pip install . pytest requests warcio mock
before_script:
- ps ww -fHe

View File

@ -52,7 +52,7 @@ except:
setuptools.setup(
name='warcprox',
version='2.2.1b2.dev107',
version='2.2.1b2.dev115',
description='WARC writing MITM HTTP/S proxy',
url='https://github.com/internetarchive/warcprox',
author='Noah Levitt',

View File

@ -3,7 +3,7 @@
tests/single-threaded-proxy.py - single-threaded MITM proxy, useful for
debugging, does not write warcs
Copyright (C) 2015-2016 Internet Archive
Copyright (C) 2015-2017 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -46,7 +46,7 @@ class FakeQueue(object):
logging.info("{} {} {} {} {} size={} {}".format(
recorded_url.client_ip, recorded_url.status, recorded_url.method,
recorded_url.url.decode("utf-8"), recorded_url.mimetype,
recorded_url.size, warcprox.digest_str(recorded_url.response_recorder.payload_digest, False).decode('utf-8')))
recorded_url.size, warcprox.digest_str(recorded_url.payload_digest, False).decode('utf-8')))
def parse_args():
prog = os.path.basename(sys.argv[0])

View File

@ -46,6 +46,10 @@ from collections import Counter
import socket
import datetime
import warcio.archiveiterator
import io
import gzip
import mock
import email.message
try:
import http.server as http_server
@ -135,6 +139,24 @@ def dump_state(signum=None, frame=None):
signal.signal(signal.SIGQUIT, dump_state)
def chunkify(buf, chunk_size=13):
i = 0
result = b''
while i < len(buf):
chunk_len = min(len(buf) - i, chunk_size)
result += ('%x\r\n' % chunk_len).encode('ascii')
result += buf[i:i+chunk_len]
result += b'\r\n'
i += chunk_size
result += b'0\r\n\r\n'
return result
# def gzipify(buf):
# with io.BytesIO() as outbuf:
# with gzip.GzipFile(fileobj=outbuf, mode='wb') as gz:
# gz.write(buf)
# return outbuf.getvalue()
class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
def build_response(self):
m = re.match(r'^/([^/]+)/([^/]+)$', self.path)
@ -151,6 +173,71 @@ class _TestHttpRequestHandler(http_server.BaseHTTPRequestHandler):
+ b'Content-Type: text/plain\r\n'
+ b'\r\n')
payload = b'This response is missing a Content-Length http header.'
elif self.path.startswith('/test_payload_digest-'):
content_body = (
b'Hello. How are you. I am the test_payload_digest '
b'content body. The entity body is a possibly content-'
b'encoded version of me. The message body is a possibly '
b'transfer-encoded version of the entity body.\n')
gzipped = (
b"\x1f\x8b\x08\x00jA\x06Z\x02\xffm\x8d1\x0e\xc20\x10\x04{^"
b"\xb1\x1f\xc0\xef\x08=}t\x897\xc1\x92\xed\x8b|\x07\xc8"
b"\xbf'\n\xa2@J9\xab\x19\xed\xc0\x9c5`\xd07\xa4\x11]\x9f"
b"\x017H\x81?\x08\xa7\xf9\xb8I\xcf*q\x8ci\xdd\x11\xb3VguL"
b"\x1a{\xc0}\xb7vJ\xde\x8f\x01\xc9 \xd8\xd4,M\xb9\xff\xdc"
b"+\xeb\xac\x91\x11/6KZ\xa1\x0b\n\xbfq\xa1\x99\xac<\xab"
b"\xbdI\xb5\x85\xed,\xf7\xff\xdfp\xf9\x00\xfc\t\x02\xb0"
b"\xc8\x00\x00\x00")
double_gzipped = (
b"\x1f\x8b\x08\x00jA\x06Z\x02\xff\x01\x89\x00v\xff\x1f\x8b"
b"\x08\x00jA\x06Z\x02\xffm\x8d1\x0e\xc20\x10\x04{^\xb1\x1f"
b"\xc0\xef\x08=}t\x897\xc1\x92\xed\x8b|\x07\xc8\xbf'\n\xa2"
b"@J9\xab\x19\xed\xc0\x9c5`\xd07\xa4\x11]\x9f\x017H\x81?"
b"\x08\xa7\xf9\xb8I\xcf*q\x8ci\xdd\x11\xb3VguL\x1a{\xc0}"
b"\xb7vJ\xde\x8f\x01\xc9 \xd8\xd4,M\xb9\xff\xdc+\xeb\xac"
b"\x91\x11/6KZ\xa1\x0b\n\xbfq\xa1\x99\xac<\xab\xbdI\xb5"
b"\x85\xed,\xf7\xff\xdfp\xf9\x00\xfc\t\x02\xb0\xc8\x00\x00"
b"\x00\xf9\xdd\x8f\xed\x89\x00\x00\x00")
if self.path == '/test_payload_digest-plain':
payload = content_body
actual_headers = (b'Content-Type: text/plain\r\n'
+ b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n')
elif self.path == '/test_payload_digest-gzip':
payload = gzipped
actual_headers = (b'Content-Type: application/gzip\r\n'
+ b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n')
elif self.path == '/test_payload_digest-ce-gzip':
payload = gzipped
actual_headers = (b'Content-Type: text/plain\r\n'
+ b'Content-Encoding: gzip\r\n'
+ b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n')
elif self.path == '/test_payload_digest-gzip-ce-gzip':
payload = double_gzipped
actual_headers = (b'Content-Type: application/gzip\r\n'
+ b'Content-Encoding: gzip\r\n'
+ b'Content-Length: ' + str(len(payload)).encode('ascii') + b'\r\n')
elif self.path == '/test_payload_digest-te-chunked':
payload = chunkify(content_body)
actual_headers = (b'Content-Type: text/plain\r\n'
+ b'Transfer-Encoding: chunked\r\n')
elif self.path == '/test_payload_digest-gzip-te-chunked':
payload = chunkify(gzipped)
actual_headers = (b'Content-Type: application/gzip\r\n'
+ b'Transfer-Encoding: chunked\r\n')
elif self.path == '/test_payload_digest-ce-gzip-te-chunked':
payload = chunkify(gzipped)
actual_headers = (b'Content-Type: text/plain\r\n'
+ b'Content-Encoding: gzip\r\n'
+ b'Transfer-Encoding: chunked\r\n')
elif self.path == '/test_payload_digest-gzip-ce-gzip-te-chunked':
payload = chunkify(double_gzipped)
actual_headers = (b'Content-Type: application/gzip\r\n'
+ b'Content-Encoding: gzip\r\n'
+ b'Transfer-Encoding: chunked\r\n')
else:
raise Exception('bad path')
headers = b'HTTP/1.1 200 OK\r\n' + actual_headers + b'\r\n'
logging.info('headers=%r payload=%r', headers, payload)
else:
payload = b'404 Not Found\n'
headers = (b'HTTP/1.1 404 Not Found\r\n'
@ -1372,11 +1459,14 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
assert response.status_code == 200
start = time.time()
file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_1.log')
while time.time() - start < 10:
if os.path.exists(os.path.join(
warcprox_.options.crawl_log_dir, 'test_crawl_log_1.log')):
if os.path.exists(file) and os.stat(file).st_size > 0:
break
time.sleep(0.5)
assert os.path.exists(file)
assert os.path.exists(os.path.join(
warcprox_.options.crawl_log_dir, 'crawl.log'))
crawl_log = open(os.path.join(
warcprox_.options.crawl_log_dir, 'crawl.log'), 'rb').read()
@ -1392,7 +1482,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
assert fields[6] == b'text/plain'
assert fields[7] == b'-'
assert re.match(br'^\d{17}[+]\d{3}', fields[8])
assert fields[9] == b'sha1:NHKRURXEJICOQEINUDERRF6OZ2LZ7JYP'
assert fields[9] == b'sha1:69d51a46e44a04e8110da0c91897cece979fa70f'
assert fields[10] == b'-'
assert fields[11] == b'-'
extra_info = json.loads(fields[12].decode('utf-8'))
@ -1413,7 +1503,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
assert fields[6] == b'text/plain'
assert fields[7] == b'-'
assert re.match(br'^\d{17}[+]\d{3}', fields[8])
assert fields[9] == b'sha1:TKXGVS3ZPR24VDVV3XWZXYQSPTDBWP53'
assert fields[9] == b'sha1:9aae6acb797c75ca8eb5dded9be2127cc61b3fbb'
assert fields[10] == b'-'
assert fields[11] == b'-'
extra_info = json.loads(fields[12].decode('utf-8'))
@ -1430,14 +1520,14 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
assert response.status_code == 200
start = time.time()
file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_2.log')
while time.time() - start < 10:
if os.path.exists(os.path.join(
warcprox_.options.crawl_log_dir, 'test_crawl_log_2.log')):
if os.path.exists(file) and os.stat(file).st_size > 0:
break
time.sleep(0.5)
assert os.path.exists(file)
crawl_log_2 = open(os.path.join(
warcprox_.options.crawl_log_dir, 'test_crawl_log_2.log'), 'rb').read()
crawl_log_2 = open(file, 'rb').read()
assert re.match(b'\A2[^\n]+\n\Z', crawl_log_2)
assert crawl_log_2[24:31] == b' 200 '
@ -1450,7 +1540,7 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
assert fields[6] == b'text/plain'
assert fields[7] == b'-'
assert re.match(br'^\d{17}[+]\d{3}', fields[8])
assert fields[9] == b'sha1:NHKRURXEJICOQEINUDERRF6OZ2LZ7JYP'
assert fields[9] == b'sha1:69d51a46e44a04e8110da0c91897cece979fa70f'
assert fields[10] == b'http://example.com/seed'
assert fields[11] == b'duplicate:digest'
extra_info = json.loads(fields[12].decode('utf-8'))
@ -1458,6 +1548,81 @@ def test_crawl_log(warcprox_, http_daemon, archiving_proxies):
'contentSize', 'warcFilename', 'warcFileOffset'}
assert extra_info['contentSize'] == 145
# a request that is not saved to a warc (because of --method-filter)
# currently not logged at all (XXX maybe it should be)
url = 'http://localhost:%s/b/cc' % http_daemon.server_port
headers = {'Warcprox-Meta': json.dumps({'warc-prefix': 'test_crawl_log_3'})}
response = requests.head(url, proxies=archiving_proxies, headers=headers)
file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_3.log')
start = time.time()
while time.time() - start < 10:
if os.path.exists(file) and os.stat(file).st_size > 0:
break
time.sleep(0.5)
assert os.path.exists(file)
crawl_log_3 = open(file, 'rb').read()
assert re.match(b'\A2[^\n]+\n\Z', crawl_log_3)
assert crawl_log_3[24:31] == b' 200 '
assert crawl_log_3[31:42] == b' 0 '
fields = crawl_log_3.split()
assert len(fields) == 13
assert fields[3].endswith(b'/b/cc')
assert fields[4] == b'-'
assert fields[5] == b'-'
assert fields[6] == b'text/plain'
assert fields[7] == b'-'
assert re.match(br'^\d{17}[+]\d{3}', fields[8])
assert fields[9] == b'sha1:da39a3ee5e6b4b0d3255bfef95601890afd80709'
assert fields[10] == b'-'
assert fields[11] == b'-'
extra_info = json.loads(fields[12].decode('utf-8'))
assert extra_info == {'contentSize': 91}
# WARCPROX_WRITE_RECORD
url = 'http://fakeurl/'
payload = b'I am the WARCPROX_WRITE_RECORD payload'
headers = {
'Content-Type': 'text/plain',
'WARC-Type': 'metadata',
'Host': 'N/A',
'Warcprox-Meta': json.dumps({'warc-prefix': 'test_crawl_log_4'}),
}
response = requests.request(
method='WARCPROX_WRITE_RECORD', url=url, data=payload,
headers=headers, proxies=archiving_proxies)
assert response.status_code == 204
start = time.time()
file = os.path.join(warcprox_.options.crawl_log_dir, 'test_crawl_log_4.log')
while time.time() - start < 10:
if os.path.exists(file) and os.stat(file).st_size > 0:
break
time.sleep(0.5)
assert os.path.exists(file)
crawl_log_4 = open(file, 'rb').read()
assert re.match(b'\A2[^\n]+\n\Z', crawl_log_4)
assert crawl_log_4[24:31] == b' 204 '
assert crawl_log_4[31:42] == b' 38 '
fields = crawl_log_4.split()
assert len(fields) == 13
assert fields[3] == b'http://fakeurl/'
assert fields[4] == b'-'
assert fields[5] == b'-'
assert fields[6] == b'text/plain'
assert fields[7] == b'-'
assert re.match(br'^\d{17}[+]\d{3}', fields[8])
assert fields[9] == b'sha1:bb56497c17d2684f5eca4af9df908c78ba74ca1c'
assert fields[10] == b'-'
assert fields[11] == b'-'
extra_info = json.loads(fields[12].decode('utf-8'))
assert set(extra_info.keys()) == {
'contentSize', 'warcFilename', 'warcFileOffset'}
assert extra_info['contentSize'] == 38
def test_long_warcprox_meta(
warcprox_, http_daemon, archiving_proxies, playback_proxies):
url = 'http://localhost:%s/b/g' % http_daemon.server_port
@ -1496,6 +1661,84 @@ def test_long_warcprox_meta(
with pytest.raises(StopIteration):
next(rec_iter)
def test_payload_digest(warcprox_, http_daemon):
'''
Tests that digest is of RFC2616 "entity body"
(transfer-decoded but not content-decoded)
'''
class HalfMockedMitm(warcprox.mitmproxy.MitmProxyHandler):
def __init__(self, url):
self.path = url
self.request_version = 'HTTP/1.1'
self.client_address = mock.MagicMock()
self.headers = email.message.Message()
self.headers.add_header('Host', 'localhost:%s' % http_daemon.server_port)
self.server = warcprox_.proxy
self.command = 'GET'
self.connection = mock.Mock()
PLAIN_SHA1 = b'sha1:881289333370aa4e3214505f1173423cc5a896b7'
GZIP_SHA1 = b'sha1:634e25de71ae01edb5c5d9e2e99c4836bbe94129'
GZIP_GZIP_SHA1 = b'sha1:cecbf3a5c4975072f5e4c5e0489f808ef246c2b4'
# plain
mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-plain' % http_daemon.server_port)
req, prox_rec_res = mitm.do_GET()
assert warcprox.digest_str(prox_rec_res.payload_digest) == PLAIN_SHA1
# content-type: application/gzip
mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-gzip' % http_daemon.server_port)
req, prox_rec_res = mitm.do_GET()
assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_SHA1
# content-encoding: gzip
mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-ce-gzip' % http_daemon.server_port)
req, prox_rec_res = mitm.do_GET()
assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_SHA1
# content-type: application/gzip && content-encoding: gzip
mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-gzip-ce-gzip' % http_daemon.server_port)
req, prox_rec_res = mitm.do_GET()
assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_GZIP_SHA1
# chunked plain
mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-te-chunked' % http_daemon.server_port)
req, prox_rec_res = mitm.do_GET()
assert warcprox.digest_str(prox_rec_res.payload_digest) == PLAIN_SHA1
# chunked content-type: application/gzip
mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-gzip-te-chunked' % http_daemon.server_port)
req, prox_rec_res = mitm.do_GET()
assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_SHA1
# chunked content-encoding: gzip
mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-ce-gzip-te-chunked' % http_daemon.server_port)
req, prox_rec_res = mitm.do_GET()
assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_SHA1
# chunked content-type: application/gzip && content-encoding: gzip
mitm = HalfMockedMitm('http://localhost:%s/test_payload_digest-gzip-ce-gzip-te-chunked' % http_daemon.server_port)
req, prox_rec_res = mitm.do_GET()
assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_GZIP_SHA1
def test_trough_segment_promotion(warcprox_):
if not warcprox_.options.rethinkdb_trough_db_url:
return
cli = warcprox.trough.TroughClient(
warcprox_.options.rethinkdb_trough_db_url, 3)
promoted = []
def mock(segment_id):
promoted.append(segment_id)
cli.promote = mock
cli.register_schema('default', 'create table foo (bar varchar(100))')
cli.write('my_seg', 'insert into foo (bar) values ("boof")')
assert promoted == []
time.sleep(3)
assert promoted == ['my_seg']
promoted = []
time.sleep(3)
assert promoted == []
if __name__ == '__main__':
pytest.main()
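The crawl-log digest expectations above changed spelling, from base32 (sha1:NHKRURXE...) to hex (sha1:69d51a46...), because CrawlLogger now honors the --base32 option instead of always base32-encoding, and the test fixture evidently runs without that flag. The two spellings are the same sha1; a quick standalone check (the same holds for the sha1:TKXGVS3Z... / sha1:9aae6acb... pair):

    import base64

    base32_spelling = 'NHKRURXEJICOQEINUDERRF6OZ2LZ7JYP'
    hex_spelling = '69d51a46e44a04e8110da0c91897cece979fa70f'

    # same 20 raw sha1 bytes under either encoding
    assert base64.b32decode(base32_spelling) == bytes.fromhex(hex_spelling)

The digest_str(hash_obj, base32=False) hunk below is what emits the algorithm-prefixed form, e.g. b'sha1:69d51a46...'.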

View File

@ -28,7 +28,7 @@ except ImportError:
import Queue as queue
import datetime
def digest_str(hash_obj, base32):
def digest_str(hash_obj, base32=False):
import base64
return hash_obj.name.encode('utf-8') + b':' + (
base64.b32encode(hash_obj.digest()) if base32

View File

@ -141,16 +141,16 @@ class RethinkCaptures:
return result
def _assemble_entry(self, recorded_url, records):
if recorded_url.response_recorder:
if recorded_url.response_recorder.payload_digest.name == "sha1":
if recorded_url.payload_digest:
if recorded_url.payload_digest.name == "sha1":
sha1base32 = base64.b32encode(
recorded_url.response_recorder.payload_digest.digest()
recorded_url.payload_digest.digest()
).decode("utf-8")
else:
self.logger.warn(
"digest type is %r but big captures table is indexed "
"by sha1",
recorded_url.response_recorder.payload_digest.name)
recorded_url.payload_digest.name)
else:
digest = hashlib.new("sha1", records[0].content[1])
sha1base32 = base64.b32encode(digest.digest()).decode("utf-8")
@ -201,6 +201,7 @@ class RethinkCaptures:
return entry
def notify(self, recorded_url, records):
if records:
entry = self._assemble_entry(recorded_url, records)
with self._batch_lock:
self._batch.append(entry)

View File

@ -26,8 +26,9 @@ import os
import warcprox
class CrawlLogger(object):
def __init__(self, dir_):
def __init__(self, dir_, options=warcprox.Options()):
self.dir = dir_
self.options = options
if not os.path.exists(self.dir):
logging.info('creating directory %r', self.dir)
os.mkdir(self.dir)
@ -35,15 +36,23 @@ class CrawlLogger(object):
def notify(self, recorded_url, records):
# 2017-08-03T21:45:24.496Z 200 2189 https://autismcouncil.wisconsin.gov/robots.txt P https://autismcouncil.wisconsin.gov/ text/plain #001 20170803214523617+365 sha1:PBS2CEF7B4OSEXZZF3QE2XN2VHYCPNPX https://autismcouncil.wisconsin.gov/ duplicate:digest {"warcFileOffset":942,"contentSize":2495,"warcFilename":"ARCHIVEIT-2159-TEST-JOB319150-20170803214522386-00000.warc.gz"}
now = datetime.datetime.utcnow()
extra_info = {
'contentSize': recorded_url.size,
'warcFilename': records[0].warc_filename,
'warcFileOffset': records[0].offset,
}
extra_info = {'contentSize': recorded_url.size,}
if records:
extra_info['warcFilename'] = records[0].warc_filename
extra_info['warcFileOffset'] = records[0].offset
if recorded_url.response_recorder:
content_length = recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset
payload_digest = warcprox.digest_str(
recorded_url.payload_digest,
self.options.base32)
else:
# WARCPROX_WRITE_RECORD request
content_length = len(recorded_url.request_data)
payload_digest = records[0].get_header(b'WARC-Payload-Digest')
fields = [
'{:%Y-%m-%dT%H:%M:%S}.{:03d}Z'.format(now, now.microsecond//1000),
'% 5s' % recorded_url.status,
'% 10s' % (recorded_url.response_recorder.len - recorded_url.response_recorder.payload_offset),
'% 10s' % content_length,
recorded_url.url,
'-', # hop path
recorded_url.referer or '-',
@ -53,10 +62,9 @@ class CrawlLogger(object):
recorded_url.timestamp,
recorded_url.timestamp.microsecond//1000,
recorded_url.duration.microseconds//1000),
warcprox.digest_str(
recorded_url.response_recorder.payload_digest, True),
payload_digest,
recorded_url.warcprox_meta.get('metadata', {}).get('seed', '-'),
'duplicate:digest' if records[0].type == b'revisit' else '-',
'duplicate:digest' if records and records[0].type == b'revisit' else '-',
json.dumps(extra_info, separators=(',',':')),
]
for i in range(len(fields)):
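The \d{17}[+]\d{3} pattern the tests match against fields[8] is the Heritrix-style timestamp+duration field. The format string itself sits on a context line just outside this hunk, but assuming the conventional 14-digit UTC timestamp plus milliseconds, it comes out like this (timestamp and duration values are made up):

    import datetime
    import re

    # hypothetical stand-ins for recorded_url.timestamp and recorded_url.duration
    timestamp = datetime.datetime(2017, 11, 15, 17, 29, 53, 123456)
    duration = datetime.timedelta(milliseconds=42)

    # assumed format; the actual format string is not shown in the hunk above
    field = '{:%Y%m%d%H%M%S}{:03d}+{:03d}'.format(
            timestamp, timestamp.microsecond // 1000,
            duration.microseconds // 1000)

    print(field)                                  # 20171115172953123+042
    assert re.match(r'^\d{17}[+]\d{3}$', field)   # shape the tests check for

This is presumably also why the WARCPROX_WRITE_RECORD handler (WarcProxyHandler hunk below) now sets a duration on its RecordedUrl: without one, this field could not be formatted for those records.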

View File

@ -26,10 +26,9 @@ import os
import json
from hanzo import warctools
import warcprox
import warcprox.trough
import sqlite3
import requests
import doublethink
import rethinkdb as r
import datetime
import urllib3
from urllib3.exceptions import HTTPError
@ -96,11 +95,10 @@ class DedupDb(object):
return result
def notify(self, recorded_url, records):
if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
if (records and records[0].type == b'response'
and recorded_url.response_recorder.payload_size() > 0):
digest_key = warcprox.digest_str(
recorded_url.response_recorder.payload_digest,
self.options.base32)
recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
self.save(
digest_key, records[0],
@ -111,9 +109,9 @@ class DedupDb(object):
def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
if (recorded_url.response_recorder
and recorded_url.response_recorder.payload_digest
and recorded_url.payload_digest
and recorded_url.response_recorder.payload_size() > 0):
digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
digest_key = warcprox.digest_str(recorded_url.payload_digest, base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"],
recorded_url.url)
@ -174,10 +172,10 @@ class RethinkDedupDb:
return result
def notify(self, recorded_url, records):
if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
if (records and records[0].type == b'response'
and recorded_url.response_recorder.payload_size() > 0):
digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest,
self.options.base32)
digest_key = warcprox.digest_str(
recorded_url.payload_digest, self.options.base32)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"])
else:
@ -246,65 +244,6 @@ class CdxServerDedup(object):
"""
pass
class TroughClient(object):
logger = logging.getLogger("warcprox.dedup.TroughClient")
def __init__(self, rethinkdb_trough_db_url):
parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url)
self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database)
self.svcreg = doublethink.ServiceRegistry(self.rr)
def segment_manager_url(self):
master_node = self.svcreg.unique_service('trough-sync-master')
assert master_node
return master_node['url']
def write_url(self, segment_id, schema_id='default'):
provision_url = os.path.join(self.segment_manager_url(), 'provision')
payload_dict = {'segment': segment_id, 'schema': schema_id}
response = requests.post(provision_url, json=payload_dict)
if response.status_code != 200:
raise Exception(
'Received %s: %r in response to POST %s with data %s' % (
response.status_code, response.text, provision_url,
json.dumps(payload_dict)))
result_dict = response.json()
# assert result_dict['schema'] == schema_id # previously provisioned?
return result_dict['write_url']
def read_url(self, segment_id):
reql = self.rr.table('services').get_all(
segment_id, index='segment').filter(
{'role':'trough-read'}).filter(
lambda svc: r.now().sub(
svc['last_heartbeat']).lt(svc['ttl'])
).order_by('load')
logging.debug('querying rethinkdb: %r', reql)
results = reql.run()
if results:
return results[0]['url']
else:
return None
def schema_exists(self, schema_id):
url = os.path.join(self.segment_manager_url(), 'schema', schema_id)
response = requests.get(url)
if response.status_code == 200:
return True
elif response.status_code == 404:
return False
else:
response.raise_for_status()
def register_schema(self, schema_id, sql):
url = '%s/schema/%s/sql' % (self.segment_manager_url(), schema_id)
response = requests.put(url, sql)
if response.status_code not in (201, 204):
raise Exception(
'Received %s: %r in response to PUT %r with data %r' % (
response.status_code, response.text, sql, url))
class TroughDedupDb(object):
'''
https://github.com/internetarchive/trough
@ -317,106 +256,31 @@ class TroughDedupDb(object):
' url varchar(2100) not null,\n'
' date datetime not null,\n'
' id varchar(100));\n') # warc record id
WRITE_SQL_TMPL = ('insert into dedup (digest_key, url, date, id) '
'values (%s, %s, %s, %s);')
def __init__(self, options=warcprox.Options()):
self.options = options
self._trough_cli = TroughClient(options.rethinkdb_trough_db_url)
self._write_url_cache = {}
self._read_url_cache = {}
self._trough_cli = warcprox.trough.TroughClient(
options.rethinkdb_trough_db_url, promotion_interval=60*60)
def start(self):
self._trough_cli.register_schema(self.SCHEMA_ID, self.SCHEMA_SQL)
def _write_url(self, bucket):
if not bucket in self._write_url_cache:
segment_id = 'warcprox-trough-%s' % bucket
self._write_url_cache[bucket] = self._trough_cli.write_url(
segment_id, self.SCHEMA_ID)
logging.info(
'bucket %r write url is %r', bucket,
self._write_url_cache[bucket])
return self._write_url_cache[bucket]
def _read_url(self, bucket):
if not self._read_url_cache.get(bucket):
segment_id = 'warcprox-trough-%s' % bucket
self._read_url_cache[bucket] = self._trough_cli.read_url(segment_id)
logging.info(
'bucket %r read url is %r', bucket,
self._read_url_cache[bucket])
return self._read_url_cache[bucket]
def sql_value(self, x):
if x is None:
return 'null'
elif isinstance(x, datetime.datetime):
return 'datetime(%r)' % x.isoformat()
elif isinstance(x, bool):
return int(x)
elif isinstance(x, str) or isinstance(x, bytes):
# py3: repr(u'abc') => 'abc'
# repr(b'abc') => b'abc'
# py2: repr(u'abc') => u'abc'
# repr(b'abc') => 'abc'
# Repr gives us a prefix we don't want in different situations
# depending on whether this is py2 or py3. Chop it off either way.
r = repr(x)
if r[:1] == "'":
return r
else:
return r[1:]
else:
raise Exception("don't know how to make an sql value from %r" % x)
def save(self, digest_key, response_record, bucket='__unspecified__'):
write_url = self._write_url(bucket)
record_id = response_record.get_header(warctools.WarcRecord.ID)
url = response_record.get_header(warctools.WarcRecord.URL)
warc_date = response_record.get_header(warctools.WarcRecord.DATE)
sql = ('insert into dedup (digest_key, url, date, id) '
'values (%s, %s, %s, %s);') % (
self.sql_value(digest_key), self.sql_value(url),
self.sql_value(warc_date), self.sql_value(record_id))
try:
response = requests.post(write_url, sql)
except:
logging.error(
'problem with trough write url %r', write_url,
exc_info=True)
del self._write_url_cache[bucket]
return
if response.status_code != 200:
del self._write_url_cache[bucket]
logging.warn(
'unexpected response %r %r %r to sql=%r',
response.status_code, response.reason, response.text, sql)
else:
logging.trace('posted %r to %s', sql, write_url)
self._trough_cli.write(
bucket, self.WRITE_SQL_TMPL,
(digest_key, url, warc_date, record_id), self.SCHEMA_ID)
def lookup(self, digest_key, bucket='__unspecified__', url=None):
read_url = self._read_url(bucket)
if not read_url:
return None
sql = 'select * from dedup where digest_key=%s;' % (
self.sql_value(digest_key))
try:
response = requests.post(read_url, sql)
except:
logging.error(
'problem with trough read url %r', read_url, exc_info=True)
del self._read_url_cache[bucket]
return None
if response.status_code != 200:
del self._read_url_cache[bucket]
logging.warn(
'unexpected response %r %r %r to sql=%r',
response.status_code, response.reason, response.text, sql)
return None
logging.debug('got %r from query %r', response.text, sql)
results = json.loads(response.text)
assert len(results) <= 1 # sanity check (digest_key is primary key)
results = self._trough_cli.read(
bucket, 'select * from dedup where digest_key=%s;',
(digest_key,))
if results:
assert len(results) == 1 # sanity check (digest_key is primary key)
result = results[0]
result['id'] = result['id'].encode('ascii')
result['url'] = result['url'].encode('ascii')
@ -431,7 +295,7 @@ class TroughDedupDb(object):
if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
and recorded_url.response_recorder.payload_size() > 0):
digest_key = warcprox.digest_str(
recorded_url.response_recorder.payload_digest,
recorded_url.payload_digest,
self.options.base32)
if recorded_url.warcprox_meta and 'captures-bucket' in recorded_url.warcprox_meta:
self.save(
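TroughDedupDb.save() above now hands WRITE_SQL_TMPL and a values tuple to TroughClient.write() instead of splicing SQL together itself; the %s placeholders are filled in with TroughClient.sql_value(). A rough illustration of what that expansion yields (the date and record id here are made-up values, and the digest_key is borrowed from the tests):

    import datetime
    from warcprox.trough import TroughClient

    WRITE_SQL_TMPL = ('insert into dedup (digest_key, url, date, id) '
                      'values (%s, %s, %s, %s);')

    values = (
        b'sha1:69d51a46e44a04e8110da0c91897cece979fa70f',    # digest_key
        b'http://example.com/seed',                          # url
        datetime.datetime(2017, 11, 15, 17, 29, 53),         # date (made up)
        b'<urn:uuid:00000000-0000-0000-0000-000000000000>',  # id (made up)
    )
    sql = WRITE_SQL_TMPL % tuple(TroughClient.sql_value(v) for v in values)
    # str/bytes values come back single-quoted, datetimes wrapped in datetime():
    # insert into dedup (digest_key, url, date, id) values
    #     ('sha1:69d51a46e44a04e8110da0c91897cece979fa70f',
    #      'http://example.com/seed', datetime('2017-11-15T17:29:53'),
    #      '<urn:uuid:00000000-0000-0000-0000-000000000000>');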

View File

@ -254,7 +254,8 @@ def init_controller(args):
playback_proxy = None
if args.crawl_log_dir:
listeners.append(warcprox.crawl_log.CrawlLogger(args.crawl_log_dir))
listeners.append(warcprox.crawl_log.CrawlLogger(
args.crawl_log_dir, options=options))
for qualname in args.plugins or []:
try:

View File

@ -66,7 +66,7 @@ import time
class ProxyingRecorder(object):
"""
Wraps a socket._fileobject, recording the bytes as they are read,
calculating digests, and sending them on to the proxy client.
calculating the block digest, and sending them on to the proxy client.
"""
logger = logging.getLogger("warcprox.mitmproxy.ProxyingRecorder")
@ -78,27 +78,19 @@ class ProxyingRecorder(object):
self.digest_algorithm = digest_algorithm
self.block_digest = hashlib.new(digest_algorithm)
self.payload_offset = None
self.payload_digest = None
self.proxy_client = proxy_client
self._proxy_client_conn_open = True
self.len = 0
self.url = url
def payload_starts_now(self):
self.payload_digest = hashlib.new(self.digest_algorithm)
self.payload_offset = self.len
def _update_payload_digest(self, hunk):
if self.payload_digest:
self.payload_digest.update(hunk)
def _update(self, hunk):
self._update_payload_digest(hunk)
self.block_digest.update(hunk)
self.tempfile.write(hunk)
if self.payload_digest and self._proxy_client_conn_open:
if self.payload_offset is not None and self._proxy_client_conn_open:
try:
self.proxy_client.sendall(hunk)
except BaseException as e:
@ -157,6 +149,7 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
self, sock, debuglevel=debuglevel, method=method)
self.proxy_client = proxy_client
self.url = url
self.digest_algorithm = digest_algorithm
# Keep around extra reference to self.fp because HTTPResponse sets
# self.fp=None after it finishes reading, but we still need it
@ -164,6 +157,8 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
self.fp, proxy_client, digest_algorithm, url=url)
self.fp = self.recorder
self.payload_digest = None
def begin(self, extra_response_headers={}):
http_client.HTTPResponse.begin(self) # reads status line, headers
@ -185,6 +180,12 @@ class ProxyingRecordingHTTPResponse(http_client.HTTPResponse):
self.proxy_client.sendall(status_and_headers.encode('latin1'))
self.recorder.payload_starts_now()
self.payload_digest = hashlib.new(self.digest_algorithm)
def read(self, amt=None):
buf = http_client.HTTPResponse.read(self, amt)
self.payload_digest.update(buf)
return buf
def via_header_value(orig, request_version):
via = orig
@ -361,7 +362,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
return
try:
self._proxy_request()
return self._proxy_request()
except:
self.logger.error("exception proxying request", exc_info=True)
raise
@ -406,6 +407,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
if 'Content-Length' in self.headers:
req += self.rfile.read(int(self.headers['Content-Length']))
prox_rec_res = None
try:
self.logger.debug('sending to remote server req=%r', req)
@ -418,9 +420,9 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
url=self.url, method=self.command)
prox_rec_res.begin(extra_response_headers=extra_response_headers)
buf = prox_rec_res.read(8192)
buf = prox_rec_res.read(65536)
while buf != b'':
buf = prox_rec_res.read(8192)
buf = prox_rec_res.read(65536)
self.log_request(prox_rec_res.status, prox_rec_res.recorder.len)
except Exception as e:

View File

@ -259,6 +259,7 @@ class PlaybackIndexDb(object):
pass
def notify(self, recorded_url, records):
if records:
self.save(records[0].warc_filename, records, records[0].offset)
def save(self, warcfile, recordset, offset):

View File

@ -171,7 +171,8 @@ class StatsDb:
bucket_stats["total"]["urls"] += 1
bucket_stats["total"]["wire_bytes"] += recorded_url.size
if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT:
if records:
if records[0].type == b'revisit':
bucket_stats["revisit"]["urls"] += 1
bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
else:
@ -306,8 +307,6 @@ class RethinkStatsDb(StatsDb):
def tally(self, recorded_url, records):
buckets = self.buckets(recorded_url)
is_revisit = records[0].get_header(
warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT
with self._batch_lock:
for bucket in buckets:
bucket_stats = self._batch.setdefault(
@ -316,7 +315,8 @@ class RethinkStatsDb(StatsDb):
bucket_stats["total"]["urls"] += 1
bucket_stats["total"]["wire_bytes"] += recorded_url.size
if is_revisit:
if records:
if records[0].type == b'revisit':
bucket_stats["revisit"]["urls"] += 1
bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
else:

warcprox/trough.py (new file, 238 lines)
View File

@ -0,0 +1,238 @@
'''
warcprox/trough.py - trough client code
Copyright (C) 2017 Internet Archive
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
'''
from __future__ import absolute_import
import logging
import os
import json
import requests
import doublethink
import rethinkdb as r
import datetime
import threading
import time
class TroughClient(object):
logger = logging.getLogger("warcprox.trough.TroughClient")
def __init__(self, rethinkdb_trough_db_url, promotion_interval=None):
'''
TroughClient constructor
Args:
rethinkdb_trough_db_url: url with schema rethinkdb:// pointing to
trough configuration database
promotion_interval: if specified, `TroughClient` will spawn a
thread that "promotes" (pushed to hdfs) "dirty" trough segments
(segments that have received writes) periodically, sleeping for
`promotion_interval` seconds between cycles (default None)
'''
parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url)
self.rr = doublethink.Rethinker(
servers=parsed.hosts, db=parsed.database)
self.svcreg = doublethink.ServiceRegistry(self.rr)
self._write_url_cache = {}
self._read_url_cache = {}
self._dirty_segments = set()
self._dirty_segments_lock = threading.RLock()
self.promotion_interval = promotion_interval
self._promoter_thread = None
if promotion_interval:
self._promoter_thread = threading.Thread(
target=self._promotrix, name='TroughClient-promoter')
self._promoter_thread.setDaemon(True)
self._promoter_thread.start()
def _promotrix(self):
while True:
time.sleep(self.promotion_interval)
try:
with self._dirty_segments_lock:
dirty_segments = list(self._dirty_segments)
self._dirty_segments.clear()
logging.info('promoting %s trough segments', len(dirty_segments))
for segment in dirty_segments:
try:
self.promote(segment)
except:
logging.error(
'problem promoting segment %s', segment, exc_info=True)
except:
logging.error(
'caught exception doing segment promotion',
exc_info=True)
def promote(self, segment_id):
url = os.path.join(self.segment_manager_url(), 'promote')
payload_dict = {'segment': segment_id}
response = requests.post(url, json=payload_dict)
if response.status_code != 200:
raise Exception(
'Received %s: %r in response to POST %s with data %s' % (
response.status_code, response.text, url,
json.dumps(payload_dict)))
@staticmethod
def sql_value(x):
if x is None:
return 'null'
elif isinstance(x, datetime.datetime):
return 'datetime(%r)' % x.isoformat()
elif isinstance(x, bool):
return int(x)
elif isinstance(x, str) or isinstance(x, bytes):
# py3: repr(u'abc') => 'abc'
# repr(b'abc') => b'abc'
# py2: repr(u'abc') => u'abc'
# repr(b'abc') => 'abc'
# Repr gives us a prefix we don't want in different situations
# depending on whether this is py2 or py3. Chop it off either way.
r = repr(x)
if r[:1] == "'":
return r
else:
return r[1:]
elif isinstance(x, (int, float)):
return x
else:
raise Exception(
"don't know how to make an sql value from %r (%r)" % (
x, type(x)))
def segment_manager_url(self):
master_node = self.svcreg.unique_service('trough-sync-master')
assert master_node
return master_node['url']
def write_url_nocache(self, segment_id, schema_id='default'):
provision_url = os.path.join(self.segment_manager_url(), 'provision')
payload_dict = {'segment': segment_id, 'schema': schema_id}
response = requests.post(provision_url, json=payload_dict)
if response.status_code != 200:
raise Exception(
'Received %s: %r in response to POST %s with data %s' % (
response.status_code, response.text, provision_url,
json.dumps(payload_dict)))
result_dict = response.json()
# assert result_dict['schema'] == schema_id # previously provisioned?
return result_dict['write_url']
def read_url_nocache(self, segment_id):
reql = self.rr.table('services').get_all(
segment_id, index='segment').filter(
{'role':'trough-read'}).filter(
lambda svc: r.now().sub(
svc['last_heartbeat']).lt(svc['ttl'])
).order_by('load')
self.logger.debug('querying rethinkdb: %r', reql)
results = reql.run()
if results:
return results[0]['url']
else:
return None
def write_url(self, segment_id, schema_id='default'):
if not segment_id in self._write_url_cache:
self._write_url_cache[segment_id] = self.write_url_nocache(
segment_id, schema_id)
self.logger.info(
'segment %r write url is %r', segment_id,
self._write_url_cache[segment_id])
return self._write_url_cache[segment_id]
def read_url(self, segment_id):
if not self._read_url_cache.get(segment_id):
self._read_url_cache[segment_id] = self.read_url_nocache(segment_id)
self.logger.info(
'segment %r read url is %r', segment_id,
self._read_url_cache[segment_id])
return self._read_url_cache[segment_id]
def write(self, segment_id, sql_tmpl, values=(), schema_id='default'):
write_url = self.write_url(segment_id, schema_id)
sql = sql_tmpl % tuple(self.sql_value(v) for v in values)
try:
response = requests.post(write_url, sql)
if segment_id not in self._dirty_segments:
with self._dirty_segments_lock:
self._dirty_segments.add(segment_id)
except:
del self._write_url_cache[segment_id]
self.logger.error(
'problem with trough write url %r', write_url,
exc_info=True)
return
if response.status_code != 200:
del self._write_url_cache[segment_id]
self.logger.warn(
'unexpected response %r %r %r from %r to sql=%r',
response.status_code, response.reason, response.text,
write_url, sql)
return
self.logger.debug('posted %r to %s', sql, write_url)
def read(self, segment_id, sql_tmpl, values=()):
read_url = self.read_url(segment_id)
if not read_url:
return None
sql = sql_tmpl % tuple(self.sql_value(v) for v in values)
try:
response = requests.post(read_url, sql)
except:
del self._read_url_cache[segment_id]
self.logger.error(
'problem with trough read url %r', read_url, exc_info=True)
return None
if response.status_code != 200:
del self._read_url_cache[segment_id]
self.logger.warn(
'unexpected response %r %r %r from %r to sql=%r',
response.status_code, response.reason, response.text,
read_url, sql)
return None
self.logger.trace(
'got %r from posting query %r to %r', response.text, sql,
read_url)
results = json.loads(response.text)
return results
def schema_exists(self, schema_id):
url = os.path.join(self.segment_manager_url(), 'schema', schema_id)
response = requests.get(url)
if response.status_code == 200:
return True
elif response.status_code == 404:
return False
else:
response.raise_for_status()
def register_schema(self, schema_id, sql):
url = os.path.join(
self.segment_manager_url(), 'schema', schema_id, 'sql')
response = requests.put(url, sql)
if response.status_code not in (201, 204):
raise Exception(
'Received %s: %r in response to PUT %r with data %r' % (
response.status_code, response.text, sql, url))
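For reference, roughly how the new client is meant to be driven, mirroring TroughDedupDb above and test_trough_segment_promotion (the rethinkdb URL, segment name, and schema here are placeholders, and a reachable trough cluster is assumed):

    import warcprox.trough

    client = warcprox.trough.TroughClient(
            'rethinkdb://db0.example.org,db1.example.org/trough_configuration',
            promotion_interval=60*60)  # background thread promotes dirty segments hourly

    client.register_schema(
            'dedup', 'create table dedup (digest_key varchar(100) primary key);')

    client.write(
            'my-segment',
            'insert into dedup (digest_key) values (%s);',
            ('sha1:da39a3ee5e6b4b0d3255bfef95601890afd80709',),
            schema_id='dedup')

    rows = client.read(
            'my-segment',
            'select * from dedup where digest_key=%s;',
            ('sha1:da39a3ee5e6b4b0d3255bfef95601890afd80709',))
    # rows is the json-decoded result list, or None if no read url is available

Each write marks its segment dirty; with promotion_interval set, the promoter thread calls promote() on the dirty segments each cycle, which is what test_trough_segment_promotion exercises with a 3-second interval.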

View File

@ -53,7 +53,8 @@ class WarcRecordBuilder:
refers_to=recorded_url.dedup_info.get('id'),
refers_to_target_uri=recorded_url.dedup_info['url'],
refers_to_date=recorded_url.dedup_info['date'],
payload_digest=warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.base32),
payload_digest=warcprox.digest_str(
recorded_url.payload_digest, self.base32),
profile=warctools.WarcRecord.PROFILE_IDENTICAL_PAYLOAD_DIGEST,
content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE,
remote_ip=recorded_url.remote_ip)
@ -64,7 +65,9 @@ class WarcRecordBuilder:
recorder=recorded_url.response_recorder,
warc_type=warctools.WarcRecord.RESPONSE,
content_type=hanzo.httptools.ResponseMessage.CONTENT_TYPE,
remote_ip=recorded_url.remote_ip)
remote_ip=recorded_url.remote_ip,
payload_digest=warcprox.digest_str(
recorded_url.payload_digest, self.base32))
def build_warc_records(self, recorded_url):
"""Returns a tuple of hanzo.warctools.warc.WarcRecord (principal_record, ...)"""
@ -122,13 +125,8 @@ class WarcRecordBuilder:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(recorder)).encode('latin1')))
headers.append((warctools.WarcRecord.BLOCK_DIGEST,
warcprox.digest_str(recorder.block_digest, self.base32)))
if recorder.payload_digest is not None:
headers.append((warctools.WarcRecord.PAYLOAD_DIGEST,
warcprox.digest_str(recorder.payload_digest, self.base32)))
recorder.tempfile.seek(0)
record = warctools.WarcRecord(headers=headers, content_file=recorder.tempfile)
else:
headers.append((warctools.WarcRecord.CONTENT_LENGTH, str(len(data)).encode('latin1')))
digest = hashlib.new(self.digest_algorithm, data)
@ -137,7 +135,6 @@ class WarcRecordBuilder:
if not payload_digest:
headers.append((warctools.WarcRecord.PAYLOAD_DIGEST,
warcprox.digest_str(digest, self.base32)))
content_tuple = content_type, data
record = warctools.WarcRecord(headers=headers, content=content_tuple)

View File

@ -217,8 +217,9 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
client_ip=self.client_address[0],
content_type=content_type, method=self.command,
timestamp=timestamp, host=self.hostname,
duration=datetime.datetime.utcnow() - timestamp,
referer=self.headers.get('referer'))
duration=datetime.datetime.utcnow()-timestamp,
referer=self.headers.get('referer'),
payload_digest=prox_rec_res.payload_digest)
self.server.recorded_url_q.put(recorded_url)
return recorded_url
@ -293,7 +294,8 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
if raw_warcprox_meta:
warcprox_meta = json.loads(raw_warcprox_meta)
rec_custom = RecordedUrl(url=self.url,
rec_custom = RecordedUrl(
url=self.url,
request_data=request_data,
response_recorder=None,
remote_ip=b'',
@ -302,7 +304,9 @@ class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
custom_type=warc_type or self.headers['WARC-Type'].encode('utf-8'),
status=204, size=len(request_data),
client_ip=self.client_address[0],
method=self.command, timestamp=timestamp)
method=self.command,
timestamp=timestamp,
duration=datetime.datetime.utcnow()-timestamp)
self.server.recorded_url_q.put(rec_custom)
self.send_response(204, 'OK')
@ -325,7 +329,8 @@ class RecordedUrl:
def __init__(self, url, request_data, response_recorder, remote_ip,
warcprox_meta=None, content_type=None, custom_type=None,
status=None, size=None, client_ip=None, method=None,
timestamp=None, host=None, duration=None, referer=None):
timestamp=None, host=None, duration=None, referer=None,
payload_digest=None):
# XXX should test what happens with non-ascii url (when does
# url-encoding happen?)
if type(url) is not bytes:
@ -363,6 +368,7 @@ class RecordedUrl:
self.host = host
self.duration = duration
self.referer = referer
self.payload_digest = payload_digest
# inherit from object so that multiple inheritance from this class works
# properly in python 2

View File

@ -82,12 +82,14 @@ class WarcWriterThread(threading.Thread):
self.logger.info("%s urls left to write", qsize)
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
records = []
self.idle = None
if self._filter_accepts(recorded_url):
if self.dedup_db:
warcprox.dedup.decorate_with_dedup_info(self.dedup_db,
recorded_url, base32=self.options.base32)
records = self.writer_pool.write_records(recorded_url)
self._final_tasks(recorded_url, records)
# try to release resources in a timely fashion
@ -134,11 +136,15 @@ class WarcWriterThread(threading.Thread):
payload_digest = "-"
# 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"}
self.logger.info("{} {} {} {} {} size={} {} {} {} offset={}".format(
recorded_url.client_ip, recorded_url.status, recorded_url.method,
recorded_url.url.decode("utf-8"), recorded_url.mimetype,
recorded_url.size, payload_digest, records[0].type.decode("utf-8"),
records[0].warc_filename, records[0].offset))
type_ = records[0].type.decode("utf-8") if records else '-'
filename = records[0].warc_filename if records else '-'
offset = records[0].offset if records else '-'
self.logger.info(
"%s %s %s %s %s size=%s %s %s %s offset=%s",
recorded_url.client_ip, recorded_url.status,
recorded_url.method, recorded_url.url.decode("utf-8"),
recorded_url.mimetype, recorded_url.size, payload_digest,
type_, filename, offset)
def _final_tasks(self, recorded_url, records):
if self.listeners: