Add: OpenAI Truncate Message Capabilities; significantly improves overall reasoning capacity and reduces token usage #5

Open · wants to merge 4 commits into base: main
5 changes: 3 additions & 2 deletions ai01/agent/__init__.py
@@ -1,10 +1,11 @@
from ._models import AgentsEvents
from ._api import AgentEventTypes, AgentState
from .agent import Agent, AgentOptions

__all__ = [
"Agent",
"AgentOptions",
"AgentsEvents",
"AgentEventTypes",
"AgentState",
]

# Cleanup docs of unexported modules
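For context, a minimal consumer-side sketch (not part of this diff) showing the package-root imports these re-exports enable:

```python
# Import from the package root rather than the private _api module.
from ai01.agent import Agent, AgentOptions, AgentEventTypes, AgentState

state: AgentState = "idle"  # type-checked against the Literal alias
```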
16 changes: 16 additions & 0 deletions ai01/agent/_api.py
@@ -0,0 +1,16 @@
from typing import Literal

AgentEventTypes = Literal[
"listening",
"speaking",
"idle",
"connected",
"disconnected",
"error",
]

AgentState = Literal[
"speaking",
"listening",
"idle"
]
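A hedged sketch of how a type checker treats these `Literal` aliases (the handler below is hypothetical, not part of the PR):

```python
from ai01.agent._api import AgentEventTypes

def handle_event(event: AgentEventTypes) -> None:
    # mypy/pyright restrict `event` to the six declared literals;
    # any other string is flagged at type-check time.
    print(f"agent event: {event}")

handle_event("speaking")   # OK: "speaking" is a declared literal
# handle_event("paused")   # type error: not in AgentEventTypes
```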
14 changes: 0 additions & 14 deletions ai01/agent/_models.py

This file was deleted.

43 changes: 29 additions & 14 deletions ai01/agent/agent.py
@@ -1,15 +1,17 @@
import logging
from dataclasses import dataclass
from typing import Optional

from pydantic import BaseModel
from ai01.providers.openai.audio_track import AudioTrack
from ai01.rtc import RTC, RTCOptions
from ai01.utils import logger
from ai01.utils.emitter import EnhancedEventEmitter

from ..providers.openai.audio_track import AudioTrack
from ..rtc import RTC, RTCOptions
from ..utils.emitter import EnhancedEventEmitter
from . import _api
from ._exceptions import RoomNotConnectedError, RoomNotCreatedError


class AgentOptions(BaseModel):
@dataclass
class AgentOptions:
""" "
Every Agent is created with a set of options that define the configuration for the Agent.

@@ -31,10 +33,7 @@ class AgentOptions(BaseModel):
class Config:
arbitrary_types_allowed = True



logger = logging.getLogger("Agent")
class Agent(EnhancedEventEmitter):
class Agent(EnhancedEventEmitter[_api.AgentEventTypes]):
"""
An Agent is a higher-level entity with its own identity that exposes APIs to
interact with different Models and the outside world using dRTC.
@@ -46,6 +45,9 @@ class Agent(EnhancedEventEmitter):

def __init__(self, options: AgentOptions):
super(Agent, self).__init__()

# State of the Agent.
self._state: _api.AgentState = 'idle'

# Options is the configuration for the Agent.
self.options = options
@@ -78,6 +80,19 @@ def room(self):
raise RoomNotCreatedError()

return self.__rtc.room

def _update_state(self, state: _api.AgentState):
"""
Update the State of the Agent.
"""
self._state = state

if self._state == 'listening':
self.emit('listening')
elif self._state == 'speaking':
self.emit('speaking')
elif self._state == 'idle':
self.emit('idle')

async def join(self):
"""
@@ -98,8 +113,6 @@ def on_room_join():
print("Room successfully joined!")
```
"""
self.logger.info("Joining Agent to the dRTC Network")

room = await self.__rtc.join()

if not room:
@@ -111,11 +124,13 @@ async def connect(self):
"""
Connects the Agent to the Room. This is only available after the Agent has joined the dRTC Network.
"""
self.logger.info("Connecting Agent to the Room")

room = self.__rtc.room

if not room:
raise RoomNotCreatedError()

await room.connect()

self.logger.info("🔔 Agent Connected to the Huddle01 Room")

self.emit('connected')
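To illustrate how the new typed events fit together, a hedged usage sketch (assuming EnhancedEventEmitter keeps pyee's decorator-style `.on()`; option fields are elided, not part of this diff):

```python
import asyncio

from ai01.agent import Agent, AgentOptions

async def main():
    agent = Agent(options=AgentOptions(...))  # option fields elided; see AgentOptions

    # The Literal-typed emitter narrows event names to AgentEventTypes
    # at type-check time, so typos in event names are caught early.
    @agent.on("connected")
    def on_connected():
        print("Agent connected to the Huddle01 room")

    await agent.join()     # join the dRTC network first
    await agent.connect()  # then connect to the room; emits "connected"

asyncio.run(main())
```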
158 changes: 72 additions & 86 deletions ai01/providers/openai/audio_track.py
@@ -1,142 +1,128 @@
from __future__ import annotations

import asyncio
import base64
import fractions
import logging
import threading
from contextlib import contextmanager
from dataclasses import dataclass

import numpy as np
from aiortc.mediastreams import MediaStreamError, MediaStreamTrack
from av import AudioFrame
from av.audio.fifo import AudioFifo
from pydantic import BaseModel

logger = logging.getLogger(__name__)


class AudioTrackOptions(BaseModel):
"""Audio Track Options"""

sample_rate: int = 24000
"""
Sample rate is the number of audio samples carried per second, measured in Hz. Default is 24000.
"""

channels: int = 1
"""
Channels is the number of audio channels. Default is 1 (mono).
"""

sample_width: int = 2
"""
Sample width is the number of bytes per sample. Default is 2 (16 bits).
"""

from ai01 import rtc
from ai01.utils import logger

# Constants
AUDIO_PTIME = 0.020 # 20ms
DEFAULT_SAMPLE_RATE = 24000
DEFAULT_CHANNELS = 1
DEFAULT_SAMPLE_WIDTH = 2

@dataclass
class AudioTrackOptions:
sample_rate: int = DEFAULT_SAMPLE_RATE
channels: int = DEFAULT_CHANNELS
sample_width: int = DEFAULT_SAMPLE_WIDTH

class AudioFIFOManager:
def __init__(self):
self.fifo = AudioFifo()
self.lock = threading.Lock()

@contextmanager
def fifo_operation(self):
with self.lock:
yield self.fifo

def flush(self):
with self.lock:
self.fifo = AudioFifo()

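The manager funnels every FIFO access through one lock; a standalone sketch of the pattern (`pcm_frame` is a hypothetical av.AudioFrame, not part of this diff):

```python
manager = AudioFIFOManager()

# Writers and readers both use fifo_operation(), so the underlying
# AudioFifo is never touched without holding the lock.
with manager.fifo_operation() as fifo:
    fifo.write(pcm_frame)

with manager.fifo_operation() as fifo:
    chunk = fifo.read(480)  # returns None if fewer than 480 samples are buffered

manager.flush()  # atomically swaps in a fresh, empty FIFO
```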
class AudioTrack(MediaStreamTrack):
kind = "audio"

def __init__(self, options=AudioTrackOptions()):
print("AudioTrack __init__")
super().__init__()

# Audio configuration
self.sample_rate = options.sample_rate
self.channels = options.channels
self.sample_width = options.sample_width # 2 bytes per sample (16 bits)
self.sample_width = options.sample_width # 2 bytes per sample (16-bit PCM)

self._start = None
self._timestamp = 0
self.frame_samples = rtc.get_frame_size(self.sample_rate, AUDIO_PTIME)
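# e.g. 24000 Hz * 0.020 s = 480 samples per 20 ms frame (assuming get_frame_size is rate * ptime)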

self.AUDIO_PTIME = 0.020 # 20ms audio packetization
self.frame_samples = int(self.AUDIO_PTIME * self.sample_rate)
self._pushed_duration = 0.0
self._total_played_time = None

# Audio FIFO buffer
self.audio_fifo = AudioFifo()
self.fifo_lock = threading.Lock()
self.fifo_manager = AudioFIFOManager()

def __repr__(self) -> str:
return f"<AudioTrack kind={self.kind} state={self.readyState}> sample_rate={self.sample_rate} channels={self.channels} sample_width={self.sample_width}>"

def enqueue_audio(self, base64_audio: str):
"""Process and add audio data directly to the AudioFifo"""
if self.readyState != "live":
return

return f"<AudioTrack kind={self.kind} state={self.readyState}> sample_rate={self.sample_rate} channels={self.channels} sample_width={self.sample_width}>"

@property
def audio_samples(self) -> int:
"""
Returns the number of audio samples that have been played so far.
"""
if self._total_played_time is not None:
return int(self._total_played_time * self.sample_rate)
queued_duration = self.fifo_manager.fifo.samples / self.sample_rate

return int((self._pushed_duration - queued_duration) * self.sample_rate)
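# Example: at 24 kHz, with 1.2 s pushed and 0.2 s still queued in the FIFO,
# this reports int((1.2 - 0.2) * 24000) = 24000 samples played.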

def enqueue_audio(self, content_index: int, audio: AudioFrame):
try:
audio_bytes = base64.b64decode(base64_audio)
audio_array = np.frombuffer(audio_bytes, dtype=np.int16)
audio_array = audio_array.reshape(self.channels, -1)

frame = AudioFrame.from_ndarray(
audio_array,
format="s16",
layout="mono" if self.channels == 1 else "stereo",
)
frame.sample_rate = self.sample_rate
frame.time_base = fractions.Fraction(1, self.sample_rate)

with self.fifo_lock:
self.audio_fifo.write(frame)

if self.readyState != "live":
raise MediaStreamError("AudioTrack is not live")

with self.fifo_manager.fifo_operation() as fifo:
fifo.write(audio)
self._pushed_duration += audio.samples / self.sample_rate
except Exception as e:
logger.error(f"Error in enqueue_audio: {e}", exc_info=True)

def flush_audio(self):
"""Flush the audio FIFO buffer"""
with self.fifo_lock:
self.audio_fifo = AudioFifo()
self.fifo_manager.flush()

async def recv(self) -> AudioFrame:
"""Receive the next audio frame"""
if self.readyState != "live":
raise MediaStreamError

if self._start is None:
self._start = asyncio.get_event_loop().time()
self._timestamp = 0

samples = self.frame_samples
self._timestamp += samples
self._timestamp += self.frame_samples

target_time = self._start + (self._timestamp / self.sample_rate)
current_time = asyncio.get_event_loop().time()

wait = target_time - current_time

if wait > 0:
await asyncio.sleep(wait)

try:
# Read frames from the FIFO buffer
with self.fifo_lock:
frame = self.audio_fifo.read(samples)
with self.fifo_manager.fifo_operation() as fifo:
frame = fifo.read(self.frame_samples)

if frame is None:
# If no data is available, generate silence
frame = AudioFrame(
format="s16",
layout="mono" if self.channels == 1 else "stereo",
samples=samples,
)
for p in frame.planes:
p.update(np.zeros(samples, dtype=np.int16).tobytes())
silence_buffer = np.zeros(self.frame_samples, dtype=np.int16).tobytes()

frame.sample_rate = self.sample_rate
frame.time_base = fractions.Fraction(1, self.sample_rate)
else:
# Update frame properties
frame.sample_rate = self.sample_rate
frame.time_base = fractions.Fraction(1, self.sample_rate)
frame = rtc.convert_to_audio_frame(
silence_buffer,
self.sample_rate,
self.channels,
len(silence_buffer) // 2
)

# Set frame PTS
frame.pts = self._timestamp

self._total_played_time = self._timestamp / self.sample_rate

return frame

except Exception as e:
logger.error(f"Error in recv: {e}", exc_info=True)
raise MediaStreamError("Error processing audio frame")

def stop(self) -> None:
"""Stop the track"""
if self.readyState == "live":
super().stop()
raise MediaStreamError("Error processing audio frame")
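For context, a hedged sketch of the consumer side of this track (the transport handoff is elided; the loop needs no explicit sleep because recv() paces itself):

```python
import asyncio

track = AudioTrack(AudioTrackOptions(sample_rate=24000, channels=1))

async def pump():
    # recv() self-paces to ~20 ms per frame and substitutes silence when
    # the FIFO runs dry, so downstream consumers are never starved.
    while track.readyState == "live":
        frame = await track.recv()
        # hand `frame` to the encoder / transport here
```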
5 changes: 4 additions & 1 deletion ai01/providers/openai/realtime/__init__.py
@@ -3,14 +3,16 @@
AudioFormat,
ClientEvent,
ClientEventType,
EventTypes,
Modality,
RealTimeModelOptions,
RealTimeModels,
ServerEvent,
ServerEventType,
ToolChoice,
Voice,
)
from .realtime_model import RealTimeModel, RealTimeModelOptions
from .realtime_model import RealTimeModel

__all__ = [
"api",
@@ -25,6 +27,7 @@
"ToolChoice",
"ClientEventType",
"ServerEventType",
"EventTypes",
]

