From 16513cab4bf46a2d25637ae3b6d1da18f7bff83b Mon Sep 17 00:00:00 2001 From: zhuermu Date: Sat, 11 Jan 2025 14:46:50 +0000 Subject: [PATCH 1/5] =?UTF-8?q?feat:=20Update=20the=20plugin=20bedrockPyth?= =?UTF-8?q?on=20to=20support=20video=20input,=20enabling=20Bedrock?= =?UTF-8?q?=E2=80=99s=20multimodal=20model=20to=20simulate=20real-time=20v?= =?UTF-8?q?ideo=20conversation=20capabilities.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agents/examples/demo/property.json | 245 ++++++++++ .../extension/bedrock_llm_python/README.md | 95 +++- .../extension/bedrock_llm_python/__init__.py | 7 +- .../extension/bedrock_llm_python/addon.py | 18 + .../bedrock_llm_python/bedrock_llm.py | 84 ---- .../bedrock_llm_extension.py | 374 --------------- .../extension/bedrock_llm_python/extension.py | 431 ++++++++++++++++++ .../bedrock_llm_python/manifest.json | 104 ++++- .../bedrock_llm_python/property.json | 19 +- .../bedrock_llm_python/requirements.txt | 6 +- .../extension/bedrock_llm_python/utils.py | 119 +++++ .../extension/polly_tts/extension.py | 12 +- .../extension/polly_tts/polly_tts.py | 18 +- .../transcribe_asr_addon.py | 1 - .../transcribe_asr_extension.py | 22 +- .../transcribe_wrapper.py | 26 +- demo/src/app/api/agents/start/graph.ts | 20 +- demo/src/common/constant.ts | 6 +- 18 files changed, 1107 insertions(+), 500 deletions(-) create mode 100644 agents/ten_packages/extension/bedrock_llm_python/addon.py delete mode 100644 agents/ten_packages/extension/bedrock_llm_python/bedrock_llm.py delete mode 100644 agents/ten_packages/extension/bedrock_llm_python/bedrock_llm_extension.py create mode 100644 agents/ten_packages/extension/bedrock_llm_python/extension.py create mode 100644 agents/ten_packages/extension/bedrock_llm_python/utils.py diff --git a/agents/examples/demo/property.json b/agents/examples/demo/property.json index 6d837b4d..7ac274c2 100644 --- a/agents/examples/demo/property.json +++ b/agents/examples/demo/property.json @@ -1491,6 +1491,251 @@ ] } ] + }, + { + "name": "va_nova_multimodal_aws", + "auto_start": true, + "nodes": [ + { + "type": "extension", + "name": "agora_rtc", + "addon": "agora_rtc", + "extension_group": "default", + "property": { + "app_id": "${env:AGORA_APP_ID}", + "token": "", + "channel": "ten_agent_test", + "stream_id": 1234, + "remote_stream_id": 123, + "subscribe_audio": true, + "publish_audio": true, + "publish_data": true, + "enable_agora_asr": false, + "agora_asr_vendor_name": "microsoft", + "agora_asr_language": "en-US", + "agora_asr_vendor_key": "${env:AZURE_STT_KEY|}", + "agora_asr_vendor_region": "${env:AZURE_STT_REGION|}", + "agora_asr_session_control_file_path": "session_control.conf", + "subscribe_video_pix_fmt": 4, + "subscribe_video": true + } + }, + { + "type": "extension", + "name": "stt", + "addon": "transcribe_asr_python", + "extension_group": "stt", + "property": { + "access_key": "${env:AWS_ACCESS_KEY_ID}", + "lang_code": "en-US", + "region": "us-east-1", + "sample_rate": "16000", + "secret_key": "${env:AWS_SECRET_ACCESS_KEY}" + } + }, + { + "type": "extension", + "name": "llm", + "addon": "bedrock_llm_python", + "extension_group": "chatgpt", + "property": { + "access_key_id": "${env:AWS_ACCESS_KEY_ID}", + "greeting": "TEN Agent connected. I am nova, How can I help you today?", + "max_memory_length": "10", + "max_tokens": 256, + "model": "us.amazon.nova-lite-v1:0", + "prompt": "Now you are an intelligent assistant with real-time interaction capabilities. 
I will provide you with a series of real-time video image information. Please understand these images as video frames. Based on the images and the user's input, engage in a conversation with the user, remembering the dialogue content in a concise and clear manner.", + "region": "us-east-1", + "secret_access_key": "${env:AWS_SECRET_ACCESS_KEY}", + "temperature": 0.7, + "topK": 10, + "topP": 0.5, + "is_memory_enabled": false, + "is_enable_video": true + } + }, + { + "type": "extension", + "name": "tts", + "addon": "polly_tts", + "extension_group": "tts", + "property": { + "region": "us-east-1", + "access_key": "${env:AWS_ACCESS_KEY_ID}", + "secret_key": "${env:AWS_SECRET_ACCESS_KEY}", + "engine": "generative", + "voice": "Ruth", + "sample_rate": 16000, + "lang_code": "en-US" + } + }, + { + "type": "extension", + "name": "interrupt_detector", + "addon": "interrupt_detector_python", + "extension_group": "default", + "property": {} + }, + { + "type": "extension", + "name": "message_collector", + "addon": "message_collector", + "extension_group": "transcriber", + "property": {} + } + ], + "connections": [ + { + "extension": "agora_rtc", + "cmd": [ + { + "name": "on_user_joined", + "dest": [ + { + "extension": "llm" + } + ] + }, + { + "name": "on_user_left", + "dest": [ + { + "extension": "llm" + } + ] + }, + { + "name": "on_connection_failure", + "dest": [ + { + "extension": "llm" + } + ] + } + ], + "audio_frame": [ + { + "name": "pcm_frame", + "dest": [ + { + "extension": "stt" + } + ] + } + ], + "video_frame": [ + { + "name": "video_frame", + "dest": [ + { + "extension": "llm" + } + ] + } + ] + }, + { + "extension": "stt", + "data": [ + { + "name": "text_data", + "dest": [ + { + "extension": "interrupt_detector" + }, + { + "extension": "message_collector" + } + ] + } + ] + }, + { + "extension": "llm", + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension": "tts" + } + ] + } + ], + "data": [ + { + "name": "text_data", + "dest": [ + { + "extension": "tts" + }, + { + "extension": "message_collector" + } + ] + } + ] + }, + { + "extension": "message_collector", + "data": [ + { + "name": "data", + "dest": [ + { + "extension": "agora_rtc" + } + ] + } + ] + }, + { + "extension": "tts", + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension": "agora_rtc" + } + ] + } + ], + "audio_frame": [ + { + "name": "pcm_frame", + "dest": [ + { + "extension": "agora_rtc" + } + ] + } + ] + }, + { + "extension": "interrupt_detector", + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension": "llm" + } + ] + } + ], + "data": [ + { + "name": "text_data", + "dest": [ + { + "extension": "llm" + } + ] + } + ] + } + ] } ], "log_level": 3 diff --git a/agents/ten_packages/extension/bedrock_llm_python/README.md b/agents/ten_packages/extension/bedrock_llm_python/README.md index 7d9bd713..1565bd56 100644 --- a/agents/ten_packages/extension/bedrock_llm_python/README.md +++ b/agents/ten_packages/extension/bedrock_llm_python/README.md @@ -9,4 +9,97 @@ You can config this extension by providing following environments: | AWS_REGION | No | us-east-1 | The Region of Amazon Bedrock service you want to use. | | AWS_ACCESS_KEY_ID | No | - | Access Key of your IAM User, make sure you've set proper permissions to [invoke Bedrock models](https://docs.aws.amazon.com/bedrock/latest/userguide/security_iam_id-based-policy-examples.html) and gain [models access](https://docs.aws.amazon.com/bedrock/latest/userguide/model-access.html) in Bedrock. Will use default credentials provider if not provided. 
Check [document](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html). |
| AWS_SECRET_ACCESS_KEY | No | - | Secret Key of your IAM User, make sure you've set proper permissions to [invoke Bedrock models](https://docs.aws.amazon.com/bedrock/latest/userguide/security_iam_id-based-policy-examples.html) and gain [models access](https://docs.aws.amazon.com/bedrock/latest/userguide/model-access.html) in Bedrock. Will use default credentials provider if not provided. Check [document](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html). |
-| AWS_BEDROCK_MODEL | No | Claude 3.5(anthropic.claude-3-5-sonnet-20240620-v1:0) | Bedrock model id, check [docuement](https://docs.aws.amazon.com/bedrock/latest/userguide/model-ids.html#model-ids-arns). |
\ No newline at end of file
+| AWS_BEDROCK_MODEL | No | [Nova](https://docs.aws.amazon.com/nova/latest/userguide/what-is-nova.html) | Bedrock model id, check [document](https://docs.aws.amazon.com/bedrock/latest/userguide/model-ids.html#model-ids-arns). |
+
+## Features
+
+- Real-time video and audio interaction similar to Gemini 2.0
+- Audio recognition using TEN framework's STT plugin
+- Text-to-speech conversion using TEN framework's TTS plugin
+- Integration with AWS Bedrock's Nova model
+- Smart input truncation logic
+- Multi-language support
+
+## Requirements
+- Python 3.9+
+- AWS account with Bedrock access
+- TEN framework with STT and TTS plugins
+- Dependencies listed in requirements.txt
+
+## Installation
+
+1. Install dependencies:
+```bash
+pip install -r requirements.txt
+```
+
+2. Configure AWS credentials:
+- Set up an IAM user with Bedrock access
+- Update `access_key_id` and `secret_access_key` in the configuration
+
+## Configuration
+
+The extension can be configured through manifest.json properties:
+- `base_uri`: Bedrock API endpoint
+- `region`: AWS region for Bedrock
+- `access_key_id`: AWS access key ID
+- `secret_access_key`: AWS secret access key
+- `model_id`: Bedrock Nova model ID
+- `language`: Language code for STT/TTS
+- See manifest.json for full configuration options
+
+## Input Truncation Logic
+
+The extension implements smart input truncation:
+
+1. Duration-based truncation:
+   - Automatically truncates input exceeding 30 seconds
+
+2. Silence-based truncation:
+   - Triggers when silence exceeds 2 seconds
+
+3. Manual truncation:
+   - Supports user-initiated truncation
+
+## Architecture
+
+1. Audio Processing:
+   - Uses TEN framework's STT plugin for audio recognition
+   - Buffers and processes audio in real-time
+   - Provides intermediate and final transcripts
+
+2. Nova Model Integration:
+   - Combines transcribed text with video input
+   - Sends to Bedrock's Nova model for processing
+   - Handles responses and error conditions
+
+3. Speech Synthesis:
+   - Converts Nova model responses to speech
+   - Uses TEN framework's TTS plugin
+   - Synchronizes with video output
+
+## API Usage
+
+### Commands
+
+1. Flush Command:
+```python
+cmd = Cmd.create("flush")
+await ten_env.send_cmd(cmd)
+```
+
+2. User Events:
+```python
+# User joined
+cmd = Cmd.create("on_user_joined")
+await ten_env.send_cmd(cmd)
+
+# User left
+cmd = Cmd.create("on_user_left")
+await ten_env.send_cmd(cmd)
+```
+
+## Contributing
+1. Fork the repository
+2. Create a feature branch
+3. 
Submit a pull request \ No newline at end of file diff --git a/agents/ten_packages/extension/bedrock_llm_python/__init__.py b/agents/ten_packages/extension/bedrock_llm_python/__init__.py index 31694384..72593ab2 100644 --- a/agents/ten_packages/extension/bedrock_llm_python/__init__.py +++ b/agents/ten_packages/extension/bedrock_llm_python/__init__.py @@ -1 +1,6 @@ -from . import bedrock_llm_extension +# +# This file is part of TEN Framework, an open source project. +# Licensed under the Apache License, Version 2.0. +# See the LICENSE file for more information. +# +from . import addon diff --git a/agents/ten_packages/extension/bedrock_llm_python/addon.py b/agents/ten_packages/extension/bedrock_llm_python/addon.py new file mode 100644 index 00000000..8b2cb8da --- /dev/null +++ b/agents/ten_packages/extension/bedrock_llm_python/addon.py @@ -0,0 +1,18 @@ +# +# This file is part of TEN Framework, an open source project. +# Licensed under the Apache License, Version 2.0. +# See the LICENSE file for more information. +# +from ten import ( + Addon, + register_addon_as_extension, + TenEnv, +) +from .extension import BedrockLLMExtension + + +@register_addon_as_extension("bedrock_llm_python") +class LLMExtensionExtensionAddon(Addon): + def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None: + ten_env.log_info("on_create_instance") + ten_env.on_create_instance_done(BedrockLLMExtension(name), context) diff --git a/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm.py b/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm.py deleted file mode 100644 index 92232b8d..00000000 --- a/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm.py +++ /dev/null @@ -1,84 +0,0 @@ -import boto3 -from ten import TenEnv - - -class BedrockLLMConfig: - def __init__( - self, - region: str, - access_key: str, - secret_key: str, - model: str, - prompt: str, - top_p: float, - temperature: float, - max_tokens: int, - ): - self.region = region - self.access_key = access_key - self.secret_key = secret_key - self.model = model - self.prompt = prompt - self.top_p = top_p - self.temperature = temperature - self.max_tokens = max_tokens - - @classmethod - def default_config(cls): - return cls( - region="us-east-1", - access_key="", - secret_key="", - model="anthropic.claude-3-5-sonnet-20240620-v1:0", # Defaults to Claude 3.5, supported model list: https://docs.aws.amazon.com/bedrock/latest/userguide/conversation-inference.html - # system prompt - prompt="You are a voice assistant who talks in a conversational way and can chat with me like my friends. I will speak to you in English or Chinese, and you will answer in the corrected and improved version of my text with the language I use. Don’t talk like a robot, instead I would like you to talk like a real human with emotions. I will use your answer for text-to-speech, so don’t return me any meaningless characters. I want you to be helpful, when I’m asking you for advice, give me precise, practical and useful advice instead of being vague. 
When giving me a list of options, express the options in a narrative way instead of bullet points.", - top_p=1.0, - temperature=0.1, - max_tokens=512, - ) - - -class BedrockLLM: - client = None - - def __init__(self, config: BedrockLLMConfig, ten_env: TenEnv): - self.config = config - self.ten_env = ten_env - - if config.access_key and config.secret_key: - self.ten_env.log_info(f"BedrockLLM initialized with access key: {config.access_key}") - - self.client = boto3.client( - service_name="bedrock-runtime", - region_name=config.region, - aws_access_key_id=config.access_key, - aws_secret_access_key=config.secret_key, - ) - else: - self.ten_env.log_info( - "BedrockLLM initialized without access key, using default credentials provider chain.") - self.client = boto3.client( - service_name="bedrock-runtime", region_name=config.region - ) - - def get_converse_stream(self, messages): - bedrock_req_params = { - "modelId": self.config.model, - "messages": messages, - "inferenceConfig": { - "temperature": self.config.temperature, - "maxTokens": self.config.max_tokens, - "topP": self.config.top_p, - # "stopSequences": [], - }, - # "additionalModelRequestFields": additional_model_fields, - } - - if self.config.prompt: - bedrock_req_params["system"] = [{"text": self.config.prompt}] - - try: - response = self.client.converse_stream(**bedrock_req_params) - return response - except Exception as e: - raise RuntimeError(f"GetConverseStream failed, err: {e}") from e diff --git a/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm_extension.py b/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm_extension.py deleted file mode 100644 index 80aced02..00000000 --- a/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm_extension.py +++ /dev/null @@ -1,374 +0,0 @@ -from .bedrock_llm import BedrockLLM, BedrockLLMConfig -from datetime import datetime -from threading import Thread -from ten import ( - Addon, - Extension, - register_addon_as_extension, - TenEnv, - Cmd, - Data, - StatusCode, - CmdResult, -) - - -CMD_IN_FLUSH = "flush" -CMD_OUT_FLUSH = "flush" -DATA_IN_TEXT_DATA_PROPERTY_TEXT = "text" -DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL = "is_final" -DATA_OUT_TEXT_DATA_PROPERTY_TEXT = "text" -DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT = "end_of_segment" - -PROPERTY_REGION = "region" # Optional -PROPERTY_ACCESS_KEY = "access_key" # Optional -PROPERTY_SECRET_KEY = "secret_key" # Optional -PROPERTY_MODEL = "model" # Optional -PROPERTY_PROMPT = "prompt" # Optional -PROPERTY_TEMPERATURE = "temperature" # Optional -PROPERTY_TOP_P = "top_p" # Optional -PROPERTY_MAX_TOKENS = "max_tokens" # Optional -PROPERTY_GREETING = "greeting" # Optional -PROPERTY_MAX_MEMORY_LENGTH = "max_memory_length" # Optional - - -def get_current_time(): - # Get the current time - start_time = datetime.now() - # Get the number of microseconds since the Unix epoch - unix_microseconds = int(start_time.timestamp() * 1_000_000) - return unix_microseconds - - -def is_punctuation(char): - if char in [",", ",", ".", "。", "?", "?", "!", "!"]: - return True - return False - - -def parse_sentence(sentence, content): - remain = "" - found_punc = False - - for char in content: - if not found_punc: - sentence += char - else: - remain += char - - if not found_punc and is_punctuation(char): - found_punc = True - - return sentence, remain, found_punc - - -class BedrockLLMExtension(Extension): - memory = [] - max_memory_length = 10 - outdate_ts = 0 - bedrock_llm = None - - def on_start(self, ten_env: TenEnv) -> None: - 
ten_env.log_info("BedrockLLMExtension on_start") - # Prepare configuration - bedrock_llm_config = BedrockLLMConfig.default_config() - - for optional_str_param in [ - PROPERTY_REGION, - PROPERTY_ACCESS_KEY, - PROPERTY_SECRET_KEY, - PROPERTY_MODEL, - PROPERTY_PROMPT, - ]: - try: - value = ten_env.get_property_string(optional_str_param).strip() - if value: - bedrock_llm_config.__setattr__(optional_str_param, value) - except Exception as err: - ten_env.log_debug( - f"GetProperty optional {optional_str_param} failed, err: {err}. Using default value: {bedrock_llm_config.__getattribute__(optional_str_param)}" - ) - - for optional_float_param in [PROPERTY_TEMPERATURE, PROPERTY_TOP_P]: - try: - value = ten_env.get_property_float(optional_float_param) - if value: - bedrock_llm_config.__setattr__(optional_float_param, value) - except Exception as err: - ten_env.log_debug( - f"GetProperty optional {optional_float_param} failed, err: {err}. Using default value: {bedrock_llm_config.__getattribute__(optional_float_param)}" - ) - - try: - max_tokens = ten_env.get_property_int(PROPERTY_MAX_TOKENS) - if max_tokens > 0: - bedrock_llm_config.max_tokens = int(max_tokens) - except Exception as err: - ten_env.log_debug( - f"GetProperty optional {PROPERTY_MAX_TOKENS} failed, err: {err}. Using default value: {bedrock_llm_config.max_tokens}" - ) - - try: - greeting = ten_env.get_property_string(PROPERTY_GREETING) - except Exception as err: - ten_env.log_debug( - f"GetProperty optional {PROPERTY_GREETING} failed, err: {err}." - ) - - try: - prop_max_memory_length = ten_env.get_property_int( - PROPERTY_MAX_MEMORY_LENGTH - ) - if prop_max_memory_length > 0: - self.max_memory_length = int(prop_max_memory_length) - except Exception as err: - ten_env.log_debug( - f"GetProperty optional {PROPERTY_MAX_MEMORY_LENGTH} failed, err: {err}." 
- ) - - # Create bedrockLLM instance - try: - self.bedrock_llm = BedrockLLM(bedrock_llm_config, ten_env) - ten_env.log_info( - f"newBedrockLLM succeed with max_tokens: {bedrock_llm_config.max_tokens}, model: {bedrock_llm_config.model}" - ) - except Exception as err: - ten_env.log_error(f"newBedrockLLM failed, err: {err}") - - # Send greeting if available - if greeting: - try: - output_data = Data.create("text_data") - output_data.set_property_string( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT, greeting - ) - output_data.set_property_bool( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, True - ) - ten_env.send_data(output_data) - ten_env.log_info(f"greeting [{greeting}] sent") - except Exception as err: - ten_env.log_info(f"greeting [{greeting}] send failed, err: {err}") - ten_env.on_start_done() - - def on_stop(self, ten_env: TenEnv) -> None: - ten_env.log_info("BedrockLLMExtension on_stop") - ten_env.on_stop_done() - - def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: - ten_env.log_info("BedrockLLMExtension on_cmd") - cmd_json = cmd.to_json() - ten_env.log_info(f"BedrockLLMExtension on_cmd json: {cmd_json}") - - cmd_name = cmd.get_name() - - if cmd_name == CMD_IN_FLUSH: - self.outdate_ts = get_current_time() - cmd_out = Cmd.create(CMD_OUT_FLUSH) - ten_env.send_cmd(cmd_out, None) - ten_env.log_info("BedrockLLMExtension on_cmd sent flush") - else: - ten_env.log_info(f"BedrockLLMExtension on_cmd unknown cmd: {cmd_name}") - cmd_result = CmdResult.create(StatusCode.ERROR) - cmd_result.set_property_string("detail", "unknown cmd") - ten_env.return_result(cmd_result, cmd) - return - - cmd_result = CmdResult.create(StatusCode.OK) - cmd_result.set_property_string("detail", "success") - ten_env.return_result(cmd_result, cmd) - - def on_data(self, ten_env: TenEnv, data: Data) -> None: - """ - on_data receives data from ten graph. - current suppotend data: - - name: text_data - example: - {name: text_data, properties: {text: "hello"} - """ - ten_env.log_info("BedrockLLMExtension on_data") - - # Assume 'data' is an object from which we can get properties - try: - is_final = data.get_property_bool(DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL) - if not is_final: - ten_env.log_info("ignore non-final input") - return - except Exception as err: - ten_env.log_info( - f"OnData GetProperty {DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL} failed, err: {err}" - ) - return - - # Get input text - try: - input_text = data.get_property_string(DATA_IN_TEXT_DATA_PROPERTY_TEXT) - if not input_text: - ten_env.log_info("ignore empty text") - return - ten_env.log_info(f"OnData input text: [{input_text}]") - except Exception as err: - ten_env.log_info( - f"OnData GetProperty {DATA_IN_TEXT_DATA_PROPERTY_TEXT} failed, err: {err}" - ) - return - - # Prepare memory. A conversation must alternate between user and assistant roles - while len(self.memory): - if len(self.memory) > self.max_memory_length: - ten_env.log_debug( - f"pop out first message, reason: memory length limit: `{self.memory[0]}`" - ) - self.memory.pop(0) - elif self.memory[0]["role"] == "assistant": - ten_env.log_debug( - f"pop out first message, reason: messages can not start with assistant: `{self.memory[0]}`" - ) - self.memory.pop(0) - else: - break - - if len(self.memory) and self.memory[-1]["role"] == "user": - # if last user input got empty response, append current user input. 
- ten_env.log_debug( - "found last message with role `user`, will append this input into last user input" - ) - self.memory[-1]["content"].append({"text": input_text}) - else: - self.memory.append({"role": "user", "content": [{"text": input_text}]}) - - def converse_stream_worker(start_time, input_text, memory): - try: - ten_env.log_info( - f"GetConverseStream for input text: [{input_text}] memory: {memory}" - ) - - # Get result from Bedrock - resp = self.bedrock_llm.get_converse_stream(memory) - if resp is None or resp.get("stream") is None: - ten_env.log_info( - f"GetConverseStream for input text: [{input_text}] failed" - ) - return - - stream = resp.get("stream") - sentence = "" - full_content = "" - first_sentence_sent = False - - for event in stream: - # allow 100ms buffer time, in case interruptor's flush cmd comes just after on_data event - if (start_time + 100_000) < self.outdate_ts: - ten_env.log_info( - f"GetConverseStream recv interrupt and flushing for input text: [{input_text}], startTs: {start_time}, outdateTs: {self.outdate_ts}, delta > 100ms" - ) - break - - if "contentBlockDelta" in event: - delta_types = event["contentBlockDelta"]["delta"].keys() - # ignore other types of content: e.g toolUse - if "text" in delta_types: - content = event["contentBlockDelta"]["delta"]["text"] - elif ( - "internalServerException" in event - or "modelStreamErrorException" in event - or "throttlingException" in event - or "validationException" in event - ): - ten_env.log_error(f"GetConverseStream Error occured: {event}") - break - else: - # ingore other events - continue - - full_content += content - - while True: - sentence, content, sentence_is_final = parse_sentence( - sentence, content - ) - if not sentence or not sentence_is_final: - ten_env.log_info(f"sentence [{sentence}] is empty or not final") - break - ten_env.log_info( - f"GetConverseStream recv for input text: [{input_text}] got sentence: [{sentence}]" - ) - - # send sentence - try: - output_data = Data.create("text_data") - output_data.set_property_string( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT, sentence - ) - output_data.set_property_bool( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, False - ) - ten_env.send_data(output_data) - ten_env.log_info( - f"GetConverseStream recv for input text: [{input_text}] sent sentence [{sentence}]" - ) - except Exception as err: - ten_env.log_info( - f"GetConverseStream recv for input text: [{input_text}] send sentence [{sentence}] failed, err: {err}" - ) - break - - sentence = "" - if not first_sentence_sent: - first_sentence_sent = True - ten_env.log_info( - f"GetConverseStream recv for input text: [{input_text}] first sentence sent, first_sentence_latency {get_current_time() - start_time}ms" - ) - - if len(full_content.strip()): - # remember response as assistant content in memory - if memory and memory[-1]["role"] == "assistant": - memory[-1]["content"].append({"text": full_content}) - else: - memory.append( - {"role": "assistant", "content": [{"text": full_content}]} - ) - else: - # can not put empty model response into memory - ten_env.log_error( - f"GetConverseStream recv for input text: [{input_text}] failed: empty response [{full_content}]" - ) - return - - # send end of segment - try: - output_data = Data.create("text_data") - output_data.set_property_string( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT, sentence - ) - output_data.set_property_bool( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, True - ) - ten_env.send_data(output_data) - ten_env.log_info( - f"GetConverseStream for input 
text: [{input_text}] end of segment with sentence [{sentence}] sent"
-                    )
-                except Exception as err:
-                    ten_env.log_info(
-                        f"GetConverseStream for input text: [{input_text}] end of segment with sentence [{sentence}] send failed, err: {err}"
-                    )
-
-            except Exception as e:
-                ten_env.log_info(
-                    f"GetConverseStream for input text: [{input_text}] failed, err: {e}"
-                )
-
-        # Start thread to request and read responses from OpenAI
-        start_time = get_current_time()
-        thread = Thread(
-            target=converse_stream_worker, args=(start_time, input_text, self.memory)
-        )
-        thread.start()
-        ten_env.log_info("BedrockLLMExtension on_data end")
-
-
-@register_addon_as_extension("bedrock_llm_python")
-class BedrockLLMExtensionAddon(Addon):
-    def on_create_instance(self, ten: TenEnv, addon_name: str, context) -> None:
-        ten.log_info("on_create_instance")
-        ten.on_create_instance_done(BedrockLLMExtension(addon_name), context)
diff --git a/agents/ten_packages/extension/bedrock_llm_python/extension.py b/agents/ten_packages/extension/bedrock_llm_python/extension.py
new file mode 100644
index 00000000..2cee8cc3
--- /dev/null
+++ b/agents/ten_packages/extension/bedrock_llm_python/extension.py
@@ -0,0 +1,431 @@
+#!/usr/bin/env python3
+#
+# Agora Real Time Engagement
+# Created by Cline in 2024-03.
+# Copyright (c) 2024 Agora IO. All rights reserved.
+#
+import asyncio
+import json
+import os
+import time
+import traceback
+from enum import Enum
+from typing import Optional, List, Dict, Any
+
+import boto3
+from ten import (
+    AsyncTenEnv,
+    Extension,
+    Cmd,
+    StatusCode,
+    CmdResult,
+    Data,
+)
+from ten_ai_base import BaseConfig, ChatMemory, EVENT_MEMORY_EXPIRED, EVENT_MEMORY_APPENDED
+from ten_ai_base.llm import AsyncLLMBaseExtension
+from dataclasses import dataclass
+
+from .utils import (
+    rgb2base64jpeg,
+    filter_images,
+    parse_sentence,
+    get_greeting_text,
+    merge_images
+)
+
+# Constants
+MAX_IMAGE_COUNT = 20
+ONE_BATCH_SEND_COUNT = 6
+VIDEO_FRAME_INTERVAL = 0.5
+
+# Command definitions
+CMD_IN_FLUSH = "flush"
+CMD_IN_ON_USER_JOINED = "on_user_joined"
+CMD_IN_ON_USER_LEFT = "on_user_left"
+CMD_OUT_FLUSH = "flush"
+
+# Data property definitions
+DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL = "is_final"
+DATA_IN_TEXT_DATA_PROPERTY_TEXT = "text"
+DATA_OUT_TEXT_DATA_PROPERTY_TEXT = "text"
+DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT = "end_of_segment"
+
+class Role(str, Enum):
+    """Role definitions for chat participants."""
+    User = "user"
+    Assistant = "assistant"
+
+@dataclass
+class BedrockLLMConfig(BaseConfig):
+    """Configuration for the Bedrock LLM extension."""
+    region: str = "us-east-1"
+    model_id: str = "us.amazon.nova-lite-v1:0"
+    access_key_id: str = ""
+    secret_access_key: str = ""
+    language: str = "en-US"
+    prompt: str = "You are an intelligent assistant with real-time interaction capabilities. You will be presented with a series of images that represent a video sequence. Describe what you see directly, as if you were observing the scene in real-time. Do not mention that you are looking at images or a video. Instead, narrate the scene and actions as they unfold. Engage in conversation with the user based on this visual input and their questions, maintaining a concise and clear manner."
+    temperature: float = 0.7
+    max_tokens: int = 256
+    tokP: float = 0.5
+    topK: int = 10
+    max_duration: int = 30
+    vendor: str = ""
+    stream_id: int = 0
+    dump: bool = False
+    max_history: int = 10
+    is_memory_enabled: bool = False
+    is_enable_video: bool = False
+    greeting: str = "Hello, I'm here to help you. 
How can I assist you today?" + + def build_ctx(self) -> dict: + """Build context dictionary from configuration.""" + return { + "language": self.language, + "model": self.model_id, + } + +class BedrockLLMExtension(AsyncLLMBaseExtension): + """Extension for handling video-to-video processing using AWS Bedrock.""" + + def __init__(self, name: str): + super().__init__(name) + self.config: Optional[BedrockLLMConfig] = None + self.stopped: bool = False + self.memory: list = [] + self.users_count: int = 0 + self.bedrock_client = None + self.image_buffers: list = [] + self.image_queue = asyncio.Queue() + self.text_buffer: str = "" + self.input_start_time: float = 0 + self.processing_times = [] + self.ten_env = None + + async def on_init(self, ten_env: AsyncTenEnv) -> None: + """Initialize the extension.""" + await super().on_init(ten_env) + ten_env.log_info("BedrockV2VExtension initialized") + + async def on_start(self, ten_env: AsyncTenEnv) -> None: + """Start the extension and set up required components.""" + await super().on_start(ten_env) + ten_env.log_info("BedrockV2VExtension starting") + + try: + self.config = await BedrockLLMConfig.create_async(ten_env=ten_env) + ten_env.log_info(f"Configuration: {self.config}") + + if not self.config.access_key_id or not self.config.secret_access_key: + ten_env.log_error("AWS credentials (access_key_id and secret_access_key) are required") + return + + await self._setup_components(ten_env) + + except Exception as e: + traceback.print_exc() + ten_env.log_error(f"Failed to initialize: {e}") + + async def _setup_components(self, ten_env: AsyncTenEnv) -> None: + """Set up extension components.""" + self.memory = [] + self.ctx = self.config.build_ctx() + self.ten_env = ten_env + + self.loop = asyncio.get_event_loop() + self.loop.create_task(self._on_video(ten_env)) + + async def on_stop(self, ten_env: AsyncTenEnv) -> None: + """Stop the extension.""" + await super().on_stop(ten_env) + ten_env.log_info("BedrockV2VExtension stopping") + self.stopped = True + + async def on_data(self, ten_env: AsyncTenEnv, data) -> None: + """Handle incoming data.""" + ten_env.log_info("on_data receive begin...") + data_name = data.get_name() + ten_env.log_info(f"on_data name {data_name}") + + try: + is_final = data.get_property_bool(DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL) + input_text = data.get_property_string(DATA_IN_TEXT_DATA_PROPERTY_TEXT) + + if not is_final: + ten_env.log_info("ignore non-final input") + return + + if not input_text: + ten_env.log_info("ignore empty text") + return + + ten_env.log_info(f"OnData input text: [{input_text}]") + self.text_buffer = input_text + await self._handle_input_truncation("is_final") + + except Exception as err: + ten_env.log_info(f"Error processing data: {err}") + + async def on_video_frame(self, ten_env: AsyncTenEnv, video_frame) -> None: + """Handle incoming video frames.""" + if not self.config.is_enable_video: + return + image_data = video_frame.get_buf() + image_width = video_frame.get_width() + image_height = video_frame.get_height() + await self.image_queue.put([image_data, image_width, image_height]) + + async def _on_video(self, ten_env: AsyncTenEnv): + """Process video frames from the queue.""" + while True: + try: + [image_data, image_width, image_height] = await self.image_queue.get() + + #ten_env.log_info(f"image_width: {image_width}, image_height: {image_height}, image_size: {len(bytes(image_data)) / 1024 / 1024}MB") + + frame_buffer = rgb2base64jpeg(image_data, image_width, image_height) + + 
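+                # Maintain a rolling window of encoded JPEG frames; the loop
+                # below trims the buffer to the newest MAX_IMAGE_COUNT entries.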
self.image_buffers.append(frame_buffer) + + #ten_env.log_info(f"Processed frame, width: {image_width}, height: {image_height}, frame_buffer_size: {len(frame_buffer) / 1024 / 1024}MB") + + while len(self.image_buffers) > MAX_IMAGE_COUNT: + self.image_buffers.pop(0) + + # Skip remaining frames for the interval + while not self.image_queue.empty(): + await self.image_queue.get() + + await asyncio.sleep(VIDEO_FRAME_INTERVAL) + + except Exception as e: + traceback.print_exc() + ten_env.log_error(f"Error processing video frame: {e}") + + async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: + """Handle incoming commands.""" + cmd_name = cmd.get_name() + ten_env.log_info(f"Command received: {cmd_name}") + + try: + if cmd_name == CMD_IN_FLUSH: + await ten_env.send_cmd(Cmd.create(CMD_OUT_FLUSH)) + elif cmd_name == CMD_IN_ON_USER_JOINED: + await self._handle_user_joined() + elif cmd_name == CMD_IN_ON_USER_LEFT: + self.users_count -= 1 + else: + await super().on_cmd(ten_env, cmd) + return + + cmd_result = CmdResult.create(StatusCode.OK) + cmd_result.set_property_string("detail", "success") + await ten_env.return_result(cmd_result, cmd) + + except Exception as e: + traceback.print_exc() + ten_env.log_error(f"Error handling command {cmd_name}: {e}") + cmd_result = CmdResult.create(StatusCode.ERROR) + cmd_result.set_property_string("detail", str(e)) + await ten_env.return_result(cmd_result, cmd) + + async def _handle_user_joined(self) -> None: + """Handle user joined event.""" + self.users_count += 1 + if self.users_count == 1: + await self._greeting() + + async def _handle_input_truncation(self, reason: str): + """Handle input truncation events.""" + try: + self.ten_env.log_info(f"Input truncated due to: {reason}") + + if self.text_buffer: + await self._call_nova_model(self.text_buffer, self.image_buffers) + + self._reset_state() + + except Exception as e: + traceback.print_exc() + self.ten_env.log_error(f"Error handling input truncation: {e}") + + def _reset_state(self): + """Reset internal state.""" + self.text_buffer = "" + self.image_buffers = [] + self.input_start_time = 0 + + async def _initialize_aws_clients(self): + """Initialize AWS clients.""" + try: + if not self.bedrock_client: + self.bedrock_client = boto3.client('bedrock-runtime', + aws_access_key_id=self.config.access_key_id, + aws_secret_access_key=self.config.secret_access_key, + region_name=self.config.region + ) + except Exception as e: + traceback.print_exc() + self.ten_env.log_error(f"Error initializing AWS clients: {e}") + raise + + async def _greeting(self) -> None: + """Send greeting message to the user.""" + if self.users_count == 1: + text = self.config.greeting or get_greeting_text(self.config.language) + self.ten_env.log_info(f"send greeting {text}") + await self._send_text_data(text, True, Role.Assistant) + + async def _send_text_data(self, text: str, end_of_segment: bool, role: Role): + """Send text data to the user.""" + try: + d = Data.create("text_data") + d.set_property_string(DATA_OUT_TEXT_DATA_PROPERTY_TEXT, text) + d.set_property_bool(DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, end_of_segment) + d.set_property_string("role", role) + asyncio.create_task(self.ten_env.send_data(d)) + except Exception as e: + self.ten_env.log_error(f"Error sending text data: {e}") + + async def _call_nova_model(self, input_text: str, image_buffers: List[bytes]) -> None: + """Call Bedrock's Nova model with text and video input.""" + try: + if not self.bedrock_client: + await self._initialize_aws_clients() + + if not 
input_text: + self.ten_env.log_info("Text input is empty") + return + + contents = [] + + # Process images + if image_buffers: + filtered_buffers = filter_images(image_buffers, ONE_BATCH_SEND_COUNT) + for image_data in filtered_buffers: + contents.append({ + "image": { + "format": 'jpeg', + "source": { + "bytes": image_data + } + } + }) + # Prepare memory + while len(self.memory) > self.config.max_history: + self.memory.pop(0) + while len(self.memory) > 0 and self.memory[0]["role"] == "assistant": + self.memory.pop(0) + while len(self.memory) > 0 and self.memory[-1]["role"] == "user": + self.memory.pop(-1) + + # Prepare request + contents.append({"text": input_text}) + messages = [] + for m in self.memory: + # Convert string content to list format if needed + m_content = m["content"] + if isinstance(m_content, str): + m_content = [{"text": m_content}] + messages.append({ + "role": m["role"], + "content": m_content + }) + messages.append({ + "role": "user", + "content": contents + }) + + inf_params = { + "maxTokens": self.config.max_tokens, + "topP": self.config.tokP, + "temperature": self.config.temperature + } + + additional_config = { + "inferenceConfig": { + "topK": self.config.topK + } + } + + system = [{ + "text": self.config.prompt + }] + + # Make API call + start_time = time.time() + response = self.bedrock_client.converse_stream( + modelId=self.config.model_id, + system=system, + messages=messages, + inferenceConfig=inf_params, + additionalModelRequestFields=additional_config, + ) + full_content = await self._process_stream_response(response, start_time) + # async append memory + async def async_append_memory(): + if not self.config.is_memory_enabled: + return + image = merge_images(image_buffers) + contents = [] + if image: + contents.append({ + "image": { + "format": 'jpeg', + "source": { + "bytes": image + } + } + }) + contents.append({"text": input_text}) + self.memory.append({"role": Role.User, "content": contents}) + self.memory.append({"role": Role.Assistant, "content": [{"text": full_content}]}) + + asyncio.create_task(async_append_memory()) + except Exception as e: + traceback.print_exc() + self.ten_env.log_error(f"Error calling Nova model: {e}") + + except Exception as e: + self.ten_env.log_error(f"Error appending memory: {e}") + + async def _process_stream_response(self, response: Dict, start_time: float): + """Process streaming response from Nova model.""" + sentence = "" + full_content = "" + first_sentence_sent = False + + for event in response.get('stream'): + if "contentBlockDelta" in event: + if "text" in event["contentBlockDelta"]["delta"]: + content = event["contentBlockDelta"]["delta"]["text"] + full_content += content + + while True: + sentence, content, sentence_is_final = parse_sentence(sentence, content) + if not sentence or not sentence_is_final: + break + + self.ten_env.log_info(f"Processing sentence: [{sentence}]") + await self._send_text_data(sentence, False, Role.Assistant) + + if not first_sentence_sent: + first_sentence_sent = True + self.ten_env.log_info(f"First sentence latency: {(time.time() - start_time)*1000}ms") + + sentence = "" + + elif any(key in event for key in ["internalServerException", "modelStreamErrorException", + "throttlingException", "validationException"]): + self.ten_env.log_error(f"Stream error: {event}") + break + + elif 'metadata' in event: + if 'metrics' in event['metadata']: + self.ten_env.log_info(f"Nova model latency: {event['metadata']['metrics']['latencyMs']}ms") + + # Send final sentence + await 
self._send_text_data(sentence, True, Role.Assistant) + self.ten_env.log_info(f"Final sentence sent: [{sentence}]") + # Update metrics + self.processing_times.append(time.time() - start_time) + return full_content diff --git a/agents/ten_packages/extension/bedrock_llm_python/manifest.json b/agents/ten_packages/extension/bedrock_llm_python/manifest.json index 41eda41e..55928537 100644 --- a/agents/ten_packages/extension/bedrock_llm_python/manifest.json +++ b/agents/ten_packages/extension/bedrock_llm_python/manifest.json @@ -9,30 +9,67 @@ "version": "0.6" } ], + "package": { + "include": [ + "manifest.json", + "property.json", + "BUILD.gn", + "**.tent", + "**.py", + "README.md" + ] + }, "api": { "property": { - "access_key": { + "base_uri": { + "type": "string" + }, + "api_key": { "type": "string" }, - "secret_key": { + "api_version": { "type": "string" }, "model": { "type": "string" }, - "max_tokens": { - "type": "int64" + "language": { + "type": "string" }, "prompt": { "type": "string" }, + "temperature": { + "type": "float32" + }, + "max_tokens": { + "type": "int32" + }, + "server_vad": { + "type": "bool" + }, + "input_transcript": { + "type": "bool" + }, + "sample_rate": { + "type": "int32" + }, + "stream_id": { + "type": "int32" + }, + "dump": { + "type": "bool" + }, "greeting": { "type": "string" - }, - "max_memory_length": { - "type": "int64" } }, + "video_frame_in": [ + { + "name": "video_frame", + "property": {} + } + ], "data_in": [ { "name": "text_data", @@ -51,16 +88,69 @@ "type": "string" } } + }, + { + "name": "append", + "property": { + "text": { + "type": "string" + } + } } ], "cmd_in": [ { "name": "flush" + }, + { + "name": "tool_register", + "property": { + "name": { + "type": "string" + }, + "description": { + "type": "string" + }, + "parameters": { + "type": "string" + } + }, + "required": [ + "name", + "description", + "parameters" + ], + "result": { + "property": { + "response": { + "type": "string" + } + } + } } ], "cmd_out": [ { "name": "flush" + }, + { + "name": "tool_call", + "property": { + "name": { + "type": "string" + }, + "args": { + "type": "string" + } + }, + "required": [ + "name" + ] + } + ], + "audio_frame_out": [ + { + "name": "pcm_frame" } ] } diff --git a/agents/ten_packages/extension/bedrock_llm_python/property.json b/agents/ten_packages/extension/bedrock_llm_python/property.json index 119decfa..2a6dbf20 100644 --- a/agents/ten_packages/extension/bedrock_llm_python/property.json +++ b/agents/ten_packages/extension/bedrock_llm_python/property.json @@ -1,10 +1,15 @@ { - "region": "us-east-1", - "access_key": "${env:AWS_ACCESS_KEY_ID}", - "secret_key": "${env:AWS_SECRET_ACCESS_KEY}", - "model": "anthropic.claude-3-5-sonnet-20240620-v1:0", + "region": "us-west-2", + "access_key_id": "${env:AWS_ACCESS_KEY_ID}", + "secret_access_key": "${env:AWS_SECRET_ACCESS_KEY}", + "model": "amazon.nova-pro-v1:0", + "temperature": 0.7, "max_tokens": 512, - "prompt": "", - "greeting": "TEN Agent connected. How can I help you today?", - "max_memory_length": 10 + "topP": 0.5, + "topK": 20, + "prompt": "Now you are an intelligent assistant with real-time interaction capabilities. I will provide you with a series of real-time video image information. Please understand these images as video frames. Based on the images and the user's input, engage in a conversation with the user, remembering the dialogue content in a concise and clear manner.", + "greeting": "TEN Agent connected. 
I am nova, How can I help you today?", + "max_memory_length": 10, + "is_memory_enabled": false, + "is_enable_video": false } \ No newline at end of file diff --git a/agents/ten_packages/extension/bedrock_llm_python/requirements.txt b/agents/ten_packages/extension/bedrock_llm_python/requirements.txt index f9f40f55..3fc6cfbe 100644 --- a/agents/ten_packages/extension/bedrock_llm_python/requirements.txt +++ b/agents/ten_packages/extension/bedrock_llm_python/requirements.txt @@ -1,2 +1,4 @@ -pillow==10.4.0 -boto3==1.34.143 \ No newline at end of file +boto3 +numpy +python-dotenv +asyncio diff --git a/agents/ten_packages/extension/bedrock_llm_python/utils.py b/agents/ten_packages/extension/bedrock_llm_python/utils.py new file mode 100644 index 00000000..468a09c2 --- /dev/null +++ b/agents/ten_packages/extension/bedrock_llm_python/utils.py @@ -0,0 +1,119 @@ +"""Utility functions for BedrockV2V extension.""" +import base64 +from io import BytesIO +from PIL import Image +from typing import List, Tuple, Any + +def is_punctuation(char: str) -> bool: + """Check if a character is a punctuation mark.""" + return char in [",", ",", ".", "。", "?", "?", "!", "!"] + +def parse_sentence(sentence: str, content: str) -> Tuple[str, str, bool]: + """Parse a sentence and return the complete sentence, remaining content, and completion status.""" + remain = "" + found_punc = False + + for char in content: + if not found_punc: + sentence += char + else: + remain += char + + if not found_punc and is_punctuation(char): + found_punc = True + + return sentence, remain, found_punc + +def rgb2base64jpeg(rgb_data: bytes, width: int, height: int) -> bytes: + """Convert RGB data to JPEG format.""" + # Convert the RGB image to a PIL Image + pil_image = Image.frombytes("RGBA", (width, height), bytes(rgb_data)) + pil_image = pil_image.convert("RGB") + + # Resize the image while maintaining its aspect ratio + pil_image = resize_image_keep_aspect(pil_image, 640) + + # Save the image to a BytesIO object in JPEG format + buffered = BytesIO() + pil_image.save(buffered, format="JPEG") + + return buffered.getvalue() + +def resize_image_keep_aspect(image: Image.Image, max_size: int = 512) -> Image.Image: + """Resize an image while maintaining its aspect ratio.""" + width, height = image.size + + if width <= max_size and height <= max_size: + return image + + aspect_ratio = width / height + + if width > height: + new_width = max_size + new_height = int(max_size / aspect_ratio) + else: + new_height = max_size + new_width = int(max_size * aspect_ratio) + + return image.resize((new_width, new_height)) + +def filter_images(image_array: List[Any], max_images: int = 10) -> List[Any]: + """Filter images to maintain a maximum count while preserving temporal distribution.""" + if len(image_array) <= max_images: + return image_array + + result = [] + skip = len(image_array) // max_images + + for i in range(0, len(image_array), skip): + result.append(image_array[i]) + if len(result) == max_images: + break + + return result + +# 将多张图片合并成一张图片, 图片的数量不超过max_images张,合并后的大小为width +def merge_images(image_array: List[Any], max_images: int = 4, width: int = 512) -> bytes: + """Merge multiple images into one image.""" + if len(image_array) == 0: + return b"" + if len(image_array) > max_images: + # Filter images to maintain a maximum count while preserving temporal distribution + image_array = filter_images(image_array, max_images) + + total_images = len(image_array) + # Calculate the number of rows and columns for the grid + rows = int((total_images - 1) / 
2) + 1 + cols = 2 if total_images > 1 else 1 + + # Calculate the size of each image in the grid + image_width = width // cols + image_height = image_width + + # Create a new image to store the grid + grid = Image.new("RGB", (width, image_height * rows)) + + # Paste each image into the grid + for i, image in enumerate(image_array): + row = i // cols + col = i % cols + image = Image.open(BytesIO(image)) + image = resize_image_keep_aspect(image, image_width) + grid.paste(image, (col * image_width, row * image_height)) + + # Save the grid to a BytesIO object in JPEG format + buffered = BytesIO() + grid.save(buffered, format="JPEG") + + return buffered.getvalue() + + +def get_greeting_text(language: str) -> str: + """Get appropriate greeting text based on language.""" + greetings = { + "zh-CN": "你好。", + "ja-JP": "こんにちは", + "ko-KR": "안녕하세요", + "en-US": "Hi, there." + } + return greetings.get(language, "Hi, there.") diff --git a/agents/ten_packages/extension/polly_tts/extension.py b/agents/ten_packages/extension/polly_tts/extension.py index ad8a659c..05ae0a37 100644 --- a/agents/ten_packages/extension/polly_tts/extension.py +++ b/agents/ten_packages/extension/polly_tts/extension.py @@ -51,7 +51,7 @@ async def on_request_tts( self, ten_env: AsyncTenEnv, input_text: str, end_of_segment: bool ) -> None: try: - data = self.client.text_to_speech_stream(ten_env, input_text) + data = self.client.text_to_speech_stream(ten_env, input_text, end_of_segment) async for frame in data: await self.send_audio_out( ten_env, frame, sample_rate=self.client.config.sample_rate @@ -60,4 +60,12 @@ async def on_request_tts( ten_env.log_error(f"on_request_tts failed: {traceback.format_exc()}") async def on_cancel_tts(self, ten_env: AsyncTenEnv) -> None: - return await super().on_cancel_tts(ten_env) + """ + Cancel ongoing TTS operation + """ + await super().on_cancel_tts(ten_env) + try: + if self.client: + self.client._on_cancel_tts(ten_env) + except Exception: + ten_env.log_error(f"on_cancel_tts failed: {traceback.format_exc()}") diff --git a/agents/ten_packages/extension/polly_tts/polly_tts.py b/agents/ten_packages/extension/polly_tts/polly_tts.py index 26a5073f..11a6071f 100644 --- a/agents/ten_packages/extension/polly_tts/polly_tts.py +++ b/agents/ten_packages/extension/polly_tts/polly_tts.py @@ -14,7 +14,7 @@ class PollyTTSConfig(BaseConfig): region: str = "us-east-1" access_key: str = "" secret_key: str = "" - engine: str = "generative" + engine: str = "neural" voice: str = ( "Matthew" # https://docs.aws.amazon.com/polly/latest/dg/available-voices.html ) @@ -88,7 +88,7 @@ def _synthesize(self, text, ten_env: AsyncTenEnv): return audio_stream, visemes async def text_to_speech_stream( - self, ten_env: AsyncTenEnv, text: str + self, ten_env: AsyncTenEnv, text: str, end_of_segment: bool ) -> AsyncIterator[bytes]: inputText = text if len(inputText) == 0: @@ -98,5 +98,19 @@ async def text_to_speech_stream( with closing(audio_stream) as stream: for chunk in stream.iter_chunks(chunk_size=self.frame_size): yield chunk + if end_of_segment: + ten_env.log_debug("End of segment reached") except Exception: ten_env.log_error(traceback.format_exc()) + + def _on_cancel_tts(self, ten_env: AsyncTenEnv) -> None: + """ + Cancel ongoing TTS operation + """ + try: + if hasattr(self, 'audio_stream') and self.audio_stream: + self.audio_stream.close() + self.audio_stream = None + ten_env.log_debug("TTS cancelled successfully") + except Exception: + ten_env.log_error(f"Failed to cancel TTS: {traceback.format_exc()}") diff --git 
a/agents/ten_packages/extension/transcribe_asr_python/transcribe_asr_addon.py b/agents/ten_packages/extension/transcribe_asr_python/transcribe_asr_addon.py index 80ee94b4..879277cb 100644 --- a/agents/ten_packages/extension/transcribe_asr_python/transcribe_asr_addon.py +++ b/agents/ten_packages/extension/transcribe_asr_python/transcribe_asr_addon.py @@ -4,7 +4,6 @@ TenEnv, ) - @register_addon_as_extension("transcribe_asr_python") class TranscribeAsrExtensionAddon(Addon): def on_create_instance(self, ten: TenEnv, addon_name: str, context) -> None: diff --git a/agents/ten_packages/extension/transcribe_asr_python/transcribe_asr_extension.py b/agents/ten_packages/extension/transcribe_asr_python/transcribe_asr_extension.py index c2d17cfd..2a5df17c 100644 --- a/agents/ten_packages/extension/transcribe_asr_python/transcribe_asr_extension.py +++ b/agents/ten_packages/extension/transcribe_asr_python/transcribe_asr_extension.py @@ -63,14 +63,24 @@ def on_start(self, ten: TenEnv) -> None: ten.on_start_done() def put_pcm_frame(self, ten: TenEnv, pcm_frame: AudioFrame) -> None: + if self.stopped: + return + try: - asyncio.run_coroutine_threadsafe( - self.queue.put(pcm_frame), self.loop - ).result(timeout=0.1) - except asyncio.QueueFull: - ten.log_error("Queue is full, dropping frame") + # Use a simpler synchronous approach with put_nowait + if not self.loop.is_closed(): + if self.queue.qsize() < self.queue.maxsize: + self.loop.call_soon_threadsafe( + self.queue.put_nowait, pcm_frame + ) + else: + ten.log_error("Queue is full, dropping frame") + else: + ten.log_error("Event loop is closed, cannot process frame") except Exception as e: - ten.log_error(f"Error putting frame in queue: {e}") + import traceback + error_msg = f"Error putting frame in queue: {str(e)}\n{traceback.format_exc()}" + ten.log_error(error_msg) def on_audio_frame(self, ten: TenEnv, frame: AudioFrame) -> None: self.put_pcm_frame(ten, pcm_frame=frame) diff --git a/agents/ten_packages/extension/transcribe_asr_python/transcribe_wrapper.py b/agents/ten_packages/extension/transcribe_asr_python/transcribe_wrapper.py index 1a436d97..25a0d1f8 100644 --- a/agents/ten_packages/extension/transcribe_asr_python/transcribe_wrapper.py +++ b/agents/ten_packages/extension/transcribe_asr_python/transcribe_wrapper.py @@ -15,12 +15,15 @@ DATA_OUT_TEXT_DATA_PROPERTY_TEXT = "text" DATA_OUT_TEXT_DATA_PROPERTY_IS_FINAL = "is_final" +DATA_OUT_TEXT_DATA_PROPERTY_STREAM_ID = "stream_id" +DATA_OUT_TEXT_DATA_PROPERTY_END_OF_SEGMENT = "end_of_segment" - -def create_and_send_data(ten: TenEnv, text_result: str, is_final: bool): +def create_and_send_data(ten: TenEnv, text_result: str, is_final: bool, stream_id: int = 0): stable_data = Data.create("text_data") stable_data.set_property_bool(DATA_OUT_TEXT_DATA_PROPERTY_IS_FINAL, is_final) stable_data.set_property_string(DATA_OUT_TEXT_DATA_PROPERTY_TEXT, text_result) + stable_data.set_property_int(DATA_OUT_TEXT_DATA_PROPERTY_STREAM_ID, stream_id) + stable_data.set_property_bool(DATA_OUT_TEXT_DATA_PROPERTY_END_OF_SEGMENT, is_final) ten.send_data(stable_data) @@ -75,10 +78,10 @@ async def cleanup(self): self.reset_stream() - async def create_stream(self) -> bool: + async def create_stream(self, stream_id) -> bool: try: self.stream = await self.get_transcribe_stream() - self.handler = TranscribeEventHandler(self.stream.output_stream, self.ten) + self.handler = TranscribeEventHandler(self.stream.output_stream, self.ten, stream_id) self.event_handler_task = asyncio.create_task(self.handler.handle_events()) except Exception as 
e: self.ten.log_error(str(e)) @@ -89,7 +92,7 @@ async def create_stream(self) -> bool: async def send_frame(self) -> None: while not self.stopped: try: - pcm_frame = await asyncio.wait_for(self.queue.get(), timeout=10.0) + pcm_frame = await asyncio.wait_for(self.queue.get(), timeout=3.0) if pcm_frame is None: self.ten.log_warn("send_frame: exit due to None value got.") @@ -99,10 +102,10 @@ async def send_frame(self) -> None: if not frame_buf: self.ten.log_warn("send_frame: empty pcm_frame detected.") continue - + stream_id = pcm_frame.get_property_int("stream_id") if not self.stream: self.ten.log_info("lazy init stream.") - if not await self.create_stream(): + if not await self.create_stream(stream_id): continue await self.stream.input_stream.send_audio_event(audio_chunk=frame_buf) @@ -110,11 +113,11 @@ async def send_frame(self) -> None: except asyncio.TimeoutError: if self.stream: await self.cleanup() - self.ten.log_debug( + self.ten.log_info( "send_frame: no data for 10s, will close current stream and create a new one when receving new frame." ) else: - self.ten.log_debug("send_frame: waiting for pcm frame.") + self.ten.log_info("send_frame: waiting for pcm frame.") except IOError as e: self.ten.log_error(f"Error in send_frame: {e}") except Exception as e: @@ -149,9 +152,10 @@ def stop(self) -> None: class TranscribeEventHandler(TranscriptResultStreamHandler): - def __init__(self, transcript_result_stream: TranscriptResultStream, ten: TenEnv): + def __init__(self, transcript_result_stream: TranscriptResultStream, ten: TenEnv, stream_id: int = 0): super().__init__(transcript_result_stream) self.ten = ten + self.stream_id = stream_id async def handle_transcript_event(self, transcript_event: TranscriptEvent) -> None: results = transcript_event.transcript.results @@ -172,4 +176,4 @@ async def handle_transcript_event(self, transcript_event: TranscriptEvent) -> No self.ten.log_info(f"got transcript: [{text_result}], is_final: [{is_final}]") - create_and_send_data(ten=self.ten, text_result=text_result, is_final=is_final) + create_and_send_data(ten=self.ten, text_result=text_result, is_final=is_final, stream_id=self.stream_id) diff --git a/demo/src/app/api/agents/start/graph.ts b/demo/src/app/api/agents/start/graph.ts index c65e719e..fd9e9714 100644 --- a/demo/src/app/api/agents/start/graph.ts +++ b/demo/src/app/api/agents/start/graph.ts @@ -226,7 +226,25 @@ export const getGraphProperties = ( "azure_synthesis_voice_name": voiceNameMap[language]["azure"][voiceType] } } + } else if (graphName == "va_nova_multimodal_aws") { + return { + "agora_rtc": { + "agora_asr_language": language, + }, + "llm": { + "greeting": combined_greeting, + }, + "tts": { + "voice": voiceNameMap[language]["polly"][voiceType], + "lang_code": voiceNameMap[language]["polly"]["langCode"], + "engine": voiceNameMap[language]["polly"]["langEngine"], + }, + "stt": { + "lang_code": language, + } + } } + return {} -} +} \ No newline at end of file diff --git a/demo/src/common/constant.ts b/demo/src/common/constant.ts index a6cc2e88..62e65fe6 100644 --- a/demo/src/common/constant.ts +++ b/demo/src/common/constant.ts @@ -94,6 +94,10 @@ export const GRAPH_OPTIONS: GraphOptionItem[] = [ label: "Voice Story Teller with Image Generator", value: "story_teller_stt_integrated", }, + { + label: "Voice Agent / STT + Nova Multimodal + TTS", + value: "va_nova_multimodal_aws", + }, ] export const isRagGraph = (graphName: string) => { @@ -179,4 +183,4 @@ export enum EMobileActiveTab { export const MOBILE_ACTIVE_TAB_MAP = { 
From 768ab5398b94cf7c85ae6af5381664dd116a11c7 Mon Sep 17 00:00:00 2001
From: zhuermu
Date: Sat, 11 Jan 2025 15:04:55 +0000
Subject: [PATCH 2/5] fix: update the property types of bedrock_llm_python

---
 .../extension/bedrock_llm_python/manifest.json | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/agents/ten_packages/extension/bedrock_llm_python/manifest.json b/agents/ten_packages/extension/bedrock_llm_python/manifest.json
index 55928537..0c693989 100644
--- a/agents/ten_packages/extension/bedrock_llm_python/manifest.json
+++ b/agents/ten_packages/extension/bedrock_llm_python/manifest.json
@@ -62,6 +62,21 @@
       },
       "greeting": {
         "type": "string"
+      },
+      "max_memory_length": {
+        "type": "int64"
+      },
+      "is_memory_enabled": {
+        "type": "bool"
+      },
+      "topP": {
+        "type": "float32"
+      },
+      "topK": {
+        "type": "int32"
+      },
+      "is_enable_video": {
+        "type": "bool"
       }
     },
     "video_frame_in": [

From 4d03ba04cfe1a98b65a8c8a222191a1ba0c54fc7 Mon Sep 17 00:00:00 2001
From: zhuermu
Date: Sun, 12 Jan 2025 01:42:32 +0000
Subject: [PATCH 3/5] docs: update comments in the bedrock_llm_python plugin

---
 agents/ten_packages/extension/bedrock_llm_python/utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/agents/ten_packages/extension/bedrock_llm_python/utils.py b/agents/ten_packages/extension/bedrock_llm_python/utils.py
index 468a09c2..7d30ccf4 100644
--- a/agents/ten_packages/extension/bedrock_llm_python/utils.py
+++ b/agents/ten_packages/extension/bedrock_llm_python/utils.py
@@ -72,7 +72,7 @@ def filter_images(image_array: List[Any], max_images: int = 10) -> List[Any]:
     return result
 
 
-# 将多张图片合并成一张图片, 图片的数量不超过max_images张,合并后的大小为width
+# merge images into one image with a grid layout
 def merge_images(image_array: List[Any], max_images: int = 4, width: int = 512) -> bytes:
     """Merge multiple images into one image."""
     if len(image_array) == 0:
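For readers of the comment above: merge_images tiles the most recent video frames into one composite so a single Bedrock request can carry short-term visual context instead of many separate images. A rough Pillow sketch of a 2x2 grid merge in that spirit (a hypothetical helper; the real merge_images in utils.py also handles frame counts other than four and non-square inputs):

from io import BytesIO
from typing import List

from PIL import Image


def merge_images_grid(frames: List[Image.Image], width: int = 512) -> bytes:
    # Tile up to four frames into a width x width 2x2 grid, return JPEG bytes.
    grid = Image.new("RGB", (width, width))
    cell = width // 2
    for idx, frame in enumerate(frames[:4]):
        tile = frame.convert("RGB").resize((cell, cell))
        grid.paste(tile, ((idx % 2) * cell, (idx // 2) * cell))
    buf = BytesIO()
    grid.save(buf, format="JPEG")
    return buf.getvalue()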
From 7fbb4f7564c8cd7b6678e755dda3935e57babf23 Mon Sep 17 00:00:00 2001
From: zhuermu
Date: Mon, 13 Jan 2025 06:26:01 +0000
Subject: [PATCH 4/5] fix: change string properties to int in property.json and add Chinese support to polly_tts

---
 agents/examples/demo/property.json                         | 5 +++--
 .../ten_packages/extension/bedrock_llm_python/extension.py | 4 ++--
 demo/src/app/api/agents/start/graph.ts                     | 4 ++++
 3 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/agents/examples/demo/property.json b/agents/examples/demo/property.json
index 7ac274c2..aa5117ff 100644
--- a/agents/examples/demo/property.json
+++ b/agents/examples/demo/property.json
@@ -1517,7 +1518,8 @@
             "agora_asr_vendor_region": "${env:AZURE_STT_REGION|}",
             "agora_asr_session_control_file_path": "session_control.conf",
             "subscribe_video_pix_fmt": 4,
-            "subscribe_video": true
+            "subscribe_video": true,
+            "max_memory_length": 10
           }
         },
         {
@@ -1541,7 +1542,7 @@
           "property": {
             "access_key_id": "${env:AWS_ACCESS_KEY_ID}",
             "greeting": "TEN Agent connected. I am nova, How can I help you today?",
-            "max_memory_length": "10",
+            "max_memory_length": 10,
             "max_tokens": 256,
             "model": "us.amazon.nova-lite-v1:0",
             "prompt": "Now you are an intelligent assistant with real-time interaction capabilities. I will provide you with a series of real-time video image information. Please understand these images as video frames. Based on the images and the user's input, engage in a conversation with the user, remembering the dialogue content in a concise and clear manner.",
diff --git a/agents/ten_packages/extension/bedrock_llm_python/extension.py b/agents/ten_packages/extension/bedrock_llm_python/extension.py
index 2cee8cc3..e2e03493 100644
--- a/agents/ten_packages/extension/bedrock_llm_python/extension.py
+++ b/agents/ten_packages/extension/bedrock_llm_python/extension.py
@@ -72,7 +72,7 @@ class BedrockLLMConfig(BaseConfig):
     vendor: str = ""
     stream_id: int = 0
     dump: bool = False
-    max_history: int = 10
+    max_memory_length: int = 10
     is_memory_enabled: bool = False
     is_enable_video: bool = False
     greeting: str = "Hello, I'm here to help you. How can I assist you today?"
@@ -311,7 +311,7 @@ async def _call_nova_model(self, input_text: str, image_buffers: List[bytes]) ->
             }
         })
         # Prepare memory
-        while len(self.memory) > self.config.max_history:
+        while len(self.memory) > self.config.max_memory_length:
             self.memory.pop(0)
         while len(self.memory) > 0 and self.memory[0]["role"] == "assistant":
             self.memory.pop(0)
diff --git a/demo/src/app/api/agents/start/graph.ts b/demo/src/app/api/agents/start/graph.ts
index fd9e9714..7caa28f3 100644
--- a/demo/src/app/api/agents/start/graph.ts
+++ b/demo/src/app/api/agents/start/graph.ts
@@ -13,6 +13,8 @@ export const voiceNameMap: LanguageMap = {
     polly: {
       male: "Zhiyu",
       female: "Zhiyu",
+      langCode: "cmn-CN",
+      langEngine: "neural"
     },
     openai: {
       male: "ash",
@@ -31,6 +33,8 @@ export const voiceNameMap: LanguageMap = {
     polly: {
       male: "Matthew",
       female: "Ruth",
+      langCode: "en-US",
+      langEngine: "generative"
     },
     openai: {
       male: "ash",
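Why the quoted "10" mattered: PATCH 2 declared max_memory_length as int64 in the manifest, and a typed config loader maps JSON numbers, not strings, onto an int field. A toy sketch of that failure mode (a hypothetical loader, not ten_ai_base's actual BaseConfig):

from dataclasses import dataclass


@dataclass
class LLMProps:
    max_memory_length: int = 10


def load_props(raw: dict) -> LLMProps:
    value = raw.get("max_memory_length", 10)
    # A typed loader rejects "10" (str) where the manifest says int64.
    if not isinstance(value, int):
        raise TypeError(f"max_memory_length must be int, got {type(value).__name__}")
    return LLMProps(max_memory_length=value)


load_props({"max_memory_length": 10})      # ok
# load_props({"max_memory_length": "10"})  # raises TypeError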
From c355566908792adeb87be6943b936907c559941a Mon Sep 17 00:00:00 2001
From: zhuermu
Date: Mon, 20 Jan 2025 05:53:19 +0000
Subject: [PATCH 5/5] fix: fix for lint

---
 .../extension/bedrock_llm_python/extension.py | 33 ++++++++++++++-----
 .../extension/bedrock_llm_python/utils.py     |  1 -
 .../extension/polly_tts/extension.py          |  2 +-
 .../extension/polly_tts/polly_tts.py          |  3 +-
 4 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/agents/ten_packages/extension/bedrock_llm_python/extension.py b/agents/ten_packages/extension/bedrock_llm_python/extension.py
index e2e03493..1c75fa62 100644
--- a/agents/ten_packages/extension/bedrock_llm_python/extension.py
+++ b/agents/ten_packages/extension/bedrock_llm_python/extension.py
@@ -5,23 +5,20 @@
 # Copyright (c) 2024 Agora IO. All rights reserved.
 #
 import asyncio
-import json
-import os
 import time
 import traceback
 from enum import Enum
-from typing import Optional, List, Dict, Any
+from typing import Optional, List, Dict
 
 import boto3
 from ten import (
     AsyncTenEnv,
-    Extension,
     Cmd,
     StatusCode,
     CmdResult,
     Data,
 )
-from ten_ai_base import BaseConfig, ChatMemory, EVENT_MEMORY_EXPIRED, EVENT_MEMORY_APPENDED
+from ten_ai_base import BaseConfig
 from ten_ai_base.llm import AsyncLLMBaseExtension
 from dataclasses import dataclass
 
@@ -100,6 +97,7 @@ def __init__(self, name: str):
         self.input_start_time: float = 0
         self.processing_times = []
         self.ten_env = None
+        self.ctx = None
 
     async def on_init(self, ten_env: AsyncTenEnv) -> None:
         """Initialize the extension."""
@@ -165,7 +163,7 @@ async def on_data(self, ten_env: AsyncTenEnv, data) -> None:
         except Exception as err:
             ten_env.log_info(f"Error processing data: {err}")
 
-    async def on_video_frame(self, ten_env: AsyncTenEnv, video_frame) -> None:
+    async def on_video_frame(self, _: AsyncTenEnv, video_frame) -> None:
         """Handle incoming video frames."""
         if not self.config.is_enable_video:
             return
@@ -227,7 +225,14 @@ async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None:
             cmd_result = CmdResult.create(StatusCode.ERROR)
             cmd_result.set_property_string("detail", str(e))
             await ten_env.return_result(cmd_result, cmd)
+    async def _handle_user_left(self) -> None:
+        """Handle user left event."""
+        self.users_count -= 1
+        if self.users_count == 0:
+            self._reset_state()
+        if self.users_count < 0:
+            self.users_count = 0
 
     async def _handle_user_joined(self) -> None:
         """Handle user joined event."""
         self.users_count += 1
@@ -385,9 +390,6 @@ async def async_append_memory():
             traceback.print_exc()
             self.ten_env.log_error(f"Error calling Nova model: {e}")
 
-        except Exception as e:
-            self.ten_env.log_error(f"Error appending memory: {e}")
-
     async def _process_stream_response(self, response: Dict, start_time: float):
         """Process streaming response from Nova model."""
         sentence = ""
@@ -429,3 +431,16 @@ async def _process_stream_response(self, response: Dict, start_time: float):
         # Update metrics
         self.processing_times.append(time.time() - start_time)
         return full_content
+
+    async def on_call_chat_completion(self, async_ten_env, **kargs):
+        raise NotImplementedError
+
+    async def on_data_chat_completion(self, async_ten_env, **kargs):
+        raise NotImplementedError
+
+    async def on_tools_update(
+        self, ten_env: AsyncTenEnv, tool
+    ) -> None:
+        """Called when a new tool is registered. Implement this method to process the new tool."""
+        ten_env.log_info(f"on tools update {tool}")
+        # await self._update_session()
\ No newline at end of file
diff --git a/agents/ten_packages/extension/bedrock_llm_python/utils.py b/agents/ten_packages/extension/bedrock_llm_python/utils.py
index 7d30ccf4..8bf739ae 100644
--- a/agents/ten_packages/extension/bedrock_llm_python/utils.py
+++ b/agents/ten_packages/extension/bedrock_llm_python/utils.py
@@ -1,5 +1,4 @@
 """Utility functions for BedrockV2V extension."""
-import base64
 from io import BytesIO
 from PIL import Image
 from typing import List, Tuple, Any
diff --git a/agents/ten_packages/extension/polly_tts/extension.py b/agents/ten_packages/extension/polly_tts/extension.py
index 05ae0a37..33ecf549 100644
--- a/agents/ten_packages/extension/polly_tts/extension.py
+++ b/agents/ten_packages/extension/polly_tts/extension.py
@@ -66,6 +66,6 @@ async def on_cancel_tts(self, ten_env: AsyncTenEnv) -> None:
         await super().on_cancel_tts(ten_env)
         try:
             if self.client:
-                self.client._on_cancel_tts(ten_env)
+                self.client.on_cancel_tts(ten_env)
         except Exception:
             ten_env.log_error(f"on_cancel_tts failed: {traceback.format_exc()}")
diff --git a/agents/ten_packages/extension/polly_tts/polly_tts.py b/agents/ten_packages/extension/polly_tts/polly_tts.py
index 11a6071f..5b118dba 100644
--- a/agents/ten_packages/extension/polly_tts/polly_tts.py
+++ b/agents/ten_packages/extension/polly_tts/polly_tts.py
@@ -50,6 +50,7 @@ def __init__(self, config: PollyTTSConfig, ten_env: AsyncTenEnv) -> None:
             * self.config.bytes_per_sample
             / 100
         )
+        self.audio_stream = None
 
     def _synthesize(self, text, ten_env: AsyncTenEnv):
         """
@@ -103,7 +104,7 @@ async def text_to_speech_stream(
         except Exception:
             ten_env.log_error(traceback.format_exc())
 
-    def _on_cancel_tts(self, ten_env: AsyncTenEnv) -> None:
+    def on_cancel_tts(self, ten_env: AsyncTenEnv) -> None:
         """
         Cancel ongoing TTS operation
         """
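For context on the streaming path touched above: _call_nova_model sends the merged frames plus the user's text to Bedrock, and _process_stream_response assembles the text deltas sentence by sentence for the TTS node. A condensed boto3 sketch of that flow, assuming the Converse streaming API with the demo graph's parameters (the extension's exact request shape may differ):

import boto3

client = boto3.client("bedrock-runtime", region_name="us-east-1")

# One user turn: a merged video-frame grid (JPEG bytes) plus the ASR text.
jpeg_bytes = b"..."  # placeholder for the merged grid image
messages = [{
    "role": "user",
    "content": [
        {"image": {"format": "jpeg", "source": {"bytes": jpeg_bytes}}},
        {"text": "What do you see in these frames?"},
    ],
}]

response = client.converse_stream(
    modelId="us.amazon.nova-lite-v1:0",
    messages=messages,
    inferenceConfig={"maxTokens": 256, "temperature": 0.7, "topP": 0.5},
    # Nova models take topK through additionalModelRequestFields.
    additionalModelRequestFields={"inferenceConfig": {"topK": 10}},
)

full_content = ""
for event in response["stream"]:
    delta = event.get("contentBlockDelta", {}).get("delta", {})
    if "text" in delta:
        # The extension flushes completed sentences to TTS as they form.
        full_content += delta["text"]
print(full_content)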