Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions dynamiq/callbacks/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -1086,19 +1086,17 @@ def _try_initialize_next_json_field(self, buf: str, final_answer_only: bool) ->
buf, JSONStreamingField.ACTION_INPUT.value, StreamingState.TOOL_INPUT
)

def _emit_tool_input_state(self, buf: str) -> None:
"""Emit content for the current TOOL_INPUT state."""
def _emit_tool_input_state(self, buf: str) -> bool:
"""Emit content for the current TOOL_INPUT state. Returns True when complete."""
if self._fc_object_tool_input:
self._emit_json_object_field_content(buf, StreamingState.TOOL_INPUT)
else:
self._emit_json_field_content(buf, StreamingState.TOOL_INPUT)
return self._emit_json_object_field_content(buf, StreamingState.TOOL_INPUT)
return self._emit_json_field_content(buf, StreamingState.TOOL_INPUT)

def _emit_answer_state(self, buf: str) -> None:
"""Emit content for the current ANSWER state."""
def _emit_answer_state(self, buf: str) -> bool:
"""Emit content for the current ANSWER state. Returns True when complete."""
if self._fc_object_answer:
self._emit_json_object_field_content(buf, StreamingState.ANSWER)
else:
self._emit_json_field_content(buf, StreamingState.ANSWER)
return self._emit_json_object_field_content(buf, StreamingState.ANSWER)
return self._emit_json_field_content(buf, StreamingState.ANSWER)

def _process_json_mode(self, final_answer_only: bool) -> None:
"""
Expand Down
136 changes: 131 additions & 5 deletions dynamiq/nodes/agents/agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import types
from concurrent.futures import as_completed
from typing import Any, Callable, Literal, Mapping
from typing import Any, Callable, Literal, Mapping, Union, get_args, get_origin

from litellm import get_supported_openai_params, supports_function_calling, supports_response_schema
from pydantic import BaseModel, Field, PrivateAttr, field_validator, model_validator
Expand Down Expand Up @@ -39,7 +40,7 @@
StreamingMode,
)
from dynamiq.utils import generate_uuid, serialize_files_in_value
from dynamiq.utils.json_parser import parse_llm_json_output
from dynamiq.utils.json_parser import parse_llm_json_output, repair_truncated_json
from dynamiq.utils.logger import logger


Expand Down Expand Up @@ -79,8 +80,11 @@ def parse_arguments(cls, v: Any) -> Any:
if isinstance(v, str):
try:
return json.loads(v, strict=False)
except json.JSONDecodeError as e:
raise ValueError(f"Tool call arguments are not valid JSON: {e}")
except json.JSONDecodeError:
try:
return json.loads(repair_truncated_json(v), strict=False)
except json.JSONDecodeError as e:
raise ValueError(f"Tool call arguments are not valid JSON: {e}")
return v or {}

def parse_as_tool_call(self) -> ToolCallArguments:
Expand Down Expand Up @@ -320,6 +324,106 @@ def _emit_tool_input_error(
)
self._streaming_tool_run_id = None

@staticmethod
def _annotation_accepts_none(annotation: Any) -> bool:
"""Return True if a Pydantic field annotation includes ``NoneType``."""
if annotation is type(None):
return True
origin = get_origin(annotation)
if origin in (Union, types.UnionType):
return type(None) in get_args(annotation)
return False

@staticmethod
def _annotation_is_dict_like(annotation: Any) -> bool:
"""Return True if the annotation is ``dict`` / ``dict[...]`` or a union including one."""
if annotation is dict:
return True
origin = get_origin(annotation)
if origin is dict:
return True
if origin in (Union, types.UnionType):
return any(Agent._annotation_is_dict_like(arg) for arg in get_args(annotation))
return False

@staticmethod
def _extract_basemodel(annotation: Any) -> type[BaseModel] | None:
"""Return the BaseModel subclass in an annotation (handles ``Model | None``), else None."""
if isinstance(annotation, type) and issubclass(annotation, BaseModel):
return annotation
origin = get_origin(annotation)
if origin in (Union, types.UnionType):
for arg in get_args(annotation):
if isinstance(arg, type) and issubclass(arg, BaseModel):
return arg
return None

def _coerce_json_fields(self, tool: Node, action_input: dict) -> dict:
"""Parse stringified free-form dict fields back into dicts.

Strict mode can't express a free-form ``dict[str, Any]`` as an object, so
the schema transforms ship those fields as JSON-encoded strings (see the
provider converters). Here we reverse that: if the tool declares a
dict-typed field and the model supplied a JSON string for it, parse it
back so the tool's Pydantic schema validates the real dict.

Recurses into nested ``BaseModel`` fields so a free-form dict declared on a
sub-model is coerced too (mirrors ``_strip_nulls_for_fields``). Without this
a stringified dict nested inside a sub-model would never be parsed back and
the nested model's validation would reject the ``str``.
"""
self._coerce_json_for_fields(tool.input_schema.model_fields, action_input)
return action_input

def _coerce_json_for_fields(self, fields: Mapping[str, Any], data: Any) -> None:
if not isinstance(data, dict):
return
for name, field in fields.items():
value = data.get(name)
if isinstance(value, str) and self._annotation_is_dict_like(field.annotation):
stripped = value.strip()
if stripped.startswith("{") and stripped.endswith("}"):
try:
data[name] = json.loads(stripped)
except json.JSONDecodeError:
pass # leave as string; Pydantic will surface the error
elif isinstance(value, dict):
nested_model = self._extract_basemodel(field.annotation)
if nested_model is not None:
self._coerce_json_for_fields(nested_model.model_fields, value)

def _strip_protocol_nulls(self, tool: Node, action_input: dict) -> dict:
"""Drop ``None`` values for fields whose Pydantic annotation rejects None.

OpenAI strict mode requires every property in ``required`` and uses
``"null"`` in the type union as the signal for "leave it at the default."
Fields with a non-nullable default (``encoding: str = "utf-8"``) can't
accept that ``None`` directly — so we drop the key, letting the tool's
Pydantic default apply. Fields that genuinely accept ``None``
(``encoding: str | None = None``) keep it.

Recurses into nested ``BaseModel`` fields so the same applies at depth
(e.g. ``config.port`` where ``DBConfig.port: int = 8080``).
"""
self._strip_nulls_for_fields(tool.input_schema.model_fields, action_input)
return action_input

def _strip_nulls_for_fields(self, fields: Mapping[str, Any], data: Any) -> None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let clean/rework 'matreshka' code

if not isinstance(data, dict):
return
for name in list(data):
field = fields.get(name)
if field is None:
continue
value = data[name]
if value is None:
if not self._annotation_accepts_none(field.annotation):
del data[name]
elif isinstance(value, dict):
nested_model = self._extract_basemodel(field.annotation)
if nested_model is not None:
self._strip_nulls_for_fields(nested_model.model_fields, value)

def _should_delegate_final(
self,
tool: Node | None,
Expand Down Expand Up @@ -791,6 +895,7 @@ def _handle_function_calling_mode(
if len(actual_tool_calls) > 1 and self.parallel_tool_calls_enabled:
tool_items = []
for tc in actual_tool_calls:
tc_name = tc.function.name.strip()
args = tc.function.parse_as_tool_call()
tc_input = args.action_input
if isinstance(tc_input, str):
Expand All @@ -800,9 +905,13 @@ def _handle_function_calling_mode(
raise ActionParsingException(f"Error parsing action_input string. {e}", recoverable=True)
if not isinstance(tc_input, dict):
tc_input = {"input": tc_input}
tc_tool = self.tool_by_names.get(self.sanitize_tool_name(tc_name))
if tc_tool is not None:
self._coerce_json_fields(tc_tool, tc_input)
self._strip_protocol_nulls(tc_tool, tc_input)
tool_items.append(
ToolCallItem(
name=tc.function.name.strip(),
name=tc_name,
input=tc_input,
thought=args.thought,
)
Expand All @@ -826,6 +935,11 @@ def _handle_function_calling_mode(
if not isinstance(action_input, dict):
action_input = {"input": action_input}

tool = self.tool_by_names.get(self.sanitize_tool_name(action))
if tool is not None:
self._coerce_json_fields(tool, action_input)
self._strip_protocol_nulls(tool, action_input)

self.log_reasoning(thought, action, action_input, loop_num)
return thought, action, action_input

Expand Down Expand Up @@ -1504,12 +1618,24 @@ def _run_react_llm_step(self, config: RunnableConfig | None, loop_num: int, **kw

try:
native_parallel = self.parallel_tool_calls_enabled and self.inference_mode == InferenceMode.FUNCTION_CALLING
# In FUNCTION_CALLING mode with tools present, force a tool call so
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why force tool call?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because this is purpose of function calling inference mode for agent. Otherwise it may not call a tool, and there is no point of such iteration then.

# the model cannot bail out with a text-only response. Honour any
# explicit caller override (kwargs / self.llm.tool_choice).
forced_tool_choice = None
if (
self.inference_mode == InferenceMode.FUNCTION_CALLING
and self._tools
and "tool_choice" not in kwargs
and getattr(self.llm, "tool_choice", None) is None
):
forced_tool_choice = "required"
llm_result = self._run_llm(
messages=messages,
tools=self._tools,
response_format=self._response_format,
config=llm_config,
parallel_tool_calls=True if native_parallel else None,
**({"tool_choice": forced_tool_choice} if forced_tool_choice else {}),
Comment thread
maksymbuleshnyi marked this conversation as resolved.
**kwargs,
)
finally:
Expand Down
64 changes: 13 additions & 51 deletions dynamiq/nodes/agents/components/schema_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

FINAL_ANSWER_FUNCTION_SCHEMA = {
"type": "function",
"strict": True,
"function": {
"name": "provide_final_answer",
"description": "Function should be called when if you can answer the initial request"
Expand All @@ -38,6 +37,7 @@
},
},
"required": ["thought", "answer", "output_files"],
"additionalProperties": False,
},
},
}
Expand Down Expand Up @@ -75,7 +75,6 @@ def build_final_answer_function_schema(response_format: dict | type[BaseModel] |
return FINAL_ANSWER_FUNCTION_SCHEMA

answer_schema = unwrap_response_format(response_format)
strict = _is_strict_compatible(answer_schema)

parameters = {
"type": "object",
Expand All @@ -91,13 +90,11 @@ def build_final_answer_function_schema(response_format: dict | type[BaseModel] |
},
},
"required": ["thought", "answer", "output_files"],
"additionalProperties": False,
}
if strict:
parameters["additionalProperties"] = False

return {
"type": "function",
"strict": strict,
"function": {
"name": "provide_final_answer",
"description": (
Expand Down Expand Up @@ -239,8 +236,8 @@ def _resolve_type_schema(param: Any, _seen: set | None = None) -> dict[str, Any]
``properties`` so the LLM produces correctly structured output.
Generic ``dict`` types become bare ``{"type": "object"}``.

Tools whose schemas contain bare objects automatically get
``strict: false`` via ``_is_strict_compatible``.
Per-provider transforms (in ``BaseLLM`` subclasses) decide whether
Comment thread
cursor[bot] marked this conversation as resolved.
``strict`` is engaged for a given schema and provider.
"""
if param is type(None):
return {"type": "null"}
Expand Down Expand Up @@ -323,29 +320,6 @@ def _basemodel_to_schema(model: type[BaseModel], _seen: set | None = None) -> di
return result


def _is_strict_compatible(schema: Any) -> bool:
"""Return ``False`` if the schema contains an object that OpenAI strict mode
would reject — bare objects without ``properties``, or objects missing
``additionalProperties: False``."""
if not isinstance(schema, dict):
return True
schema_type = schema.get("type")
is_object = schema_type == "object" or (isinstance(schema_type, list) and "object" in schema_type)
if is_object:
if "properties" not in schema:
return False
if schema.get("additionalProperties") is not False:
return False
for value in schema.values():
if isinstance(value, dict) and not _is_strict_compatible(value):
return False
if isinstance(value, list):
for item in value:
if isinstance(item, dict) and not _is_strict_compatible(item):
return False
return True


def _is_nullable(annotation: Any) -> bool:
"""Return True if the annotation is a Union that includes NoneType."""
origin = get_origin(annotation)
Expand Down Expand Up @@ -414,10 +388,10 @@ def generate_function_calling_schemas(
schemas = [build_final_answer_function_schema(response_format)]

for tool in tools:
properties = {}
required_fields = []
input_params = tool.input_schema.model_fields.items()
if list(input_params):
properties: dict[str, Any] = {}
required_fields: list[str] = []
input_params = list(tool.input_schema.model_fields.items())
if input_params:
for name, field in tool.input_schema.model_fields.items():
generate_property_schema(properties, name, field)
if field.is_required() and name in properties:
Expand All @@ -427,25 +401,18 @@ def generate_function_calling_schemas(
properties["delegate_final"] = {
"type": "boolean",
"description": (
"Set to true to return the sub-agent's response verbatim "
"as the parent agent's final output."
"Set to true to return the sub-agent's response verbatim " "as the parent agent's final output."
),
}

has_optional = len(required_fields) < len(properties)
use_strict = _is_strict_compatible(properties) and not has_optional

action_input_schema: dict[str, Any] = {
"type": "object",
"description": "Tool parameters as a JSON object, not a string.",
"properties": properties,
"additionalProperties": False,
}
if use_strict:
action_input_schema["required"] = list(properties.keys())
action_input_schema["additionalProperties"] = False
else:
if required_fields:
action_input_schema["required"] = required_fields
if required_fields:
action_input_schema["required"] = required_fields

schema = {
"type": "function",
Expand All @@ -464,12 +431,8 @@ def generate_function_calling_schemas(
"additionalProperties": False,
"required": ["thought", "action_input"],
},
"strict": use_strict,
},
}

schemas.append(schema)

else:
schema = {
"type": "function",
Expand All @@ -491,10 +454,9 @@ def generate_function_calling_schemas(
"additionalProperties": False,
"required": ["thought", "action_input"],
},
"strict": True,
},
}

schemas.append(schema)
schemas.append(schema)

return schemas
9 changes: 6 additions & 3 deletions dynamiq/nodes/agents/prompts/react/instructions.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,12 @@
- Make sure to adhere to AGENT PERSONA & STYLE & ADDITIONAL BEHAVIORAL GUIDELINES.

## Single Action Per Turn
- Execute exactly ONE <action>/<action_input> pair per response, then wait for its Observation before continuing
- Do NOT include multiple action blocks or answer blocks in the same response
- After receiving an Observation, decide the next single action based on the result
- Emit EXACTLY ONE <output>...</output> block per response. Never write a second <output> block.
- A response either takes an action (<action>/<action_input>) OR gives an <answer> — never both in the same response.
- When you write an <action>, STOP immediately after </output>. Do NOT continue, do NOT write an "Observation:",
a tool result, or an <answer> — the Observation is given back to you by the system, not written by you.
- NEVER predict, assume, or fabricate the tool's Observation/result. Wait for the real Observation to be returned,
then decide your next single action (or final answer) based on it in your NEXT response.

## JSON Formatting Requirements
- Put JSON on single line within tags
Expand Down
Loading
Loading