Skip to content

Commit

Permalink
Merge pull request #41 from slkzgm/feature/joinSpaces
Browse files Browse the repository at this point in the history
feat: Joining existings Spaces
  • Loading branch information
wtfsayo authored Jan 9, 2025
2 parents ffcfed4 + deeb29c commit a9b4ec3
Show file tree
Hide file tree
Showing 16 changed files with 2,160 additions and 452 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"license": "MIT",
"scripts": {
"build": "rimraf dist && rollup -c",
"prepare": "npm run build",
"docs:generate": "typedoc --options typedoc.json",
"docs:deploy": "npm run docs:generate && gh-pages -d docs",
"format": "prettier --write src/**/*.ts",
Expand Down
2 changes: 2 additions & 0 deletions src/_module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ export type { QueryProfilesResponse, QueryTweetsResponse } from './timeline-v1';
export type { Tweet } from './tweets';

export { Space } from './spaces/core/Space'
export { SpaceParticipant } from './spaces/core/SpaceParticipant'
export { Logger } from './spaces/logger'
export { SttTtsPlugin } from './spaces/plugins/SttTtsPlugin'
export { RecordToDiskPlugin } from './spaces/plugins/RecordToDiskPlugin'
export { MonitorAudioPlugin } from './spaces/plugins/MonitorAudioPlugin'
Expand Down
90 changes: 79 additions & 11 deletions src/spaces/core/ChatClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,40 @@ import { EventEmitter } from 'events';
import type { SpeakerRequest, OccupancyUpdate } from '../types';
import { Logger } from '../logger';

/**
* Configuration object for ChatClient.
*/
interface ChatClientConfig {
/**
* The space ID (e.g., "1vOGwAbcdE...") for this audio space.
*/
spaceId: string;

/**
* The access token obtained from accessChat or the live_video_stream/status.
*/
accessToken: string;

/**
* The endpoint host for the chat server (e.g., "https://prod-chatman-ancillary-eu-central-1.pscp.tv").
*/
endpoint: string;

/**
* An instance of Logger for debug/info logs.
*/
logger: Logger;
}

/**
* ChatClient handles the WebSocket connection to the Twitter/Periscope chat API.
* It emits events such as "speakerRequest", "occupancyUpdate", "muteStateChanged", etc.
*/
export class ChatClient extends EventEmitter {
private ws?: WebSocket;
private connected = false;
private logger: Logger;

private readonly logger: Logger;
private readonly spaceId: string;
private readonly accessToken: string;
private endpoint: string;
Expand All @@ -28,7 +51,10 @@ export class ChatClient extends EventEmitter {
this.logger = config.logger;
}

async connect() {
/**
* Establishes a WebSocket connection to the chat endpoint and sets up event handlers.
*/
public async connect(): Promise<void> {
const wsUrl = `${this.endpoint}/chatapi/v1/chatnow`.replace(
'https://',
'wss://',
Expand All @@ -45,9 +71,12 @@ export class ChatClient extends EventEmitter {
await this.setupHandlers();
}

/**
* Internal method to set up WebSocket event listeners (open, message, close, error).
*/
private setupHandlers(): Promise<void> {
if (!this.ws) {
throw new Error('No WebSocket instance');
throw new Error('[ChatClient] No WebSocket instance available');
}

return new Promise((resolve, reject) => {
Expand Down Expand Up @@ -75,16 +104,21 @@ export class ChatClient extends EventEmitter {
});
}

private sendAuthAndJoin() {
/**
* Sends two WebSocket messages to authenticate and join the specified space.
*/
private sendAuthAndJoin(): void {
if (!this.ws) return;

// 1) Send authentication (access token)
this.ws.send(
JSON.stringify({
payload: JSON.stringify({ access_token: this.accessToken }),
kind: 3,
}),
);

// 2) Send a "join" message specifying the room (space ID)
this.ws.send(
JSON.stringify({
payload: JSON.stringify({
Expand All @@ -96,8 +130,18 @@ export class ChatClient extends EventEmitter {
);
}

reactWithEmoji(emoji: string) {
if (!this.ws) return;
/**
* Sends an emoji reaction to the chat server.
* @param emoji - The emoji string, e.g. '🔥', '🙏', etc.
*/
public reactWithEmoji(emoji: string): void {
if (!this.ws || !this.connected) {
this.logger.warn(
'[ChatClient] Not connected or WebSocket missing; ignoring reactWithEmoji.',
);
return;
}

const payload = JSON.stringify({
body: JSON.stringify({ body: emoji, type: 2, v: 2 }),
kind: 1,
Expand All @@ -117,15 +161,20 @@ export class ChatClient extends EventEmitter {
}),
type: 2,
});

this.ws.send(payload);
}

private handleMessage(raw: string) {
/**
* Handles inbound WebSocket messages, parsing JSON payloads
* and emitting relevant events (speakerRequest, occupancyUpdate, etc.).
*/
private handleMessage(raw: string): void {
let msg: any;
try {
msg = JSON.parse(raw);
} catch {
return;
return; // Invalid JSON, ignoring
}
if (!msg.payload) return;

Expand All @@ -134,6 +183,7 @@ export class ChatClient extends EventEmitter {

const body = safeJson(payload.body);

// 1) Speaker request => "guestBroadcastingEvent=1"
if (body.guestBroadcastingEvent === 1) {
const req: SpeakerRequest = {
userId: body.guestRemoteID,
Expand All @@ -144,6 +194,7 @@ export class ChatClient extends EventEmitter {
this.emit('speakerRequest', req);
}

// 2) Occupancy update => body.occupancy
if (typeof body.occupancy === 'number') {
const update: OccupancyUpdate = {
occupancy: body.occupancy,
Expand All @@ -152,6 +203,7 @@ export class ChatClient extends EventEmitter {
this.emit('occupancyUpdate', update);
}

// 3) Mute/unmute => "guestBroadcastingEvent=16" (mute) or "17" (unmute)
if (body.guestBroadcastingEvent === 16) {
this.emit('muteStateChanged', {
userId: body.guestRemoteID,
Expand All @@ -164,17 +216,30 @@ export class ChatClient extends EventEmitter {
muted: false,
});
}
// Example of guest reaction

// 4) "guestBroadcastingEvent=12" => host accepted a speaker
if (body.guestBroadcastingEvent === 12) {
this.emit('newSpeakerAccepted', {
userId: body.guestRemoteID,
username: body.guestUsername,
sessionUUID: body.sessionUUID,
});
}

// 5) Reaction => body.type=2
if (body?.type === 2) {
this.logger.info('[ChatClient] Emitting guest reaction event =>', body);
this.logger.info('[ChatClient] Emitting guestReaction =>', body);
this.emit('guestReaction', {
displayName: body.displayName,
emoji: body.body,
});
}
}

async disconnect() {
/**
* Closes the WebSocket connection if open, and resets internal state.
*/
public async disconnect(): Promise<void> {
if (this.ws) {
this.logger.info('[ChatClient] Disconnecting...');
this.ws.close();
Expand All @@ -184,6 +249,9 @@ export class ChatClient extends EventEmitter {
}
}

/**
* Helper function to safely parse JSON without throwing.
*/
function safeJson(text: string): any {
try {
return JSON.parse(text);
Expand Down
63 changes: 56 additions & 7 deletions src/spaces/core/JanusAudio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,30 @@ const { nonstandard } = wrtc;
const { RTCAudioSource, RTCAudioSink } = nonstandard;
import { Logger } from '../logger';

/**
* Configuration options for the JanusAudioSource.
*/
interface AudioSourceOptions {
/**
* Optional logger instance for debug/info/warn logs.
*/
logger?: Logger;
}

/**
* Configuration options for the JanusAudioSink.
*/
interface AudioSinkOptions {
/**
* Optional logger instance for debug/info/warn logs.
*/
logger?: Logger;
}

/**
* JanusAudioSource wraps a RTCAudioSource, allowing you to push
* raw PCM frames (Int16Array) into the WebRTC pipeline.
*/
export class JanusAudioSource extends EventEmitter {
private source: any;
private readonly track: MediaStreamTrack;
Expand All @@ -26,16 +42,31 @@ export class JanusAudioSource extends EventEmitter {
this.track = this.source.createTrack();
}

getTrack() {
/**
* Returns the MediaStreamTrack associated with this audio source.
*/
public getTrack(): MediaStreamTrack {
return this.track;
}

pushPcmData(samples: Int16Array, sampleRate: number, channels = 1) {
/**
* Pushes PCM data into the RTCAudioSource. Typically 16-bit, single- or multi-channel frames.
* @param samples - The Int16Array audio samples.
* @param sampleRate - The sampling rate (e.g., 48000).
* @param channels - Number of channels (e.g., 1 for mono).
*/
public pushPcmData(
samples: Int16Array,
sampleRate: number,
channels = 1,
): void {
if (this.logger?.isDebugEnabled()) {
this.logger?.debug(
`[JanusAudioSource] pushPcmData => sampleRate=${sampleRate}, channels=${channels}`,
`[JanusAudioSource] pushPcmData => sampleRate=${sampleRate}, channels=${channels}, frames=${samples.length}`,
);
}

// Feed data into the RTCAudioSource
this.source.onData({
samples,
sampleRate,
Expand All @@ -46,6 +77,10 @@ export class JanusAudioSource extends EventEmitter {
}
}

/**
* JanusAudioSink wraps a RTCAudioSink, providing an event emitter
* that forwards raw PCM frames (Int16Array) to listeners.
*/
export class JanusAudioSink extends EventEmitter {
private sink: any;
private active = true;
Expand All @@ -54,31 +89,45 @@ export class JanusAudioSink extends EventEmitter {
constructor(track: MediaStreamTrack, options?: AudioSinkOptions) {
super();
this.logger = options?.logger;

if (track.kind !== 'audio') {
throw new Error('JanusAudioSink must be an audio track');
throw new Error('[JanusAudioSink] Provided track is not an audio track');
}

// Create RTCAudioSink to listen for PCM frames
this.sink = new RTCAudioSink(track);

// Register callback for PCM frames
this.sink.ondata = (frame: {
samples: Int16Array;
sampleRate: number;
bitsPerSample: number;
channelCount: number;
}) => {
if (!this.active) return;

if (this.logger?.isDebugEnabled()) {
this.logger?.debug(
`[JanusAudioSink] ondata => sampleRate=${frame.sampleRate}, bitsPerSample=${frame.bitsPerSample}, channelCount=${frame.channelCount}`,
`[JanusAudioSink] ondata => ` +
`sampleRate=${frame.sampleRate}, ` +
`bitsPerSample=${frame.bitsPerSample}, ` +
`channelCount=${frame.channelCount}, ` +
`frames=${frame.samples.length}`,
);
}

// Emit 'audioData' event with the raw PCM frame
this.emit('audioData', frame);
};
}

stop() {
/**
* Stops receiving audio data. Once called, no further 'audioData' events will be emitted.
*/
public stop(): void {
this.active = false;
if (this.logger?.isDebugEnabled()) {
this.logger?.debug('[JanusAudioSink] stop');
this.logger?.debug('[JanusAudioSink] stop called => stopping the sink');
}
this.sink?.stop();
}
Expand Down
Loading

0 comments on commit a9b4ec3

Please sign in to comment.