warcprox/warcprox/stats.py
'''
warcprox/stats.py - keeps statistics on what has been archived

Copyright (C) 2013-2017 Internet Archive

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
'''

from __future__ import absolute_import
import logging
import os
import json
from hanzo import warctools
import warcprox
import threading
import rethinkdb as r
import datetime
import urlcanon
import sqlite3
import copy
import doublethink

def _empty_bucket(bucket):
    return {
        "bucket": bucket,
        "total": {
            "urls": 0,
            "wire_bytes": 0,
        },
        "new": {
            "urls": 0,
            "wire_bytes": 0,
        },
        "revisit": {
            "urls": 0,
            "wire_bytes": 0,
        },
    }
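
# Illustrative only (not part of the original module): after one new capture
# of, say, 1234 wire bytes has been tallied, a bucket record has this shape.
# The sqlite backend stores it as compact JSON in the 'stats' column keyed by
# bucket name; the rethinkdb backend stores the same structure as a row whose
# primary key is 'bucket'.
#
#   {"bucket": "__all__",
#    "total":   {"urls": 1, "wire_bytes": 1234},
#    "new":     {"urls": 1, "wire_bytes": 1234},
#    "revisit": {"urls": 0, "wire_bytes": 0}}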

class StatsDb:
    logger = logging.getLogger("warcprox.stats.StatsDb")

    def __init__(self, file='./warcprox.sqlite', options=warcprox.Options()):
        self.file = file
        self.options = options
        self._lock = threading.RLock()

    def start(self):
        with self._lock:
            if os.path.exists(self.file):
                self.logger.info(
                        'opening existing stats database %s', self.file)
            else:
                self.logger.info(
                        'creating new stats database %s', self.file)

            conn = sqlite3.connect(self.file)
            conn.execute(
                    'create table if not exists buckets_of_stats ('
                    ' bucket varchar(300) primary key,'
                    ' stats varchar(4000)'
                    ');')
            conn.commit()
            conn.close()

            self.logger.info('created table buckets_of_stats in %s', self.file)

    def stop(self):
        pass

    def close(self):
        pass

    def sync(self):
        pass

    def value(self, bucket0="__all__", bucket1=None, bucket2=None):
        conn = sqlite3.connect(self.file)
        cursor = conn.execute(
                'select stats from buckets_of_stats where bucket = ?',
                (bucket0,))
        result_tuple = cursor.fetchone()
        conn.close()
        if result_tuple:
            bucket0_stats = json.loads(result_tuple[0])
            if bucket1:
                if bucket2:
                    return bucket0_stats[bucket1][bucket2]
                else:
                    return bucket0_stats[bucket1]
            else:
                return bucket0_stats
        else:
            return None

    def notify(self, recorded_url, records):
        self.tally(recorded_url, records)

    def buckets(self, recorded_url):
        '''
        Unravels bucket definitions in the Warcprox-Meta header. Each bucket
        definition can either be a string, which signifies the name of the
        bucket, or a dict. If a dict, it is expected to have at least an item
        with key 'bucket' whose value is the name of the bucket. The other
        currently recognized item is 'tally-domains', which, if supplied,
        should be a list of domains. This instructs warcprox to additionally
        tally substats of the given bucket by domain. Host stats are stored in
        the stats table under the key '{parent-bucket}:{domain(normalized)}'.

        Example Warcprox-Meta header (a real one will likely have other
        sections besides 'stats'):

        Warcprox-Meta: {'stats':{'buckets':['bucket1',{'bucket':'bucket2','tally-domains':['foo.bar.com','192.168.10.20']}]}}
        '''
        buckets = ["__all__"]
        if (recorded_url.warcprox_meta
                and "stats" in recorded_url.warcprox_meta
                and "buckets" in recorded_url.warcprox_meta["stats"]):
            for bucket in recorded_url.warcprox_meta["stats"]["buckets"]:
                if isinstance(bucket, dict):
                    if not 'bucket' in bucket:
                        self.logger.warn(
                                'ignoring invalid stats bucket in '
                                'warcprox-meta header %s', bucket)
                        continue
                    buckets.append(bucket['bucket'])
                    if bucket.get('tally-domains'):
                        url = urlcanon.semantic(recorded_url.url)
                        for domain in bucket['tally-domains']:
                            domain = urlcanon.normalize_host(domain).decode('ascii')
                            if urlcanon.url_matches_domain(url, domain):
                                buckets.append(
                                        '%s:%s' % (bucket['bucket'], domain))
                else:
                    buckets.append(bucket)
        else:
            buckets.append("__unspecified__")

        return buckets
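
    # For illustration (hypothetical values, not from the original source):
    # given the example Warcprox-Meta header in the docstring above and a
    # capture of https://foo.bar.com/some/page, buckets() would return
    # something like:
    #
    #   ['__all__', 'bucket1', 'bucket2', 'bucket2:foo.bar.com']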

    def tally(self, recorded_url, records):
        with self._lock:
            conn = sqlite3.connect(self.file)

            for bucket in self.buckets(recorded_url):
                cursor = conn.execute(
                        'select stats from buckets_of_stats where bucket=?',
                        (bucket,))
                result_tuple = cursor.fetchone()
                cursor.close()

                if result_tuple:
                    bucket_stats = json.loads(result_tuple[0])
                else:
                    bucket_stats = _empty_bucket(bucket)

                bucket_stats["total"]["urls"] += 1
                bucket_stats["total"]["wire_bytes"] += recorded_url.size

                if records:
                    if records[0].type == b'revisit':
                        bucket_stats["revisit"]["urls"] += 1
                        bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
                    else:
                        bucket_stats["new"]["urls"] += 1
                        bucket_stats["new"]["wire_bytes"] += recorded_url.size

                json_value = json.dumps(bucket_stats, separators=(',',':'))
                conn.execute(
                        'insert or replace into buckets_of_stats '
                        '(bucket, stats) values (?, ?)', (bucket, json_value))
                conn.commit()

            conn.close()
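
# Usage sketch (assumed, for illustration; not part of the original module):
# how a caller might read tallies back out of the sqlite-backed StatsDb.
#
#   stats_db = StatsDb('./warcprox.sqlite')
#   stats_db.start()
#   stats_db.value('__all__')                 # whole dict for the bucket
#   stats_db.value('__all__', 'new')          # e.g. {'urls': 12, 'wire_bytes': 34567}
#   stats_db.value('__all__', 'new', 'urls')  # single counter, e.g. 12
#   stats_db.close()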

class RethinkStatsDb(StatsDb):
    """Updates database in batch every 2.0 seconds"""
    logger = logging.getLogger("warcprox.stats.RethinkStatsDb")

    def __init__(self, options=warcprox.Options()):
        parsed = doublethink.parse_rethinkdb_url(options.rethinkdb_stats_url)
        self.rr = doublethink.Rethinker(
                servers=parsed.hosts, db=parsed.database)
        self.table = parsed.table
        self.replicas = min(3, len(self.rr.servers))
        self._ensure_db_table()
        self.options = options

        self._stop = threading.Event()
        self._batch_lock = threading.RLock()
        with self._batch_lock:
            self._batch = {}
        self._timer = None

    def start(self):
        """Starts batch update repeating timer."""
        self._update_batch()  # starts repeating timer

    def _bucket_batch_update_reql(self, bucket, batch):
        return self.rr.table(self.table).get(bucket).replace(
            lambda old: r.branch(
                old.eq(None), batch[bucket], old.merge({
                    "total": {
                        "urls": old["total"]["urls"].add(
                            batch[bucket]["total"]["urls"]),
                        "wire_bytes": old["total"]["wire_bytes"].add(
                            batch[bucket]["total"]["wire_bytes"]),
                    },
                    "new": {
                        "urls": old["new"]["urls"].add(
                            batch[bucket]["new"]["urls"]),
                        "wire_bytes": old["new"]["wire_bytes"].add(
                            batch[bucket]["new"]["wire_bytes"]),
                    },
                    "revisit": {
                        "urls": old["revisit"]["urls"].add(
                            batch[bucket]["revisit"]["urls"]),
                        "wire_bytes": old["revisit"]["wire_bytes"].add(
                            batch[bucket]["revisit"]["wire_bytes"]),
                    },
                })))
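
    # Rough plain-Python equivalent of the ReQL above (illustration only; in
    # the real code the merge happens server-side, atomically per bucket): if
    # the bucket row does not exist yet, the batched delta is inserted as-is;
    # otherwise each counter in the existing row is incremented by the
    # corresponding counter in the delta.
    #
    #   def _merge(old, delta):
    #       if old is None:
    #           return delta
    #       for section in ('total', 'new', 'revisit'):
    #           for field in ('urls', 'wire_bytes'):
    #               old[section][field] += delta[section][field]
    #       return old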

    def _update_batch(self):
        with self._batch_lock:
            batch_copy = copy.deepcopy(self._batch)
            self._batch = {}
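
        # Added commentary (not in the original): rethinkdb's replace()
        # result includes counts for 'inserted', 'replaced', 'unchanged',
        # 'errors', 'skipped' and 'deleted', so the check below requires
        # exactly one of those six values to be 1, and that one to be
        # inserted or replaced.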
        try:
            if len(batch_copy) > 0:
                # XXX can all the buckets be done in one query?
                for bucket in batch_copy:
                    result = self._bucket_batch_update_reql(
                            bucket, batch_copy).run()
                    if (not result["inserted"] and not result["replaced"]
                            or sorted(result.values()) != [0,0,0,0,0,1]):
                        raise Exception(
                                "unexpected result %s updating stats %s" % (
                                    result, batch_copy[bucket]))
        except Exception as e:
            self.logger.error("problem updating stats", exc_info=True)
            # restore the stats that didn't get saved back into the batch so
            # that they are saved in the next call to _update_batch()
            with self._batch_lock:
                self._add_to_batch(batch_copy)
        finally:
            if not self._stop.is_set():
                self._timer = threading.Timer(2.0, self._update_batch)
                self._timer.name = "RethinkStats-batch-update-timer-%s" % (
                        datetime.datetime.utcnow().isoformat())
                self._timer.start()
            else:
                self.logger.info("finished")

    def _ensure_db_table(self):
        dbs = self.rr.db_list().run()
        if not self.rr.dbname in dbs:
            self.logger.info(
                    "creating rethinkdb database %r", self.rr.dbname)
            self.rr.db_create(self.rr.dbname).run()
        tables = self.rr.table_list().run()
        if not self.table in tables:
            self.logger.info(
                    "creating rethinkdb table %r in database %r shards=%r "
                    "replicas=%r", self.table, self.rr.dbname, 1,
                    self.replicas)
            self.rr.table_create(
                    self.table, primary_key="bucket", shards=1,
                    replicas=self.replicas).run()

    def close(self):
        self.stop()

    def stop(self):
        self.logger.info("stopping rethinkdb stats table batch updates")
        self._stop.set()
        if self._timer:
            self._timer.join()

    def sync(self):
        pass

    def value(self, bucket0="__all__", bucket1=None, bucket2=None):
        bucket0_stats = self.rr.table(self.table).get(bucket0).run()
        self.logger.debug(
                'stats db lookup of bucket=%s returned %s',
                bucket0, bucket0_stats)
        if bucket0_stats:
            if bucket1:
                if bucket2:
                    return bucket0_stats[bucket1][bucket2]
                else:
                    return bucket0_stats[bucket1]
        return bucket0_stats

    def tally(self, recorded_url, records):
        buckets = self.buckets(recorded_url)
        with self._batch_lock:
            for bucket in buckets:
                bucket_stats = self._batch.setdefault(
                        bucket, _empty_bucket(bucket))

                bucket_stats["total"]["urls"] += 1
                bucket_stats["total"]["wire_bytes"] += recorded_url.size

                if records:
                    if records[0].type == b'revisit':
                        bucket_stats["revisit"]["urls"] += 1
                        bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
                    else:
                        bucket_stats["new"]["urls"] += 1
                        bucket_stats["new"]["wire_bytes"] += recorded_url.size

    def _add_to_batch(self, add_me):
        with self._batch_lock:
            for bucket in add_me:
                bucket_stats = self._batch.setdefault(
                        bucket, _empty_bucket(bucket))
                bucket_stats["total"]["urls"] += add_me[bucket]["total"]["urls"]
                bucket_stats["total"]["wire_bytes"] += add_me[bucket]["total"]["wire_bytes"]
                bucket_stats["revisit"]["urls"] += add_me[bucket]["revisit"]["urls"]
                bucket_stats["revisit"]["wire_bytes"] += add_me[bucket]["revisit"]["wire_bytes"]
                bucket_stats["new"]["urls"] += add_me[bucket]["new"]["urls"]
                bucket_stats["new"]["wire_bytes"] += add_me[bucket]["new"]["wire_bytes"]

    def notify(self, recorded_url, records):
        self.tally(recorded_url, records)
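
# Usage sketch (assumed wiring, not part of the original module): warcprox
# calls notify() once per archived URL. With the rethinkdb backend, tallies
# accumulate in self._batch under the lock and are flushed to the stats table
# by the 2.0-second repeating timer. The URL below is only an example of the
# rethinkdb://host/db/table form that doublethink.parse_rethinkdb_url()
# expects; host, database and table names are made up here.
#
#   options = warcprox.Options(
#           rethinkdb_stats_url='rethinkdb://localhost/warcprox/stats')
#   stats_db = RethinkStatsDb(options=options)
#   stats_db.start()
#   ...                                     # per-response, from warcprox:
#   stats_db.notify(recorded_url, records)  # tally into the in-memory batch
#   ...
#   stats_db.stop()                         # stops the repeating batch-update timer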