some refactoring to prep for big rethinkdb capture table

This commit is contained in:
Noah Levitt 2015-08-24 23:53:11 +00:00
parent cc71c331a1
commit c430f81883
13 changed files with 128 additions and 95 deletions

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
certauth>=1.1.0
rethinkdb
git+https://github.com/internetarchive/warctools.git
git+https://github.com/nlevitt/surt.git@py3

View File

@ -47,8 +47,6 @@ setuptools.setup(name='warcprox',
license='GPL', license='GPL',
packages=['warcprox'], packages=['warcprox'],
package_data={'warcprox':['version.txt']}, package_data={'warcprox':['version.txt']},
install_requires=['certauth>=1.1.0', 'warctools>=4.8.3', 'rethinkdb'], # gdbm not in pip :(
dependency_links=['git+https://github.com/internetarchive/warctools.git#egg=warctools-4.8.3'],
tests_require=['requests>=2.0.1', 'pytest'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636 tests_require=['requests>=2.0.1', 'pytest'], # >=2.0.1 for https://github.com/kennethreitz/requests/pull/1636
cmdclass = {'test': PyTest}, cmdclass = {'test': PyTest},
test_suite='warcprox.tests', test_suite='warcprox.tests',

View File

@ -1,14 +1,6 @@
# vim:set sw=4 et: # vim:set sw=4 et:
import warcprox.controller as controller from argparse import Namespace as _Namespace
import warcprox.playback as playback
import warcprox.dedup as dedup
import warcprox.warcproxy as warcproxy
import warcprox.mitmproxy as mitmproxy
import warcprox.writer as writer
import warcprox.warc as warc
import warcprox.writerthread as writerthread
import warcprox.stats as stats
def digest_str(hash_obj, base32): def digest_str(hash_obj, base32):
import base64 import base64
@ -20,5 +12,22 @@ def _read_version_bytes():
with open(version_txt, 'rb') as fin: with open(version_txt, 'rb') as fin:
return fin.read().strip() return fin.read().strip()
class Options(_Namespace):
def __getattr__(self, name):
try:
return super(Options, self).__getattr__(self, name)
except AttributeError:
return None
version_bytes = _read_version_bytes().strip() version_bytes = _read_version_bytes().strip()
version_str = version_bytes.decode('utf-8') version_str = version_bytes.decode('utf-8')
import warcprox.controller as controller
import warcprox.playback as playback
import warcprox.dedup as dedup
import warcprox.warcproxy as warcproxy
import warcprox.mitmproxy as mitmproxy
import warcprox.writer as writer
import warcprox.warc as warc
import warcprox.writerthread as writerthread
import warcprox.stats as stats

View File

@ -45,3 +45,20 @@ class RethinkCaptures:
# r.db(self.db).table(self.table).index_create("timestamp").run(conn) # r.db(self.db).table(self.table).index_create("timestamp").run(conn)
# r.db(self.db).table(self.table).index_create("sha1base32").run(conn) # r.db(self.db).table(self.table).index_create("sha1base32").run(conn)
def notify(self, recorded_url, records):
canon_surt = surt.surt(recorded_url.url, trailing_comma=True, host_massage=False)
entry = {
# id only specified for rethinkdb partitioning
"id": "{} {}".format(canon_surt[:20], record.id.decode("utf-8")[10:-1]),
"abbr_canon_surt": canon_surt[:150],
"timestamp": re.sub(r"[^0-9]", "", record.date.decode("utf-8")),
"url": record.url.decode("utf-8"),
"offset": offset,
"filename": os.path.basename(warc_file),
"warc_type": record.type.decode("utf-8"),
"warc_id": record.id.decode("utf-8"),
"sha1base32": record.get_header(b'WARC-Payload-Digest').decode("utf-8")[5:],
# mimetype
# response_code
# http_method
}

View File

@ -10,7 +10,7 @@ import warcprox
class WarcproxController(object): class WarcproxController(object):
logger = logging.getLogger("warcprox.controller.WarcproxController") logger = logging.getLogger("warcprox.controller.WarcproxController")
def __init__(self, proxy=None, warc_writer_thread=None, playback_proxy=None): def __init__(self, proxy=None, warc_writer_thread=None, playback_proxy=None, options=warcprox.Options()):
""" """
Create warcprox controller. Create warcprox controller.
@ -32,6 +32,7 @@ class WarcproxController(object):
self.warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=self.proxy.recorded_url_q) self.warc_writer_thread = warcprox.warcwriter.WarcWriterThread(recorded_url_q=self.proxy.recorded_url_q)
self.playback_proxy = playback_proxy self.playback_proxy = playback_proxy
self.options = options
def run_until_shutdown(self): def run_until_shutdown(self):
""" """

View File

@ -22,13 +22,14 @@ import random
class DedupDb(object): class DedupDb(object):
logger = logging.getLogger("warcprox.dedup.DedupDb") logger = logging.getLogger("warcprox.dedup.DedupDb")
def __init__(self, dbm_file='./warcprox-dedup.db'): def __init__(self, dbm_file='./warcprox-dedup.db', options=warcprox.Options()):
if os.path.exists(dbm_file): if os.path.exists(dbm_file):
self.logger.info('opening existing deduplication database {}'.format(dbm_file)) self.logger.info('opening existing deduplication database {}'.format(dbm_file))
else: else:
self.logger.info('creating new deduplication database {}'.format(dbm_file)) self.logger.info('creating new deduplication database {}'.format(dbm_file))
self.db = dbm_gnu.open(dbm_file, 'c') self.db = dbm_gnu.open(dbm_file, 'c')
self.options = options
def close(self): def close(self):
self.db.close() self.db.close()
@ -61,6 +62,15 @@ class DedupDb(object):
self.logger.debug('dedup db lookup of key=%s returning %s', key, result) self.logger.debug('dedup db lookup of key=%s returning %s', key, result)
return result return result
def notify(self, recorded_url, records):
if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
and recorded_url.response_recorder.payload_size() > 0):
key = warcprox.digest_str(recorded_url.response_recorder.payload_digest,
self.options.base32)
self.save(key, records[0])
def decorate_with_dedup_info(dedup_db, recorded_url, base32=False): def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
if recorded_url.response_recorder and recorded_url.response_recorder.payload_digest: if recorded_url.response_recorder and recorded_url.response_recorder.payload_digest:
key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32) key = warcprox.digest_str(recorded_url.response_recorder.payload_digest, base32)
@ -69,13 +79,14 @@ def decorate_with_dedup_info(dedup_db, recorded_url, base32=False):
class RethinkDedupDb: class RethinkDedupDb:
logger = logging.getLogger("warcprox.dedup.RethinkDedupDb") logger = logging.getLogger("warcprox.dedup.RethinkDedupDb")
def __init__(self, servers=["localhost"], db="warcprox", table="dedup", shards=3, replicas=3): def __init__(self, servers=["localhost"], db="warcprox", table="dedup", shards=3, replicas=3, options=warcprox.Options()):
self.servers = servers self.servers = servers
self.db = db self.db = db
self.table = table self.table = table
self.shards = shards self.shards = shards
self.replicas = replicas self.replicas = replicas
self._ensure_db_table() self._ensure_db_table()
self.options = options
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
# "Best practices: Managing connections: a connection per request" # "Best practices: Managing connections: a connection per request"
@ -125,3 +136,10 @@ class RethinkDedupDb:
result[x] = result[x].encode("utf-8") result[x] = result[x].encode("utf-8")
self.logger.debug('dedup db lookup of key=%s returning %s', key, result) self.logger.debug('dedup db lookup of key=%s returning %s', key, result)
return result return result
def notify(self, recorded_url, records):
if (records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
and recorded_url.response_recorder.payload_size() > 0):
key = warcprox.digest_str(recorded_url.response_recorder.payload_digest,
self.options.base32)
self.save(key, records[0])

View File

@ -27,7 +27,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
description='warcprox - WARC writing MITM HTTP/S proxy', description='warcprox - WARC writing MITM HTTP/S proxy',
formatter_class=argparse.ArgumentDefaultsHelpFormatter) formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('-p', '--port', dest='port', default='8000', arg_parser.add_argument('-p', '--port', dest='port', default='8000',
help='port to listen on') type=int, help='port to listen on')
arg_parser.add_argument('-b', '--address', dest='address', arg_parser.add_argument('-b', '--address', dest='address',
default='localhost', help='address to listen on') default='localhost', help='address to listen on')
arg_parser.add_argument('-c', '--cacert', dest='cacert', arg_parser.add_argument('-c', '--cacert', dest='cacert',
@ -59,7 +59,7 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
arg_parser.add_argument('--stats-db-file', dest='stats_db_file', arg_parser.add_argument('--stats-db-file', dest='stats_db_file',
default='./warcprox-stats.db', help='persistent statistics database file; empty string or /dev/null disables statistics tracking') default='./warcprox-stats.db', help='persistent statistics database file; empty string or /dev/null disables statistics tracking')
arg_parser.add_argument('-P', '--playback-port', dest='playback_port', arg_parser.add_argument('-P', '--playback-port', dest='playback_port',
default=None, help='port to listen on for instant playback') type=int, default=None, help='port to listen on for instant playback')
arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file', arg_parser.add_argument('--playback-index-db-file', dest='playback_index_db_file',
default='./warcprox-playback-index.db', default='./warcprox-playback-index.db',
help='playback index database file (only used if --playback-port is specified)') help='playback index database file (only used if --playback-port is specified)')
@ -70,6 +70,9 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') help='rethinkdb servers, used for dedup and stats if specified; e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default="warcprox", arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default="warcprox",
help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)') help='rethinkdb database name (ignored unless --rethinkdb-servers is specified)')
arg_parser.add_argument('--rethinkdb-big-table',
dest='rethinkdb_big_table', action='store_true', default=False,
help='use a big rethinkdb table called "captures", instead of a small table called "dedup"; table is suitable for use as index for playback (ignored unless --rethinkdb-servers is specified)')
arg_parser.add_argument('--version', action='version', arg_parser.add_argument('--version', action='version',
version="warcprox {}".format(warcprox.version_str)) version="warcprox {}".format(warcprox.version_str))
arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true') arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
@ -97,6 +100,7 @@ def dump_state(signum=None, frame=None):
def main(argv=sys.argv): def main(argv=sys.argv):
arg_parser = _build_arg_parser(prog=os.path.basename(argv[0])) arg_parser = _build_arg_parser(prog=os.path.basename(argv[0]))
args = arg_parser.parse_args(args=argv[1:]) args = arg_parser.parse_args(args=argv[1:])
options = warcprox.Options(**vars(args))
if args.verbose: if args.verbose:
loglevel = logging.DEBUG loglevel = logging.DEBUG
@ -114,22 +118,31 @@ def main(argv=sys.argv):
logging.fatal(e) logging.fatal(e)
exit(1) exit(1)
listeners = []
if args.rethinkdb_servers: if args.rethinkdb_servers:
dedup_db = warcprox.dedup.RethinkDedupDb(args.rethinkdb_servers.split(","), args.rethinkdb_db) if args.rethinkdb_big_table:
captures_db = warcprox.bigtable.RethinkCaptures(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options)
dedup_db = warcprox.bigtable.RethinkCapturesDedup(bigtable, options=options)
listeners.append(captures_db)
else:
dedup_db = warcprox.dedup.RethinkDedupDb(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options)
listeners.append(dedup_db)
elif args.dedup_db_file in (None, '', '/dev/null'): elif args.dedup_db_file in (None, '', '/dev/null'):
logging.info('deduplication disabled') logging.info('deduplication disabled')
dedup_db = None dedup_db = None
else: else:
dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file) dedup_db = warcprox.dedup.DedupDb(args.dedup_db_file, options=options)
listeners.append(dedup_db)
if args.rethinkdb_servers: if args.rethinkdb_servers:
stats_db = warcprox.stats.RethinkStatsDb(args.rethinkdb_servers.split(","), args.rethinkdb_db) stats_db = warcprox.stats.RethinkStatsDb(args.rethinkdb_servers.split(","), args.rethinkdb_db, options=options)
listeners.append(stats_db)
elif args.stats_db_file in (None, '', '/dev/null'): elif args.stats_db_file in (None, '', '/dev/null'):
logging.info('statistics tracking disabled') logging.info('statistics tracking disabled')
stats_db = None stats_db = None
else: else:
stats_db = warcprox.stats.StatsDb(args.stats_db_file) stats_db = warcprox.stats.StatsDb(args.stats_db_file, options=options)
listeners.append(stats_db)
recorded_url_q = queue.Queue() recorded_url_q = queue.Queue()
@ -138,33 +151,29 @@ def main(argv=sys.argv):
ca_name=ca_name) ca_name=ca_name)
proxy = warcprox.warcproxy.WarcProxy( proxy = warcprox.warcproxy.WarcProxy(
server_address=(args.address, int(args.port)), ca=ca, server_address=(args.address, args.port), ca=ca,
recorded_url_q=recorded_url_q, recorded_url_q=recorded_url_q,
digest_algorithm=args.digest_algorithm, digest_algorithm=args.digest_algorithm,
stats_db=stats_db) stats_db=stats_db, options=options)
if args.playback_port is not None: if args.playback_port is not None:
playback_index_db = warcprox.playback.PlaybackIndexDb(args.playback_index_db_file) playback_index_db = warcprox.playback.PlaybackIndexDb(args.playback_index_db_file, options=options)
playback_server_address=(args.address, int(args.playback_port)) playback_proxy = warcprox.playback.PlaybackProxy(
playback_proxy = warcprox.playback.PlaybackProxy(server_address=playback_server_address, server_address=(args.address, args.playback_port), ca=ca,
ca=ca, playback_index_db=playback_index_db, playback_index_db=playback_index_db, warcs_dir=args.directory,
warcs_dir=args.directory) options=options)
listeners.append(playback_index_db)
else: else:
playback_index_db = None playback_index_db = None
playback_proxy = None playback_proxy = None
default_warc_writer = warcprox.writer.WarcWriter(directory=args.directory, default_warc_writer = warcprox.writer.WarcWriter(args.prefix, options=options)
gzip=args.gzip, prefix=args.prefix, port=int(args.port), writer_pool = warcprox.writer.WarcWriterPool(default_warc_writer, options=options)
rollover_size=int(args.size), base32=args.base32,
digest_algorithm=args.digest_algorithm,
rollover_idle_time=int(args.rollover_idle_time) if args.rollover_idle_time is not None else None)
writer_pool=warcprox.writer.WarcWriterPool(default_warc_writer)
warc_writer_thread = warcprox.writerthread.WarcWriterThread( warc_writer_thread = warcprox.writerthread.WarcWriterThread(
recorded_url_q=recorded_url_q, writer_pool=writer_pool, recorded_url_q=recorded_url_q, writer_pool=writer_pool,
dedup_db=dedup_db, playback_index_db=playback_index_db, dedup_db=dedup_db, listeners=listeners, options=options)
stats_db=stats_db)
controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy) controller = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy, options=options)
signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set()) signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set())
signal.signal(signal.SIGINT, lambda a,b: controller.stop.set()) signal.signal(signal.SIGINT, lambda a,b: controller.stop.set())

View File

@ -217,6 +217,9 @@ class PlaybackIndexDb(object):
except: except:
pass pass
def notify(self, recorded_url, records):
self.save(records[0].warc_filename, records, records[0].offset)
def save(self, warcfile, recordset, offset): def save(self, warcfile, recordset, offset):
response_record = recordset[0] response_record = recordset[0]
# XXX canonicalize url? # XXX canonicalize url?

View File

@ -17,6 +17,7 @@ from hanzo import warctools
import rethinkdb import rethinkdb
r = rethinkdb r = rethinkdb
import random import random
import warcprox
def _empty_bucket(bucket): def _empty_bucket(bucket):
return { return {
@ -41,13 +42,14 @@ def _empty_bucket(bucket):
class StatsDb: class StatsDb:
logger = logging.getLogger("warcprox.stats.StatsDb") logger = logging.getLogger("warcprox.stats.StatsDb")
def __init__(self, dbm_file='./warcprox-stats.db'): def __init__(self, dbm_file='./warcprox-stats.db', options=warcprox.Options()):
if os.path.exists(dbm_file): if os.path.exists(dbm_file):
self.logger.info('opening existing stats database {}'.format(dbm_file)) self.logger.info('opening existing stats database {}'.format(dbm_file))
else: else:
self.logger.info('creating new stats database {}'.format(dbm_file)) self.logger.info('creating new stats database {}'.format(dbm_file))
self.db = dbm_gnu.open(dbm_file, 'c') self.db = dbm_gnu.open(dbm_file, 'c')
self.options = options
def close(self): def close(self):
self.db.close() self.db.close()
@ -71,6 +73,9 @@ class StatsDb:
else: else:
return None return None
def notify(self, recorded_url, records):
self.tally(recorded_url, records)
def tally(self, recorded_url, records): def tally(self, recorded_url, records):
buckets = ["__all__"] buckets = ["__all__"]
@ -102,13 +107,14 @@ class StatsDb:
class RethinkStatsDb: class RethinkStatsDb:
logger = logging.getLogger("warcprox.stats.RethinkStatsDb") logger = logging.getLogger("warcprox.stats.RethinkStatsDb")
def __init__(self, servers=["localhost"], db="warcprox", table="stats", shards=3, replicas=3): def __init__(self, servers=["localhost"], db="warcprox", table="stats", shards=3, replicas=3, options=warcprox.Options()):
self.servers = servers self.servers = servers
self.db = db self.db = db
self.table = table self.table = table
self.shards = shards self.shards = shards
self.replicas = replicas self.replicas = replicas
self._ensure_db_table() self._ensure_db_table()
self.options = options
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
# "Best practices: Managing connections: a connection per request" # "Best practices: Managing connections: a connection per request"
@ -179,3 +185,6 @@ class RethinkStatsDb:
if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]: if sorted(result.values()) != [0,0,0,0,0,1] or [result["deleted"],result["skipped"],result["errors"]] != [0,0,0]:
raise Exception("unexpected result %s saving %s", result, record) raise Exception("unexpected result %s saving %s", result, record)
def notify(self, recorded_url, records):
self.tally(recorded_url, records)

View File

@ -208,8 +208,7 @@ def warcprox_(request, dedup_db, stats_db):
writer_pool = warcprox.writer.WarcWriterPool(default_warc_writer) writer_pool = warcprox.writer.WarcWriterPool(default_warc_writer)
warc_writer_thread = warcprox.writerthread.WarcWriterThread( warc_writer_thread = warcprox.writerthread.WarcWriterThread(
recorded_url_q=recorded_url_q, writer_pool=writer_pool, recorded_url_q=recorded_url_q, writer_pool=writer_pool,
dedup_db=dedup_db, playback_index_db=playback_index_db, dedup_db=dedup_db, listeners=[dedup_db, playback_index_db, stats_db])
stats_db=stats_db)
warcprox_ = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy) warcprox_ = warcprox.controller.WarcproxController(proxy, warc_writer_thread, playback_proxy)
logging.info('starting warcprox') logging.info('starting warcprox')

View File

@ -39,7 +39,7 @@ import socket
from hanzo import warctools from hanzo import warctools
from certauth.certauth import CertificateAuthority from certauth.certauth import CertificateAuthority
import warcprox.mitmproxy import warcprox
class ProxyingRecorder(object): class ProxyingRecorder(object):
""" """
@ -349,7 +349,7 @@ class SingleThreadedWarcProxy(http_server.HTTPServer):
def __init__(self, server_address=('localhost', 8000), def __init__(self, server_address=('localhost', 8000),
req_handler_class=WarcProxyHandler, bind_and_activate=True, req_handler_class=WarcProxyHandler, bind_and_activate=True,
ca=None, recorded_url_q=None, digest_algorithm='sha1', ca=None, recorded_url_q=None, digest_algorithm='sha1',
stats_db=None): stats_db=None, options=warcprox.Options()):
http_server.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate) http_server.HTTPServer.__init__(self, server_address, req_handler_class, bind_and_activate)
self.digest_algorithm = digest_algorithm self.digest_algorithm = digest_algorithm
@ -369,6 +369,8 @@ class SingleThreadedWarcProxy(http_server.HTTPServer):
self.stats_db = stats_db self.stats_db = stats_db
self.options = options
def server_activate(self): def server_activate(self):
http_server.HTTPServer.server_activate(self) http_server.HTTPServer.server_activate(self)
self.logger.info('WarcProxy listening on {0}:{1}'.format(self.server_address[0], self.server_address[1])) self.logger.info('WarcProxy listening on {0}:{1}'.format(self.server_address[0], self.server_address[1]))

View File

@ -16,9 +16,10 @@ class WarcWriter:
logger = logging.getLogger("warcprox.writer.WarcWriter") logger = logging.getLogger("warcprox.writer.WarcWriter")
# port is only used for warc filename # port is only used for warc filename
def __init__(self, directory='./warcs', rollover_size=1000000000, def __init__(self, prefix='WARCPROX', directory='./warcs',
gzip=False, prefix='WARCPROX', port=0, digest_algorithm='sha1', rollover_size=1000000000, gzip=False, port=0,
base32=False, rollover_idle_time=None): digest_algorithm='sha1', base32=False, rollover_idle_time=None,
options=warcprox.Options()):
self.rollover_size = rollover_size self.rollover_size = rollover_size
self.rollover_idle_time = rollover_idle_time self.rollover_idle_time = rollover_idle_time
@ -114,17 +115,14 @@ class WarcWriter:
class WarcWriterPool: class WarcWriterPool:
logger = logging.getLogger("warcprox.writer.WarcWriterPool") logger = logging.getLogger("warcprox.writer.WarcWriterPool")
def __init__(self, default_warc_writer=None): def __init__(self, default_warc_writer=None, options=warcprox.Options()):
if default_warc_writer: if default_warc_writer:
self.default_warc_writer = default_warc_writer self.default_warc_writer = default_warc_writer
else: else:
self.default_warc_writer = WarcWriter() self.default_warc_writer = WarcWriter(options=options)
self.warc_writers = {} # {prefix:WarcWriter} self.warc_writers = {} # {prefix:WarcWriter}
self._last_sync = time.time() self._last_sync = time.time()
self.options = options
self.logger.info('directory={} gzip={} rollover_size={} rollover_idle_time={} prefix={} port={}'.format(
os.path.abspath(self.default_warc_writer.directory), self.default_warc_writer.gzip, self.default_warc_writer.rollover_size,
self.default_warc_writer.rollover_idle_time, self.default_warc_writer.prefix, self.default_warc_writer.port))
# chooses writer for filename specified by warcprox_meta["warc-prefix"] if set # chooses writer for filename specified by warcprox_meta["warc-prefix"] if set
def _writer(self, recorded_url): def _writer(self, recorded_url):
@ -133,14 +131,7 @@ class WarcWriterPool:
# self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url)) # self.logger.info("recorded_url.warcprox_meta={} for {}".format(recorded_url.warcprox_meta, recorded_url.url))
prefix = recorded_url.warcprox_meta["warc-prefix"] prefix = recorded_url.warcprox_meta["warc-prefix"]
if not prefix in self.warc_writers: if not prefix in self.warc_writers:
self.warc_writers[prefix] = WarcWriter(prefix=prefix, self.warc_writers[prefix] = WarcWriter(prefix=prefix, options=self.options)
directory=self.default_warc_writer.directory,
rollover_size=self.default_warc_writer.rollover_size,
rollover_idle_time=self.default_warc_writer.rollover_idle_time,
gzip=self.default_warc_writer.gzip,
port=self.default_warc_writer.port,
digest_algorithm=self.default_warc_writer.record_builder.digest_algorithm,
base32=self.default_warc_writer.record_builder.base32)
w = self.warc_writers[prefix] w = self.warc_writers[prefix]
return w return w

View File

@ -22,7 +22,7 @@ import warcprox
class WarcWriterThread(threading.Thread): class WarcWriterThread(threading.Thread):
logger = logging.getLogger("warcprox.warcproxwriter.WarcWriterThread") logger = logging.getLogger("warcprox.warcproxwriter.WarcWriterThread")
def __init__(self, recorded_url_q=None, writer_pool=None, dedup_db=None, playback_index_db=None, stats_db=None): def __init__(self, recorded_url_q=None, writer_pool=None, dedup_db=None, listeners=None, options=warcprox.Options()):
"""recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl.""" """recorded_url_q is a queue.Queue of warcprox.warcprox.RecordedUrl."""
threading.Thread.__init__(self, name='WarcWriterThread') threading.Thread.__init__(self, name='WarcWriterThread')
self.recorded_url_q = recorded_url_q self.recorded_url_q = recorded_url_q
@ -32,9 +32,8 @@ class WarcWriterThread(threading.Thread):
else: else:
self.writer_pool = WarcWriterPool() self.writer_pool = WarcWriterPool()
self.dedup_db = dedup_db self.dedup_db = dedup_db
self.playback_index_db = playback_index_db self.listeners = listeners
self.stats_db = stats_db self.options = options
self._last_sync = time.time()
def run(self): def run(self):
try: try:
@ -42,40 +41,18 @@ class WarcWriterThread(threading.Thread):
try: try:
recorded_url = self.recorded_url_q.get(block=True, timeout=0.5) recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
if self.dedup_db: if self.dedup_db:
warcprox.dedup.decorate_with_dedup_info(self.dedup_db, recorded_url, warcprox.dedup.decorate_with_dedup_info(self.dedup_db,
base32=self.writer_pool.default_warc_writer.record_builder.base32) recorded_url, base32=self.options.base32)
records = self.writer_pool.write_records(recorded_url) records = self.writer_pool.write_records(recorded_url)
self._final_tasks(recorded_url, records) self._final_tasks(recorded_url, records)
except queue.Empty: except queue.Empty:
self.writer_pool.maybe_idle_rollover() self.writer_pool.maybe_idle_rollover()
self._sync()
self.logger.info('WarcWriterThread shutting down') self.logger.info('WarcWriterThread shutting down')
self.writer_pool.close_writers() self.writer_pool.close_writers()
except: except:
self.logger.critical("WarcWriterThread shutting down after unexpected error", exc_info=True) self.logger.critical("WarcWriterThread shutting down after unexpected error", exc_info=True)
def _sync(self):
# XXX prob doesn't belong here (do we need it at all?)
if time.time() - self._last_sync > 60:
if self.dedup_db:
self.dedup_db.sync()
if self.playback_index_db:
self.playback_index_db.sync()
self._last_sync = time.time()
def _save_dedup_info(self, recorded_url, records):
if (self.dedup_db
and records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.RESPONSE
and recorded_url.response_recorder.payload_size() > 0):
key = warcprox.digest_str(recorded_url.response_recorder.payload_digest,
self.writer_pool.default_warc_writer.record_builder.base32)
self.dedup_db.save(key, records[0])
def _save_playback_info(self, recorded_url, records):
if self.playback_index_db is not None:
self.playback_index_db.save(records[0].warc_filename, records, records[0].offset)
# closest thing we have to heritrix crawl log at the moment # closest thing we have to heritrix crawl log at the moment
def _log(self, recorded_url, records): def _log(self, recorded_url, records):
def _decode(x): def _decode(x):
@ -107,12 +84,8 @@ class WarcWriterThread(threading.Thread):
_decode(records[0].warc_filename), _decode(records[0].warc_filename),
records[0].offset)) records[0].offset))
def _update_stats(self, recorded_url, records):
if self.stats_db:
self.stats_db.tally(recorded_url, records)
def _final_tasks(self, recorded_url, records): def _final_tasks(self, recorded_url, records):
self._save_dedup_info(recorded_url, records) if self.listeners:
self._save_playback_info(recorded_url, records) for listener in self.listeners:
self._update_stats(recorded_url, records) listener.notify(recorded_url, records)
self._log(recorded_url, records) self._log(recorded_url, records)