Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 251 additions & 0 deletions dynamiq/callbacks/inner_thoughts_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
"""Inline ``thought`` extraction for streaming FC arguments.

Splits the LLM's streaming JSON object into two output streams: the tool's
real params (with ``thought`` removed) and just the thought value content.
"""

INNER_THOUGHTS_DEFAULT_KEY = "thought"


class JSONInnerThoughtsExtractor:
"""Streaming JSON parser that routes the ``thought`` field into a separate buffer."""

def __init__(
self,
inner_thoughts_key: str = INNER_THOUGHTS_DEFAULT_KEY,
wait_for_first_key: bool = False,
) -> None:
self.inner_thoughts_key = inner_thoughts_key
self.wait_for_first_key = wait_for_first_key

# Cumulative buffers across all process_fragment calls.
self.main_buffer: str = ""
self.inner_thoughts_buffer: str = ""
self.main_json_held_buffer: str = ""

# Parser state.
self.state: str = "start"
self.in_string: bool = False
self.escaped: bool = False
self.current_key: str = ""
self.is_inner_thoughts_value: bool = False
self.inner_thoughts_processed: bool = False
self.hold_main_json: bool = wait_for_first_key

# Top-level transitions only fire at depth == 1; deeper structures pass through.
self.depth: int = 0

@property
def thought_complete(self) -> bool:
"""Whether the thought field's value has been fully processed."""
return self.inner_thoughts_processed

@property
def held_main_buffer(self) -> str:
"""Held bytes not yet flushed; drained at end-of-stream when thought was missing."""
return self.main_json_held_buffer

def process_fragment(self, fragment: str) -> tuple[str, str]:
"""Feed a chunk; returns ``(main_delta, thought_delta)`` for this fragment."""
updates_main: list[str] = []
updates_thought: list[str] = []
for c in fragment:
main_chunk, thought_chunk = self._process_char(c)
if main_chunk:
updates_main.append(main_chunk)
if thought_chunk:
updates_thought.append(thought_chunk)
return "".join(updates_main), "".join(updates_thought)

def _emit_main(self, s: str) -> str:
"""Append to main buffer (held or live); return the delta to surface."""
if self.hold_main_json:
self.main_json_held_buffer += s
return ""
self.main_buffer += s
return s

def _flush_held_buffer(self) -> str:
"""Move held bytes to the live buffer and surface them as a delta."""
if not self.main_json_held_buffer:
self.hold_main_json = False
return ""
delta = self.main_json_held_buffer
self.main_buffer += delta
self.main_json_held_buffer = ""
self.hold_main_json = False
return delta

def _emit_thought(self, s: str) -> str:
self.inner_thoughts_buffer += s
return s

def _process_char(self, c: str) -> tuple[str, str]:
"""Process a single character and return its ``(main, thought)`` delta."""

if self.escaped:
self.escaped = False
return self._consume_value_char(c)

if c == "\\":
self.escaped = True
if self.in_string:
return self._consume_value_char(c)
return "", ""

if c == '"':
return self._handle_quote()

if self.in_string:
return self._consume_value_char(c)

# Structural characters (outside any string).
if c == "{":
return self._handle_open_object()

if c == "[":
return self._handle_open_bracket()

if c == "}":
return self._handle_close_object()

if c == "]":
return self._handle_close_bracket()

if self.depth >= 2:
# Inside nested object/array — passthrough to value's target buffer.
return self._consume_value_char(c)

if c == ":" and self.state == "colon":
return self._handle_colon()

if c == "," and self.state == "comma_or_end":
return self._handle_comma()

if self.state == "value":
# Non-string scalar in top-level value (number, bool, null).
return self._consume_value_char(c)

# Whitespace or non-structural chars outside any value — ignore.
return "", ""

def _consume_value_char(self, c: str) -> tuple[str, str]:
"""Route char to thought or main based on whose value we're in."""
if self.in_string and self.state == "key":
self.current_key += c
return "", ""
if self.is_inner_thoughts_value and self.depth == 1:
return "", self._emit_thought(c)
return self._emit_main(c), ""

def _handle_quote(self) -> tuple[str, str]:
"""Handle an unescaped ``"``."""
self.in_string = not self.in_string

if self.in_string:
# Opening quote.
if self.depth >= 2:
# Inside nested value — passthrough.
if self.is_inner_thoughts_value and self.depth == 1:
return "", self._emit_thought('"')
return self._emit_main('"'), ""
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unreachable condition in _handle_quote nested thought routing

Medium Severity

The condition if self.is_inner_thoughts_value and self.depth == 1 on line 149 is inside a block guarded by if self.depth >= 2 on line 147. Since depth >= 2 and depth == 1 are mutually exclusive, the inner branch can never execute — it's dead code. This means quotes inside nested thought values (if thought were ever an object or array) would be incorrectly routed to _emit_main instead of _emit_thought. Other methods like _handle_open_object, _handle_close_object, and _consume_value_char correctly handle nested thought routing, but _handle_quote does not — both the opening quote path (line 149) and the closing quote path (line 170-171) unconditionally route to main at depth >= 2.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit dfb36c2. Configure here.


if self.state in ("start", "comma_or_end"):
# Start of a new top-level key — flush held bytes if thought is done.
main_delta = ""
if self.wait_for_first_key and self.hold_main_json and self.inner_thoughts_processed:
main_delta = self._flush_held_buffer()
self.state = "key"
self.current_key = ""
return main_delta, ""

if self.state == "value":
if self.is_inner_thoughts_value:
return "", ""
return self._emit_main('"'), ""

return "", ""

# Closing quote.
if self.depth >= 2:
return self._emit_main('"'), ""

if self.state == "key":
self.state = "colon"
return "", ""

if self.state == "value":
if self.is_inner_thoughts_value:
self.inner_thoughts_processed = True
self.state = "comma_or_end"
return "", ""
self.state = "comma_or_end"
return self._emit_main('"'), ""

return "", ""

def _handle_open_object(self) -> tuple[str, str]:
self.depth += 1
if self.depth == 1:
# Outermost ``{``.
return self._emit_main("{"), ""
# Nested object literal as a value — passthrough.
if self.is_inner_thoughts_value and self.depth == 2:
return "", self._emit_thought("{")
return self._emit_main("{"), ""

def _handle_open_bracket(self) -> tuple[str, str]:
self.depth += 1
if self.is_inner_thoughts_value and self.depth == 2:
return "", self._emit_thought("[")
return self._emit_main("["), ""

def _handle_close_object(self) -> tuple[str, str]:
self.depth -= 1
if self.depth >= 1:
# Closing a nested object — passthrough.
if self.is_inner_thoughts_value and self.depth == 1:
return "", self._emit_thought("}")
return self._emit_main("}"), ""

# Outermost `}` — strip dangling comma (thought-last case) before closing.
if self.hold_main_json:
stripped = self.main_json_held_buffer.rstrip()
if stripped.endswith(","):
self.main_json_held_buffer = stripped[:-1]
self.main_json_held_buffer += "}"
self.state = "end"
return "", ""

if self.main_buffer.rstrip().endswith(","):
stripped = self.main_buffer.rstrip()
self.main_buffer = stripped[:-1]
self.main_buffer += "}"
self.state = "end"
return "}", ""
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Streaming deltas contain irrevocable dangling comma when thought is last

Medium Severity

When wait_for_first_key=False (the default) and thought is the last field, _handle_close_object retroactively strips a trailing comma from main_buffer but returns "}" as the delta. The comma was already returned as a delta by _handle_comma in a previous call and cannot be revoked. Concatenating all deltas from process_fragment produces invalid JSON like {"a":"x",}, even though main_buffer is correctly patched to {"a":"x"}. The cumulative buffer and the sum of streaming deltas are inconsistent.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit dfb36c2. Configure here.


def _handle_close_bracket(self) -> tuple[str, str]:
self.depth -= 1
if self.is_inner_thoughts_value and self.depth == 1:
return "", self._emit_thought("]")
return self._emit_main("]"), ""

def _handle_colon(self) -> tuple[str, str]:
"""Top-level `:` — colon → value transition."""
self.state = "value"
self.is_inner_thoughts_value = self.current_key == self.inner_thoughts_key
if self.is_inner_thoughts_value:
# Skip the `"thought":` prefix from main.
return "", ""
return self._emit_main(f'"{self.current_key}":'), ""

def _handle_comma(self) -> tuple[str, str]:
"""Top-level `,` — separates fields."""
if self.is_inner_thoughts_value:
# Drop comma after thought to avoid a dangling separator.
self.is_inner_thoughts_value = False
self.state = "start"
return "", ""

self.state = "start"
return self._emit_main(","), ""
66 changes: 51 additions & 15 deletions dynamiq/callbacks/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from dynamiq.callbacks import BaseCallbackHandler
from dynamiq.callbacks.base import get_run_id
from dynamiq.callbacks.inner_thoughts_extractor import JSONInnerThoughtsExtractor
from dynamiq.types.streaming import (
AgentToolData,
AgentToolInputDeltaData,
Expand Down Expand Up @@ -369,6 +370,9 @@ def __init__(self, agent: "Agent", config, loop_num: int, **kwargs):
self._current_action_name: str | None = None
self._fc_object_tool_input: bool = False
self._fc_object_answer: bool = False
# FC inline thought extractor + per-chunk delta.
self._fc_extractor: JSONInnerThoughtsExtractor | None = None
self._latest_fc_args_delta: str = ""
self._brace_depth: int = 0
self._brace_scan_index: int = 0
self._so_action_emitted: bool = False
Expand Down Expand Up @@ -417,6 +421,7 @@ def on_node_execute_stream(self, serialized: dict[str, Any], chunk: dict[str, An
new_id = generate_uuid()
self.agent._streaming_tool_run_id = new_id
self.agent._streaming_tool_run_ids.append(new_id)
self._latest_fc_args_delta = text_delta or ""
else:
text_delta = self._extract_text_delta(chunk)

Expand Down Expand Up @@ -452,7 +457,26 @@ def on_node_execute_end(self, serialized: dict[str, Any], output_data: dict[str,

def _flush_buffer(self) -> None:
"""Flush the remaining buffer content by streaming it as one chunk."""
if not self._buffer or len(self._buffer) <= self._state_last_emit_index:
if not self._buffer:
self._flush_chunk_buffer()
return

# FC fallback: drain extractor's held buffer when thought was missing.
if (
self.mode_name == InferenceMode.FUNCTION_CALLING.value
and self._fc_extractor is not None
and self._tool_input_started
and not self._answer_started
and not self._state_has_emitted.get(StreamingState.TOOL_INPUT, False)
):
held = self._fc_extractor.held_main_buffer
if held:
self._emit(held, step=StreamingState.TOOL_INPUT)
self._state_last_emit_index = len(self._buffer)
self._flush_chunk_buffer()
return

if len(self._buffer) <= self._state_last_emit_index:
self._flush_chunk_buffer()
return

Expand All @@ -475,6 +499,8 @@ def _reset_tool_call_state(self) -> None:
self._current_action_name = None
self._fc_object_tool_input = False
self._fc_object_answer = False
self._fc_extractor = None
self._latest_fc_args_delta = ""
self._brace_depth = 0
self._brace_scan_index = 0
self._state_has_emitted = {
Expand Down Expand Up @@ -905,8 +931,27 @@ def _process_structured_output_mode(self, final_answer_only: bool) -> None:
self._so_action_emitted = True

def _process_function_calling_mode(self, final_answer_only: bool) -> None:
"""Process function calling mode."""
self._process_json_mode(final_answer_only)
"""FC mode: route `thought` to REASONING, tool args to TOOL_INPUT."""
if self._answer_started:
self._process_json_mode(final_answer_only)
return

if self._fc_extractor is None:
self._fc_extractor = JSONInnerThoughtsExtractor(
inner_thoughts_key=JSONStreamingField.THOUGHT.value,
wait_for_first_key=True,
)

delta = self._latest_fc_args_delta
self._latest_fc_args_delta = ""
if not delta:
return

main_delta, thought_delta = self._fc_extractor.process_fragment(delta)
if thought_delta and not final_answer_only:
self._emit(thought_delta, step=StreamingState.REASONING)
if main_delta:
self._emit(main_delta, step=StreamingState.TOOL_INPUT)

def _find_unescaped_quote_end(self, input_string: str, start_quote_index: int) -> int:
"""
Expand Down Expand Up @@ -1060,10 +1105,10 @@ def _initialize_json_object_field_state(self, buf: str, field_name: str, state:
return False

def _try_initialize_next_json_field(self, buf: str, final_answer_only: bool) -> None:
"""Try to initialize the next JSON field state (thought, answer, or action_input).
"""Try to initialize the next JSON field state (thought or answer).

Each initializer is a no-op when _current_state is already set, so this is safe
to call multiple times within a single chunk processing cycle.
Used by the ANSWER path (provide_final_answer). FC tool calls use the
inline extractor in ``_process_function_calling_mode`` instead.
"""
if not self._state_has_emitted.get(StreamingState.REASONING, False):
self._initialize_json_field_state(
Expand All @@ -1077,15 +1122,6 @@ def _try_initialize_next_json_field(self, buf: str, final_answer_only: bool) ->
buf, JSONStreamingField.ANSWER.value, StreamingState.ANSWER
)

if self._tool_input_started and not self._answer_started:
if not self._initialize_json_field_state(
buf, JSONStreamingField.ACTION_INPUT.value, StreamingState.TOOL_INPUT
):
if self._current_state is None:
self._initialize_json_object_field_state(
buf, JSONStreamingField.ACTION_INPUT.value, StreamingState.TOOL_INPUT
)

def _emit_tool_input_state(self, buf: str) -> None:
"""Emit content for the current TOOL_INPUT state."""
if self._fc_object_tool_input:
Expand Down
Loading
Loading