From a589a3032e81c70dfbc6d21c8599f25c26f824a8 Mon Sep 17 00:00:00 2001 From: Hwuiwon Kim Date: Wed, 19 Nov 2025 23:14:25 -0500 Subject: [PATCH 1/2] feat: introduce Inworld WebSocket TTS service supporting multiple audio contexts, along with an example. --- .../07ab-interruptible-inworld-websocket.py | 130 +++++++ src/pipecat/services/inworld/tts.py | 339 +++++++++++++++++- 2 files changed, 467 insertions(+), 2 deletions(-) create mode 100644 examples/foundational/07ab-interruptible-inworld-websocket.py diff --git a/examples/foundational/07ab-interruptible-inworld-websocket.py b/examples/foundational/07ab-interruptible-inworld-websocket.py new file mode 100644 index 0000000000..1e0d9c049a --- /dev/null +++ b/examples/foundational/07ab-interruptible-inworld-websocket.py @@ -0,0 +1,130 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams +from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.frames.frames import LLMRunFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.inworld.tts import InworldWebsocketTTSService +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams + +load_dotenv(override=True) + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + # Inworld WebSocket TTS Service with multi-context streaming support + tts = InworldWebsocketTTSService( + api_key=os.getenv("INWORLD_API_KEY", ""), + voice_id="Ashley", + model="inworld-tts-1", + # Params are optional: you can pass speaking_rate, temperature, etc. + ) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + + messages = [ + { + "role": "system", + "content": "You are very knowledgable about dogs. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.", + }, + ] + + context = LLMContext(messages) + context_aggregator = LLMContextAggregatorPair(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, # STT + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation. + messages.append({"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/src/pipecat/services/inworld/tts.py b/src/pipecat/services/inworld/tts.py index ab218b3c09..34d6e2ee49 100644 --- a/src/pipecat/services/inworld/tts.py +++ b/src/pipecat/services/inworld/tts.py @@ -55,14 +55,25 @@ ) """ +import asyncio import base64 import json -from typing import AsyncGenerator, Optional +import uuid +from typing import Any, AsyncGenerator, Dict, Optional import aiohttp from loguru import logger from pydantic import BaseModel +try: + import websockets + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use Inworld WebSocket TTS, you need to `pip install websockets`.") + raise Exception(f"Missing module: {e}") + from pipecat.frames.frames import ( CancelFrame, EndFrame, @@ -73,7 +84,7 @@ TTSStartedFrame, TTSStoppedFrame, ) -from pipecat.services.tts_service import TTSService +from pipecat.services.tts_service import TTSService, WebsocketTTSService from pipecat.utils.tracing.service_decorators import traced_tts @@ -595,3 +606,327 @@ async def _process_audio_chunk(self, audio_chunk: bytes) -> AsyncGenerator[Frame sample_rate=self.sample_rate, # Configured sample rate (48kHz) num_channels=1, # Mono audio ) + + +class InworldWebsocketTTSService(WebsocketTTSService): + """Inworld AI WebSocket-based Text-to-Speech Service with context support. + + This service uses Inworld's WebSocket TTS API, which supports multiple + independent audio contexts per connection. Each context can be flushed or + closed independently, allowing concurrent synthesis streams over a single + websocket. + """ + + class InputParams(BaseModel): + """Optional WebSocket input parameters for Inworld TTS configuration. + + Parameters: + temperature: Voice temperature control for synthesis variability (range: [0, 2]). + speaking_rate: Speaking speed control (range: [0.5, 1.5]). + apply_text_normalization: Toggle Inworld's text normalization ("ON", "OFF", + or "APPLY_TEXT_NORMALIZATION_UNSPECIFIED"). + max_buffer_delay_ms: Maximum buffering delay (ms) before auto-flush. + buffer_char_threshold: Character threshold that triggers auto-flush. + """ + + temperature: Optional[float] = None + speaking_rate: Optional[float] = None + apply_text_normalization: Optional[str] = None + max_buffer_delay_ms: Optional[int] = None + buffer_char_threshold: Optional[int] = None + + def __init__( + self, + *, + api_key: str, + voice_id: str = "Ashley", + model: str = "inworld-tts-1", + url: str = "wss://api.inworld.ai/tts/v1/voice:stream", + sample_rate: Optional[int] = None, + encoding: str = "LINEAR16", + params: Optional[InputParams] = None, + **kwargs: Any, + ): + """Initialize the Inworld WebSocket TTS service.""" + super().__init__( + push_stop_frames=True, + pause_frame_processing=True, + sample_rate=sample_rate, + **kwargs, + ) + + params = params or InworldWebsocketTTSService.InputParams() + + self._api_key = api_key + self._url = url + self._settings: Dict[str, Any] = { + "voiceId": voice_id, + "modelId": model, + "audioConfig": { + "audioEncoding": encoding, + "sampleRateHertz": 0, + }, + } + + if params.temperature is not None: + self._settings["temperature"] = params.temperature + if params.speaking_rate is not None: + self._settings["audioConfig"]["speakingRate"] = params.speaking_rate + if params.apply_text_normalization is not None: + self._settings["applyTextNormalization"] = params.apply_text_normalization + + self._buffer_settings = { + "maxBufferDelayMs": params.max_buffer_delay_ms, + "bufferCharThreshold": params.buffer_char_threshold, + } + + self._receive_task = None + self._audio_context_task = None + self._contexts_queue: asyncio.Queue = asyncio.Queue() + self._contexts: Dict[str, asyncio.Queue] = {} + self._active_contexts = set() + + self.set_voice(voice_id) + self.set_model_name(model) + + def can_generate_metrics(self) -> bool: + """Check if this service can generate processing metrics.""" + return True + + async def start(self, frame: StartFrame): + """Start the Inworld WebSocket TTS service.""" + await super().start(frame) + self._settings["audioConfig"]["sampleRateHertz"] = self.sample_rate + self._create_audio_context_task() + await self._connect() + + async def stop(self, frame: EndFrame): + """Stop the Inworld WebSocket TTS service.""" + await super().stop(frame) + await self._stop_audio_context_task() + await self._disconnect() + + async def cancel(self, frame: CancelFrame): + """Cancel the Inworld WebSocket TTS service.""" + await super().cancel(frame) + await self._stop_audio_context_task() + await self._disconnect() + + async def flush_audio(self): + """Flush any pending audio for all active contexts.""" + for context_id in list(self._active_contexts): + await self._send_flush(context_id) + + def _create_audio_context_task(self): + if not self._audio_context_task: + self._contexts_queue = asyncio.Queue() + self._contexts = {} + self._audio_context_task = self.create_task(self._audio_context_task_handler()) + + async def _stop_audio_context_task(self): + if self._audio_context_task: + await self.cancel_task(self._audio_context_task) + self._audio_context_task = None + + async def _audio_context_task_handler(self): + """Process audio contexts in order of creation.""" + running = True + while running: + context_id = await self._contexts_queue.get() + + if context_id: + await self._handle_audio_context(context_id) + self._active_contexts.discard(context_id) + del self._contexts[context_id] + + silence = b"\x00" * self.sample_rate + frame = TTSAudioRawFrame( + audio=silence, sample_rate=self.sample_rate, num_channels=1 + ) + await self.push_frame(frame) + else: + running = False + + self._contexts_queue.task_done() + + async def _handle_audio_context(self, context_id: str): + AUDIO_CONTEXT_TIMEOUT = 3.0 + queue = self._contexts[context_id] + running = True + while running: + try: + frame = await asyncio.wait_for(queue.get(), timeout=AUDIO_CONTEXT_TIMEOUT) + if frame: + await self.push_frame(frame) + running = frame is not None + except asyncio.TimeoutError: + logger.trace(f"{self} time out on audio context {context_id}") + break + + async def create_audio_context(self, context_id: str): + """Register a new audio context.""" + await self._contexts_queue.put(context_id) + self._contexts[context_id] = asyncio.Queue() + self._active_contexts.add(context_id) + logger.trace(f"{self} created audio context {context_id}") + + async def append_to_audio_context(self, context_id: str, frame: TTSAudioRawFrame): + """Append audio to a context queue.""" + if self.audio_context_available(context_id): + await self._contexts[context_id].put(frame) + else: + logger.warning(f"{self} unable to append audio to context {context_id}") + + async def remove_audio_context(self, context_id: str): + """Mark the context as complete.""" + if self.audio_context_available(context_id): + await self._contexts[context_id].put(None) + else: + logger.warning(f"{self} unable to remove context {context_id}") + + def audio_context_available(self, context_id: str) -> bool: + """Check whether an audio context exists.""" + return context_id in self._contexts + + def _get_websocket(self): + if self._websocket: + return self._websocket + raise Exception("Websocket not connected") + + async def _connect(self): + await self._connect_websocket() + + if self._websocket and not self._receive_task: + self._receive_task = self.create_task(self._receive_task_handler(self._report_error)) + + async def _disconnect(self): + if self._receive_task: + await self.cancel_task(self._receive_task) + self._receive_task = None + + await self._disconnect_websocket() + + async def _connect_websocket(self): + try: + if self._websocket and self._websocket.state is State.OPEN: + return + + logger.debug("Connecting to Inworld WebSocket TTS") + headers = [("Authorization", f"Basic {self._api_key}")] + self._websocket = await websocket_connect(self._url, extra_headers=headers) + await self._call_event_handler("on_connected") + except Exception as e: + logger.error(f"{self} connection exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} connection error: {e}")) + self._websocket = None + + async def _disconnect_websocket(self): + if self._websocket: + try: + await self._websocket.close() + except Exception as e: + logger.warning(f"{self} disconnect warning: {e}") + self._websocket = None + await self._call_event_handler("on_disconnected") + + async def _receive_messages(self): + """Handle incoming WebSocket messages from Inworld.""" + async for message in self._get_websocket(): + try: + msg = json.loads(message) + except json.JSONDecodeError: + logger.warning(f"{self} received non-JSON message") + continue + + result = msg.get("result", msg) + ctx_id = ( + msg.get("contextId") + or msg.get("context_id") + or result.get("contextId") + or result.get("context_id") + ) + + if "error" in msg: + await self.push_error(ErrorFrame(error=str(msg["error"]))) + continue + + audio_b64 = ( + result.get("audioContent") + or result.get("audioChunk") + or result.get("audio") + or result.get("audio_chunk") + ) + + if audio_b64: + await self.stop_ttfb_metrics() + audio = base64.b64decode(audio_b64) + if len(audio) > 44 and audio.startswith(b"RIFF"): + audio = audio[44:] + frame = TTSAudioRawFrame(audio, self.sample_rate, 1) + + if ctx_id: + if not self.audio_context_available(ctx_id): + await self.create_audio_context(ctx_id) + await self.append_to_audio_context(ctx_id, frame) + else: + await self.push_frame(frame) + + if result.get("isFinal") or result.get("final"): + if ctx_id and self.audio_context_available(ctx_id): + await self.remove_audio_context(ctx_id) + await self.push_frame(TTSStoppedFrame()) + + async def _send_context(self, context_id: str): + """Send context creation/configuration message.""" + config = { + "voiceId": self._settings["voiceId"], + "modelId": self._settings["modelId"], + "audioConfig": self._settings["audioConfig"], + } + + if "temperature" in self._settings: + config["temperature"] = self._settings["temperature"] + if "applyTextNormalization" in self._settings: + config["applyTextNormalization"] = self._settings["applyTextNormalization"] + if self._buffer_settings["maxBufferDelayMs"] is not None: + config["maxBufferDelayMs"] = self._buffer_settings["maxBufferDelayMs"] + if self._buffer_settings["bufferCharThreshold"] is not None: + config["bufferCharThreshold"] = self._buffer_settings["bufferCharThreshold"] + + msg = {"action": "create_context", "contextId": context_id, "config": config} + await self.send_with_retry(json.dumps(msg), self._report_error) + + async def _send_text(self, context_id: str, text: str): + msg = {"action": "synthesize", "contextId": context_id, "text": text} + await self.send_with_retry(json.dumps(msg), self._report_error) + + async def _send_flush(self, context_id: str): + msg = {"action": "flush", "contextId": context_id} + await self.send_with_retry(json.dumps(msg), self._report_error) + + @traced_tts + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + """Generate speech from text using Inworld's WebSocket API.""" + logger.debug(f"{self}: Generating WebSocket TTS [{text}]") + + try: + if not self._websocket or self._websocket.state is State.CLOSED: + await self._connect() + + context_id = str(uuid.uuid4()) + if not self.audio_context_available(context_id): + await self.create_audio_context(context_id) + + await self.start_ttfb_metrics() + yield TTSStartedFrame() + + await self._send_context(context_id) + await self._send_text(context_id, text) + await self._send_flush(context_id) + await self.start_tts_usage_metrics(text) + + yield None + except Exception as e: + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") + yield TTSStoppedFrame() From fe92cd4b38a81dd88ce0fd08af35df64ff159791 Mon Sep 17 00:00:00 2001 From: Hwuiwon Kim Date: Thu, 20 Nov 2025 10:35:11 -0500 Subject: [PATCH 2/2] fix --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5749e60da5..8e9bbb21ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added optional speaking rate control to `InworldTTSService`. +- Added Websocket support for `InworldTTSService`. + ### Changed - ⚠️ Breaking change: `LLMContext.create_image_message()`,