Conversation

@datanatas datanatas commented Dec 12, 2025

Issue discussion langfuse/langfuse#11104

Problem

The flush() method would hang indefinitely in serverless environments like Google Cloud Functions when consumer threads died before flush was called. This occurred because:

  1. queue.join() waits for all queued items to be processed via task_done() calls
  2. Consumer threads are daemon threads that can be terminated by the serverless runtime
  3. When threads die, no one calls task_done() on remaining queue items, causing join() to wait forever
  4. The function would hit its timeout limit without completing
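The failure mode above can be reproduced with nothing but the standard library. In this sketch (all names are illustrative, not taken from the PR), `queue.Queue.join()` blocks until every `put()` item has a matching `task_done()`; if the consumer dies before calling it, the internal counter never reaches zero and `join()` never returns:

```python
import queue
import threading

# queue.join() only returns once task_done() has been called for every
# item that was put(). Simulate a consumer thread killed by the runtime
# before it could mark its item done:
q = queue.Queue()
q.put("media-upload-job")
assert q.unfinished_tasks == 1  # nothing ever marks the item as done

# q.join() here would block forever. Observe that without hanging by
# joining from a side thread and polling an Event with a timeout:
done = threading.Event()
threading.Thread(target=lambda: (q.join(), done.set()), daemon=True).start()
assert not done.wait(timeout=0.5)  # join() is still blocked after 0.5s

# Once task_done() is finally called, join() returns promptly.
q.task_done()
assert done.wait(timeout=2.0)
```

This is exactly the shape of the hang described above: with the consumer gone, the serverless function sits in `join()` until the platform timeout kills it.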

Related issue: The flush would successfully process OTEL and score ingestion queues, but hang on the media upload queue, never reaching the final log statements.

Root Cause

In serverless environments like Google Cloud Functions, daemon threads can be killed at any point during execution. As documented in the official Google Cloud documentation:

> A function has access to its allocated resources (memory and CPU) only for the duration of function execution. Code run outside of the execution period is not guaranteed to execute, and it can be stopped at any time. ([Source](https://cloud.google.com/functions/1stgendocs/concepts/execution-environment))

For background activities specifically:

> Background activity is anything that happens after your function has terminated. Any code run after graceful termination cannot access the CPU and will not make any progress. ([Source](https://cloud.google.com/run/docs/tips/functions-best-practices))

From Google's blog on avoiding Cloud Functions anti-patterns:

> A background task started by a Cloud Function is not guaranteed to complete. As soon as the Functions completes, e.g. the Function returns or a timeout error occurs, the Function instance can be terminated at any time. ([Source](https://cloud.google.com/blog/topics/developers-practitioners/avoiding-gcf-anti-patterns-part-5-how-run-background-processes-correctly-python))

This behavior is not unique to Cloud Functions - AWS Lambda has similar restrictions as noted in their [Lambda execution environment documentation](https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html), where background processes may be frozen when the execution context is frozen for reuse.

Solution

This PR implements a graceful fallback mechanism:

  1. Health checking: Added is_healthy() method to MediaUploadConsumer that tracks thread activity via last_activity timestamp
  2. Smart detection: flush() now checks if consumer threads are alive AND recently active before waiting
  3. Synchronous fallback: If threads are dead or unhealthy, process remaining queue items synchronously in the main thread
  4. Timeout protection: Added 30-second timeout when waiting for healthy threads to prevent infinite waits
  5. Better observability: Enhanced logging to clearly indicate why synchronous processing was triggered
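Steps 1–2 above can be sketched as follows. The class name, the heartbeat placement, and the 5-second default are illustrative assumptions modeled on the description, not the actual langfuse implementation:

```python
import threading
import time

class MediaUploadConsumerSketch(threading.Thread):
    """Illustrative consumer that records a heartbeat so flush() can
    distinguish a busy thread from one the runtime has frozen or killed."""

    def __init__(self) -> None:
        super().__init__(daemon=True)
        self.last_activity = time.time()

    def is_healthy(self, timeout_seconds: float = 5.0) -> bool:
        # Healthy = the thread exists AND has touched last_activity recently.
        # A thread killed by the serverless runtime fails is_alive();
        # a frozen one fails the recency check.
        return self.is_alive() and (time.time() - self.last_activity) < timeout_seconds

    def run(self) -> None:
        while True:
            self.last_activity = time.time()  # heartbeat before each unit of work
            time.sleep(0.05)                  # stand-in for processing one upload
```

Checking both `is_alive()` and the timestamp matters: a thread frozen by the platform can still report alive while making no progress.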

Changes

  • Added last_activity timestamp tracking to MediaUploadConsumer
  • Added is_healthy() method to detect stalled/frozen threads
  • Replaced blocking queue.join() with conditional logic that falls back to synchronous processing
  • Improved error handling and logging in consumer thread loop
  • Added detailed logging to help diagnose thread health issues
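The conditional logic that replaces the blocking `queue.join()` can be sketched like this. `flush_media_queue`, `process_next`, and the constants are hypothetical stand-ins for the PR's actual code in resource_manager.py:

```python
import queue
import time

HEALTH_CHECK_SECONDS = 5.0    # assumed values from the PR description
FLUSH_TIMEOUT_SECONDS = 30.0

def flush_media_queue(q: queue.Queue, consumers, process_next) -> None:
    """Wait on healthy consumer threads; otherwise drain the queue inline."""
    healthy = [
        c for c in consumers
        if c.is_alive() and c.is_healthy(timeout_seconds=HEALTH_CHECK_SECONDS)
    ]

    if healthy:
        # Let the consumers drain the queue, but never wait past the timeout.
        deadline = time.time() + FLUSH_TIMEOUT_SECONDS
        while not q.empty() and time.time() < deadline:
            time.sleep(0.05)
    else:
        # Threads are dead or stalled: process remaining items synchronously
        # in the main thread so nothing is lost when the runtime killed them.
        while not q.empty():
            try:
                process_next()  # expected to call q.get() and q.task_done()
            except Exception:
                break
```

The key property is that every path terminates: either the consumers drain the queue, the timeout expires, or the main thread processes the remaining items itself.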

Testing

Tested in Google Cloud Functions environment where the issue was consistently reproducible. After this change:

  • ✅ All queued media items are processed successfully
  • ✅ Function completes without hanging
  • ✅ Clear logs indicate when fallback processing occurs
  • ✅ No data loss when threads die

Important

Fixes hanging flush() in serverless environments by adding consumer thread health checks and synchronous fallback processing in resource_manager.py.

  • Behavior:
    • flush() in resource_manager.py now checks consumer thread health using is_healthy() before waiting on the queue.
    • If threads are unhealthy or dead, processes remaining queue items synchronously in the main thread.
    • Adds a 30-second timeout for waiting on healthy threads to prevent indefinite hangs.
  • Methods:
    • Adds is_healthy() to MediaUploadConsumer to check thread activity based on last_activity timestamp.
    • Tracks last_activity in MediaUploadConsumer to monitor thread health.
  • Logging:
    • Enhanced logging in flush() to indicate when synchronous processing is triggered and to log errors during synchronous processing.
    • Improved error handling and logging in MediaUploadConsumer.run() loop.

This description was created by Ellipsis for 7893a44.

Disclaimer: Experimental PR review

Greptile Overview

Greptile Summary

This PR implements a graceful fallback mechanism to handle dead consumer threads in serverless environments like Google Cloud Functions, preventing infinite hangs during flush() operations.

Key Changes

  • Added last_activity timestamp tracking to MediaUploadConsumer to detect stalled threads
  • Implemented is_healthy() method that checks both thread liveness and recent activity
  • Replaced blocking queue.join() with conditional logic that checks thread health before waiting
  • Added synchronous fallback processing when threads are dead or unhealthy
  • Enhanced error handling and logging throughout the consumer thread loop

Critical Issue

Missing Import: The code uses time.time() and time.sleep() in resource_manager.py but the time module is not imported at the top of the file. This will cause a NameError at runtime when the new code path is executed.

Architecture

The solution correctly addresses the root cause: daemon threads in serverless environments can be terminated by the runtime, leaving queued items unprocessed. The fallback to synchronous processing ensures no data loss while maintaining backward compatibility with normal environments.

Confidence Score: 1/5

  • This PR will fail at runtime due to a missing import and should not be merged until fixed
  • The missing time module import in resource_manager.py is a critical issue that will cause an immediate NameError when the new flush logic executes. While the overall design and approach are sound, this missing import makes the code non-functional. The PR cannot be safely merged until the import is added.
  • Pay immediate attention to langfuse/_client/resource_manager.py - add the missing import time statement at the top of the module

Important Files Changed

File Analysis

| Filename | Score | Overview |
| --- | --- | --- |
| langfuse/_client/resource_manager.py | 2/5 | Added health checks and synchronous fallback for dead threads, but missing `time` import will cause a runtime error |
| langfuse/_task_manager/media_upload_consumer.py | 5/5 | Added activity tracking and health check method with proper error handling |

Sequence Diagram

```mermaid
sequenceDiagram
    participant Main as Main Thread
    participant RM as ResourceManager
    participant Consumer as MediaUploadConsumer Thread
    participant Queue as Media Upload Queue
    participant MM as MediaManager

    Note over Main,MM: Normal Operation
    Main->>RM: flush()
    RM->>Consumer: Check is_alive() && is_healthy(5s)
    Consumer-->>RM: True (healthy threads exist)
    RM->>Queue: Check empty() with 30s timeout
    loop While queue not empty and timeout not reached
        Consumer->>Queue: get(block=True, timeout=1)
        Queue-->>Consumer: UploadMediaJob
        Consumer->>Consumer: Update last_activity
        Consumer->>MM: process_next_media_upload()
        MM->>Queue: task_done()
        Consumer->>Consumer: Update last_activity
    end
    Queue-->>RM: Queue empty
    RM-->>Main: Success

    Note over Main,MM: Serverless Environment (Dead Threads)
    Main->>RM: flush()
    RM->>Consumer: Check is_alive() && is_healthy(5s)
    Note over Consumer: Thread killed by runtime
    Consumer-->>RM: False (threads dead/unhealthy)
    RM->>Queue: Check qsize()

    loop While queue not empty
        RM->>MM: process_next_media_upload()
        MM->>Queue: get(block=True, timeout=1)
        Queue-->>MM: UploadMediaJob
        MM->>MM: Upload media to storage
        MM->>Queue: task_done()
    end
    RM-->>Main: Success (synchronous fallback)
```

- Implement health checks for media upload consumer threads to determine activity status.
- Add fallback logic to process media uploads synchronously if threads are unhealthy or queue is not drained within the timeout.
- Update media upload consumer to track the last activity timestamp and add error handling in the processing loop.
@CLAassistant CLAassistant commented Dec 12, 2025

CLA assistant check
All committers have signed the CLA.

```python
# Check if threads are alive AND healthy (recently active)
healthy_threads = [
    c for c in self._media_upload_consumers
    if c.is_alive() and c.is_healthy(timeout_seconds=5.0)
]
```
Contributor

Avoid magic numbers: consider extracting the 5.0s health check and 30s timeout into configurable constants.

Author

Done 9a3506c

```python
self._media_manager.process_next_media_upload()
try:
    # Update activity timestamp before processing
    self.last_activity = time.time()
```
Contributor

Updating last_activity both before and after processing may misrepresent actual work; consider updating only after successful processing.

Author

If an upload takes long, I think we'd rather have a signal of when the task starts, so that it serves the purpose of knowing the thread is_healthy

```python
    # Update after successful processing
    self.last_activity = time.time()
except Exception as e:
    self._log.error(
```
Contributor

Use _log.exception() in the exception block to capture the full stack trace for debugging.

Suggested change:

```diff
-self._log.error(
+self._log.exception(
```

Author

Done 9a3506c

@greptile-apps greptile-apps bot left a comment

2 files reviewed, no comments


@hassiebp hassiebp self-requested a review December 15, 2025 17:42