diff --git a/package.json b/package.json index f4a30aa..2ee6cd5 100644 --- a/package.json +++ b/package.json @@ -20,6 +20,7 @@ "license": "MIT", "scripts": { "build": "rimraf dist && rollup -c", + "prepare": "npm run build", "docs:generate": "typedoc --options typedoc.json", "docs:deploy": "npm run docs:generate && gh-pages -d docs", "format": "prettier --write src/**/*.ts", diff --git a/src/_module.ts b/src/_module.ts index b3e7b5d..e1b7867 100644 --- a/src/_module.ts +++ b/src/_module.ts @@ -5,6 +5,8 @@ export type { QueryProfilesResponse, QueryTweetsResponse } from './timeline-v1'; export type { Tweet } from './tweets'; export { Space } from './spaces/core/Space' +export { SpaceParticipant } from './spaces/core/SpaceParticipant' +export { Logger } from './spaces/logger' export { SttTtsPlugin } from './spaces/plugins/SttTtsPlugin' export { RecordToDiskPlugin } from './spaces/plugins/RecordToDiskPlugin' export { MonitorAudioPlugin } from './spaces/plugins/MonitorAudioPlugin' diff --git a/src/spaces/core/ChatClient.ts b/src/spaces/core/ChatClient.ts index df2bcb0..b16b770 100644 --- a/src/spaces/core/ChatClient.ts +++ b/src/spaces/core/ChatClient.ts @@ -5,17 +5,40 @@ import { EventEmitter } from 'events'; import type { SpeakerRequest, OccupancyUpdate } from '../types'; import { Logger } from '../logger'; +/** + * Configuration object for ChatClient. + */ interface ChatClientConfig { + /** + * The space ID (e.g., "1vOGwAbcdE...") for this audio space. + */ spaceId: string; + + /** + * The access token obtained from accessChat or the live_video_stream/status. + */ accessToken: string; + + /** + * The endpoint host for the chat server (e.g., "https://prod-chatman-ancillary-eu-central-1.pscp.tv"). + */ endpoint: string; + + /** + * An instance of Logger for debug/info logs. + */ logger: Logger; } +/** + * ChatClient handles the WebSocket connection to the Twitter/Periscope chat API. + * It emits events such as "speakerRequest", "occupancyUpdate", "muteStateChanged", etc. + */ export class ChatClient extends EventEmitter { private ws?: WebSocket; private connected = false; - private logger: Logger; + + private readonly logger: Logger; private readonly spaceId: string; private readonly accessToken: string; private endpoint: string; @@ -28,7 +51,10 @@ export class ChatClient extends EventEmitter { this.logger = config.logger; } - async connect() { + /** + * Establishes a WebSocket connection to the chat endpoint and sets up event handlers. + */ + public async connect(): Promise { const wsUrl = `${this.endpoint}/chatapi/v1/chatnow`.replace( 'https://', 'wss://', @@ -45,9 +71,12 @@ export class ChatClient extends EventEmitter { await this.setupHandlers(); } + /** + * Internal method to set up WebSocket event listeners (open, message, close, error). + */ private setupHandlers(): Promise { if (!this.ws) { - throw new Error('No WebSocket instance'); + throw new Error('[ChatClient] No WebSocket instance available'); } return new Promise((resolve, reject) => { @@ -75,9 +104,13 @@ export class ChatClient extends EventEmitter { }); } - private sendAuthAndJoin() { + /** + * Sends two WebSocket messages to authenticate and join the specified space. 
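+   * The first frame carries the access token and the second names the room
+   * (the space ID). Note the double JSON encoding in the calls below: each
+   * payload field is itself a stringified JSON object.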
+ */ + private sendAuthAndJoin(): void { if (!this.ws) return; + // 1) Send authentication (access token) this.ws.send( JSON.stringify({ payload: JSON.stringify({ access_token: this.accessToken }), @@ -85,6 +118,7 @@ export class ChatClient extends EventEmitter { }), ); + // 2) Send a "join" message specifying the room (space ID) this.ws.send( JSON.stringify({ payload: JSON.stringify({ @@ -96,8 +130,18 @@ export class ChatClient extends EventEmitter { ); } - reactWithEmoji(emoji: string) { - if (!this.ws) return; + /** + * Sends an emoji reaction to the chat server. + * @param emoji - The emoji string, e.g. '🔥', '🙏', etc. + */ + public reactWithEmoji(emoji: string): void { + if (!this.ws || !this.connected) { + this.logger.warn( + '[ChatClient] Not connected or WebSocket missing; ignoring reactWithEmoji.', + ); + return; + } + const payload = JSON.stringify({ body: JSON.stringify({ body: emoji, type: 2, v: 2 }), kind: 1, @@ -117,15 +161,20 @@ export class ChatClient extends EventEmitter { }), type: 2, }); + this.ws.send(payload); } - private handleMessage(raw: string) { + /** + * Handles inbound WebSocket messages, parsing JSON payloads + * and emitting relevant events (speakerRequest, occupancyUpdate, etc.). + */ + private handleMessage(raw: string): void { let msg: any; try { msg = JSON.parse(raw); } catch { - return; + return; // Invalid JSON, ignoring } if (!msg.payload) return; @@ -134,6 +183,7 @@ export class ChatClient extends EventEmitter { const body = safeJson(payload.body); + // 1) Speaker request => "guestBroadcastingEvent=1" if (body.guestBroadcastingEvent === 1) { const req: SpeakerRequest = { userId: body.guestRemoteID, @@ -144,6 +194,7 @@ export class ChatClient extends EventEmitter { this.emit('speakerRequest', req); } + // 2) Occupancy update => body.occupancy if (typeof body.occupancy === 'number') { const update: OccupancyUpdate = { occupancy: body.occupancy, @@ -152,6 +203,7 @@ export class ChatClient extends EventEmitter { this.emit('occupancyUpdate', update); } + // 3) Mute/unmute => "guestBroadcastingEvent=16" (mute) or "17" (unmute) if (body.guestBroadcastingEvent === 16) { this.emit('muteStateChanged', { userId: body.guestRemoteID, @@ -164,9 +216,19 @@ export class ChatClient extends EventEmitter { muted: false, }); } - // Example of guest reaction + + // 4) "guestBroadcastingEvent=12" => host accepted a speaker + if (body.guestBroadcastingEvent === 12) { + this.emit('newSpeakerAccepted', { + userId: body.guestRemoteID, + username: body.guestUsername, + sessionUUID: body.sessionUUID, + }); + } + + // 5) Reaction => body.type=2 if (body?.type === 2) { - this.logger.info('[ChatClient] Emitting guest reaction event =>', body); + this.logger.info('[ChatClient] Emitting guestReaction =>', body); this.emit('guestReaction', { displayName: body.displayName, emoji: body.body, @@ -174,7 +236,10 @@ export class ChatClient extends EventEmitter { } } - async disconnect() { + /** + * Closes the WebSocket connection if open, and resets internal state. + */ + public async disconnect(): Promise { if (this.ws) { this.logger.info('[ChatClient] Disconnecting...'); this.ws.close(); @@ -184,6 +249,9 @@ export class ChatClient extends EventEmitter { } } +/** + * Helper function to safely parse JSON without throwing. 
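+ * For example, safeJson('{"occupancy":12}') returns { occupancy: 12 }, while
+ * malformed input falls through the catch block instead of propagating an error.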
+ */ function safeJson(text: string): any { try { return JSON.parse(text); diff --git a/src/spaces/core/JanusAudio.ts b/src/spaces/core/JanusAudio.ts index 951ed2d..99626be 100644 --- a/src/spaces/core/JanusAudio.ts +++ b/src/spaces/core/JanusAudio.ts @@ -6,14 +6,30 @@ const { nonstandard } = wrtc; const { RTCAudioSource, RTCAudioSink } = nonstandard; import { Logger } from '../logger'; +/** + * Configuration options for the JanusAudioSource. + */ interface AudioSourceOptions { + /** + * Optional logger instance for debug/info/warn logs. + */ logger?: Logger; } +/** + * Configuration options for the JanusAudioSink. + */ interface AudioSinkOptions { + /** + * Optional logger instance for debug/info/warn logs. + */ logger?: Logger; } +/** + * JanusAudioSource wraps a RTCAudioSource, allowing you to push + * raw PCM frames (Int16Array) into the WebRTC pipeline. + */ export class JanusAudioSource extends EventEmitter { private source: any; private readonly track: MediaStreamTrack; @@ -26,16 +42,31 @@ export class JanusAudioSource extends EventEmitter { this.track = this.source.createTrack(); } - getTrack() { + /** + * Returns the MediaStreamTrack associated with this audio source. + */ + public getTrack(): MediaStreamTrack { return this.track; } - pushPcmData(samples: Int16Array, sampleRate: number, channels = 1) { + /** + * Pushes PCM data into the RTCAudioSource. Typically 16-bit, single- or multi-channel frames. + * @param samples - The Int16Array audio samples. + * @param sampleRate - The sampling rate (e.g., 48000). + * @param channels - Number of channels (e.g., 1 for mono). + */ + public pushPcmData( + samples: Int16Array, + sampleRate: number, + channels = 1, + ): void { if (this.logger?.isDebugEnabled()) { this.logger?.debug( - `[JanusAudioSource] pushPcmData => sampleRate=${sampleRate}, channels=${channels}`, + `[JanusAudioSource] pushPcmData => sampleRate=${sampleRate}, channels=${channels}, frames=${samples.length}`, ); } + + // Feed data into the RTCAudioSource this.source.onData({ samples, sampleRate, @@ -46,6 +77,10 @@ export class JanusAudioSource extends EventEmitter { } } +/** + * JanusAudioSink wraps a RTCAudioSink, providing an event emitter + * that forwards raw PCM frames (Int16Array) to listeners. 
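+ *
+ * A rough usage sketch (the track typically comes from an RTCPeerConnection
+ * "track" event, as in JanusClient):
+ *
+ *   const sink = new JanusAudioSink(track, { logger });
+ *   sink.on('audioData', (frame) => {
+ *     // frame.samples is an Int16Array; sampleRate, bitsPerSample and
+ *     // channelCount describe its format.
+ *     console.log(`got ${frame.samples.length} samples @ ${frame.sampleRate} Hz`);
+ *   });
+ *   sink.stop(); // no further 'audioData' events after this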
+ */ export class JanusAudioSink extends EventEmitter { private sink: any; private active = true; @@ -54,11 +89,15 @@ export class JanusAudioSink extends EventEmitter { constructor(track: MediaStreamTrack, options?: AudioSinkOptions) { super(); this.logger = options?.logger; + if (track.kind !== 'audio') { - throw new Error('JanusAudioSink must be an audio track'); + throw new Error('[JanusAudioSink] Provided track is not an audio track'); } + + // Create RTCAudioSink to listen for PCM frames this.sink = new RTCAudioSink(track); + // Register callback for PCM frames this.sink.ondata = (frame: { samples: Int16Array; sampleRate: number; @@ -66,19 +105,29 @@ export class JanusAudioSink extends EventEmitter { channelCount: number; }) => { if (!this.active) return; + if (this.logger?.isDebugEnabled()) { this.logger?.debug( - `[JanusAudioSink] ondata => sampleRate=${frame.sampleRate}, bitsPerSample=${frame.bitsPerSample}, channelCount=${frame.channelCount}`, + `[JanusAudioSink] ondata => ` + + `sampleRate=${frame.sampleRate}, ` + + `bitsPerSample=${frame.bitsPerSample}, ` + + `channelCount=${frame.channelCount}, ` + + `frames=${frame.samples.length}`, ); } + + // Emit 'audioData' event with the raw PCM frame this.emit('audioData', frame); }; } - stop() { + /** + * Stops receiving audio data. Once called, no further 'audioData' events will be emitted. + */ + public stop(): void { this.active = false; if (this.logger?.isDebugEnabled()) { - this.logger?.debug('[JanusAudioSink] stop'); + this.logger?.debug('[JanusAudioSink] stop called => stopping the sink'); } this.sink?.stop(); } diff --git a/src/spaces/core/JanusClient.ts b/src/spaces/core/JanusClient.ts index 5aef42e..65cdfac 100644 --- a/src/spaces/core/JanusClient.ts +++ b/src/spaces/core/JanusClient.ts @@ -8,18 +8,51 @@ import type { AudioDataWithUser, TurnServersInfo } from '../types'; import { Logger } from '../logger'; interface JanusConfig { + /** + * The base URL for the Janus gateway (e.g. "https://gw-prod-hydra-eu-west-3.pscp.tv/s=prod:XX/v1/gateway") + */ webrtcUrl: string; + + /** + * The unique room ID (e.g., the broadcast or space ID) + */ roomId: string; + + /** + * The token/credential used to authorize requests to Janus (often a signed JWT). + */ credential: string; + + /** + * The user identifier (host or speaker). Used as 'display' in the Janus plugin. + */ userId: string; + + /** + * The name of the stream (often the same as roomId for convenience). + */ streamName: string; + + /** + * ICE / TURN server information returned by Twitter's /turnServers endpoint. + */ turnServers: TurnServersInfo; + + /** + * Logger instance for consistent debug/info/error logs. + */ logger: Logger; } /** - * This class is in charge of the Janus session, handle, - * joining the videoroom, and polling events. + * Manages the Janus session for a Twitter AudioSpace: + * - Creates a Janus session and plugin handle + * - Joins the Janus videoroom as publisher/subscriber + * - Subscribes to other speakers + * - Sends local PCM frames as Opus + * - Polls for Janus events + * + * It can be used by both the host (who creates a room) or a guest speaker (who joins an existing room). 
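+ *
+ * A host-side sketch (config values are placeholders; see Space.initialize()
+ * for how they are obtained in practice):
+ *
+ *   const janus = new JanusClient({
+ *     webrtcUrl, roomId, credential, userId, streamName, turnServers, logger,
+ *   });
+ *   await janus.initialize();             // session, room, publisher, offer
+ *   janus.pushLocalAudio(samples, 48000); // 16-bit PCM frames
+ *   await janus.stop();                   // stop polling, close the PeerConnection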
*/ export class JanusClient extends EventEmitter { private logger: Logger; @@ -27,14 +60,20 @@ export class JanusClient extends EventEmitter { private sessionId?: number; private handleId?: number; private publisherId?: number; + private pc?: RTCPeerConnection; private localAudioSource?: JanusAudioSource; + private pollActive = false; + + // Tracks promises waiting for specific Janus events private eventWaiters: Array<{ predicate: (evt: any) => boolean; resolve: (value: any) => void; reject: (error: Error) => void; }> = []; + + // Tracks subscriber handle+pc for each userId we subscribe to private subscribers = new Map< string, { @@ -48,16 +87,31 @@ export class JanusClient extends EventEmitter { this.logger = config.logger; } - async initialize() { + /** + * Initializes this JanusClient for the host scenario: + * 1) createSession() + * 2) attachPlugin() + * 3) createRoom() + * 4) joinRoom() + * 5) configure local PeerConnection (send audio, etc.) + */ + public async initialize(): Promise { this.logger.debug('[JanusClient] initialize() called'); this.sessionId = await this.createSession(); this.handleId = await this.attachPlugin(); + + // Start polling for Janus events this.pollActive = true; this.startPolling(); + + // Create a new Janus room (only for the host scenario) await this.createRoom(); + + // Join that room as publisher this.publisherId = await this.joinRoom(); + // Set up our RTCPeerConnection for local audio this.pc = new RTCPeerConnection({ iceServers: [ { @@ -69,42 +123,134 @@ export class JanusClient extends EventEmitter { }); this.setupPeerEvents(); + // Add local audio track this.enableLocalAudio(); + + // Create an offer and configure the publisher in Janus await this.configurePublisher(); this.logger.info('[JanusClient] Initialization complete'); } - public async subscribeSpeaker(userId: string): Promise { - this.logger.debug('[JanusClient] subscribeSpeaker => userId=', userId); + /** + * Initializes this JanusClient for a guest speaker scenario: + * 1) createSession() + * 2) attachPlugin() + * 3) join existing room as publisher (no createRoom call) + * 4) configure local PeerConnection + * 5) subscribe to any existing publishers + */ + public async initializeGuestSpeaker(sessionUUID: string): Promise { + this.logger.debug('[JanusClient] initializeGuestSpeaker() called'); - const subscriberHandleId = await this.attachPlugin(); - this.logger.debug('[JanusClient] subscriber handle =>', subscriberHandleId); + // 1) Create a new Janus session + this.sessionId = await this.createSession(); + this.handleId = await this.attachPlugin(); + + // Start polling + this.pollActive = true; + this.startPolling(); - const publishersEvt = await this.waitForJanusEvent( + // 2) Join the existing room as a publisher (no createRoom) + const evtPromise = this.waitForJanusEvent( (e) => e.janus === 'event' && e.plugindata?.plugin === 'janus.plugin.videoroom' && - e.plugindata?.data?.videoroom === 'event' && - Array.isArray(e.plugindata?.data?.publishers) && - e.plugindata?.data?.publishers.length > 0, - 8000, - 'discover feed_id from "publishers"', + e.plugindata?.data?.videoroom === 'joined', + 10000, + 'Guest Speaker joined event', ); - const list = publishersEvt.plugindata.data.publishers as any[]; - const pub = list.find( - (p) => p.display === userId || p.periscope_user_id === userId, + const body = { + request: 'join', + room: this.config.roomId, + ptype: 'publisher', + display: this.config.userId, + periscope_user_id: this.config.userId, + }; + await this.sendJanusMessage(this.handleId, 
body); + + // Wait for the joined event + const evt = await evtPromise; + const data = evt.plugindata?.data; + this.publisherId = data.id; // Our own publisherId + this.logger.debug( + '[JanusClient] guest joined => publisherId=', + this.publisherId, ); - if (!pub) { - throw new Error( - `[JanusClient] subscribeSpeaker => No publisher found for userId=${userId}`, + + // If there are existing publishers, we can subscribe to them + const publishers = data.publishers || []; + this.logger.debug('[JanusClient] existing publishers =>', publishers); + + // 3) Create RTCPeerConnection for sending local audio + this.pc = new RTCPeerConnection({ + iceServers: [ + { + urls: this.config.turnServers.uris, + username: this.config.turnServers.username, + credential: this.config.turnServers.password, + }, + ], + }); + this.setupPeerEvents(); + this.enableLocalAudio(); + + // 4) configurePublisher => generate offer, wait for answer + await this.configurePublisher(sessionUUID); + + // 5) Subscribe to each existing publisher + await Promise.all( + publishers.map((pub: any) => this.subscribeSpeaker(pub.display, pub.id)), + ); + + this.logger.info('[JanusClient] Guest speaker negotiation complete'); + } + + /** + * Subscribes to a speaker's audio feed by userId and/or feedId. + * If feedId=0, we wait for a "publishers" event to discover feedId. + */ + public async subscribeSpeaker( + userId: string, + feedId: number = 0, + ): Promise { + this.logger.debug('[JanusClient] subscribeSpeaker => userId=', userId); + + // 1) Attach a separate plugin handle for subscriber + const subscriberHandleId = await this.attachPlugin(); + this.logger.debug('[JanusClient] subscriber handle =>', subscriberHandleId); + + // If feedId was not provided, wait for an event listing publishers + if (feedId === 0) { + const publishersEvt = await this.waitForJanusEvent( + (e) => + e.janus === 'event' && + e.plugindata?.plugin === 'janus.plugin.videoroom' && + e.plugindata?.data?.videoroom === 'event' && + Array.isArray(e.plugindata?.data?.publishers) && + e.plugindata?.data?.publishers.length > 0, + 8000, + 'discover feed_id from "publishers"', + ); + + const list = publishersEvt.plugindata.data.publishers as any[]; + const pub = list.find( + (p) => p.display === userId || p.periscope_user_id === userId, ); + if (!pub) { + throw new Error( + `[JanusClient] subscribeSpeaker => No publisher found for userId=${userId}`, + ); + } + feedId = pub.id; + this.logger.debug('[JanusClient] found feedId =>', feedId); } - const feedId = pub.id; - this.logger.debug('[JanusClient] found feedId =>', feedId); + + // Notify listeners that we've discovered a feed this.emit('subscribedSpeaker', { userId, feedId }); + // 2) Join the room as a "subscriber" const joinBody = { request: 'join', room: this.config.roomId, @@ -114,12 +260,13 @@ export class JanusClient extends EventEmitter { { feed: feedId, mid: '0', - send: true, + send: true, // indicates we might send audio? 
}, ], }; await this.sendJanusMessage(subscriberHandleId, joinBody); + // 3) Wait for "attached" + jsep.offer const attachedEvt = await this.waitForJanusEvent( (e) => e.janus === 'event' && @@ -132,8 +279,8 @@ export class JanusClient extends EventEmitter { ); this.logger.debug('[JanusClient] subscriber => "attached" with offer'); + // 4) Create a new RTCPeerConnection for receiving audio from this feed const offer = attachedEvt.jsep; - const subPc = new RTCPeerConnection({ iceServers: [ { @@ -151,8 +298,10 @@ export class JanusClient extends EventEmitter { evt.track.readyState, evt.track.muted, ); - + // Attach a JanusAudioSink to capture PCM const sink = new JanusAudioSink(evt.track, { logger: this.logger }); + + // For each audio frame, forward it to 'audioDataFromSpeaker' sink.on('audioData', (frame) => { if (this.logger.isDebugEnabled()) { let maxVal = 0; @@ -176,10 +325,12 @@ export class JanusClient extends EventEmitter { }); }; + // 5) Answer the subscription offer await subPc.setRemoteDescription(offer); const answer = await subPc.createAnswer(); await subPc.setLocalDescription(answer); + // 6) Send "start" request to begin receiving await this.sendJanusMessage( subscriberHandleId, { @@ -189,19 +340,28 @@ export class JanusClient extends EventEmitter { }, answer, ); + this.logger.debug('[JanusClient] subscriber => done (user=', userId, ')'); + + // Track this subscription handle+pc by userId this.subscribers.set(userId, { handleId: subscriberHandleId, pc: subPc }); } - pushLocalAudio(samples: Int16Array, sampleRate: number, channels = 1) { + /** + * Pushes local PCM frames to Janus. If the localAudioSource isn't active, it enables it. + */ + public pushLocalAudio(samples: Int16Array, sampleRate: number, channels = 1) { if (!this.localAudioSource) { - this.logger.warn('[JanusClient] No localAudioSource; enabling now...'); + this.logger.warn('[JanusClient] No localAudioSource => enabling now...'); this.enableLocalAudio(); } this.localAudioSource?.pushPcmData(samples, sampleRate, channels); } - enableLocalAudio() { + /** + * Ensures a local audio track is added to the RTCPeerConnection for publishing. + */ + public enableLocalAudio(): void { if (!this.pc) { this.logger.warn( '[JanusClient] enableLocalAudio => No RTCPeerConnection', @@ -212,6 +372,7 @@ export class JanusClient extends EventEmitter { this.logger.debug('[JanusClient] localAudioSource already active'); return; } + // Create a JanusAudioSource that feeds PCM frames this.localAudioSource = new JanusAudioSource({ logger: this.logger }); const track = this.localAudioSource.getTrack(); const localStream = new MediaStream(); @@ -219,7 +380,11 @@ export class JanusClient extends EventEmitter { this.pc.addTrack(track, localStream); } - async stop() { + /** + * Stops the Janus client: ends polling, closes the RTCPeerConnection, etc. + * Does not destroy or leave the room automatically; call destroyRoom() or leaveRoom() if needed. + */ + public async stop(): Promise { this.logger.info('[JanusClient] Stopping...'); this.pollActive = false; if (this.pc) { @@ -228,18 +393,30 @@ export class JanusClient extends EventEmitter { } } - getSessionId() { + /** + * Returns the current Janus sessionId, if any. + */ + public getSessionId(): number | undefined { return this.sessionId; } - getHandleId() { + /** + * Returns the Janus handleId for the publisher, if any. + */ + public getHandleId(): number | undefined { return this.handleId; } - getPublisherId() { + /** + * Returns the Janus publisherId (internal participant ID), if any. 
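+   * It is assigned once the room join completes (or when Janus reports it in a
+   * later event), and Space.initialize() forwards it to publishBroadcast().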
+ */ + public getPublisherId(): number | undefined { return this.publisherId; } + /** + * Creates a new Janus session via POST /janus (with "janus":"create"). + */ private async createSession(): Promise { const transaction = this.randomTid(); const resp = await fetch(this.config.webrtcUrl, { @@ -264,6 +441,9 @@ export class JanusClient extends EventEmitter { return json.data.id; } + /** + * Attaches to the videoroom plugin via /janus/{sessionId} (with "janus":"attach"). + */ private async attachPlugin(): Promise { if (!this.sessionId) { throw new Error('[JanusClient] attachPlugin => no sessionId'); @@ -291,7 +471,11 @@ export class JanusClient extends EventEmitter { return json.data.id; } - private async createRoom() { + /** + * Creates a Janus room for the host scenario. + * For a guest, this step is skipped (the room already exists). + */ + private async createRoom(): Promise { if (!this.sessionId || !this.handleId) { throw new Error('[JanusClient] createRoom => No session/handle'); } @@ -328,11 +512,10 @@ export class JanusClient extends EventEmitter { } const json = await resp.json(); this.logger.debug('[JanusClient] createRoom =>', JSON.stringify(json)); + if (json.janus === 'error') { throw new Error( - `[JanusClient] createRoom error => ${ - json.error?.reason || 'Unknown error' - }`, + `[JanusClient] createRoom error => ${json.error?.reason || 'Unknown'}`, ); } if (json.plugindata?.data?.videoroom !== 'created') { @@ -347,11 +530,17 @@ export class JanusClient extends EventEmitter { ); } + /** + * Joins the created room as a publisher, for the host scenario. + */ private async joinRoom(): Promise { if (!this.sessionId || !this.handleId) { - throw new Error('[JanusClient] no session/handle'); + throw new Error('[JanusClient] no session/handle for joinRoom()'); } + this.logger.debug('[JanusClient] joinRoom => start'); + + // Wait for the 'joined' event from videoroom const evtPromise = this.waitForJanusEvent( (e) => e.janus === 'event' && @@ -360,6 +549,7 @@ export class JanusClient extends EventEmitter { 12000, 'Host Joined Event', ); + const body = { request: 'join', room: this.config.roomId, @@ -368,22 +558,29 @@ export class JanusClient extends EventEmitter { periscope_user_id: this.config.userId, }; await this.sendJanusMessage(this.handleId, body); + const evt = await evtPromise; const publisherId = evt.plugindata.data.id; this.logger.debug('[JanusClient] joined room => publisherId=', publisherId); return publisherId; } - private async configurePublisher() { + /** + * Creates an SDP offer and sends "configure" to Janus with it. + * Used by both host and guest after attach + join. 
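+   * @param sessionUUID - For a guest speaker, the session UUID obtained from the
+   * speaker-request flow; the host passes an empty string (the default).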
+ */ + private async configurePublisher(sessionUUID: string = ''): Promise { if (!this.pc || !this.sessionId || !this.handleId) { return; } + this.logger.debug('[JanusClient] createOffer...'); const offer = await this.pc.createOffer({ offerToReceiveAudio: true, offerToReceiveVideo: false, }); await this.pc.setLocalDescription(offer); + this.logger.debug('[JanusClient] sending configure with JSEP...'); await this.sendJanusMessage( this.handleId, @@ -391,7 +588,7 @@ export class JanusClient extends EventEmitter { request: 'configure', room: this.config.roomId, periscope_user_id: this.config.userId, - session_uuid: '', + session_uuid: sessionUUID, stream_name: this.config.streamName, vidman_token: this.config.credential, }, @@ -400,13 +597,16 @@ export class JanusClient extends EventEmitter { this.logger.debug('[JanusClient] waiting for answer...'); } + /** + * Sends a "janus":"message" to the Janus handle, optionally with jsep. + */ private async sendJanusMessage( handleId: number, body: any, jsep?: any, ): Promise { if (!this.sessionId) { - throw new Error('[JanusClient] No session'); + throw new Error('[JanusClient] No session for sendJanusMessage'); } const transaction = this.randomTid(); const resp = await fetch( @@ -427,12 +627,15 @@ export class JanusClient extends EventEmitter { ); if (!resp.ok) { throw new Error( - '[JanusClient] sendJanusMessage failed => ' + resp.status, + `[JanusClient] sendJanusMessage failed => status=${resp.status}`, ); } } - private startPolling() { + /** + * Starts polling /janus/{sessionId}?maxev=1 for events. We parse keepalives, answers, etc. + */ + private startPolling(): void { this.logger.debug('[JanusClient] Starting polling...'); const doPoll = async () => { if (!this.pollActive || !this.sessionId) { @@ -460,7 +663,10 @@ export class JanusClient extends EventEmitter { doPoll(); } - private handleJanusEvent(evt: any) { + /** + * Processes each Janus event received from the poll cycle. + */ + private handleJanusEvent(evt: any): void { if (!evt.janus) { return; } @@ -469,18 +675,23 @@ export class JanusClient extends EventEmitter { return; } if (evt.janus === 'webrtcup') { - this.logger.debug('[JanusClient] webrtcup =>', evt.sender); + this.logger.debug('[JanusClient] webrtcup => sender=', evt.sender); } + // If there's a JSEP answer, set it on our RTCPeerConnection if (evt.jsep && evt.jsep.type === 'answer') { this.onReceivedAnswer(evt.jsep); } + // If there's a publisherId in the data, store it if (evt.plugindata?.data?.id) { this.publisherId = evt.plugindata.data.id; } + // If there's an error, emit an 'error' event if (evt.error) { this.logger.error('[JanusClient] Janus error =>', evt.error.reason); this.emit('error', new Error(evt.error.reason)); } + + // Resolve any waiting eventWaiters whose predicate matches for (let i = 0; i < this.eventWaiters.length; i++) { const waiter = this.eventWaiters[i]; if (waiter.predicate(evt)) { @@ -491,7 +702,10 @@ export class JanusClient extends EventEmitter { } } - private async onReceivedAnswer(answer: any) { + /** + * Called whenever we get an SDP "answer" from Janus. Sets the remote description on our PC. + */ + private async onReceivedAnswer(answer: any): Promise { if (!this.pc) { return; } @@ -499,7 +713,10 @@ export class JanusClient extends EventEmitter { await this.pc.setRemoteDescription(answer); } - private setupPeerEvents() { + /** + * Sets up events on our main RTCPeerConnection for ICE changes, track additions, etc. 
+ */ + private setupPeerEvents(): void { if (!this.pc) { return; } @@ -509,20 +726,24 @@ export class JanusClient extends EventEmitter { this.pc?.iceConnectionState, ); if (this.pc?.iceConnectionState === 'failed') { - this.emit('error', new Error('ICE connection failed')); + this.emit('error', new Error('[JanusClient] ICE connection failed')); } }); this.pc.addEventListener('track', (evt) => { - this.logger.debug('[JanusClient] track =>', evt.track.kind); + this.logger.debug('[JanusClient] ontrack => kind=', evt.track.kind); }); } - private randomTid() { + /** + * Generates a random transaction ID for Janus requests. + */ + private randomTid(): string { return Math.random().toString(36).slice(2, 10); } /** - * Allows code to wait for a specific Janus event that matches a predicate + * Waits for a specific Janus event (e.g., "joined", "attached", etc.) + * that matches a given predicate. Times out after timeoutMs if not received. */ private async waitForJanusEvent( predicate: (evt: any) => boolean, @@ -550,6 +771,9 @@ export class JanusClient extends EventEmitter { }); } + /** + * Destroys the Janus room (host only). Does not close local PC or stop polling. + */ public async destroyRoom(): Promise { if (!this.sessionId || !this.handleId) { this.logger.warn('[JanusClient] destroyRoom => no session/handle'); @@ -559,6 +783,7 @@ export class JanusClient extends EventEmitter { this.logger.warn('[JanusClient] destroyRoom => no roomId/userId'); return; } + const transaction = this.randomTid(); const body = { request: 'destroy', @@ -566,6 +791,7 @@ export class JanusClient extends EventEmitter { periscope_user_id: this.config.userId, }; this.logger.info('[JanusClient] destroying room =>', body); + const resp = await fetch( `${this.config.webrtcUrl}/${this.sessionId}/${this.handleId}`, { @@ -589,6 +815,9 @@ export class JanusClient extends EventEmitter { this.logger.debug('[JanusClient] destroyRoom =>', JSON.stringify(json)); } + /** + * Leaves the Janus room if we've joined. Does not close the local PC or stop polling. + */ public async leaveRoom(): Promise { if (!this.sessionId || !this.handleId) { this.logger.warn('[JanusClient] leaveRoom => no session/handle'); @@ -601,6 +830,7 @@ export class JanusClient extends EventEmitter { periscope_user_id: this.config.userId, }; this.logger.info('[JanusClient] leaving room =>', body); + const resp = await fetch( `${this.config.webrtcUrl}/${this.sessionId}/${this.handleId}`, { diff --git a/src/spaces/core/Space.ts b/src/spaces/core/Space.ts index 5883bd5..243f0c7 100644 --- a/src/spaces/core/Space.ts +++ b/src/spaces/core/Space.ts @@ -9,12 +9,12 @@ import { publishBroadcast, authorizeToken, getRegion, + muteSpeaker, + unmuteSpeaker, + setupCommonChatEvents, } from '../utils'; import type { BroadcastCreated, - SpeakerRequest, - OccupancyUpdate, - GuestReaction, Plugin, AudioDataWithUser, PluginRegistration, @@ -32,10 +32,11 @@ export interface SpaceConfig { } /** - * This class orchestrates: - * 1) Creation of the broadcast - * 2) Instantiation of Janus + Chat - * 3) Approve speakers, push audio, etc. + * Manages the creation of a new Space (broadcast host): + * 1) Creates the broadcast on Periscope + * 2) Sets up Janus WebRTC for audio + * 3) Optionally creates a ChatClient for interactive mode + * 4) Allows managing (approve/remove) speakers, pushing audio, etc. 
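+ *
+ * A rough host-side sketch (assumes a Space constructed with a logged-in
+ * Scraper, and that chat events such as 'speakerRequest' are re-emitted on the
+ * Space as before):
+ *
+ *   const broadcast = await space.initialize({
+ *     mode: 'INTERACTIVE',
+ *     title: 'My Space',
+ *     description: 'Demo space',
+ *   });
+ *   space.on('speakerRequest', (req) =>
+ *     space.approveSpeaker(req.userId, req.sessionUUID),
+ *   );
+ *   space.pushAudio(pcmSamples, 48000); // host microphone PCM
+ *   await space.stop();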
*/ export class Space extends EventEmitter { private readonly debug: boolean; @@ -43,9 +44,11 @@ export class Space extends EventEmitter { private janusClient?: JanusClient; private chatClient?: ChatClient; + private authToken?: string; private broadcastInfo?: BroadcastCreated; private isInitialized = false; + private plugins = new Set(); private speakers = new Map(); @@ -58,33 +61,41 @@ export class Space extends EventEmitter { this.logger = new Logger(this.debug); } + /** + * Registers a plugin and calls its onAttach(...). + * init(...) will be invoked once initialization is complete. + */ public use(plugin: Plugin, config?: Record) { const registration: PluginRegistration = { plugin, config }; this.plugins.add(registration); this.logger.debug('[Space] Plugin added =>', plugin.constructor.name); + plugin.onAttach?.({ space: this, pluginConfig: config }); - plugin.onAttach?.(this); - + // If we've already initialized this Space, immediately call plugin.init(...) if (this.isInitialized && plugin.init) { - plugin.init({ - space: this, - pluginConfig: config, - }); + plugin.init({ space: this, pluginConfig: config }); + // If Janus is also up, call onJanusReady + if (this.janusClient) { + plugin.onJanusReady?.(this.janusClient); + } } return this; } /** - * Main entry point + * Main entry point to create and initialize the Space broadcast. */ - async initialize(config: SpaceConfig) { + public async initialize(config: SpaceConfig) { this.logger.debug('[Space] Initializing...'); + // 1) Obtain the Periscope cookie + region const cookie = await this.scraper.getPeriscopeCookie(); const region = await getRegion(); this.logger.debug('[Space] Got region =>', region); + + // 2) Create a broadcast this.logger.debug('[Space] Creating broadcast...'); const broadcast = await createBroadcast({ description: config.description, @@ -94,12 +105,15 @@ export class Space extends EventEmitter { }); this.broadcastInfo = broadcast; + // 3) Authorize to get an auth token this.logger.debug('[Space] Authorizing token...'); this.authToken = await authorizeToken(cookie); + // 4) Gather TURN servers this.logger.debug('[Space] Getting turn servers...'); const turnServers = await getTurnServers(cookie); + // 5) Create and initialize Janus for hosting this.janusClient = new JanusClient({ webrtcUrl: broadcast.webrtc_gw_url, roomId: broadcast.room_id, @@ -111,12 +125,13 @@ export class Space extends EventEmitter { }); await this.janusClient.initialize(); + // Forward PCM from Janus to plugin.onAudioData this.janusClient.on('audioDataFromSpeaker', (data: AudioDataWithUser) => { this.logger.debug('[Space] Received PCM from speaker =>', data.userId); this.handleAudioData(data); - // You can store or forward to a plugin, run STT, etc. 
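+      // handleAudioData() fans this frame out to every plugin's onAudioData()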
}); + // Update speaker info once we subscribe this.janusClient.on('subscribedSpeaker', ({ userId, feedId }) => { const speaker = this.speakers.get(userId); if (!speaker) { @@ -128,11 +143,11 @@ export class Space extends EventEmitter { } speaker.janusParticipantId = feedId; this.logger.debug( - `[Space] updated speaker info => userId=${userId}, feedId=${feedId}`, + `[Space] updated speaker => userId=${userId}, feedId=${feedId}`, ); }); - // 7) Publish the broadcast + // 6) Publish the broadcast so it's live this.logger.debug('[Space] Publishing broadcast...'); await publishBroadcast({ title: config.title || '', @@ -143,7 +158,7 @@ export class Space extends EventEmitter { janusPublisherId: this.janusClient.getPublisherId(), }); - // 8) If interactive, open chat + // 7) If interactive => set up ChatClient if (config.mode === 'INTERACTIVE') { this.logger.debug('[Space] Connecting chat...'); this.chatClient = new ChatClient({ @@ -157,66 +172,49 @@ export class Space extends EventEmitter { } this.logger.info('[Space] Initialized =>', broadcast.share_url); - this.isInitialized = true; + // Call plugin.init(...) and onJanusReady(...) for all plugins now that we're set for (const { plugin, config: pluginConfig } of this.plugins) { - if (plugin.init) { - plugin.init({ - space: this, - pluginConfig, - }); - } + plugin.init?.({ space: this, pluginConfig }); + plugin.onJanusReady?.(this.janusClient); } this.logger.debug('[Space] All plugins initialized'); return broadcast; } - reactWithEmoji(emoji: string) { + /** + * Send an emoji reaction via chat, if interactive. + */ + public reactWithEmoji(emoji: string) { if (!this.chatClient) return; this.chatClient.reactWithEmoji(emoji); } + /** + * Internal method to wire up chat events if interactive. + */ private setupChatEvents() { if (!this.chatClient) return; - - this.chatClient.on('speakerRequest', (req: SpeakerRequest) => { - this.logger.info('[Space] Speaker request =>', req); - this.emit('speakerRequest', req); - }); - - this.chatClient.on('occupancyUpdate', (update: OccupancyUpdate) => { - this.logger.debug('[Space] occupancyUpdate =>', update); - this.emit('occupancyUpdate', update); - }); - - this.chatClient.on('muteStateChanged', (evt) => { - this.logger.debug('[Space] muteStateChanged =>', evt); - this.emit('muteStateChanged', evt); - }); - - this.chatClient.on('guestReaction', (reaction: GuestReaction) => { - this.logger.info('[Space] Guest reaction =>', reaction); - this.emit('guestReaction', reaction); - }); + setupCommonChatEvents(this.chatClient, this.logger, this); } /** - * Approves a speaker on Periscope side, then subscribes on Janus side + * Approves a speaker request on Twitter side, then calls Janus to subscribe their audio. 
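+   * @param userId - The requesting user's ID, as delivered by the 'speakerRequest' chat event.
+   * @param sessionUUID - The session UUID attached to that request.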
*/ - async approveSpeaker(userId: string, sessionUUID: string) { + public async approveSpeaker(userId: string, sessionUUID: string) { if (!this.isInitialized || !this.broadcastInfo) { - throw new Error('[Space] Not initialized or no broadcastInfo'); + throw new Error('[Space] Not initialized or missing broadcastInfo'); } - if (!this.authToken) { throw new Error('[Space] No auth token available'); } + // Store in our local speaker map this.speakers.set(userId, { userId, sessionUUID }); - // 1) Call the "request/approve" endpoint + // 1) Call Twitter's /request/approve await this.callApproveEndpoint( this.broadcastInfo, this.authToken, @@ -224,10 +222,13 @@ export class Space extends EventEmitter { sessionUUID, ); - // 2) Subscribe in Janus => receive speaker's audio + // 2) Subscribe to their audio in Janus await this.janusClient?.subscribeSpeaker(userId); } + /** + * Approve request => calls /api/v1/audiospace/request/approve + */ private async callApproveEndpoint( broadcast: BroadcastCreated, authorizationToken: string, @@ -266,20 +267,20 @@ export class Space extends EventEmitter { } /** - * Removes a speaker (userId) on the Twitter side (audiospace/stream/eject) - * then unsubscribes in Janus if needed. + * Removes a speaker from the Twitter side, then unsubscribes in Janus if needed. */ public async removeSpeaker(userId: string) { if (!this.isInitialized || !this.broadcastInfo) { - throw new Error('[Space] Not initialized or no broadcastInfo'); + throw new Error('[Space] Not initialized or missing broadcastInfo'); } if (!this.authToken) { - throw new Error('[Space] No auth token available'); + throw new Error('[Space] No auth token'); } if (!this.janusClient) { - throw new Error('[Space] No Janus client initialized'); + throw new Error('[Space] No Janus client'); } + // Find this speaker in local map const speaker = this.speakers.get(userId); if (!speaker) { throw new Error( @@ -301,16 +302,15 @@ export class Space extends EventEmitter { ); } + // 1) Eject on Twitter side const janusHandleId = this.janusClient.getHandleId(); const janusSessionId = this.janusClient.getSessionId(); - if (!janusHandleId || !janusSessionId) { throw new Error( `[Space] removeSpeaker => missing Janus handle/session for userId=${userId}`, ); } - // 1) Call the eject endpoint await this.callRemoveEndpoint( this.broadcastInfo, this.authToken, @@ -321,14 +321,13 @@ export class Space extends EventEmitter { janusSessionId, ); - // 2) Remove from local speakers map + // 2) Remove from local map this.speakers.delete(userId); - this.logger.info(`[Space] removeSpeaker => removed userId=${userId}`); } /** - * Calls the audiospace/stream/eject endpoint to remove a speaker on Twitter + * Twitter's /api/v1/audiospace/stream/eject call */ private async callRemoveEndpoint( broadcast: BroadcastCreated, @@ -374,23 +373,25 @@ export class Space extends EventEmitter { this.logger.debug('[Space] Speaker removed => sessionUUID=', sessionUUID); } - pushAudio(samples: Int16Array, sampleRate: number) { + /** + * Push PCM audio frames if you're the host. Usually you'd do this if you're capturing + * microphone input from the host side. + */ + public pushAudio(samples: Int16Array, sampleRate: number) { this.janusClient?.pushLocalAudio(samples, sampleRate); } /** - * This method is called by JanusClient on 'audioDataFromSpeaker' - * or we do it from the 'initialize(...)' once Janus is set up. 
+ * Handler for PCM from other speakers, forwarded to plugin.onAudioData */ private handleAudioData(data: AudioDataWithUser) { - // Forward to plugins for (const { plugin } of this.plugins) { plugin.onAudioData?.(data); } } /** - * Gracefully end the Space (stop broadcast, destroy Janus room, etc.) + * Gracefully shut down this Space: destroy the Janus room, end the broadcast, etc. */ public async finalizeSpace(): Promise { this.logger.info('[Space] finalizeSpace => stopping broadcast gracefully'); @@ -425,12 +426,11 @@ export class Space extends EventEmitter { } await Promise.all(tasks); - this.logger.info('[Space] finalizeSpace => done.'); } /** - * Calls the endAudiospace endpoint from Twitter + * Calls /api/v1/audiospace/admin/endAudiospace on Twitter side. */ private async endAudiospace(params: { broadcastId: string; @@ -464,10 +464,106 @@ export class Space extends EventEmitter { this.logger.debug('[Space] endAudiospace => success =>', json); } + /** + * Retrieves an array of known speakers in this Space (by userId and sessionUUID). + */ public getSpeakers(): SpeakerInfo[] { return Array.from(this.speakers.values()); } + /** + * Mute the host (yourself). For the host, session_uuid = '' (empty). + */ + public async muteHost() { + if (!this.authToken) { + throw new Error('[Space] No auth token available'); + } + if (!this.broadcastInfo) { + throw new Error('[Space] No broadcastInfo'); + } + + await muteSpeaker({ + broadcastId: this.broadcastInfo.room_id, + sessionUUID: '', // host => empty + chatToken: this.broadcastInfo.access_token, + authToken: this.authToken, + }); + this.logger.info('[Space] Host muted successfully.'); + } + + /** + * Unmute the host (yourself). + */ + public async unmuteHost() { + if (!this.authToken) { + throw new Error('[Space] No auth token'); + } + if (!this.broadcastInfo) { + throw new Error('[Space] No broadcastInfo'); + } + + await unmuteSpeaker({ + broadcastId: this.broadcastInfo.room_id, + sessionUUID: '', + chatToken: this.broadcastInfo.access_token, + authToken: this.authToken, + }); + this.logger.info('[Space] Host unmuted successfully.'); + } + + /** + * Mute a specific speaker. We'll retrieve sessionUUID from our local map. + */ + public async muteSpeaker(userId: string) { + if (!this.authToken) { + throw new Error('[Space] No auth token available'); + } + if (!this.broadcastInfo) { + throw new Error('[Space] No broadcastInfo'); + } + + const speaker = this.speakers.get(userId); + if (!speaker) { + throw new Error(`[Space] Speaker not found for userId=${userId}`); + } + + await muteSpeaker({ + broadcastId: this.broadcastInfo.room_id, + sessionUUID: speaker.sessionUUID, + chatToken: this.broadcastInfo.access_token, + authToken: this.authToken, + }); + this.logger.info(`[Space] Muted speaker => userId=${userId}`); + } + + /** + * Unmute a specific speaker. We'll retrieve sessionUUID from local map. 
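+   * The speaker must have been approved via approveSpeaker() first, so that
+   * their sessionUUID is present in the local speakers map.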
+ */ + public async unmuteSpeaker(userId: string) { + if (!this.authToken) { + throw new Error('[Space] No auth token available'); + } + if (!this.broadcastInfo) { + throw new Error('[Space] No broadcastInfo'); + } + + const speaker = this.speakers.get(userId); + if (!speaker) { + throw new Error(`[Space] Speaker not found for userId=${userId}`); + } + + await unmuteSpeaker({ + broadcastId: this.broadcastInfo.room_id, + sessionUUID: speaker.sessionUUID, + chatToken: this.broadcastInfo.access_token, + authToken: this.authToken, + }); + this.logger.info(`[Space] Unmuted speaker => userId=${userId}`); + } + + /** + * Stop the broadcast entirely, performing finalizeSpace() plus plugin cleanup. + */ public async stop() { this.logger.info('[Space] Stopping...'); @@ -475,14 +571,19 @@ export class Space extends EventEmitter { this.logger.error('[Space] finalizeBroadcast error =>', err); }); + // Disconnect chat if present if (this.chatClient) { await this.chatClient.disconnect(); this.chatClient = undefined; } + + // Stop Janus if running if (this.janusClient) { await this.janusClient.stop(); this.janusClient = undefined; } + + // Cleanup all plugins for (const { plugin } of this.plugins) { plugin.cleanup?.(); } diff --git a/src/spaces/core/SpaceParticipant.ts b/src/spaces/core/SpaceParticipant.ts new file mode 100644 index 0000000..84679ee --- /dev/null +++ b/src/spaces/core/SpaceParticipant.ts @@ -0,0 +1,404 @@ +// src/core/SpaceParticipant.ts + +import { EventEmitter } from 'events'; +import { Logger } from '../logger'; +import { ChatClient } from './ChatClient'; +import { JanusClient } from './JanusClient'; +import { Scraper } from '../../scraper'; +import type { + TurnServersInfo, + Plugin, + PluginRegistration, + AudioDataWithUser, +} from '../types'; +import { + accessChat, + authorizeToken, + getTurnServers, + muteSpeaker, + negotiateGuestStream, + setupCommonChatEvents, + startWatching, + stopWatching, + submitSpeakerRequest, + unmuteSpeaker, + cancelSpeakerRequest, +} from '../utils'; + +interface SpaceParticipantConfig { + spaceId: string; + debug?: boolean; +} + +/** + * Manages joining an existing Space in listener mode, + * and optionally becoming a speaker via WebRTC (Janus). + */ +export class SpaceParticipant extends EventEmitter { + private readonly spaceId: string; + private readonly debug: boolean; + private readonly logger: Logger; + + // Basic auth/cookie data + private cookie?: string; + private authToken?: string; + + // Chat + private chatJwtToken?: string; + private chatToken?: string; + private chatClient?: ChatClient; + + // Watch session + private lifecycleToken?: string; + private watchSession?: string; + + // HLS stream + private hlsUrl?: string; + + // Speaker request + Janus + private sessionUUID?: string; + private janusJwt?: string; + private webrtcGwUrl?: string; + private janusClient?: JanusClient; + + // Plugin management + private plugins = new Set(); + + constructor( + private readonly scraper: Scraper, + config: SpaceParticipantConfig, + ) { + super(); + this.spaceId = config.spaceId; + this.debug = config.debug ?? false; + this.logger = new Logger(this.debug); + } + + /** + * Adds a plugin and calls its onAttach immediately. + * init() or onJanusReady() will be invoked later at the appropriate time. 
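+   *
+   * @example
+   *   // A minimal inline plugin (assumes all Plugin hooks are optional):
+   *   participant.use({
+   *     onAudioData: (frame) => console.log('PCM from', frame.userId),
+   *   });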
+ */ + public use(plugin: Plugin, config?: Record) { + const registration: PluginRegistration = { plugin, config }; + this.plugins.add(registration); + + this.logger.debug( + '[SpaceParticipant] Plugin added =>', + plugin.constructor.name, + ); + + // Call the plugin's onAttach if it exists + plugin.onAttach?.({ space: this, pluginConfig: config }); + + return this; + } + + /** + * Joins the Space as a listener: obtains HLS, chat token, etc. + */ + public async joinAsListener(): Promise { + this.logger.info( + '[SpaceParticipant] Joining space as listener =>', + this.spaceId, + ); + + // 1) Get cookie and authorize + this.cookie = await this.scraper.getPeriscopeCookie(); + this.authToken = await authorizeToken(this.cookie); + + // 2) Retrieve the space metadata for mediaKey + const spaceMeta = await this.scraper.getAudioSpaceById(this.spaceId); + const mediaKey = spaceMeta?.metadata?.media_key; + if (!mediaKey) { + throw new Error('[SpaceParticipant] No mediaKey found in metadata'); + } + this.logger.debug('[SpaceParticipant] mediaKey =>', mediaKey); + + // 3) Query live_video_stream/status for HLS URL and chat token + const status = await this.scraper.getAudioSpaceStreamStatus(mediaKey); + this.hlsUrl = status?.source?.location; + this.chatJwtToken = status?.chatToken; + this.lifecycleToken = status?.lifecycleToken; + this.logger.debug('[SpaceParticipant] HLS =>', this.hlsUrl); + + // 4) Access the chat + if (!this.chatJwtToken) { + throw new Error('[SpaceParticipant] No chatToken found'); + } + const chatInfo = await accessChat(this.chatJwtToken, this.cookie!); + this.chatToken = chatInfo.access_token; + + // 5) Create and connect the ChatClient + this.chatClient = new ChatClient({ + spaceId: chatInfo.room_id, + accessToken: chatInfo.access_token, + endpoint: chatInfo.endpoint, + logger: this.logger, + }); + await this.chatClient.connect(); + this.setupChatEvents(); + + // 6) startWatching (to appear as a viewer) + this.watchSession = await startWatching(this.lifecycleToken!, this.cookie!); + + this.logger.info('[SpaceParticipant] Joined as listener.'); + + // Call plugin.init(...) now that we have basic "listener" mode set up + for (const { plugin, config } of this.plugins) { + plugin.init?.({ space: this, pluginConfig: config }); + } + } + + /** + * Returns the HLS URL if you want to consume the stream as a listener. + */ + public getHlsUrl(): string | undefined { + return this.hlsUrl; + } + + /** + * Submits a speaker request using /audiospace/request/submit. + * Returns the sessionUUID used to track approval. + */ + public async requestSpeaker(): Promise<{ sessionUUID: string }> { + if (!this.chatJwtToken) { + throw new Error( + '[SpaceParticipant] Must join as listener first (no chat token).', + ); + } + if (!this.authToken) { + throw new Error('[SpaceParticipant] No auth token available.'); + } + if (!this.chatToken) { + throw new Error('[SpaceParticipant] No chat token available.'); + } + + this.logger.info('[SpaceParticipant] Submitting speaker request...'); + + const { session_uuid } = await submitSpeakerRequest({ + broadcastId: this.spaceId, + chatToken: this.chatToken, + authToken: this.authToken, + }); + this.sessionUUID = session_uuid; + + this.logger.info( + '[SpaceParticipant] Speaker request submitted =>', + session_uuid, + ); + return { sessionUUID: session_uuid }; + } + + /** + * Cancels a previously submitted speaker request using /audiospace/request/cancel. + * This requires a valid sessionUUID from requestSpeaker() first. 
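+   *
+   * @example
+   *   const { sessionUUID } = await participant.requestSpeaker();
+   *   // ...changed our mind before the host approved:
+   *   await participant.cancelSpeakerRequest();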
+ */ + public async cancelSpeakerRequest(): Promise { + if (!this.sessionUUID) { + throw new Error( + '[SpaceParticipant] No sessionUUID; cannot cancel a speaker request that was never submitted.', + ); + } + if (!this.authToken) { + throw new Error('[SpaceParticipant] No auth token available.'); + } + if (!this.chatToken) { + throw new Error('[SpaceParticipant] No chat token available.'); + } + + await cancelSpeakerRequest({ + broadcastId: this.spaceId, + sessionUUID: this.sessionUUID, + chatToken: this.chatToken, + authToken: this.authToken, + }); + + this.logger.info( + '[SpaceParticipant] Speaker request canceled =>', + this.sessionUUID, + ); + this.sessionUUID = undefined; + } + + /** + * Once the host approves our speaker request, we perform Janus negotiation + * to become a speaker. + */ + public async becomeSpeaker(): Promise { + if (!this.sessionUUID) { + throw new Error( + '[SpaceParticipant] No sessionUUID (did you call requestSpeaker()?).', + ); + } + this.logger.info( + '[SpaceParticipant] Negotiating speaker role via Janus...', + ); + + // 1) Retrieve TURN servers + const turnServers: TurnServersInfo = await getTurnServers(this.cookie!); + this.logger.debug('[SpaceParticipant] turnServers =>', turnServers); + + // 2) Negotiate with /audiospace/stream/negotiate + const nego = await negotiateGuestStream({ + broadcastId: this.spaceId, + sessionUUID: this.sessionUUID, + authToken: this.authToken!, + cookie: this.cookie!, + }); + this.janusJwt = nego.janus_jwt; + this.webrtcGwUrl = nego.webrtc_gw_url; + this.logger.debug('[SpaceParticipant] webrtcGwUrl =>', this.webrtcGwUrl); + + // 3) Create JanusClient + this.janusClient = new JanusClient({ + webrtcUrl: this.webrtcGwUrl!, + roomId: this.spaceId, + credential: this.janusJwt!, + userId: turnServers.username.split(':')[1], + streamName: this.spaceId, + turnServers, + logger: this.logger, + }); + + // 4) Initialize the guest speaker session in Janus + await this.janusClient.initializeGuestSpeaker(this.sessionUUID); + + this.janusClient.on('audioDataFromSpeaker', (data: AudioDataWithUser) => { + this.logger.debug( + '[SpaceParticipant] Received speaker audio =>', + data.userId, + ); + this.handleAudioData(data); + }); + + this.logger.info( + '[SpaceParticipant] Now speaker on the Space =>', + this.spaceId, + ); + + // For plugins that need direct Janus references, call onJanusReady + for (const { plugin } of this.plugins) { + plugin.onJanusReady?.(this.janusClient); + } + } + + /** + * Leaves the Space gracefully: + * - Stop Janus if we were a speaker + * - Stop watching as a viewer + * - Disconnect chat + */ + public async leaveSpace(): Promise { + this.logger.info('[SpaceParticipant] Leaving space...'); + + // If speaker, stop Janus + if (this.janusClient) { + await this.janusClient.stop(); + this.janusClient = undefined; + } + + // Stop watching + if (this.watchSession && this.cookie) { + await stopWatching(this.watchSession, this.cookie); + } + + // Disconnect chat + if (this.chatClient) { + await this.chatClient.disconnect(); + this.chatClient = undefined; + } + + this.logger.info('[SpaceParticipant] Left space =>', this.spaceId); + } + + /** + * Pushes PCM audio frames if we're speaker; otherwise logs a warning. 
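+   *
+   * @example
+   *   // 480 samples of 48 kHz mono PCM = one 10 ms frame (values illustrative)
+   *   const frame = new Int16Array(480);
+   *   participant.pushAudio(frame, 48000);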
+ */ + public pushAudio(samples: Int16Array, sampleRate: number) { + if (!this.janusClient) { + this.logger.warn( + '[SpaceParticipant] Not a speaker yet; ignoring pushAudio.', + ); + return; + } + this.janusClient.pushLocalAudio(samples, sampleRate); + } + + /** + * Internal handler for incoming PCM frames from Janus, forwarded to plugin.onAudioData if present. + */ + private handleAudioData(data: AudioDataWithUser) { + for (const { plugin } of this.plugins) { + plugin.onAudioData?.(data); + } + } + + /** + * Sets up chat events: "occupancyUpdate", "newSpeakerAccepted", etc. + */ + private setupChatEvents() { + if (!this.chatClient) return; + setupCommonChatEvents(this.chatClient, this.logger, this); + + this.chatClient.on('newSpeakerAccepted', ({ userId }) => { + this.logger.debug('[SpaceParticipant] newSpeakerAccepted =>', userId); + + // If we haven't created Janus yet, skip + if (!this.janusClient) { + this.logger.warn( + '[SpaceParticipant] No janusClient yet; ignoring new speaker...', + ); + return; + } + // If this is our own handle, skip + if (userId === this.janusClient.getHandleId()) { + return; + } + + // Subscribe to this new speaker's audio + this.janusClient.subscribeSpeaker(userId).catch((err) => { + this.logger.error('[SpaceParticipant] subscribeSpeaker error =>', err); + }); + }); + } + + /** + * Mute self if we are speaker: calls /audiospace/muteSpeaker with our sessionUUID. + */ + public async muteSelf(): Promise { + if (!this.authToken || !this.chatToken) { + throw new Error('[SpaceParticipant] Missing authToken or chatToken.'); + } + if (!this.sessionUUID) { + throw new Error('[SpaceParticipant] No sessionUUID; are you a speaker?'); + } + + await muteSpeaker({ + broadcastId: this.spaceId, + sessionUUID: this.sessionUUID, + chatToken: this.chatToken, + authToken: this.authToken, + }); + this.logger.info('[SpaceParticipant] Successfully muted self.'); + } + + /** + * Unmute self if we are speaker: calls /audiospace/unmuteSpeaker with our sessionUUID. + */ + public async unmuteSelf(): Promise { + if (!this.authToken || !this.chatToken) { + throw new Error('[SpaceParticipant] Missing authToken or chatToken.'); + } + if (!this.sessionUUID) { + throw new Error('[SpaceParticipant] No sessionUUID; are you a speaker?'); + } + + await unmuteSpeaker({ + broadcastId: this.spaceId, + sessionUUID: this.sessionUUID, + chatToken: this.chatToken, + authToken: this.authToken, + }); + this.logger.info('[SpaceParticipant] Successfully unmuted self.'); + } +} diff --git a/src/spaces/plugins/HlsRecordPlugin.ts b/src/spaces/plugins/HlsRecordPlugin.ts index 9f88eac..2e244c0 100644 --- a/src/spaces/plugins/HlsRecordPlugin.ts +++ b/src/spaces/plugins/HlsRecordPlugin.ts @@ -1,99 +1,123 @@ -// src/plugins/HlsRecordPlugin.ts - import { spawn, ChildProcessWithoutNullStreams } from 'child_process'; import { Plugin, OccupancyUpdate } from '../types'; import { Space } from '../core/Space'; import { Logger } from '../logger'; /** - * Plugin that records the final Twitter Spaces HLS mix to a local file. - * It waits for at least one listener to join (occupancy > 0), - * then repeatedly attempts to get the HLS URL from Twitter - * until it is available (HTTP 200), and finally spawns ffmpeg. + * HlsRecordPlugin + * --------------- + * Records the final Twitter Spaces HLS mix to a local .ts file using ffmpeg. + * + * Workflow: + * - Wait for occupancy > 0 (i.e., at least one listener). + * - Attempt to retrieve the HLS URL from Twitter (via scraper). 
+ * - If valid (HTTP 200), spawn ffmpeg to record the stream. + * - If HLS not ready yet (HTTP 404), wait for next occupancy event. + * + * Lifecycle: + * - onAttach(...) => minimal references, logger setup + * - init(...) => fully runs once the Space is created (broadcastInfo ready) + * - cleanup() => stop ffmpeg if running */ export class HlsRecordPlugin implements Plugin { private logger?: Logger; private recordingProcess?: ChildProcessWithoutNullStreams; private isRecording = false; + private outputPath?: string; private mediaKey?: string; private space?: Space; + /** + * You can optionally provide an outputPath in the constructor. + * Alternatively, it can be set via pluginConfig in onAttach/init. + */ constructor(outputPath?: string) { this.outputPath = outputPath; } /** - * Called once the Space has fully initialized (broadcastInfo is ready). - * We store references and subscribe to "occupancyUpdate". + * Called immediately after .use(plugin). We store references here + * (e.g., the space) and create a Logger based on pluginConfig.debug. */ - async init(params: { space: Space; pluginConfig?: Record }) { - const spaceLogger = (params.space as any).logger as Logger | undefined; - if (spaceLogger) { - this.logger = spaceLogger; - } + onAttach(params: { space: Space; pluginConfig?: Record }): void { + this.space = params.space; + + const debug = params.pluginConfig?.debug ?? false; + this.logger = new Logger(debug); + this.logger.info('[HlsRecordPlugin] onAttach => plugin attached'); + + // If outputPath was not passed in constructor, check pluginConfig if (params.pluginConfig?.outputPath) { this.outputPath = params.pluginConfig.outputPath; } + } - this.space = params.space; + /** + * Called once the Space has fully initialized (broadcastInfo is ready). + * We retrieve the media_key from the broadcast, subscribe to occupancy, + * and prepare for recording if occupancy > 0. + */ + async init(params: { space: Space; pluginConfig?: Record }) { + // Merge plugin config again (in case it was not set in onAttach). + if (params.pluginConfig?.outputPath) { + this.outputPath = params.pluginConfig.outputPath; + } - const broadcastInfo = (params.space as any).broadcastInfo; + // Use the same logger from onAttach + const broadcastInfo = (this.space as any)?.broadcastInfo; if (!broadcastInfo || !broadcastInfo.broadcast?.media_key) { - this.logger?.warn( - '[HlsRecordPlugin] No media_key found in broadcastInfo', - ); + this.logger?.warn('[HlsRecordPlugin] No media_key found in broadcastInfo'); return; } this.mediaKey = broadcastInfo.broadcast.media_key; + // If no custom output path was provided, use a default const roomId = broadcastInfo.room_id || 'unknown_room'; if (!this.outputPath) { this.outputPath = `/tmp/record_${roomId}.ts`; } - // Subscribe to occupancyUpdate - this.space.on('occupancyUpdate', (update: OccupancyUpdate) => { + this.logger?.info( + `[HlsRecordPlugin] init => ready to record. Output path="${this.outputPath}"`, + ); + + // Listen for occupancy updates + this.space?.on('occupancyUpdate', (update: OccupancyUpdate) => { this.handleOccupancyUpdate(update).catch((err) => { - this.logger?.error( - '[HlsRecordPlugin] handleOccupancyUpdate error =>', - err, - ); + this.logger?.error('[HlsRecordPlugin] handleOccupancyUpdate =>', err); }); }); } /** - * Called each time occupancyUpdate is emitted. - * If occupancy > 0 and we're not recording yet, we attempt to fetch the HLS URL. - * If the URL is valid (HTTP 200), we launch ffmpeg. 
+ * If occupancy > 0 and we're not recording yet, attempt to fetch the HLS URL + * from Twitter. If it's ready, spawn ffmpeg to record. */ private async handleOccupancyUpdate(update: OccupancyUpdate) { if (!this.space || !this.mediaKey) return; if (this.isRecording) return; - - // We only care if occupancy > 0 (at least one listener). if (update.occupancy <= 0) { this.logger?.debug('[HlsRecordPlugin] occupancy=0 => ignoring'); return; } + this.logger?.debug( + `[HlsRecordPlugin] occupancy=${update.occupancy} => trying to fetch HLS URL...`, + ); + const scraper = (this.space as any).scraper; if (!scraper) { this.logger?.warn('[HlsRecordPlugin] No scraper found on space'); return; } - this.logger?.debug( - `[HlsRecordPlugin] occupancy=${update.occupancy} => trying to fetch HLS URL...`, - ); - try { const status = await scraper.getAudioSpaceStreamStatus(this.mediaKey); if (!status?.source?.location) { this.logger?.debug( - '[HlsRecordPlugin] occupancy>0 but no HLS URL => wait next update', + '[HlsRecordPlugin] occupancy>0 but no HLS URL => wait next update', ); return; } @@ -102,30 +126,63 @@ export class HlsRecordPlugin implements Plugin { const isReady = await this.waitForHlsReady(hlsUrl, 1); if (!isReady) { this.logger?.debug( - '[HlsRecordPlugin] HLS URL 404 => waiting next occupancy update...', + '[HlsRecordPlugin] HLS URL 404 => waiting next occupancy update...', ); return; } - await this.startRecording(hlsUrl); } catch (err) { this.logger?.error('[HlsRecordPlugin] handleOccupancyUpdate =>', err); } } + /** + * HEAD request to see if the HLS URL is returning 200 OK. + * maxRetries=1 => only try once here; rely on occupancy re-calls otherwise. + */ + private async waitForHlsReady( + hlsUrl: string, + maxRetries: number, + ): Promise { + let attempt = 0; + while (attempt < maxRetries) { + try { + const resp = await fetch(hlsUrl, { method: 'HEAD' }); + if (resp.ok) { + this.logger?.debug( + `[HlsRecordPlugin] HLS is ready (attempt #${attempt + 1})`, + ); + return true; + } else { + this.logger?.debug( + `[HlsRecordPlugin] HLS status=${resp.status}, retrying...`, + ); + } + } catch (error) { + this.logger?.debug( + '[HlsRecordPlugin] HLS fetch error =>', + (error as Error).message, + ); + } + attempt++; + await new Promise((r) => setTimeout(r, 2000)); + } + return false; + } + /** * Spawns ffmpeg to record the HLS stream at the given URL. 
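// Standalone sketch of the readiness probe described above: a single HEAD request
// against the HLS URL (global fetch, Node 18+). 200 means the playlist is being
// served; 404 means "not ready yet, wait for the next occupancy update".
async function probeHls(hlsUrl: string): Promise<boolean> {
  const resp = await fetch(hlsUrl, { method: 'HEAD' });
  return resp.ok;
}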
*/ private async startRecording(hlsUrl: string): Promise { if (this.isRecording) { - this.logger?.debug('[HlsRecordPlugin] Already recording'); + this.logger?.debug('[HlsRecordPlugin] Already recording, skipping...'); return; } this.isRecording = true; if (!this.outputPath) { this.logger?.warn( - '[HlsRecordPlugin] No output path set, using /tmp/space_record.ts', + '[HlsRecordPlugin] No output path set, using /tmp/space_record.ts', ); this.outputPath = '/tmp/space_record.ts'; } @@ -141,6 +198,7 @@ export class HlsRecordPlugin implements Plugin { this.outputPath, ]); + // Capture stderr for errors or debug info this.recordingProcess.stderr.on('data', (chunk) => { const msg = chunk.toString(); if (msg.toLowerCase().includes('error')) { @@ -153,8 +211,8 @@ export class HlsRecordPlugin implements Plugin { this.recordingProcess.on('close', (code) => { this.isRecording = false; this.logger?.info( - '[HlsRecordPlugin] Recording process closed => code=', - code, + '[HlsRecordPlugin] Recording process closed => code=', + code, ); }); @@ -163,43 +221,9 @@ export class HlsRecordPlugin implements Plugin { }); } - /** - * HEAD request to see if the HLS URL is returning 200 OK. - * maxRetries=1 means we'll just try once here, and rely on occupancyUpdate re-calls for further tries. - */ - private async waitForHlsReady( - hlsUrl: string, - maxRetries: number, - ): Promise { - let attempt = 0; - while (attempt < maxRetries) { - try { - const resp = await fetch(hlsUrl, { method: 'HEAD' }); - if (resp.ok) { - this.logger?.debug( - `[HlsRecordPlugin] HLS is ready (attempt #${attempt + 1})`, - ); - return true; - } else { - this.logger?.debug( - `[HlsRecordPlugin] HLS status=${resp.status}, retrying...`, - ); - } - } catch (error) { - this.logger?.debug( - '[HlsRecordPlugin] HLS fetch error:', - (error as Error).message, - ); - } - - attempt++; - await new Promise((r) => setTimeout(r, 2000)); - } - return false; - } - /** * Called when the plugin is cleaned up (e.g. space.stop()). + * Kills ffmpeg if still running. */ cleanup(): void { if (this.isRecording && this.recordingProcess) { @@ -209,4 +233,4 @@ export class HlsRecordPlugin implements Plugin { this.isRecording = false; } } -} +} \ No newline at end of file diff --git a/src/spaces/plugins/IdleMonitorPlugin.ts b/src/spaces/plugins/IdleMonitorPlugin.ts index 83b53fb..0ddc995 100644 --- a/src/spaces/plugins/IdleMonitorPlugin.ts +++ b/src/spaces/plugins/IdleMonitorPlugin.ts @@ -1,42 +1,57 @@ -// src/plugins/IdleMonitorPlugin.ts - import { Plugin, AudioDataWithUser } from '../types'; import { Space } from '../core/Space'; +import { Logger } from '../logger'; /** - * Plugin that tracks the last speaker audio timestamp - * and the last local audio timestamp to detect overall silence. + * IdleMonitorPlugin + * ----------------- + * Monitors silence in both remote speaker audio and local (pushed) audio. + * If no audio is detected for a specified duration, it emits an 'idleTimeout' event on the space. */ export class IdleMonitorPlugin implements Plugin { private space?: Space; + private logger?: Logger; + private lastSpeakerAudioMs = Date.now(); private lastLocalAudioMs = Date.now(); private checkInterval?: NodeJS.Timeout; /** - * @param idleTimeoutMs How many ms of silence before triggering idle (default 60s) - * @param checkEveryMs Interval for checking silence (default 10s) + * @param idleTimeoutMs The duration (in ms) of total silence before triggering idle. (Default: 60s) + * @param checkEveryMs How frequently (in ms) to check for silence. 
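// Illustrative ffmpeg invocation for the recording step. The plugin's exact argument
// list lives in the unchanged hunk lines above; '-i <hlsUrl> -c copy' here is an
// assumption for a no-re-encode capture of the HLS mix into a .ts file.
import { spawn } from 'child_process';

function recordHlsToFile(hlsUrl: string, outputPath: string) {
  const ff = spawn('ffmpeg', ['-y', '-i', hlsUrl, '-c', 'copy', outputPath]);
  ff.stderr.on('data', (chunk) => console.error('[ffmpeg]', chunk.toString()));
  ff.on('close', (code) => console.log('[ffmpeg] exited =>', code));
  return ff;
}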
(Default: 10s) */ constructor( - private idleTimeoutMs: number = 60_000, - private checkEveryMs: number = 10_000, + private idleTimeoutMs: number = 60_000, + private checkEveryMs: number = 10_000, ) {} - onAttach(space: Space) { - this.space = space; - console.log('[IdleMonitorPlugin] onAttach => plugin attached'); + /** + * Called immediately after .use(plugin). + * Allows for minimal setup, including obtaining a debug logger if desired. + */ + onAttach(params: { space: Space; pluginConfig?: Record }): void { + this.space = params.space; + const debug = params.pluginConfig?.debug ?? false; + this.logger = new Logger(debug); + + this.logger.info('[IdleMonitorPlugin] onAttach => plugin attached'); } + /** + * Called once the space has fully initialized (basic mode). + * We set up idle checks and override pushAudio to detect local audio activity. + */ init(params: { space: Space; pluginConfig?: Record }): void { this.space = params.space; - console.log('[IdleMonitorPlugin] init => setting up idle checks'); + this.logger?.info('[IdleMonitorPlugin] init => setting up idle checks'); // Update lastSpeakerAudioMs on incoming speaker audio - this.space.on('audioDataFromSpeaker', (data: AudioDataWithUser) => { + // (Here we're hooking into an event triggered by Space for each speaker's PCM data.) + this.space.on('audioDataFromSpeaker', (_data: AudioDataWithUser) => { this.lastSpeakerAudioMs = Date.now(); }); - // Patch space.pushAudio to update lastLocalAudioMs + // Patch space.pushAudio to track local audio const originalPushAudio = this.space.pushAudio.bind(this.space); this.space.pushAudio = (samples, sampleRate) => { this.lastLocalAudioMs = Date.now(); @@ -47,23 +62,25 @@ export class IdleMonitorPlugin implements Plugin { this.checkInterval = setInterval(() => this.checkIdle(), this.checkEveryMs); } + /** + * Checks if we've exceeded idleTimeoutMs with no audio activity. + * If so, emits an 'idleTimeout' event on the space with { idleMs } info. + */ private checkIdle() { const now = Date.now(); const lastAudio = Math.max(this.lastSpeakerAudioMs, this.lastLocalAudioMs); const idleMs = now - lastAudio; if (idleMs >= this.idleTimeoutMs) { - console.log( - '[IdleMonitorPlugin] idleTimeout => no audio for', - idleMs, - 'ms', + this.logger?.warn( + `[IdleMonitorPlugin] idleTimeout => no audio for ${idleMs}ms`, ); this.space?.emit('idleTimeout', { idleMs }); } } /** - * Returns how many ms have passed since any audio was detected. + * Returns how many milliseconds have passed since any audio was detected (local or speaker). */ public getIdleTimeMs(): number { const now = Date.now(); @@ -71,11 +88,14 @@ export class IdleMonitorPlugin implements Plugin { return now - lastAudio; } + /** + * Cleans up resources (interval) when the plugin is removed or space stops. 
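// Usage sketch for the idle monitor. Assumes a Space instance `space` and that
// Space.use(plugin, pluginConfig) accepts the same config object read in onAttach().
import { Space } from './core/Space';
import { IdleMonitorPlugin } from './plugins/IdleMonitorPlugin';

function watchForSilence(space: Space): void {
  // Trigger after 2 minutes of total silence, checking every 15 seconds.
  space.use(new IdleMonitorPlugin(120_000, 15_000), { debug: true });

  space.on('idleTimeout', (info: { idleMs: number }) => {
    console.log(`[Example] No audio for ${info.idleMs}ms`);
    // React here, e.g. stop the broadcast or have SttTtsPlugin speak a prompt.
  });
}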
+ */ cleanup(): void { - console.log('[IdleMonitorPlugin] cleanup => stopping idle checks'); + this.logger?.info('[IdleMonitorPlugin] cleanup => stopping idle checks'); if (this.checkInterval) { clearInterval(this.checkInterval); this.checkInterval = undefined; } } -} +} \ No newline at end of file diff --git a/src/spaces/plugins/MonitorAudioPlugin.ts b/src/spaces/plugins/MonitorAudioPlugin.ts index 34ae5ec..e0dadea 100644 --- a/src/spaces/plugins/MonitorAudioPlugin.ts +++ b/src/spaces/plugins/MonitorAudioPlugin.ts @@ -1,19 +1,37 @@ -// src/plugins/MonitorAudioPlugin.ts - import { spawn, ChildProcessWithoutNullStreams } from 'child_process'; import { Plugin, AudioDataWithUser } from '../types'; +import { Logger } from '../logger'; +/** + * MonitorAudioPlugin + * ------------------ + * A simple plugin that spawns an `ffplay` process to play raw PCM audio in real time. + * It reads frames from `onAudioData()` and writes them to ffplay via stdin. + * + * Usage: + * const plugin = new MonitorAudioPlugin(48000, /* debug= *\/ true); + * space.use(plugin); +*/ export class MonitorAudioPlugin implements Plugin { private ffplay?: ChildProcessWithoutNullStreams; + private logger: Logger; + + /** + * @param sampleRate The expected PCM sample rate (e.g. 16000 or 48000). + * @param debug If true, enables debug logging via Logger. + */ + constructor( + private readonly sampleRate = 48000, + debug = false, + ) { + this.logger = new Logger(debug); - constructor(private readonly sampleRate = 48000) { - // spawn ffplay reading raw PCM s16le from stdin - // "-nodisp" hides any video window, "-loglevel quiet" reduces console spam + // Spawn ffplay to read raw PCM (s16le) on stdin this.ffplay = spawn('ffplay', [ '-f', 's16le', '-ar', - this.sampleRate.toString(), // e.g. "16000" + this.sampleRate.toString(), '-ac', '1', // mono '-nodisp', @@ -24,43 +42,48 @@ export class MonitorAudioPlugin implements Plugin { ]); this.ffplay.on('error', (err) => { - console.error('[MonitorAudioPlugin] ffplay error =>', err); + this.logger.error('[MonitorAudioPlugin] ffplay error =>', err); }); + this.ffplay.on('close', (code) => { - console.log('[MonitorAudioPlugin] ffplay closed => code=', code); + this.logger.info('[MonitorAudioPlugin] ffplay closed => code=', code); this.ffplay = undefined; }); - console.log('[MonitorAudioPlugin] Started ffplay for real-time monitoring'); + this.logger.info( + `[MonitorAudioPlugin] Started ffplay for real-time monitoring (sampleRate=${this.sampleRate})` + ); } + /** + * Called whenever PCM frames arrive (from a speaker). + * Writes frames to ffplay's stdin to play them in real time. + */ onAudioData(data: AudioDataWithUser): void { - // TODO: REMOVE DEBUG - // console.log( - // '[MonitorAudioPlugin] onAudioData => user=', - // data.userId, - // 'samples=', - // data.samples.length, - // 'sampleRate=', - // data.sampleRate, - // ); + // Log debug info + this.logger.debug( + `[MonitorAudioPlugin] onAudioData => userId=${data.userId}, samples=${data.samples.length}, sampleRate=${data.sampleRate}`, + ); - // Check sampleRate if needed - if (!this.ffplay?.stdin.writable) return; - - // Suppose data.sampleRate = this.sampleRate - // Convert Int16Array => Buffer - const buf = Buffer.from(data.samples.buffer); + if (!this.ffplay?.stdin.writable) { + return; + } - // Write raw 16-bit PCM samples to ffplay stdin - this.ffplay.stdin.write(buf); + // In this plugin, we assume data.sampleRate matches our expected sampleRate. + // Convert the Int16Array to a Buffer, then write to ffplay stdin. 
+ const pcmBuffer = Buffer.from(data.samples.buffer); + this.ffplay.stdin.write(pcmBuffer); } + /** + * Cleanup is called when the plugin is removed or when the space/participant stops. + * Ends the ffplay process and closes its stdin pipe. + */ cleanup(): void { - console.log('[MonitorAudioPlugin] Cleanup => stopping ffplay'); + this.logger.info('[MonitorAudioPlugin] Cleanup => stopping ffplay'); if (this.ffplay) { - this.ffplay.stdin.end(); // close the pipe - this.ffplay.kill(); // kill ffplay process + this.ffplay.stdin.end(); + this.ffplay.kill(); this.ffplay = undefined; } } diff --git a/src/spaces/plugins/RecordToDiskPlugin.ts b/src/spaces/plugins/RecordToDiskPlugin.ts index 024f1d7..59e5551 100644 --- a/src/spaces/plugins/RecordToDiskPlugin.ts +++ b/src/spaces/plugins/RecordToDiskPlugin.ts @@ -1,16 +1,91 @@ import * as fs from 'fs'; import { AudioDataWithUser, Plugin } from '../types'; +import { Space } from '../core/Space'; +import { SpaceParticipant } from '../core/SpaceParticipant'; +import { Logger } from '../logger'; +interface RecordToDiskPluginConfig { + filePath?: string; + debug?: boolean; // whether to enable verbose logs +} + +/** + * RecordToDiskPlugin + * ------------------ + * A simple plugin that writes all incoming PCM frames to a local .raw file. + * + * Lifecycle: + * - onAttach(...) => minimal references, logger config + * - init(...) => finalize file path, open stream + * - onAudioData(...) => append PCM frames to the file + * - cleanup(...) => close file stream + */ export class RecordToDiskPlugin implements Plugin { - private outStream = fs.createWriteStream('/tmp/speaker_audio.raw'); + private filePath: string = '/tmp/speaker_audio.raw'; + private outStream?: fs.WriteStream; + private logger?: Logger; + + /** + * Called immediately after .use(plugin). + * We create a logger based on pluginConfig.debug and store the file path if provided. + */ + onAttach(params: { + space: Space | SpaceParticipant; + pluginConfig?: Record; + }): void { + const debugEnabled = params.pluginConfig?.debug ?? false; + this.logger = new Logger(debugEnabled); + + this.logger.info('[RecordToDiskPlugin] onAttach => plugin attached'); + + if (params.pluginConfig?.filePath) { + this.filePath = params.pluginConfig.filePath; + } + this.logger.debug('[RecordToDiskPlugin] Using filePath =>', this.filePath); + } + + /** + * Called after the space/participant has joined in basic mode. + * We open the WriteStream to our file path here. + */ + init(params: { + space: Space | SpaceParticipant; + pluginConfig?: Record; + }): void { + // If filePath was re-defined in pluginConfig, re-check: + if (params.pluginConfig?.filePath) { + this.filePath = params.pluginConfig.filePath; + } + this.logger?.info('[RecordToDiskPlugin] init => opening output stream'); + this.outStream = fs.createWriteStream(this.filePath, { flags: 'w' }); + } + + /** + * Called whenever PCM audio frames arrive from a speaker. + * We write them to the file as raw 16-bit PCM. + */ onAudioData(data: AudioDataWithUser): void { - // Convert Int16Array -> Buffer + if (!this.outStream) { + this.logger?.warn('[RecordToDiskPlugin] No outStream yet; ignoring data'); + return; + } const buf = Buffer.from(data.samples.buffer); this.outStream.write(buf); + this.logger?.debug( + `[RecordToDiskPlugin] Wrote ${buf.byteLength} bytes from userId=${data.userId} to disk`, + ); } + /** + * Called when the plugin is cleaned up (e.g. space/participant stop). + * We close our file stream. 
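// Usage sketch for the raw-PCM recorder. `filePath` and `debug` are the pluginConfig
// keys read in onAttach()/init() above; the participant.use(...) form mirrors
// testParticipant.ts further down in this diff.
import { SpaceParticipant } from './core/SpaceParticipant';
import { RecordToDiskPlugin } from './plugins/RecordToDiskPlugin';

function attachRawRecorder(participant: SpaceParticipant): void {
  participant.use(new RecordToDiskPlugin(), {
    filePath: '/tmp/space_audio.raw',
    debug: false,
  });
  // The output is headerless 16-bit PCM; assuming 48kHz mono it can be auditioned with:
  //   ffplay -f s16le -ar 48000 -ac 1 /tmp/space_audio.raw
}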
+ */ cleanup(): void { - this.outStream.end(); + this.logger?.info('[RecordToDiskPlugin] cleanup => closing output stream'); + if (this.outStream) { + this.outStream.end(); + this.outStream = undefined; + } } -} +} \ No newline at end of file diff --git a/src/spaces/plugins/SttTtsPlugin.ts b/src/spaces/plugins/SttTtsPlugin.ts index 6acab40..8cf07eb 100644 --- a/src/spaces/plugins/SttTtsPlugin.ts +++ b/src/spaces/plugins/SttTtsPlugin.ts @@ -5,78 +5,125 @@ import path from 'path'; import { spawn } from 'child_process'; import { AudioDataWithUser, Plugin } from '../types'; import { Space } from '../core/Space'; +import { SpaceParticipant } from '../core/SpaceParticipant'; import { JanusClient } from '../core/JanusClient'; +import { Logger } from '../logger'; interface PluginConfig { openAiApiKey?: string; // for STT & ChatGPT elevenLabsApiKey?: string; // for TTS - sttLanguage?: string; // e.g. "en" for Whisper - gptModel?: string; // e.g. "gpt-3.5-turbo" + sttLanguage?: string; // e.g., "en" for Whisper + gptModel?: string; // e.g., "gpt-3.5-turbo" or "gpt-4" silenceThreshold?: number; // amplitude threshold for ignoring silence voiceId?: string; // specify which ElevenLabs voice to use - elevenLabsModel?: string; // e.g. "eleven_monolingual_v1" - systemPrompt?: string; // ex. "You are a helpful AI assistant" + elevenLabsModel?: string; // e.g., "eleven_monolingual_v1" + systemPrompt?: string; // e.g., "You are a helpful AI assistant" chatContext?: Array<{ role: 'system' | 'user' | 'assistant'; content: string; }>; + debug?: boolean; } /** - * MVP plugin for speech-to-text (OpenAI) + conversation + TTS (ElevenLabs) - * Approach: - * - Collect each speaker's unmuted PCM in a memory buffer (only if above silence threshold) - * - On speaker mute -> flush STT -> GPT -> TTS -> push to Janus + * SttTtsPlugin + * ------------ + * Provides an end-to-end flow of: + * - Speech-to-Text (OpenAI Whisper) + * - ChatGPT conversation + * - Text-to-Speech (ElevenLabs) + * - Streams TTS audio frames back to Janus + * + * Lifecycle: + * - onAttach(...) => minimal references + * - init(...) => space or participant has joined in basic mode + * - onJanusReady(...) => we have a JanusClient + * - onAudioData(...) => receiving PCM frames from speakers + * - cleanup(...) => release resources, stop timers, etc. 
*/ export class SttTtsPlugin implements Plugin { - private space?: Space; + // References to the space/participant and the Janus client + private spaceOrParticipant?: Space | SpaceParticipant; private janus?: JanusClient; + // Optional logger retrieved from the space or participant + private logger?: Logger; + + // Credentials & config private openAiApiKey?: string; private elevenLabsApiKey?: string; + private sttLanguage: string = 'en'; + private gptModel: string = 'gpt-3.5-turbo'; + private voiceId: string = '21m00Tcm4TlvDq8ikWAM'; + private elevenLabsModel: string = 'eleven_monolingual_v1'; + private systemPrompt: string = 'You are a helpful AI assistant.'; + private silenceThreshold: number = 50; - private sttLanguage = 'en'; - private gptModel = 'gpt-3.5-turbo'; - private voiceId = '21m00Tcm4TlvDq8ikWAM'; - private elevenLabsModel = 'eleven_monolingual_v1'; - private systemPrompt = 'You are a helpful AI assistant.'; + /** + * chatContext accumulates the conversation for GPT: + * - system: persona instructions + * - user/assistant: running conversation + */ private chatContext: Array<{ role: 'system' | 'user' | 'assistant'; content: string; }> = []; /** - * userId => arrayOfChunks (PCM Int16) + * Maps each userId => array of Int16Array PCM chunks + * Only accumulates data if the speaker is unmuted */ private pcmBuffers = new Map(); /** - * Track mute states: userId => boolean (true=unmuted) + * Tracks which speakers are currently unmuted: + * userId => true/false */ private speakerUnmuted = new Map(); /** - * For ignoring near-silence frames (if amplitude < threshold) + * TTS queue for sequential playback */ - private silenceThreshold = 50; - - // TTS queue for sequentially speaking private ttsQueue: string[] = []; - private isSpeaking = false; + private isSpeaking: boolean = false; - onAttach(space: Space) { - console.log('[SttTtsPlugin] onAttach => space was attached'); - } + /** + * Called immediately after `.use(plugin)`. + * Usually used for storing references or minimal setup. + */ + onAttach(params: { + space: Space | SpaceParticipant; + pluginConfig?: Record; + }): void { + // Store a reference to the space or participant + this.spaceOrParticipant = params.space; - init(params: { space: Space; pluginConfig?: Record }): void { - console.log( - '[SttTtsPlugin] init => Space fully ready. Subscribing to events.', - ); + const debugEnabled = params.pluginConfig?.debug ?? false; + this.logger = new Logger(debugEnabled); - this.space = params.space; - this.janus = (this.space as any)?.janusClient as JanusClient | undefined; + console.log('[SttTtsPlugin] onAttach => plugin attached'); + } + /** + * Called after the space/participant has joined in basic mode (listener + chat). + * This is where we can finalize setup that doesn't require Janus or speaker mode. + */ + init(params: { + space: Space | SpaceParticipant; + pluginConfig?: Record; + }): void { const config = params.pluginConfig as PluginConfig; + + this.logger?.debug('[SttTtsPlugin] init => finalizing basic setup'); + + // Overwrite the local reference with a strong typed one + this.spaceOrParticipant = params.space; + + // If space/participant has a Janus client already, we can store it, + // but typically we rely on "onJanusReady" for that. 
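// Configuration sketch matching the PluginConfig fields above. Values are placeholders;
// assumes a SpaceParticipant `participant` created elsewhere (see testParticipant.ts below).
import { SpaceParticipant } from './core/SpaceParticipant';
import { SttTtsPlugin } from './plugins/SttTtsPlugin';

function attachVoiceBot(participant: SpaceParticipant): SttTtsPlugin {
  const stt = new SttTtsPlugin();
  participant.use(stt, {
    openAiApiKey: process.env.OPENAI_API_KEY,
    elevenLabsApiKey: process.env.ELEVENLABS_API_KEY,
    gptModel: 'gpt-4',
    sttLanguage: 'en',
    systemPrompt: 'You are a concise, friendly co-host.',
    silenceThreshold: 50,
    debug: true,
  });
  return stt;
}

// At runtime, the public API defined further down can then be used, e.g.:
//   stt.setSystemPrompt('Answer in one sentence.');
//   stt.addMessage('assistant', 'Welcome to the space!');
//   await stt.speakText('Hello everyone, thanks for joining.');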
+ this.janus = (this.spaceOrParticipant as any).janusClient; + + // Merge plugin configuration this.openAiApiKey = config?.openAiApiKey; this.elevenLabsApiKey = config?.elevenLabsApiKey; if (config?.sttLanguage) this.sttLanguage = config.sttLanguage; @@ -84,30 +131,27 @@ export class SttTtsPlugin implements Plugin { if (typeof config?.silenceThreshold === 'number') { this.silenceThreshold = config.silenceThreshold; } - if (config?.voiceId) { - this.voiceId = config.voiceId; - } - if (config?.elevenLabsModel) { - this.elevenLabsModel = config.elevenLabsModel; - } - if (config?.systemPrompt) { - this.systemPrompt = config.systemPrompt; - } + if (config?.voiceId) this.voiceId = config.voiceId; + if (config?.elevenLabsModel) this.elevenLabsModel = config.elevenLabsModel; + if (config?.systemPrompt) this.systemPrompt = config.systemPrompt; if (config?.chatContext) { this.chatContext = config.chatContext; } - console.log('[SttTtsPlugin] Plugin config =>', config); - // Listen for mute events - this.space.on( + this.logger?.debug('[SttTtsPlugin] Merged config =>', config); + + // Example: watch for "muteStateChanged" events from the space or participant + this.spaceOrParticipant.on( 'muteStateChanged', (evt: { userId: string; muted: boolean }) => { - console.log('[SttTtsPlugin] Speaker muteStateChanged =>', evt); + this.logger?.debug('[SttTtsPlugin] muteStateChanged =>', evt); if (evt.muted) { - this.handleMute(evt.userId).catch((err) => - console.error('[SttTtsPlugin] handleMute error =>', err), - ); + // If the user just muted, flush STT + this.handleMute(evt.userId).catch((err) => { + this.logger?.error('[SttTtsPlugin] handleMute error =>', err); + }); } else { + // Mark user as unmuted this.speakerUnmuted.set(evt.userId, true); if (!this.pcmBuffers.has(evt.userId)) { this.pcmBuffers.set(evt.userId, []); @@ -118,44 +162,59 @@ export class SttTtsPlugin implements Plugin { } /** - * Called whenever we receive PCM from a speaker + * Called if/when the plugin needs direct access to a JanusClient. + * For example, once the participant becomes a speaker or if a host + * has finished setting up Janus. + */ + onJanusReady(janusClient: JanusClient): void { + this.logger?.debug( + '[SttTtsPlugin] onJanusReady => JanusClient is now available', + ); + this.janus = janusClient; + } + + /** + * onAudioData: triggered for every incoming PCM frame from a speaker. + * We'll accumulate them if that speaker is currently unmuted. */ onAudioData(data: AudioDataWithUser): void { - if (!this.speakerUnmuted.get(data.userId)) return; + const { userId, samples } = data; + if (!this.speakerUnmuted.get(userId)) return; + // Basic amplitude check let maxVal = 0; - for (let i = 0; i < data.samples.length; i++) { - const val = Math.abs(data.samples[i]); + for (let i = 0; i < samples.length; i++) { + const val = Math.abs(samples[i]); if (val > maxVal) maxVal = val; } - if (maxVal < this.silenceThreshold) { - return; - } + if (maxVal < this.silenceThreshold) return; - let arr = this.pcmBuffers.get(data.userId); - if (!arr) { - arr = []; - this.pcmBuffers.set(data.userId, arr); - } - arr.push(data.samples); + // Accumulate frames + const chunks = this.pcmBuffers.get(userId) ?? []; + chunks.push(samples); + this.pcmBuffers.set(userId, chunks); } /** - * On speaker mute => flush STT => GPT => TTS => push to Janus + * handleMute: called when a speaker goes from unmuted to muted. 
+ * We'll flush their collected PCM => STT => GPT => TTS => push to Janus */ private async handleMute(userId: string): Promise { this.speakerUnmuted.set(userId, false); + const chunks = this.pcmBuffers.get(userId) || []; - this.pcmBuffers.set(userId, []); + this.pcmBuffers.set(userId, []); // reset if (!chunks.length) { - console.log('[SttTtsPlugin] No audio chunks for user =>', userId); + this.logger?.debug('[SttTtsPlugin] No audio data => userId=', userId); return; } - console.log( - `[SttTtsPlugin] Flushing STT buffer for user=${userId}, chunks=${chunks.length}`, + + this.logger?.info( + `[SttTtsPlugin] Flushing STT buffer => userId=${userId}, chunkCount=${chunks.length}`, ); + // Merge into one Int16Array const totalLen = chunks.reduce((acc, c) => acc + c.length, 0); const merged = new Int16Array(totalLen); let offset = 0; @@ -164,60 +223,70 @@ export class SttTtsPlugin implements Plugin { offset += c.length; } - // Convert PCM to WAV for STT + // Convert to WAV const wavPath = await this.convertPcmToWav(merged, 48000); - console.log('[SttTtsPlugin] WAV ready =>', wavPath); + this.logger?.debug('[SttTtsPlugin] WAV created =>', wavPath); // Whisper STT const sttText = await this.transcribeWithOpenAI(wavPath, this.sttLanguage); - fs.unlinkSync(wavPath); + fs.unlinkSync(wavPath); // remove temp if (!sttText.trim()) { - console.log('[SttTtsPlugin] No speech recognized for user =>', userId); + this.logger?.debug( + '[SttTtsPlugin] No speech recognized => userId=', + userId, + ); return; } - console.log(`[SttTtsPlugin] STT => user=${userId}, text="${sttText}"`); + this.logger?.info( + `[SttTtsPlugin] STT => userId=${userId}, text="${sttText}"`, + ); - // GPT answer + // GPT response const replyText = await this.askChatGPT(sttText); - console.log(`[SttTtsPlugin] GPT => user=${userId}, reply="${replyText}"`); + this.logger?.info( + `[SttTtsPlugin] GPT => userId=${userId}, reply="${replyText}"`, + ); - // Use the standard speak method with queue + // Send TTS await this.speakText(replyText); } /** - * Public method to queue a TTS request + * speakText: Public method to enqueue a text message for TTS output */ public async speakText(text: string): Promise { this.ttsQueue.push(text); + if (!this.isSpeaking) { this.isSpeaking = true; this.processTtsQueue().catch((err) => { - console.error('[SttTtsPlugin] processTtsQueue error =>', err); + this.logger?.error('[SttTtsPlugin] processTtsQueue error =>', err); }); } } /** - * Process TTS requests one by one + * processTtsQueue: Drains the TTS queue in order, sending frames to Janus */ private async processTtsQueue(): Promise { while (this.ttsQueue.length > 0) { const text = this.ttsQueue.shift(); if (!text) continue; - try { - const ttsAudio = await this.elevenLabsTts(text); - const pcm = await this.convertMp3ToPcm(ttsAudio, 48000); + const mp3Buf = await this.elevenLabsTts(text); + const pcm = await this.convertMp3ToPcm(mp3Buf, 48000); await this.streamToJanus(pcm, 48000); } catch (err) { - console.error('[SttTtsPlugin] TTS streaming error =>', err); + this.logger?.error('[SttTtsPlugin] TTS streaming error =>', err); } } this.isSpeaking = false; } + /** + * convertPcmToWav: Creates a temporary WAV file from raw PCM samples + */ private convertPcmToWav( samples: Int16Array, sampleRate: number, @@ -236,92 +305,79 @@ export class SttTtsPlugin implements Plugin { '-y', tmpPath, ]); + ff.stdin.write(Buffer.from(samples.buffer)); ff.stdin.end(); + ff.on('close', (code) => { - if (code === 0) resolve(tmpPath); - else reject(new Error(`ffmpeg error 
code=${code}`)); + if (code === 0) { + resolve(tmpPath); + } else { + reject(new Error(`ffmpeg pcm->wav error code=${code}`)); + } }); }); } /** - * OpenAI Whisper STT + * transcribeWithOpenAI: sends the WAV file to OpenAI Whisper */ - private async transcribeWithOpenAI(wavPath: string, language: string) { + private async transcribeWithOpenAI( + wavPath: string, + language: string, + ): Promise { if (!this.openAiApiKey) { - throw new Error('[SttTtsPlugin] No OpenAI API key available'); + throw new Error('[SttTtsPlugin] No OpenAI API key'); } - try { - console.log('[SttTtsPlugin] Transcribe =>', wavPath); + this.logger?.info('[SttTtsPlugin] Transcribing =>', wavPath); + const fileBuffer = fs.readFileSync(wavPath); + this.logger?.debug('[SttTtsPlugin] WAV size =>', fileBuffer.length); - // Read file into buffer - const fileBuffer = fs.readFileSync(wavPath); - console.log( - '[SttTtsPlugin] File read, size:', - fileBuffer.length, - 'bytes', - ); + const blob = new Blob([fileBuffer], { type: 'audio/wav' }); + const formData = new FormData(); + formData.append('file', blob, path.basename(wavPath)); + formData.append('model', 'whisper-1'); + formData.append('language', language); + formData.append('temperature', '0'); - // Create blob from buffer - const blob = new Blob([fileBuffer], { type: 'audio/wav' }); - - // Create FormData - const formData = new FormData(); - formData.append('file', blob, path.basename(wavPath)); - formData.append('model', 'whisper-1'); - formData.append('language', language); - formData.append('temperature', '0'); - - // Call OpenAI API - const response = await fetch( - 'https://api.openai.com/v1/audio/transcriptions', - { - method: 'POST', - headers: { - Authorization: `Bearer ${this.openAiApiKey}`, - }, - body: formData, - }, - ); - if (!response.ok) { - const errorText = await response.text(); - console.error('[SttTtsPlugin] OpenAI API Error:', errorText); - throw new Error(`OpenAI API error: ${response.status} ${errorText}`); - } - const data = (await response.json()) as { text: string }; - return data.text?.trim() || ''; - } catch (err) { - console.error('[SttTtsPlugin] OpenAI STT Error =>', err); - throw new Error('OpenAI STT failed'); + const resp = await fetch('https://api.openai.com/v1/audio/transcriptions', { + method: 'POST', + headers: { Authorization: `Bearer ${this.openAiApiKey}` }, + body: formData, + }); + + if (!resp.ok) { + const errText = await resp.text(); + this.logger?.error('[SttTtsPlugin] OpenAI STT error =>', errText); + throw new Error(`OpenAI STT => ${resp.status} ${errText}`); } + + const data = (await resp.json()) as { text: string }; + return data.text.trim(); } /** - * Simple ChatGPT call + * askChatGPT: sends user text to GPT, returns the assistant reply */ private async askChatGPT(userText: string): Promise { if (!this.openAiApiKey) { - throw new Error('[SttTtsPlugin] No OpenAI API key for ChatGPT'); + throw new Error('[SttTtsPlugin] No OpenAI API key (GPT) provided'); } - const url = 'https://api.openai.com/v1/chat/completions'; + const messages = [ { role: 'system', content: this.systemPrompt }, ...this.chatContext, { role: 'user', content: userText }, ]; - const resp = await fetch(url, { + const resp = await fetch('https://api.openai.com/v1/chat/completions', { method: 'POST', headers: { Authorization: `Bearer ${this.openAiApiKey}`, 'Content-Type': 'application/json', }, - body: JSON.stringify({ - model: this.gptModel, - messages, - }), + body: JSON.stringify({ model: this.gptModel, messages }), }); if (!resp.ok) { @@ -333,18 +389,20 
@@ export class SttTtsPlugin implements Plugin { const json = await resp.json(); const reply = json.choices?.[0]?.message?.content || ''; + // Keep conversation context this.chatContext.push({ role: 'user', content: userText }); this.chatContext.push({ role: 'assistant', content: reply }); return reply.trim(); } /** - * ElevenLabs TTS => returns MP3 Buffer + * elevenLabsTts: fetches MP3 audio from ElevenLabs for a given text */ private async elevenLabsTts(text: string): Promise { if (!this.elevenLabsApiKey) { throw new Error('[SttTtsPlugin] No ElevenLabs API key'); } + const url = `https://api.elevenlabs.io/v1/text-to-speech/${this.voiceId}`; const resp = await fetch(url, { method: 'POST', @@ -358,18 +416,20 @@ export class SttTtsPlugin implements Plugin { voice_settings: { stability: 0.4, similarity_boost: 0.8 }, }), }); + if (!resp.ok) { const errText = await resp.text(); throw new Error( - `[SttTtsPlugin] ElevenLabs TTS error => ${resp.status} ${errText}`, + `[SttTtsPlugin] ElevenLabs error => ${resp.status} ${errText}`, ); } - const arrayBuf = await resp.arrayBuffer(); - return Buffer.from(arrayBuf); + + const arrayBuffer = await resp.arrayBuffer(); + return Buffer.from(arrayBuffer); } /** - * Convert MP3 => PCM via ffmpeg + * convertMp3ToPcm: uses ffmpeg to convert an MP3 buffer to raw PCM */ private convertMp3ToPcm( mp3Buf: Buffer, @@ -387,17 +447,18 @@ export class SttTtsPlugin implements Plugin { '1', 'pipe:1', ]); + let raw = Buffer.alloc(0); ff.stdout.on('data', (chunk: Buffer) => { raw = Buffer.concat([raw, chunk]); }); ff.stderr.on('data', () => { - // ignoring ffmpeg logs + // ignoring ffmpeg stderr }); ff.on('close', (code) => { if (code !== 0) { - reject(new Error(`ffmpeg error code=${code}`)); + reject(new Error(`ffmpeg mp3->pcm error code=${code}`)); return; } const samples = new Int16Array( @@ -414,64 +475,76 @@ export class SttTtsPlugin implements Plugin { } /** - * Push PCM back to Janus in small frames - * We'll do 10ms @48k => 960 samples per frame + * streamToJanus: push PCM frames to Janus in small increments (~10ms). */ private async streamToJanus( samples: Int16Array, sampleRate: number, ): Promise { - // TODO: Check if better than 480 fixed - const FRAME_SIZE = Math.floor(sampleRate * 0.01); // 10ms frames => 480 @48kHz + if (!this.janus) { + this.logger?.warn( + '[SttTtsPlugin] No JanusClient available, cannot send TTS audio', + ); + return; + } + + const frameSize = Math.floor(sampleRate * 0.01); // 10ms => e.g. 480 @ 48kHz for ( let offset = 0; - offset + FRAME_SIZE <= samples.length; - offset += FRAME_SIZE + offset + frameSize <= samples.length; + offset += frameSize ) { - const frame = new Int16Array(FRAME_SIZE); - frame.set(samples.subarray(offset, offset + FRAME_SIZE)); - this.janus?.pushLocalAudio(frame, sampleRate, 1); - - // Short pause so we don't overload + const frame = new Int16Array(frameSize); + frame.set(samples.subarray(offset, offset + frameSize)); + this.janus.pushLocalAudio(frame, sampleRate, 1); await new Promise((r) => setTimeout(r, 10)); } } - public setSystemPrompt(prompt: string) { + /** + * setSystemPrompt: update the GPT system prompt at runtime + */ + public setSystemPrompt(prompt: string): void { this.systemPrompt = prompt; - console.log('[SttTtsPlugin] setSystemPrompt =>', prompt); + this.logger?.info('[SttTtsPlugin] setSystemPrompt =>', prompt); } /** - * Change the GPT model at runtime (e.g. "gpt-4", "gpt-3.5-turbo", etc.). + * setGptModel: switch GPT model (e.g. 
"gpt-4") */ - public setGptModel(model: string) { + public setGptModel(model: string): void { this.gptModel = model; - console.log('[SttTtsPlugin] setGptModel =>', model); + this.logger?.info('[SttTtsPlugin] setGptModel =>', model); } /** - * Add a message (system, user or assistant) to the chat context. - * E.g. to store conversation history or inject a persona. + * addMessage: manually add a system/user/assistant message to the chat context */ - public addMessage(role: 'system' | 'user' | 'assistant', content: string) { + public addMessage( + role: 'system' | 'user' | 'assistant', + content: string, + ): void { this.chatContext.push({ role, content }); - console.log( - `[SttTtsPlugin] addMessage => role=${role}, content=${content}`, + this.logger?.debug( + `[SttTtsPlugin] addMessage => role=${role}, content="${content}"`, ); } /** - * Clear the chat context if needed. + * clearChatContext: resets the GPT conversation */ - public clearChatContext() { + public clearChatContext(): void { this.chatContext = []; - console.log('[SttTtsPlugin] clearChatContext => done'); + this.logger?.debug('[SttTtsPlugin] clearChatContext => done'); } + /** + * cleanup: release resources when the space/participant is stopping or plugin removed + */ cleanup(): void { - console.log('[SttTtsPlugin] cleanup => releasing resources'); + this.logger?.info('[SttTtsPlugin] cleanup => releasing resources'); + this.pcmBuffers.clear(); this.speakerUnmuted.clear(); this.ttsQueue = []; diff --git a/src/spaces/test.ts b/src/spaces/test.ts index 06aeb9c..b22803e 100644 --- a/src/spaces/test.ts +++ b/src/spaces/test.ts @@ -122,7 +122,7 @@ async function main() { console.log('[Test] Speaker request =>', req); await space.approveSpeaker(req.userId, req.sessionUUID); - // Remove the speaker after 10 seconds (testing only) + // Remove the speaker after 60 seconds (testing only) setTimeout(() => { console.log( `[Test] Removing speaker => userId=${req.userId} (after 60s)`, diff --git a/src/spaces/testParticipant.ts b/src/spaces/testParticipant.ts new file mode 100644 index 0000000..4ec0680 --- /dev/null +++ b/src/spaces/testParticipant.ts @@ -0,0 +1,180 @@ +// src/testParticipant.ts + +import 'dotenv/config'; +import { Scraper } from '../scraper'; +import { SpaceParticipant } from './core/SpaceParticipant'; +import { SttTtsPlugin } from './plugins/SttTtsPlugin'; + +/** + * Main test entry point for the "participant" flow: + * - Joins an existing Space in listener mode + * - Requests speaker role + * - Waits for host approval (with a timeout) + * - Optionally sends periodic beep frames if we become speaker + * - Adds a graceful SIGINT handler for cleanup + */ +async function main() { + console.log('[TestParticipant] Starting...'); + + // 1) Twitter login via Scraper + const scraper = new Scraper(); + await scraper.login( + process.env.TWITTER_USERNAME!, + process.env.TWITTER_PASSWORD!, + ); + + // 2) Create the participant + // Replace with your target AudioSpace ID + const audioSpaceId = '1eaKbaNYanvxX'; + const participant = new SpaceParticipant(scraper, { + spaceId: audioSpaceId, + debug: false, + }); + + // Create our TTS/STT plugin instance, just for demonstration + const sttTtsPlugin = new SttTtsPlugin(); + participant.use(sttTtsPlugin, { + openAiApiKey: process.env.OPENAI_API_KEY, + elevenLabsApiKey: process.env.ELEVENLABS_API_KEY, + voiceId: 'D38z5RcWu1voky8WS1ja', // example voice + // systemPrompt: "You are a calm and friendly AI assistant." 
+ }); + + // 3) Join the Space in listener mode + await participant.joinAsListener(); + console.log('[TestParticipant] HLS URL =>', participant.getHlsUrl()); + + // 4) Request the speaker role => returns { sessionUUID } + const { sessionUUID } = await participant.requestSpeaker(); + console.log('[TestParticipant] Requested speaker =>', sessionUUID); + + // 5) Wait for host acceptance with a maximum wait time (e.g., 15 seconds). + try { + await waitForApproval(participant, sessionUUID, 15000); + console.log( + '[TestParticipant] Speaker approval sequence completed (ok or timed out).', + ); + } catch (err) { + console.error('[TestParticipant] Approval error or timeout =>', err); + // Optionally cancel the request if we timed out or got an error + try { + await participant.cancelSpeakerRequest(); + console.log( + '[TestParticipant] Speaker request canceled after timeout or error.', + ); + } catch (cancelErr) { + console.error( + '[TestParticipant] Could not cancel the request =>', + cancelErr, + ); + } + } + + // (Optional) Mute/unmute test + await participant.muteSelf(); + console.log('[TestParticipant] Muted.'); + await new Promise((resolve) => setTimeout(resolve, 3000)); + await participant.unmuteSelf(); + console.log('[TestParticipant] Unmuted.'); + + // --------------------------------------------------------- + // Example beep generation (sends PCM frames if we're speaker) + // --------------------------------------------------------- + const beepDurationMs = 500; + const sampleRate = 16000; + const totalSamples = (sampleRate * beepDurationMs) / 1000; // 8000 + const beepFull = new Int16Array(totalSamples); + + // Sine wave at 440Hz, amplitude ~12000 + const freq = 440; + const amplitude = 12000; + for (let i = 0; i < beepFull.length; i++) { + const t = i / sampleRate; + beepFull[i] = amplitude * Math.sin(2 * Math.PI * freq * t); + } + + const FRAME_SIZE = 160; + async function sendBeep() { + console.log('[TestParticipant] Starting beep...'); + for (let offset = 0; offset < beepFull.length; offset += FRAME_SIZE) { + const portion = beepFull.subarray(offset, offset + FRAME_SIZE); + const frame = new Int16Array(FRAME_SIZE); + frame.set(portion); + participant.pushAudio(frame, sampleRate); + await new Promise((r) => setTimeout(r, 10)); + } + console.log('[TestParticipant] Finished beep.'); + } + + // Example: send beep every 10s + const beepInterval = setInterval(() => { + sendBeep().catch((err) => + console.error('[TestParticipant] beep error =>', err), + ); + }, 10000); + + // Graceful shutdown after 60s + const shutdownTimer = setTimeout(async () => { + await participant.leaveSpace(); + console.log('[TestParticipant] Left space. Bye!'); + process.exit(0); + }, 60000); + + // Catch SIGINT for manual stop + process.on('SIGINT', async () => { + console.log('\n[TestParticipant] Caught interrupt signal, stopping...'); + clearInterval(beepInterval); + clearTimeout(shutdownTimer); + await participant.leaveSpace(); + console.log('[TestParticipant] Space left. Bye!'); + process.exit(0); + }); +} + +/** + * waitForApproval waits until "newSpeakerAccepted" matches our sessionUUID, + * then calls becomeSpeaker() or rejects after a given timeout. 
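// The frame sizes used in this diff follow one rule of thumb: push roughly 10ms of PCM
// per call — 160 samples at 16kHz for the beep above, 480 samples at 48kHz in
// SttTtsPlugin.streamToJanus.
function samplesPer10Ms(sampleRate: number): number {
  return Math.floor(sampleRate * 0.01);
}
// samplesPer10Ms(16000) === 160; samplesPer10Ms(48000) === 480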
+ */ +function waitForApproval( + participant: SpaceParticipant, + sessionUUID: string, + timeoutMs = 10000, +): Promise { + return new Promise((resolve, reject) => { + let resolved = false; + + const handler = async (evt: { sessionUUID: string }) => { + if (evt.sessionUUID === sessionUUID) { + resolved = true; + participant.off('newSpeakerAccepted', handler); + try { + await participant.becomeSpeaker(); + console.log('[TestParticipant] Successfully became speaker!'); + resolve(); + } catch (err) { + reject(err); + } + } + }; + + // Listen to "newSpeakerAccepted" from participant + participant.on('newSpeakerAccepted', handler); + + // Timeout to reject if not approved in time + setTimeout(() => { + if (!resolved) { + participant.off('newSpeakerAccepted', handler); + reject( + new Error( + `[TestParticipant] Timed out waiting for speaker approval after ${timeoutMs}ms.`, + ), + ); + } + }, timeoutMs); + }); +} + +main().catch((err) => { + console.error('[TestParticipant] Unhandled error =>', err); + process.exit(1); +}); diff --git a/src/spaces/types.ts b/src/spaces/types.ts index aaaabbc..5b24be3 100644 --- a/src/spaces/types.ts +++ b/src/spaces/types.ts @@ -1,19 +1,51 @@ // src/types.ts import { Space } from './core/Space'; +import { SpaceParticipant } from './core/SpaceParticipant'; +/** + * Basic PCM audio frame properties. + */ export interface AudioData { - bitsPerSample: number; // e.g., 16 - sampleRate: number; // e.g., 48000 - channelCount: number; // e.g., 1 for mono, 2 for stereo - numberOfFrames: number; // how many samples per channel - samples: Int16Array; // the raw PCM data + /** + * Bits per sample (e.g., 16). + */ + bitsPerSample: number; + + /** + * The sample rate in Hz (e.g., 48000 for 48kHz). + */ + sampleRate: number; + + /** + * Number of channels (e.g., 1 for mono, 2 for stereo). + */ + channelCount: number; + + /** + * Number of frames (samples per channel). + */ + numberOfFrames: number; + + /** + * The raw PCM data for all channels (interleaved if stereo). + */ + samples: Int16Array; } +/** + * PCM audio data with an associated user ID, indicating which speaker produced it. + */ export interface AudioDataWithUser extends AudioData { - userId: string; // The ID of the speaker or user + /** + * The ID of the speaker or user who produced this audio frame. + */ + userId: string; } +/** + * Information about a speaker request event in a Space. + */ export interface SpeakerRequest { userId: string; username: string; @@ -21,16 +53,25 @@ export interface SpeakerRequest { sessionUUID: string; } +/** + * Occupancy update describing the number of participants in a Space. + */ export interface OccupancyUpdate { occupancy: number; totalParticipants: number; } +/** + * Represents an emoji reaction event by a user in the chat. + */ export interface GuestReaction { displayName: string; emoji: string; } +/** + * Response structure after creating a broadcast on Periscope/Twitter. + */ export interface BroadcastCreated { room_id: string; credential: string; @@ -47,6 +88,9 @@ export interface BroadcastCreated { stream_url: string; } +/** + * Describes TURN server credentials and URIs. + */ export interface TurnServersInfo { ttl: string; username: string; @@ -54,28 +98,65 @@ export interface TurnServersInfo { uris: string[]; } +/** + * Defines a plugin interface for both Space (broadcast host) and SpaceParticipant (listener/speaker). + * + * Lifecycle hooks: + * - onAttach(...) is called immediately after .use(plugin). + * - init(...) 
is called after the space or participant has joined in basic mode (listener + chat). + * - onJanusReady(...) is called if/when a JanusClient is created (i.e., speaker mode). + * - onAudioData(...) is called upon receiving raw PCM frames from a speaker. + * - cleanup(...) is called when the space/participant stops or the plugin is removed. + */ export interface Plugin { /** - * onAttach is called immediately when .use(plugin) is invoked, - * passing the Space instance (if needed for immediate usage). + * Called immediately when the plugin is added via .use(plugin). + * Usually used for initial references or minimal setup. + */ + onAttach?(params: { + space: Space | SpaceParticipant; + pluginConfig?: Record; + }): void; + + /** + * Called once the space/participant has fully initialized basic features (chat, HLS, etc.). + * This is the ideal place to finalize setup for plugins that do not require Janus/speaker mode. */ - onAttach?(space: Space): void; + init?(params: { + space: Space | SpaceParticipant; + pluginConfig?: Record; + }): void; /** - * init is called once the Space has *fully* initialized (Janus, broadcast, etc.) - * so the plugin can get references to Janus or final config, etc. + * Called if/when a JanusClient becomes available (e.g., user becomes a speaker). + * Plugins that need direct Janus interactions can implement logic here. */ - init?(params: { space: Space; pluginConfig?: Record }): void; + onJanusReady?(janusClient: any): void; + /** + * Called whenever raw PCM audio frames arrive from a speaker. + * Useful for speech-to-text, analytics, or logging. + */ onAudioData?(data: AudioDataWithUser): void; + + /** + * Cleanup lifecycle hook, invoked when the plugin is removed or the space/participant stops. + * Allows releasing resources, stopping timers, or closing file handles. + */ cleanup?(): void; } +/** + * Internal registration structure for a plugin, used to store the plugin instance + config. + */ export interface PluginRegistration { plugin: Plugin; config?: Record; } +/** + * Stores information about a speaker in a Space (host perspective). + */ export interface SpeakerInfo { userId: string; sessionUUID: string; diff --git a/src/spaces/utils.ts b/src/spaces/utils.ts index 815e58c..cd60429 100644 --- a/src/spaces/utils.ts +++ b/src/spaces/utils.ts @@ -2,7 +2,14 @@ import { Headers } from 'headers-polyfill'; import type { BroadcastCreated, TurnServersInfo } from './types'; +import { ChatClient } from './core/ChatClient'; +import { Logger } from './logger'; +import { EventEmitter } from 'events'; +/** + * Authorizes a token for guest access, using the provided Periscope cookie. + * Returns an authorization token (bearer/JWT-like). + */ export async function authorizeToken(cookie: string): Promise { const headers = new Headers({ 'X-Periscope-User-Agent': 'Twitter/m5', @@ -22,17 +29,25 @@ export async function authorizeToken(cookie: string): Promise { }); if (!resp.ok) { - throw new Error(`Failed to authorize token => ${resp.status}`); + throw new Error( + `authorizeToken => request failed with status ${resp.status}`, + ); } const data = (await resp.json()) as { authorization_token: string }; if (!data.authorization_token) { - throw new Error('authorizeToken: Missing authorization_token in response'); + throw new Error( + 'authorizeToken => Missing authorization_token in response', + ); } return data.authorization_token; } +/** + * Publishes a newly created broadcast (Space) to make it live/visible. 
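// Minimal custom plugin sketch against the Plugin interface documented above.
// Purely illustrative: it just counts PCM frames per user.
import { Plugin, AudioDataWithUser } from './types';

class FrameCounterPlugin implements Plugin {
  private counts = new Map<string, number>();

  onAttach(): void {
    console.log('[FrameCounterPlugin] attached');
  }

  onAudioData(data: AudioDataWithUser): void {
    this.counts.set(data.userId, (this.counts.get(data.userId) ?? 0) + 1);
  }

  cleanup(): void {
    for (const [userId, frames] of this.counts) {
      console.log(`[FrameCounterPlugin] ${userId} => ${frames} frames`);
    }
  }
}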
+ * Generally invoked after creating the broadcast and initializing Janus. + */ export async function publishBroadcast(params: { title: string; broadcast: BroadcastCreated; @@ -40,7 +55,7 @@ export async function publishBroadcast(params: { janusSessionId?: number; janusHandleId?: number; janusPublisherId?: number; -}) { +}): Promise { const headers = new Headers({ 'X-Periscope-User-Agent': 'Twitter/m5', 'Content-Type': 'application/json', @@ -66,6 +81,9 @@ export async function publishBroadcast(params: { }); } +/** + * Retrieves TURN server credentials and URIs from Periscope. + */ export async function getTurnServers(cookie: string): Promise { const headers = new Headers({ 'X-Periscope-User-Agent': 'Twitter/m5', @@ -80,12 +98,16 @@ export async function getTurnServers(cookie: string): Promise { headers, body: JSON.stringify({ cookie }), }); - if (!resp.ok) throw new Error('Failed to get turn servers => ' + resp.status); + if (!resp.ok) { + throw new Error( + `getTurnServers => request failed with status ${resp.status}`, + ); + } return resp.json(); } /** - * Get region from signer.pscp.tv + * Obtains the region from signer.pscp.tv, typically used when creating a broadcast. */ export async function getRegion(): Promise { const resp = await fetch('https://signer.pscp.tv/region', { @@ -97,14 +119,15 @@ export async function getRegion(): Promise { body: JSON.stringify({}), }); if (!resp.ok) { - throw new Error(`Failed to get region => ${resp.status}`); + throw new Error(`getRegion => request failed with status ${resp.status}`); } const data = (await resp.json()) as { region: string }; return data.region; } /** - * Create broadcast on Periscope + * Creates a new broadcast on Periscope/Twitter. + * Used by the host to create the underlying audio-room structure. */ export async function createBroadcast(params: { description?: string; @@ -141,9 +164,363 @@ export async function createBroadcast(params: { if (!resp.ok) { const text = await resp.text(); - throw new Error(`Failed to create broadcast => ${resp.status} ${text}`); + throw new Error( + `createBroadcast => request failed with status ${resp.status} ${text}`, + ); } const data = await resp.json(); return data as BroadcastCreated; } + +/** + * Acquires chat access info (token, endpoint, etc.) from Periscope. + * Needed to connect via WebSocket to the chat server. + */ +export async function accessChat( + chatToken: string, + cookie: string, +): Promise { + const url = 'https://proxsee.pscp.tv/api/v2/accessChat'; + const headers = new Headers({ + 'Content-Type': 'application/json', + 'X-Periscope-User-Agent': 'Twitter/m5', + }); + + const body = { + chat_token: chatToken, + cookie, + }; + + const resp = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(body), + }); + if (!resp.ok) { + throw new Error(`accessChat => request failed with status ${resp.status}`); + } + return resp.json(); +} + +/** + * Registers this client as a viewer (POST /startWatching), returning a watch session token. 
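// Sketch of the viewer bookkeeping implemented by startWatching/stopWatching below.
// `lifecycleToken` is assumed to come from the space's live_video_stream/status
// metadata, and `cookie` is the same Periscope cookie used by authorizeToken.
import { startWatching, stopWatching } from './utils';

async function watchBriefly(lifecycleToken: string, cookie: string): Promise<void> {
  const session = await startWatching(lifecycleToken, cookie);
  try {
    // ...consume the HLS stream / chat while registered as a viewer...
  } finally {
    await stopWatching(session, cookie);
  }
}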
+ */ +export async function startWatching( + lifecycleToken: string, + cookie: string, +): Promise { + const url = 'https://proxsee.pscp.tv/api/v2/startWatching'; + const headers = new Headers({ + 'Content-Type': 'application/json', + 'X-Periscope-User-Agent': 'Twitter/m5', + }); + + const body = { + auto_play: false, + life_cycle_token: lifecycleToken, + cookie, + }; + + const resp = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(body), + }); + if (!resp.ok) { + throw new Error( + `startWatching => request failed with status ${resp.status}`, + ); + } + const json = await resp.json(); + // Typically returns { session: "...someToken..." } + return json.session; +} + +/** + * Deregisters this client from viewing the broadcast (POST /stopWatching). + */ +export async function stopWatching( + session: string, + cookie: string, +): Promise { + const url = 'https://proxsee.pscp.tv/api/v2/stopWatching'; + const headers = new Headers({ + 'Content-Type': 'application/json', + 'X-Periscope-User-Agent': 'Twitter/m5', + }); + + const body = { session, cookie }; + const resp = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(body), + }); + if (!resp.ok) { + throw new Error( + `stopWatching => request failed with status ${resp.status}`, + ); + } +} + +/** + * Optional step: join an existing AudioSpace (POST /audiospace/join). + * This might be required before you can request speaker. + */ +export async function joinAudioSpace(params: { + broadcastId: string; + chatToken: string; + authToken: string; + joinAsAdmin?: boolean; + shouldAutoJoin?: boolean; +}): Promise { + const url = 'https://guest.pscp.tv/api/v1/audiospace/join'; + + const body = { + ntpForBroadcasterFrame: '2208988800031000000', + ntpForLiveFrame: '2208988800031000000', + broadcast_id: params.broadcastId, + join_as_admin: params.joinAsAdmin ?? false, + should_auto_join: params.shouldAutoJoin ?? false, + chat_token: params.chatToken, + }; + + const headers = new Headers({ + 'Content-Type': 'application/json', + Authorization: params.authToken, + }); + + const resp = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(body), + }); + + if (!resp.ok) { + throw new Error( + `joinAudioSpace => request failed with status ${resp.status}`, + ); + } + // Typically returns { can_auto_join: boolean } etc. + return resp.json(); +} + +/** + * Submits a speaker request (POST /audiospace/request/submit), + * returning the session UUID you need for negotiation. + */ +export async function submitSpeakerRequest(params: { + broadcastId: string; + chatToken: string; + authToken: string; +}): Promise<{ session_uuid: string }> { + const url = 'https://guest.pscp.tv/api/v1/audiospace/request/submit'; + const headers = new Headers({ + 'Content-Type': 'application/json', + Authorization: params.authToken, + }); + + const body = { + ntpForBroadcasterFrame: '2208988800030000000', + ntpForLiveFrame: '2208988800030000000', + broadcast_id: params.broadcastId, + chat_token: params.chatToken, + }; + + const resp = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(body), + }); + if (!resp.ok) { + throw new Error( + `submitSpeakerRequest => request failed with status ${resp.status}`, + ); + } + return resp.json(); +} + +/** + * Cancels a previously submitted speaker request (POST /audiospace/request/cancel). + * Only valid if a request/submit was made first with a sessionUUID. 
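// Sketch of the raw speaker-request sequence these helpers expose. SpaceParticipant
// wraps this flow; it is shown here only to illustrate call order and the role of the
// returned session_uuid.
import { joinAudioSpace, submitSpeakerRequest } from './utils';

async function requestSpeakerSlot(
  broadcastId: string,
  chatToken: string,
  authToken: string,
): Promise<string> {
  await joinAudioSpace({ broadcastId, chatToken, authToken });
  const { session_uuid } = await submitSpeakerRequest({
    broadcastId,
    chatToken,
    authToken,
  });
  // session_uuid is later passed to negotiateGuestStream, muteSpeaker/unmuteSpeaker,
  // or cancelSpeakerRequest.
  return session_uuid;
}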
+ */ +export async function cancelSpeakerRequest(params: { + broadcastId: string; + sessionUUID: string; + chatToken: string; + authToken: string; +}): Promise { + const url = 'https://guest.pscp.tv/api/v1/audiospace/request/cancel'; + const headers = new Headers({ + 'Content-Type': 'application/json', + Authorization: params.authToken, + }); + + const body = { + ntpForBroadcasterFrame: '2208988800002000000', + ntpForLiveFrame: '2208988800002000000', + broadcast_id: params.broadcastId, + session_uuid: params.sessionUUID, + chat_token: params.chatToken, + }; + + const resp = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(body), + }); + if (!resp.ok) { + throw new Error( + `cancelSpeakerRequest => request failed with status ${resp.status}`, + ); + } + // Typically returns { "success": true } + return resp.json(); +} + +/** + * Negotiates a guest streaming session (POST /audiospace/stream/negotiate), + * returning a Janus JWT and gateway URL for WebRTC. + */ +export async function negotiateGuestStream(params: { + broadcastId: string; + sessionUUID: string; + authToken: string; + cookie: string; +}): Promise<{ janus_jwt: string; webrtc_gw_url: string }> { + const url = 'https://guest.pscp.tv/api/v1/audiospace/stream/negotiate'; + const headers = new Headers({ + 'Content-Type': 'application/json', + Authorization: params.authToken, + }); + + const body = { + session_uuid: params.sessionUUID, + }; + + const resp = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(body), + }); + if (!resp.ok) { + throw new Error( + `negotiateGuestStream => request failed with status ${resp.status}`, + ); + } + return resp.json(); +} + +/** + * Mutes a speaker (POST /audiospace/muteSpeaker). + * If called by the host, sessionUUID is "". + * If called by a speaker, pass your own sessionUUID. + */ +export async function muteSpeaker(params: { + broadcastId: string; + sessionUUID?: string; + chatToken: string; + authToken: string; +}): Promise { + const url = 'https://guest.pscp.tv/api/v1/audiospace/muteSpeaker'; + + const body = { + ntpForBroadcasterFrame: 2208988800031000000, + ntpForLiveFrame: 2208988800031000000, + session_uuid: params.sessionUUID ?? '', + broadcast_id: params.broadcastId, + chat_token: params.chatToken, + }; + + const headers = new Headers({ + 'Content-Type': 'application/json', + Authorization: params.authToken, + }); + + const resp = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(body), + }); + if (!resp.ok) { + const text = await resp.text(); + throw new Error(`muteSpeaker => ${resp.status} ${text}`); + } +} + +/** + * Unmutes a speaker (POST /audiospace/unmuteSpeaker). + * If called by the host, sessionUUID is "". + * If called by a speaker, pass your own sessionUUID. + */ +export async function unmuteSpeaker(params: { + broadcastId: string; + sessionUUID?: string; + chatToken: string; + authToken: string; +}): Promise { + const url = 'https://guest.pscp.tv/api/v1/audiospace/unmuteSpeaker'; + + const body = { + ntpForBroadcasterFrame: 2208988800031000000, + ntpForLiveFrame: 2208988800031000000, + session_uuid: params.sessionUUID ?? 
'', + broadcast_id: params.broadcastId, + chat_token: params.chatToken, + }; + + const headers = new Headers({ + 'Content-Type': 'application/json', + Authorization: params.authToken, + }); + + const resp = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(body), + }); + if (!resp.ok) { + const text = await resp.text(); + throw new Error(`unmuteSpeaker => ${resp.status} ${text}`); + } +} + +/** + * Common chat events helper. Attaches listeners to a ChatClient, then re-emits them + * through a given EventEmitter (e.g. Space or SpaceParticipant). + */ +export function setupCommonChatEvents( + chatClient: ChatClient, + logger: Logger, + emitter: EventEmitter, +): void { + // Occupancy updates + chatClient.on('occupancyUpdate', (upd) => { + logger.debug('[ChatEvents] occupancyUpdate =>', upd); + emitter.emit('occupancyUpdate', upd); + }); + + // Reaction events + chatClient.on('guestReaction', (reaction) => { + logger.debug('[ChatEvents] guestReaction =>', reaction); + emitter.emit('guestReaction', reaction); + }); + + // Mute state changes + chatClient.on('muteStateChanged', (evt) => { + logger.debug('[ChatEvents] muteStateChanged =>', evt); + emitter.emit('muteStateChanged', evt); + }); + + // Speaker requests + chatClient.on('speakerRequest', (req) => { + logger.debug('[ChatEvents] speakerRequest =>', req); + emitter.emit('speakerRequest', req); + }); + + // Additional event example: new speaker accepted + chatClient.on('newSpeakerAccepted', (info) => { + logger.debug('[ChatEvents] newSpeakerAccepted =>', info); + emitter.emit('newSpeakerAccepted', info); + }); +}
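// Sketch of how a host/participant class is expected to call this helper; it mirrors
// SpaceParticipant.setupChatEvents above, which passes `this` as the emitter.
import { EventEmitter } from 'events';
import { ChatClient } from './core/ChatClient';
import { Logger } from './logger';
import { setupCommonChatEvents } from './utils';
import type { SpeakerRequest } from './types';

function wireChat(
  emitter: EventEmitter,
  chatClient: ChatClient,
  logger: Logger,
): void {
  setupCommonChatEvents(chatClient, logger, emitter);

  // Consumers then subscribe on the emitter rather than on the ChatClient directly.
  emitter.on('speakerRequest', (req: SpeakerRequest) => {
    logger.info('[Example] speakerRequest =>', req.username);
  });
}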