Skip to content
Open
86 changes: 79 additions & 7 deletions dynamiq/nodes/agents/agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import types
from concurrent.futures import as_completed
from typing import Any, Callable, Literal, Mapping
from typing import Any, Callable, Literal, Mapping, Union, get_args, get_origin

from litellm import get_supported_openai_params, supports_function_calling, supports_response_schema
from pydantic import BaseModel, Field, PrivateAttr, field_validator, model_validator
Expand Down Expand Up @@ -39,7 +40,7 @@
StreamingMode,
)
from dynamiq.utils import generate_uuid, serialize_files_in_value
from dynamiq.utils.json_parser import parse_llm_json_output
from dynamiq.utils.json_parser import parse_llm_json_output, repair_truncated_json
from dynamiq.utils.logger import logger


Expand Down Expand Up @@ -79,18 +80,23 @@ def parse_arguments(cls, v: Any) -> Any:
if isinstance(v, str):
try:
return json.loads(v, strict=False)
except json.JSONDecodeError as e:
raise ValueError(f"Tool call arguments are not valid JSON: {e}")
except json.JSONDecodeError:
try:
return json.loads(repair_truncated_json(v), strict=False)
except json.JSONDecodeError as e:
raise ValueError(f"Tool call arguments are not valid JSON: {e}")
return v or {}

def parse_as_tool_call(self) -> ToolCallArguments:
args = dict(self.arguments)
thought = args.pop("thought", "")
try:
return ToolCallArguments.model_validate(self.arguments)
return ToolCallArguments(thought=thought, action_input=args)
Comment thread
maksymbuleshnyi marked this conversation as resolved.
Outdated
except Exception:
raise ActionParsingException(
"Your tool call is missing required fields. "
"Every tool call must include 'thought' (your reasoning) "
"and 'action_input' (the tool parameters as an object).",
"and the tool's parameters at the top level.",
Comment thread
maksymbuleshnyi marked this conversation as resolved.
Outdated
recoverable=True,
)

Expand Down Expand Up @@ -320,6 +326,38 @@ def _emit_tool_input_error(
)
self._streaming_tool_run_id = None

@staticmethod
def _annotation_accepts_none(annotation: Any) -> bool:
"""Return True if a Pydantic field annotation includes ``NoneType``."""
if annotation is type(None):
return True
origin = get_origin(annotation)
if origin in (Union, types.UnionType):
return type(None) in get_args(annotation)
return False

def _strip_protocol_nulls(self, tool: Node, action_input: dict) -> dict:
"""Drop ``None`` values for fields whose Pydantic annotation rejects None.

OpenAI strict mode requires every property in ``required`` and uses
``"null"`` in the type union as the documented signal for "not specified."
Tools with non-nullable defaults (``encoding: str = "utf-8"``) can't
accept that ``None`` directly — but their Pydantic default should kick
in if the key is absent. Drop those keys so validation/execution use
defaults instead of failing.

Nullable fields (``encoding: str | None = None``) keep their None — the
tool genuinely accepts it.
"""
fields = tool.input_schema.model_fields
for name in list(action_input):
if action_input[name] is not None:
continue
field = fields.get(name)
if field is not None and not self._annotation_accepts_none(field.annotation):
del action_input[name]
return action_input

def _should_delegate_final(
self,
tool: Node | None,
Expand Down Expand Up @@ -791,6 +829,7 @@ def _handle_function_calling_mode(
if len(actual_tool_calls) > 1 and self.parallel_tool_calls_enabled:
tool_items = []
for tc in actual_tool_calls:
tc_name = tc.function.name.strip()
args = tc.function.parse_as_tool_call()
tc_input = args.action_input
if isinstance(tc_input, str):
Expand All @@ -800,9 +839,19 @@ def _handle_function_calling_mode(
raise ActionParsingException(f"Error parsing action_input string. {e}", recoverable=True)
if not isinstance(tc_input, dict):
tc_input = {"input": tc_input}
tc_tool = self.tool_by_names.get(self.sanitize_tool_name(tc_name))
if tc_tool is not None:
self._strip_protocol_nulls(tc_tool, tc_input)
try:
tc_tool.input_schema.model_validate(tc_input)
except Exception as e:
raise ActionParsingException(
f"Tool call for '{tc_name}' has invalid arguments: {e}",
recoverable=True,
)
tool_items.append(
ToolCallItem(
name=tc.function.name.strip(),
name=tc_name,
input=tc_input,
thought=args.thought,
)
Expand All @@ -826,6 +875,17 @@ def _handle_function_calling_mode(
if not isinstance(action_input, dict):
action_input = {"input": action_input}

tool = self.tool_by_names.get(self.sanitize_tool_name(action))
if tool is not None:
self._strip_protocol_nulls(tool, action_input)
try:
tool.input_schema.model_validate(action_input)
except Exception as e:
raise ActionParsingException(
f"Tool call for '{action}' has invalid arguments: {e}",
recoverable=True,
)

self.log_reasoning(thought, action, action_input, loop_num)
return thought, action, action_input

Expand Down Expand Up @@ -1504,12 +1564,24 @@ def _run_react_llm_step(self, config: RunnableConfig | None, loop_num: int, **kw

try:
native_parallel = self.parallel_tool_calls_enabled and self.inference_mode == InferenceMode.FUNCTION_CALLING
# In FUNCTION_CALLING mode with tools present, force a tool call so
# the model cannot bail out with a text-only response. Honour any
# explicit caller override (kwargs / self.llm.tool_choice).
forced_tool_choice = None
if (
self.inference_mode == InferenceMode.FUNCTION_CALLING
and self._tools
and "tool_choice" not in kwargs
and getattr(self.llm, "tool_choice", None) is None
):
forced_tool_choice = "required"
llm_result = self._run_llm(
messages=messages,
tools=self._tools,
response_format=self._response_format,
config=llm_config,
parallel_tool_calls=True if native_parallel else None,
**({"tool_choice": forced_tool_choice} if forced_tool_choice else {}),
Comment thread
maksymbuleshnyi marked this conversation as resolved.
**kwargs,
)
finally:
Expand Down
146 changes: 38 additions & 108 deletions dynamiq/nodes/agents/components/schema_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

FINAL_ANSWER_FUNCTION_SCHEMA = {
"type": "function",
"strict": True,
"function": {
"name": "provide_final_answer",
"description": "Function should be called when if you can answer the initial request"
Expand All @@ -38,6 +37,7 @@
},
},
"required": ["thought", "answer", "output_files"],
"additionalProperties": False,
},
},
}
Expand Down Expand Up @@ -75,7 +75,6 @@ def build_final_answer_function_schema(response_format: dict | type[BaseModel] |
return FINAL_ANSWER_FUNCTION_SCHEMA

answer_schema = unwrap_response_format(response_format)
strict = _is_strict_compatible(answer_schema)

parameters = {
"type": "object",
Expand All @@ -91,13 +90,11 @@ def build_final_answer_function_schema(response_format: dict | type[BaseModel] |
},
},
"required": ["thought", "answer", "output_files"],
"additionalProperties": False,
}
if strict:
parameters["additionalProperties"] = False

return {
"type": "function",
"strict": strict,
"function": {
"name": "provide_final_answer",
"description": (
Expand Down Expand Up @@ -239,8 +236,8 @@ def _resolve_type_schema(param: Any, _seen: set | None = None) -> dict[str, Any]
``properties`` so the LLM produces correctly structured output.
Generic ``dict`` types become bare ``{"type": "object"}``.

Tools whose schemas contain bare objects automatically get
``strict: false`` via ``_is_strict_compatible``.
Per-provider transforms (in ``BaseLLM`` subclasses) decide whether
``strict`` is engaged for a given schema and provider.
"""
if param is type(None):
return {"type": "null"}
Expand Down Expand Up @@ -323,29 +320,6 @@ def _basemodel_to_schema(model: type[BaseModel], _seen: set | None = None) -> di
return result


def _is_strict_compatible(schema: Any) -> bool:
"""Return ``False`` if the schema contains an object that OpenAI strict mode
would reject — bare objects without ``properties``, or objects missing
``additionalProperties: False``."""
if not isinstance(schema, dict):
return True
schema_type = schema.get("type")
is_object = schema_type == "object" or (isinstance(schema_type, list) and "object" in schema_type)
if is_object:
if "properties" not in schema:
return False
if schema.get("additionalProperties") is not False:
return False
for value in schema.values():
if isinstance(value, dict) and not _is_strict_compatible(value):
return False
if isinstance(value, list):
for item in value:
if isinstance(item, dict) and not _is_strict_compatible(item):
return False
return True


def _is_nullable(annotation: Any) -> bool:
"""Return True if the annotation is a Union that includes NoneType."""
origin = get_origin(annotation)
Expand Down Expand Up @@ -414,87 +388,43 @@ def generate_function_calling_schemas(
schemas = [build_final_answer_function_schema(response_format)]

for tool in tools:
properties = {}
required_fields = []
input_params = tool.input_schema.model_fields.items()
if list(input_params):
for name, field in tool.input_schema.model_fields.items():
generate_property_schema(properties, name, field)
if field.is_required() and name in properties:
required_fields.append(name)

if isinstance(tool, SubAgentTool) and delegation_allowed:
properties["delegate_final"] = {
"type": "boolean",
"description": (
"Set to true to return the sub-agent's response verbatim "
"as the parent agent's final output."
),
}

has_optional = len(required_fields) < len(properties)
use_strict = _is_strict_compatible(properties) and not has_optional

action_input_schema: dict[str, Any] = {
"type": "object",
"description": "Tool parameters as a JSON object, not a string.",
"properties": properties,
properties: dict[str, Any] = {}
required_fields: list[str] = []
for name, field in tool.input_schema.model_fields.items():
generate_property_schema(properties, name, field)
if field.is_required() and name in properties:
required_fields.append(name)

if isinstance(tool, SubAgentTool) and delegation_allowed:
properties["delegate_final"] = {
"type": "boolean",
"description": (
"Set to true to return the sub-agent's response verbatim " "as the parent agent's final output."
),
}
if use_strict:
action_input_schema["required"] = list(properties.keys())
action_input_schema["additionalProperties"] = False
else:
if required_fields:
action_input_schema["required"] = required_fields

schema = {
"type": "function",
"function": {
"name": sanitize_tool_name(tool.name),
"description": tool.description[:1024],
"parameters": {
"type": "object",
"properties": {
"thought": {
"type": "string",
"description": "Your reasoning about using this tool.",
},
"action_input": action_input_schema,
},
"additionalProperties": False,
"required": ["thought", "action_input"],
},
"strict": use_strict,
},
}

schemas.append(schema)

else:
schema = {
"type": "function",
"function": {
"name": sanitize_tool_name(tool.name),
"description": tool.description[:1024],
"parameters": {
"type": "object",
"properties": {
"thought": {
"type": "string",
"description": "Your reasoning about using this tool.",
},
"action_input": {
"type": "string",
"description": "Input for the selected tool in JSON string format.",
},
},
"additionalProperties": False,
"required": ["thought", "action_input"],
},
"strict": True,
flat_properties: dict[str, Any] = {
"thought": {
"type": "string",
"description": "Your reasoning about using this tool.",
},
}
flat_properties.update(properties)

schema = {
"type": "function",
"function": {
"name": sanitize_tool_name(tool.name),
"description": tool.description[:1024],
"parameters": {
"type": "object",
"properties": flat_properties,
"required": ["thought", *required_fields],
"additionalProperties": False,
},
}
},
}

schemas.append(schema)
schemas.append(schema)

return schemas
Loading
Loading