From 16db11407f24d95f2861030493519a5badba1813 Mon Sep 17 00:00:00 2001 From: Sid Kattoju Date: Tue, 3 Mar 2026 15:12:43 -0500 Subject: [PATCH 1/5] fix: correct node dependency ordering in vacation planner graph Add explicit depends_on entries for the destination_research_task, hotel_research_task, and flight_research_task nodes so the pipeline executes sequentially instead of fanning out and racing ahead of its data source. Made-with: Cursor --- backend/agent_templates/travel_vacation_planner.yaml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/backend/agent_templates/travel_vacation_planner.yaml b/backend/agent_templates/travel_vacation_planner.yaml index f32182f7..cca5e0fc 100644 --- a/backend/agent_templates/travel_vacation_planner.yaml +++ b/backend/agent_templates/travel_vacation_planner.yaml @@ -34,6 +34,8 @@ templates: - id: destination_research_task type: mcp_tool_map + depends_on: + - places_list_task server: travel_research tool: tavily_travel_search items_path: outputs.places_list_task @@ -45,14 +47,14 @@ templates: type: llm depends_on: - destination_research_task - - hotel_research_task - - flight_research_task prompt: | Create 2-3 itinerary options for the trip. Include day-by-day highlights and note must-see places. - id: hotel_research_task type: mcp_tool + depends_on: + - itinerary_options_task server: hotel tool: google_hotels_search args: @@ -64,6 +66,8 @@ templates: - id: flight_research_task type: mcp_tool + depends_on: + - hotel_research_task server: flight tool: google_flights_search args: From 7844db47d1d22dc8d8174a82f536685ecef71e66 Mon Sep 17 00:00:00 2001 From: Sid Kattoju Date: Tue, 3 Mar 2026 15:16:04 -0500 Subject: [PATCH 2/5] feat: suppress SSE events for internal graph nodes Add an `internal` flag to graph node configs. When set to true, the node still executes and its output flows through graph state to downstream nodes, but no node_started/response/node_completed SSE events are emitted to the frontend. This keeps intermediate data (e.g. 
the places_list_task JSON array) out of the user-facing chat. Mark places_list_task as internal in the vacation planner template. Made-with: Cursor --- .../travel_vacation_planner.yaml | 1 + backend/app/services/runners/graph_engine.py | 8 ++- tests/unit/test_graph_engine.py | 59 +++++++++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/backend/agent_templates/travel_vacation_planner.yaml b/backend/agent_templates/travel_vacation_planner.yaml index cca5e0fc..ebdd2e81 100644 --- a/backend/agent_templates/travel_vacation_planner.yaml +++ b/backend/agent_templates/travel_vacation_planner.yaml @@ -28,6 +28,7 @@ templates: nodes: - id: places_list_task type: llm + internal: true prompt: | List 5 specific places, neighborhoods, or attractions in {inputs.destination} that match {inputs.interests}. Return a JSON array of strings only. diff --git a/backend/app/services/runners/graph_engine.py b/backend/app/services/runners/graph_engine.py index 34ddd06f..0e8b487f 100644 --- a/backend/app/services/runners/graph_engine.py +++ b/backend/app/services/runners/graph_engine.py @@ -628,6 +628,7 @@ async def _run(state: GraphState) -> dict: "name": step_id, "summary": _summarize_output(result), "raw": result, + "internal": bool(step.get("internal", False)), } ], } @@ -661,6 +662,12 @@ async def run_streaming( tasks = node_state.get("tasks_output", []) for task in tasks: task_name = task.get("name", node_name) + total_tasks += 1 + + if task.get("internal"): + logger.debug("Suppressing SSE for internal node: %s", task_name) + continue + yield _sse("node_started", {"node": task_name}, session_id) raw = task.get("raw", "") @@ -676,7 +683,6 @@ async def run_streaming( ) yield _sse("node_completed", {"node": task_name}, session_id) - total_tasks += 1 if total_tasks > 0: yield _sse( diff --git a/tests/unit/test_graph_engine.py b/tests/unit/test_graph_engine.py index c0fabd5a..5cf1ef15 100644 --- a/tests/unit/test_graph_engine.py +++ b/tests/unit/test_graph_engine.py 
@@ -360,6 +360,65 @@ async def mock_ainvoke(messages): nodes_started = [p["node"] for p in parsed if p["type"] == "node_started"] assert nodes_started == ["step1", "step2"] + @pytest.mark.asyncio + async def test_internal_node_suppressed(self): + """Nodes with internal: true produce no SSE events but still run.""" + from backend.app.services.runners.graph_engine import GraphEngine + + config = { + "nodes": [ + { + "id": "hidden", + "type": "llm", + "internal": True, + "prompt": "List items", + }, + { + "id": "visible", + "type": "llm", + "depends_on": ["hidden"], + "prompt": "Use {outputs.hidden}", + }, + ] + } + + mock_llm = AsyncMock() + call_count = 0 + + async def mock_ainvoke(messages): + nonlocal call_count + call_count += 1 + resp = MagicMock() + resp.content = f"Result {call_count}" + return resp + + mock_llm.ainvoke = mock_ainvoke + + engine = GraphEngine(config=config, llm=mock_llm) + events = [] + async for event in engine.run_streaming({}, "sess"): + events.append(event) + + parsed = [ + json.loads(e[len("data: ") : -2]) for e in events if e.startswith("data: ") + ] + + node_names = [p["node"] for p in parsed if p["type"] == "node_started"] + assert "hidden" not in node_names + assert "visible" in node_names + + responses = [ + p for p in parsed if p["type"] == "response" and p.get("id") == "hidden" + ] + assert len(responses) == 0 + + visible_responses = [ + p for p in parsed if p["type"] == "response" and p.get("id") == "visible" + ] + assert len(visible_responses) == 1 + + assert call_count == 2, "Both nodes should execute even if one is internal" + def test_empty_nodes_raises(self): """GraphEngine raises on empty node list.""" from backend.app.services.runners.graph_engine import GraphEngine From 7b4a346888f0543b8624a7e9c6c01a52003b49cb Mon Sep 17 00:00:00 2001 From: Sid Kattoju Date: Tue, 3 Mar 2026 15:35:44 -0500 Subject: [PATCH 3/5] fix: exclude internal node outputs from downstream LLM context Internal node outputs were still leaking to users 
because _run_llm_node dumps all prior outputs into the prompt as "Outputs so far". Downstream LLMs (e.g. itinerary_options_task) would see the raw places JSON and echo it in their response. Filter internal node IDs out of the outputs dict before passing it to LLM nodes. MCP tool nodes still receive the full outputs so items_path resolution continues to work. Made-with: Cursor --- backend/app/services/runners/graph_engine.py | 9 ++++++++- tests/unit/test_graph_engine.py | 12 +++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/backend/app/services/runners/graph_engine.py b/backend/app/services/runners/graph_engine.py index 0e8b487f..6e7518e9 100644 --- a/backend/app/services/runners/graph_engine.py +++ b/backend/app/services/runners/graph_engine.py @@ -486,6 +486,9 @@ def __init__(self, config: Dict[str, Any], llm: Any) -> None: self.nodes = nodes self.edges = cfg.get("edges") or [] self.entry_id: Optional[str] = cfg.get("entry") + self._internal_ids: set = { + str(n["id"]) for n in self.nodes if n.get("internal") + } def _build_graph(self): """Build a compiled LangGraph StateGraph from the config.""" @@ -592,6 +595,7 @@ def _make_step_fn(self, step: dict): mcp_servers = self.mcp_cfg.get("servers") or {} mcp_transport = str(self.mcp_cfg.get("transport") or "streamable-http").lower() llm = self.llm + internal_ids = self._internal_ids async def _run(state: GraphState) -> dict: inputs = state.get("inputs") or {} @@ -602,7 +606,10 @@ async def _run(state: GraphState) -> dict: result = "" if step_type in ("llm", "prompt"): - result = await _run_llm_node(llm, step, inputs, outputs) + visible_outputs = { + k: v for k, v in outputs.items() if k not in internal_ids + } + result = await _run_llm_node(llm, step, inputs, visible_outputs) elif step_type in ("mcp_tool", "mcp"): result = await _run_mcp_tool_node( diff --git a/tests/unit/test_graph_engine.py b/tests/unit/test_graph_engine.py index 5cf1ef15..c8ada9e5 100644 --- 
a/tests/unit/test_graph_engine.py +++ b/tests/unit/test_graph_engine.py @@ -362,7 +362,8 @@ async def mock_ainvoke(messages): @pytest.mark.asyncio async def test_internal_node_suppressed(self): - """Nodes with internal: true produce no SSE events but still run.""" + """Nodes with internal: true produce no SSE events and their output + is excluded from the LLM context of downstream nodes.""" from backend.app.services.runners.graph_engine import GraphEngine config = { @@ -384,10 +385,14 @@ async def test_internal_node_suppressed(self): mock_llm = AsyncMock() call_count = 0 + captured_prompts: list = [] async def mock_ainvoke(messages): nonlocal call_count call_count += 1 + for msg in messages: + if isinstance(msg, dict): + captured_prompts.append(msg.get("content", "")) resp = MagicMock() resp.content = f"Result {call_count}" return resp @@ -419,6 +424,11 @@ async def mock_ainvoke(messages): assert call_count == 2, "Both nodes should execute even if one is internal" + visible_prompt = captured_prompts[1] + assert ( + "hidden" not in visible_prompt + ), "Internal node output should not appear in downstream LLM context" + def test_empty_nodes_raises(self): """GraphEngine raises on empty node list.""" from backend.app.services.runners.graph_engine import GraphEngine From 7e93ae8799856df7ce74c11fcda6425dbeabe010 Mon Sep 17 00:00:00 2001 From: Sid Kattoju Date: Tue, 3 Mar 2026 15:40:44 -0500 Subject: [PATCH 4/5] fix: auto-detect internal nodes from items_path references Nodes that exist solely to produce a list for mcp_tool_map fan-out (referenced via items_path) are now automatically treated as internal even when the graph_config stored in the database lacks the explicit internal: true flag. This fixes the case where agents created before the YAML update still expose intermediate data to users. Also consolidates the internal flag in task output to use the merged _internal_ids set (explicit + auto-detected) instead of only checking the step config. 
Made-with: Cursor --- backend/app/services/runners/graph_engine.py | 12 ++++- tests/unit/test_graph_engine.py | 52 ++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/backend/app/services/runners/graph_engine.py b/backend/app/services/runners/graph_engine.py index 6e7518e9..af1730a1 100644 --- a/backend/app/services/runners/graph_engine.py +++ b/backend/app/services/runners/graph_engine.py @@ -489,6 +489,16 @@ def __init__(self, config: Dict[str, Any], llm: Any) -> None: self._internal_ids: set = { str(n["id"]) for n in self.nodes if n.get("internal") } + # Auto-detect: nodes referenced only via items_path in mcp_tool_map + # nodes are intermediate data producers, not user-facing output. + items_source_ids = set() + for n in self.nodes: + if str(n.get("type", "")).strip().lower() in ("mcp_tool_map", "mcp_map"): + ip = str(n.get("items_path", "")) + m = _OUTPUT_REF_RE.search(ip) + if m: + items_source_ids.add(m.group(1)) + self._internal_ids |= items_source_ids def _build_graph(self): """Build a compiled LangGraph StateGraph from the config.""" @@ -635,7 +645,7 @@ async def _run(state: GraphState) -> dict: "name": step_id, "summary": _summarize_output(result), "raw": result, - "internal": bool(step.get("internal", False)), + "internal": step_id in internal_ids, } ], } diff --git a/tests/unit/test_graph_engine.py b/tests/unit/test_graph_engine.py index c8ada9e5..61d1030f 100644 --- a/tests/unit/test_graph_engine.py +++ b/tests/unit/test_graph_engine.py @@ -429,6 +429,58 @@ async def mock_ainvoke(messages): "hidden" not in visible_prompt ), "Internal node output should not appear in downstream LLM context" + @pytest.mark.asyncio + async def test_items_path_source_auto_internal(self): + """Nodes referenced only via items_path are auto-detected as internal, + even without an explicit internal: true flag.""" + from backend.app.services.runners.graph_engine import GraphEngine + + config = { + "nodes": [ + { + "id": "places", + "type": "llm", 
+ "prompt": "List places", + }, + { + "id": "research", + "type": "mcp_tool_map", + "depends_on": ["places"], + "server": "travel", + "tool": "search", + "items_path": "outputs.places", + "query_template": "Research {item}", + }, + ], + "mcp": { + "transport": "streamable-http", + "servers": { + "travel": {"url": "http://localhost:7001/mcp"}, + }, + }, + } + + mock_llm = AsyncMock() + mock_resp = MagicMock() + mock_resp.content = '["Tokyo", "Kyoto"]' + mock_llm.ainvoke.return_value = mock_resp + + engine = GraphEngine(config=config, llm=mock_llm) + assert "places" in engine._internal_ids + + events = [] + async for event in engine.run_streaming({}, "sess"): + events.append(event) + + parsed = [ + json.loads(e[len("data: ") : -2]) for e in events if e.startswith("data: ") + ] + + node_names = [p["node"] for p in parsed if p["type"] == "node_started"] + assert ( + "places" not in node_names + ), "items_path source node should be auto-suppressed" + def test_empty_nodes_raises(self): """GraphEngine raises on empty node list.""" from backend.app.services.runners.graph_engine import GraphEngine From 5eb36d406186da7219a7ab07641b09aa2d792e2a Mon Sep 17 00:00:00 2001 From: Sid Kattoju Date: Tue, 3 Mar 2026 15:48:19 -0500 Subject: [PATCH 5/5] perf: run hotel and flight research in parallel Both tasks only depend on itinerary_options_task, not on each other. Changing flight_research_task to depend on itinerary_options_task instead of hotel_research_task lets them fan out concurrently. 
Made-with: Cursor --- backend/agent_templates/travel_vacation_planner.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/agent_templates/travel_vacation_planner.yaml b/backend/agent_templates/travel_vacation_planner.yaml index ebdd2e81..33c9a12c 100644 --- a/backend/agent_templates/travel_vacation_planner.yaml +++ b/backend/agent_templates/travel_vacation_planner.yaml @@ -68,7 +68,7 @@ templates: - id: flight_research_task type: mcp_tool depends_on: - - hotel_research_task + - itinerary_options_task server: flight tool: google_flights_search args: