mirror of
https://github.com/internetarchive/warcprox.git
synced 2025-01-18 13:22:09 +01:00
don't keep next processor waiting
In the batch postfetch processor, accumulate URLs for the next batch for at most 0.5 sec if the outq is empty (i.e. the next processor is waiting idly).
This commit is contained in:
parent
9e1a7cb6f0
commit
6a64107478
@ -168,13 +168,24 @@ class BaseStandardPostfetchProcessor(BasePostfetchProcessor):
|
|||||||
class BaseBatchPostfetchProcessor(BasePostfetchProcessor):
|
class BaseBatchPostfetchProcessor(BasePostfetchProcessor):
|
||||||
MAX_BATCH_SIZE = 500
|
MAX_BATCH_SIZE = 500
|
||||||
MAX_BATCH_SEC = 10
|
MAX_BATCH_SEC = 10
|
||||||
|
MIN_BATCH_SEC = 0.5
|
||||||
|
|
||||||
def _get_process_put(self):
|
def _get_process_put(self):
|
||||||
batch = []
|
batch = []
|
||||||
start = time.time()
|
start = time.time()
|
||||||
|
|
||||||
while (len(batch) < self.MAX_BATCH_SIZE
|
while True:
|
||||||
and time.time() - start < self.MAX_BATCH_SEC):
|
if len(batch) >= self.MAX_BATCH_SIZE:
|
||||||
|
break # full batch
|
||||||
|
|
||||||
|
elapsed = time.time() - start
|
||||||
|
if elapsed >= self.MAX_BATCH_SEC:
|
||||||
|
break # been batching for a while
|
||||||
|
|
||||||
|
if (elapsed >= self.MIN_BATCH_SEC and self.outq
|
||||||
|
and len(self.outq.queue) == 0):
|
||||||
|
break # next processor is waiting on us
|
||||||
|
|
||||||
try:
|
try:
|
||||||
batch.append(self.inq.get(block=True, timeout=0.5))
|
batch.append(self.inq.get(block=True, timeout=0.5))
|
||||||
except queue.Empty:
|
except queue.Empty:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user