Merge branch 'fix-seconds-behind' into qa

* fix-seconds-behind:
  datetimes with timezone in status because...
  be clear about timezone in timestamps
  take all the queues and active requests into...
commit 64e92d8953
Author: Noah Levitt
Date:   2018-10-31 12:36:26 -07:00
8 changed files with 116 additions and 544 deletions

api.rst

@@ -15,72 +15,60 @@ a json blob with a bunch of status info. For example:
 
     $ curl -sS http://localhost:8000/status
     {
-      "rates_5min": {
-        "warc_bytes_per_sec": 0.0,
-        "urls_per_sec": 0.0,
-        "actual_elapsed": 277.2983281612396
-      },
-      "version": "2.4b2.dev174",
-      "load": 0.0,
-      "seconds_behind": 0.0,
-      "threads": 100,
-      "warc_bytes_written": 0,
+      "role": "warcprox",
+      "version": "2.4b3.dev189",
+      "host": "ayutla.local",
+      "address": "127.0.0.1",
+      "port": 8000,
+      "pid": 60555,
+      "threads": 100,
+      "active_requests": 1,
+      "unaccepted_requests": 0,
+      "load": 0.0,
+      "queued_urls": 0,
+      "queue_max_size": 500,
+      "urls_processed": 0,
+      "warc_bytes_written": 0,
+      "start_time": "2018-10-30T20:15:19.929861Z",
+      "rates_1min": {
+        "actual_elapsed": 61.76024103164673,
+        "urls_per_sec": 0.0,
+        "warc_bytes_per_sec": 0.0
+      },
+      "rates_5min": {
+        "actual_elapsed": 1.7602601051330566,
+        "urls_per_sec": 0.0,
+        "warc_bytes_per_sec": 0.0
+      },
+      "rates_15min": {
+        "actual_elapsed": 1.7602710723876953,
+        "urls_per_sec": 0.0,
+        "warc_bytes_per_sec": 0.0
+      },
+      "earliest_still_active_fetch_start": "2018-10-30T20:15:21.691467Z",
+      "seconds_behind": 0.001758,
       "postfetch_chain": [
         {
-          "queued_urls": 0,
-          "processor": "SkipFacebookCaptchas"
+          "processor": "DedupLoader",
+          "queued_urls": 0
         },
         {
-          "queued_urls": 0,
-          "processor": "BatchTroughLoader"
+          "processor": "WarcWriterProcessor",
+          "queued_urls": 0
         },
         {
-          "queued_urls": 0,
-          "processor": "WarcWriterProcessor"
+          "processor": "DedupDb",
+          "queued_urls": 0
         },
         {
-          "queued_urls": 0,
-          "processor": "BatchTroughStorer"
+          "processor": "StatsProcessor",
+          "queued_urls": 0
         },
         {
-          "queued_urls": 0,
-          "processor": "RethinkStatsProcessor"
-        },
-        {
-          "queued_urls": 0,
-          "processor": "CrawlLogger"
-        },
-        {
-          "queued_urls": 0,
-          "processor": "TroughFeed"
-        },
-        {
-          "queued_urls": 0,
-          "processor": "RunningStats"
+          "processor": "RunningStats",
+          "queued_urls": 0
         }
-      ],
-      "queue_max_size": 500,
-      "role": "warcprox",
-      "queued_urls": 0,
-      "active_requests": 1,
-      "host": "wbgrp-svc405.us.archive.org",
-      "rates_15min": {
-        "warc_bytes_per_sec": 0.0,
-        "urls_per_sec": 0.0,
-        "actual_elapsed": 876.9885368347168
-      },
-      "unaccepted_requests": 0,
-      "urls_processed": 0,
-      "pid": 18841,
-      "address": "127.0.0.1",
-      "rates_1min": {
-        "warc_bytes_per_sec": 0.0,
-        "urls_per_sec": 0.0,
-        "actual_elapsed": 54.92501664161682
-      },
-      "start_time": 1526690353.4060142
+      ]
     }
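For reference, a minimal client-side sketch (not part of this commit) that polls the status endpoint above and reports the new lag fields; it assumes warcprox is listening on localhost:8000 as in the example:

    import json
    import urllib.request

    # fetch the status blob and pull out a few of the fields shown above
    with urllib.request.urlopen('http://localhost:8000/status') as response:
        status = json.loads(response.read().decode('utf-8'))

    print('warcprox %s: %s queued urls, %.3f seconds behind' % (
            status['version'], status['queued_urls'], status['seconds_behind']))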
``WARCPROX_WRITE_RECORD`` http method
=====================================

tests/test_queue.py

@@ -1,416 +0,0 @@
# === warcprox note ===
# tests/test_queue.py - tests that ensure warcprox.TimestampedQueue doesn't
# break any of the queue.Queue functionality
#
# This file copied and slightly modified from the cpython source:
# https://github.com/python/cpython/blob/38c707e7e0322f9139bb51ad73ede1e3b46985ef/Lib/test/test_queue.py
# See https://github.com/python/cpython/blob/38c707e7e0322f9139bb51ad73ede1e3b46985ef/LICENSE
# for licensing information.
#
# Changes for warcprox are clearly delimited. The modifications are:
#
# Copyright (C) 2017 Internet Archive
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
# === end warcprox note ===
# Some simple queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
# === commented out for warcprox ===
# import queue
# === end commented out for warcprox ===
# === added for warcprox ===
# make it work with py2
try:
import queue
except:
import Queue as queue
import threading
# === end added for warcprox ===
import time
import unittest
# === commented out for warcprox ===
# from test import support
# threading = support.import_module('threading')
# === end commented out for warcprox ===
QUEUE_SIZE = 5
def qfull(q):
return q.maxsize > 0 and q.qsize() == q.maxsize
# A thread to run a function that unclogs a blocked Queue.
class _TriggerThread(threading.Thread):
def __init__(self, fn, args):
self.fn = fn
self.args = args
self.startedEvent = threading.Event()
threading.Thread.__init__(self)
def run(self):
# The sleep isn't necessary, but is intended to give the blocking
# function in the main thread a chance at actually blocking before
# we unclog it. But if the sleep is longer than the timeout-based
# tests wait in their blocking functions, those tests will fail.
# So we give them much longer timeout values compared to the
# sleep here (I aimed at 10 seconds for blocking functions --
# they should never actually wait that long - they should make
# progress as soon as we call self.fn()).
time.sleep(0.1)
self.startedEvent.set()
self.fn(*self.args)
# Execute a function that blocks, and in a separate thread, a function that
# triggers the release. Returns the result of the blocking function. Caution:
# block_func must guarantee to block until trigger_func is called, and
# trigger_func must guarantee to change queue state so that block_func can make
# enough progress to return. In particular, a block_func that just raises an
# exception regardless of whether trigger_func is called will lead to
# timing-dependent sporadic failures, and one of those went rarely seen but
# undiagnosed for years. Now block_func must be unexceptional. If block_func
# is supposed to raise an exception, call do_exceptional_blocking_test()
# instead.
class BlockingTestMixin:
def tearDown(self):
self.t = None
def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
self.t = _TriggerThread(trigger_func, trigger_args)
self.t.start()
self.result = block_func(*block_args)
# If block_func returned before our thread made the call, we failed!
if not self.t.startedEvent.is_set():
self.fail("blocking function '%r' appeared not to block" %
block_func)
self.t.join(10) # make sure the thread terminates
if self.t.is_alive():
self.fail("trigger function '%r' appeared to not return" %
trigger_func)
return self.result
# Call this instead if block_func is supposed to raise an exception.
def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
trigger_args, expected_exception_class):
self.t = _TriggerThread(trigger_func, trigger_args)
self.t.start()
try:
try:
block_func(*block_args)
except expected_exception_class:
raise
else:
self.fail("expected exception of kind %r" %
expected_exception_class)
finally:
self.t.join(10) # make sure the thread terminates
if self.t.is_alive():
self.fail("trigger function '%r' appeared to not return" %
trigger_func)
if not self.t.startedEvent.is_set():
self.fail("trigger thread ended but event never set")
class BaseQueueTestMixin(BlockingTestMixin):
def setUp(self):
self.cum = 0
self.cumlock = threading.Lock()
def simple_queue_test(self, q):
if q.qsize():
raise RuntimeError("Call this function with an empty queue")
self.assertTrue(q.empty())
self.assertFalse(q.full())
# I guess we better check things actually queue correctly a little :)
q.put(111)
q.put(333)
q.put(222)
target_order = dict(Queue = [111, 333, 222],
# === warcprox addition ===
TimestampedQueue = [111, 333, 222],
# === end warcprox addition ===
LifoQueue = [222, 333, 111],
PriorityQueue = [111, 222, 333])
actual_order = [q.get(), q.get(), q.get()]
self.assertEqual(actual_order, target_order[q.__class__.__name__],
"Didn't seem to queue the correct data!")
for i in range(QUEUE_SIZE-1):
q.put(i)
self.assertTrue(q.qsize(), "Queue should not be empty")
self.assertTrue(not qfull(q), "Queue should not be full")
last = 2 * QUEUE_SIZE
full = 3 * 2 * QUEUE_SIZE
q.put(last)
self.assertTrue(qfull(q), "Queue should be full")
self.assertFalse(q.empty())
self.assertTrue(q.full())
try:
q.put(full, block=0)
self.fail("Didn't appear to block with a full queue")
except queue.Full:
pass
try:
q.put(full, timeout=0.01)
self.fail("Didn't appear to time-out with a full queue")
except queue.Full:
pass
# Test a blocking put
self.do_blocking_test(q.put, (full,), q.get, ())
self.do_blocking_test(q.put, (full, True, 10), q.get, ())
# Empty it
for i in range(QUEUE_SIZE):
q.get()
self.assertTrue(not q.qsize(), "Queue should be empty")
try:
q.get(block=0)
self.fail("Didn't appear to block with an empty queue")
except queue.Empty:
pass
try:
q.get(timeout=0.01)
self.fail("Didn't appear to time-out with an empty queue")
except queue.Empty:
pass
# Test a blocking get
self.do_blocking_test(q.get, (), q.put, ('empty',))
self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
def worker(self, q):
while True:
x = q.get()
if x < 0:
q.task_done()
return
with self.cumlock:
self.cum += x
q.task_done()
def queue_join_test(self, q):
self.cum = 0
for i in (0,1):
threading.Thread(target=self.worker, args=(q,)).start()
for i in range(100):
q.put(i)
q.join()
self.assertEqual(self.cum, sum(range(100)),
"q.join() did not block until all tasks were done")
for i in (0,1):
q.put(-1) # instruct the threads to close
q.join() # verify that you can join twice
def test_queue_task_done(self):
# Test to make sure a queue task completed successfully.
q = self.type2test()
try:
q.task_done()
except ValueError:
pass
else:
self.fail("Did not detect task count going negative")
def test_queue_join(self):
# Test that a queue join()s successfully, and before anything else
# (done twice for insurance).
q = self.type2test()
self.queue_join_test(q)
self.queue_join_test(q)
try:
q.task_done()
except ValueError:
pass
else:
self.fail("Did not detect task count going negative")
def test_simple_queue(self):
# Do it a couple of times on the same queue.
# Done twice to make sure works with same instance reused.
q = self.type2test(QUEUE_SIZE)
self.simple_queue_test(q)
self.simple_queue_test(q)
def test_negative_timeout_raises_exception(self):
q = self.type2test(QUEUE_SIZE)
with self.assertRaises(ValueError):
q.put(1, timeout=-1)
with self.assertRaises(ValueError):
q.get(1, timeout=-1)
def test_nowait(self):
q = self.type2test(QUEUE_SIZE)
for i in range(QUEUE_SIZE):
q.put_nowait(1)
with self.assertRaises(queue.Full):
q.put_nowait(1)
for i in range(QUEUE_SIZE):
q.get_nowait()
with self.assertRaises(queue.Empty):
q.get_nowait()
def test_shrinking_queue(self):
# issue 10110
q = self.type2test(3)
q.put(1)
q.put(2)
q.put(3)
with self.assertRaises(queue.Full):
q.put_nowait(4)
self.assertEqual(q.qsize(), 3)
# === changed for warcprox to make test pass on python2 ===
# q.maxsize = 2 # shrink the queue
q.maxsize = 3 # shrink the queue
# === end changed for warcprox ===
with self.assertRaises(queue.Full):
q.put_nowait(4)
# === commented out for warcprox ===
# class QueueTest(BaseQueueTestMixin, unittest.TestCase):
# type2test = queue.Queue
#
# class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase):
# type2test = queue.LifoQueue
#
# class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase):
# type2test = queue.PriorityQueue
#
# === end commented out for warcprox ===
# === warcprox additions ===
import warcprox
class TimestampedQueueTest(BaseQueueTestMixin, unittest.TestCase):
type2test = warcprox.TimestampedQueue
# === end warcprox additions
# === commented out for warcprox ===
# # A Queue subclass that can provoke failure at a moment's notice :)
# class FailingQueueException(Exception):
# pass
#
# class FailingQueue(queue.Queue):
# def __init__(self, *args):
# self.fail_next_put = False
# self.fail_next_get = False
# queue.Queue.__init__(self, *args)
# def _put(self, item):
# if self.fail_next_put:
# self.fail_next_put = False
# raise FailingQueueException("You Lose")
# return queue.Queue._put(self, item)
# def _get(self):
# if self.fail_next_get:
# self.fail_next_get = False
# raise FailingQueueException("You Lose")
# return queue.Queue._get(self)
#
# class FailingQueueTest(BlockingTestMixin, unittest.TestCase):
#
# def failing_queue_test(self, q):
# if q.qsize():
# raise RuntimeError("Call this function with an empty queue")
# for i in range(QUEUE_SIZE-1):
# q.put(i)
# # Test a failing non-blocking put.
# q.fail_next_put = True
# try:
# q.put("oops", block=0)
# self.fail("The queue didn't fail when it should have")
# except FailingQueueException:
# pass
# q.fail_next_put = True
# try:
# q.put("oops", timeout=0.1)
# self.fail("The queue didn't fail when it should have")
# except FailingQueueException:
# pass
# q.put("last")
# self.assertTrue(qfull(q), "Queue should be full")
# # Test a failing blocking put
# q.fail_next_put = True
# try:
# self.do_blocking_test(q.put, ("full",), q.get, ())
# self.fail("The queue didn't fail when it should have")
# except FailingQueueException:
# pass
# # Check the Queue isn't damaged.
# # put failed, but get succeeded - re-add
# q.put("last")
# # Test a failing timeout put
# q.fail_next_put = True
# try:
# self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
# FailingQueueException)
# self.fail("The queue didn't fail when it should have")
# except FailingQueueException:
# pass
# # Check the Queue isn't damaged.
# # put failed, but get succeeded - re-add
# q.put("last")
# self.assertTrue(qfull(q), "Queue should be full")
# q.get()
# self.assertTrue(not qfull(q), "Queue should not be full")
# q.put("last")
# self.assertTrue(qfull(q), "Queue should be full")
# # Test a blocking put
# self.do_blocking_test(q.put, ("full",), q.get, ())
# # Empty it
# for i in range(QUEUE_SIZE):
# q.get()
# self.assertTrue(not q.qsize(), "Queue should be empty")
# q.put("first")
# q.fail_next_get = True
# try:
# q.get()
# self.fail("The queue didn't fail when it should have")
# except FailingQueueException:
# pass
# self.assertTrue(q.qsize(), "Queue should not be empty")
# q.fail_next_get = True
# try:
# q.get(timeout=0.1)
# self.fail("The queue didn't fail when it should have")
# except FailingQueueException:
# pass
# self.assertTrue(q.qsize(), "Queue should not be empty")
# q.get()
# self.assertTrue(not q.qsize(), "Queue should be empty")
# q.fail_next_get = True
# try:
# self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
# FailingQueueException)
# self.fail("The queue didn't fail when it should have")
# except FailingQueueException:
# pass
# # put succeeded, but get failed.
# self.assertTrue(q.qsize(), "Queue should not be empty")
# q.get()
# self.assertTrue(not q.qsize(), "Queue should be empty")
#
# def test_failing_queue(self):
# # Test to make sure a queue is functioning correctly.
# # Done twice to the same instance.
# q = FailingQueue(QUEUE_SIZE)
# self.failing_queue_test(q)
# self.failing_queue_test(q)
# === end commented out for warcprox ===
if __name__ == "__main__":
unittest.main()

tests/test_warcprox.py

@@ -1551,7 +1551,8 @@ def test_status_api(warcprox_):
         'queued_urls', 'queue_max_size', 'seconds_behind', 'threads',
         'rates_5min', 'rates_1min', 'unaccepted_requests', 'rates_15min',
         'active_requests','start_time','urls_processed',
-        'warc_bytes_written','postfetch_chain',}
+        'warc_bytes_written', 'postfetch_chain',
+        'earliest_still_active_fetch_start',}
     assert status['role'] == 'warcprox'
     assert status['version'] == warcprox.__version__
     assert status['port'] == warcprox_.proxy.server_port
@@ -1573,26 +1574,14 @@ def test_svcreg_status(warcprox_):
         'first_heartbeat', 'ttl', 'last_heartbeat', 'threads',
         'rates_5min', 'rates_1min', 'unaccepted_requests',
         'rates_15min', 'active_requests','start_time','urls_processed',
-        'warc_bytes_written','postfetch_chain',}
+        'warc_bytes_written', 'postfetch_chain',
+        'earliest_still_active_fetch_start',}
     assert status['role'] == 'warcprox'
     assert status['version'] == warcprox.__version__
     assert status['port'] == warcprox_.proxy.server_port
     assert status['pid'] == os.getpid()
     assert status['threads'] == warcprox_.proxy.pool._max_workers
 
-def test_timestamped_queue():
-    # see also test_queue.py
-    q = warcprox.TimestampedQueue()
-    q.put('monkey')
-    q.put('flonkey')
-    timestamp_item = q.get_with_timestamp()
-    assert isinstance(timestamp_item, tuple)
-    assert isinstance(timestamp_item[0], datetime.datetime)
-    assert timestamp_item[1] == 'monkey'
-    assert timestamp_item[0] < q.oldest_timestamp()
-    time.sleep(1)
-    assert q.seconds_behind() > 1
-
 def test_controller_with_defaults():
     # tests some initialization code that we rarely touch otherwise
     controller = warcprox.controller.WarcproxController()

tests/test_writer.py

@@ -35,8 +35,9 @@ import io
 import tempfile
 import logging
 import hashlib
+import queue
 
-def lock_file(queue, filename):
+def lock_file(q, filename):
     """Try to lock file and return 1 if successful, else return 0.
 
     It is necessary to run this method in a different process to test locking.
     """
@@ -44,9 +45,9 @@ def lock_file(queue, filename):
         fi = open(filename, 'ab')
         fcntl.lockf(fi, fcntl.LOCK_EX | fcntl.LOCK_NB)
         fi.close()
-        queue.put('OBTAINED LOCK')
+        q.put('OBTAINED LOCK')
     except IOError:
-        queue.put('FAILED TO OBTAIN LOCK')
+        q.put('FAILED TO OBTAIN LOCK')
 
 def test_warc_writer_locking(tmpdir):
@@ -69,18 +70,18 @@ def test_warc_writer_locking(tmpdir):
     assert warcs
     target_warc = os.path.join(dirname, warcs[0])
     # launch another process and try to lock WARC file
-    queue = Queue()
-    p = Process(target=lock_file, args=(queue, target_warc))
+    q = Queue()
+    p = Process(target=lock_file, args=(q, target_warc))
     p.start()
     p.join()
-    assert queue.get() == 'FAILED TO OBTAIN LOCK'
+    assert q.get() == 'FAILED TO OBTAIN LOCK'
     wwriter.close_writer()
 
     # locking must succeed after writer has closed the WARC file.
-    p = Process(target=lock_file, args=(queue, target_warc))
+    p = Process(target=lock_file, args=(q, target_warc))
     p.start()
     p.join()
-    assert queue.get() == 'OBTAINED LOCK'
+    assert q.get() == 'OBTAINED LOCK'
 
 def wait(callback, timeout):
     start = time.time()
@@ -97,8 +98,8 @@ def test_special_dont_write_prefix():
     wwt = warcprox.writerthread.WarcWriterProcessor(
             Options(prefix='-', writer_threads=1))
-    wwt.inq = warcprox.TimestampedQueue(maxsize=1)
-    wwt.outq = warcprox.TimestampedQueue(maxsize=1)
+    wwt.inq = queue.Queue(maxsize=1)
+    wwt.outq = queue.Queue(maxsize=1)
     try:
         wwt.start()
         # not to be written due to default prefix
@@ -131,8 +132,8 @@ def test_special_dont_write_prefix():
     wwt = warcprox.writerthread.WarcWriterProcessor(
             Options(writer_threads=1, blackout_period=60, prefix='foo'))
-    wwt.inq = warcprox.TimestampedQueue(maxsize=1)
-    wwt.outq = warcprox.TimestampedQueue(maxsize=1)
+    wwt.inq = queue.Queue(maxsize=1)
+    wwt.outq = queue.Queue(maxsize=1)
     try:
         wwt.start()
         # to be written due to default prefix
@@ -206,8 +207,8 @@ def test_do_not_archive():
     wwt = warcprox.writerthread.WarcWriterProcessor(
             Options(writer_threads=1))
-    wwt.inq = warcprox.TimestampedQueue(maxsize=1)
-    wwt.outq = warcprox.TimestampedQueue(maxsize=1)
+    wwt.inq = queue.Queue(maxsize=1)
+    wwt.outq = queue.Queue(maxsize=1)
     try:
         wwt.start()
         # to be written -- default do_not_archive False

warcprox/__init__.py

@@ -31,6 +31,7 @@ try:
     import queue
 except ImportError:
     import Queue as queue
+import json
 
 __version__ = _get_distribution('warcprox').version
@@ -47,6 +48,15 @@ class Options(_Namespace):
         except AttributeError:
             return None
 
+class Jsonner(json.JSONEncoder):
+    def default(self, o):
+        if isinstance(o, datetime.datetime):
+            return o.isoformat()
+        elif isinstance(o, bytes):
+            return base64.b64encode(o).decode('ascii')
+        else:
+            return json.JSONEncoder.default(self, o)
+
 class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
     '''
     `concurrent.futures.ThreadPoolExecutor` supporting a queue of limited size.
@@ -58,36 +68,6 @@ class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
         super().__init__(*args, **kwargs)
         self._work_queue = queue.Queue(maxsize=max_queued or 0)
 
-class TimestampedQueue(queue.Queue):
-    """
-    A queue.Queue that exposes the time enqueued of the oldest item in the
-    queue.
-    """
-    def put(self, item, block=True, timeout=None):
-        return queue.Queue.put(
-                self, (datetime.datetime.utcnow(), item), block, timeout)
-
-    def get(self, block=True, timeout=None):
-        timestamp, item = self.get_with_timestamp(block, timeout)
-        return item
-
-    get_with_timestamp = queue.Queue.get
-
-    def oldest_timestamp(self):
-        with self.mutex:
-            if self.queue:
-                timestamp, item = self.queue[0]
-            else:
-                return None
-        return timestamp
-
-    def seconds_behind(self):
-        timestamp = self.oldest_timestamp()
-        if timestamp:
-            return (datetime.datetime.utcnow() - timestamp).total_seconds()
-        else:
-            return 0.0
-
 # XXX linux-specific
 def gettid():
     try:
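As a quick illustration of what the new Jsonner encoder does with the status datetimes (a sketch only; the timestamp below is made up, not taken from this commit):

    import datetime
    import json
    import warcprox

    # a timezone-aware datetime is rendered as an ISO 8601 string
    status = {'start_time': datetime.datetime(
            2018, 10, 30, 20, 15, 19, tzinfo=datetime.timezone.utc)}
    print(json.dumps(status, cls=warcprox.Jsonner))
    # {"start_time": "2018-10-30T20:15:19+00:00"}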

warcprox/controller.py

@@ -35,6 +35,7 @@ import certauth
 import functools
 import doublethink
 import importlib
+import queue
 
 class Factory:
     @staticmethod
@@ -149,8 +150,35 @@ class WarcproxController(object):
             self.service_registry = Factory.service_registry(options)
 
+    def earliest_still_active_fetch_start(self):
+        '''
+        Looks at urls currently in flight, either being fetched or being
+        processed at some step of the postfetch chain, finds the one with the
+        earliest fetch start time, and returns that time.
+        '''
+        earliest = None
+        for timestamp in list(self.proxy.active_requests.values()):
+            if earliest is None or timestamp < earliest:
+                earliest = timestamp
+        for processor in self._postfetch_chain:
+            with processor.inq.mutex:
+                l = list(processor.inq.queue)
+            for recorded_url in l:
+                if earliest is None or recorded_url.timestamp < earliest:
+                    earliest = recorded_url.timestamp
+        return earliest
+
     def postfetch_status(self):
-        result = {'postfetch_chain': []}
+        earliest = self.earliest_still_active_fetch_start()
+        if earliest:
+            seconds_behind = (doublethink.utcnow() - earliest).total_seconds()
+        else:
+            seconds_behind = 0
+        result = {
+            'earliest_still_active_fetch_start': earliest,
+            'seconds_behind': seconds_behind,
+            'postfetch_chain': []
+        }
         for processor in self._postfetch_chain:
             if processor.__class__ == warcprox.ListenerPostfetchProcessor:
                 name = processor.listener.__class__.__name__
@@ -171,7 +199,7 @@ class WarcproxController(object):
         '''
         assert not processor0.outq
         assert not processor1.inq
-        q = warcprox.TimestampedQueue(maxsize=self.options.queue_size)
+        q = queue.Queue(maxsize=self.options.queue_size)
         processor0.outq = q
         processor1.inq = q
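In other words, seconds_behind is now the age of the oldest fetch that is still in flight or still queued somewhere in the postfetch chain. A minimal illustration using the timestamp from the status example above (the "now" value here is back-calculated for the example, not taken from the commit):

    import datetime

    now = datetime.datetime(2018, 10, 30, 20, 15, 21, 693225,
                            tzinfo=datetime.timezone.utc)
    earliest = datetime.datetime(2018, 10, 30, 20, 15, 21, 691467,
                                 tzinfo=datetime.timezone.utc)
    # age of the oldest still-active fetch
    print((now - earliest).total_seconds())  # 0.001758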

warcprox/mitmproxy.py

@@ -65,6 +65,7 @@ import time
 import collections
 import cProfile
 from urllib3.util import is_connection_dropped
+import doublethink
 
 class ProxyingRecorder(object):
     """
@@ -515,7 +516,7 @@ class MitmProxyHandler(http_server.BaseHTTPRequestHandler):
 class PooledMixIn(socketserver.ThreadingMixIn):
     logger = logging.getLogger("warcprox.mitmproxy.PooledMixIn")
     def __init__(self, max_threads=None):
-        self.active_requests = set()
+        self.active_requests = {}
         self.unaccepted_requests = 0
         self.max_threads = max_threads or 100
         self.pool = concurrent.futures.ThreadPoolExecutor(self.max_threads)
@@ -533,15 +534,15 @@
         return result
 
     def process_request(self, request, client_address):
-        self.active_requests.add(request)
+        self.active_requests[request] = doublethink.utcnow()
         future = self.pool.submit(
                 self.process_request_thread, request, client_address)
         future.add_done_callback(
-                lambda f: self.active_requests.discard(request))
+                lambda f: self.active_requests.pop(request, None))
         if future.done():
             # avoid theoretical timing issue, in case process_request_thread
             # managed to finish before future.add_done_callback() ran
-            self.active_requests.discard(request)
+            self.active_requests.pop(request, None)
 
     def get_request(self):
         '''

warcprox/warcproxy.py

@@ -46,6 +46,7 @@ import os
 from urllib3 import PoolManager
 import tempfile
 import hashlib
+import doublethink
 
 class WarcProxyHandler(warcprox.mitmproxy.MitmProxyHandler):
     '''
@@ -199,7 +200,7 @@
             del self.headers['Warcprox-Meta']
 
         remote_ip = self._remote_server_conn.sock.getpeername()[0]
-        timestamp = datetime.datetime.utcnow()
+        timestamp = doublethink.utcnow()
 
         extra_response_headers = {}
         if warcprox_meta and 'accept' in warcprox_meta and \
                 'capture-metadata' in warcprox_meta['accept']:
@@ -225,7 +226,7 @@
                 client_ip=self.client_address[0],
                 content_type=content_type, method=self.command,
                 timestamp=timestamp, host=self.hostname,
-                duration=datetime.datetime.utcnow()-timestamp,
+                duration=doublethink.utcnow()-timestamp,
                 referer=self.headers.get('referer'),
                 payload_digest=prox_rec_res.payload_digest,
                 truncated=prox_rec_res.truncated)
@@ -245,7 +246,8 @@
         }
         status_info.update(self.server.status())
         payload = json.dumps(
-                status_info, indent=2).encode('utf-8') + b'\n'
+                status_info, cls=warcprox.Jsonner,
+                indent=2).encode('utf-8') + b'\n'
         self.send_response(200, 'OK')
         self.send_header('Content-type', 'application/json')
         self.send_header('Content-Length', len(payload))
@@ -289,7 +291,7 @@
         if ('Content-Length' in self.headers and 'Content-Type' in self.headers
                 and (warc_type or 'WARC-Type' in self.headers)):
-            timestamp = datetime.datetime.utcnow()
+            timestamp = doublethink.utcnow()
 
             request_data = tempfile.SpooledTemporaryFile(
                     max_size=self._tmp_file_max_memory_size)
@@ -322,7 +324,7 @@
                 client_ip=self.client_address[0],
                 method=self.command,
                 timestamp=timestamp,
-                duration=datetime.datetime.utcnow()-timestamp,
+                duration=doublethink.utcnow()-timestamp,
                 payload_digest=payload_digest)
 
             request_data.seek(0)
@@ -424,6 +426,7 @@ class SingleThreadedWarcProxy(http_server.HTTPServer, object):
     def __init__(
             self, stats_db=None, status_callback=None,
             options=warcprox.Options()):
+        self.start_time = doublethink.utcnow()
         self.status_callback = status_callback
         self.stats_db = stats_db
         self.options = options
@@ -460,8 +463,7 @@
                     certs_dir=options.certs_dir or './warcprox-ca',
                     ca_name=ca_name)
 
-        self.recorded_url_q = warcprox.TimestampedQueue(
-                maxsize=options.queue_size or 1000)
+        self.recorded_url_q = queue.Queue(maxsize=options.queue_size or 1000)
 
         self.running_stats = warcprox.stats.RunningStats()
@@ -475,10 +477,9 @@
                 self.recorded_url_q.maxsize or 100),
             'queued_urls': self.recorded_url_q.qsize(),
             'queue_max_size': self.recorded_url_q.maxsize,
-            'seconds_behind': self.recorded_url_q.seconds_behind(),
             'urls_processed': self.running_stats.urls,
            'warc_bytes_written': self.running_stats.warc_bytes,
-            'start_time': self.running_stats.first_snap_time,
+            'start_time': self.start_time,
         })
         elapsed, urls_per_sec, warc_bytes_per_sec = self.running_stats.current_rates(1)
         result['rates_1min'] = {