diff --git a/warcprox/__init__.py b/warcprox/__init__.py index 60ca2ef..bc1365c 100644 --- a/warcprox/__init__.py +++ b/warcprox/__init__.py @@ -168,13 +168,24 @@ class BaseStandardPostfetchProcessor(BasePostfetchProcessor): class BaseBatchPostfetchProcessor(BasePostfetchProcessor): MAX_BATCH_SIZE = 500 MAX_BATCH_SEC = 10 + MIN_BATCH_SEC = 0.5 def _get_process_put(self): batch = [] start = time.time() - while (len(batch) < self.MAX_BATCH_SIZE - and time.time() - start < self.MAX_BATCH_SEC): + while True: + if len(batch) >= self.MAX_BATCH_SIZE: + break # full batch + + elapsed = time.time() - start + if elapsed >= self.MAX_BATCH_SEC: + break # been batching for a while + + if (elapsed >= self.MIN_BATCH_SEC and self.outq + and len(self.outq.queue) == 0): + break # next processor is waiting on us + try: batch.append(self.inq.get(block=True, timeout=0.5)) except queue.Empty: