From c430f81883d740c978e805226958d08f3bf1ca20 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 24 Aug 2015 23:53:11 +0000 Subject: [PATCH] some refactoring to prep for big rethinkdb capture table --- requirements.txt | 4 +++ setup.py | 2 -- warcprox/__init__.py | 27 ++++++++++------ warcprox/bigtable.py | 17 ++++++++++ warcprox/controller.py | 3 +- warcprox/dedup.py | 22 +++++++++++-- warcprox/main.py | 55 +++++++++++++++++++-------------- warcprox/playback.py | 3 ++ warcprox/stats.py | 13 ++++++-- warcprox/tests/test_warcprox.py | 3 +- warcprox/warcproxy.py | 6 ++-- warcprox/writer.py | 25 +++++---------- warcprox/writerthread.py | 43 +++++--------------------- 13 files changed, 128 insertions(+), 95 deletions(-) create mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..dcc1f62 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +certauth>=1.1.0 +rethinkdb +git+https://github.com/internetarchive/warctools.git +git+https://github.com/nlevitt/surt.git@py3 diff --git a/setup.py b/setup.py index e5b71d5..ab42452 100755 --- a/setup.py +++ b/setup.py @@ -47,8 +47,6 @@ setuptools.setup(name='warcprox', license='GPL', packages=['warcprox'], package_data={'warcprox':['version.txt']}, - install_requires=['certauth>=1.1.0', 'warctools>=4.8.3', 'rethinkdb'], # gdbm not in pip :( - dependency_links=['git+https://github.com/internetarchive/warctools.git#egg=warctools-4.8.3'], tests_require=['requests>=2.0.1', 'pytest'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636 cmdclass = {'test': PyTest}, test_suite='warcprox.tests', diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 7235056..4f8ad91 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -1,14 +1,6 @@ # vim:set sw=4 et: -import warcprox.controller as controller -import warcprox.playback as playback -import warcprox.dedup as dedup -import warcprox.warcproxy as warcproxy -import warcprox.mitmproxy as mitmproxy -import warcprox.writer as writer -import warcprox.warc as warc -import warcprox.writerthread as writerthread -import warcprox.stats as stats +from argparse import Namespace as _Namespace def digest_str(hash_obj, base32): import base64 @@ -20,5 +12,22 @@ def _read_version_bytes(): with open(version_txt, 'rb') as fin: return fin.read().strip() +class Options(_Namespace): + def __getattr__(self, name): + try: + return super(Options, self).__getattr__(self, name) + except AttributeError: + return None + version_bytes = _read_version_bytes().strip() version_str = version_bytes.decode('utf-8') + +import warcprox.controller as controller +import warcprox.playback as playback +import warcprox.dedup as dedup +import warcprox.warcproxy as warcproxy +import warcprox.mitmproxy as mitmproxy +import warcprox.writer as writer +import warcprox.warc as warc +import warcprox.writerthread as writerthread +import warcprox.stats as stats diff --git a/warcprox/bigtable.py b/warcprox/bigtable.py index 60be77a..787aa9b 100644 --- a/warcprox/bigtable.py +++ b/warcprox/bigtable.py @@ -45,3 +45,20 @@ class RethinkCaptures: # r.db(self.db).table(self.table).index_create("timestamp").run(conn) # r.db(self.db).table(self.table).index_create("sha1base32").run(conn) + def notify(self, recorded_url, records): + canon_surt = surt.surt(recorded_url.url, trailing_comma=True, host_massage=False) + entry = { + # id only specified for rethinkdb partitioning + "id": "{} {}".format(canon_surt[:20], record.id.decode("utf-8")[10:-1]), + "abbr_canon_surt": canon_surt[:150], + "timestamp": re.sub(r"[^0-9]", "", record.date.decode("utf-8")), + "url": record.url.decode("utf-8"), + "offset": offset, + "filename": os.path.basename(warc_file), + "warc_type": record.type.decode("utf-8"), + "warc_id": record.id.decode("utf-8"), + "sha1base32": record.get_header(b'WARC-Payload-Digest').decode("utf-8")[5:], + # mimetype + # response_code + # http_method + } diff --git a/warcprox/controller.py b/warcprox/controller.py index ba73859..e198006 100644 --- a/warcprox/controller.py +++ b/warcprox/controller.py @@ -10,7 +10,7 @@ import warcprox class WarcproxController(object): logger = logging.getLogger("warcprox.controller.WarcproxController") - def __init__(self, proxy=None, warc_writer_thread=None, playback_proxy=None): + def __init__(self, proxy=None, warc_writer_thread=None, playback_proxy=None, options=warcprox.Options()): """ Create warcprox controller. @@ -32,6 +32,7 @@ class WarcproxController(object): self.warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=self.proxy.recorded_url_q) self.playback_proxy = playback_proxy + self.options = options def run_until_shutdown(self): """ diff --git a/warcprox/dedup.py b/warcprox/dedup.py index a715b01..adf2c44 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -22,13 +22,14 @@ import random class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") - def __init__(self, dbm_file='./warcprox-dedup.db'): + def __init__(self, dbm_file='./warcprox-dedup.db', options=warcprox.Options()): if os.path.exists(dbm_file): self.logger.info('opening existing deduplication database {}'.format(dbm_file)) else: self.logger.info('creating new deduplication database {}'.format(dbm_file)) self.db = dbm_gnu.open(dbm_file, 'c') + self.options = options def close(self): self.db.close() @@ -61,6 +62,15 @@ class DedupDb(object): self.logger.debug('dedup db lookup of key=%s returning %s', key, result) return result + def notify(self, recorded_url, records): + if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE + and recorded_url.response_recorder.payload_size() > 0): + key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, + self.options.base32) + self.save(key, records[0]) + + + def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): if recorded_url.response_recorder and recorded_url.response_recorder.payload_digest: key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) @@ -69,13 +79,14 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): class RethinkDedupDb: logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") - def __init__(self, servers=["localhost"], db="warcprox", table="dedup", shards=3, replicas=3): + def __init__(self, servers=["localhost"], db="warcprox", table="dedup", shards=3, replicas=3, options=warcprox.Options()): self.servers = servers self.db = db self.table = table self.shards = shards self.replicas = replicas self._ensure_db_table() + self.options = options # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py # "Best practices: Managing connections: a connection per request" @@ -125,3 +136,10 @@ class RethinkDedupDb: result[x] = result[x].encode("utf-8") self.logger.debug('dedup db lookup of key=%s returning %s', key, result) return result + + def notify(self, recorded_url, records): + if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE + and recorded_url.response_recorder.payload_size() > 0): + key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, + self.options.base32) + self.save(key, records[0]) diff --git a/warcprox/main.py b/warcprox/main.py index e7bfee2..3a2d032 100644 --- a/warcprox/main.py +++ b/warcprox/main.py @@ -27,7 +27,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): description='warcprox - WARC writing MITM HTTP/S proxy', formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('-p', '--port', dest='port', default='8000', - help='port to listen on') + type=int, help='port to listen on') arg_parser.add_argument('-b', '--address', dest='address', default='localhost', help='address to listen on') arg_parser.add_argument('-c', '--cacert', dest='cacert', @@ -59,7 +59,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): arg_parser.add_argument('--stats-db-file', dest='stats_db_file', default='./warcprox-stats.db', help='persistent statistics database file; empty string or /dev/null disables statistics tracking') arg_parser.add_argument('-P', '--playback-port', dest='playback_port', - default=None, help='port to listen on for instant playback') + type=int, default=None, help='port to listen on for instant playback') arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', default='./warcprox-playback-index.db', help='playback index database file (only used if --playback-port is specified)') @@ -70,6 +70,9 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])): help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default="warcprox", help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)') + arg_parser.add_argument('--rethinkdb-big-table', + dest='rethinkdb_big_table', action='store_true', default=False, + help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)') arg_parser.add_argument('--version', action='version', version="warcprox {}".format(warcprox.version_str)) arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') @@ -97,6 +100,7 @@ def dump_state(signum=None, frame=None): def main(argv=sys.argv): arg_parser = _build_arg_parser(prog=os.path.basename(argv[0])) args = arg_parser.parse_args(args=argv[1:]) + options = warcprox.Options(**vars(args)) if args.verbose: loglevel = logging.DEBUG @@ -114,22 +118,31 @@ def main(argv=sys.argv): logging.fatal(e) exit(1) + listeners = [] if args.rethinkdb_servers: - dedup_db = warcprox.dedup.RethinkDedupDb(args.rethinkdb_servers.split(","), args.rethinkdb_db) + if args.rethinkdb_big_table: + captures_db = warcprox.bigtable.RethinkCaptures(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options) + dedup_db = warcprox.bigtable.RethinkCapturesDedup(bigtable, options=options) + listeners.append(captures_db) + else: + dedup_db = warcprox.dedup.RethinkDedupDb(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options) + listeners.append(dedup_db) elif args.dedup_db_file in (None, '', '/dev/null'): logging.info('deduplication disabled') dedup_db = None else: - dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file) - + dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file, options=options) + listeners.append(dedup_db) if args.rethinkdb_servers: - stats_db = warcprox.stats.RethinkStatsDb(args.rethinkdb_servers.split(","), args.rethinkdb_db) + stats_db = warcprox.stats.RethinkStatsDb(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options) + listeners.append(stats_db) elif args.stats_db_file in (None, '', '/dev/null'): logging.info('statistics tracking disabled') stats_db = None else: - stats_db = warcprox.stats.StatsDb(args.stats_db_file) + stats_db = warcprox.stats.StatsDb(args.stats_db_file, options=options) + listeners.append(stats_db) recorded_url_q = queue.Queue() @@ -138,33 +151,29 @@ def main(argv=sys.argv): ca_name=ca_name) proxy = warcprox.warcproxy.WarcProxy( - server_address=(args.address, int(args.port)), ca=ca, + server_address=(args.address, args.port), ca=ca, recorded_url_q=recorded_url_q, digest_algorithm=args.digest_algorithm, - stats_db=stats_db) + stats_db=stats_db, options=options) if args.playback_port is not None: - playback_index_db = warcprox.playback.PlaybackIndexDb(args.playback_index_db_file) - playback_server_address=(args.address, int(args.playback_port)) - playback_proxy = warcprox.playback.PlaybackProxy(server_address=playback_server_address, - ca=ca, playback_index_db=playback_index_db, - warcs_dir=args.directory) + playback_index_db = warcprox.playback.PlaybackIndexDb(args.playback_index_db_file, options=options) + playback_proxy = warcprox.playback.PlaybackProxy( + server_address=(args.address, args.playback_port), ca=ca, + playback_index_db=playback_index_db, warcs_dir=args.directory, + options=options) + listeners.append(playback_index_db) else: playback_index_db = None playback_proxy = None - default_warc_writer = warcprox.writer.WarcWriter(directory=args.directory, - gzip=args.gzip, prefix=args.prefix, port=int(args.port), - rollover_size=int(args.size), base32=args.base32, - digest_algorithm=args.digest_algorithm, - rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None) - writer_pool=warcprox.writer.WarcWriterPool(default_warc_writer) + default_warc_writer = warcprox.writer.WarcWriter(args.prefix, options=options) + writer_pool = warcprox.writer.WarcWriterPool(default_warc_writer, options=options) warc_writer_thread = warcprox.writerthread.WarcWriterThread( recorded_url_q=recorded_url_q, writer_pool=writer_pool, - dedup_db=dedup_db, playback_index_db=playback_index_db, - stats_db=stats_db) + dedup_db=dedup_db, listeners=listeners, options=options) - controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy) + controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy, options=options) signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set()) signal.signal(signal.SIGINT, lambda a,b: controller.stop.set()) diff --git a/warcprox/playback.py b/warcprox/playback.py index 9fae6e1..3424337 100644 --- a/warcprox/playback.py +++ b/warcprox/playback.py @@ -217,6 +217,9 @@ class PlaybackIndexDb(object): except: pass + def notify(self, recorded_url, records): + self.save(records[0].warc_filename, records, records[0].offset) + def save(self, warcfile, recordset, offset): response_record = recordset[0] # XXX canonicalize url? diff --git a/warcprox/stats.py b/warcprox/stats.py index 38dabd6..d246d69 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -17,6 +17,7 @@ from hanzo import warctools import rethinkdb r = rethinkdb import random +import warcprox def _empty_bucket(bucket): return { @@ -41,13 +42,14 @@ def _empty_bucket(bucket): class StatsDb: logger = logging.getLogger("warcprox.stats.StatsDb") - def __init__(self, dbm_file='./warcprox-stats.db'): + def __init__(self, dbm_file='./warcprox-stats.db', options=warcprox.Options()): if os.path.exists(dbm_file): self.logger.info('opening existing stats database {}'.format(dbm_file)) else: self.logger.info('creating new stats database {}'.format(dbm_file)) self.db = dbm_gnu.open(dbm_file, 'c') + self.options = options def close(self): self.db.close() @@ -71,6 +73,9 @@ class StatsDb: else: return None + def notify(self, recorded_url, records): + self.tally(recorded_url, records) + def tally(self, recorded_url, records): buckets = ["__all__"] @@ -102,13 +107,14 @@ class StatsDb: class RethinkStatsDb: logger = logging.getLogger("warcprox.stats.RethinkStatsDb") - def __init__(self, servers=["localhost"], db="warcprox", table="stats", shards=3, replicas=3): + def __init__(self, servers=["localhost"], db="warcprox", table="stats", shards=3, replicas=3, options=warcprox.Options()): self.servers = servers self.db = db self.table = table self.shards = shards self.replicas = replicas self._ensure_db_table() + self.options = options # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py # "Best practices: Managing connections: a connection per request" @@ -179,3 +185,6 @@ class RethinkStatsDb: if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: raise Exception("unexpected result %s saving %s", result, record) + def notify(self, recorded_url, records): + self.tally(recorded_url, records) + diff --git a/warcprox/tests/test_warcprox.py b/warcprox/tests/test_warcprox.py index 2489807..7477d05 100755 --- a/warcprox/tests/test_warcprox.py +++ b/warcprox/tests/test_warcprox.py @@ -208,8 +208,7 @@ def warcprox_(request, dedup_db, stats_db): writer_pool = warcprox.writer.WarcWriterPool(default_warc_writer) warc_writer_thread = warcprox.writerthread.WarcWriterThread( recorded_url_q=recorded_url_q, writer_pool=writer_pool, - dedup_db=dedup_db, playback_index_db=playback_index_db, - stats_db=stats_db) + dedup_db=dedup_db, listeners=[dedup_db, playback_index_db, stats_db]) warcprox_ = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy) logging.info('starting warcprox') diff --git a/warcprox/warcproxy.py b/warcprox/warcproxy.py index 21c72b8..ee3de49 100644 --- a/warcprox/warcproxy.py +++ b/warcprox/warcproxy.py @@ -39,7 +39,7 @@ import socket from hanzo import warctools from certauth.certauth import CertificateAuthority -import warcprox.mitmproxy +import warcprox class ProxyingRecorder(object): """ @@ -349,7 +349,7 @@ class SingleThreadedWarcProxy(http_server.HTTPServer): def __init__(self, server_address=('localhost', 8000), req_handler_class=WarcProxyHandler, bind_and_activate=True, ca=None, recorded_url_q=None, digest_algorithm='sha1', - stats_db=None): + stats_db=None, options=warcprox.Options()): http_server.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) self.digest_algorithm = digest_algorithm @@ -369,6 +369,8 @@ class SingleThreadedWarcProxy(http_server.HTTPServer): self.stats_db = stats_db + self.options = options + def server_activate(self): http_server.HTTPServer.server_activate(self) self.logger.info('WarcProxy listening on {0}:{1}'.format(self.server_address[0], self.server_address[1])) diff --git a/warcprox/writer.py b/warcprox/writer.py index c8c1b44..6f58809 100644 --- a/warcprox/writer.py +++ b/warcprox/writer.py @@ -16,9 +16,10 @@ class WarcWriter: logger = logging.getLogger("warcprox.writer.WarcWriter") # port is only used for warc filename - def __init__(self, directory='./warcs', rollover_size=1000000000, - gzip=False, prefix='WARCPROX', port=0, digest_algorithm='sha1', - base32=False, rollover_idle_time=None): + def __init__(self, prefix='WARCPROX', directory='./warcs', + rollover_size=1000000000, gzip=False, port=0, + digest_algorithm='sha1', base32=False, rollover_idle_time=None, + options=warcprox.Options()): self.rollover_size = rollover_size self.rollover_idle_time = rollover_idle_time @@ -114,17 +115,14 @@ class WarcWriter: class WarcWriterPool: logger = logging.getLogger("warcprox.writer.WarcWriterPool") - def __init__(self, default_warc_writer=None): + def __init__(self, default_warc_writer=None, options=warcprox.Options()): if default_warc_writer: self.default_warc_writer = default_warc_writer else: - self.default_warc_writer = WarcWriter() + self.default_warc_writer = WarcWriter(options=options) self.warc_writers = {} # {prefix:WarcWriter} self._last_sync = time.time() - - self.logger.info('directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format( - os.path.abspath(self.default_warc_writer.directory), self.default_warc_writer.gzip, self.default_warc_writer.rollover_size, - self.default_warc_writer.rollover_idle_time, self.default_warc_writer.prefix, self.default_warc_writer.port)) + self.options = options # chooses writer for filename specified by warcprox_meta["warc-prefix"] if set def _writer(self, recorded_url): @@ -133,14 +131,7 @@ class WarcWriterPool: # self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url)) prefix = recorded_url.warcprox_meta["warc-prefix"] if not prefix in self.warc_writers: - self.warc_writers[prefix] = WarcWriter(prefix=prefix, - directory=self.default_warc_writer.directory, - rollover_size=self.default_warc_writer.rollover_size, - rollover_idle_time=self.default_warc_writer.rollover_idle_time, - gzip=self.default_warc_writer.gzip, - port=self.default_warc_writer.port, - digest_algorithm=self.default_warc_writer.record_builder.digest_algorithm, - base32=self.default_warc_writer.record_builder.base32) + self.warc_writers[prefix] = WarcWriter(prefix=prefix, options=self.options) w = self.warc_writers[prefix] return w diff --git a/warcprox/writerthread.py b/warcprox/writerthread.py index df70d63..a766f6c 100644 --- a/warcprox/writerthread.py +++ b/warcprox/writerthread.py @@ -22,7 +22,7 @@ import warcprox class WarcWriterThread(threading.Thread): logger = logging.getLogger("warcprox.warcproxwriter.WarcWriterThread") - def __init__(self, recorded_url_q=None, writer_pool=None, dedup_db=None, playback_index_db=None, stats_db=None): + def __init__(self, recorded_url_q=None, writer_pool=None, dedup_db=None, listeners=None, options=warcprox.Options()): """recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl.""" threading.Thread.__init__(self, name='WarcWriterThread') self.recorded_url_q = recorded_url_q @@ -32,9 +32,8 @@ class WarcWriterThread(threading.Thread): else: self.writer_pool = WarcWriterPool() self.dedup_db = dedup_db - self.playback_index_db = playback_index_db - self.stats_db = stats_db - self._last_sync = time.time() + self.listeners = listeners + self.options = options def run(self): try: @@ -42,40 +41,18 @@ class WarcWriterThread(threading.Thread): try: recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) if self.dedup_db: - warcprox.dedup.decorate_with_dedup_info(self.dedup_db, recorded_url, - base32=self.writer_pool.default_warc_writer.record_builder.base32) + warcprox.dedup.decorate_with_dedup_info(self.dedup_db, + recorded_url, base32=self.options.base32) records = self.writer_pool.write_records(recorded_url) self._final_tasks(recorded_url, records) except queue.Empty: self.writer_pool.maybe_idle_rollover() - self._sync() self.logger.info('WarcWriterThread shutting down') self.writer_pool.close_writers() except: self.logger.critical("WarcWriterThread shutting down after unexpected error", exc_info=True) - def _sync(self): - # XXX prob doesn't belong here (do we need it at all?) - if time.time() - self._last_sync > 60: - if self.dedup_db: - self.dedup_db.sync() - if self.playback_index_db: - self.playback_index_db.sync() - self._last_sync = time.time() - - def _save_dedup_info(self, recorded_url, records): - if (self.dedup_db - and records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE - and recorded_url.response_recorder.payload_size() > 0): - key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, - self.writer_pool.default_warc_writer.record_builder.base32) - self.dedup_db.save(key, records[0]) - - def _save_playback_info(self, recorded_url, records): - if self.playback_index_db is not None: - self.playback_index_db.save(records[0].warc_filename, records, records[0].offset) - # closest thing we have to heritrix crawl log at the moment def _log(self, recorded_url, records): def _decode(x): @@ -107,12 +84,8 @@ class WarcWriterThread(threading.Thread): _decode(records[0].warc_filename), records[0].offset)) - def _update_stats(self, recorded_url, records): - if self.stats_db: - self.stats_db.tally(recorded_url, records) - def _final_tasks(self, recorded_url, records): - self._save_dedup_info(recorded_url, records) - self._save_playback_info(recorded_url, records) - self._update_stats(recorded_url, records) + if self.listeners: + for listener in self.listeners: + listener.notify(recorded_url, records) self._log(recorded_url, records)