support for deduplication buckets specified in warcprox-meta header {"captures-bucket":...,...}

This commit is contained in:
Noah Levitt 2015-08-27 20:09:21 +00:00
parent 6d673ee35f
commit 44a62111fb
8 changed files with 226 additions and 88 deletions

View File

@ -45,17 +45,14 @@ class RethinkCaptures:
self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db)) self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db))
r.db(self.db).table_create(self.table, shards=self.shards, replicas=self.replicas).run(conn) r.db(self.db).table_create(self.table, shards=self.shards, replicas=self.replicas).run(conn)
r.db(self.db).table(self.table).index_create("abbr_canon_surt_timesamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]).run(conn) r.db(self.db).table(self.table).index_create("abbr_canon_surt_timesamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]).run(conn)
r.db(self.db).table(self.table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"]]).run(conn) r.db(self.db).table(self.table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"], r.row["bucket"]]).run(conn)
# r.dself.b(self.db).table_create(self.table, primary_key="canon_surt", shards=self.shards, replicas=self.replicas).run(conn)
# r.db(self.db).table(self.table).index_create("timestamp").run(conn)
# r.db(self.db).table(self.table).index_create("sha1base32").run(conn)
def find_response_by_digest(self, algo, raw_digest): def find_response_by_digest(self, algo, raw_digest, bucket="__unspecified__"):
if algo != "sha1": if algo != "sha1":
raise Exception("digest type is {} but big capture table is indexed by sha1".format(algo)) raise Exception("digest type is {} but big capture table is indexed by sha1".format(algo))
sha1base32 = base64.b32encode(raw_digest).decode("utf-8") sha1base32 = base64.b32encode(raw_digest).decode("utf-8")
with self._random_server_connection() as conn: with self._random_server_connection() as conn:
cursor = r.db(self.db).table(self.table).get_all([sha1base32, "response"], index="sha1_warc_type").run(conn) cursor = r.db(self.db).table(self.table).get_all([sha1base32, "response", bucket], index="sha1_warc_type").run(conn)
results = list(cursor) results = list(cursor)
if len(results) > 1: if len(results) > 1:
raise Exception("expected 0 or 1 but found %s results for sha1base32=%s", len(results), sha1base32) raise Exception("expected 0 or 1 but found %s results for sha1base32=%s", len(results), sha1base32)
@ -67,9 +64,17 @@ class RethinkCaptures:
return result return result
def notify(self, recorded_url, records): def notify(self, recorded_url, records):
if not recorded_url.response_recorder:
return
if recorded_url.response_recorder.payload_digest.name != "sha1": if recorded_url.response_recorder.payload_digest.name != "sha1":
self.logger.warn("digest type is %s but big capture table is indexed by sha1", recorded_url.response_recorder.payload_digest.name) self.logger.warn("digest type is %s but big capture table is indexed by sha1", recorded_url.response_recorder.payload_digest.name)
if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
bucket = recorded_url.warcprox_meta["captures-bucket"]
else:
bucket = "__unspecified__"
canon_surt = surt.surt(recorded_url.url.decode("utf-8"), trailing_comma=True, host_massage=False) canon_surt = surt.surt(recorded_url.url.decode("utf-8"), trailing_comma=True, host_massage=False)
entry = { entry = {
# id only specified for rethinkdb partitioning # id only specified for rethinkdb partitioning
@ -86,13 +91,14 @@ class RethinkCaptures:
"content_type": recorded_url.content_type, "content_type": recorded_url.content_type,
"response_code": recorded_url.status, "response_code": recorded_url.status,
"http_method": recorded_url.method, "http_method": recorded_url.method,
"bucket": bucket,
} }
with self._random_server_connection() as conn: with self._random_server_connection() as conn:
result = r.db(self.db).table(self.table).insert(entry).run(conn) result = r.db(self.db).table(self.table).insert(entry).run(conn)
if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]: if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]:
raise Exception("unexpected result %s saving %s", result, entry) raise Exception("unexpected result %s saving %s", result, entry)
self.logger.info('big capture table db saved %s', entry) self.logger.info("big capture table db saved %s", entry)
class RethinkCapturesDedup: class RethinkCapturesDedup:
logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup") logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup")
@ -101,7 +107,7 @@ class RethinkCapturesDedup:
self.captures_db = captures_db self.captures_db = captures_db
self.options = options self.options = options
def lookup(self, digest_key): def lookup(self, digest_key, bucket="__unspecified__"):
k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
algo, value_str = k.split(":") algo, value_str = k.split(":")
self.logger.info("(algo,value_str)=(%s,%s)", algo, value_str) self.logger.info("(algo,value_str)=(%s,%s)", algo, value_str)
@ -109,7 +115,7 @@ class RethinkCapturesDedup:
raw_digest = base64.b32decode(value_str, casefold=True) raw_digest = base64.b32decode(value_str, casefold=True)
else: else:
raw_digest = base64.b16decode(value_str, casefold=True) raw_digest = base64.b16decode(value_str, casefold=True)
entry = self.captures_db.find_response_by_digest(algo, raw_digest) entry = self.captures_db.find_response_by_digest(algo, raw_digest, bucket)
if entry: if entry:
dedup_info = {"url":entry["url"].encode("utf-8"), "date":entry["timestamp"].encode("utf-8"), "id":entry["warc_id"].encode("utf-8")} dedup_info = {"url":entry["url"].encode("utf-8"), "date":entry["timestamp"].encode("utf-8"), "id":entry["warc_id"].encode("utf-8")}
self.logger.info("returning %s for digest_key=%s", dedup_info, digest_key) self.logger.info("returning %s for digest_key=%s", dedup_info, digest_key)

View File

@ -40,19 +40,22 @@ class DedupDb(object):
except: except:
pass pass
def save(self, key, response_record): def save(self, digest_key, response_record, bucket=""):
record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
key = digest_key + b"|" + bucket.encode("utf-8")
py_value = {'id':record_id, 'url':url, 'date':date} py_value = {'id':record_id, 'url':url, 'date':date}
json_value = json.dumps(py_value, separators=(',',':')) json_value = json.dumps(py_value, separators=(',',':'))
self.db[key] = json_value.encode('utf-8') self.db[key] = json_value.encode('utf-8')
self.logger.debug('dedup db saved %s:%s', key, json_value) self.logger.debug('dedup db saved %s:%s', key, json_value)
def lookup(self, key): def lookup(self, digest_key, bucket=""):
result = None result = None
key = digest_key + b"|" + bucket.encode("utf-8")
if key in self.db: if key in self.db:
json_result = self.db[key] json_result = self.db[key]
result = json.loads(json_result.decode('utf-8')) result = json.loads(json_result.decode('utf-8'))
@ -65,15 +68,21 @@ class DedupDb(object):
def notify(self, recorded_url, records): def notify(self, recorded_url, records):
if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
and recorded_url.response_recorder.payload_size() > 0): and recorded_url.response_recorder.payload_size() > 0):
key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest,
self.options.base32) self.options.base32)
self.save(key, records[0]) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"])
else:
self.save(digest_key, records[0])
def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
if recorded_url.response_recorder and recorded_url.response_recorder.payload_digest: if recorded_url.response_recorder and recorded_url.response_recorder.payload_digest:
key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
recorded_url.dedup_info = dedup_db.lookup(key) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"])
else:
recorded_url.dedup_info = dedup_db.lookup(digest_key)
class RethinkDedupDb: class RethinkDedupDb:
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
@ -114,8 +123,9 @@ class RethinkDedupDb:
def sync(self): def sync(self):
pass pass
def save(self, key, response_record): def save(self, digest_key, response_record, bucket=""):
k = key.decode("utf-8") if isinstance(key, bytes) else key k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
k = "{}|{}".format(k, bucket)
record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1')
url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1')
date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1')
@ -124,21 +134,25 @@ class RethinkDedupDb:
result = r.db(self.db).table(self.table).insert(record,conflict="replace").run(conn) result = r.db(self.db).table(self.table).insert(record,conflict="replace").run(conn)
if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
raise Exception("unexpected result %s saving %s", result, record) raise Exception("unexpected result %s saving %s", result, record)
self.logger.debug('dedup db saved %s:%s', key, record) self.logger.debug('dedup db saved %s:%s', k, record)
def lookup(self, key): def lookup(self, digest_key, bucket=""):
k = key.decode("utf-8") if isinstance(key, bytes) else key k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key
k = "{}|{}".format(k, bucket)
with self._random_server_connection() as conn: with self._random_server_connection() as conn:
result = r.db(self.db).table(self.table).get(k).run(conn) result = r.db(self.db).table(self.table).get(k).run(conn)
if result: if result:
for x in result: for x in result:
result[x] = result[x].encode("utf-8") result[x] = result[x].encode("utf-8")
self.logger.debug('dedup db lookup of key=%s returning %s', key, result) self.logger.debug('dedup db lookup of key=%s returning %s', k, result)
return result return result
def notify(self, recorded_url, records): def notify(self, recorded_url, records):
if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
and recorded_url.response_recorder.payload_size() > 0): and recorded_url.response_recorder.payload_size() > 0):
key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest,
self.options.base32) self.options.base32)
self.save(key, records[0]) if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta:
self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"])
else:
self.save(digest_key, records[0])

View File

@ -43,10 +43,10 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
arg_parser.add_argument('-n', '--prefix', dest='prefix', arg_parser.add_argument('-n', '--prefix', dest='prefix',
default='WARCPROX', help='WARC filename prefix') default='WARCPROX', help='WARC filename prefix')
arg_parser.add_argument('-s', '--size', dest='size', arg_parser.add_argument('-s', '--size', dest='size',
default=1000*1000*1000, default=1000*1000*1000, type=int,
help='WARC file rollover size threshold in bytes') help='WARC file rollover size threshold in bytes')
arg_parser.add_argument('--rollover-idle-time', arg_parser.add_argument('--rollover-idle-time',
dest='rollover_idle_time', default=None, dest='rollover_idle_time', default=None, type=int,
help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)") help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)")
try: try:
hash_algos = hashlib.algorithms_guaranteed hash_algos = hashlib.algorithms_guaranteed
@ -150,10 +150,7 @@ def main(argv=sys.argv):
ca = certauth.certauth.CertificateAuthority(args.cacert, args.certs_dir, ca = certauth.certauth.CertificateAuthority(args.cacert, args.certs_dir,
ca_name=ca_name) ca_name=ca_name)
proxy = warcprox.warcproxy.WarcProxy( proxy = warcprox.warcproxy.WarcProxy(ca=ca, recorded_url_q=recorded_url_q,
server_address=(args.address, args.port), ca=ca,
recorded_url_q=recorded_url_q,
digest_algorithm=args.digest_algorithm,
stats_db=stats_db, options=options) stats_db=stats_db, options=options)
if args.playback_port is not None: if args.playback_port is not None:
@ -167,8 +164,7 @@ def main(argv=sys.argv):
playback_index_db = None playback_index_db = None
playback_proxy = None playback_proxy = None
default_warc_writer = warcprox.writer.WarcWriter(args.prefix, options=options) writer_pool = warcprox.writer.WarcWriterPool(options=options)
writer_pool = warcprox.writer.WarcWriterPool(default_warc_writer, options=options)
warc_writer_thread = warcprox.writerthread.WarcWriterThread( warc_writer_thread = warcprox.writerthread.WarcWriterThread(
recorded_url_q=recorded_url_q, writer_pool=writer_pool, recorded_url_q=recorded_url_q, writer_pool=writer_pool,
dedup_db=dedup_db, listeners=listeners, options=options) dedup_db=dedup_db, listeners=listeners, options=options)

View File

@ -27,6 +27,7 @@ import json
import traceback import traceback
import re import re
from warcprox.mitmproxy import MitmProxyHandler from warcprox.mitmproxy import MitmProxyHandler
import warcprox
class PlaybackProxyHandler(MitmProxyHandler): class PlaybackProxyHandler(MitmProxyHandler):
logger = logging.getLogger("warcprox.playback.PlaybackProxyHandler") logger = logging.getLogger("warcprox.playback.PlaybackProxyHandler")
@ -180,13 +181,14 @@ class PlaybackProxyHandler(MitmProxyHandler):
class PlaybackProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): class PlaybackProxy(socketserver.ThreadingMixIn, http_server.HTTPServer):
logger = logging.getLogger("warcprox.playback.PlaybackProxy") logger = logging.getLogger("warcprox.playback.PlaybackProxy")
def __init__(self, server_address, req_handler_class=PlaybackProxyHandler,
bind_and_activate=True, ca=None, playback_index_db=None, def __init__(self, ca=None, playback_index_db=None, options=warcprox.Options()):
warcs_dir=None): server_address = (options.address or 'localhost', options.playback_port if options.playback_port is not None else 8001)
http_server.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) http_server.HTTPServer.__init__(self, server_address, PlaybackProxyHandler, bind_and_activate=True)
self.ca = ca self.ca = ca
self.playback_index_db = playback_index_db self.playback_index_db = playback_index_db
self.warcs_dir = warcs_dir self.warcs_dir = options.directory
self.options = options
def server_activate(self): def server_activate(self):
http_server.HTTPServer.server_activate(self) http_server.HTTPServer.server_activate(self)

View File

@ -18,6 +18,7 @@ import json
import rethinkdb import rethinkdb
r = rethinkdb r = rethinkdb
import random import random
from hanzo import warctools
try: try:
import http.server as http_server import http.server as http_server
@ -166,11 +167,12 @@ def rethink_dedup_db(request, rethinkdb_servers, captures_db):
ddb = warcprox.dedup.RethinkDedupDb(servers, db) ddb = warcprox.dedup.RethinkDedupDb(servers, db)
def fin(): def fin():
if not captures_db: if rethinkdb_servers:
logging.info('dropping rethinkdb database {}'.format(db)) if not captures_db:
with ddb._random_server_connection() as conn: logging.info('dropping rethinkdb database {}'.format(db))
result = r.db_drop(db).run(conn) with ddb._random_server_connection() as conn:
logging.info("result=%s", result) result = r.db_drop(db).run(conn)
logging.info("result=%s", result)
request.addfinalizer(fin) request.addfinalizer(fin)
return ddb return ddb
@ -228,26 +230,27 @@ def warcprox_(request, captures_db, dedup_db, stats_db):
recorded_url_q = queue.Queue() recorded_url_q = queue.Queue()
proxy = warcprox.warcproxy.WarcProxy(server_address=('localhost', 0), ca=ca, options = warcprox.Options(port=0, playback_port=0)
recorded_url_q=recorded_url_q, stats_db=stats_db) proxy = warcprox.warcproxy.WarcProxy(ca=ca, recorded_url_q=recorded_url_q,
stats_db=stats_db, options=options)
options.port = proxy.server_port
warcs_dir = tempfile.mkdtemp(prefix='warcprox-test-warcs-') options.directory = tempfile.mkdtemp(prefix='warcprox-test-warcs-')
f = tempfile.NamedTemporaryFile(prefix='warcprox-test-playback-index-', suffix='.db', delete=False) f = tempfile.NamedTemporaryFile(prefix='warcprox-test-playback-index-', suffix='.db', delete=False)
f.close() f.close()
playback_index_db_file = f.name playback_index_db_file = f.name
playback_index_db = warcprox.playback.PlaybackIndexDb(playback_index_db_file) playback_index_db = warcprox.playback.PlaybackIndexDb(playback_index_db_file)
playback_proxy = warcprox.playback.PlaybackProxy(server_address=('localhost', 0), ca=ca, playback_proxy = warcprox.playback.PlaybackProxy(ca=ca,
playback_index_db=playback_index_db, warcs_dir=warcs_dir) playback_index_db=playback_index_db, options=options)
options.playback_proxy = playback_proxy.server_port
default_warc_writer = warcprox.writer.WarcWriter(directory=warcs_dir, writer_pool = warcprox.writer.WarcWriterPool(options)
port=proxy.server_port)
writer_pool = warcprox.writer.WarcWriterPool(default_warc_writer)
warc_writer_thread = warcprox.writerthread.WarcWriterThread( warc_writer_thread = warcprox.writerthread.WarcWriterThread(
recorded_url_q=recorded_url_q, writer_pool=writer_pool, recorded_url_q=recorded_url_q, writer_pool=writer_pool,
dedup_db=dedup_db, listeners=[captures_db or dedup_db, playback_index_db, stats_db]) dedup_db=dedup_db, listeners=[captures_db or dedup_db, playback_index_db, stats_db])
warcprox_ = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy) warcprox_ = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy, options)
logging.info('starting warcprox') logging.info('starting warcprox')
warcprox_thread = threading.Thread(name='WarcproxThread', warcprox_thread = threading.Thread(name='WarcproxThread',
target=warcprox_.run_until_shutdown) target=warcprox_.run_until_shutdown)
@ -257,7 +260,7 @@ def warcprox_(request, captures_db, dedup_db, stats_db):
logging.info('stopping warcprox') logging.info('stopping warcprox')
warcprox_.stop.set() warcprox_.stop.set()
warcprox_thread.join() warcprox_thread.join()
for f in (ca_file, ca_dir, warcs_dir, playback_index_db_file): for f in (ca_file, ca_dir, options.directory, playback_index_db_file):
if os.path.isdir(f): if os.path.isdir(f):
logging.info('deleting directory {}'.format(f)) logging.info('deleting directory {}'.format(f))
shutil.rmtree(f) shutil.rmtree(f)
@ -394,8 +397,9 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies)
assert response.headers['warcprox-test-header'] == 'e!' assert response.headers['warcprox-test-header'] == 'e!'
assert response.content == b'I am the warcprox test payload! ffffffffff!\n' assert response.content == b'I am the warcprox test payload! ffffffffff!\n'
# XXX need to give warc writer thread a chance, and we don't have any change to poll for :-\ # wait for writer thread to process
time.sleep(2.0) while not warcprox_.warc_writer_thread.idle:
time.sleep(0.5)
# check in dedup db (no change from prev) # check in dedup db (no change from prev)
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc')
@ -455,8 +459,9 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
assert response.headers['warcprox-test-header'] == 'g!' assert response.headers['warcprox-test-header'] == 'g!'
assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n' assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n'
# XXX need to give warc writer thread a chance, and we don't have any change to poll for :-\ # wait for writer thread to process
time.sleep(2.0) while not warcprox_.warc_writer_thread.idle:
time.sleep(0.5)
# check in dedup db (no change from prev) # check in dedup db (no change from prev)
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89')
@ -472,7 +477,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie
assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n' assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n'
# XXX how to check dedup was used? # XXX how to check dedup was used?
def test_limits(http_daemon, archiving_proxies): def test_limits(http_daemon, warcprox_, archiving_proxies):
url = 'http://localhost:{}/i/j'.format(http_daemon.server_port) url = 'http://localhost:{}/i/j'.format(http_daemon.server_port)
request_meta = {"stats":{"buckets":["job1"]},"limits":{"job1.total.urls":10}} request_meta = {"stats":{"buckets":["job1"]},"limits":{"job1.total.urls":10}}
headers = {"Warcprox-Meta": json.dumps(request_meta)} headers = {"Warcprox-Meta": json.dumps(request_meta)}
@ -483,8 +488,9 @@ def test_limits(http_daemon, archiving_proxies):
assert response.headers['warcprox-test-header'] == 'i!' assert response.headers['warcprox-test-header'] == 'i!'
assert response.content == b'I am the warcprox test payload! jjjjjjjjjj!\n' assert response.content == b'I am the warcprox test payload! jjjjjjjjjj!\n'
# XXX give warc writer thread a chance to update stats # wait for writer thread to process
time.sleep(2.0) while not warcprox_.warc_writer_thread.idle:
time.sleep(0.5)
response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True) response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True)
assert response.status_code == 420 assert response.status_code == 420
@ -494,6 +500,124 @@ def test_limits(http_daemon, archiving_proxies):
assert response.headers["content-type"] == "text/plain;charset=utf-8" assert response.headers["content-type"] == "text/plain;charset=utf-8"
assert response.raw.data == b"request rejected by warcprox: reached limit job1.total.urls=10\n" assert response.raw.data == b"request rejected by warcprox: reached limit job1.total.urls=10\n"
def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies):
url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port)
url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port)
# archive url1 bucket_a
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_a"})}
response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!'
assert response.content == b'I am the warcprox test payload! llllllllll!\n'
# wait for writer thread to process
while not warcprox_.warc_writer_thread.idle:
time.sleep(0.5)
# check url1 in dedup db bucket_a
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a")
assert dedup_lookup['url'] == url1.encode('ascii')
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
record_id = dedup_lookup['id']
dedup_date = dedup_lookup['date']
# check url1 not in dedup db bucket_b
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b")
assert dedup_lookup is None
# archive url2 bucket_b
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_b"})}
response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!'
assert response.content == b'I am the warcprox test payload! llllllllll!\n'
# wait for writer thread to process
while not warcprox_.warc_writer_thread.idle:
time.sleep(0.5)
# check url2 in dedup db bucket_b
dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b")
assert dedup_lookup['url'] == url2.encode('ascii')
assert re.match(br'^<urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}>$', dedup_lookup['id'])
assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date'])
record_id = dedup_lookup['id']
dedup_date = dedup_lookup['date']
# archive url2 bucket_a
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_a"})}
response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!'
assert response.content == b'I am the warcprox test payload! llllllllll!\n'
# archive url1 bucket_b
headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_b"})}
response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers)
assert response.status_code == 200
assert response.headers['warcprox-test-header'] == 'k!'
assert response.content == b'I am the warcprox test payload! llllllllll!\n'
# wait for writer thread to process
while not warcprox_.warc_writer_thread.idle:
time.sleep(0.5)
# close the warc
assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"]
writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"]
warc_path = os.path.join(writer.directory, writer._f_finalname)
warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"].close_writer()
assert os.path.exists(warc_path)
# read the warc
fh = warctools.ArchiveRecord.open_archive(warc_path)
record_iter = fh.read_records(limit=None, offsets=True)
try:
(offset, record, errors) = next(record_iter)
assert record.type == b'warcinfo'
# url1 bucket_a
(offset, record, errors) = next(record_iter)
assert record.type == b'response'
assert record.url == url1.encode('ascii')
assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\nI am the warcprox test payload! llllllllll!\n'
(offset, record, errors) = next(record_iter)
assert record.type == b'request'
# url2 bucket_b
(offset, record, errors) = next(record_iter)
assert record.type == b'response'
assert record.url == url2.encode('ascii')
assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\nI am the warcprox test payload! llllllllll!\n'
(offset, record, errors) = next(record_iter)
assert record.type == b'request'
# url2 bucket_a (revisit)
(offset, record, errors) = next(record_iter)
assert record.type == b'revisit'
assert record.url == url2.encode('ascii')
assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\n'
(offset, record, errors) = next(record_iter)
assert record.type == b'request'
# url1 bucket_b (revisit)
(offset, record, errors) = next(record_iter)
assert record.type == b'revisit'
assert record.url == url1.encode('ascii')
assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\n'
(offset, record, errors) = next(record_iter)
assert record.type == b'request'
# that's all folks
assert next(record_iter)[1] == None
assert next(record_iter, None) == None
finally:
fh.close()
if __name__ == '__main__': if __name__ == '__main__':
pytest.main() pytest.main()

View File

@ -349,13 +349,11 @@ class RecordedUrl:
class SingleThreadedWarcProxy(http_server.HTTPServer): class SingleThreadedWarcProxy(http_server.HTTPServer):
logger = logging.getLogger("warcprox.warcproxy.WarcProxy") logger = logging.getLogger("warcprox.warcproxy.WarcProxy")
def __init__(self, server_address=('localhost', 8000), def __init__(self, ca=None, recorded_url_q=None, stats_db=None, options=warcprox.Options()):
req_handler_class=WarcProxyHandler, bind_and_activate=True, server_address = (options.address or 'localhost', options.port if options.port is not None else 8000)
ca=None, recorded_url_q=None, digest_algorithm='sha1', http_server.HTTPServer.__init__(self, server_address, WarcProxyHandler, bind_and_activate=True)
stats_db=None, options=warcprox.Options()):
http_server.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate)
self.digest_algorithm = digest_algorithm self.digest_algorithm = options.digest_algorithm or 'sha1'
if ca is not None: if ca is not None:
self.ca = ca self.ca = ca

View File

@ -13,25 +13,22 @@ import string
import random import random
class WarcWriter: class WarcWriter:
logger = logging.getLogger("warcprox.writer.WarcWriter") logger = logging.getLogger('warcprox.writer.WarcWriter')
# port is only used for warc filename def __init__(self, options=warcprox.Options()):
def __init__(self, prefix='WARCPROX', directory='./warcs',
rollover_size=1000000000, gzip=False, port=0,
digest_algorithm='sha1', base32=False, rollover_idle_time=None,
options=warcprox.Options()):
self.rollover_size = rollover_size self.rollover_size = options.rollover_size or 1000000000
self.rollover_idle_time = rollover_idle_time self.rollover_idle_time = options.rollover_idle_time or None
self._last_activity = time.time() self._last_activity = time.time()
self.gzip = gzip self.gzip = options.gzip or False
digest_algorithm = options.digest_algorithm or 'sha1'
base32 = options.base32
self.record_builder = warcprox.warc.WarcRecordBuilder(digest_algorithm=digest_algorithm, base32=base32) self.record_builder = warcprox.warc.WarcRecordBuilder(digest_algorithm=digest_algorithm, base32=base32)
# warc path and filename stuff # warc path and filename stuff
self.directory = directory self.directory = options.directory or './warcs'
self.prefix = prefix self.prefix = options.prefix or 'warcprox'
self.port = port
self._f = None self._f = None
self._fpath = None self._fpath = None
@ -40,9 +37,9 @@ class WarcWriter:
self._randomtoken = "".join(random.Random().sample(string.digits + string.ascii_lowercase, 8)) self._randomtoken = "".join(random.Random().sample(string.digits + string.ascii_lowercase, 8))
if not os.path.exists(directory): if not os.path.exists(self.directory):
self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory)) self.logger.info("warc destination directory {} doesn't exist, creating it".format(self.directory))
os.mkdir(directory) os.mkdir(self.directory)
def timestamp17(self): def timestamp17(self):
now = datetime.utcnow() now = datetime.utcnow()
@ -115,11 +112,8 @@ class WarcWriter:
class WarcWriterPool: class WarcWriterPool:
logger = logging.getLogger("warcprox.writer.WarcWriterPool") logger = logging.getLogger("warcprox.writer.WarcWriterPool")
def __init__(self, default_warc_writer=None, options=warcprox.Options()): def __init__(self, options=warcprox.Options()):
if default_warc_writer: self.default_warc_writer = WarcWriter(options=options)
self.default_warc_writer = default_warc_writer
else:
self.default_warc_writer = WarcWriter(options=options)
self.warc_writers = {} # {prefix:WarcWriter} self.warc_writers = {} # {prefix:WarcWriter}
self._last_sync = time.time() self._last_sync = time.time()
self.options = options self.options = options
@ -129,10 +123,11 @@ class WarcWriterPool:
w = self.default_warc_writer w = self.default_warc_writer
if recorded_url.warcprox_meta and "warc-prefix" in recorded_url.warcprox_meta: if recorded_url.warcprox_meta and "warc-prefix" in recorded_url.warcprox_meta:
# self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url)) # self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url))
prefix = recorded_url.warcprox_meta["warc-prefix"] options = warcprox.Options(**vars(self.options))
if not prefix in self.warc_writers: options.prefix = recorded_url.warcprox_meta["warc-prefix"]
self.warc_writers[prefix] = WarcWriter(prefix=prefix, options=self.options) if not options.prefix in self.warc_writers:
w = self.warc_writers[prefix] self.warc_writers[options.prefix] = WarcWriter(options=options)
w = self.warc_writers[options.prefix]
return w return w
def write_records(self, recorded_url): def write_records(self, recorded_url):

View File

@ -34,18 +34,21 @@ class WarcWriterThread(threading.Thread):
self.dedup_db = dedup_db self.dedup_db = dedup_db
self.listeners = listeners self.listeners = listeners
self.options = options self.options = options
self.idle = None
def run(self): def run(self):
try: try:
while not self.stop.is_set(): while not self.stop.is_set():
try: try:
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
self.idle = None
if self.dedup_db: if self.dedup_db:
warcprox.dedup.decorate_with_dedup_info(self.dedup_db, warcprox.dedup.decorate_with_dedup_info(self.dedup_db,
recorded_url, base32=self.options.base32) recorded_url, base32=self.options.base32)
records = self.writer_pool.write_records(recorded_url) records = self.writer_pool.write_records(recorded_url)
self._final_tasks(recorded_url, records) self._final_tasks(recorded_url, records)
except queue.Empty: except queue.Empty:
self.idle = time.time()
self.writer_pool.maybe_idle_rollover() self.writer_pool.maybe_idle_rollover()
self.logger.info('WarcWriterThread shutting down') self.logger.info('WarcWriterThread shutting down')