Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,53 @@ def flush(self) -> None:
self._score_ingestion_queue.join()
langfuse_logger.debug("Successfully flushed score ingestion queue")

self._media_upload_queue.join()
# Check if threads are alive AND healthy (recently active)
healthy_threads = [
c for c in self._media_upload_consumers
if c.is_alive() and c.is_healthy(timeout_seconds=5.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid magic numbers: consider extracting the 5.0s health check and 30s timeout into configurable constants.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done 9a3506c

]

if healthy_threads:
# Wait for queue to be processed, but with a timeout
langfuse_logger.debug(
f"{len(healthy_threads)} healthy consumer threads active, waiting for queue to drain"
)
start_time = time.time()
timeout = 30

while not self._media_upload_queue.empty() and (time.time() - start_time) < timeout:
time.sleep(0.1)

if self._media_upload_queue.empty():
langfuse_logger.debug("Successfully flushed media upload queue via consumer threads")
return
else:
langfuse_logger.warning(
f"Media upload queue not empty after {timeout}s, "
f"{self._media_upload_queue.qsize()} items remaining. "
f"Processing synchronously."
)
else:
alive_count = len([c for c in self._media_upload_consumers if c.is_alive()])
langfuse_logger.warning(
f"Consumer threads unhealthy or dead ({alive_count} alive but unhealthy). "
f"Processing {self._media_upload_queue.qsize()} queued items synchronously."
)

# Synchronous fallback processing
items_processed = 0
while not self._media_upload_queue.empty():
try:
self._media_manager.process_next_media_upload()
items_processed += 1
except Exception as e:
langfuse_logger.error(f"Error processing media upload synchronously: {e}")

if items_processed > 0:
langfuse_logger.info(
f"Processed {items_processed} media uploads synchronously in flush()"
)

langfuse_logger.debug("Successfully flushed media upload queue")

def shutdown(self) -> None:
Expand Down
27 changes: 26 additions & 1 deletion langfuse/_task_manager/media_upload_consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import threading
import time

from .media_manager import MediaManager

Expand All @@ -25,6 +26,8 @@ def __init__(
# run() *after* we set it to False in pause... and keep running
# forever.
self.running = True
# Track when thread last processed something
self.last_activity = time.time()
self._identifier = identifier
self._media_manager = media_manager

Expand All @@ -34,11 +37,33 @@ def run(self) -> None:
f"Thread: Media upload consumer thread #{self._identifier} started and actively processing queue items"
)
while self.running:
self._media_manager.process_next_media_upload()
try:
# Update activity timestamp before processing
self.last_activity = time.time()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating last_activity both before and after processing may misrepresent actual work; consider updating only after successful processing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an upload takes long, I think we'd rather have a signature of when the task starts so that it serves the purpose of knowing the thread is_healthy

self._media_manager.process_next_media_upload()
# Update after successful processing
self.last_activity = time.time()
except Exception as e:
self._log.error(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use _log.exception() in the exception block to capture the full stack trace for debugging.

Suggested change
self._log.error(
self._log.exception(

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done 9a3506c

f"Thread #{self._identifier}: Unexpected error in consumer loop: {e}"
)
# Continue running despite errors
time.sleep(0.1)

def pause(self) -> None:
"""Pause the media upload consumer."""
self._log.debug(
f"Thread: Pausing media upload consumer thread #{self._identifier}"
)
self.running = False

def is_healthy(self, timeout_seconds: float = 5.0) -> bool:
"""
Check if thread is healthy and recently active.
Returns False if thread hasn't processed anything in timeout_seconds.
"""
if not self.is_alive():
return False

time_since_activity = time.time() - self.last_activity
return time_since_activity < timeout_seconds