Merge branch 'master' into adds-logging-for-failed-connections

commit 36784de174
Adam Miller, 2020-09-23 19:18:41 +00:00
8 changed files with 58 additions and 290 deletions

File: .travis.yml

@@ -1,20 +1,17 @@
+os: linux
 dist: xenial
 language: python
 python:
+- 3.8
 - 3.7
 - 3.6
 - 3.5
-- 3.4
-- 2.7
-- pypy
 - pypy3.5
 - nightly
-matrix:
+jobs:
   allow_failures:
   - python: nightly
-  - python: 2.7
-  - python: pypy
 addons:
   apt:
@@ -39,7 +36,7 @@ before_install:
 - ping -c2 trough
 install:
-- pip install . pytest requests warcio mock
+- pip install .[trough] pytest requests warcio mock
 before_script:
 - docker exec trough bash -c 'while ! test -e /tmp/trough-read.out ; do sleep 0.5 ; done' || true

File: setup.py

@@ -2,7 +2,7 @@
 '''
 setup.py - setuptools installation configuration for warcprox
-Copyright (C) 2013-2019 Internet Archive
+Copyright (C) 2013-2020 Internet Archive
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -28,7 +28,7 @@ deps = [
         'warctools>=4.10.0',
         'urlcanon>=0.3.0',
         'doublethink>=0.2.0.dev87',
-        'urllib3>=1.14',
+        'urllib3>=1.23',
         'requests>=2.0.1',
         'PySocks>=1.6.8',
         'cryptography>=2.3',
@@ -43,7 +43,7 @@ except:
 setuptools.setup(
         name='warcprox',
-        version='2.4.21',
+        version='2.4.27',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',
@@ -52,6 +52,7 @@ setuptools.setup(
         license='GPL',
         packages=['warcprox'],
         install_requires=deps,
+        extras_require={'trough': ['trough>=0.1.4',],},
         setup_requires=['pytest-runner'],
         tests_require=['mock', 'pytest', 'warcio'],
         entry_points={
@@ -66,10 +67,10 @@ setuptools.setup(
             'Development Status :: 5 - Production/Stable',
             'Environment :: Console',
             'License :: OSI Approved :: GNU General Public License (GPL)',
-            'Programming Language :: Python :: 3.4',
             'Programming Language :: Python :: 3.5',
             'Programming Language :: Python :: 3.6',
             'Programming Language :: Python :: 3.7',
+            'Programming Language :: Python :: 3.8',
             'Topic :: Internet :: Proxy Servers',
             'Topic :: Internet :: WWW/HTTP',
             'Topic :: Software Development :: Libraries :: Python Modules',
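
The new extras_require entry makes trough support opt-in at install time via "pip install warcprox[trough]" (the updated .travis.yml line above exercises it as "pip install .[trough]"). A minimal sketch of how code can probe for the optional dependency; the real guard this commit ships lives in TroughDedupDb.__init__ in the dedup module below, and HAVE_TROUGH here is a hypothetical name for illustration:

    # hypothetical probe for the optional "trough" extra
    try:
        import trough.client  # provided by: pip install warcprox[trough]
        HAVE_TROUGH = True
    except ImportError:
        HAVE_TROUGH = False  # trough-backed dedup unavailable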

File: tests/test_warcprox.py

@@ -2132,24 +2132,6 @@ def test_payload_digest(warcprox_, http_daemon):
     req, prox_rec_res = mitm.do_GET()
     assert warcprox.digest_str(prox_rec_res.payload_digest) == GZIP_GZIP_SHA1

-def test_trough_segment_promotion(warcprox_):
-    if not warcprox_.options.rethinkdb_trough_db_url:
-        return
-    cli = warcprox.trough.TroughClient(
-            warcprox_.options.rethinkdb_trough_db_url, 3)
-    promoted = []
-    def mock(segment_id):
-        promoted.append(segment_id)
-    cli.promote = mock
-    cli.register_schema('default', 'create table foo (bar varchar(100))')
-    cli.write('my_seg', 'insert into foo (bar) values ("boof")')
-    assert promoted == []
-    time.sleep(3)
-    assert promoted == ['my_seg']
-    promoted = []
-    time.sleep(3)
-    assert promoted == []

 def test_dedup_min_text_size(http_daemon, warcprox_, archiving_proxies):
     """We use options --dedup-min-text-size=3 --dedup-min-binary-size=5 and we
     try to download content smaller than these limits to make sure that it is

File: warcprox/controller.py

@@ -111,7 +111,7 @@ class Factory:
             assert hasattr(plugin, 'notify') ^ hasattr(plugin, '_startup')
             return plugin
         except Exception as e:
-            logging.fatal('problem with plugin class %r: %s', qualname, e)
+            logging.fatal('problem with plugin class %r', qualname, exc_info=1)
             sys.exit(1)

     @staticmethod

File: warcprox/dedup.py

@@ -26,7 +26,6 @@ import os
 import json
 from hanzo import warctools
 import warcprox
-import warcprox.trough
 import sqlite3
 import doublethink
 import datetime
@@ -49,8 +48,12 @@ class DedupableMixin(object):
         size compared with min text/binary dedup size options.
         When we use option --dedup-only-with-bucket, `dedup-buckets` is required
         in Warcprox-Meta to perform dedup.
+        If recorded_url.do_not_archive is True, we skip dedup. This record will
+        not be written to WARC anyway.
         Return Boolean.
         """
+        if recorded_url.do_not_archive:
+            return False
         if self.dedup_only_with_bucket and "dedup-buckets" not in recorded_url.warcprox_meta:
             return False
         if recorded_url.is_text():
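
The hunk above cuts off mid-method; pieced together with the size checks that follow in the file, the new do_not_archive short-circuit puts the full decision roughly in this order (a condensed sketch; the min_text_size and min_binary_size attributes are assumed to come from the --dedup-min-text-size and --dedup-min-binary-size options):

    def should_dedup(self, recorded_url):
        # never written to WARC, so dedup bookkeeping would be wasted work
        if recorded_url.do_not_archive:
            return False
        # --dedup-only-with-bucket: Warcprox-Meta must name a dedup bucket
        if self.dedup_only_with_bucket and (
                'dedup-buckets' not in recorded_url.warcprox_meta):
            return False
        # separate size floors for text vs. binary payloads
        if recorded_url.is_text():
            return recorded_url.response_recorder.payload_size() > self.min_text_size
        return recorded_url.response_recorder.payload_size() > self.min_binary_size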
@@ -502,16 +505,24 @@ class TroughDedupDb(DedupDb, DedupableMixin):
     SCHEMA_SQL = ('create table dedup (\n'
             '    digest_key varchar(100) primary key,\n'
             '    url varchar(2100) not null,\n'
-            '    date datetime not null,\n'
+            '    date varchar(100) not null,\n'
             '    id varchar(100));\n') # warc record id
     WRITE_SQL_TMPL = ('insert or ignore into dedup\n'
             '(digest_key, url, date, id)\n'
             'values (%s, %s, %s, %s);')

     def __init__(self, options=warcprox.Options()):
+        try:
+            import trough.client
+        except ImportError as e:
+            logging.critical(
+                    '%s: %s\n\nYou might need to run "pip install '
+                    'warcprox[trough]".', type(e).__name__, e)
+            sys.exit(1)
         DedupableMixin.__init__(self, options)
         self.options = options
-        self._trough_cli = warcprox.trough.TroughClient(
+        self._trough_cli = trough.client.TroughClient(
                 options.rethinkdb_trough_db_url, promotion_interval=60*60)

     def loader(self, *args, **kwargs):
@@ -533,9 +544,13 @@ class TroughDedupDb(DedupDb, DedupableMixin):
         record_id = response_record.get_header(warctools.WarcRecord.ID)
         url = response_record.get_header(warctools.WarcRecord.URL)
         warc_date = response_record.get_header(warctools.WarcRecord.DATE)
-        self._trough_cli.write(
-                bucket, self.WRITE_SQL_TMPL,
-                (digest_key, url, warc_date, record_id), self.SCHEMA_ID)
+        try:
+            self._trough_cli.write(
+                    bucket, self.WRITE_SQL_TMPL,
+                    (digest_key, url, warc_date, record_id), self.SCHEMA_ID)
+        except:
+            self.logger.warning(
+                    'problem posting dedup data to trough', exc_info=True)

     def batch_save(self, batch, bucket='__unspecified__'):
         sql_tmpl = ('insert or ignore into dedup\n'
@@ -550,12 +565,22 @@ class TroughDedupDb(DedupDb, DedupableMixin):
                     recorded_url.url,
                     recorded_url.warc_records[0].date,
                     recorded_url.warc_records[0].id,])
-        self._trough_cli.write(bucket, sql_tmpl, values, self.SCHEMA_ID)
+        try:
+            self._trough_cli.write(bucket, sql_tmpl, values, self.SCHEMA_ID)
+        except:
+            self.logger.warning(
+                    'problem posting dedup data to trough', exc_info=True)

     def lookup(self, digest_key, bucket='__unspecified__', url=None):
-        results = self._trough_cli.read(
-                bucket, 'select * from dedup where digest_key=%s;',
-                (digest_key,))
+        try:
+            results = self._trough_cli.read(
+                    bucket, 'select * from dedup where digest_key=%s;',
+                    (digest_key,))
+        except:
+            self.logger.warning(
+                    'problem reading dedup data from trough', exc_info=True)
+            return None
         if results:
             assert len(results) == 1 # sanity check (digest_key is primary key)
             result = results[0]
@@ -572,7 +597,14 @@ class TroughDedupDb(DedupDb, DedupableMixin):
         '''Returns [{'digest_key': ..., 'url': ..., 'date': ...}, ...]'''
         sql_tmpl = 'select * from dedup where digest_key in (%s)' % (
                 ','.join('%s' for i in range(len(digest_keys))))
-        results = self._trough_cli.read(bucket, sql_tmpl, digest_keys)
+        try:
+            results = self._trough_cli.read(bucket, sql_tmpl, digest_keys)
+        except:
+            self.logger.warning(
+                    'problem reading dedup data from trough', exc_info=True)
+            results = None
         if results is None:
             return []
         self.logger.debug(
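
The four try/except blocks above share one shape, and that shape is the point of this branch: a failed connection to trough costs a single dedup save or lookup, gets logged with a traceback, and never takes down the proxy. Schematically (a paraphrase of the pattern, not code lifted from the diff):

    try:
        results = self._trough_cli.read(bucket, sql, values)  # network I/O
    except:
        # connection refused, timeout, bad response, etc.: log and carry on
        self.logger.warning(
                'problem reading dedup data from trough', exc_info=True)
        results = None  # callers treat "no answer" as "no dedup hit"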

File: warcprox/main.py

@@ -93,7 +93,7 @@ def _build_arg_parser(prog='warcprox', show_hidden=False):
             default='./warcs', help='where to write warcs')
     arg_parser.add_argument('--warc-filename', dest='warc_filename',
             default='{prefix}-{timestamp17}-{serialno}-{randomtoken}',
-            help='define custom WARC filename with variables {prefix}, {timestamp14}, {timestamp17}, {serialno}, {randomtoken}, {hostname}, {shorthostname}')
+            help='define custom WARC filename with variables {prefix}, {timestamp14}, {timestamp17}, {serialno}, {randomtoken}, {hostname}, {shorthostname}, {port}')
     arg_parser.add_argument('-z', '--gzip', dest='gzip', action='store_true',
             help='write gzip-compressed warc records')
     hidden.add_argument(
@@ -302,6 +302,7 @@ def main(argv=None):
     else:
         loglevel = logging.INFO

+    logging.root.handlers = []
     logging.basicConfig(
             stream=sys.stdout, level=loglevel, format=(
                 '%(asctime)s %(process)d %(levelname)s %(threadName)s '
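
The new logging.root.handlers = [] line matters because logging.basicConfig() is a no-op whenever the root logger already has handlers, for instance if an imported library configured logging first. A small standalone demonstration of the stdlib behavior being worked around:

    import logging, sys

    logging.basicConfig(level=logging.WARNING)  # some earlier code set this up
    # without the reset, the next basicConfig() call would be silently ignored
    logging.root.handlers = []
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    logging.info('printed only because the old handlers were cleared')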

File: warcprox/trough.py

@@ -1,246 +0,0 @@ (entire file deleted; its former contents follow)
'''
warcprox/trough.py - trough client code

Copyright (C) 2017 Internet Archive

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
USA.
'''
from __future__ import absolute_import

import logging
import os
import json
import requests
import doublethink
import rethinkdb as r
import datetime
import threading
import time

class TroughClient(object):
    logger = logging.getLogger("warcprox.trough.TroughClient")

    def __init__(self, rethinkdb_trough_db_url, promotion_interval=None):
        '''
        TroughClient constructor

        Args:
            rethinkdb_trough_db_url: url with scheme rethinkdb:// pointing to
                trough configuration database
            promotion_interval: if specified, `TroughClient` will spawn a
                thread that "promotes" (pushes to hdfs) "dirty" trough segments
                (segments that have received writes) periodically, sleeping for
                `promotion_interval` seconds between cycles (default None)
        '''
        parsed = doublethink.parse_rethinkdb_url(rethinkdb_trough_db_url)
        self.rr = doublethink.Rethinker(
                servers=parsed.hosts, db=parsed.database)
        self.svcreg = doublethink.ServiceRegistry(self.rr)
        self._write_url_cache = {}
        self._read_url_cache = {}
        self._dirty_segments = set()
        self._dirty_segments_lock = threading.RLock()

        self.promotion_interval = promotion_interval
        self._promoter_thread = None
        if promotion_interval:
            self._promoter_thread = threading.Thread(
                    target=self._promotrix, name='TroughClient-promoter')
            self._promoter_thread.setDaemon(True)
            self._promoter_thread.start()

    def _promotrix(self):
        while True:
            time.sleep(self.promotion_interval)
            try:
                with self._dirty_segments_lock:
                    dirty_segments = list(self._dirty_segments)
                    self._dirty_segments.clear()
                logging.info(
                        'promoting %s trough segments', len(dirty_segments))
                for segment_id in dirty_segments:
                    try:
                        self.promote(segment_id)
                    except:
                        logging.error(
                                'problem promoting segment %s', segment_id,
                                exc_info=True)
            except:
                logging.error(
                        'caught exception doing segment promotion',
                        exc_info=True)

    def promote(self, segment_id):
        url = os.path.join(self.segment_manager_url(), 'promote')
        payload_dict = {'segment': segment_id}
        response = requests.post(url, json=payload_dict, timeout=21600)
        if response.status_code != 200:
            raise Exception(
                    'Received %s: %r in response to POST %s with data %s' % (
                        response.status_code, response.text, url,
                        json.dumps(payload_dict)))

    @staticmethod
    def sql_value(x):
        if x is None:
            return 'null'
        elif isinstance(x, datetime.datetime):
            return 'datetime(%r)' % x.isoformat()
        elif isinstance(x, bool):
            return int(x)
        elif isinstance(x, str) or isinstance(x, bytes):
            # the only character that needs escaping in sqlite string literals
            # is single-quote, which is escaped as two single-quotes
            if isinstance(x, bytes):
                s = x.decode('utf-8')
            else:
                s = x
            return "'" + s.replace("'", "''") + "'"
        elif isinstance(x, (int, float)):
            return x
        else:
            raise Exception(
                    "don't know how to make an sql value from %r (%r)" % (
                        x, type(x)))

    def segment_manager_url(self):
        master_node = self.svcreg.unique_service('trough-sync-master')
        assert master_node
        return master_node['url']

    def write_url_nocache(self, segment_id, schema_id='default'):
        provision_url = os.path.join(self.segment_manager_url(), 'provision')
        payload_dict = {'segment': segment_id, 'schema': schema_id}
        response = requests.post(provision_url, json=payload_dict, timeout=600)
        if response.status_code != 200:
            raise Exception(
                    'Received %s: %r in response to POST %s with data %s' % (
                        response.status_code, response.text, provision_url,
                        json.dumps(payload_dict)))
        result_dict = response.json()
        # assert result_dict['schema'] == schema_id  # previously provisioned?
        return result_dict['write_url']

    def read_url_nocache(self, segment_id):
        reql = self.rr.table('services').get_all(
                segment_id, index='segment').filter(
                        {'role':'trough-read'}).filter(
                                lambda svc: r.now().sub(
                                    svc['last_heartbeat']).lt(svc['ttl'])
                                ).order_by('load')
        self.logger.debug('querying rethinkdb: %r', reql)
        results = reql.run()
        if results:
            return results[0]['url']
        else:
            return None

    def write_url(self, segment_id, schema_id='default'):
        if not segment_id in self._write_url_cache:
            self._write_url_cache[segment_id] = self.write_url_nocache(
                    segment_id, schema_id)
            self.logger.info(
                    'segment %r write url is %r', segment_id,
                    self._write_url_cache[segment_id])
        return self._write_url_cache[segment_id]

    def read_url(self, segment_id):
        if not self._read_url_cache.get(segment_id):
            self._read_url_cache[segment_id] = self.read_url_nocache(segment_id)
            self.logger.info(
                    'segment %r read url is %r', segment_id,
                    self._read_url_cache[segment_id])
        return self._read_url_cache[segment_id]

    def write(self, segment_id, sql_tmpl, values=(), schema_id='default'):
        write_url = self.write_url(segment_id, schema_id)
        sql = sql_tmpl % tuple(self.sql_value(v) for v in values)
        sql_bytes = sql.encode('utf-8')

        try:
            response = requests.post(
                    write_url, sql_bytes, timeout=600,
                    headers={'content-type': 'application/sql;charset=utf-8'})
            if response.status_code != 200:
                raise Exception(
                        'Received %s: %r in response to POST %s with data %r' % (
                            response.status_code, response.text, write_url, sql))
            if segment_id not in self._dirty_segments:
                with self._dirty_segments_lock:
                    self._dirty_segments.add(segment_id)
        except:
            self._write_url_cache.pop(segment_id, None)
            self.logger.error(
                    'problem with trough write url %r', write_url,
                    exc_info=True)
            return
        if response.status_code != 200:
            self._write_url_cache.pop(segment_id, None)
            self.logger.warning(
                    'unexpected response %r %r %r from %r to sql=%r',
                    response.status_code, response.reason, response.text,
                    write_url, sql)
            return
        self.logger.debug('posted to %s: %r', write_url, sql)

    def read(self, segment_id, sql_tmpl, values=()):
        read_url = self.read_url(segment_id)
        if not read_url:
            return None
        sql = sql_tmpl % tuple(self.sql_value(v) for v in values)
        sql_bytes = sql.encode('utf-8')
        try:
            response = requests.post(
                    read_url, sql_bytes, timeout=600,
                    headers={'content-type': 'application/sql;charset=utf-8'})
        except:
            self._read_url_cache.pop(segment_id, None)
            self.logger.error(
                    'problem with trough read url %r', read_url, exc_info=True)
            return None
        if response.status_code != 200:
            self._read_url_cache.pop(segment_id, None)
            self.logger.warn(
                    'unexpected response %r %r %r from %r to sql=%r',
                    response.status_code, response.reason, response.text,
                    read_url, sql)
            return None
        self.logger.trace(
                'got %r from posting query %r to %r', response.text, sql,
                read_url)
        results = json.loads(response.text)
        return results

    def schema_exists(self, schema_id):
        url = os.path.join(self.segment_manager_url(), 'schema', schema_id)
        response = requests.get(url, timeout=60)
        if response.status_code == 200:
            return True
        elif response.status_code == 404:
            return False
        else:
            response.raise_for_status()

    def register_schema(self, schema_id, sql):
        url = os.path.join(
                self.segment_manager_url(), 'schema', schema_id, 'sql')
        response = requests.put(url, sql, timeout=600)
        if response.status_code not in (201, 204):
            raise Exception(
                    'Received %s: %r in response to PUT %r with data %r' % (
                        response.status_code, response.text, sql, url))
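
Although this client now lives in the trough package rather than in warcprox, sql_value above is the piece most worth understanding: write() and read() run every query parameter through it. Tracing its branches in the deleted code gives these renderings (a worked example; expected outputs shown as comments):

    TroughClient.sql_value(None)            # 'null'
    TroughClient.sql_value(True)            # 1 (bools become ints)
    TroughClient.sql_value(42)              # 42 (numbers pass through unquoted)
    TroughClient.sql_value("it's")          # "'it''s'" (single quote doubled)
    TroughClient.sql_value(b'boof')         # "'boof'" (bytes decoded as utf-8)
    TroughClient.sql_value(
            datetime.datetime(2020, 9, 23)) # "datetime('2020-09-23T00:00:00')"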

File: warcprox/writer.py

@@ -51,6 +51,7 @@ class WarcWriter:
         self.finalname = None
         self.gzip = options.gzip or False
         self.prefix = options.prefix or 'warcprox'
+        self.port = options.port or 8000
         self.open_suffix = '' if options.no_warc_open_suffix else '.open'
         self.rollover_size = options.rollover_size or 1000000000
         self.rollover_idle_time = options.rollover_idle_time or None
@@ -67,7 +68,7 @@ class WarcWriter:
         """WARC filename is configurable with CLI parameter --warc-filename.
         Default: '{prefix}-{timestamp17}-{randomtoken}-{serialno}'
         Available variables are: prefix, timestamp14, timestamp17, serialno,
-        randomtoken, hostname, shorthostname.
+        randomtoken, hostname, shorthostname, port.
         Extension ``.warc`` or ``.warc.gz`` is appended automatically.
         """
         hostname = socket.getfqdn()
@@ -77,7 +78,7 @@ class WarcWriter:
                 timestamp17=warcprox.timestamp17(),
                 serialno='{:05d}'.format(serial),
                 randomtoken=self.randomtoken, hostname=hostname,
-                shorthostname=shorthostname)
+                shorthostname=shorthostname, port=self.port)
         if self.gzip:
             fname = fname + '.warc.gz'
         else:
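
With the default template and the new port variable, a proxy on the default port 8000 produces names along these lines (a sketch; the timestamp, serial, and token values here are illustrative, not from the source):

    fname = '{prefix}-{timestamp17}-{serialno}-{randomtoken}'.format(
            prefix='warcprox', timestamp17='20200923191841000',
            serialno='{:05d}'.format(0), randomtoken='3yfprymq')
    # -> 'warcprox-20200923191841000-00000-3yfprymq'
    # a template of '{prefix}-{port}-{serialno}' would yield 'warcprox-8000-00000'
    # '.warc.gz' or '.warc' is appended automatically depending on --gzip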