From 2f93cdcad9f4d8e9c84d9b25e37a1582dfe6c7c4 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 25 May 2017 17:38:20 +0000 Subject: [PATCH] use locking to ensure consistency and avoid this kind of test failure https://travis-ci.org/internetarchive/warcprox/jobs/235819316 --- setup.py | 2 +- warcprox/dedup.py | 3 +- warcprox/playback.py | 41 ++++++++++++---------- warcprox/stats.py | 83 +++++++++++++++++++++----------------------- 4 files changed, 64 insertions(+), 65 deletions(-) diff --git a/setup.py b/setup.py index c86d831..02b57dc 100755 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ except: setuptools.setup( name='warcprox', - version='2.1b1.dev86', + version='2.1b1.dev87', description='WARC writing MITM HTTP/S proxy', url='https://github.com/internetarchive/warcprox', author='Noah Levitt', diff --git a/warcprox/dedup.py b/warcprox/dedup.py index 4508e71..7f894a6 100644 --- a/warcprox/dedup.py +++ b/warcprox/dedup.py @@ -28,7 +28,6 @@ from hanzo import warctools import warcprox import random import sqlite3 -import threading class DedupDb(object): logger = logging.getLogger("warcprox.dedup.DedupDb") @@ -77,7 +76,7 @@ class DedupDb(object): conn = sqlite3.connect(self.file) conn.execute( - 'insert into dedup (key, value) values (?, ?);', + 'insert or replace into dedup (key, value) values (?, ?)', (key, json_value)) conn.commit() conn.close() diff --git a/warcprox/playback.py b/warcprox/playback.py index a913da7..6f5c183 100644 --- a/warcprox/playback.py +++ b/warcprox/playback.py @@ -41,6 +41,7 @@ import re from warcprox.mitmproxy import MitmProxyHandler import warcprox import sqlite3 +import threading class PlaybackProxyHandler(MitmProxyHandler): logger = logging.getLogger("warcprox.playback.PlaybackProxyHandler") @@ -232,6 +233,7 @@ class PlaybackIndexDb(object): def __init__(self, file='./warcprox.sqlite'): self.file = file + self._lock = threading.RLock() if os.path.exists(self.file): self.logger.info( @@ -270,27 +272,30 @@ class PlaybackIndexDb(object): # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...} - conn = sqlite3.connect(self.file) - cursor = conn.execute( - 'select value from playback where url = ?', (url,)) - result_tuple = cursor.fetchone() - if result_tuple: - py_value = json.loads(result_tuple[0]) - else: - py_value = {} + with self._lock: + conn = sqlite3.connect(self.file) + cursor = conn.execute( + 'select value from playback where url = ?', (url,)) + result_tuple = cursor.fetchone() + if result_tuple: + py_value = json.loads(result_tuple[0]) + else: + py_value = {} - if date_str in py_value: - py_value[date_str].append({'f':warcfile, 'o':offset, 'i':record_id_str}) - else: - py_value[date_str] = [{'f':warcfile, 'o':offset, 'i':record_id_str}] + if date_str in py_value: + py_value[date_str].append( + {'f':warcfile, 'o':offset, 'i':record_id_str}) + else: + py_value[date_str] = [ + {'f':warcfile, 'o':offset, 'i':record_id_str}] - json_value = json.dumps(py_value, separators=(',',':')) + json_value = json.dumps(py_value, separators=(',',':')) - conn.execute( - 'insert or replace into playback (url, value) values (?, ?)', - (url, json_value)) - conn.commit() - conn.close() + conn.execute( + 'insert or replace into playback (url, value) ' + 'values (?, ?)', (url, json_value)) + conn.commit() + conn.close() self.logger.debug('playback index saved: {}:{}'.format(url, json_value)) diff --git a/warcprox/stats.py b/warcprox/stats.py index 905b077..88fc566 100644 --- a/warcprox/stats.py +++ b/warcprox/stats.py @@ -56,23 +56,25 @@ class StatsDb: def __init__(self, file='./warcprox.sqlite', options=warcprox.Options()): self.file = file self.options = options + self._lock = threading.RLock() def start(self): - if os.path.exists(self.file): - self.logger.info( - 'opening existing stats database %s', self.file) - else: - self.logger.info( - 'creating new stats database %s', self.file) + with self._lock: + if os.path.exists(self.file): + self.logger.info( + 'opening existing stats database %s', self.file) + else: + self.logger.info( + 'creating new stats database %s', self.file) - conn = sqlite3.connect(self.file) - conn.execute( - 'create table if not exists buckets_of_stats (' - ' bucket varchar(300) primary key,' - ' stats varchar(4000)' - ');') - conn.commit() - conn.close() + conn = sqlite3.connect(self.file) + conn.execute( + 'create table if not exists buckets_of_stats (' + ' bucket varchar(300) primary key,' + ' stats varchar(4000)' + ');') + conn.commit() + conn.close() self.logger.info('created table buckets_of_stats in %s', self.file) @@ -150,45 +152,38 @@ class StatsDb: return buckets def tally(self, recorded_url, records): - conn = sqlite3.connect(self.file) + with self._lock: + conn = sqlite3.connect(self.file) - i = 0 - for bucket in self.buckets(recorded_url): - try: + for bucket in self.buckets(recorded_url): cursor = conn.execute( 'select stats from buckets_of_stats where bucket=?', (bucket,)) - except: - logging.info( - 'i=%s bucket=%s self.file=%s', i, repr(bucket), - repr(self.file), exc_info=1) - raise - i += 1 - result_tuple = cursor.fetchone() - cursor.close() - if result_tuple: - bucket_stats = json.loads(result_tuple[0]) - else: - bucket_stats = _empty_bucket(bucket) + result_tuple = cursor.fetchone() + cursor.close() + if result_tuple: + bucket_stats = json.loads(result_tuple[0]) + else: + bucket_stats = _empty_bucket(bucket) - bucket_stats["total"]["urls"] += 1 - bucket_stats["total"]["wire_bytes"] += recorded_url.size + bucket_stats["total"]["urls"] += 1 + bucket_stats["total"]["wire_bytes"] += recorded_url.size - if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT: - bucket_stats["revisit"]["urls"] += 1 - bucket_stats["revisit"]["wire_bytes"] += recorded_url.size - else: - bucket_stats["new"]["urls"] += 1 - bucket_stats["new"]["wire_bytes"] += recorded_url.size + if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT: + bucket_stats["revisit"]["urls"] += 1 + bucket_stats["revisit"]["wire_bytes"] += recorded_url.size + else: + bucket_stats["new"]["urls"] += 1 + bucket_stats["new"]["wire_bytes"] += recorded_url.size - json_value = json.dumps(bucket_stats, separators=(',',':')) - conn.execute( - 'insert or replace into buckets_of_stats(bucket, stats) ' - 'values (?, ?)', (bucket, json_value)) - conn.commit() + json_value = json.dumps(bucket_stats, separators=(',',':')) + conn.execute( + 'insert or replace into buckets_of_stats ' + '(bucket, stats) values (?, ?)', (bucket, json_value)) + conn.commit() - conn.close() + conn.close() class RethinkStatsDb(StatsDb): """Updates database in batch every 2.0 seconds"""