Skip to content
Open
99 changes: 90 additions & 9 deletions dynamiq/nodes/agents/agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import types
from concurrent.futures import as_completed
from typing import Any, Callable, Literal, Mapping
from typing import Any, Callable, Literal, Mapping, Union, get_args, get_origin

from litellm import get_supported_openai_params, supports_function_calling, supports_response_schema
from pydantic import BaseModel, Field, PrivateAttr, field_validator, model_validator
Expand Down Expand Up @@ -39,7 +40,7 @@
StreamingMode,
)
from dynamiq.utils import generate_uuid, serialize_files_in_value
from dynamiq.utils.json_parser import parse_llm_json_output
from dynamiq.utils.json_parser import parse_llm_json_output, repair_truncated_json
from dynamiq.utils.logger import logger


Expand Down Expand Up @@ -79,18 +80,23 @@ def parse_arguments(cls, v: Any) -> Any:
if isinstance(v, str):
try:
return json.loads(v, strict=False)
except json.JSONDecodeError as e:
raise ValueError(f"Tool call arguments are not valid JSON: {e}")
except json.JSONDecodeError:
try:
return json.loads(repair_truncated_json(v), strict=False)
except json.JSONDecodeError as e:
raise ValueError(f"Tool call arguments are not valid JSON: {e}")
return v or {}

def parse_as_tool_call(self) -> ToolCallArguments:
args = dict(self.arguments)
thought = args.pop("thought", "")
try:
return ToolCallArguments.model_validate(self.arguments)
return ToolCallArguments(thought=thought, action_input=args)
Comment thread
maksymbuleshnyi marked this conversation as resolved.
Outdated
except Exception:
raise ActionParsingException(
"Your tool call is missing required fields. "
"Every tool call must include 'thought' (your reasoning) "
"and 'action_input' (the tool parameters as an object).",
"and the tool's parameters at the top level.",
Comment thread
maksymbuleshnyi marked this conversation as resolved.
Outdated
recoverable=True,
)

Expand Down Expand Up @@ -320,6 +326,38 @@ def _emit_tool_input_error(
)
self._streaming_tool_run_id = None

@staticmethod
def _annotation_accepts_none(annotation: Any) -> bool:
"""Return True if a Pydantic field annotation includes ``NoneType``."""
if annotation is type(None):
return True
origin = get_origin(annotation)
if origin in (Union, types.UnionType):
return type(None) in get_args(annotation)
return False

def _strip_protocol_nulls(self, tool: Node, action_input: dict) -> dict:
"""Drop ``None`` values for fields whose Pydantic annotation rejects None.

OpenAI strict mode requires every property in ``required`` and uses
``"null"`` in the type union as the documented signal for "not specified."
Tools with non-nullable defaults (``encoding: str = "utf-8"``) can't
accept that ``None`` directly — but their Pydantic default should kick
in if the key is absent. Drop those keys so validation/execution use
defaults instead of failing.

Nullable fields (``encoding: str | None = None``) keep their None — the
tool genuinely accepts it.
"""
fields = tool.input_schema.model_fields
for name in list(action_input):
if action_input[name] is not None:
continue
field = fields.get(name)
if field is not None and not self._annotation_accepts_none(field.annotation):
del action_input[name]
return action_input

def _should_delegate_final(
self,
tool: Node | None,
Expand Down Expand Up @@ -791,6 +829,7 @@ def _handle_function_calling_mode(
if len(actual_tool_calls) > 1 and self.parallel_tool_calls_enabled:
tool_items = []
for tc in actual_tool_calls:
tc_name = tc.function.name.strip()
args = tc.function.parse_as_tool_call()
tc_input = args.action_input
if isinstance(tc_input, str):
Expand All @@ -800,9 +839,19 @@ def _handle_function_calling_mode(
raise ActionParsingException(f"Error parsing action_input string. {e}", recoverable=True)
if not isinstance(tc_input, dict):
tc_input = {"input": tc_input}
tc_tool = self.tool_by_names.get(self.sanitize_tool_name(tc_name))
if tc_tool is not None:
self._strip_protocol_nulls(tc_tool, tc_input)
try:
tc_tool.input_schema.model_validate(tc_input)
except Exception as e:
raise ActionParsingException(
f"Tool call for '{tc_name}' has invalid arguments: {e}",
recoverable=True,
)
tool_items.append(
ToolCallItem(
name=tc.function.name.strip(),
name=tc_name,
input=tc_input,
thought=args.thought,
)
Expand All @@ -826,6 +875,17 @@ def _handle_function_calling_mode(
if not isinstance(action_input, dict):
action_input = {"input": action_input}

tool = self.tool_by_names.get(self.sanitize_tool_name(action))
if tool is not None:
self._strip_protocol_nulls(tool, action_input)
try:
tool.input_schema.model_validate(action_input)
except Exception as e:
raise ActionParsingException(
f"Tool call for '{action}' has invalid arguments: {e}",
recoverable=True,
)

self.log_reasoning(thought, action, action_input, loop_num)
return thought, action, action_input

Expand Down Expand Up @@ -866,8 +926,14 @@ def _handle_structured_output_mode(
self._requested_output_files = self._parse_output_files_csv(
llm_generated_output_json.get("output_files") or ""
)
self.log_final_output(thought, action_input, loop_num)
return thought, "final_answer", action_input
# action_input is now an object (per schema); the final answer lives
# under the ``answer`` key. Fall back to the raw value for backward
# compatibility with older models that still emit a plain string.
final_answer: Any = action_input
if isinstance(action_input, dict) and "answer" in action_input:
final_answer = action_input["answer"]
self.log_final_output(thought, final_answer, loop_num)
return thought, "final_answer", final_answer
Comment thread
maksymbuleshnyi marked this conversation as resolved.

try:
if isinstance(action_input, str):
Expand Down Expand Up @@ -1504,14 +1570,29 @@ def _run_react_llm_step(self, config: RunnableConfig | None, loop_num: int, **kw

try:
native_parallel = self.parallel_tool_calls_enabled and self.inference_mode == InferenceMode.FUNCTION_CALLING
# In FUNCTION_CALLING mode with tools present, force a tool call so
# the model cannot bail out with a text-only response. Honour any
# explicit caller override (kwargs / self.llm.tool_choice).
forced_tool_choice = None
if (
self.inference_mode == InferenceMode.FUNCTION_CALLING
and self._tools
and "tool_choice" not in kwargs
and getattr(self.llm, "tool_choice", None) is None
):
forced_tool_choice = "required"
llm_result = self._run_llm(
messages=messages,
tools=self._tools,
response_format=self._response_format,
config=llm_config,
parallel_tool_calls=True if native_parallel else None,
**({"tool_choice": forced_tool_choice} if forced_tool_choice else {}),
Comment thread
maksymbuleshnyi marked this conversation as resolved.
**kwargs,
)
# DEBUG: raw LLM response right after the call, before any parsing
print(f"\n[LLM RESPONSE loop={loop_num}] mode={self.inference_mode}")
print(f"[LLM RESPONSE loop={loop_num}] output={llm_result.output!r}\n")
Comment thread
maksymbuleshnyi marked this conversation as resolved.
Outdated
finally:
if not original_streaming_enabled:
try:
Expand Down
Loading
Loading