diff --git a/package-lock.json b/package-lock.json index 6754adf..e4a41b3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,23 +1,21 @@ { "name": "agent-twitter-client", - "version": "0.0.17", + "version": "0.0.18", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "agent-twitter-client", - "version": "0.0.17", + "version": "0.0.18", "license": "MIT", "dependencies": { "@roamhq/wrtc": "^0.8.0", "@sinclair/typebox": "^0.32.20", "headers-polyfill": "^3.1.2", "json-stable-stringify": "^1.0.2", - "node-fetch": "^3.3.2", "otpauth": "^9.2.2", "set-cookie-parser": "^2.6.0", "tough-cookie": "^4.1.2", - "tslib": "^2.5.2", "twitter-api-v2": "^1.18.2", "undici": "^7.1.1", "ws": "^8.18.0" @@ -28,7 +26,7 @@ "@tsconfig/node16": "^16.1.3", "@types/jest": "^29.5.1", "@types/json-stable-stringify": "^1.0.34", - "@types/node": "^22.9.1", + "@types/node": "^22.10.2", "@types/set-cookie-parser": "^2.4.2", "@types/tough-cookie": "^4.0.2", "@types/ws": "^8.5.13", @@ -3694,15 +3692,6 @@ "node": ">=8" } }, - "node_modules/data-uri-to-buffer": { - "version": "4.0.1", - "resolved": "https://registry.npmjs.org/data-uri-to-buffer/-/data-uri-to-buffer-4.0.1.tgz", - "integrity": "sha512-0R9ikRb668HB7QDxT1vkpuUBtqc53YyAwMwGeUFKRojY/NWKvdZ+9UYtRfGmhqNbRkTSVpMbmyhXipFFv2cb/A==", - "license": "MIT", - "engines": { - "node": ">= 12" - } - }, "node_modules/debug": { "version": "4.4.0", "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.0.tgz", @@ -4500,29 +4489,6 @@ "bser": "2.1.1" } }, - "node_modules/fetch-blob": { - "version": "3.2.0", - "resolved": "https://registry.npmjs.org/fetch-blob/-/fetch-blob-3.2.0.tgz", - "integrity": "sha512-7yAQpD2UMJzLi1Dqv7qFYnPbaPx7ZfFK6PiIxQ4PfkGPyNyl2Ugx+a/umUonmKqjhM4DnfbMvdX6otXq83soQQ==", - "funding": [ - { - "type": "github", - "url": "https://github.com/sponsors/jimmywarting" - }, - { - "type": "paypal", - "url": "https://paypal.me/jimmywarting" - } - ], - "license": "MIT", - "dependencies": { - "node-domexception": "^1.0.0", - "web-streams-polyfill": "^3.0.3" - }, - "engines": { - "node": "^12.20 || >= 14.13" - } - }, "node_modules/file-entry-cache": { "version": "6.0.1", "resolved": "https://registry.npmjs.org/file-entry-cache/-/file-entry-cache-6.0.1.tgz", @@ -4714,18 +4680,6 @@ "url": "https://github.com/sponsors/isaacs" } }, - "node_modules/formdata-polyfill": { - "version": "4.0.10", - "resolved": "https://registry.npmjs.org/formdata-polyfill/-/formdata-polyfill-4.0.10.tgz", - "integrity": "sha512-buewHzMvYL29jdeQTVILecSaZKnt/RJWjoZCF5OW60Z67/GmSLBkOFM7qh1PI3zFNtJbaZL5eQu1vLfazOwj4g==", - "license": "MIT", - "dependencies": { - "fetch-blob": "^3.1.2" - }, - "engines": { - "node": ">=12.20.0" - } - }, "node_modules/fs-extra": { "version": "11.2.0", "resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-11.2.0.tgz", @@ -7035,43 +6989,6 @@ "dev": true, "license": "MIT" }, - "node_modules/node-domexception": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/node-domexception/-/node-domexception-1.0.0.tgz", - "integrity": "sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==", - "funding": [ - { - "type": "github", - "url": "https://github.com/sponsors/jimmywarting" - }, - { - "type": "github", - "url": "https://paypal.me/jimmywarting" - } - ], - "license": "MIT", - "engines": { - "node": ">=10.5.0" - } - }, - "node_modules/node-fetch": { - "version": "3.3.2", - "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-3.3.2.tgz", - "integrity": "sha512-dRB78srN/l6gqWulah9SrxeYnxeddIG30+GOqK/9OlLVyLg3HPnr6SqOWTWOXKRwC2eGYCkZ59NNuSgvSrpgOA==", - "license": "MIT", - "dependencies": { - "data-uri-to-buffer": "^4.0.0", - "fetch-blob": "^3.1.4", - "formdata-polyfill": "^4.0.10" - }, - "engines": { - "node": "^12.20.0 || ^14.13.1 || >=16.0.0" - }, - "funding": { - "type": "opencollective", - "url": "https://opencollective.com/node-fetch" - } - }, "node_modules/node-int64": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz", @@ -8888,12 +8805,6 @@ "dev": true, "license": "MIT" }, - "node_modules/tslib": { - "version": "2.8.1", - "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", - "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", - "license": "0BSD" - }, "node_modules/tsutils": { "version": "3.21.0", "resolved": "https://registry.npmjs.org/tsutils/-/tsutils-3.21.0.tgz", @@ -9162,15 +9073,6 @@ "makeerror": "1.0.12" } }, - "node_modules/web-streams-polyfill": { - "version": "3.3.3", - "resolved": "https://registry.npmjs.org/web-streams-polyfill/-/web-streams-polyfill-3.3.3.tgz", - "integrity": "sha512-d2JWLCivmZYTSIoge9MsgFCZrt571BikcWGYkjC1khllbTeDlGqZ2D8vD8E/lJa8WGWbb7Plm8/XJYV7IJHZZw==", - "license": "MIT", - "engines": { - "node": ">= 8" - } - }, "node_modules/webidl-conversions": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-7.0.0.tgz", diff --git a/package.json b/package.json index 8b01738..b036a65 100644 --- a/package.json +++ b/package.json @@ -30,11 +30,9 @@ "@sinclair/typebox": "^0.32.20", "headers-polyfill": "^3.1.2", "json-stable-stringify": "^1.0.2", - "node-fetch": "^3.3.2", "otpauth": "^9.2.2", "set-cookie-parser": "^2.6.0", "tough-cookie": "^4.1.2", - "tslib": "^2.5.2", "twitter-api-v2": "^1.18.2", "undici": "^7.1.1", "ws": "^8.18.0" @@ -45,7 +43,7 @@ "@tsconfig/node16": "^16.1.3", "@types/jest": "^29.5.1", "@types/json-stable-stringify": "^1.0.34", - "@types/node": "^22.9.1", + "@types/node": "^22.10.2", "@types/set-cookie-parser": "^2.4.2", "@types/tough-cookie": "^4.0.2", "@types/ws": "^8.5.13", diff --git a/src/_module.ts b/src/_module.ts index c31b50c..b3e7b5d 100644 --- a/src/_module.ts +++ b/src/_module.ts @@ -9,5 +9,6 @@ export { SttTtsPlugin } from './spaces/plugins/SttTtsPlugin' export { RecordToDiskPlugin } from './spaces/plugins/RecordToDiskPlugin' export { MonitorAudioPlugin } from './spaces/plugins/MonitorAudioPlugin' export { IdleMonitorPlugin } from './spaces/plugins/IdleMonitorPlugin' +export { HlsRecordPlugin } from './spaces/plugins/HlsRecordPlugin' export * from './types/spaces' diff --git a/src/spaces/core/ChatClient.ts b/src/spaces/core/ChatClient.ts index b02de4a..df2bcb0 100644 --- a/src/spaces/core/ChatClient.ts +++ b/src/spaces/core/ChatClient.ts @@ -3,17 +3,29 @@ import WebSocket from 'ws'; import { EventEmitter } from 'events'; import type { SpeakerRequest, OccupancyUpdate } from '../types'; +import { Logger } from '../logger'; + +interface ChatClientConfig { + spaceId: string; + accessToken: string; + endpoint: string; + logger: Logger; +} export class ChatClient extends EventEmitter { private ws?: WebSocket; private connected = false; + private logger: Logger; + private readonly spaceId: string; + private readonly accessToken: string; + private endpoint: string; - constructor( - private readonly spaceId: string, - private readonly accessToken: string, - private readonly endpoint: string, - ) { + constructor(config: ChatClientConfig) { super(); + this.spaceId = config.spaceId; + this.accessToken = config.accessToken; + this.endpoint = config.endpoint; + this.logger = config.logger; } async connect() { @@ -21,7 +33,7 @@ export class ChatClient extends EventEmitter { 'https://', 'wss://', ); - console.log('[ChatClient] Connecting =>', wsUrl); + this.logger.info('[ChatClient] Connecting =>', wsUrl); this.ws = new WebSocket(wsUrl, { headers: { @@ -34,28 +46,30 @@ export class ChatClient extends EventEmitter { } private setupHandlers(): Promise { - if (!this.ws) throw new Error('No WebSocket instance'); + if (!this.ws) { + throw new Error('No WebSocket instance'); + } return new Promise((resolve, reject) => { this.ws!.on('open', () => { - console.log('[ChatClient] Connected'); + this.logger.info('[ChatClient] Connected'); this.connected = true; this.sendAuthAndJoin(); resolve(); }); - this.ws!.on('message', (data: { toString: () => string; }) => { + this.ws!.on('message', (data: { toString: () => string }) => { this.handleMessage(data.toString()); }); this.ws!.on('close', () => { - console.log('[ChatClient] Closed'); + this.logger.info('[ChatClient] Closed'); this.connected = false; this.emit('disconnected'); }); this.ws!.on('error', (err) => { - console.error('[ChatClient] Error =>', err); + this.logger.error('[ChatClient] Error =>', err); reject(err); }); }); @@ -63,14 +77,14 @@ export class ChatClient extends EventEmitter { private sendAuthAndJoin() { if (!this.ws) return; - // Auth + this.ws.send( JSON.stringify({ payload: JSON.stringify({ access_token: this.accessToken }), kind: 3, }), ); - // Join + this.ws.send( JSON.stringify({ payload: JSON.stringify({ @@ -120,7 +134,6 @@ export class ChatClient extends EventEmitter { const body = safeJson(payload.body); - // Example of speaker request detection if (body.guestBroadcastingEvent === 1) { const req: SpeakerRequest = { userId: body.guestRemoteID, @@ -131,7 +144,6 @@ export class ChatClient extends EventEmitter { this.emit('speakerRequest', req); } - // Example of occupancy update if (typeof body.occupancy === 'number') { const update: OccupancyUpdate = { occupancy: body.occupancy, @@ -140,7 +152,6 @@ export class ChatClient extends EventEmitter { this.emit('occupancyUpdate', update); } - // Example of mute state if (body.guestBroadcastingEvent === 16) { this.emit('muteStateChanged', { userId: body.guestRemoteID, @@ -155,7 +166,7 @@ export class ChatClient extends EventEmitter { } // Example of guest reaction if (body?.type === 2) { - console.log('[ChatClient] Emiting guest reaction event =>', body); + this.logger.info('[ChatClient] Emitting guest reaction event =>', body); this.emit('guestReaction', { displayName: body.displayName, emoji: body.body, @@ -165,6 +176,7 @@ export class ChatClient extends EventEmitter { async disconnect() { if (this.ws) { + this.logger.info('[ChatClient] Disconnecting...'); this.ws.close(); this.ws = undefined; this.connected = false; diff --git a/src/spaces/core/JanusAudioSource.ts b/src/spaces/core/JanusAudio.ts similarity index 53% rename from src/spaces/core/JanusAudioSource.ts rename to src/spaces/core/JanusAudio.ts index 14595b3..951ed2d 100644 --- a/src/spaces/core/JanusAudioSource.ts +++ b/src/spaces/core/JanusAudio.ts @@ -1,16 +1,27 @@ -// src/core/audio.ts +// src/core/JanusAudio.ts import { EventEmitter } from 'events'; import wrtc from '@roamhq/wrtc'; const { nonstandard } = wrtc; const { RTCAudioSource, RTCAudioSink } = nonstandard; +import { Logger } from '../logger'; + +interface AudioSourceOptions { + logger?: Logger; +} + +interface AudioSinkOptions { + logger?: Logger; +} export class JanusAudioSource extends EventEmitter { private source: any; - private track: MediaStreamTrack; + private readonly track: MediaStreamTrack; + private logger?: Logger; - constructor() { + constructor(options?: AudioSourceOptions) { super(); + this.logger = options?.logger; this.source = new RTCAudioSource(); this.track = this.source.createTrack(); } @@ -20,6 +31,11 @@ export class JanusAudioSource extends EventEmitter { } pushPcmData(samples: Int16Array, sampleRate: number, channels = 1) { + if (this.logger?.isDebugEnabled()) { + this.logger?.debug( + `[JanusAudioSource] pushPcmData => sampleRate=${sampleRate}, channels=${channels}`, + ); + } this.source.onData({ samples, sampleRate, @@ -33,12 +49,14 @@ export class JanusAudioSource extends EventEmitter { export class JanusAudioSink extends EventEmitter { private sink: any; private active = true; + private logger?: Logger; - constructor(track: MediaStreamTrack) { + constructor(track: MediaStreamTrack, options?: AudioSinkOptions) { super(); - if (track.kind !== 'audio') + this.logger = options?.logger; + if (track.kind !== 'audio') { throw new Error('JanusAudioSink must be an audio track'); - + } this.sink = new RTCAudioSink(track); this.sink.ondata = (frame: { @@ -48,12 +66,20 @@ export class JanusAudioSink extends EventEmitter { channelCount: number; }) => { if (!this.active) return; + if (this.logger?.isDebugEnabled()) { + this.logger?.debug( + `[JanusAudioSink] ondata => sampleRate=${frame.sampleRate}, bitsPerSample=${frame.bitsPerSample}, channelCount=${frame.channelCount}`, + ); + } this.emit('audioData', frame); }; } stop() { this.active = false; + if (this.logger?.isDebugEnabled()) { + this.logger?.debug('[JanusAudioSink] stop'); + } this.sink?.stop(); } } diff --git a/src/spaces/core/JanusClient.ts b/src/spaces/core/JanusClient.ts index b7b4ddd..5aef42e 100644 --- a/src/spaces/core/JanusClient.ts +++ b/src/spaces/core/JanusClient.ts @@ -3,8 +3,9 @@ import { EventEmitter } from 'events'; import wrtc from '@roamhq/wrtc'; const { RTCPeerConnection, MediaStream } = wrtc; -import { JanusAudioSink, JanusAudioSource } from './JanusAudioSource'; +import { JanusAudioSink, JanusAudioSource } from './JanusAudio'; import type { AudioDataWithUser, TurnServersInfo } from '../types'; +import { Logger } from '../logger'; interface JanusConfig { webrtcUrl: string; @@ -13,6 +14,7 @@ interface JanusConfig { userId: string; streamName: string; turnServers: TurnServersInfo; + logger: Logger; } /** @@ -20,6 +22,8 @@ interface JanusConfig { * joining the videoroom, and polling events. */ export class JanusClient extends EventEmitter { + private logger: Logger; + private sessionId?: number; private handleId?: number; private publisherId?: number; @@ -41,26 +45,19 @@ export class JanusClient extends EventEmitter { constructor(private readonly config: JanusConfig) { super(); + this.logger = config.logger; } async initialize() { - // 1) Create session - this.sessionId = await this.createSession(); + this.logger.debug('[JanusClient] initialize() called'); - // 2) Attach plugin + this.sessionId = await this.createSession(); this.handleId = await this.attachPlugin(); - - // 3) Start polling events right now this.pollActive = true; this.startPolling(); - - // 4) Create room await this.createRoom(); - - // 3) Join room this.publisherId = await this.joinRoom(); - // 4) Create local RTCPeerConnection this.pc = new RTCPeerConnection({ iceServers: [ { @@ -73,22 +70,17 @@ export class JanusClient extends EventEmitter { this.setupPeerEvents(); this.enableLocalAudio(); - // 6) Do the initial configure -> generate Offer -> setLocalDesc -> send -> setRemoteDesc await this.configurePublisher(); - console.log('[JanusClient] Initialization complete'); + this.logger.info('[JanusClient] Initialization complete'); } public async subscribeSpeaker(userId: string): Promise { - console.log('[JanusClient] subscribeSpeaker => userId=', userId); + this.logger.debug('[JanusClient] subscribeSpeaker => userId=', userId); - // 1) Attach plugin as subscriber const subscriberHandleId = await this.attachPlugin(); - console.log('[JanusClient] subscriber handle =>', subscriberHandleId); + this.logger.debug('[JanusClient] subscriber handle =>', subscriberHandleId); - // 2) Wait for an event with "publishers" to discover feedId - // We do *not* check sender === subscriberHandleId because Hydra - // might send it from the main handle or another handle. const publishersEvt = await this.waitForJanusEvent( (e) => e.janus === 'event' && @@ -100,8 +92,6 @@ export class JanusClient extends EventEmitter { 'discover feed_id from "publishers"', ); - // Extract the feedId from the first publisher whose 'display' or 'periscope_user_id' = userId - // (in your logs, 'display' or 'periscope_user_id' is the actual user) const list = publishersEvt.plugindata.data.publishers as any[]; const pub = list.find( (p) => p.display === userId || p.periscope_user_id === userId, @@ -112,10 +102,9 @@ export class JanusClient extends EventEmitter { ); } const feedId = pub.id; - console.log('[JanusClient] found feedId =>', feedId); + this.logger.debug('[JanusClient] found feedId =>', feedId); this.emit('subscribedSpeaker', { userId, feedId }); - // 3) "join" as subscriber with "streams: [{ feed, mid: '0', send: true }]" const joinBody = { request: 'join', room: this.config.roomId, @@ -131,8 +120,6 @@ export class JanusClient extends EventEmitter { }; await this.sendJanusMessage(subscriberHandleId, joinBody); - // 4) Wait for "attached" + jsep.offer from *this subscriber handle* - // Now we do filter on e.sender === subscriberHandleId const attachedEvt = await this.waitForJanusEvent( (e) => e.janus === 'event' && @@ -143,11 +130,10 @@ export class JanusClient extends EventEmitter { 8000, 'subscriber attached + offer', ); - console.log('[JanusClient] subscriber => "attached" with offer'); + this.logger.debug('[JanusClient] subscriber => "attached" with offer'); const offer = attachedEvt.jsep; - // 5) Create subPc, setRemoteDescription(offer), createAnswer, setLocalDescription(answer) const subPc = new RTCPeerConnection({ iceServers: [ { @@ -158,42 +144,26 @@ export class JanusClient extends EventEmitter { ], }); - // Ontrack => parse PCM via JanusAudioSink subPc.ontrack = (evt) => { - console.log('[JanusClient] subscriber track =>', evt.track.kind); - - // TODO: REMOVE DEBUG - // console.log( - // '[JanusClient] subscriber track => kind=', - // evt.track.kind, - // 'readyState=', - // evt.track.readyState, - // 'muted=', - // evt.track.muted, - // ); - - const sink = new JanusAudioSink(evt.track); + this.logger.debug( + '[JanusClient] subscriber track => kind=%s, readyState=%s, muted=%s', + evt.track.kind, + evt.track.readyState, + evt.track.muted, + ); + + const sink = new JanusAudioSink(evt.track, { logger: this.logger }); sink.on('audioData', (frame) => { - // TODO: REMOVE DEBUG - // console.log( - // '[AudioSink] got an audio frame => sampleRate=', - // frame.sampleRate, - // 'length=', - // frame.samples.length, - // ); - // console.log( - // '[AudioSink] sampleRate=', - // frame.sampleRate, - // 'channels=', - // frame.channelCount, - // ); - // const { samples } = frame; // Int16Array - // let maxVal = 0; - // for (let i = 0; i < samples.length; i++) { - // const val = Math.abs(samples[i]); - // if (val > maxVal) maxVal = val; - // } - // console.log(`[AudioSink] userId=${userId}, maxAmplitude=${maxVal}`); + if (this.logger.isDebugEnabled()) { + let maxVal = 0; + for (let i = 0; i < frame.samples.length; i++) { + const val = Math.abs(frame.samples[i]); + if (val > maxVal) maxVal = val; + } + this.logger.debug( + `[AudioSink] userId=${userId}, maxAmplitude=${maxVal}`, + ); + } this.emit('audioDataFromSpeaker', { userId, @@ -210,7 +180,6 @@ export class JanusClient extends EventEmitter { const answer = await subPc.createAnswer(); await subPc.setLocalDescription(answer); - // 6) Send "start" with jsep=answer await this.sendJanusMessage( subscriberHandleId, { @@ -220,15 +189,13 @@ export class JanusClient extends EventEmitter { }, answer, ); - console.log('[JanusClient] subscriber => done (user=', userId, ')'); - - // Store for potential cleanup + this.logger.debug('[JanusClient] subscriber => done (user=', userId, ')'); this.subscribers.set(userId, { handleId: subscriberHandleId, pc: subPc }); } pushLocalAudio(samples: Int16Array, sampleRate: number, channels = 1) { if (!this.localAudioSource) { - console.warn('[JanusClient] No localAudioSource; enabling now...'); + this.logger.warn('[JanusClient] No localAudioSource; enabling now...'); this.enableLocalAudio(); } this.localAudioSource?.pushPcmData(samples, sampleRate, channels); @@ -236,14 +203,16 @@ export class JanusClient extends EventEmitter { enableLocalAudio() { if (!this.pc) { - console.warn('[JanusClient] No RTCPeerConnection'); + this.logger.warn( + '[JanusClient] enableLocalAudio => No RTCPeerConnection', + ); return; } if (this.localAudioSource) { - console.log('[JanusClient] localAudioSource already active'); + this.logger.debug('[JanusClient] localAudioSource already active'); return; } - this.localAudioSource = new JanusAudioSource(); + this.localAudioSource = new JanusAudioSource({ logger: this.logger }); const track = this.localAudioSource.getTrack(); const localStream = new MediaStream(); localStream.addTrack(track); @@ -251,10 +220,8 @@ export class JanusClient extends EventEmitter { } async stop() { - console.log('[JanusClient] Stopping...'); + this.logger.info('[JanusClient] Stopping...'); this.pollActive = false; - // leave the room, etc. - // close PC if (this.pc) { this.pc.close(); this.pc = undefined; @@ -264,15 +231,15 @@ export class JanusClient extends EventEmitter { getSessionId() { return this.sessionId; } + getHandleId() { return this.handleId; } + getPublisherId() { return this.publisherId; } - // ------------------- Private Methods -------------------- - private async createSession(): Promise { const transaction = this.randomTid(); const resp = await fetch(this.config.webrtcUrl, { @@ -287,16 +254,20 @@ export class JanusClient extends EventEmitter { transaction, }), }); - if (!resp.ok) throw new Error('[JanusClient] createSession failed'); + if (!resp.ok) { + throw new Error('[JanusClient] createSession failed'); + } const json = await resp.json(); - if (json.janus !== 'success') + if (json.janus !== 'success') { throw new Error('[JanusClient] createSession invalid response'); - return json.data.id; // sessionId + } + return json.data.id; } private async attachPlugin(): Promise { - if (!this.sessionId) throw new Error('[JanusClient] no sessionId'); - + if (!this.sessionId) { + throw new Error('[JanusClient] attachPlugin => no sessionId'); + } const transaction = this.randomTid(); const resp = await fetch(`${this.config.webrtcUrl}/${this.sessionId}`, { method: 'POST', @@ -310,18 +281,20 @@ export class JanusClient extends EventEmitter { transaction, }), }); - if (!resp.ok) throw new Error('[JanusClient] attachPlugin failed'); + if (!resp.ok) { + throw new Error('[JanusClient] attachPlugin failed'); + } const json = await resp.json(); - if (json.janus !== 'success') + if (json.janus !== 'success') { throw new Error('[JanusClient] attachPlugin invalid response'); + } return json.data.id; } private async createRoom() { if (!this.sessionId || !this.handleId) { - throw new Error('[JanusClient] No session/handle'); + throw new Error('[JanusClient] createRoom => No session/handle'); } - const transaction = this.randomTid(); const body = { request: 'create', @@ -334,8 +307,6 @@ export class JanusClient extends EventEmitter { h264_profile: '42e01f', dummy_publisher: false, }; - - // Send the "create" request const resp = await fetch( `${this.config.webrtcUrl}/${this.sessionId}/${this.handleId}`, { @@ -352,15 +323,11 @@ export class JanusClient extends EventEmitter { }), }, ); - if (!resp.ok) { throw new Error(`[JanusClient] createRoom failed => ${resp.status}`); } - const json = await resp.json(); - console.log('[JanusClient] createRoom =>', JSON.stringify(json)); - - // Check if Janus responded with videoroom:"created" + this.logger.debug('[JanusClient] createRoom =>', JSON.stringify(json)); if (json.janus === 'error') { throw new Error( `[JanusClient] createRoom error => ${ @@ -368,7 +335,6 @@ export class JanusClient extends EventEmitter { }`, ); } - if (json.plugindata?.data?.videoroom !== 'created') { throw new Error( `[JanusClient] unexpected createRoom response => ${JSON.stringify( @@ -376,30 +342,24 @@ export class JanusClient extends EventEmitter { )}`, ); } - - console.log( + this.logger.debug( `[JanusClient] Room '${this.config.roomId}' created successfully`, ); } private async joinRoom(): Promise { - if (!this.sessionId || !this.handleId) + if (!this.sessionId || !this.handleId) { throw new Error('[JanusClient] no session/handle'); - - // Wait for the event that indicates we joined - // Typically: evt.janus === 'event' && evt.plugindata?.data?.videoroom === 'joined' + } + this.logger.debug('[JanusClient] joinRoom => start'); const evtPromise = this.waitForJanusEvent( - (e) => { - return ( - e.janus === 'event' && - e.plugindata?.plugin === 'janus.plugin.videoroom' && - e.plugindata?.data?.videoroom === 'joined' - ); - }, + (e) => + e.janus === 'event' && + e.plugindata?.plugin === 'janus.plugin.videoroom' && + e.plugindata?.data?.videoroom === 'joined', 12000, 'Host Joined Event', ); - const body = { request: 'join', room: this.config.roomId, @@ -408,25 +368,23 @@ export class JanusClient extends EventEmitter { periscope_user_id: this.config.userId, }; await this.sendJanusMessage(this.handleId, body); - const evt = await evtPromise; - const publisherId = evt.plugindata.data.id; - console.log('[JanusClient] joined room => publisherId=', publisherId); + this.logger.debug('[JanusClient] joined room => publisherId=', publisherId); return publisherId; } private async configurePublisher() { - if (!this.pc || !this.sessionId || !this.handleId) return; - - console.log('[JanusClient] createOffer...'); + if (!this.pc || !this.sessionId || !this.handleId) { + return; + } + this.logger.debug('[JanusClient] createOffer...'); const offer = await this.pc.createOffer({ offerToReceiveAudio: true, offerToReceiveVideo: false, }); await this.pc.setLocalDescription(offer); - - console.log('[JanusClient] sending configure with JSEP...'); + this.logger.debug('[JanusClient] sending configure with JSEP...'); await this.sendJanusMessage( this.handleId, { @@ -439,10 +397,7 @@ export class JanusClient extends EventEmitter { }, offer, ); - - console.log('[JanusClient] waiting for answer...'); - // In a real scenario, we do an event-based wait for jsep.type === 'answer'. - // For MVP, let's do a poll in handleJanusEvent for that "answer" and setRemoteDesc + this.logger.debug('[JanusClient] waiting for answer...'); } private async sendJanusMessage( @@ -454,7 +409,6 @@ export class JanusClient extends EventEmitter { throw new Error('[JanusClient] No session'); } const transaction = this.randomTid(); - const resp = await fetch( `${this.config.webrtcUrl}/${this.sessionId}/${handleId}`, { @@ -471,7 +425,6 @@ export class JanusClient extends EventEmitter { }), }, ); - if (!resp.ok) { throw new Error( '[JanusClient] sendJanusMessage failed => ' + resp.status, @@ -480,10 +433,10 @@ export class JanusClient extends EventEmitter { } private startPolling() { - console.log('[JanusClient] Starting polling...'); + this.logger.debug('[JanusClient] Starting polling...'); const doPoll = async () => { if (!this.pollActive || !this.sessionId) { - console.log('[JanusClient] Polling stopped'); + this.logger.debug('[JanusClient] Polling stopped'); return; } try { @@ -497,10 +450,10 @@ export class JanusClient extends EventEmitter { const event = await resp.json(); this.handleJanusEvent(event); } else { - console.log('[JanusClient] poll error =>', resp.status); + this.logger.warn('[JanusClient] poll error =>', resp.status); } } catch (err) { - console.error('[JanusClient] poll exception =>', err); + this.logger.error('[JanusClient] poll exception =>', err); } setTimeout(doPoll, 500); }; @@ -508,62 +461,59 @@ export class JanusClient extends EventEmitter { } private handleJanusEvent(evt: any) { - // TODO: REMOVE DEBUG - // console.log('[JanusClient] handleJanusEvent =>', JSON.stringify(evt)); - - if (!evt.janus) return; + if (!evt.janus) { + return; + } if (evt.janus === 'keepalive') { + this.logger.debug('[JanusClient] keepalive received'); return; } if (evt.janus === 'webrtcup') { - console.log('[JanusClient] webrtcup =>', evt.sender); + this.logger.debug('[JanusClient] webrtcup =>', evt.sender); } if (evt.jsep && evt.jsep.type === 'answer') { this.onReceivedAnswer(evt.jsep); } if (evt.plugindata?.data?.id) { - // e.g. publisherId this.publisherId = evt.plugindata.data.id; } if (evt.error) { - console.error('[JanusClient] Janus error =>', evt.error.reason); + this.logger.error('[JanusClient] Janus error =>', evt.error.reason); this.emit('error', new Error(evt.error.reason)); } - for (let i = 0; i < this.eventWaiters.length; i++) { const waiter = this.eventWaiters[i]; if (waiter.predicate(evt)) { - // remove from the array this.eventWaiters.splice(i, 1); - // resolve the promise waiter.resolve(evt); - break; // important: only resolve one waiter + break; } } - // Add more logic if needed } private async onReceivedAnswer(answer: any) { - if (!this.pc) return; - console.log('[JanusClient] got answer => setRemoteDescription'); + if (!this.pc) { + return; + } + this.logger.debug('[JanusClient] got answer => setRemoteDescription'); await this.pc.setRemoteDescription(answer); } private setupPeerEvents() { - if (!this.pc) return; - + if (!this.pc) { + return; + } this.pc.addEventListener('iceconnectionstatechange', () => { - // TODO: REMOVE DEBUG - // console.log('[JanusClient] ICE state =>', this.pc?.iceConnectionState); - + this.logger.debug( + '[JanusClient] ICE state =>', + this.pc?.iceConnectionState, + ); if (this.pc?.iceConnectionState === 'failed') { this.emit('error', new Error('ICE connection failed')); } }); - this.pc.addEventListener('track', (evt) => { - console.log('[JanusClient] track =>', evt.track.kind); - // Here you can attach a JanusAudioSink to parse PCM frames + this.logger.debug('[JanusClient] track =>', evt.track.kind); }); } @@ -580,18 +530,14 @@ export class JanusClient extends EventEmitter { description = 'some event', ): Promise { return new Promise((resolve, reject) => { - const waiter = { - predicate, - resolve, - reject, - }; + const waiter = { predicate, resolve, reject }; this.eventWaiters.push(waiter); setTimeout(() => { const idx = this.eventWaiters.indexOf(waiter); if (idx !== -1) { this.eventWaiters.splice(idx, 1); - console.log( + this.logger.warn( `[JanusClient] waitForJanusEvent => timed out waiting for: ${description}`, ); reject( @@ -606,22 +552,20 @@ export class JanusClient extends EventEmitter { public async destroyRoom(): Promise { if (!this.sessionId || !this.handleId) { - console.warn('[JanusClient] destroyRoom => no session/handle'); + this.logger.warn('[JanusClient] destroyRoom => no session/handle'); return; } if (!this.config.roomId || !this.config.userId) { - console.warn('[JanusClient] destroyRoom => no roomId/userId'); + this.logger.warn('[JanusClient] destroyRoom => no roomId/userId'); return; } - const transaction = this.randomTid(); const body = { request: 'destroy', room: this.config.roomId, periscope_user_id: this.config.userId, }; - - console.log('[JanusClient] destroying room =>', body); + this.logger.info('[JanusClient] destroying room =>', body); const resp = await fetch( `${this.config.webrtcUrl}/${this.sessionId}/${this.handleId}`, { @@ -638,18 +582,16 @@ export class JanusClient extends EventEmitter { }), }, ); - if (!resp.ok) { throw new Error(`[JanusClient] destroyRoom failed => ${resp.status}`); } - const json = await resp.json(); - console.log('[JanusClient] destroyRoom =>', JSON.stringify(json)); + this.logger.debug('[JanusClient] destroyRoom =>', JSON.stringify(json)); } public async leaveRoom(): Promise { if (!this.sessionId || !this.handleId) { - console.warn('[JanusClient] leaveRoom => no session/handle'); + this.logger.warn('[JanusClient] leaveRoom => no session/handle'); return; } const transaction = this.randomTid(); @@ -658,8 +600,7 @@ export class JanusClient extends EventEmitter { room: this.config.roomId, periscope_user_id: this.config.userId, }; - - console.log('[JanusClient] leaving room =>', body); + this.logger.info('[JanusClient] leaving room =>', body); const resp = await fetch( `${this.config.webrtcUrl}/${this.sessionId}/${this.handleId}`, { @@ -679,6 +620,6 @@ export class JanusClient extends EventEmitter { throw new Error(`[JanusClient] leaveRoom => error code ${resp.status}`); } const json = await resp.json(); - console.log('[JanusClient] leaveRoom =>', JSON.stringify(json)); + this.logger.debug('[JanusClient] leaveRoom =>', JSON.stringify(json)); } } diff --git a/src/spaces/core/Space.ts b/src/spaces/core/Space.ts index b6c9125..5883bd5 100644 --- a/src/spaces/core/Space.ts +++ b/src/spaces/core/Space.ts @@ -11,7 +11,6 @@ import { getRegion, } from '../utils'; import type { - SpaceConfig, BroadcastCreated, SpeakerRequest, OccupancyUpdate, @@ -22,6 +21,15 @@ import type { SpeakerInfo, } from '../types'; import { Scraper } from '../../scraper'; +import { Logger } from '../logger'; + +export interface SpaceConfig { + mode: 'BROADCAST' | 'LISTEN' | 'INTERACTIVE'; + title?: string; + description?: string; + languages?: string[]; + debug?: boolean; +} /** * This class orchestrates: @@ -30,6 +38,9 @@ import { Scraper } from '../../scraper'; * 3) Approve speakers, push audio, etc. */ export class Space extends EventEmitter { + private readonly debug: boolean; + private readonly logger: Logger; + private janusClient?: JanusClient; private chatClient?: ChatClient; private authToken?: string; @@ -38,15 +49,20 @@ export class Space extends EventEmitter { private plugins = new Set(); private speakers = new Map(); - constructor(private readonly scraper: Scraper) { + constructor( + private readonly scraper: Scraper, + options?: { debug?: boolean }, + ) { super(); + this.debug = options?.debug ?? false; + this.logger = new Logger(this.debug); } public use(plugin: Plugin, config?: Record) { const registration: PluginRegistration = { plugin, config }; this.plugins.add(registration); - console.log('[Space] Plugin added =>', plugin.constructor.name); + this.logger.debug('[Space] Plugin added =>', plugin.constructor.name); plugin.onAttach?.(this); @@ -64,17 +80,12 @@ export class Space extends EventEmitter { * Main entry point */ async initialize(config: SpaceConfig) { - console.log('[Space] Initializing...'); + this.logger.debug('[Space] Initializing...'); - // 1) get Periscope cookie const cookie = await this.scraper.getPeriscopeCookie(); - - // 2) get region const region = await getRegion(); - console.log('[Space] Got region =>', region); - - // 3) create broadcast - console.log('[Space] Creating broadcast...'); + this.logger.debug('[Space] Got region =>', region); + this.logger.debug('[Space] Creating broadcast...'); const broadcast = await createBroadcast({ description: config.description, languages: config.languages, @@ -83,15 +94,12 @@ export class Space extends EventEmitter { }); this.broadcastInfo = broadcast; - // 4) Authorize token if needed - console.log('[Space] Authorizing token...'); + this.logger.debug('[Space] Authorizing token...'); this.authToken = await authorizeToken(cookie); - // 5) Get TURN servers - console.log('[Space] Getting turn servers...'); + this.logger.debug('[Space] Getting turn servers...'); const turnServers = await getTurnServers(cookie); - // 6) Create Janus client this.janusClient = new JanusClient({ webrtcUrl: broadcast.webrtc_gw_url, roomId: broadcast.room_id, @@ -99,11 +107,12 @@ export class Space extends EventEmitter { userId: broadcast.broadcast.user_id, streamName: broadcast.stream_name, turnServers, + logger: this.logger, }); await this.janusClient.initialize(); this.janusClient.on('audioDataFromSpeaker', (data: AudioDataWithUser) => { - // console.log('[Space] Received PCM from speaker =>', data.userId); + this.logger.debug('[Space] Received PCM from speaker =>', data.userId); this.handleAudioData(data); // You can store or forward to a plugin, run STT, etc. }); @@ -111,21 +120,20 @@ export class Space extends EventEmitter { this.janusClient.on('subscribedSpeaker', ({ userId, feedId }) => { const speaker = this.speakers.get(userId); if (!speaker) { - console.log( - '[Space] subscribedSpeaker => speaker not found for userId=', + this.logger.debug( + '[Space] subscribedSpeaker => no speaker found', userId, ); return; } - speaker.janusParticipantId = feedId; - console.log( + this.logger.debug( `[Space] updated speaker info => userId=${userId}, feedId=${feedId}`, ); }); // 7) Publish the broadcast - console.log('[Space] Publishing broadcast...'); + this.logger.debug('[Space] Publishing broadcast...'); await publishBroadcast({ title: config.title || '', broadcast, @@ -137,18 +145,20 @@ export class Space extends EventEmitter { // 8) If interactive, open chat if (config.mode === 'INTERACTIVE') { - console.log('[Space] Connecting chat...'); - this.chatClient = new ChatClient( - broadcast.room_id, - broadcast.access_token, - broadcast.endpoint, - ); + this.logger.debug('[Space] Connecting chat...'); + this.chatClient = new ChatClient({ + spaceId: broadcast.room_id, + accessToken: broadcast.access_token, + endpoint: broadcast.endpoint, + logger: this.logger, + }); await this.chatClient.connect(); this.setupChatEvents(); } + this.logger.info('[Space] Initialized =>', broadcast.share_url); + this.isInitialized = true; - console.log('[Space] Initialized =>', broadcast.share_url); for (const { plugin, config: pluginConfig } of this.plugins) { if (plugin.init) { @@ -159,7 +169,7 @@ export class Space extends EventEmitter { } } - console.log('[Space] All plugins initialized'); + this.logger.debug('[Space] All plugins initialized'); return broadcast; } @@ -172,17 +182,22 @@ export class Space extends EventEmitter { if (!this.chatClient) return; this.chatClient.on('speakerRequest', (req: SpeakerRequest) => { - console.log('[Space] Speaker request =>', req); + this.logger.info('[Space] Speaker request =>', req); this.emit('speakerRequest', req); }); + this.chatClient.on('occupancyUpdate', (update: OccupancyUpdate) => { + this.logger.debug('[Space] occupancyUpdate =>', update); this.emit('occupancyUpdate', update); }); + this.chatClient.on('muteStateChanged', (evt) => { + this.logger.debug('[Space] muteStateChanged =>', evt); this.emit('muteStateChanged', evt); }); + this.chatClient.on('guestReaction', (reaction: GuestReaction) => { - console.log('[Space] Guest reaction =>', reaction); + this.logger.info('[Space] Guest reaction =>', reaction); this.emit('guestReaction', reaction); }); } @@ -199,10 +214,7 @@ export class Space extends EventEmitter { throw new Error('[Space] No auth token available'); } - this.speakers.set(userId, { - userId, - sessionUUID, - }); + this.speakers.set(userId, { userId, sessionUUID }); // 1) Call the "request/approve" endpoint await this.callApproveEndpoint( @@ -223,13 +235,11 @@ export class Space extends EventEmitter { sessionUUID: string, ): Promise { const endpoint = 'https://guest.pscp.tv/api/v1/audiospace/request/approve'; - const headers = { 'Content-Type': 'application/json', Referer: 'https://x.com/', Authorization: authorizationToken, }; - const body = { ntpForBroadcasterFrame: '2208988800024000300', ntpForLiveFrame: '2208988800024000300', @@ -237,7 +247,8 @@ export class Space extends EventEmitter { session_uuid: sessionUUID, }; - console.log('[Space] Approving speaker =>', endpoint, body); + this.logger.debug('[Space] Approving speaker =>', endpoint, body); + const resp = await fetch(endpoint, { method: 'POST', headers, @@ -251,7 +262,7 @@ export class Space extends EventEmitter { ); } - console.log('[Space] Speaker approved =>', userId); + this.logger.info('[Space] Speaker approved =>', userId); } /** @@ -276,9 +287,14 @@ export class Space extends EventEmitter { ); } - const sessionUUID = speaker.sessionUUID; - const janusParticipantId = speaker.janusParticipantId; - console.log(sessionUUID, janusParticipantId, speaker); + const { sessionUUID, janusParticipantId } = speaker; + this.logger.debug( + '[Space] removeSpeaker =>', + sessionUUID, + janusParticipantId, + speaker, + ); + if (!sessionUUID || janusParticipantId === undefined) { throw new Error( `[Space] removeSpeaker => missing sessionUUID or feedId for userId=${userId}`, @@ -290,11 +306,11 @@ export class Space extends EventEmitter { if (!janusHandleId || !janusSessionId) { throw new Error( - `[Space] removeSpeaker => missing Janus handle or sessionId for userId=${userId}`, + `[Space] removeSpeaker => missing Janus handle/session for userId=${userId}`, ); } - // 1) Call the Twitter eject endpoint + // 1) Call the eject endpoint await this.callRemoveEndpoint( this.broadcastInfo, this.authToken, @@ -308,7 +324,7 @@ export class Space extends EventEmitter { // 2) Remove from local speakers map this.speakers.delete(userId); - console.log(`[Space] removeSpeaker => removed userId=${userId}`); + this.logger.info(`[Space] removeSpeaker => removed userId=${userId}`); } /** @@ -324,13 +340,11 @@ export class Space extends EventEmitter { webrtcSessionId: number, ): Promise { const endpoint = 'https://guest.pscp.tv/api/v1/audiospace/stream/eject'; - const headers = { 'Content-Type': 'application/json', Referer: 'https://x.com/', Authorization: authorizationToken, }; - const body = { ntpForBroadcasterFrame: '2208988800024000300', ntpForLiveFrame: '2208988800024000300', @@ -342,7 +356,8 @@ export class Space extends EventEmitter { webrtc_session_id: webrtcSessionId, }; - console.log('[Space] Removing speaker =>', endpoint, body); + this.logger.debug('[Space] Removing speaker =>', endpoint, body); + const resp = await fetch(endpoint, { method: 'POST', headers, @@ -356,7 +371,7 @@ export class Space extends EventEmitter { ); } - console.log('[Space] Speaker removed => sessionUUID=', sessionUUID); + this.logger.debug('[Space] Speaker removed => sessionUUID=', sessionUUID); } pushAudio(samples: Int16Array, sampleRate: number) { @@ -378,14 +393,14 @@ export class Space extends EventEmitter { * Gracefully end the Space (stop broadcast, destroy Janus room, etc.) */ public async finalizeSpace(): Promise { - console.log('[Space] finalizeSpace => stopping broadcast gracefully'); + this.logger.info('[Space] finalizeSpace => stopping broadcast gracefully'); const tasks: Array> = []; if (this.janusClient) { tasks.push( this.janusClient.destroyRoom().catch((err) => { - console.error('[Space] destroyRoom error =>', err); + this.logger.error('[Space] destroyRoom error =>', err); }), ); } @@ -396,7 +411,7 @@ export class Space extends EventEmitter { broadcastId: this.broadcastInfo.room_id, chatToken: this.broadcastInfo.access_token, }).catch((err) => { - console.error('[Space] endAudiospace error =>', err); + this.logger.error('[Space] endAudiospace error =>', err); }), ); } @@ -404,13 +419,14 @@ export class Space extends EventEmitter { if (this.janusClient) { tasks.push( this.janusClient.leaveRoom().catch((err) => { - console.error('[Space] leaveRoom error =>', err); + this.logger.error('[Space] leaveRoom error =>', err); }), ); } await Promise.all(tasks); - console.log('[Space] finalizeSpace => done.'); + + this.logger.info('[Space] finalizeSpace => done.'); } /** @@ -426,13 +442,13 @@ export class Space extends EventEmitter { Referer: 'https://x.com/', Authorization: this.authToken || '', }; - const body = { broadcast_id: params.broadcastId, chat_token: params.chatToken, }; - console.log('[Space] endAudiospace =>', body); + this.logger.debug('[Space] endAudiospace =>', body); + const resp = await fetch(url, { method: 'POST', headers, @@ -443,19 +459,20 @@ export class Space extends EventEmitter { const errText = await resp.text(); throw new Error(`[Space] endAudiospace => ${resp.status} ${errText}`); } + const json = await resp.json(); - console.log('[Space] endAudiospace => success =>', json); + this.logger.debug('[Space] endAudiospace => success =>', json); } public getSpeakers(): SpeakerInfo[] { return Array.from(this.speakers.values()); } - async stop() { - console.log('[Space] Stopping...'); + public async stop() { + this.logger.info('[Space] Stopping...'); await this.finalizeSpace().catch((err) => { - console.error('[Space] finalizeBroadcast error =>', err); + this.logger.error('[Space] finalizeBroadcast error =>', err); }); if (this.chatClient) { diff --git a/src/spaces/logger.ts b/src/spaces/logger.ts new file mode 100644 index 0000000..fc37b5f --- /dev/null +++ b/src/spaces/logger.ts @@ -0,0 +1,31 @@ +// src/logger.ts + +export class Logger { + private readonly debugEnabled: boolean; + + constructor(debugEnabled: boolean) { + this.debugEnabled = debugEnabled; + } + + info(msg: string, ...args: any[]) { + console.log(msg, ...args); + } + + debug(msg: string, ...args: any[]) { + if (this.debugEnabled) { + console.log(msg, ...args); + } + } + + warn(msg: string, ...args: any[]) { + console.warn('[WARN]', msg, ...args); + } + + error(msg: string, ...args: any[]) { + console.error(msg, ...args); + } + + isDebugEnabled(): boolean { + return this.debugEnabled; + } +} diff --git a/src/spaces/plugins/HlsRecordPlugin.ts b/src/spaces/plugins/HlsRecordPlugin.ts new file mode 100644 index 0000000..9f88eac --- /dev/null +++ b/src/spaces/plugins/HlsRecordPlugin.ts @@ -0,0 +1,212 @@ +// src/plugins/HlsRecordPlugin.ts + +import { spawn, ChildProcessWithoutNullStreams } from 'child_process'; +import { Plugin, OccupancyUpdate } from '../types'; +import { Space } from '../core/Space'; +import { Logger } from '../logger'; + +/** + * Plugin that records the final Twitter Spaces HLS mix to a local file. + * It waits for at least one listener to join (occupancy > 0), + * then repeatedly attempts to get the HLS URL from Twitter + * until it is available (HTTP 200), and finally spawns ffmpeg. + */ +export class HlsRecordPlugin implements Plugin { + private logger?: Logger; + private recordingProcess?: ChildProcessWithoutNullStreams; + private isRecording = false; + private outputPath?: string; + private mediaKey?: string; + private space?: Space; + + constructor(outputPath?: string) { + this.outputPath = outputPath; + } + + /** + * Called once the Space has fully initialized (broadcastInfo is ready). + * We store references and subscribe to "occupancyUpdate". + */ + async init(params: { space: Space; pluginConfig?: Record }) { + const spaceLogger = (params.space as any).logger as Logger | undefined; + if (spaceLogger) { + this.logger = spaceLogger; + } + + if (params.pluginConfig?.outputPath) { + this.outputPath = params.pluginConfig.outputPath; + } + + this.space = params.space; + + const broadcastInfo = (params.space as any).broadcastInfo; + if (!broadcastInfo || !broadcastInfo.broadcast?.media_key) { + this.logger?.warn( + '[HlsRecordPlugin] No media_key found in broadcastInfo', + ); + return; + } + this.mediaKey = broadcastInfo.broadcast.media_key; + + const roomId = broadcastInfo.room_id || 'unknown_room'; + if (!this.outputPath) { + this.outputPath = `/tmp/record_${roomId}.ts`; + } + + // Subscribe to occupancyUpdate + this.space.on('occupancyUpdate', (update: OccupancyUpdate) => { + this.handleOccupancyUpdate(update).catch((err) => { + this.logger?.error( + '[HlsRecordPlugin] handleOccupancyUpdate error =>', + err, + ); + }); + }); + } + + /** + * Called each time occupancyUpdate is emitted. + * If occupancy > 0 and we're not recording yet, we attempt to fetch the HLS URL. + * If the URL is valid (HTTP 200), we launch ffmpeg. + */ + private async handleOccupancyUpdate(update: OccupancyUpdate) { + if (!this.space || !this.mediaKey) return; + if (this.isRecording) return; + + // We only care if occupancy > 0 (at least one listener). + if (update.occupancy <= 0) { + this.logger?.debug('[HlsRecordPlugin] occupancy=0 => ignoring'); + return; + } + + const scraper = (this.space as any).scraper; + if (!scraper) { + this.logger?.warn('[HlsRecordPlugin] No scraper found on space'); + return; + } + + this.logger?.debug( + `[HlsRecordPlugin] occupancy=${update.occupancy} => trying to fetch HLS URL...`, + ); + + try { + const status = await scraper.getAudioSpaceStreamStatus(this.mediaKey); + if (!status?.source?.location) { + this.logger?.debug( + '[HlsRecordPlugin] occupancy>0 but no HLS URL => wait next update', + ); + return; + } + + const hlsUrl = status.source.location; + const isReady = await this.waitForHlsReady(hlsUrl, 1); + if (!isReady) { + this.logger?.debug( + '[HlsRecordPlugin] HLS URL 404 => waiting next occupancy update...', + ); + return; + } + + await this.startRecording(hlsUrl); + } catch (err) { + this.logger?.error('[HlsRecordPlugin] handleOccupancyUpdate =>', err); + } + } + + /** + * Spawns ffmpeg to record the HLS stream at the given URL. + */ + private async startRecording(hlsUrl: string): Promise { + if (this.isRecording) { + this.logger?.debug('[HlsRecordPlugin] Already recording'); + return; + } + this.isRecording = true; + + if (!this.outputPath) { + this.logger?.warn( + '[HlsRecordPlugin] No output path set, using /tmp/space_record.ts', + ); + this.outputPath = '/tmp/space_record.ts'; + } + + this.logger?.info('[HlsRecordPlugin] Starting HLS recording =>', hlsUrl); + + this.recordingProcess = spawn('ffmpeg', [ + '-y', + '-i', + hlsUrl, + '-c', + 'copy', + this.outputPath, + ]); + + this.recordingProcess.stderr.on('data', (chunk) => { + const msg = chunk.toString(); + if (msg.toLowerCase().includes('error')) { + this.logger?.error('[HlsRecordPlugin][ffmpeg error] =>', msg.trim()); + } else { + this.logger?.debug('[HlsRecordPlugin][ffmpeg]', msg.trim()); + } + }); + + this.recordingProcess.on('close', (code) => { + this.isRecording = false; + this.logger?.info( + '[HlsRecordPlugin] Recording process closed => code=', + code, + ); + }); + + this.recordingProcess.on('error', (err) => { + this.logger?.error('[HlsRecordPlugin] Recording process failed =>', err); + }); + } + + /** + * HEAD request to see if the HLS URL is returning 200 OK. + * maxRetries=1 means we'll just try once here, and rely on occupancyUpdate re-calls for further tries. + */ + private async waitForHlsReady( + hlsUrl: string, + maxRetries: number, + ): Promise { + let attempt = 0; + while (attempt < maxRetries) { + try { + const resp = await fetch(hlsUrl, { method: 'HEAD' }); + if (resp.ok) { + this.logger?.debug( + `[HlsRecordPlugin] HLS is ready (attempt #${attempt + 1})`, + ); + return true; + } else { + this.logger?.debug( + `[HlsRecordPlugin] HLS status=${resp.status}, retrying...`, + ); + } + } catch (error) { + this.logger?.debug( + '[HlsRecordPlugin] HLS fetch error:', + (error as Error).message, + ); + } + + attempt++; + await new Promise((r) => setTimeout(r, 2000)); + } + return false; + } + + /** + * Called when the plugin is cleaned up (e.g. space.stop()). + */ + cleanup(): void { + if (this.isRecording && this.recordingProcess) { + this.logger?.info('[HlsRecordPlugin] Stopping HLS recording...'); + this.recordingProcess.kill(); + this.recordingProcess = undefined; + this.isRecording = false; + } + } +} diff --git a/src/spaces/plugins/SttTtsPlugin.ts b/src/spaces/plugins/SttTtsPlugin.ts index 63866ec..6acab40 100644 --- a/src/spaces/plugins/SttTtsPlugin.ts +++ b/src/spaces/plugins/SttTtsPlugin.ts @@ -3,7 +3,7 @@ import fs from 'fs'; import path from 'path'; import { spawn } from 'child_process'; -import { Plugin, AudioDataWithUser } from '../types'; +import { AudioDataWithUser, Plugin } from '../types'; import { Space } from '../core/Space'; import { JanusClient } from '../core/JanusClient'; @@ -32,7 +32,6 @@ export class SttTtsPlugin implements Plugin { private space?: Space; private janus?: JanusClient; - // OpenAI + ElevenLabs private openAiApiKey?: string; private elevenLabsApiKey?: string; @@ -40,7 +39,6 @@ export class SttTtsPlugin implements Plugin { private gptModel = 'gpt-3.5-turbo'; private voiceId = '21m00Tcm4TlvDq8ikWAM'; private elevenLabsModel = 'eleven_monolingual_v1'; - private systemPrompt = 'You are a helpful AI assistant.'; private chatContext: Array<{ role: 'system' | 'user' | 'assistant'; @@ -60,7 +58,11 @@ export class SttTtsPlugin implements Plugin { /** * For ignoring near-silence frames (if amplitude < threshold) */ - private silenceThreshold = 50; // default amplitude threshold + private silenceThreshold = 50; + + // TTS queue for sequentially speaking + private ttsQueue: string[] = []; + private isSpeaking = false; onAttach(space: Space) { console.log('[SttTtsPlugin] onAttach => space was attached'); @@ -86,29 +88,26 @@ export class SttTtsPlugin implements Plugin { this.voiceId = config.voiceId; } if (config?.elevenLabsModel) { - this.voiceId = config.elevenLabsModel; + this.elevenLabsModel = config.elevenLabsModel; } - - if (config.systemPrompt) { + if (config?.systemPrompt) { this.systemPrompt = config.systemPrompt; } - if (config.chatContext) { + if (config?.chatContext) { this.chatContext = config.chatContext; } console.log('[SttTtsPlugin] Plugin config =>', config); - // Listen for mute state changes to trigger STT flush + // Listen for mute events this.space.on( 'muteStateChanged', (evt: { userId: string; muted: boolean }) => { console.log('[SttTtsPlugin] Speaker muteStateChanged =>', evt); if (evt.muted) { - // speaker just got muted => flush STT this.handleMute(evt.userId).catch((err) => console.error('[SttTtsPlugin] handleMute error =>', err), ); } else { - // unmuted => start buffering this.speakerUnmuted.set(evt.userId, true); if (!this.pcmBuffers.has(evt.userId)) { this.pcmBuffers.set(evt.userId, []); @@ -122,21 +121,17 @@ export class SttTtsPlugin implements Plugin { * Called whenever we receive PCM from a speaker */ onAudioData(data: AudioDataWithUser): void { - // Skip if speaker is muted or not tracked if (!this.speakerUnmuted.get(data.userId)) return; - // Optional: detect silence let maxVal = 0; for (let i = 0; i < data.samples.length; i++) { const val = Math.abs(data.samples[i]); if (val > maxVal) maxVal = val; } if (maxVal < this.silenceThreshold) { - // It's near-silence => skip return; } - // Add chunk let arr = this.pcmBuffers.get(data.userId); if (!arr) { arr = []; @@ -151,17 +146,16 @@ export class SttTtsPlugin implements Plugin { private async handleMute(userId: string): Promise { this.speakerUnmuted.set(userId, false); const chunks = this.pcmBuffers.get(userId) || []; - this.pcmBuffers.set(userId, []); // reset + this.pcmBuffers.set(userId, []); if (!chunks.length) { console.log('[SttTtsPlugin] No audio chunks for user =>', userId); return; } console.log( - `[SttTtsPlugin] Flushing STT buffer for user=${userId}, total chunks=${chunks.length}`, + `[SttTtsPlugin] Flushing STT buffer for user=${userId}, chunks=${chunks.length}`, ); - // 1) Merge chunks const totalLen = chunks.reduce((acc, c) => acc + c.length, 0); const merged = new Int16Array(totalLen); let offset = 0; @@ -170,44 +164,60 @@ export class SttTtsPlugin implements Plugin { offset += c.length; } - // 2) Convert PCM -> WAV (48kHz) for STT + // Convert PCM to WAV for STT const wavPath = await this.convertPcmToWav(merged, 48000); console.log('[SttTtsPlugin] WAV ready =>', wavPath); - // 3) STT with OpenAI Whisper + // Whisper STT const sttText = await this.transcribeWithOpenAI(wavPath, this.sttLanguage); fs.unlinkSync(wavPath); + if (!sttText.trim()) { console.log('[SttTtsPlugin] No speech recognized for user =>', userId); return; } console.log(`[SttTtsPlugin] STT => user=${userId}, text="${sttText}"`); - // 4) GPT + // GPT answer const replyText = await this.askChatGPT(sttText); console.log(`[SttTtsPlugin] GPT => user=${userId}, reply="${replyText}"`); - // 5) TTS => returns MP3 - const ttsAudio = await this.elevenLabsTts(replyText); - console.log('[SttTtsPlugin] TTS => got MP3 size=', ttsAudio.length); - - // 6) Convert MP3 -> PCM (48kHz, mono) - const pcm = await this.convertMp3ToPcm(ttsAudio, 48000); - console.log( - '[SttTtsPlugin] TTS => converted to PCM => frames=', - pcm.length, - ); + // Use the standard speak method with queue + await this.speakText(replyText); + } - // 7) Push frames to Janus - if (this.janus) { - await this.streamToJanus(pcm, 48000); - console.log('[SttTtsPlugin] TTS => done streaming to space'); + /** + * Public method to queue a TTS request + */ + public async speakText(text: string): Promise { + this.ttsQueue.push(text); + if (!this.isSpeaking) { + this.isSpeaking = true; + this.processTtsQueue().catch((err) => { + console.error('[SttTtsPlugin] processTtsQueue error =>', err); + }); } } /** - * Convert Int16 PCM -> WAV using ffmpeg + * Process TTS requests one by one */ + private async processTtsQueue(): Promise { + while (this.ttsQueue.length > 0) { + const text = this.ttsQueue.shift(); + if (!text) continue; + + try { + const ttsAudio = await this.elevenLabsTts(text); + const pcm = await this.convertMp3ToPcm(ttsAudio, 48000); + await this.streamToJanus(pcm, 48000); + } catch (err) { + console.error('[SttTtsPlugin] TTS streaming error =>', err); + } + } + this.isSpeaking = false; + } + private convertPcmToWav( samples: Int16Array, sampleRate: number, @@ -275,19 +285,13 @@ export class SttTtsPlugin implements Plugin { body: formData, }, ); - - // Handle errors if (!response.ok) { const errorText = await response.text(); - console.error('[SttTtsPlugin] API Error:', errorText); + console.error('[SttTtsPlugin] OpenAI API Error:', errorText); throw new Error(`OpenAI API error: ${response.status} ${errorText}`); } - - // Parse response const data = (await response.json()) as { text: string }; - const text = data.text?.trim() || ''; - console.log('[SttTtsPlugin] Transcription =>', text); - return text; + return data.text?.trim() || ''; } catch (err) { console.error('[SttTtsPlugin] OpenAI STT Error =>', err); throw new Error('OpenAI STT failed'); @@ -302,8 +306,6 @@ export class SttTtsPlugin implements Plugin { throw new Error('[SttTtsPlugin] No OpenAI API key for ChatGPT'); } const url = 'https://api.openai.com/v1/chat/completions'; - - // Build the final array of messages const messages = [ { role: 'system', content: this.systemPrompt }, ...this.chatContext, @@ -331,11 +333,8 @@ export class SttTtsPlugin implements Plugin { const json = await resp.json(); const reply = json.choices?.[0]?.message?.content || ''; - - // Optionally store the conversation in the chatContext this.chatContext.push({ role: 'user', content: userText }); this.chatContext.push({ role: 'assistant', content: reply }); - return reply.trim(); } @@ -389,11 +388,12 @@ export class SttTtsPlugin implements Plugin { 'pipe:1', ]); let raw = Buffer.alloc(0); + ff.stdout.on('data', (chunk: Buffer) => { raw = Buffer.concat([raw, chunk]); }); ff.stderr.on('data', () => { - /* ignore ffmpeg logs */ + // ignoring ffmpeg logs }); ff.on('close', (code) => { if (code !== 0) { @@ -407,6 +407,7 @@ export class SttTtsPlugin implements Plugin { ); resolve(samples); }); + ff.stdin.write(mp3Buf); ff.stdin.end(); }); @@ -420,40 +421,23 @@ export class SttTtsPlugin implements Plugin { samples: Int16Array, sampleRate: number, ): Promise { - // 10 ms => 480 samples @48k - const FRAME_SIZE = 480; + // TODO: Check if better than 480 fixed + const FRAME_SIZE = Math.floor(sampleRate * 0.01); // 10ms frames => 480 @48kHz for ( let offset = 0; offset + FRAME_SIZE <= samples.length; offset += FRAME_SIZE ) { - // Option 1: subarray + .set const frame = new Int16Array(FRAME_SIZE); frame.set(samples.subarray(offset, offset + FRAME_SIZE)); - this.janus?.pushLocalAudio(frame, sampleRate, 1); - await new Promise((r) => setTimeout(r, 10)); - } - } - - public async speakText(text: string): Promise { - // 1) TTS => MP3 - const ttsAudio = await this.elevenLabsTts(text); - // 2) Convert MP3 -> PCM - const pcm = await this.convertMp3ToPcm(ttsAudio, 48000); - - // 3) Stream to Janus - if (this.janus) { - await this.streamToJanus(pcm, 48000); - console.log('[SttTtsPlugin] speakText => done streaming to space'); + // Short pause so we don't overload + await new Promise((r) => setTimeout(r, 10)); } } - /** - * Change the system prompt at runtime. - */ public setSystemPrompt(prompt: string) { this.systemPrompt = prompt; console.log('[SttTtsPlugin] setSystemPrompt =>', prompt); @@ -490,5 +474,7 @@ export class SttTtsPlugin implements Plugin { console.log('[SttTtsPlugin] cleanup => releasing resources'); this.pcmBuffers.clear(); this.speakerUnmuted.clear(); + this.ttsQueue = []; + this.isSpeaking = false; } } diff --git a/src/spaces/test.ts b/src/spaces/test.ts index 811dd00..06aeb9c 100644 --- a/src/spaces/test.ts +++ b/src/spaces/test.ts @@ -1,12 +1,12 @@ // src/test.ts import 'dotenv/config'; -import { Space } from './core/Space'; +import { Space, SpaceConfig } from './core/Space'; import { Scraper } from '../scraper'; -import { SpaceConfig } from './types'; import { RecordToDiskPlugin } from './plugins/RecordToDiskPlugin'; import { SttTtsPlugin } from './plugins/SttTtsPlugin'; import { IdleMonitorPlugin } from './plugins/IdleMonitorPlugin'; +import { HlsRecordPlugin } from './plugins/HlsRecordPlugin'; /** * Main test entry point @@ -22,12 +22,24 @@ async function main() { ); // 2) Create the Space instance - const space = new Space(scraper); + // Set debug=true if you want more logs + const space = new Space(scraper, { debug: false }); - // Add a plugin to record audio + // -------------------------------------------------------------------------------- + // EXAMPLE 1: Record raw speaker audio via RecordToDiskPlugin (local plugin approach) + // -------------------------------------------------------------------------------- const recordPlugin = new RecordToDiskPlugin(); space.use(recordPlugin); + // -------------------------------------------------------------------------------- + // EXAMPLE 2: HLSRecordPlugin => record final Space mix as .ts file via HLS + // (Requires the "scraper" to fetch the HLS URL, and ffmpeg installed.) + // -------------------------------------------------------------------------------- + const hlsPlugin = new HlsRecordPlugin(); + // If you want, you can override the default output path in pluginConfig, for example: + // space.use(hlsPlugin, { outputPath: '/tmp/my_custom_space.ts' }); + space.use(hlsPlugin); + // Create our TTS/STT plugin instance const sttTtsPlugin = new SttTtsPlugin(); space.use(sttTtsPlugin, { @@ -113,15 +125,15 @@ async function main() { // Remove the speaker after 10 seconds (testing only) setTimeout(() => { console.log( - `[Test] Removing speaker => userId=${req.userId} (after 10s)`, + `[Test] Removing speaker => userId=${req.userId} (after 60s)`, ); space.removeSpeaker(req.userId).catch((err) => { console.error('[Test] removeSpeaker error =>', err); }); - }, 10_000); + }, 60_000); }); - // When a user react, reply with some reactions to test the flow + // When a user reacts, send back an emoji to test the flow space.on('guestReaction', (evt) => { // Pick a random emoji from the list const emojis = ['💯', '✨', '🙏', '🎮']; @@ -156,25 +168,19 @@ async function main() { async function sendBeep() { console.log('[Test] Starting beep...'); for (let offset = 0; offset < beepFull.length; offset += FRAME_SIZE) { - // subarray => simple "view" const portion = beepFull.subarray(offset, offset + FRAME_SIZE); - - // Make a real copy const frame = new Int16Array(FRAME_SIZE); frame.set(portion); - - // Now frame.length = 160, and frame.byteLength = 320 space.pushAudio(frame, sampleRate); - await new Promise((r) => setTimeout(r, 10)); } console.log('[Test] Finished beep'); } - // 5) Send beep every 5s - //setInterval(() => { - // sendBeep().catch((err) => console.error('[Test] beep error =>', err)); - //}, 5000); + // Example: Send beep every 5s (currently commented out) + // setInterval(() => { + // sendBeep().catch((err) => console.error('[Test] beep error =>', err)); + // }, 5000); console.log('[Test] Space is running... press Ctrl+C to exit.'); diff --git a/src/spaces/types.ts b/src/spaces/types.ts index b559120..aaaabbc 100644 --- a/src/spaces/types.ts +++ b/src/spaces/types.ts @@ -31,13 +31,6 @@ export interface GuestReaction { emoji: string; } -export interface SpaceConfig { - mode: 'BROADCAST' | 'LISTEN' | 'INTERACTIVE'; - title?: string; - description?: string; - languages?: string[]; -} - export interface BroadcastCreated { room_id: string; credential: string;