use locking to ensure consistency and avoid this kind of test failure https://travis-ci.org/internetarchive/warcprox/jobs/235819316

Noah Levitt 2017-05-25 17:38:20 +00:00
parent 00b982aa24
commit 2f93cdcad9
4 changed files with 64 additions and 65 deletions
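
Every change below applies the same pattern: each sqlite-backed class gets a threading.RLock, and each method that touches the database holds that lock around the entire connect/execute/commit/close cycle, so concurrent threads can no longer interleave their reads and writes and clobber each other's updates. A minimal sketch of the pattern, with made-up class and table names (illustrative only, not the warcprox code itself):

import json
import sqlite3
import threading

class LockedSqliteDb:
    def __init__(self, file='./example.sqlite'):
        self.file = file
        # RLock is reentrant: a method holding the lock can call another
        # method that also acquires it without deadlocking
        self._lock = threading.RLock()

    def save(self, key, value):
        # hold the lock for the full connect/execute/commit/close cycle so no
        # other thread observes or overwrites a half-finished update
        with self._lock:
            conn = sqlite3.connect(self.file)
            conn.execute(
                    'create table if not exists kv ('
                    ' key varchar(300) primary key, value varchar(4000))')
            conn.execute(
                    'insert or replace into kv (key, value) values (?, ?)',
                    (key, json.dumps(value)))
            conn.commit()
            conn.close()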

setup.py

@@ -50,7 +50,7 @@ except:
 setuptools.setup(
         name='warcprox',
-        version='2.1b1.dev86',
+        version='2.1b1.dev87',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',

warcprox/dedup.py

@@ -28,7 +28,6 @@ from hanzo import warctools
 import warcprox
 import random
 import sqlite3
-import threading

 class DedupDb(object):
     logger = logging.getLogger("warcprox.dedup.DedupDb")
@@ -77,7 +76,7 @@ class DedupDb(object):
         conn = sqlite3.connect(self.file)
         conn.execute(
-                'insert into dedup (key, value) values (?, ?);',
+                'insert or replace into dedup (key, value) values (?, ?)',
                 (key, json_value))
         conn.commit()
         conn.close()

warcprox/playback.py

@@ -41,6 +41,7 @@ import re
 from warcprox.mitmproxy import MitmProxyHandler
 import warcprox
 import sqlite3
+import threading

 class PlaybackProxyHandler(MitmProxyHandler):
     logger = logging.getLogger("warcprox.playback.PlaybackProxyHandler")
@@ -232,6 +233,7 @@ class PlaybackIndexDb(object):
     def __init__(self, file='./warcprox.sqlite'):
         self.file = file
+        self._lock = threading.RLock()

         if os.path.exists(self.file):
             self.logger.info(
@@ -270,27 +272,30 @@ class PlaybackIndexDb(object):
         # url:{date1:[record1={'f':warcfile,'o':response_offset,'q':request_offset,'i':record_id},record2,...],date2:[{...}],...}

-        conn = sqlite3.connect(self.file)
-        cursor = conn.execute(
-                'select value from playback where url = ?', (url,))
-        result_tuple = cursor.fetchone()
-        if result_tuple:
-            py_value = json.loads(result_tuple[0])
-        else:
-            py_value = {}
-
-        if date_str in py_value:
-            py_value[date_str].append({'f':warcfile, 'o':offset, 'i':record_id_str})
-        else:
-            py_value[date_str] = [{'f':warcfile, 'o':offset, 'i':record_id_str}]
-
-        json_value = json.dumps(py_value, separators=(',',':'))
-        conn.execute(
-                'insert or replace into playback (url, value) values (?, ?)',
-                (url, json_value))
-        conn.commit()
-        conn.close()
+        with self._lock:
+            conn = sqlite3.connect(self.file)
+            cursor = conn.execute(
+                    'select value from playback where url = ?', (url,))
+            result_tuple = cursor.fetchone()
+            if result_tuple:
+                py_value = json.loads(result_tuple[0])
+            else:
+                py_value = {}
+
+            if date_str in py_value:
+                py_value[date_str].append(
+                        {'f':warcfile, 'o':offset, 'i':record_id_str})
+            else:
+                py_value[date_str] = [
+                        {'f':warcfile, 'o':offset, 'i':record_id_str}]
+
+            json_value = json.dumps(py_value, separators=(',',':'))
+            conn.execute(
+                    'insert or replace into playback (url, value) '
+                    'values (?, ?)', (url, json_value))
+            conn.commit()
+            conn.close()

         self.logger.debug('playback index saved: {}:{}'.format(url, json_value))

warcprox/stats.py

@@ -56,23 +56,25 @@ class StatsDb:
     def __init__(self, file='./warcprox.sqlite', options=warcprox.Options()):
         self.file = file
         self.options = options
+        self._lock = threading.RLock()

     def start(self):
-        if os.path.exists(self.file):
-            self.logger.info(
-                    'opening existing stats database %s', self.file)
-        else:
-            self.logger.info(
-                    'creating new stats database %s', self.file)
-
-        conn = sqlite3.connect(self.file)
-        conn.execute(
-                'create table if not exists buckets_of_stats ('
-                ' bucket varchar(300) primary key,'
-                ' stats varchar(4000)'
-                ');')
-        conn.commit()
-        conn.close()
+        with self._lock:
+            if os.path.exists(self.file):
+                self.logger.info(
+                        'opening existing stats database %s', self.file)
+            else:
+                self.logger.info(
+                        'creating new stats database %s', self.file)
+
+            conn = sqlite3.connect(self.file)
+            conn.execute(
+                    'create table if not exists buckets_of_stats ('
+                    ' bucket varchar(300) primary key,'
+                    ' stats varchar(4000)'
+                    ');')
+            conn.commit()
+            conn.close()

         self.logger.info('created table buckets_of_stats in %s', self.file)
@@ -150,45 +152,38 @@ class StatsDb:
         return buckets

     def tally(self, recorded_url, records):
-        conn = sqlite3.connect(self.file)
-
-        i = 0
-        for bucket in self.buckets(recorded_url):
-            try:
-                cursor = conn.execute(
-                        'select stats from buckets_of_stats where bucket=?',
-                        (bucket,))
-            except:
-                logging.info(
-                        'i=%s bucket=%s self.file=%s', i, repr(bucket),
-                        repr(self.file), exc_info=1)
-                raise
-            i += 1
-            result_tuple = cursor.fetchone()
-            cursor.close()
-            if result_tuple:
-                bucket_stats = json.loads(result_tuple[0])
-            else:
-                bucket_stats = _empty_bucket(bucket)
-
-            bucket_stats["total"]["urls"] += 1
-            bucket_stats["total"]["wire_bytes"] += recorded_url.size
-            if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT:
-                bucket_stats["revisit"]["urls"] += 1
-                bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
-            else:
-                bucket_stats["new"]["urls"] += 1
-                bucket_stats["new"]["wire_bytes"] += recorded_url.size
-
-            json_value = json.dumps(bucket_stats, separators=(',',':'))
-            conn.execute(
-                    'insert or replace into buckets_of_stats(bucket, stats) '
-                    'values (?, ?)', (bucket, json_value))
-            conn.commit()
-        conn.close()
+        with self._lock:
+            conn = sqlite3.connect(self.file)
+            for bucket in self.buckets(recorded_url):
+                cursor = conn.execute(
+                        'select stats from buckets_of_stats where bucket=?',
+                        (bucket,))
+                result_tuple = cursor.fetchone()
+                cursor.close()
+                if result_tuple:
+                    bucket_stats = json.loads(result_tuple[0])
+                else:
+                    bucket_stats = _empty_bucket(bucket)
+
+                bucket_stats["total"]["urls"] += 1
+                bucket_stats["total"]["wire_bytes"] += recorded_url.size
+                if records[0].get_header(warctools.WarcRecord.TYPE) == warctools.WarcRecord.REVISIT:
+                    bucket_stats["revisit"]["urls"] += 1
+                    bucket_stats["revisit"]["wire_bytes"] += recorded_url.size
+                else:
+                    bucket_stats["new"]["urls"] += 1
+                    bucket_stats["new"]["wire_bytes"] += recorded_url.size
+
+                json_value = json.dumps(bucket_stats, separators=(',',':'))
+                conn.execute(
+                        'insert or replace into buckets_of_stats '
+                        '(bucket, stats) values (?, ?)', (bucket, json_value))
+                conn.commit()
+            conn.close()

 class RethinkStatsDb(StatsDb):
     """Updates database in batch every 2.0 seconds"""