Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions examples/foundational/14a-function-calling-anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-7-sonnet-latest",
wait_for_all=True,
params=AnthropicLLMService.InputParams(
max_tokens=16000,
extra={
"thinking": {"type": "enabled", "budget_tokens": 10000},
},
),
)
llm.register_function("get_weather", get_weather)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
Expand Down
27 changes: 27 additions & 0 deletions src/pipecat/frames/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,33 @@ class LLMContextFrame(Frame):
context: "LLMContext"


@dataclass
class LLMThinkingTextFrame(DataFrame):
    """Streamed reasoning ("thinking") text emitted by an LLM service.

    Parameters:
        thinking: A chunk of the model's reasoning text for the current turn.
    """

    thinking: str

    def __post_init__(self):
        super().__post_init__()
        # Services emit thinking deltas with all inter-chunk spacing already
        # present, so downstream aggregators must not insert extra spaces.
        self.includes_inter_frame_spaces = True

    def __str__(self):
        return f"{self.name}(pts: {format_pts(self.pts)}, thinking: {self.thinking})"


@dataclass
class LLMThinkingSignatureFrame(DataFrame):
    """Cryptographic signature for a completed LLM reasoning block.

    Parameters:
        signature: Opaque signature string the service associates with the
            preceding thinking text.
    """

    signature: str

    def __str__(self):
        return f"{self.name}(pts: {format_pts(self.pts)}, signature: {self.signature})"


@dataclass
class LLMMessagesFrame(DataFrame):
"""Frame containing LLM messages for chat completion.
Expand Down
49 changes: 49 additions & 0 deletions src/pipecat/processors/aggregators/llm_response_universal.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
LLMRunFrame,
LLMSetToolChoiceFrame,
LLMSetToolsFrame,
LLMThinkingSignatureFrame,
LLMThinkingTextFrame,
SpeechControlParamsFrame,
StartFrame,
TextFrame,
Expand Down Expand Up @@ -591,6 +593,7 @@ def __init__(
self._started = 0
self._function_calls_in_progress: Dict[str, Optional[FunctionCallInProgressFrame]] = {}
self._context_updated_tasks: Set[asyncio.Task] = set()
self._thinking: List[TextPartForConcatenation] = []

@property
def has_function_calls_in_progress(self) -> bool:
Expand All @@ -601,6 +604,11 @@ def has_function_calls_in_progress(self) -> bool:
"""
return bool(self._function_calls_in_progress)

    async def reset(self):
        """Reset the aggregation state.

        Extends the base reset by also discarding any accumulated thinking
        text so a new response turn starts with an empty reasoning buffer.
        """
        await super().reset()
        self._thinking = []

async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames for assistant response aggregation and function call management.

Expand All @@ -619,6 +627,10 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
await self._handle_llm_end(frame)
elif isinstance(frame, TextFrame):
await self._handle_text(frame)
elif isinstance(frame, LLMThinkingTextFrame):
await self._handle_thinking(frame)
elif isinstance(frame, LLMThinkingSignatureFrame):
await self._handle_thinking_signature(frame)
elif isinstance(frame, LLMRunFrame):
await self._handle_llm_run(frame)
elif isinstance(frame, LLMMessagesAppendFrame):
Expand Down Expand Up @@ -663,6 +675,14 @@ async def push_aggregation(self):
timestamp_frame = LLMContextAssistantTimestampFrame(timestamp=time_now_iso8601())
await self.push_frame(timestamp_frame)

    def thinking_string(self) -> str:
        """Get the current thinking as a string.

        Returns:
            The concatenated thinking string, joined from the accumulated
            parts with spacing rules honored per part.
        """
        return concatenate_aggregated_text(self._thinking)

    async def _handle_llm_run(self, frame: LLMRunFrame):
        """Forward the current context upstream so the LLM service runs inference."""
        await self.push_context_frame(FrameDirection.UPSTREAM)

Expand Down Expand Up @@ -824,6 +844,35 @@ async def _handle_text(self, frame: TextFrame):
)
)

async def _handle_thinking(self, frame: LLMThinkingTextFrame):
if not self._started:
return

# Make sure we really have text (spaces count, too!)
if len(frame.thinking) == 0:
return

self._thinking.append(
TextPartForConcatenation(
frame.thinking, includes_inter_part_spaces=frame.includes_inter_frame_spaces
)
)

async def _handle_thinking_signature(self, frame: LLMThinkingSignatureFrame):
if not self._started:
return

thinking = self.thinking_string()

self._context.add_message(
{
"role": "assistant",
"content": [
{"type": "thinking", "thinking": thinking, "signature": frame.signature},
],
}
)

    def _context_updated_task_finished(self, task: asyncio.Task):
        """Drop a finished context-updated task from the tracking set."""
        self._context_updated_tasks.discard(task)

Expand Down
6 changes: 6 additions & 0 deletions src/pipecat/services/anthropic/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
LLMFullResponseStartFrame,
LLMMessagesFrame,
LLMTextFrame,
LLMThinkingSignatureFrame,
LLMThinkingTextFrame,
LLMUpdateSettingsFrame,
UserImageRawFrame,
)
Expand Down Expand Up @@ -380,6 +382,10 @@ async def _process_context(self, context: OpenAILLMContext | LLMContext):
completion_tokens_estimate += self._estimate_tokens(
event.delta.partial_json
)
elif hasattr(event.delta, "thinking"):
await self.push_frame(LLMThinkingTextFrame(event.delta.thinking))
elif hasattr(event.delta, "signature"):
await self.push_frame(LLMThinkingSignatureFrame(event.delta.signature))
elif event.type == "content_block_start":
if event.content_block.type == "tool_use":
tool_use_block = event.content_block
Expand Down