From 7ec34478a482cc4c7debdf73a8c65977304a1dfc Mon Sep 17 00:00:00 2001 From: Ivan Sorokin <27285181+1ytic@users.noreply.github.com> Date: Fri, 2 May 2025 18:22:19 +0300 Subject: [PATCH 1/5] Store structure tool inputs Signed-off-by: Ivan Sorokin <27285181+1ytic@users.noreply.github.com> --- src/aiq/profiler/callbacks/langchain_callback_handler.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/aiq/profiler/callbacks/langchain_callback_handler.py b/src/aiq/profiler/callbacks/langchain_callback_handler.py index 88fc7696d..07bd16ed6 100644 --- a/src/aiq/profiler/callbacks/langchain_callback_handler.py +++ b/src/aiq/profiler/callbacks/langchain_callback_handler.py @@ -253,7 +253,7 @@ async def on_tool_start( usage_info=UsageInfo(token_usage=TokenUsageBaseModel())) self.step_manager.push_intermediate_step(stats) - self._run_id_to_tool_input[str(run_id)] = input_str + self._run_id_to_tool_input[str(run_id)] = copy.deepcopy(inputs) self._run_id_to_start_time[str(run_id)] = time.time() async def on_tool_end( @@ -265,14 +265,15 @@ async def on_tool_end( **kwargs: Any, ) -> Any: + inputs = self._run_id_to_tool_input.get(str(run_id), "") + stats = IntermediateStepPayload(event_type=IntermediateStepType.TOOL_END, span_event_timestamp=self._run_id_to_start_time.get(str(run_id), time.time()), framework=LLMFrameworkEnum.LANGCHAIN, name=kwargs.get("name", ""), UUID=str(run_id), - metadata=TraceMetadata(tool_outputs=output), + metadata=TraceMetadata(tool_inputs=inputs, tool_outputs=output), usage_info=UsageInfo(token_usage=TokenUsageBaseModel()), - data=StreamEventData(input=self._run_id_to_tool_input.get(str(run_id), ""), - output=output)) + data=StreamEventData(input=inputs, output=output)) self.step_manager.push_intermediate_step(stats) From 8d5ece2ba009be7e68f15a7fcc4266af062ae51d Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Wed, 23 Jul 2025 13:55:25 -0700 Subject: [PATCH 2/5] Add unit test for Langchain tool handler structured inputs Signed-off-by: Yuchen Zhang --- tests/aiq/profiler/test_callback_handler.py | 84 ++++++++++++++++++++- 1 file changed, 83 insertions(+), 1 deletion(-) diff --git a/tests/aiq/profiler/test_callback_handler.py b/tests/aiq/profiler/test_callback_handler.py index 9045bdc77..cdbfebfd0 100644 --- a/tests/aiq/profiler/test_callback_handler.py +++ b/tests/aiq/profiler/test_callback_handler.py @@ -15,7 +15,7 @@ import asyncio import time -from uuid import uuid4 +from uuid import uuid4, UUID import pytest @@ -606,3 +606,85 @@ def execute_agno_tool(tool_func, *args, **kwargs): assert end_event.payload.name == "TestTool" assert "result" in end_event.payload.metadata.tool_outputs assert end_event.payload.metadata.tool_outputs["result"] == "Tool execution result" + + +async def test_langchain_handler_tool_execution(reactive_stream: Subject): + """ + Test that the LangchainProfilerHandler properly stores and retrieves + structured tool inputs for TOOL_START and TOOL_END events. + This test verifies the functionality added in the PR that stores + copy.deepcopy(inputs) instead of just the string representation. + """ + + all_stats = [] + handler = LangchainProfilerHandler() + _ = reactive_stream.subscribe(all_stats.append) + + # Simulate a tool start event with structured inputs + tool_name = "TestSearchTool" + run_id = uuid4() + + # Create structured input data (this is what the PR aims to preserve) + structured_inputs = { + "query": "test search query", + "max_results": 5, + "filters": { + "date_range": { + "start": "2025-01-01", "end": "2025-12-31" + }, "category": ["tech", "science"] + } + } + + await handler.on_tool_start( + serialized={}, + input_str="test search query", # This was the old format + run_id=run_id, + inputs=structured_inputs, # This is the new structured format + name=tool_name) + + # Simulate tool processing time + await asyncio.sleep(0.1) + + # Create tool output + tool_output = { + "results": [{ + "title": "Result 1", "url": "http://example.com/1" + }, { + "title": "Result 2", "url": "http://example.com/2" + }], + "count": 2 + } + + # Simulate tool end event + await handler.on_tool_end(output=tool_output, run_id=run_id, name=tool_name) + + # Verify we have the correct number of events + assert len(all_stats) == 2, f"Expected 2 events but got {len(all_stats)}" + + tool_start_event = all_stats[0] + tool_end_event = all_stats[1] + + # Verify TOOL_START event + assert tool_start_event.event_type == IntermediateStepType.TOOL_START + assert tool_start_event.name == tool_name + assert tool_start_event.framework == LLMFrameworkEnum.LANGCHAIN + + # Verify TOOL_END event + assert tool_end_event.event_type == IntermediateStepType.TOOL_END + assert tool_end_event.name == tool_name + assert tool_end_event.framework == LLMFrameworkEnum.LANGCHAIN + + # Verify that structured inputs are preserved in TOOL_END event + # This is the key functionality being tested from the PR + assert tool_end_event.metadata.tool_inputs == structured_inputs + assert tool_end_event.metadata.tool_outputs == tool_output + assert tool_end_event.data.input == structured_inputs + assert tool_end_event.data.output == tool_output + + # Verify that the inputs are deep copied (not just referenced) + # Modify original inputs and ensure event data is unchanged + structured_inputs["query"] = "modified query" + assert tool_end_event.metadata.tool_inputs["query"] == "test search query" + assert tool_end_event.data.input["query"] == "test search query" + + print("✅ Langchain tool test passed: structured inputs properly stored and retrieved") From 0eb8ea30a3108a9299588b84169672bc060ab8bf Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Wed, 23 Jul 2025 14:54:22 -0700 Subject: [PATCH 3/5] fix CI Signed-off-by: Yuchen Zhang --- tests/aiq/profiler/test_callback_handler.py | 50 ++++++++++----------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/tests/aiq/profiler/test_callback_handler.py b/tests/aiq/profiler/test_callback_handler.py index 06c568c04..025fe4c13 100644 --- a/tests/aiq/profiler/test_callback_handler.py +++ b/tests/aiq/profiler/test_callback_handler.py @@ -15,7 +15,7 @@ import asyncio import time -from uuid import uuid4, UUID +from uuid import uuid4 import pytest @@ -116,12 +116,12 @@ async def test_llama_index_handler_order(reactive_stream: Subject): from llama_index.core.llms import ChatMessage from llama_index.core.llms import ChatResponse - payload_start = {EventPayload.PROMPT: "Say something wise."} + payload_start = {EventPayload.PROMPT.value: "Say something wise."} handler.on_event_start(event_type=CBEventType.LLM, payload=payload_start, event_id="evt-1") # Simulate an LLM end event payload_end = { - EventPayload.RESPONSE: + EventPayload.RESPONSE.value: ChatResponse(message=ChatMessage.from_str("42 is the meaning of life."), raw="42 is the meaning of life.") } handler.on_event_end(event_type=CBEventType.LLM, payload=payload_end, event_id="evt-1") @@ -246,9 +246,7 @@ async def test_agno_handler_llm_call(reactive_stream: Subject): """ pytest.importorskip("litellm") - from aiq.builder.context import AIQContext from aiq.profiler.callbacks.agno_callback_handler import AgnoProfilerHandler - from aiq.profiler.callbacks.token_usage_base_model import TokenUsageBaseModel # Create handler and set up collection of results all_stats = [] @@ -258,7 +256,7 @@ async def test_agno_handler_llm_call(reactive_stream: Subject): step_manager = AIQContext.get().intermediate_step_manager # Mock the original LLM call function that would be patched - def original_completion(*args, **kwargs): + def original_completion(*_args, **_kwargs): # pylint: disable=unused-argument return None handler._original_llm_call = original_completion @@ -405,32 +403,35 @@ def wrapped(*args, **kwargs): # Find IntermediateStep objects in all_stats intermediate_steps = [event for event in all_stats if hasattr(event, 'payload')] - # If we don't have IntermediateStep objects, check step_manager + # If we don't have IntermediateStep objects, check all_stats directly if len(intermediate_steps) < 2: - print("Not enough IntermediateStep objects in all_stats, checking step_manager...") - steps = step_manager.get_intermediate_steps() - print(f"Found {len(steps)} steps in step_manager") - for i, step in enumerate(steps): - print(f"Step {i}: {step.event_type}") + print("Not enough IntermediateStep objects in all_stats, using all_stats directly...") + print(f"Found {len(all_stats)} items in all_stats") + for i, stat in enumerate(all_stats): + print(f"Stat {i}: {type(stat)}") - # Verify steps in step_manager - assert len(steps) >= 2, f"Expected at least 2 steps in step_manager, got {len(steps)}" + # Verify we have events in all_stats + assert len(all_stats) >= 2, f"Expected at least 2 events in all_stats, got {len(all_stats)}" - # Find the START and END events from step_manager - start_events = [s for s in steps if s.event_type == IntermediateStepType.LLM_START] - end_events = [s for s in steps if s.event_type == IntermediateStepType.LLM_END] + # Find the START and END events from all_stats + start_events = [ + s for s in all_stats if hasattr(s, 'payload') and s.payload.event_type == IntermediateStepType.LLM_START + ] + end_events = [ + s for s in all_stats if hasattr(s, 'payload') and s.payload.event_type == IntermediateStepType.LLM_END + ] - assert len(start_events) > 0, "No LLM_START events found in step_manager" - assert len(end_events) > 0, "No LLM_END events found in step_manager" + assert len(start_events) > 0, "No LLM_START events found in all_stats" + assert len(end_events) > 0, "No LLM_END events found in all_stats" # Use the latest events for our test start_event = start_events[-1] end_event = end_events[-1] # Check token usage values in the end event - assert end_event.usage_info.token_usage.prompt_tokens == token_usage_obj.prompt_tokens - assert end_event.usage_info.token_usage.completion_tokens == token_usage_obj.completion_tokens - assert end_event.usage_info.token_usage.total_tokens == token_usage_obj.total_tokens + assert end_event.payload.usage_info.token_usage.prompt_tokens == token_usage_obj.prompt_tokens + assert end_event.payload.usage_info.token_usage.completion_tokens == token_usage_obj.completion_tokens + assert end_event.payload.usage_info.token_usage.total_tokens == token_usage_obj.total_tokens else: # Find the START and END events in our intermediate steps start_events = [e for e in intermediate_steps if e.payload.event_type == IntermediateStepType.LLM_START] @@ -466,7 +467,6 @@ async def test_agno_handler_tool_execution(reactive_stream: Subject): Note: This test simulates how tool execution is tracked in the tool_wrapper.py since AgnoProfilerHandler doesn't directly patch tool execution. """ - from aiq.builder.context import AIQContext from aiq.data_models.intermediate_step import IntermediateStep from aiq.data_models.invocation_node import InvocationNode from aiq.profiler.callbacks.agno_callback_handler import AgnoProfilerHandler @@ -479,7 +479,7 @@ async def test_agno_handler_tool_execution(reactive_stream: Subject): step_manager = AIQContext.get().intermediate_step_manager # Define a simple tool function - def sample_tool(arg1, arg2, param1=None, tool_name="SampleTool"): + def sample_tool(arg1, arg2, param1=None, _tool_name="SampleTool"): # pylint: disable=unused-argument print(f"Tool called with {arg1}, {arg2}, {param1}") return "Tool execution result" @@ -686,7 +686,7 @@ async def test_langchain_handler_tool_execution(reactive_stream: Subject): assert tool_end_event.data.input == structured_inputs assert tool_end_event.data.output == tool_output - # Verify that the inputs are deep copied (not just referenced) + # Verify that the inputs are deep copied (not just referenced) # Modify original inputs and ensure event data is unchanged structured_inputs["query"] = "modified query" assert tool_end_event.metadata.tool_inputs["query"] == "test search query" From b89738ba9d6fa4bb6e556e7b7e5227eeaac60f56 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Wed, 23 Jul 2025 15:14:36 -0700 Subject: [PATCH 4/5] fix unit tests Signed-off-by: Yuchen Zhang --- tests/aiq/profiler/test_callback_handler.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/tests/aiq/profiler/test_callback_handler.py b/tests/aiq/profiler/test_callback_handler.py index 025fe4c13..9d4e5eecf 100644 --- a/tests/aiq/profiler/test_callback_handler.py +++ b/tests/aiq/profiler/test_callback_handler.py @@ -516,7 +516,9 @@ def execute_agno_tool(tool_func, *args, **kwargs): # Call the tool function try: - result = tool_func(*args, **kwargs) + # Remove tool_name from kwargs before calling the function since it's only used for metadata + tool_kwargs = {k: v for k, v in kwargs.items() if k != "tool_name"} + result = tool_func(*args, **tool_kwargs) # Create end event payload end_payload = IntermediateStepPayload(event_type=IntermediateStepType.TOOL_END, @@ -606,7 +608,7 @@ def execute_agno_tool(tool_func, *args, **kwargs): # Verify event details assert start_event.payload.name == "TestTool" assert "args" in start_event.payload.metadata.tool_inputs - assert tool_args[0] in start_event.payload.metadata.tool_inputs["args"] + assert tool_args[0] in start_event.payload.metadata.tool_inputs.get("args", []) assert end_event.payload.name == "TestTool" assert "result" in end_event.payload.metadata.tool_outputs @@ -617,8 +619,6 @@ async def test_langchain_handler_tool_execution(reactive_stream: Subject): """ Test that the LangchainProfilerHandler properly stores and retrieves structured tool inputs for TOOL_START and TOOL_END events. - This test verifies the functionality added in the PR that stores - copy.deepcopy(inputs) instead of just the string representation. """ all_stats = [] @@ -641,7 +641,7 @@ async def test_langchain_handler_tool_execution(reactive_stream: Subject): } await handler.on_tool_start( - serialized={}, + serialized={"name": tool_name}, # Tool name should be in serialized dict input_str="test search query", # This was the old format run_id=run_id, inputs=structured_inputs, # This is the new structured format @@ -689,7 +689,5 @@ async def test_langchain_handler_tool_execution(reactive_stream: Subject): # Verify that the inputs are deep copied (not just referenced) # Modify original inputs and ensure event data is unchanged structured_inputs["query"] = "modified query" - assert tool_end_event.metadata.tool_inputs["query"] == "test search query" - assert tool_end_event.data.input["query"] == "test search query" - - print("✅ Langchain tool test passed: structured inputs properly stored and retrieved") + assert tool_end_event.metadata.tool_inputs.get("query") == "test search query" + assert tool_end_event.data.input.get("query") == "test search query" From e08f3e143d76c1a38fda0b29181c890c04d262a2 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Wed, 23 Jul 2025 15:28:07 -0700 Subject: [PATCH 5/5] clean up comments Signed-off-by: Yuchen Zhang --- tests/aiq/profiler/test_callback_handler.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/aiq/profiler/test_callback_handler.py b/tests/aiq/profiler/test_callback_handler.py index 9d4e5eecf..d813657cd 100644 --- a/tests/aiq/profiler/test_callback_handler.py +++ b/tests/aiq/profiler/test_callback_handler.py @@ -629,7 +629,7 @@ async def test_langchain_handler_tool_execution(reactive_stream: Subject): tool_name = "TestSearchTool" run_id = uuid4() - # Create structured input data (this is what the PR aims to preserve) + # Create structured input data structured_inputs = { "query": "test search query", "max_results": 5, @@ -680,7 +680,6 @@ async def test_langchain_handler_tool_execution(reactive_stream: Subject): assert tool_end_event.framework == LLMFrameworkEnum.LANGCHAIN # Verify that structured inputs are preserved in TOOL_END event - # This is the key functionality being tested from the PR assert tool_end_event.metadata.tool_inputs == structured_inputs assert tool_end_event.metadata.tool_outputs == tool_output assert tool_end_event.data.input == structured_inputs