From 44a62111fb90c9c20f9fad285261d809678d2052 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 27 Aug 2015 20:09:21 +0000 Subject: [PATCH] support for deduplication buckets specified in warcprox-meta header {"captures-bucket":...,...} --- warcprox/bigtable.py | 24 +++-- warcprox/dedup.py | 42 +++++--- warcprox/main.py | 12 +-- warcprox/playback.py | 12 ++- warcprox/tests/test_warcprox.py | 168 +++++++++++++++++++++++++++----- warcprox/warcproxy.py | 10 +- warcprox/writer.py | 43 ++++---- warcprox/writerthread.py | 3 + 8 files changed, 226 insertions(+), 88 deletions(-) diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index f1494d6..aecb4ed 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -45,17 +45,14 @@ class RethinkCaptures: self.logger.info("creating rethinkdb table %s in database %s", repr(self.table), repr(self.db)) r.db(self.db).table_create(self.table, shards=self.shards, replicas=self.replicas).run(conn) r.db(self.db).table(self.table).index_create("abbr_canon_surt_timesamp", [r.row["abbr_canon_surt"], r.row["timestamp"]]).run(conn) - r.db(self.db).table(self.table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"]]).run(conn) - # r.dself.b(self.db).table_create(self.table, primary_key="canon_surt", shards=self.shards, replicas=self.replicas).run(conn) - # r.db(self.db).table(self.table).index_create("timestamp").run(conn) - # r.db(self.db).table(self.table).index_create("sha1base32").run(conn) + r.db(self.db).table(self.table).index_create("sha1_warc_type", [r.row["sha1base32"], r.row["warc_type"], r.row["bucket"]]).run(conn) - def find_response_by_digest(self, algo, raw_digest): + def find_response_by_digest(self, algo, raw_digest, bucket="__unspecified__"): if algo != "sha1": raise Exception("digest type is {} but big capture table is indexed by sha1".format(algo)) sha1base32 = base64.b32encode(raw_digest).decode("utf-8") with self._random_server_connection() as conn: - cursor = r.db(self.db).table(self.table).get_all([sha1base32, "response"], index="sha1_warc_type").run(conn) + cursor = r.db(self.db).table(self.table).get_all([sha1base32, "response", bucket], index="sha1_warc_type").run(conn) results = list(cursor) if len(results) > 1: raise Exception("expected 0 or 1 but found %s results for sha1base32=%s", len(results), sha1base32) @@ -67,9 +64,17 @@ class RethinkCaptures: return result def notify(self, recorded_url, records): + if not recorded_url.response_recorder: + return + if recorded_url.response_recorder.payload_digest.name != "sha1": self.logger.warn("digest type is %s but big capture table is indexed by sha1", recorded_url.response_recorder.payload_digest.name) + if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: + bucket = recorded_url.warcprox_meta["captures-bucket"] + else: + bucket = "__unspecified__" + canon_surt = surt.surt(recorded_url.url.decode("utf-8"), trailing_comma=True, host_massage=False) entry = { # id only specified for rethinkdb partitioning @@ -86,13 +91,14 @@ class RethinkCaptures: "content_type": recorded_url.content_type, "response_code": recorded_url.status, "http_method": recorded_url.method, + "bucket": bucket, } with self._random_server_connection() as conn: result = r.db(self.db).table(self.table).insert(entry).run(conn) if result["inserted"] == 1 and sorted(result.values()) != [0,0,0,0,0,1]: raise Exception("unexpected result %s saving %s", result, entry) - self.logger.info('big capture table db saved %s', entry) + self.logger.info("big capture table db saved %s", entry) class RethinkCapturesDedup: logger = logging.getLogger("warcprox.dedup.RethinkCapturesDedup") @@ -101,7 +107,7 @@ class RethinkCapturesDedup: self.captures_db = captures_db self.options = options - def lookup(self, digest_key): + def lookup(self, digest_key, bucket="__unspecified__"): k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key algo, value_str = k.split(":") self.logger.info("(algo,value_str)=(%s,%s)", algo, value_str) @@ -109,7 +115,7 @@ class RethinkCapturesDedup: raw_digest = base64.b32decode(value_str, casefold=True) else: raw_digest = base64.b16decode(value_str, casefold=True) - entry = self.captures_db.find_response_by_digest(algo, raw_digest) + entry = self.captures_db.find_response_by_digest(algo, raw_digest, bucket) if entry: dedup_info = {"url":entry["url"].encode("utf-8"), "date":entry["timestamp"].encode("utf-8"), "id":entry["warc_id"].encode("utf-8")} self.logger.info("returning %s for digest_key=%s", dedup_info, digest_key) diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 44c5503..7148773 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -40,19 +40,22 @@ class DedupDb(object): except: pass - def save(self, key, response_record): + def save(self, digest_key, response_record, bucket=""): record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') + key = digest_key + b"|" + bucket.encode("utf-8") + py_value = {'id':record_id, 'url':url, 'date':date} json_value = json.dumps(py_value, separators=(',',':')) self.db[key] = json_value.encode('utf-8') self.logger.debug('dedup db saved %s:%s', key, json_value) - def lookup(self, key): + def lookup(self, digest_key, bucket=""): result = None + key = digest_key + b"|" + bucket.encode("utf-8") if key in self.db: json_result = self.db[key] result = json.loads(json_result.decode('utf-8')) @@ -65,15 +68,21 @@ class DedupDb(object): def notify(self, recorded_url, records): if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE and recorded_url.response_recorder.payload_size() > 0): - key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, + digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.options.base32) - self.save(key, records[0]) + if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: + self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"]) + else: + self.save(digest_key, records[0]) def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): if recorded_url.response_recorder and recorded_url.response_recorder.payload_digest: - key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) - recorded_url.dedup_info = dedup_db.lookup(key) + digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) + if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: + recorded_url.dedup_info = dedup_db.lookup(digest_key, recorded_url.warcprox_meta["captures-bucket"]) + else: + recorded_url.dedup_info = dedup_db.lookup(digest_key) class RethinkDedupDb: logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") @@ -114,8 +123,9 @@ class RethinkDedupDb: def sync(self): pass - def save(self, key, response_record): - k = key.decode("utf-8") if isinstance(key, bytes) else key + def save(self, digest_key, response_record, bucket=""): + k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key + k = "{}|{}".format(k, bucket) record_id = response_record.get_header(warctools.WarcRecord.ID).decode('latin1') url = response_record.get_header(warctools.WarcRecord.URL).decode('latin1') date = response_record.get_header(warctools.WarcRecord.DATE).decode('latin1') @@ -124,21 +134,25 @@ class RethinkDedupDb: result = r.db(self.db).table(self.table).insert(record,conflict="replace").run(conn) if sorted(result.values()) != [0,0,0,0,0,1] and [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: raise Exception("unexpected result %s saving %s", result, record) - self.logger.debug('dedup db saved %s:%s', key, record) + self.logger.debug('dedup db saved %s:%s', k, record) - def lookup(self, key): - k = key.decode("utf-8") if isinstance(key, bytes) else key + def lookup(self, digest_key, bucket=""): + k = digest_key.decode("utf-8") if isinstance(digest_key, bytes) else digest_key + k = "{}|{}".format(k, bucket) with self._random_server_connection() as conn: result = r.db(self.db).table(self.table).get(k).run(conn) if result: for x in result: result[x] = result[x].encode("utf-8") - self.logger.debug('dedup db lookup of key=%s returning %s', key, result) + self.logger.debug('dedup db lookup of key=%s returning %s', k, result) return result def notify(self, recorded_url, records): if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE and recorded_url.response_recorder.payload_size() > 0): - key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, + digest_key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, self.options.base32) - self.save(key, records[0]) + if recorded_url.warcprox_meta and "captures-bucket" in recorded_url.warcprox_meta: + self.save(digest_key, records[0], bucket=recorded_url.warcprox_meta["captures-bucket"]) + else: + self.save(digest_key, records[0]) diff --git a/warcprox/main.py b/warcprox/main.py index eb20db6..397f4db 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -43,10 +43,10 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): arg_parser.add_argument('-n', '--prefix', dest='prefix', default='WARCPROX', help='WARC filename prefix') arg_parser.add_argument('-s', '--size', dest='size', - default=1000*1000*1000, + default=1000*1000*1000, type=int, help='WARC file rollover size threshold in bytes') arg_parser.add_argument('--rollover-idle-time', - dest='rollover_idle_time', default=None, + dest='rollover_idle_time', default=None, type=int, help="WARC file rollover idle time threshold in seconds (so that Friday's last open WARC doesn't sit there all weekend waiting for more data)") try: hash_algos = hashlib.algorithms_guaranteed @@ -150,10 +150,7 @@ def main(argv=sys.argv): ca = certauth.certauth.CertificateAuthority(args.cacert, args.certs_dir, ca_name=ca_name) - proxy = warcprox.warcproxy.WarcProxy( - server_address=(args.address, args.port), ca=ca, - recorded_url_q=recorded_url_q, - digest_algorithm=args.digest_algorithm, + proxy = warcprox.warcproxy.WarcProxy(ca=ca, recorded_url_q=recorded_url_q, stats_db=stats_db, options=options) if args.playback_port is not None: @@ -167,8 +164,7 @@ def main(argv=sys.argv): playback_index_db = None playback_proxy = None - default_warc_writer = warcprox.writer.WarcWriter(args.prefix, options=options) - writer_pool = warcprox.writer.WarcWriterPool(default_warc_writer, options=options) + writer_pool = warcprox.writer.WarcWriterPool(options=options) warc_writer_thread = warcprox.writerthread.WarcWriterThread( recorded_url_q=recorded_url_q, writer_pool=writer_pool, dedup_db=dedup_db, listeners=listeners, options=options) diff --git a/warcprox/playback.py b/warcprox/playback.py index 3424337..30a5cb8 100644 --- a/warcprox/playback.py +++ b/warcprox/playback.py @@ -27,6 +27,7 @@ import json import traceback import re from warcprox.mitmproxy import MitmProxyHandler +import warcprox class PlaybackProxyHandler(MitmProxyHandler): logger = logging.getLogger("warcprox.playback.PlaybackProxyHandler") @@ -180,13 +181,14 @@ class PlaybackProxyHandler(MitmProxyHandler): class PlaybackProxy(socketserver.ThreadingMixIn, http_server.HTTPServer): logger = logging.getLogger("warcprox.playback.PlaybackProxy") - def __init__(self, server_address, req_handler_class=PlaybackProxyHandler, - bind_and_activate=True, ca=None, playback_index_db=None, - warcs_dir=None): - http_server.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) + + def __init__(self, ca=None, playback_index_db=None, options=warcprox.Options()): + server_address = (options.address or 'localhost', options.playback_port if options.playback_port is not None else 8001) + http_server.HTTPServer.__init__(self, server_address, PlaybackProxyHandler, bind_and_activate=True) self.ca = ca self.playback_index_db = playback_index_db - self.warcs_dir = warcs_dir + self.warcs_dir = options.directory + self.options = options def server_activate(self): http_server.HTTPServer.server_activate(self) diff --git a/warcprox/tests/test_warcprox.py b/warcprox/tests/test_warcprox.py index 477ce6f..e588754 100755 --- a/warcprox/tests/test_warcprox.py +++ b/warcprox/tests/test_warcprox.py @@ -18,6 +18,7 @@ import json import rethinkdb r = rethinkdb import random +from hanzo import warctools try: import http.server as http_server @@ -166,11 +167,12 @@ def rethink_dedup_db(request, rethinkdb_servers, captures_db): ddb = warcprox.dedup.RethinkDedupDb(servers, db) def fin(): - if not captures_db: - logging.info('dropping rethinkdb database {}'.format(db)) - with ddb._random_server_connection() as conn: - result = r.db_drop(db).run(conn) - logging.info("result=%s", result) + if rethinkdb_servers: + if not captures_db: + logging.info('dropping rethinkdb database {}'.format(db)) + with ddb._random_server_connection() as conn: + result = r.db_drop(db).run(conn) + logging.info("result=%s", result) request.addfinalizer(fin) return ddb @@ -228,26 +230,27 @@ def warcprox_(request, captures_db, dedup_db, stats_db): recorded_url_q = queue.Queue() - proxy = warcprox.warcproxy.WarcProxy(server_address=('localhost', 0), ca=ca, - recorded_url_q=recorded_url_q, stats_db=stats_db) + options = warcprox.Options(port=0, playback_port=0) + proxy = warcprox.warcproxy.WarcProxy(ca=ca, recorded_url_q=recorded_url_q, + stats_db=stats_db, options=options) + options.port = proxy.server_port - warcs_dir = tempfile.mkdtemp(prefix='warcprox-test-warcs-') + options.directory = tempfile.mkdtemp(prefix='warcprox-test-warcs-') f = tempfile.NamedTemporaryFile(prefix='warcprox-test-playback-index-', suffix='.db', delete=False) f.close() playback_index_db_file = f.name playback_index_db = warcprox.playback.PlaybackIndexDb(playback_index_db_file) - playback_proxy = warcprox.playback.PlaybackProxy(server_address=('localhost', 0), ca=ca, - playback_index_db=playback_index_db, warcs_dir=warcs_dir) + playback_proxy = warcprox.playback.PlaybackProxy(ca=ca, + playback_index_db=playback_index_db, options=options) + options.playback_proxy = playback_proxy.server_port - default_warc_writer = warcprox.writer.WarcWriter(directory=warcs_dir, - port=proxy.server_port) - writer_pool = warcprox.writer.WarcWriterPool(default_warc_writer) + writer_pool = warcprox.writer.WarcWriterPool(options) warc_writer_thread = warcprox.writerthread.WarcWriterThread( recorded_url_q=recorded_url_q, writer_pool=writer_pool, dedup_db=dedup_db, listeners=[captures_db or dedup_db, playback_index_db, stats_db]) - warcprox_ = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy) + warcprox_ = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy, options) logging.info('starting warcprox') warcprox_thread = threading.Thread(name='WarcproxThread', target=warcprox_.run_until_shutdown) @@ -257,7 +260,7 @@ def warcprox_(request, captures_db, dedup_db, stats_db): logging.info('stopping warcprox') warcprox_.stop.set() warcprox_thread.join() - for f in (ca_file, ca_dir, warcs_dir, playback_index_db_file): + for f in (ca_file, ca_dir, options.directory, playback_index_db_file): if os.path.isdir(f): logging.info('deleting directory {}'.format(f)) shutil.rmtree(f) @@ -394,8 +397,9 @@ def test_dedup_http(http_daemon, warcprox_, archiving_proxies, playback_proxies) assert response.headers['warcprox-test-header'] == 'e!' assert response.content == b'I am the warcprox test payload! ffffffffff!\n' - # XXX need to give warc writer thread a chance, and we don't have any change to poll for :-\ - time.sleep(2.0) + # wait for writer thread to process + while not warcprox_.warc_writer_thread.idle: + time.sleep(0.5) # check in dedup db (no change from prev) dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:65e1216acfd220f0292715e74bd7a1ec35c99dfc') @@ -455,8 +459,9 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie assert response.headers['warcprox-test-header'] == 'g!' assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n' - # XXX need to give warc writer thread a chance, and we don't have any change to poll for :-\ - time.sleep(2.0) + # wait for writer thread to process + while not warcprox_.warc_writer_thread.idle: + time.sleep(0.5) # check in dedup db (no change from prev) dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:5b4efa64fdb308ec06ae56a9beba155a6f734b89') @@ -472,7 +477,7 @@ def test_dedup_https(https_daemon, warcprox_, archiving_proxies, playback_proxie assert response.content == b'I am the warcprox test payload! hhhhhhhhhh!\n' # XXX how to check dedup was used? -def test_limits(http_daemon, archiving_proxies): +def test_limits(http_daemon, warcprox_, archiving_proxies): url = 'http://localhost:{}/i/j'.format(http_daemon.server_port) request_meta = {"stats":{"buckets":["job1"]},"limits":{"job1.total.urls":10}} headers = {"Warcprox-Meta": json.dumps(request_meta)} @@ -483,8 +488,9 @@ def test_limits(http_daemon, archiving_proxies): assert response.headers['warcprox-test-header'] == 'i!' assert response.content == b'I am the warcprox test payload! jjjjjjjjjj!\n' - # XXX give warc writer thread a chance to update stats - time.sleep(2.0) + # wait for writer thread to process + while not warcprox_.warc_writer_thread.idle: + time.sleep(0.5) response = requests.get(url, proxies=archiving_proxies, headers=headers, stream=True) assert response.status_code == 420 @@ -494,6 +500,124 @@ def test_limits(http_daemon, archiving_proxies): assert response.headers["content-type"] == "text/plain;charset=utf-8" assert response.raw.data == b"request rejected by warcprox: reached limit job1.total.urls=10\n" +def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies, playback_proxies): + url1 = 'http://localhost:{}/k/l'.format(http_daemon.server_port) + url2 = 'https://localhost:{}/k/l'.format(https_daemon.server_port) + + # archive url1 bucket_a + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_a"})} + response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers) + assert response.status_code == 200 + assert response.headers['warcprox-test-header'] == 'k!' + assert response.content == b'I am the warcprox test payload! llllllllll!\n' + + # wait for writer thread to process + while not warcprox_.warc_writer_thread.idle: + time.sleep(0.5) + + # check url1 in dedup db bucket_a + dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_a") + assert dedup_lookup['url'] == url1.encode('ascii') + assert re.match(br'^$', dedup_lookup['id']) + assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) + record_id = dedup_lookup['id'] + dedup_date = dedup_lookup['date'] + + # check url1 not in dedup db bucket_b + dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b") + assert dedup_lookup is None + + # archive url2 bucket_b + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_b"})} + response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers) + assert response.status_code == 200 + assert response.headers['warcprox-test-header'] == 'k!' + assert response.content == b'I am the warcprox test payload! llllllllll!\n' + + # wait for writer thread to process + while not warcprox_.warc_writer_thread.idle: + time.sleep(0.5) + + # check url2 in dedup db bucket_b + dedup_lookup = warcprox_.warc_writer_thread.dedup_db.lookup(b'sha1:bc3fac8847c9412f49d955e626fb58a76befbf81', bucket="bucket_b") + assert dedup_lookup['url'] == url2.encode('ascii') + assert re.match(br'^$', dedup_lookup['id']) + assert re.match(br'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$', dedup_lookup['date']) + record_id = dedup_lookup['id'] + dedup_date = dedup_lookup['date'] + + # archive url2 bucket_a + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_a"})} + response = requests.get(url2, proxies=archiving_proxies, verify=False, headers=headers) + assert response.status_code == 200 + assert response.headers['warcprox-test-header'] == 'k!' + assert response.content == b'I am the warcprox test payload! llllllllll!\n' + + # archive url1 bucket_b + headers = {"Warcprox-Meta": json.dumps({"warc-prefix":"test_dedup_buckets","captures-bucket":"bucket_b"})} + response = requests.get(url1, proxies=archiving_proxies, verify=False, headers=headers) + assert response.status_code == 200 + assert response.headers['warcprox-test-header'] == 'k!' + assert response.content == b'I am the warcprox test payload! llllllllll!\n' + + # wait for writer thread to process + while not warcprox_.warc_writer_thread.idle: + time.sleep(0.5) + + # close the warc + assert warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"] + writer = warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"] + warc_path = os.path.join(writer.directory, writer._f_finalname) + warcprox_.warc_writer_thread.writer_pool.warc_writers["test_dedup_buckets"].close_writer() + assert os.path.exists(warc_path) + + # read the warc + fh = warctools.ArchiveRecord.open_archive(warc_path) + record_iter = fh.read_records(limit=None, offsets=True) + try: + (offset, record, errors) = next(record_iter) + assert record.type == b'warcinfo' + + # url1 bucket_a + (offset, record, errors) = next(record_iter) + assert record.type == b'response' + assert record.url == url1.encode('ascii') + assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\nI am the warcprox test payload! llllllllll!\n' + (offset, record, errors) = next(record_iter) + assert record.type == b'request' + + # url2 bucket_b + (offset, record, errors) = next(record_iter) + assert record.type == b'response' + assert record.url == url2.encode('ascii') + assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\nI am the warcprox test payload! llllllllll!\n' + (offset, record, errors) = next(record_iter) + assert record.type == b'request' + + # url2 bucket_a (revisit) + (offset, record, errors) = next(record_iter) + assert record.type == b'revisit' + assert record.url == url2.encode('ascii') + assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\n' + (offset, record, errors) = next(record_iter) + assert record.type == b'request' + + # url1 bucket_b (revisit) + (offset, record, errors) = next(record_iter) + assert record.type == b'revisit' + assert record.url == url1.encode('ascii') + assert record.content[1] == b'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nwarcprox-test-header: k!\r\nContent-Length: 44\r\n\r\n' + (offset, record, errors) = next(record_iter) + assert record.type == b'request' + + # that's all folks + assert next(record_iter)[1] == None + assert next(record_iter, None) == None + + finally: + fh.close() + + if __name__ == '__main__': pytest.main() diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 84702dc..1b56e4b 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -349,13 +349,11 @@ class RecordedUrl: class SingleThreadedWarcProxy(http_server.HTTPServer): logger = logging.getLogger("warcprox.warcproxy.WarcProxy") - def __init__(self, server_address=('localhost', 8000), - req_handler_class=WarcProxyHandler, bind_and_activate=True, - ca=None, recorded_url_q=None, digest_algorithm='sha1', - stats_db=None, options=warcprox.Options()): - http_server.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) + def __init__(self, ca=None, recorded_url_q=None, stats_db=None, options=warcprox.Options()): + server_address = (options.address or 'localhost', options.port if options.port is not None else 8000) + http_server.HTTPServer.__init__(self, server_address, WarcProxyHandler, bind_and_activate=True) - self.digest_algorithm = digest_algorithm + self.digest_algorithm = options.digest_algorithm or 'sha1' if ca is not None: self.ca = ca diff --git a/warcprox/writer.py b/warcprox/writer.py index 6f58809..21ae23f 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -13,25 +13,22 @@ import string import random class WarcWriter: - logger = logging.getLogger("warcprox.writer.WarcWriter") + logger = logging.getLogger('warcprox.writer.WarcWriter') - # port is only used for warc filename - def __init__(self, prefix='WARCPROX', directory='./warcs', - rollover_size=1000000000, gzip=False, port=0, - digest_algorithm='sha1', base32=False, rollover_idle_time=None, - options=warcprox.Options()): + def __init__(self, options=warcprox.Options()): - self.rollover_size = rollover_size - self.rollover_idle_time = rollover_idle_time + self.rollover_size = options.rollover_size or 1000000000 + self.rollover_idle_time = options.rollover_idle_time or None self._last_activity = time.time() - self.gzip = gzip + self.gzip = options.gzip or False + digest_algorithm = options.digest_algorithm or 'sha1' + base32 = options.base32 self.record_builder = warcprox.warc.WarcRecordBuilder(digest_algorithm=digest_algorithm, base32=base32) # warc path and filename stuff - self.directory = directory - self.prefix = prefix - self.port = port + self.directory = options.directory or './warcs' + self.prefix = options.prefix or 'warcprox' self._f = None self._fpath = None @@ -40,9 +37,9 @@ class WarcWriter: self._randomtoken = "".join(random.Random().sample(string.digits + string.ascii_lowercase, 8)) - if not os.path.exists(directory): - self.logger.info("warc destination directory {} doesn't exist, creating it".format(directory)) - os.mkdir(directory) + if not os.path.exists(self.directory): + self.logger.info("warc destination directory {} doesn't exist, creating it".format(self.directory)) + os.mkdir(self.directory) def timestamp17(self): now = datetime.utcnow() @@ -115,11 +112,8 @@ class WarcWriter: class WarcWriterPool: logger = logging.getLogger("warcprox.writer.WarcWriterPool") - def __init__(self, default_warc_writer=None, options=warcprox.Options()): - if default_warc_writer: - self.default_warc_writer = default_warc_writer - else: - self.default_warc_writer = WarcWriter(options=options) + def __init__(self, options=warcprox.Options()): + self.default_warc_writer = WarcWriter(options=options) self.warc_writers = {} # {prefix:WarcWriter} self._last_sync = time.time() self.options = options @@ -129,10 +123,11 @@ class WarcWriterPool: w = self.default_warc_writer if recorded_url.warcprox_meta and "warc-prefix" in recorded_url.warcprox_meta: # self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url)) - prefix = recorded_url.warcprox_meta["warc-prefix"] - if not prefix in self.warc_writers: - self.warc_writers[prefix] = WarcWriter(prefix=prefix, options=self.options) - w = self.warc_writers[prefix] + options = warcprox.Options(**vars(self.options)) + options.prefix = recorded_url.warcprox_meta["warc-prefix"] + if not options.prefix in self.warc_writers: + self.warc_writers[options.prefix] = WarcWriter(options=options) + w = self.warc_writers[options.prefix] return w def write_records(self, recorded_url): diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index a766f6c..182835f 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -34,18 +34,21 @@ class WarcWriterThread(threading.Thread): self.dedup_db = dedup_db self.listeners = listeners self.options = options + self.idle = None def run(self): try: while not self.stop.is_set(): try: recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) + self.idle = None if self.dedup_db: warcprox.dedup.decorate_with_dedup_info(self.dedup_db, recorded_url, base32=self.options.base32) records = self.writer_pool.write_records(recorded_url) self._final_tasks(recorded_url, records) except queue.Empty: + self.idle = time.time() self.writer_pool.maybe_idle_rollover() self.logger.info('WarcWriterThread shutting down')