Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions olmocr/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,10 @@ def _kill_proc():
running_reqs_decreased = False
server_printed_ready_message = False
last_semaphore_release = time.time()
# Track pages submitted by last worker to ensure some complete before next release
pages_submitted_at_last_release = 0
# Lock to prevent multiple rapid semaphore releases
release_lock = asyncio.Lock()

async def process_line(line):
nonlocal last_running_req, last_queue_req, peak_running_req, running_reqs_decreased, last_semaphore_release, server_printed_ready_message
Expand Down Expand Up @@ -673,25 +677,29 @@ async def read_stream(stream):
logger.warning(f"Got {ex} when reading log line from inference server, skipping")

async def timeout_task():
nonlocal last_running_req, last_queue_req, peak_running_req, last_semaphore_release, running_reqs_decreased
nonlocal last_running_req, last_queue_req, peak_running_req, last_semaphore_release, running_reqs_decreased, pages_submitted_at_last_release

try:
while True:
await asyncio.sleep(1)

# Check if we should release the semaphore
should_release = (
server_printed_ready_message
and last_queue_req <= int(peak_running_req * 0.1)
and time.time() - last_semaphore_release > 30
and semaphore.locked()
and (last_running_req == 0 or running_reqs_decreased)
)

if should_release:
semaphore.release()
running_reqs_decreased = False # Reset flag after release
last_semaphore_release = time.time()
logger.info(f"Semaphore released at {last_running_req} running {last_queue_req} queued, peak: {peak_running_req})")

# Only check basic conditions before acquiring lock
if server_printed_ready_message and semaphore.locked():
# Use lock to prevent multiple releases
async with release_lock:
# Check ALL conditions inside the lock to prevent race conditions
should_release = (
semaphore.locked()
and last_queue_req <= int(peak_running_req * 0.1) # Original logic: 10% of peak
and time.time() - last_semaphore_release > 30 # Original 30s cooldown
and (last_running_req == 0 or running_reqs_decreased) # Original condition
)

if should_release:
semaphore.release()
running_reqs_decreased = False # Reset flag after release
last_semaphore_release = time.time()
logger.info(f"Semaphore released at {last_running_req} running {last_queue_req} queued, peak: {peak_running_req})")
except asyncio.CancelledError:
pass # Clean up if the task is cancelled

Expand Down