Skip to content
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 48 additions & 45 deletions dynamiq/callbacks/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -850,58 +850,32 @@ def _process_structured_output_mode(self, final_answer_only: bool) -> None:
action_value = buf[v_start + 1 : end_quote]
if action_value.strip().lower() == "finish":
self._answer_started = True
if self._current_state is None:
action_input_start = self._find_field_string_value_start(
buf, JSONStreamingField.ACTION_INPUT.value, end_quote + 1
)
if action_input_start != -1:
self._current_state = StreamingState.ANSWER
self._state_start_index = action_input_start
self._state_last_emit_index = max(
self._state_last_emit_index, action_input_start
)
else:
self._tool_input_started = True
self._current_action_name = action_value.strip()
self.agent._streaming_tool_run_id = generate_uuid()
if self._current_state is None:
action_input_start = self._find_field_string_value_start(
buf, JSONStreamingField.ACTION_INPUT.value, end_quote + 1
)
if action_input_start != -1:
self._current_state = StreamingState.TOOL_INPUT
self._state_start_index = action_input_start
self._state_last_emit_index = max(
self._state_last_emit_index, action_input_start
)

if not self._state_has_emitted.get(StreamingState.REASONING, False):
self._initialize_json_field_state(
buf, JSONStreamingField.THOUGHT.value, StreamingState.REASONING, final_answer_only
)

if self._answer_started:
self._initialize_json_field_state(buf, JSONStreamingField.ACTION_INPUT.value, StreamingState.ANSWER)

if self._tool_input_started and not self._answer_started:
self._initialize_json_field_state(buf, JSONStreamingField.ACTION_INPUT.value, StreamingState.TOOL_INPUT)
# Initialize the action field (TOOL_INPUT for a tool call, ANSWER for finish).
# action_input is a JSON object in the current schema, so this falls back from the
# string form to the brace-delimited object form (see _so_initialize_action_field).
self._so_initialize_action_field(buf)

if self._current_state == StreamingState.REASONING:
if self._emit_json_field_content(buf, StreamingState.REASONING):
# Reasoning completed — immediately try to initialize ANSWER/TOOL_INPUT
# in the same call, in case this is the last chunk.
if self._answer_started:
self._initialize_json_field_state(buf, JSONStreamingField.ACTION_INPUT.value, StreamingState.ANSWER)
elif self._tool_input_started:
self._initialize_json_field_state(
buf, JSONStreamingField.ACTION_INPUT.value, StreamingState.TOOL_INPUT
)
# Reasoning completed — try to initialize ANSWER/TOOL_INPUT in the same
# call, in case this is the last chunk.
self._so_initialize_action_field(buf)

if self._current_state == StreamingState.ANSWER:
if self._emit_json_field_content(buf, StreamingState.ANSWER):
if self._emit_answer_state(buf):
self._so_action_emitted = True
elif self._current_state == StreamingState.TOOL_INPUT:
if self._emit_json_field_content(buf, StreamingState.TOOL_INPUT):
if self._emit_tool_input_state(buf):
self._so_action_emitted = True

def _process_function_calling_mode(self, final_answer_only: bool) -> None:
Expand Down Expand Up @@ -1086,19 +1060,48 @@ def _try_initialize_next_json_field(self, buf: str, final_answer_only: bool) ->
buf, JSONStreamingField.ACTION_INPUT.value, StreamingState.TOOL_INPUT
)

def _emit_tool_input_state(self, buf: str) -> None:
"""Emit content for the current TOOL_INPUT state."""
def _emit_tool_input_state(self, buf: str) -> bool:
"""Emit content for the current TOOL_INPUT state. Returns True when complete."""
if self._fc_object_tool_input:
self._emit_json_object_field_content(buf, StreamingState.TOOL_INPUT)
else:
self._emit_json_field_content(buf, StreamingState.TOOL_INPUT)
return self._emit_json_object_field_content(buf, StreamingState.TOOL_INPUT)
return self._emit_json_field_content(buf, StreamingState.TOOL_INPUT)

def _emit_answer_state(self, buf: str) -> None:
"""Emit content for the current ANSWER state."""
def _emit_answer_state(self, buf: str) -> bool:
"""Emit content for the current ANSWER state. Returns True when complete."""
if self._fc_object_answer:
self._emit_json_object_field_content(buf, StreamingState.ANSWER)
else:
self._emit_json_field_content(buf, StreamingState.ANSWER)
return self._emit_json_object_field_content(buf, StreamingState.ANSWER)
return self._emit_json_field_content(buf, StreamingState.ANSWER)

def _so_initialize_action_field(self, buf: str) -> None:
"""Initialize the ANSWER/TOOL_INPUT streaming state for structured output.

``action_input`` is a JSON object in the current schema (tool args, or
``{"answer": ...}`` for finish), though older models may still emit a plain
string.

Answer path: the final answer lives in the nested ``answer`` string (or, for
older models, ``action_input`` emitted as a plain string). We deliberately do
NOT fall back to the brace-delimited ``action_input`` object here. Object mode
could win a chunk-boundary race — locking in before the nested ``answer`` key
has streamed into the buffer — and would then emit the raw ``{"answer": ...}``
wrapper instead of just the answer text, diverging from the non-streaming
output which extracts ``action_input["answer"]``.

Tool-input path: ``action_input`` is genuinely an object, so the object form
is the correct fallback.
"""
if self._current_state is not None:
return
if self._answer_started:
if self._initialize_json_field_state(buf, JSONStreamingField.ANSWER.value, StreamingState.ANSWER):
return
self._initialize_json_field_state(buf, JSONStreamingField.ACTION_INPUT.value, StreamingState.ANSWER)
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
elif self._tool_input_started:
if self._initialize_json_field_state(buf, JSONStreamingField.ACTION_INPUT.value, StreamingState.TOOL_INPUT):
return
self._initialize_json_object_field_state(
buf, JSONStreamingField.ACTION_INPUT.value, StreamingState.TOOL_INPUT
)
Comment thread
maksymbuleshnyi marked this conversation as resolved.
Outdated

def _process_json_mode(self, final_answer_only: bool) -> None:
"""
Expand Down
136 changes: 131 additions & 5 deletions dynamiq/nodes/agents/agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import types
from concurrent.futures import as_completed
from typing import Any, Callable, Literal, Mapping
from typing import Any, Callable, Literal, Mapping, Union, get_args, get_origin

from litellm import get_supported_openai_params, supports_function_calling, supports_response_schema
from pydantic import BaseModel, Field, PrivateAttr, field_validator, model_validator
Expand Down Expand Up @@ -39,7 +40,7 @@
StreamingMode,
)
from dynamiq.utils import generate_uuid, serialize_files_in_value
from dynamiq.utils.json_parser import parse_llm_json_output
from dynamiq.utils.json_parser import parse_llm_json_output, repair_truncated_json
from dynamiq.utils.logger import logger


Expand Down Expand Up @@ -79,8 +80,11 @@ def parse_arguments(cls, v: Any) -> Any:
if isinstance(v, str):
try:
return json.loads(v, strict=False)
except json.JSONDecodeError as e:
raise ValueError(f"Tool call arguments are not valid JSON: {e}")
except json.JSONDecodeError:
try:
return json.loads(repair_truncated_json(v), strict=False)
except json.JSONDecodeError as e:
raise ValueError(f"Tool call arguments are not valid JSON: {e}")
return v or {}

def parse_as_tool_call(self) -> ToolCallArguments:
Expand Down Expand Up @@ -320,6 +324,106 @@ def _emit_tool_input_error(
)
self._streaming_tool_run_id = None

@staticmethod
def _annotation_accepts_none(annotation: Any) -> bool:
"""Return True if a Pydantic field annotation includes ``NoneType``."""
if annotation is type(None):
return True
origin = get_origin(annotation)
if origin in (Union, types.UnionType):
return type(None) in get_args(annotation)
return False

@staticmethod
def _annotation_is_dict_like(annotation: Any) -> bool:
"""Return True if the annotation is ``dict`` / ``dict[...]`` or a union including one."""
if annotation is dict:
return True
origin = get_origin(annotation)
if origin is dict:
return True
if origin in (Union, types.UnionType):
return any(Agent._annotation_is_dict_like(arg) for arg in get_args(annotation))
return False

@staticmethod
def _extract_basemodel(annotation: Any) -> type[BaseModel] | None:
"""Return the BaseModel subclass in an annotation (handles ``Model | None``), else None."""
if isinstance(annotation, type) and issubclass(annotation, BaseModel):
return annotation
origin = get_origin(annotation)
if origin in (Union, types.UnionType):
for arg in get_args(annotation):
if isinstance(arg, type) and issubclass(arg, BaseModel):
return arg
return None

def _coerce_json_fields(self, tool: Node, action_input: dict) -> dict:
"""Parse stringified free-form dict fields back into dicts.

Strict mode can't express a free-form ``dict[str, Any]`` as an object, so
the schema transforms ship those fields as JSON-encoded strings (see the
provider converters). Here we reverse that: if the tool declares a
dict-typed field and the model supplied a JSON string for it, parse it
back so the tool's Pydantic schema validates the real dict.

Recurses into nested ``BaseModel`` fields so a free-form dict declared on a
sub-model is coerced too (mirrors ``_strip_nulls_for_fields``). Without this
a stringified dict nested inside a sub-model would never be parsed back and
the nested model's validation would reject the ``str``.
"""
self._coerce_json_for_fields(tool.input_schema.model_fields, action_input)
return action_input

def _coerce_json_for_fields(self, fields: Mapping[str, Any], data: Any) -> None:
if not isinstance(data, dict):
return
for name, field in fields.items():
value = data.get(name)
if isinstance(value, str) and self._annotation_is_dict_like(field.annotation):
stripped = value.strip()
if stripped.startswith("{") and stripped.endswith("}"):
try:
data[name] = json.loads(stripped)
except json.JSONDecodeError:
pass # leave as string; Pydantic will surface the error
elif isinstance(value, dict):
nested_model = self._extract_basemodel(field.annotation)
if nested_model is not None:
self._coerce_json_for_fields(nested_model.model_fields, value)

def _strip_protocol_nulls(self, tool: Node, action_input: dict) -> dict:
"""Drop ``None`` values for fields whose Pydantic annotation rejects None.

OpenAI strict mode requires every property in ``required`` and uses
``"null"`` in the type union as the signal for "leave it at the default."
Fields with a non-nullable default (``encoding: str = "utf-8"``) can't
accept that ``None`` directly — so we drop the key, letting the tool's
Pydantic default apply. Fields that genuinely accept ``None``
(``encoding: str | None = None``) keep it.

Recurses into nested ``BaseModel`` fields so the same applies at depth
(e.g. ``config.port`` where ``DBConfig.port: int = 8080``).
"""
self._strip_nulls_for_fields(tool.input_schema.model_fields, action_input)
return action_input

def _strip_nulls_for_fields(self, fields: Mapping[str, Any], data: Any) -> None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let clean/rework 'matreshka' code

if not isinstance(data, dict):
return
for name in list(data):
field = fields.get(name)
if field is None:
continue
value = data[name]
if value is None:
if not self._annotation_accepts_none(field.annotation):
del data[name]
elif isinstance(value, dict):
nested_model = self._extract_basemodel(field.annotation)
if nested_model is not None:
self._strip_nulls_for_fields(nested_model.model_fields, value)

def _should_delegate_final(
self,
tool: Node | None,
Expand Down Expand Up @@ -791,6 +895,7 @@ def _handle_function_calling_mode(
if len(actual_tool_calls) > 1 and self.parallel_tool_calls_enabled:
tool_items = []
for tc in actual_tool_calls:
tc_name = tc.function.name.strip()
args = tc.function.parse_as_tool_call()
tc_input = args.action_input
if isinstance(tc_input, str):
Expand All @@ -800,9 +905,13 @@ def _handle_function_calling_mode(
raise ActionParsingException(f"Error parsing action_input string. {e}", recoverable=True)
if not isinstance(tc_input, dict):
tc_input = {"input": tc_input}
tc_tool = self.tool_by_names.get(self.sanitize_tool_name(tc_name))
if tc_tool is not None:
self._coerce_json_fields(tc_tool, tc_input)
self._strip_protocol_nulls(tc_tool, tc_input)
tool_items.append(
ToolCallItem(
name=tc.function.name.strip(),
name=tc_name,
input=tc_input,
thought=args.thought,
)
Expand All @@ -826,6 +935,11 @@ def _handle_function_calling_mode(
if not isinstance(action_input, dict):
action_input = {"input": action_input}

tool = self.tool_by_names.get(self.sanitize_tool_name(action))
if tool is not None:
self._coerce_json_fields(tool, action_input)
self._strip_protocol_nulls(tool, action_input)

self.log_reasoning(thought, action, action_input, loop_num)
return thought, action, action_input

Expand Down Expand Up @@ -1504,12 +1618,24 @@ def _run_react_llm_step(self, config: RunnableConfig | None, loop_num: int, **kw

try:
native_parallel = self.parallel_tool_calls_enabled and self.inference_mode == InferenceMode.FUNCTION_CALLING
# In FUNCTION_CALLING mode with tools present, force a tool call so
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why force tool call?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because this is purpose of function calling inference mode for agent. Otherwise it may not call a tool, and there is no point of such iteration then.

# the model cannot bail out with a text-only response. Honour any
# explicit caller override (kwargs / self.llm.tool_choice).
forced_tool_choice = None
if (
self.inference_mode == InferenceMode.FUNCTION_CALLING
and self._tools
and "tool_choice" not in kwargs
and getattr(self.llm, "tool_choice", None) is None
):
forced_tool_choice = "required"
llm_result = self._run_llm(
messages=messages,
tools=self._tools,
response_format=self._response_format,
config=llm_config,
parallel_tool_calls=True if native_parallel else None,
**({"tool_choice": forced_tool_choice} if forced_tool_choice else {}),
Comment thread
maksymbuleshnyi marked this conversation as resolved.
**kwargs,
)
finally:
Expand Down
Loading
Loading