From 07420143c465a453e7f356ee504b0a167e154818 Mon Sep 17 00:00:00 2001
From: Luca G <1923603+lgibelli@users.noreply.github.com>
Date: Wed, 27 Aug 2025 11:34:46 +0000
Subject: [PATCH] Fix vLLM queue overflow with serialized semaphore release

Multiple workers could acquire the semaphore in rapid succession when
the queue dropped, causing bursts of 1000+ page submissions and vLLM
crashes. The root cause was a race condition in the semaphore release
logic: multiple tasks could evaluate the release conditions and release
simultaneously, before the queue stats had updated.

Add an asyncio.Lock() to serialize the release checks, making condition
evaluation and release atomic. All condition checks now happen inside
the lock.
---
 olmocr/pipeline.py | 40 ++++++++++++++++++++++++----------------
 1 file changed, 24 insertions(+), 16 deletions(-)

diff --git a/olmocr/pipeline.py b/olmocr/pipeline.py
index 04a21701..4c7fd662 100644
--- a/olmocr/pipeline.py
+++ b/olmocr/pipeline.py
@@ -632,6 +632,10 @@ def _kill_proc():
     running_reqs_decreased = False
     server_printed_ready_message = False
     last_semaphore_release = time.time()
+    # Track pages submitted by the last worker to ensure some complete before the next release
+    pages_submitted_at_last_release = 0
+    # Lock to prevent multiple rapid semaphore releases
+    release_lock = asyncio.Lock()
 
     async def process_line(line):
         nonlocal last_running_req, last_queue_req, peak_running_req, running_reqs_decreased, last_semaphore_release, server_printed_ready_message
@@ -673,25 +677,29 @@ async def read_stream(stream):
             logger.warning(f"Got {ex} when reading log line from inference server, skipping")
 
     async def timeout_task():
-        nonlocal last_running_req, last_queue_req, peak_running_req, last_semaphore_release, running_reqs_decreased
+        nonlocal last_running_req, last_queue_req, peak_running_req, last_semaphore_release, running_reqs_decreased, pages_submitted_at_last_release
+
         try:
             while True:
                 await asyncio.sleep(1)
-
-                # Check if we should release the semaphore
-                should_release = (
-                    server_printed_ready_message
-                    and last_queue_req <= int(peak_running_req * 0.1)
-                    and time.time() - last_semaphore_release > 30
-                    and semaphore.locked()
-                    and (last_running_req == 0 or running_reqs_decreased)
-                )
-
-                if should_release:
-                    semaphore.release()
-                    running_reqs_decreased = False  # Reset flag after release
-                    last_semaphore_release = time.time()
-                    logger.info(f"Semaphore released at {last_running_req} running {last_queue_req} queued, peak: {peak_running_req})")
+
+                # Only check basic conditions before acquiring the lock
+                if server_printed_ready_message and semaphore.locked():
+                    # Use the lock to prevent multiple rapid releases
+                    async with release_lock:
+                        # Re-check ALL conditions inside the lock to prevent race conditions
+                        should_release = (
+                            semaphore.locked()
+                            and last_queue_req <= int(peak_running_req * 0.1)  # original logic: 10% of peak
+                            and time.time() - last_semaphore_release > 30  # original 30s cooldown
+                            and (last_running_req == 0 or running_reqs_decreased)  # original condition
+                        )
+
+                        if should_release:
+                            semaphore.release()
+                            running_reqs_decreased = False  # Reset flag after release
+                            last_semaphore_release = time.time()
+                            logger.info(f"Semaphore released at {last_running_req} running, {last_queue_req} queued, peak: {peak_running_req}")
         except asyncio.CancelledError:
             pass  # Clean up if the task is cancelled
 
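Note for context: below is a minimal, self-contained sketch of the pattern
this patch adopts. It is not olmocr code; the names (release_checker,
cooldown) and the 0.5s cooldown stand-in are illustrative assumptions.
Because the cooldown check and the timestamp update happen atomically under
release_lock, only the first checker to wake actually releases:

    # Sketch only: serializing semaphore releases with asyncio.Lock.
    import asyncio
    import time

    async def main():
        semaphore = asyncio.Semaphore(0)  # gates worker admission, starts exhausted
        release_lock = asyncio.Lock()     # serializes release decisions
        last_release = 0.0
        cooldown = 0.5                    # stand-in for the pipeline's 30s cooldown

        async def release_checker(name: str) -> None:
            nonlocal last_release
            await asyncio.sleep(0.1)      # both checkers wake at about the same time
            async with release_lock:
                # Without the lock, both tasks could pass this check against a
                # stale last_release and release twice.
                if time.time() - last_release > cooldown:
                    await asyncio.sleep(0)  # an await here is where the race bites
                    semaphore.release()
                    last_release = time.time()
                    print(f"{name}: released")
                else:
                    print(f"{name}: skipped, cooldown not elapsed")

        await asyncio.gather(release_checker("A"), release_checker("B"))

    asyncio.run(main())

With the lock in place this prints one "released" and one "skipped". Delete
the `async with release_lock:` line (dedenting its body) and both checkers
print "released", the same kind of permit burst that was flooding the vLLM
queue.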