Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 140 additions & 7 deletions dynamiq/nodes/agents/agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import types
from concurrent.futures import as_completed
from typing import Any, Callable, Literal, Mapping
from typing import Any, Callable, Literal, Mapping, Union, get_args, get_origin

from litellm import get_supported_openai_params, supports_function_calling, supports_response_schema
from pydantic import BaseModel, Field, PrivateAttr, field_validator, model_validator
Expand Down Expand Up @@ -39,7 +40,7 @@
StreamingMode,
)
from dynamiq.utils import generate_uuid, serialize_files_in_value
from dynamiq.utils.json_parser import parse_llm_json_output
from dynamiq.utils.json_parser import parse_llm_json_output, repair_truncated_json
from dynamiq.utils.logger import logger


Expand Down Expand Up @@ -79,8 +80,11 @@ def parse_arguments(cls, v: Any) -> Any:
if isinstance(v, str):
try:
return json.loads(v, strict=False)
except json.JSONDecodeError as e:
raise ValueError(f"Tool call arguments are not valid JSON: {e}")
except json.JSONDecodeError:
try:
return json.loads(repair_truncated_json(v), strict=False)
except json.JSONDecodeError as e:
raise ValueError(f"Tool call arguments are not valid JSON: {e}")
return v or {}

def parse_as_tool_call(self) -> ToolCallArguments:
Expand Down Expand Up @@ -320,6 +324,93 @@ def _emit_tool_input_error(
)
self._streaming_tool_run_id = None

@staticmethod
def _annotation_accepts_none(annotation: Any) -> bool:
"""Return True if a Pydantic field annotation includes ``NoneType``."""
if annotation is type(None):
return True
origin = get_origin(annotation)
if origin in (Union, types.UnionType):
return type(None) in get_args(annotation)
return False

@staticmethod
def _annotation_is_dict_like(annotation: Any) -> bool:
"""Return True if the annotation is ``dict`` / ``dict[...]`` or a union including one."""
if annotation is dict:
return True
origin = get_origin(annotation)
if origin is dict:
return True
if origin in (Union, types.UnionType):
return any(Agent._annotation_is_dict_like(arg) for arg in get_args(annotation))
return False

@staticmethod
def _extract_basemodel(annotation: Any) -> type[BaseModel] | None:
"""Return the BaseModel subclass in an annotation (handles ``Model | None``), else None."""
if isinstance(annotation, type) and issubclass(annotation, BaseModel):
return annotation
origin = get_origin(annotation)
if origin in (Union, types.UnionType):
for arg in get_args(annotation):
if isinstance(arg, type) and issubclass(arg, BaseModel):
return arg
return None

def _coerce_json_fields(self, tool: Node, action_input: dict) -> dict:
"""Parse stringified free-form dict fields back into dicts.

Strict mode can't express a free-form ``dict[str, Any]`` as an object, so
the schema transforms ship those fields as JSON-encoded strings (see the
provider converters). Here we reverse that: if the tool declares a
dict-typed field and the model supplied a JSON string for it, parse it
back so the tool's Pydantic schema validates the real dict.
"""
fields = tool.input_schema.model_fields
for name, field in fields.items():
value = action_input.get(name)
if isinstance(value, str) and self._annotation_is_dict_like(field.annotation):
stripped = value.strip()
if stripped.startswith("{") and stripped.endswith("}"):
try:
action_input[name] = json.loads(stripped)
except json.JSONDecodeError:
pass # leave as string; Pydantic will surface the error
return action_input
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated

def _strip_protocol_nulls(self, tool: Node, action_input: dict) -> dict:
"""Drop ``None`` values for fields whose Pydantic annotation rejects None.

OpenAI strict mode requires every property in ``required`` and uses
``"null"`` in the type union as the signal for "leave it at the default."
Fields with a non-nullable default (``encoding: str = "utf-8"``) can't
accept that ``None`` directly — so we drop the key, letting the tool's
Pydantic default apply. Fields that genuinely accept ``None``
(``encoding: str | None = None``) keep it.

Recurses into nested ``BaseModel`` fields so the same applies at depth
(e.g. ``config.port`` where ``DBConfig.port: int = 8080``).
"""
self._strip_nulls_for_fields(tool.input_schema.model_fields, action_input)
return action_input

def _strip_nulls_for_fields(self, fields: Mapping[str, Any], data: Any) -> None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let clean/rework 'matreshka' code

if not isinstance(data, dict):
return
for name in list(data):
field = fields.get(name)
if field is None:
continue
value = data[name]
if value is None:
if not self._annotation_accepts_none(field.annotation):
del data[name]
elif isinstance(value, dict):
nested_model = self._extract_basemodel(field.annotation)
if nested_model is not None:
self._strip_nulls_for_fields(nested_model.model_fields, value)

def _should_delegate_final(
self,
tool: Node | None,
Expand Down Expand Up @@ -791,6 +882,7 @@ def _handle_function_calling_mode(
if len(actual_tool_calls) > 1 and self.parallel_tool_calls_enabled:
tool_items = []
for tc in actual_tool_calls:
tc_name = tc.function.name.strip()
args = tc.function.parse_as_tool_call()
tc_input = args.action_input
if isinstance(tc_input, str):
Expand All @@ -800,9 +892,20 @@ def _handle_function_calling_mode(
raise ActionParsingException(f"Error parsing action_input string. {e}", recoverable=True)
if not isinstance(tc_input, dict):
tc_input = {"input": tc_input}
tc_tool = self.tool_by_names.get(self.sanitize_tool_name(tc_name))
if tc_tool is not None:
self._coerce_json_fields(tc_tool, tc_input)
self._strip_protocol_nulls(tc_tool, tc_input)
try:
tc_tool.input_schema.model_validate(tc_input)
except Exception as e:
raise ActionParsingException(
f"Tool call for '{tc_name}' has invalid arguments: {e}",
recoverable=True,
)
tool_items.append(
ToolCallItem(
name=tc.function.name.strip(),
name=tc_name,
input=tc_input,
thought=args.thought,
)
Expand All @@ -826,6 +929,18 @@ def _handle_function_calling_mode(
if not isinstance(action_input, dict):
action_input = {"input": action_input}

tool = self.tool_by_names.get(self.sanitize_tool_name(action))
if tool is not None:
self._coerce_json_fields(tool, action_input)
self._strip_protocol_nulls(tool, action_input)
try:
tool.input_schema.model_validate(action_input)
except Exception as e:
raise ActionParsingException(
f"Tool call for '{action}' has invalid arguments: {e}",
recoverable=True,
)

self.log_reasoning(thought, action, action_input, loop_num)
return thought, action, action_input

Expand Down Expand Up @@ -866,8 +981,14 @@ def _handle_structured_output_mode(
self._requested_output_files = self._parse_output_files_csv(
llm_generated_output_json.get("output_files") or ""
)
self.log_final_output(thought, action_input, loop_num)
return thought, "final_answer", action_input
# action_input is now an object (per schema); the final answer lives
# under the ``answer`` key. Fall back to the raw value for backward
# compatibility with older models that still emit a plain string.
final_answer: Any = action_input
if isinstance(action_input, dict) and "answer" in action_input:
final_answer = action_input["answer"]
self.log_final_output(thought, final_answer, loop_num)
return thought, "final_answer", final_answer
Comment thread
maksymbuleshnyi marked this conversation as resolved.
Outdated
Comment thread
maksymbuleshnyi marked this conversation as resolved.
Outdated

try:
if isinstance(action_input, str):
Expand Down Expand Up @@ -1504,12 +1625,24 @@ def _run_react_llm_step(self, config: RunnableConfig | None, loop_num: int, **kw

try:
native_parallel = self.parallel_tool_calls_enabled and self.inference_mode == InferenceMode.FUNCTION_CALLING
# In FUNCTION_CALLING mode with tools present, force a tool call so
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why force tool call?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because this is purpose of function calling inference mode for agent. Otherwise it may not call a tool, and there is no point of such iteration then.

# the model cannot bail out with a text-only response. Honour any
# explicit caller override (kwargs / self.llm.tool_choice).
forced_tool_choice = None
if (
self.inference_mode == InferenceMode.FUNCTION_CALLING
and self._tools
and "tool_choice" not in kwargs
and getattr(self.llm, "tool_choice", None) is None
):
forced_tool_choice = "required"
llm_result = self._run_llm(
messages=messages,
tools=self._tools,
response_format=self._response_format,
config=llm_config,
parallel_tool_calls=True if native_parallel else None,
**({"tool_choice": forced_tool_choice} if forced_tool_choice else {}),
Comment thread
maksymbuleshnyi marked this conversation as resolved.
**kwargs,
)
finally:
Expand Down
73 changes: 19 additions & 54 deletions dynamiq/nodes/agents/components/schema_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

FINAL_ANSWER_FUNCTION_SCHEMA = {
"type": "function",
"strict": True,
"function": {
"name": "provide_final_answer",
"description": "Function should be called when if you can answer the initial request"
Expand All @@ -38,6 +37,7 @@
},
},
"required": ["thought", "answer", "output_files"],
"additionalProperties": False,
},
},
}
Expand Down Expand Up @@ -75,7 +75,6 @@ def build_final_answer_function_schema(response_format: dict | type[BaseModel] |
return FINAL_ANSWER_FUNCTION_SCHEMA

answer_schema = unwrap_response_format(response_format)
strict = _is_strict_compatible(answer_schema)

parameters = {
"type": "object",
Expand All @@ -91,13 +90,11 @@ def build_final_answer_function_schema(response_format: dict | type[BaseModel] |
},
},
"required": ["thought", "answer", "output_files"],
"additionalProperties": False,
}
if strict:
parameters["additionalProperties"] = False

return {
"type": "function",
"strict": strict,
"function": {
"name": "provide_final_answer",
"description": (
Expand Down Expand Up @@ -172,7 +169,10 @@ def generate_structured_output_schemas(
"""
tool_names = [sanitize_tool_name(tool.name) for tool in tools]

action_input_description = "Input for chosen action."
action_input_description = (
"Input for the chosen action, as a JSON object whose keys match the tool's parameters. "
"For `finish`, set this to an object with a single `answer` key containing the final answer string."
)

if delegation_allowed and any(isinstance(tool, SubAgentTool) for tool in tools):
action_input_description += (
Expand All @@ -184,7 +184,6 @@ def generate_structured_output_schemas(
"type": "json_schema",
"json_schema": {
"name": "plan_next_action",
"strict": True,
"schema": {
"type": "object",
"required": ["thought", "action", "action_input", "output_files"],
Expand All @@ -198,8 +197,9 @@ def generate_structured_output_schemas(
"description": f"Next action to make (choose from [{tool_names}, finish]).",
},
"action_input": {
"type": "string",
"type": "object",
"description": action_input_description,
"additionalProperties": True,
},
"output_files": {
"type": "string",
Expand Down Expand Up @@ -239,8 +239,8 @@ def _resolve_type_schema(param: Any, _seen: set | None = None) -> dict[str, Any]
``properties`` so the LLM produces correctly structured output.
Generic ``dict`` types become bare ``{"type": "object"}``.

Tools whose schemas contain bare objects automatically get
``strict: false`` via ``_is_strict_compatible``.
Per-provider transforms (in ``BaseLLM`` subclasses) decide whether
Comment thread
cursor[bot] marked this conversation as resolved.
``strict`` is engaged for a given schema and provider.
"""
if param is type(None):
return {"type": "null"}
Expand Down Expand Up @@ -323,29 +323,6 @@ def _basemodel_to_schema(model: type[BaseModel], _seen: set | None = None) -> di
return result


def _is_strict_compatible(schema: Any) -> bool:
"""Return ``False`` if the schema contains an object that OpenAI strict mode
would reject — bare objects without ``properties``, or objects missing
``additionalProperties: False``."""
if not isinstance(schema, dict):
return True
schema_type = schema.get("type")
is_object = schema_type == "object" or (isinstance(schema_type, list) and "object" in schema_type)
if is_object:
if "properties" not in schema:
return False
if schema.get("additionalProperties") is not False:
return False
for value in schema.values():
if isinstance(value, dict) and not _is_strict_compatible(value):
return False
if isinstance(value, list):
for item in value:
if isinstance(item, dict) and not _is_strict_compatible(item):
return False
return True


def _is_nullable(annotation: Any) -> bool:
"""Return True if the annotation is a Union that includes NoneType."""
origin = get_origin(annotation)
Expand Down Expand Up @@ -414,10 +391,10 @@ def generate_function_calling_schemas(
schemas = [build_final_answer_function_schema(response_format)]

for tool in tools:
properties = {}
required_fields = []
input_params = tool.input_schema.model_fields.items()
if list(input_params):
properties: dict[str, Any] = {}
required_fields: list[str] = []
input_params = list(tool.input_schema.model_fields.items())
if input_params:
for name, field in tool.input_schema.model_fields.items():
generate_property_schema(properties, name, field)
if field.is_required() and name in properties:
Expand All @@ -427,25 +404,18 @@ def generate_function_calling_schemas(
properties["delegate_final"] = {
"type": "boolean",
"description": (
"Set to true to return the sub-agent's response verbatim "
"as the parent agent's final output."
"Set to true to return the sub-agent's response verbatim " "as the parent agent's final output."
),
}

has_optional = len(required_fields) < len(properties)
use_strict = _is_strict_compatible(properties) and not has_optional

action_input_schema: dict[str, Any] = {
"type": "object",
"description": "Tool parameters as a JSON object, not a string.",
"properties": properties,
"additionalProperties": False,
}
if use_strict:
action_input_schema["required"] = list(properties.keys())
action_input_schema["additionalProperties"] = False
else:
if required_fields:
action_input_schema["required"] = required_fields
if required_fields:
action_input_schema["required"] = required_fields

schema = {
"type": "function",
Expand All @@ -464,12 +434,8 @@ def generate_function_calling_schemas(
"additionalProperties": False,
"required": ["thought", "action_input"],
},
"strict": use_strict,
},
}

schemas.append(schema)

else:
schema = {
"type": "function",
Expand All @@ -491,10 +457,9 @@ def generate_function_calling_schemas(
"additionalProperties": False,
"required": ["thought", "action_input"],
},
"strict": True,
},
}

schemas.append(schema)
schemas.append(schema)

return schemas
Loading
Loading