From 99b8e4505e7ac7a1acc981fe5fcb85850c3204ba Mon Sep 17 00:00:00 2001 From: Yauheni Date: Mon, 30 Dec 2024 20:44:52 +0100 Subject: [PATCH 1/5] feat: improve voice logic (COR-4354) --- packages/chat/package.json | 2 + .../views/VoiceWidget/VoiceWidget.view.tsx | 10 +- .../VoiceWidget/hooks/Voice.controller.ts | 334 ------------------ .../hooks/use-voice-controller.hook.ts | 13 - .../hooks/use-voice-service.hook.ts | 10 + .../VoiceWidget/services/audio.service.ts | 186 ++++++++++ .../services/legacy-audio.service.ts | 182 ++++++++++ .../VoiceWidget/services/recorder.service.ts | 85 +++++ .../VoiceWidget/services/socket.service.ts | 143 ++++++++ .../VoiceWidget/services/voice.service.ts | 139 ++++++++ yarn.lock | 11 + 11 files changed, 763 insertions(+), 352 deletions(-) delete mode 100644 packages/chat/src/views/VoiceWidget/hooks/Voice.controller.ts delete mode 100644 packages/chat/src/views/VoiceWidget/hooks/use-voice-controller.hook.ts create mode 100644 packages/chat/src/views/VoiceWidget/hooks/use-voice-service.hook.ts create mode 100644 packages/chat/src/views/VoiceWidget/services/audio.service.ts create mode 100644 packages/chat/src/views/VoiceWidget/services/legacy-audio.service.ts create mode 100644 packages/chat/src/views/VoiceWidget/services/recorder.service.ts create mode 100644 packages/chat/src/views/VoiceWidget/services/socket.service.ts create mode 100644 packages/chat/src/views/VoiceWidget/services/voice.service.ts diff --git a/packages/chat/package.json b/packages/chat/package.json index 69615bc97..056362b03 100644 --- a/packages/chat/package.json +++ b/packages/chat/package.json @@ -86,6 +86,7 @@ "chroma-js": "2.4.2", "clsx": "1.2.1", "cuid": "2.1.8", + "lodash.debounce": "^4.0.8", "react": "18.2.0", "react-dom": "18.2.0", "react-markdown": "9.0.0", @@ -117,6 +118,7 @@ "@testing-library/jest-dom": "6.4.2", "@testing-library/react": "15.0.2", "@types/chroma-js": "2.1.4", + "@types/lodash.debounce": "^4.0.9", "@types/node": "20.12.7", "@types/react": 
"18.2.8", "@types/react-dom": "18.2.4", diff --git a/packages/chat/src/views/VoiceWidget/VoiceWidget.view.tsx b/packages/chat/src/views/VoiceWidget/VoiceWidget.view.tsx index 330efeb65..d71f23a76 100644 --- a/packages/chat/src/views/VoiceWidget/VoiceWidget.view.tsx +++ b/packages/chat/src/views/VoiceWidget/VoiceWidget.view.tsx @@ -5,7 +5,7 @@ import type { VoiceState } from '@/constant/voice.constant'; import { VOICE_STATE } from '@/constant/voice.constant'; import { RuntimeStateAPIContext } from '@/contexts'; -import { useVoiceController } from './hooks/use-voice-controller.hook'; +import { useVoiceService } from './hooks/use-voice-service.hook'; export const VoiceWidget = () => { const { assistant, config } = useContext(RuntimeStateAPIContext); @@ -15,14 +15,14 @@ export const VoiceWidget = () => { throw new Error('Voice is not configured in the config'); } - const voiceController = useVoiceController({ + const voiceService = useVoiceService({ url: config.voice.url, userID: config.userID, assistantID: config.verify.projectID, accessToken: config.voice.accessToken, }); - useEffect(() => voiceController.onStateUpdate((state) => setState(state)), [voiceController]); + useEffect(() => voiceService.onStateUpdate((state) => setState(state)), [voiceService]); return ( { footer={assistant.common.footerLink} settings={assistant.voice} poweredBy={assistant.common.poweredBy} - onEndCall={voiceController.endConversation} - onStartCall={voiceController.startConversation} + onEndCall={voiceService.endConversation} + onStartCall={voiceService.startConversation} /> ); }; diff --git a/packages/chat/src/views/VoiceWidget/hooks/Voice.controller.ts b/packages/chat/src/views/VoiceWidget/hooks/Voice.controller.ts deleted file mode 100644 index 64bc59801..000000000 --- a/packages/chat/src/views/VoiceWidget/hooks/Voice.controller.ts +++ /dev/null @@ -1,334 +0,0 @@ -import { VOICE_STATE, VoiceState } from '@/constant/voice.constant'; - -export interface VoiceControllerOptions { - url: 
string; - userID?: string; - accessToken: string; - assistantID: string; -} - -export class VoiceController { - url: string; - - state: string = VOICE_STATE.IDLE; - - userID = 'test'; - - listeners: Array<(state: VoiceState) => void> = []; - - accessToken: string; - - assistantID: string; - - webSocket: WebSocket | null = null; - - lastMarkIndex = 0; - - lastSentMarkIndex = 0; - - audioQueue = Promise.resolve(); - - talkingTimeout: NodeJS.Timeout | null = null; - - inputAudioStream: MediaStream | null = null; - - mediaRecorder: MediaRecorder | null = null; - - currentAudioElement: { - outputAudioElement: HTMLAudioElement; - outputSourceBuffer: SourceBuffer | null; - } = { - outputAudioElement: new Audio(), - outputSourceBuffer: null, - }; - - constructor({ url, userID, accessToken, assistantID }: VoiceControllerOptions) { - this.url = `${url}/voice/socket`; - this.userID = userID ?? this.userID; - this.assistantID = assistantID; - this.accessToken = accessToken; - } - - sendMark = () => { - if (this.lastMarkIndex === this.lastSentMarkIndex) return; - - this.lastSentMarkIndex = this.lastMarkIndex; - - this.webSocket?.send( - JSON.stringify({ - type: 'mark', - payload: { - markIndex: this.lastMarkIndex, - }, - }) - ); - }; - - base64ToArrayBuffer(base64: string) { - const binaryString = window.atob(base64); - const len = binaryString.length; - const bytes = new Uint8Array(len); - - for (let i = 0; i < len; i++) { - bytes[i] = binaryString.charCodeAt(i); - } - - return bytes.buffer; - } - - waitForBufferUpdateEnd = (outputSourceBuffer: SourceBuffer, chunk: ArrayBuffer) => - new Promise((resolve, reject) => { - function onUpdateEnd() { - outputSourceBuffer.removeEventListener('updateend', onUpdateEnd); - outputSourceBuffer.removeEventListener('error', onError); - resolve(null); - } - - function onError(error: any) { - outputSourceBuffer.removeEventListener('updateend', onUpdateEnd); - outputSourceBuffer.removeEventListener('error', onError); - reject(error); - } - - 
outputSourceBuffer.addEventListener('updateend', onUpdateEnd); - outputSourceBuffer.addEventListener('error', onError); - - outputSourceBuffer.appendBuffer(chunk); - }); - - async handleIncomingChunk(base64Chunk: string, markIndex: number) { - const { outputSourceBuffer } = this.currentAudioElement; - - if (!outputSourceBuffer) return; - - this.audioQueue = this.audioQueue.then(async () => { - const chunkBuffer = this.base64ToArrayBuffer(base64Chunk); - - await this.waitForBufferUpdateEnd(outputSourceBuffer, chunkBuffer); - - // 3. Append buffer - if (markIndex > this.lastMarkIndex) { - this.lastMarkIndex = markIndex; - } - - return undefined; - }); - } - - clearAudio() { - this.currentAudioElement.outputSourceBuffer?.abort(); - this.currentAudioElement.outputAudioElement.pause(); - this.currentAudioElement.outputAudioElement.currentTime = 0; - this.currentAudioElement.outputAudioElement.src = ''; - - this.sendMark(); - this.currentAudioElement = this.createAudioElement(); - } - - createAudioElement() { - const outputAudioElement = new Audio(); - - const ref: { - outputAudioElement: HTMLAudioElement; - outputSourceBuffer: SourceBuffer | null; - } = { - outputAudioElement, - outputSourceBuffer: null, - }; - - const outputMediaSource = new MediaSource(); - - outputAudioElement.src = URL.createObjectURL(outputMediaSource); - - // Flag to track if we're initialized - let sourceOpen = false; - - // Called when outputMediaSource is ready to accept data. 
- function onSourceOpen() { - if (!sourceOpen) { - sourceOpen = true; - ref.outputSourceBuffer = outputMediaSource.addSourceBuffer('audio/mpeg'); - - // (Optional) Set the mode to 'segments' - // outputSourceBuffer.mode = 'segments'; - - console.info('SourceBuffer created: audio/mpeg'); - } - } - - // Listen for MediaSource to be ready - outputMediaSource.addEventListener('sourceopen', onSourceOpen); - outputAudioElement.addEventListener('waiting', this.sendMark); - outputAudioElement.addEventListener('stalled', this.sendMark); - outputAudioElement.addEventListener('timeupdate', () => { - console.log('timeupdate', outputAudioElement.currentTime, this.state); - - if (this.state !== VOICE_STATE.TALKING && outputAudioElement === this.currentAudioElement.outputAudioElement) { - this.updateState(VOICE_STATE.TALKING); - } - - if (this.talkingTimeout) { - clearTimeout(this.talkingTimeout); - } - - this.talkingTimeout = setTimeout(() => { - this.updateState(VOICE_STATE.LISTENING); - this.talkingTimeout = null; - }, 500); - }); - - outputAudioElement.play(); - - return ref; - } - - async startInputStream() { - try { - this.inputAudioStream = await navigator.mediaDevices.getUserMedia({ - audio: true, - }); - } catch (e) { - return console.error('Microphone access denied.', e); - } - - console.info('Got audio stream:', this.inputAudioStream); - - this.mediaRecorder = new MediaRecorder(this.inputAudioStream); - - console.info('Got media recorder:', this.mediaRecorder); - - this.mediaRecorder.addEventListener('dataavailable', (event) => { - if (event.data.size > 0 && this.webSocket && this.webSocket.readyState === WebSocket.OPEN) { - this.webSocket.send(event.data); - } - }); - - this.mediaRecorder.addEventListener('error', (event) => { - console.error('MediaRecorder error:', event.error); - }); - - this.mediaRecorder.start(500); - console.info('Started audio streaming...'); - - return this.inputAudioStream; - } - - stopInputStream() { - if (this.mediaRecorder && 
this.mediaRecorder.state !== 'inactive') { - this.mediaRecorder.stop(); - } - if (this.inputAudioStream) { - this.inputAudioStream.getTracks().forEach((track) => track.stop()); - } - - console.info('Stopped audio streaming.'); - } - - initWebSocket = () => - new Promise((resolve) => { - if (this.webSocket && this.webSocket.readyState === WebSocket.OPEN) { - return; - } - - this.webSocket = new WebSocket(this.url); - - this.webSocket.onopen = () => { - console.info('WebSocket opened!'); - resolve({}); - }; - - this.webSocket.onmessage = (event) => { - // If we get a text message, treat as text/JSON - const message = JSON.parse(event.data); - - console.info(event.data, message); - - if (message.type === 'audio') { - this.handleIncomingChunk(message.payload.audio, message.payload.markIndex); - } else if (message.type === 'end') { - console.info('Conversation ended by server.', 'system'); - this.stopInputStream(); - } else if (message.type === 'interrupt') { - this.clearAudio(); - } - }; - - this.webSocket.onclose = () => { - console.info('WebSocket disconnected!', 'system'); - }; - - this.webSocket.onerror = (err) => { - console.error('WebSocket error:', err); - }; - }); - - async start() { - this.updateState(VOICE_STATE.INITIALIZING); - - await this.initWebSocket(); - - this.currentAudioElement = this.createAudioElement(); - - const messageObj = { - type: 'start', - payload: { - userID: this.userID, - assistantID: this.assistantID, - authorization: this.accessToken, - }, - }; - - this.webSocket?.send(JSON.stringify(messageObj)); - - await this.startInputStream(); - - console.log('listening'); - - this.updateState(VOICE_STATE.LISTENING); - } - - end() { - this.stopInputStream(); - - this.audioQueue = Promise.resolve(); - this.webSocket?.close(); - this.webSocket = null; - this.lastMarkIndex = 0; - this.lastSentMarkIndex = 0; - this.inputAudioStream = null; - this.mediaRecorder = null; - this.currentAudioElement = { - outputAudioElement: new Audio(), - 
outputSourceBuffer: null, - }; - - if (this.talkingTimeout) { - clearTimeout(this.talkingTimeout); - this.talkingTimeout = null; - } - - this.updateState(VOICE_STATE.ENDED); - } - - startConversation = () => { - this.start(); - }; - - endConversation = () => { - this.end(); - }; - - updateState = (state: VoiceState) => { - this.state = state; - - this.listeners.forEach((listener) => listener(state)); - }; - - onStateUpdate = (cb: (state: VoiceState) => void) => { - this.listeners.push(cb); - - return () => { - this.listeners = this.listeners.filter((listener) => listener !== cb); - }; - }; -} diff --git a/packages/chat/src/views/VoiceWidget/hooks/use-voice-controller.hook.ts b/packages/chat/src/views/VoiceWidget/hooks/use-voice-controller.hook.ts deleted file mode 100644 index 0d3387c9d..000000000 --- a/packages/chat/src/views/VoiceWidget/hooks/use-voice-controller.hook.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { useCreateConst } from '@/hooks/cache.hook'; - -import type { VoiceControllerOptions } from './Voice.controller'; -import { VoiceController } from './Voice.controller'; - -interface IUserVoiceController extends VoiceControllerOptions{ -} - -export const useVoiceController = (options: IUserVoiceController) => { - const voiceController = useCreateConst(() => new VoiceController(options)); - - return voiceController; -}; diff --git a/packages/chat/src/views/VoiceWidget/hooks/use-voice-service.hook.ts b/packages/chat/src/views/VoiceWidget/hooks/use-voice-service.hook.ts new file mode 100644 index 000000000..ba2cce76a --- /dev/null +++ b/packages/chat/src/views/VoiceWidget/hooks/use-voice-service.hook.ts @@ -0,0 +1,10 @@ +import { useCreateConst } from '@/hooks/cache.hook'; +import { VoiceService, VoiceServiceOptions } from '../services/voice.service'; + +interface IUserVoiceService extends VoiceServiceOptions {} + +export const useVoiceService = (options: IUserVoiceService) => { + const voiceService = useCreateConst(() => new VoiceService(options)); + + return 
voiceService; +}; diff --git a/packages/chat/src/views/VoiceWidget/services/audio.service.ts b/packages/chat/src/views/VoiceWidget/services/audio.service.ts new file mode 100644 index 000000000..8b2314d78 --- /dev/null +++ b/packages/chat/src/views/VoiceWidget/services/audio.service.ts @@ -0,0 +1,186 @@ +import debounce from 'lodash.debounce'; + +interface QueueItem { + markIndex: number; + arrayBuffer: ArrayBuffer; +} + +export class AudioService { + private audio: HTMLAudioElement; + + private stopped = false; + + private mediaSource: MediaSource | null = null; + + private lastMarkIndex = 0; + + private lastSentMarkIndex = 0; + + private onMark: (markIndex: number) => void; + + private onTalking: () => void; + + private onListening: () => void; + + private queue: QueueItem[] = []; + + private activeItem: QueueItem | null = null; + + private sourceBuffer: SourceBuffer | null = null; + + constructor(options: { onMark: (markIndex: number) => void; onListening: () => void; onTalking: () => void }) { + this.onMark = options.onMark; + this.onTalking = options.onTalking; + this.onListening = options.onListening; + + this.audio = new Audio(); + this.audio.addEventListener('ended', this.onAudioEnded); + this.audio.addEventListener('waiting', this.onAudioWaiting); + this.audio.addEventListener('stalled', this.onAudioStalled); + this.audio.addEventListener('playing', this.onAudioPlaying); + } + + private onAudioWaiting = () => { + this.sendMark(); + }; + + private onAudioStalled = () => { + this.sendMark(); + }; + + private onAudioPlaying = () => { + this.onTalking(); + }; + + private onAudioEnded = () => { + this.onListening(); + }; + + private sendMark() { + if (this.lastMarkIndex === this.lastSentMarkIndex) return; + + this.lastSentMarkIndex = this.lastMarkIndex; + + this.onMark(this.lastMarkIndex); + } + + private base64ToArrayBuffer(base64: string) { + const binaryString = window.atob(base64); + const len = binaryString.length; + const bytes = new Uint8Array(len); + + 
for (let i = 0; i < len; i++) { + bytes[i] = binaryString.charCodeAt(i); + } + + return bytes.buffer; + } + + private onBufferUpdated(markIndex: number) { + this.activeItem = null; + + if (markIndex > this.lastMarkIndex) { + this.lastMarkIndex = markIndex; + } + + if (this.queue.length) { + this.playQueue(); + } else { + this.sendMark(); + this.onListening(); + this.stopAudioDebounced(); + } + } + + private async updateSourceBuffer(markIndex: number, arrayBuffer: ArrayBuffer) { + await this.startAudio(); + + this.sourceBuffer?.addEventListener('error', () => this.onBufferUpdated(markIndex), { once: true }); + this.sourceBuffer?.addEventListener('updateend', () => this.onBufferUpdated(markIndex), { once: true }); + + this.sourceBuffer?.appendBuffer(arrayBuffer); + } + + private async playQueue() { + if (this.stopped || this.activeItem || !this.queue.length) return; + + this.activeItem = this.queue.shift()!; + + this.updateSourceBuffer(this.activeItem.markIndex, this.activeItem.arrayBuffer); + } + + addChunk(base64Chunk: string, markIndex: number) { + if (this.stopped) return; + + const arrayBuffer = this.base64ToArrayBuffer(base64Chunk); + + this.queue.push({ markIndex, arrayBuffer }); + this.playQueue(); + } + + private stopAudio() { + if (this.stopped) return; + + this.sourceBuffer?.abort(); + this.sourceBuffer = null; + + if (this.mediaSource?.readyState === 'open') { + this.mediaSource?.endOfStream(); + } + this.mediaSource = null; + + URL.revokeObjectURL(this.audio.src); + + this.audio.pause(); + this.audio.currentTime = 0; + this.audio.src = ''; + } + + private stopAudioDebounced = debounce(this.stopAudio, 500); + + private startAudio() { + if (this.stopped || this.mediaSource) return Promise.resolve(); + + return new Promise((resolve) => { + const mediaSource = new MediaSource(); + + mediaSource.addEventListener( + 'sourceopen', + () => { + this.sourceBuffer = mediaSource.addSourceBuffer('audio/mpeg'); + + resolve(); + }, + { once: true } + ); + + 
this.audio.src = URL.createObjectURL(mediaSource); + + this.audio.play(); + this.mediaSource = mediaSource; + }); + } + + interrupt() { + if (this.stopped) return; + + this.queue = []; + this.activeItem = null; + + this.stopAudio(); + + this.sendMark(); + + this.startAudio(); + } + + stop() { + this.stopAudio(); + + this.queue = []; + this.stopped = true; + this.activeItem = null; + this.lastMarkIndex = 0; + this.lastSentMarkIndex = 0; + } +} diff --git a/packages/chat/src/views/VoiceWidget/services/legacy-audio.service.ts b/packages/chat/src/views/VoiceWidget/services/legacy-audio.service.ts new file mode 100644 index 000000000..2cf58c9f9 --- /dev/null +++ b/packages/chat/src/views/VoiceWidget/services/legacy-audio.service.ts @@ -0,0 +1,182 @@ +export class AudioService { + webSocket: WebSocket | null = null; + + private lastMarkIndex = 0; + + private lastSentMarkIndex = 0; + + private audioQueue = Promise.resolve(); + + private talkingTimeout: NodeJS.Timeout | null = null; + + private currentAudioElement: { + outputAudioElement: HTMLAudioElement; + outputSourceBuffer: SourceBuffer | null; + } = { + outputAudioElement: new Audio(), + outputSourceBuffer: null, + }; + + private onMark: (markIndex: number) => void; + + private onListening: () => void; + + private onTimeUpdate: () => void; + + constructor(options: { onMark: (markIndex: number) => void; onListening: () => void; onTimeUpdate: () => void }) { + this.onMark = options.onMark; + this.onListening = options.onListening; + this.onTimeUpdate = options.onTimeUpdate; + } + + sendMark = () => { + if (this.lastMarkIndex === this.lastSentMarkIndex) return; + + this.lastSentMarkIndex = this.lastMarkIndex; + + this.onMark(this.lastMarkIndex); + }; + + base64ToArrayBuffer(base64: string) { + const binaryString = window.atob(base64); + const len = binaryString.length; + const bytes = new Uint8Array(len); + + for (let i = 0; i < len; i++) { + bytes[i] = binaryString.charCodeAt(i); + } + + return bytes.buffer; + } + + 
waitForBufferUpdateEnd = (outputSourceBuffer: SourceBuffer, chunk: ArrayBuffer) => + new Promise((resolve, reject) => { + function onUpdateEnd() { + outputSourceBuffer.removeEventListener('updateend', onUpdateEnd); + outputSourceBuffer.removeEventListener('error', onError); + resolve(null); + } + + function onError(error: any) { + outputSourceBuffer.removeEventListener('updateend', onUpdateEnd); + outputSourceBuffer.removeEventListener('error', onError); + reject(error); + } + + outputSourceBuffer.addEventListener('updateend', onUpdateEnd); + outputSourceBuffer.addEventListener('error', onError); + + outputSourceBuffer.appendBuffer(chunk); + }); + + async addChunk(base64Chunk: string, markIndex: number) { + const { outputSourceBuffer } = this.currentAudioElement; + + if (!outputSourceBuffer) return; + + this.audioQueue = this.audioQueue.then(async () => { + const chunkBuffer = this.base64ToArrayBuffer(base64Chunk); + + await this.waitForBufferUpdateEnd(outputSourceBuffer, chunkBuffer); + + // 3. Append buffer + if (markIndex > this.lastMarkIndex) { + this.lastMarkIndex = markIndex; + } + + return undefined; + }); + } + + cleanup() { + this.currentAudioElement.outputSourceBuffer?.abort(); + this.currentAudioElement.outputAudioElement.pause(); + this.currentAudioElement.outputAudioElement.currentTime = 0; + this.currentAudioElement.outputAudioElement.src = ''; + } + + interrupt() { + this.cleanup(); + + this.sendMark(); + this.currentAudioElement = this.createAudioElement(); + } + + createAudioElement() { + const outputAudioElement = new Audio(); + + const ref: { + outputAudioElement: HTMLAudioElement; + outputSourceBuffer: SourceBuffer | null; + } = { + outputAudioElement, + outputSourceBuffer: null, + }; + + const outputMediaSource = new MediaSource(); + + outputAudioElement.src = URL.createObjectURL(outputMediaSource); + + // Flag to track if we're initialized + let sourceOpen = false; + + // Called when outputMediaSource is ready to accept data. 
+ function onSourceOpen() { + if (!sourceOpen) { + sourceOpen = true; + ref.outputSourceBuffer = outputMediaSource.addSourceBuffer('audio/mpeg'); + + ref.outputSourceBuffer.abort + + // (Optional) Set the mode to 'segments' + // outputSourceBuffer.mode = 'segments'; + + console.info('SourceBuffer created: audio/mpeg'); + } + } + + // Listen for MediaSource to be ready + outputMediaSource.addEventListener('sourceopen', onSourceOpen); + outputAudioElement.addEventListener('waiting', this.sendMark); + outputAudioElement.addEventListener('stalled', this.sendMark); + outputAudioElement.addEventListener('timeupdate', () => { + if (outputAudioElement === this.currentAudioElement.outputAudioElement) { + this.onTimeUpdate(); + } + + if (this.talkingTimeout) { + clearTimeout(this.talkingTimeout); + } + + this.talkingTimeout = setTimeout(() => { + this.onListening(); + this.talkingTimeout = null; + }, 500); + }); + + outputAudioElement.play(); + + return ref; + } + + start() { + this.currentAudioElement = this.createAudioElement(); + } + + stop() { + this.cleanup(); + + this.audioQueue = Promise.resolve(); + this.lastMarkIndex = 0; + this.lastSentMarkIndex = 0; + this.currentAudioElement = { + outputAudioElement: new Audio(), + outputSourceBuffer: null, + }; + + if (this.talkingTimeout) { + clearTimeout(this.talkingTimeout); + this.talkingTimeout = null; + } + } +} diff --git a/packages/chat/src/views/VoiceWidget/services/recorder.service.ts b/packages/chat/src/views/VoiceWidget/services/recorder.service.ts new file mode 100644 index 000000000..6d15a46ac --- /dev/null +++ b/packages/chat/src/views/VoiceWidget/services/recorder.service.ts @@ -0,0 +1,85 @@ +export class RecorderService { + private mediaRecorder: MediaRecorder | null = null; + + private inputAudioStream: MediaStream | null = null; + + private createAudioStreamPromise: Promise | null = null; + + constructor(private readonly onDataAvailable: (data: Blob) => void) {} + + private createAudioStream() { + if 
(this.createAudioStreamPromise) { + console.info('Returning existing input audio stream promise.'); + + return this.createAudioStreamPromise; + } + + if (this.inputAudioStream) { + return Promise.resolve(this.inputAudioStream); + } + + this.createAudioStreamPromise = navigator.mediaDevices + .getUserMedia({ audio: true }) + .catch((err) => { + console.error('Microphone access denied.', err); + + throw err; + }) + .finally(() => { + this.createAudioStreamPromise = null; + }); + + return this.createAudioStreamPromise; + } + + private onMediaRecorderError = (event: ErrorEvent) => { + console.error('MediaRecorder error:', event.error); + }; + + private onMediaRecorderDataAvailable = (event: BlobEvent) => { + console.info('Got data:', event.data); + + if (event.data.size > 0) { + this.onDataAvailable(event.data); + } + }; + + async start() { + if (this.inputAudioStream) { + console.info('Audio stream already exists, reusing.'); + + return; + } + + this.inputAudioStream = await this.createAudioStream(); + + console.info('Got audio stream:', this.inputAudioStream); + + this.mediaRecorder = new MediaRecorder(this.inputAudioStream); + + console.info('Got media recorder:', this.mediaRecorder); + + this.mediaRecorder.addEventListener('error', this.onMediaRecorderError); + this.mediaRecorder.addEventListener('dataavailable', this.onMediaRecorderDataAvailable); + + this.mediaRecorder?.start(500); + + console.info('Started audio streaming...'); + } + + stop() { + if (this.mediaRecorder?.state !== 'inactive') { + this.mediaRecorder?.stop(); + } + + this.inputAudioStream?.getTracks().forEach((track) => track.stop()); + + this.mediaRecorder?.removeEventListener('error', this.onMediaRecorderError); + this.mediaRecorder?.removeEventListener('dataavailable', this.onMediaRecorderDataAvailable); + + this.mediaRecorder = null; + this.inputAudioStream = null; + + console.info('Stopped audio streaming.'); + } +} diff --git a/packages/chat/src/views/VoiceWidget/services/socket.service.ts 
b/packages/chat/src/views/VoiceWidget/services/socket.service.ts new file mode 100644 index 000000000..df65b9daf --- /dev/null +++ b/packages/chat/src/views/VoiceWidget/services/socket.service.ts @@ -0,0 +1,143 @@ +interface SocketAudioInputMessage { + type: 'audio'; + payload: { audio: string; markIndex: number }; +} + +interface SocketEndInputMessage { + type: 'end'; +} + +interface SocketInterruptInputMessage { + type: 'interrupt'; +} + +export type SocketInputMessage = SocketAudioInputMessage | SocketEndInputMessage | SocketInterruptInputMessage; + +interface SocketMarkOutputMessage { + type: 'mark'; + payload: { markIndex: number }; +} + +interface SocketStartOutputMessage { + type: 'start'; + payload: { userID: string; assistantID: string; authorization: string }; +} + +export type SocketOutputMessage = Blob | SocketMarkOutputMessage | SocketStartOutputMessage; + +export class SocketService { + private url: string; + + private socket: WebSocket | null = null; + + private onError: (error: unknown) => void; + + private onMessage: (message: SocketInputMessage) => void; + + private connectionPromise: Promise | null = null; + + constructor(options: { + url: string; + onError: (error: unknown) => void; + onMessage: (message: SocketInputMessage) => void; + }) { + this.url = options.url; + this.onError = options.onError; + this.onMessage = options.onMessage; + } + + private createSocket() { + if (this.connectionPromise) { + return this.connectionPromise; + } + + if (this.socket) { + return Promise.resolve(this.socket); + } + + this.connectionPromise = new Promise((resolve, reject) => { + const socket = new WebSocket(this.url); + + const onOpen = () => { + console.info('Socket connection established.'); + + removeListeners(); + resolve(socket); + }; + + const onError = (error: unknown) => { + console.error('Socket connection error.', error); + + removeListeners(); + reject(error); + }; + + socket.addEventListener('open', onOpen); + socket.addEventListener('error', 
onError); + + const removeListeners = () => { + socket?.removeEventListener('open', onOpen); + socket?.removeEventListener('error', onError); + }; + }).finally(() => { + this.connectionPromise = null; + }); + + return this.connectionPromise; + } + + private onSocketError = (event: Event) => { + console.error('socket error:', event); + + this.onError(event); + this.stop(); + }; + + private onSocketMessage = (event: MessageEvent) => { + const message = JSON.parse(event.data); + + console.info('socket message:', message); + + this.onMessage(message); + }; + + async start() { + if (this.socket) { + console.info('Socket already exists, reusing.'); + + return; + } + + this.socket = await this.createSocket(); + + this.socket.addEventListener('error', this.onSocketError); + this.socket.addEventListener('message', this.onSocketMessage); + } + + stop() { + if (this.socket?.readyState !== WebSocket.CLOSED && this.socket?.readyState !== WebSocket.CLOSING) { + this.socket?.close(); + } + + this.socket?.removeEventListener('error', this.onSocketError); + this.socket?.removeEventListener('message', this.onSocketMessage); + + this.socket = null; + } + + send(message: SocketOutputMessage) { + if (!this.socket) { + console.warn('Socket is not open, cannot send message.'); + + return; + } + + if (this.socket.readyState !== WebSocket.OPEN) { + console.warn('Socket is not open, cannot send message.'); + + return; + } + + this.socket.send(message instanceof Blob ? 
message : JSON.stringify(message)); + } +} diff --git a/packages/chat/src/views/VoiceWidget/services/voice.service.ts b/packages/chat/src/views/VoiceWidget/services/voice.service.ts new file mode 100644 index 000000000..41fa4f82d --- /dev/null +++ b/packages/chat/src/views/VoiceWidget/services/voice.service.ts @@ -0,0 +1,139 @@ +import { VOICE_STATE, VoiceState } from '@/constant/voice.constant'; + +// import { AudioService } from './legacy-audio.service'; +import { AudioService } from './audio.service'; +import { RecorderService } from './recorder.service'; +import { SocketInputMessage, SocketService } from './socket.service'; + +export interface VoiceServiceOptions { + url: string; + userID?: string; + accessToken: string; + assistantID: string; +} + +export class VoiceService { + private state: string = VOICE_STATE.IDLE; + + private userID: string; + + private listeners: Array<(state: VoiceState) => void> = []; + + private accessToken: string; + + private assistantID: string; + + private audio: AudioService | null = null; + + private socket: SocketService; + + private recorder: RecorderService; + + constructor({ url, userID, accessToken, assistantID }: VoiceServiceOptions) { + this.userID = userID ?? 
'test'; + this.assistantID = assistantID; + this.accessToken = accessToken; + + // this.audio = new AudioService({ + // onMark: this.onAudioMark, + // onListening: this.onAudioListening, + // onTimeUpdate: this.onAudioTimeUpdate, + // }); + this.socket = new SocketService({ + url: `${url}/voice/socket`, + onError: this.onSocketError, + onMessage: this.onSocketMessage, + }); + this.recorder = new RecorderService(this.onRecorderDataAvailable); + } + + private onAudioMark = (markIndex: number) => { + this.socket?.send({ type: 'mark', payload: { markIndex } }); + }; + + private onAudioTalking = () => { + this.updateState(VOICE_STATE.TALKING); + }; + + private onAudioTimeUpdate = () => { + this.updateState(VOICE_STATE.TALKING); + }; + + private onAudioListening = () => { + this.updateState(VOICE_STATE.LISTENING); + }; + + private onRecorderDataAvailable = (data: Blob) => { + this.socket.send(data); + }; + + private onSocketError = () => { + // TODO: Handle socket error + }; + + private onSocketMessage = (message: SocketInputMessage) => { + if (message.type === 'audio') { + this.audio?.addChunk(message.payload.audio, message.payload.markIndex); + } else if (message.type === 'end') { + console.info('Conversation ended by server.', 'system'); + this.recorder.stop(); + } else if (message.type === 'interrupt') { + this.audio?.interrupt(); + } + }; + + private async start() { + this.updateState(VOICE_STATE.INITIALIZING); + + this.audio = new AudioService({ + onMark: this.onAudioMark, + onTalking: this.onAudioTalking, + onListening: this.onAudioListening, + }); + + await this.socket.start(); + + this.socket.send({ + type: 'start', + payload: { + userID: this.userID, + assistantID: this.assistantID, + authorization: this.accessToken, + }, + }); + + await this.recorder.start(); + + this.updateState(VOICE_STATE.LISTENING); + + console.info('listening...'); + } + + private stop() { + this.recorder.stop(); + this.socket.stop(); + this.audio?.stop(); + + 
this.updateState(VOICE_STATE.ENDED); + } + + endConversation = () => this.stop(); + + startConversation = () => this.start(); + + updateState = (state: VoiceState) => { + if (this.state === state) return; + + this.state = state; + + this.listeners.forEach((listener) => listener(state)); + }; + + onStateUpdate = (cb: (state: VoiceState) => void) => { + this.listeners.push(cb); + + return () => { + this.listeners = this.listeners.filter((listener) => listener !== cb); + }; + }; +} diff --git a/yarn.lock b/yarn.lock index 0558fdaf7..c568393f3 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6388,6 +6388,15 @@ __metadata: languageName: node linkType: hard +"@types/lodash.debounce@npm:^4.0.9": + version: 4.0.9 + resolution: "@types/lodash.debounce@npm:4.0.9" + dependencies: + "@types/lodash": "*" + checksum: 8183a152e01928e3b97ca773f6ae6038b8695e76493ba8bf6b743ec143948a62294fbc9d49fa4a78b52265b3ba4892ef57534e0c13d04aa0f111671b5a944feb + languageName: node + linkType: hard + "@types/lodash.memoize@npm:^4.1.7": version: 4.1.9 resolution: "@types/lodash.memoize@npm:4.1.9" @@ -7649,6 +7658,7 @@ __metadata: "@testing-library/jest-dom": 6.4.2 "@testing-library/react": 15.0.2 "@types/chroma-js": 2.1.4 + "@types/lodash.debounce": ^4.0.9 "@types/node": 20.12.7 "@types/react": 18.2.8 "@types/react-dom": 18.2.4 @@ -7674,6 +7684,7 @@ __metadata: eslint-plugin-storybook: 0.8.0 happy-dom: 14.7.1 http-server: 14.1.1 + lodash.debounce: ^4.0.8 react: 18.2.0 react-dom: 18.2.0 react-markdown: 9.0.0 From 2ea2a17314a7c99080ef4fd37370baa5f3db9fc7 Mon Sep 17 00:00:00 2001 From: Tyler Han Date: Mon, 30 Dec 2024 18:58:49 -0800 Subject: [PATCH 2/5] fix: interruption behavior and audio stopping --- packages/chat/package.json | 1 - .../chat/src/views/VoiceWidget/services/audio.service.ts | 8 ++------ .../views/VoiceWidget/services/legacy-audio.service.ts | 2 -- .../src/views/VoiceWidget/services/recorder.service.ts | 4 ++-- yarn.lock | 1 - 5 files changed, 4 insertions(+), 12 deletions(-) diff --git 
a/packages/chat/package.json b/packages/chat/package.json index 056362b03..5ecaf9fa8 100644 --- a/packages/chat/package.json +++ b/packages/chat/package.json @@ -86,7 +86,6 @@ "chroma-js": "2.4.2", "clsx": "1.2.1", "cuid": "2.1.8", - "lodash.debounce": "^4.0.8", "react": "18.2.0", "react-dom": "18.2.0", "react-markdown": "9.0.0", diff --git a/packages/chat/src/views/VoiceWidget/services/audio.service.ts b/packages/chat/src/views/VoiceWidget/services/audio.service.ts index 8b2314d78..1c9165142 100644 --- a/packages/chat/src/views/VoiceWidget/services/audio.service.ts +++ b/packages/chat/src/views/VoiceWidget/services/audio.service.ts @@ -1,5 +1,3 @@ -import debounce from 'lodash.debounce'; - interface QueueItem { markIndex: number; arrayBuffer: ArrayBuffer; @@ -42,10 +40,12 @@ export class AudioService { private onAudioWaiting = () => { this.sendMark(); + this.onListening(); }; private onAudioStalled = () => { this.sendMark(); + this.onListening(); }; private onAudioPlaying = () => { @@ -87,8 +87,6 @@ export class AudioService { this.playQueue(); } else { this.sendMark(); - this.onListening(); - this.stopAudioDebounced(); } } @@ -136,8 +134,6 @@ export class AudioService { this.audio.src = ''; } - private stopAudioDebounced = debounce(this.stopAudio, 500); - private startAudio() { if (this.stopped || this.mediaSource) return Promise.resolve(); diff --git a/packages/chat/src/views/VoiceWidget/services/legacy-audio.service.ts b/packages/chat/src/views/VoiceWidget/services/legacy-audio.service.ts index 2cf58c9f9..033728d0d 100644 --- a/packages/chat/src/views/VoiceWidget/services/legacy-audio.service.ts +++ b/packages/chat/src/views/VoiceWidget/services/legacy-audio.service.ts @@ -126,8 +126,6 @@ export class AudioService { sourceOpen = true; ref.outputSourceBuffer = outputMediaSource.addSourceBuffer('audio/mpeg'); - ref.outputSourceBuffer.abort - // (Optional) Set the mode to 'segments' // outputSourceBuffer.mode = 'segments'; diff --git 
a/packages/chat/src/views/VoiceWidget/services/recorder.service.ts b/packages/chat/src/views/VoiceWidget/services/recorder.service.ts index 6d15a46ac..94d517480 100644 --- a/packages/chat/src/views/VoiceWidget/services/recorder.service.ts +++ b/packages/chat/src/views/VoiceWidget/services/recorder.service.ts @@ -32,8 +32,8 @@ export class RecorderService { return this.createAudioStreamPromise; } - private onMediaRecorderError = (event: ErrorEvent) => { - console.error('MediaRecorder error:', event.error); + private onMediaRecorderError = (event: Event) => { + console.error('MediaRecorder error:', (event as ErrorEvent).error); }; private onMediaRecorderDataAvailable = (event: BlobEvent) => { diff --git a/yarn.lock b/yarn.lock index c568393f3..1819a9585 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7684,7 +7684,6 @@ __metadata: eslint-plugin-storybook: 0.8.0 happy-dom: 14.7.1 http-server: 14.1.1 - lodash.debounce: ^4.0.8 react: 18.2.0 react-dom: 18.2.0 react-markdown: 9.0.0 From 2027c9621df230618647a0ac959e9ab5d61f491e Mon Sep 17 00:00:00 2001 From: Tyler Han Date: Mon, 6 Jan 2025 11:14:06 -0500 Subject: [PATCH 3/5] fix: update segments to 1 second --- .../chat/src/views/VoiceWidget/services/recorder.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/chat/src/views/VoiceWidget/services/recorder.service.ts b/packages/chat/src/views/VoiceWidget/services/recorder.service.ts index 94d517480..adba208fc 100644 --- a/packages/chat/src/views/VoiceWidget/services/recorder.service.ts +++ b/packages/chat/src/views/VoiceWidget/services/recorder.service.ts @@ -62,7 +62,7 @@ export class RecorderService { this.mediaRecorder.addEventListener('error', this.onMediaRecorderError); this.mediaRecorder.addEventListener('dataavailable', this.onMediaRecorderDataAvailable); - this.mediaRecorder?.start(500); + this.mediaRecorder?.start(1000); console.info('Started audio streaming...'); } From 67ec69f23f0a6b82fb9033ed6b23d0ae85f83d94 Mon Sep 17 00:00:00 2001
From: Tyler Han Date: Mon, 6 Jan 2025 11:15:21 -0500 Subject: [PATCH 4/5] fix: remove legacy audio service --- .../services/legacy-audio.service.ts | 180 ------------------ 1 file changed, 180 deletions(-) delete mode 100644 packages/chat/src/views/VoiceWidget/services/legacy-audio.service.ts diff --git a/packages/chat/src/views/VoiceWidget/services/legacy-audio.service.ts b/packages/chat/src/views/VoiceWidget/services/legacy-audio.service.ts deleted file mode 100644 index 033728d0d..000000000 --- a/packages/chat/src/views/VoiceWidget/services/legacy-audio.service.ts +++ /dev/null @@ -1,180 +0,0 @@ -export class AudioService { - webSocket: WebSocket | null = null; - - private lastMarkIndex = 0; - - private lastSentMarkIndex = 0; - - private audioQueue = Promise.resolve(); - - private talkingTimeout: NodeJS.Timeout | null = null; - - private currentAudioElement: { - outputAudioElement: HTMLAudioElement; - outputSourceBuffer: SourceBuffer | null; - } = { - outputAudioElement: new Audio(), - outputSourceBuffer: null, - }; - - private onMark: (markIndex: number) => void; - - private onListening: () => void; - - private onTimeUpdate: () => void; - - constructor(options: { onMark: (markIndex: number) => void; onListening: () => void; onTimeUpdate: () => void }) { - this.onMark = options.onMark; - this.onListening = options.onListening; - this.onTimeUpdate = options.onTimeUpdate; - } - - sendMark = () => { - if (this.lastMarkIndex === this.lastSentMarkIndex) return; - - this.lastSentMarkIndex = this.lastMarkIndex; - - this.onMark(this.lastMarkIndex); - }; - - base64ToArrayBuffer(base64: string) { - const binaryString = window.atob(base64); - const len = binaryString.length; - const bytes = new Uint8Array(len); - - for (let i = 0; i < len; i++) { - bytes[i] = binaryString.charCodeAt(i); - } - - return bytes.buffer; - } - - waitForBufferUpdateEnd = (outputSourceBuffer: SourceBuffer, chunk: ArrayBuffer) => - new Promise((resolve, reject) => { - function onUpdateEnd() { - 
outputSourceBuffer.removeEventListener('updateend', onUpdateEnd); - outputSourceBuffer.removeEventListener('error', onError); - resolve(null); - } - - function onError(error: any) { - outputSourceBuffer.removeEventListener('updateend', onUpdateEnd); - outputSourceBuffer.removeEventListener('error', onError); - reject(error); - } - - outputSourceBuffer.addEventListener('updateend', onUpdateEnd); - outputSourceBuffer.addEventListener('error', onError); - - outputSourceBuffer.appendBuffer(chunk); - }); - - async addChunk(base64Chunk: string, markIndex: number) { - const { outputSourceBuffer } = this.currentAudioElement; - - if (!outputSourceBuffer) return; - - this.audioQueue = this.audioQueue.then(async () => { - const chunkBuffer = this.base64ToArrayBuffer(base64Chunk); - - await this.waitForBufferUpdateEnd(outputSourceBuffer, chunkBuffer); - - // 3. Append buffer - if (markIndex > this.lastMarkIndex) { - this.lastMarkIndex = markIndex; - } - - return undefined; - }); - } - - cleanup() { - this.currentAudioElement.outputSourceBuffer?.abort(); - this.currentAudioElement.outputAudioElement.pause(); - this.currentAudioElement.outputAudioElement.currentTime = 0; - this.currentAudioElement.outputAudioElement.src = ''; - } - - interrupt() { - this.cleanup(); - - this.sendMark(); - this.currentAudioElement = this.createAudioElement(); - } - - createAudioElement() { - const outputAudioElement = new Audio(); - - const ref: { - outputAudioElement: HTMLAudioElement; - outputSourceBuffer: SourceBuffer | null; - } = { - outputAudioElement, - outputSourceBuffer: null, - }; - - const outputMediaSource = new MediaSource(); - - outputAudioElement.src = URL.createObjectURL(outputMediaSource); - - // Flag to track if we're initialized - let sourceOpen = false; - - // Called when outputMediaSource is ready to accept data. 
- function onSourceOpen() { - if (!sourceOpen) { - sourceOpen = true; - ref.outputSourceBuffer = outputMediaSource.addSourceBuffer('audio/mpeg'); - - // (Optional) Set the mode to 'segments' - // outputSourceBuffer.mode = 'segments'; - - console.info('SourceBuffer created: audio/mpeg'); - } - } - - // Listen for MediaSource to be ready - outputMediaSource.addEventListener('sourceopen', onSourceOpen); - outputAudioElement.addEventListener('waiting', this.sendMark); - outputAudioElement.addEventListener('stalled', this.sendMark); - outputAudioElement.addEventListener('timeupdate', () => { - if (outputAudioElement === this.currentAudioElement.outputAudioElement) { - this.onTimeUpdate(); - } - - if (this.talkingTimeout) { - clearTimeout(this.talkingTimeout); - } - - this.talkingTimeout = setTimeout(() => { - this.onListening(); - this.talkingTimeout = null; - }, 500); - }); - - outputAudioElement.play(); - - return ref; - } - - start() { - this.currentAudioElement = this.createAudioElement(); - } - - stop() { - this.cleanup(); - - this.audioQueue = Promise.resolve(); - this.lastMarkIndex = 0; - this.lastSentMarkIndex = 0; - this.currentAudioElement = { - outputAudioElement: new Audio(), - outputSourceBuffer: null, - }; - - if (this.talkingTimeout) { - clearTimeout(this.talkingTimeout); - this.talkingTimeout = null; - } - } -} From 1870a207b4571b978fca537b727dd360db890c81 Mon Sep 17 00:00:00 2001 From: Tyler Han Date: Mon, 6 Jan 2025 12:30:31 -0500 Subject: [PATCH 5/5] fix: remove debounce types --- packages/chat/package.json | 1 - yarn.lock | 10 ---------- 2 files changed, 11 deletions(-) diff --git a/packages/chat/package.json b/packages/chat/package.json index 5ecaf9fa8..69615bc97 100644 --- a/packages/chat/package.json +++ b/packages/chat/package.json @@ -117,7 +117,6 @@ "@testing-library/jest-dom": "6.4.2", "@testing-library/react": "15.0.2", "@types/chroma-js": "2.1.4", - "@types/lodash.debounce": "^4.0.9", "@types/node": "20.12.7", "@types/react": "18.2.8", 
"@types/react-dom": "18.2.4", diff --git a/yarn.lock b/yarn.lock index 1819a9585..0558fdaf7 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6388,15 +6388,6 @@ __metadata: languageName: node linkType: hard -"@types/lodash.debounce@npm:^4.0.9": - version: 4.0.9 - resolution: "@types/lodash.debounce@npm:4.0.9" - dependencies: - "@types/lodash": "*" - checksum: 8183a152e01928e3b97ca773f6ae6038b8695e76493ba8bf6b743ec143948a62294fbc9d49fa4a78b52265b3ba4892ef57534e0c13d04aa0f111671b5a944feb - languageName: node - linkType: hard - "@types/lodash.memoize@npm:^4.1.7": version: 4.1.9 resolution: "@types/lodash.memoize@npm:4.1.9" @@ -7658,7 +7649,6 @@ __metadata: "@testing-library/jest-dom": 6.4.2 "@testing-library/react": 15.0.2 "@types/chroma-js": 2.1.4 - "@types/lodash.debounce": ^4.0.9 "@types/node": 20.12.7 "@types/react": 18.2.8 "@types/react-dom": 18.2.4