Skip to content

Commit 65549c4

Browse files
authored
add/fix(langfuse): log levels (#2522)
* add: langfuse log levels * fix: dry finally block
1 parent 57935db commit 65549c4

File tree

2 files changed

+89
-7
lines changed

2 files changed

+89
-7
lines changed

integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import contextlib
66
import os
7+
import sys
78
from abc import ABC, abstractmethod
89
from collections import Counter
910
from collections.abc import Iterator
@@ -299,7 +300,9 @@ def create_span(self, context: SpanContext) -> LangfuseSpan:
299300
)
300301
# Create a new trace when there's no parent span
301302
span_context_manager = self.tracer.start_as_current_observation(
302-
name=context.trace_name, version=tracing_ctx.get("version"), as_type=root_span_type
303+
name=context.trace_name,
304+
version=tracing_ctx.get("version"),
305+
as_type=root_span_type,
303306
)
304307

305308
# Create LangfuseSpan which will handle entering the context manager
@@ -466,16 +469,44 @@ def trace(
466469

467470
try:
468471
yield span
469-
finally:
470-
# Always clean up context, even if nested operations fail
472+
except Exception:
473+
# Exception occurred - capture exception info and pass to __exit__
474+
# This allows Langfuse/OpenTelemetry to properly mark the span with ERROR level
475+
exc_info = sys.exc_info()
471476
try:
472477
# Process span data (may fail with nested pipeline exceptions)
473478
self._span_handler.handle(span, component_type)
474479

475-
# End span (may fail if span data is corrupted)
480+
# End span with exception info (may fail if span data is corrupted)
481+
raw_span = span.raw_span()
482+
if span._context_manager is not None:
483+
# Pass actual exception info to mark span as failed with ERROR level
484+
span._context_manager.__exit__(*exc_info)
485+
elif hasattr(raw_span, "end"):
486+
# Only call end() if it's not a context manager
487+
raw_span.end()
488+
except Exception as cleanup_error:
489+
# Log cleanup errors but don't let them corrupt context
490+
logger.warning(
491+
"Error during span cleanup for {operation_name}: {cleanup_error}",
492+
operation_name=operation_name,
493+
cleanup_error=cleanup_error,
494+
)
495+
496+
# Re-raise the original exception
497+
raise
498+
else:
499+
# No exception - clean exit with success status
500+
# This preserves any manually-set log levels (WARNING, DEBUG)
501+
try:
502+
# Process span data
503+
self._span_handler.handle(span, component_type)
504+
505+
# End span successfully
476506
raw_span = span.raw_span()
477507
# In v3, we need to properly exit context managers
478508
if span._context_manager is not None:
509+
# No exception - pass None to indicate success
479510
span._context_manager.__exit__(None, None, None)
480511
elif hasattr(raw_span, "end"):
481512
# Only call end() if it's not a context manager
@@ -487,9 +518,9 @@ def trace(
487518
operation_name=operation_name,
488519
cleanup_error=cleanup_error,
489520
)
490-
finally:
491-
# Restore previous span stack using saved token - ensures proper cleanup
492-
span_stack_var.reset(token)
521+
finally:
522+
# Restore previous span stack using saved token
523+
span_stack_var.reset(token)
493524

494525
if self.enforce_flush:
495526
self.flush()

integrations/langfuse/tests/test_tracer.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,3 +690,54 @@ async def run_concurrent_traces():
690690
assert task2_spans[1][2] == task2_inner # current_span during inner
691691
assert task2_spans[2][2] == task2_outer # current_span after inner
692692
assert task2_spans[3][2] is None # current_span after outer
693+
694+
def test_trace_exception_handling(self):
695+
"""
696+
Test that exceptions are properly captured and passed to span __exit__.
697+
698+
This verifies the new exception handling behavior where:
699+
- Exception case: __exit__() receives (exc_type, exc_val, exc_tb)
700+
- Success case: __exit__() receives (None, None, None)
701+
"""
702+
# Create a mock context manager that tracks how __exit__ was called
703+
mock_exit_calls = []
704+
705+
class TrackingContextManager:
706+
def __init__(self):
707+
self._span = MockSpan()
708+
709+
def __enter__(self):
710+
return self._span
711+
712+
def __exit__(self, exc_type, exc_val, exc_tb):
713+
# Track what was passed to __exit__
714+
mock_exit_calls.append((exc_type, exc_val, exc_tb))
715+
return False # Don't suppress exceptions
716+
717+
mock_client = MockLangfuseClient()
718+
mock_client._mock_context_manager = TrackingContextManager()
719+
720+
tracer = LangfuseTracer(tracer=mock_client, name="Test", public=False)
721+
722+
# Test 1: Exception case - __exit__ should receive exception info
723+
mock_exit_calls.clear()
724+
error_msg = "test error"
725+
with pytest.raises(ValueError, match="test error"):
726+
with tracer.trace("test_operation"):
727+
raise ValueError(error_msg)
728+
729+
assert len(mock_exit_calls) == 1
730+
assert mock_exit_calls[0][0] is ValueError # exc_type
731+
assert str(mock_exit_calls[0][1]) == error_msg # exc_val
732+
assert mock_exit_calls[0][2] is not None # exc_tb (traceback)
733+
734+
# Test 2: Success case - __exit__ should receive (None, None, None)
735+
mock_exit_calls.clear()
736+
with tracer.trace("test_operation"):
737+
pass # No exception
738+
739+
assert len(mock_exit_calls) == 1
740+
assert mock_exit_calls[0] == (None, None, None)
741+
742+
# Test 3: Verify span stack is cleaned up after exception
743+
assert tracer.current_span() is None

0 commit comments

Comments
 (0)