From 83c8cee64ee9712f00ea1e72d381720ebb8dc06d Mon Sep 17 00:00:00 2001 From: slkzgm Date: Sat, 28 Dec 2024 04:03:25 +0100 Subject: [PATCH] feat: add full Spaces support (creation, Janus integration, STT/TTS pipeline) --- package-lock.json | 143 ++++- package.json | 5 +- src/scraper.ts | 100 ++++ src/spaces.ts | 450 +++++++++++++++ src/spaces/core/ChatClient.ts | 149 +++++ src/spaces/core/JanusAudioSource.ts | 58 ++ src/spaces/core/JanusClient.ts | 679 +++++++++++++++++++++++ src/spaces/core/Space.ts | 334 +++++++++++ src/spaces/plugins/MonitorAudioPlugin.ts | 67 +++ src/spaces/plugins/RecordToDiskPlugin.ts | 16 + src/spaces/plugins/SttTtsPlugin.ts | 381 +++++++++++++ src/spaces/test.ts | 130 +++++ src/spaces/types.ts | 79 +++ src/spaces/utils.ts | 149 +++++ src/types/spaces.ts | 263 +++++++++ 15 files changed, 2999 insertions(+), 4 deletions(-) create mode 100644 src/spaces.ts create mode 100644 src/spaces/core/ChatClient.ts create mode 100644 src/spaces/core/JanusAudioSource.ts create mode 100644 src/spaces/core/JanusClient.ts create mode 100644 src/spaces/core/Space.ts create mode 100644 src/spaces/plugins/MonitorAudioPlugin.ts create mode 100644 src/spaces/plugins/RecordToDiskPlugin.ts create mode 100644 src/spaces/plugins/SttTtsPlugin.ts create mode 100644 src/spaces/test.ts create mode 100644 src/spaces/types.ts create mode 100644 src/spaces/utils.ts create mode 100644 src/types/spaces.ts diff --git a/package-lock.json b/package-lock.json index 25c04e1..0bf3212 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,14 +1,15 @@ { "name": "agent-twitter-client", - "version": "0.0.16", + "version": "0.0.17", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "agent-twitter-client", - "version": "0.0.16", + "version": "0.0.17", "license": "MIT", "dependencies": { + "@roamhq/wrtc": "^0.8.0", "@sinclair/typebox": "^0.32.20", "headers-polyfill": "^3.1.2", "json-stable-stringify": "^1.0.2", @@ -18,7 +19,8 @@ "tough-cookie": "^4.1.2", "tslib": "^2.5.2", "twitter-api-v2": "^1.18.2", - "undici": "^7.1.1" + "undici": "^7.1.1", + "ws": "^8.18.0" }, "devDependencies": { "@commitlint/cli": "^17.6.3", @@ -29,6 +31,7 @@ "@types/node": "^22.9.1", "@types/set-cookie-parser": "^2.4.2", "@types/tough-cookie": "^4.0.2", + "@types/ws": "^8.5.13", "@typescript-eslint/eslint-plugin": "^5.59.7", "@typescript-eslint/parser": "^5.59.7", "dotenv": "^16.4.5", @@ -1970,6 +1973,85 @@ "node": ">=14" } }, + "node_modules/@roamhq/wrtc": { + "version": "0.8.0", + "resolved": "https://registry.npmjs.org/@roamhq/wrtc/-/wrtc-0.8.0.tgz", + "integrity": "sha512-C0V/nqc4/2xzORI5qa4mIeN/8UO3ywN1kInrJ9u6GljFx0D18JMUJEqe8yYHa61RrEeoWN3PKdW++k8TocSx/A==", + "license": "BSD-2-Clause", + "optionalDependencies": { + "@roamhq/wrtc-darwin-arm64": "0.8.0", + "@roamhq/wrtc-darwin-x64": "0.8.0", + "@roamhq/wrtc-linux-arm64": "0.8.1", + "@roamhq/wrtc-linux-x64": "0.8.1", + "@roamhq/wrtc-win32-x64": "0.8.0", + "domexception": "^4.0.0" + } + }, + "node_modules/@roamhq/wrtc-darwin-arm64": { + "version": "0.8.0", + "resolved": "https://registry.npmjs.org/@roamhq/wrtc-darwin-arm64/-/wrtc-darwin-arm64-0.8.0.tgz", + "integrity": "sha512-OtV2KWO7zOG3L8TF3KCt9aucynVCD/ww2xeXXgg+FLkya3ca0uzehN8EQJ3BL4tkInksbFJ2ssyu9cehfJ3ZuA==", + "cpu": [ + "arm64" + ], + "license": "BSD-2-Clause", + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@roamhq/wrtc-darwin-x64": { + "version": "0.8.0", + "resolved": "https://registry.npmjs.org/@roamhq/wrtc-darwin-x64/-/wrtc-darwin-x64-0.8.0.tgz", + "integrity": "sha512-VY7Vzt/SDDDCpW//h8GW9bOZrOr8gWXPZVD9473ypl4jyBIoO57yyLbHzd1G0vBUkS6szsHlQCz1WwpI30YL+g==", + "cpu": [ + "x64" + ], + "license": "BSD-2-Clause", + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@roamhq/wrtc-linux-arm64": { + "version": "0.8.1", + "resolved": "https://registry.npmjs.org/@roamhq/wrtc-linux-arm64/-/wrtc-linux-arm64-0.8.1.tgz", + "integrity": "sha512-FBJLLazlWkGQUXaokC/rTbrUQbb0CNFYry52fZGstufrGLTWu+g4HcwXdVvxh1tnVtVMvkQGk+mlOL52sCxw0A==", + "cpu": [ + "arm64" + ], + "license": "BSD-2-Clause", + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@roamhq/wrtc-linux-x64": { + "version": "0.8.1", + "resolved": "https://registry.npmjs.org/@roamhq/wrtc-linux-x64/-/wrtc-linux-x64-0.8.1.tgz", + "integrity": "sha512-I9oWG7b4uvWO1IOR/aF34n+ID6TKVuSs0jd19h5KdhfRtw7FFh9xxuwN9rONPxLVa6fS0q+MCZgAf8Scz89L8Q==", + "cpu": [ + "x64" + ], + "license": "BSD-2-Clause", + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@roamhq/wrtc-win32-x64": { + "version": "0.8.0", + "resolved": "https://registry.npmjs.org/@roamhq/wrtc-win32-x64/-/wrtc-win32-x64-0.8.0.tgz", + "integrity": "sha512-R2fxl41BLWPiP4eaTHGLzbbVvRjx1mV/OsgINCvawO7Hwz5Zx9I45+Fhrw3hd4n5amIeSG9VIF7Kz8eeTFXTGQ==", + "cpu": [ + "x64" + ], + "license": "BSD-2-Clause", + "optional": true, + "os": [ + "win32" + ] + }, "node_modules/@rollup/pluginutils": { "version": "5.1.3", "resolved": "https://registry.npmjs.org/@rollup/pluginutils/-/pluginutils-5.1.3.tgz", @@ -2495,6 +2577,16 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/ws": { + "version": "8.5.13", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.5.13.tgz", + "integrity": "sha512-osM/gWBTPKgHV8XkTunnegTRIsvF6owmf5w+JtAfOw472dptdm0dlGv4xCt6GwQRcC2XVOvvRE/0bAoQcL2QkA==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/yargs": { "version": "17.0.33", "resolved": "https://registry.npmjs.org/@types/yargs/-/yargs-17.0.33.tgz", @@ -3755,6 +3847,20 @@ "node": ">=6.0.0" } }, + "node_modules/domexception": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/domexception/-/domexception-4.0.0.tgz", + "integrity": "sha512-A2is4PLG+eeSfoTMA95/s4pvAoSo2mKtiM5jlHkAVewmiO8ISFTFKZjH7UAM1Atli/OT/7JHOrJRJiMKUZKYBw==", + "deprecated": "Use your platform's native DOMException instead", + "license": "MIT", + "optional": true, + "dependencies": { + "webidl-conversions": "^7.0.0" + }, + "engines": { + "node": ">=12" + } + }, "node_modules/dot-prop": { "version": "5.3.0", "resolved": "https://registry.npmjs.org/dot-prop/-/dot-prop-5.3.0.tgz", @@ -9045,6 +9151,16 @@ "node": ">= 8" } }, + "node_modules/webidl-conversions": { + "version": "7.0.0", + "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-7.0.0.tgz", + "integrity": "sha512-VwddBukDzu71offAQR975unBIGqfKZpM+8ZX6ySk8nYhVoo5CYaZyzt3YBvYtRtO+aoGlqxPg/B87NGVZ/fu6g==", + "license": "BSD-2-Clause", + "optional": true, + "engines": { + "node": ">=12" + } + }, "node_modules/which": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz", @@ -9203,6 +9319,27 @@ "node": "^12.13.0 || ^14.15.0 || >=16.0.0" } }, + "node_modules/ws": { + "version": "8.18.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.0.tgz", + "integrity": "sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/y18n": { "version": "5.0.8", "resolved": "https://registry.npmjs.org/y18n/-/y18n-5.0.8.tgz", diff --git a/package.json b/package.json index a293475..bc1fe64 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "test": "jest" }, "dependencies": { + "@roamhq/wrtc": "^0.8.0", "@sinclair/typebox": "^0.32.20", "headers-polyfill": "^3.1.2", "json-stable-stringify": "^1.0.2", @@ -35,7 +36,8 @@ "tough-cookie": "^4.1.2", "tslib": "^2.5.2", "twitter-api-v2": "^1.18.2", - "undici": "^7.1.1" + "undici": "^7.1.1", + "ws": "^8.18.0" }, "devDependencies": { "@commitlint/cli": "^17.6.3", @@ -46,6 +48,7 @@ "@types/node": "^22.9.1", "@types/set-cookie-parser": "^2.4.2", "@types/tough-cookie": "^4.0.2", + "@types/ws": "^8.5.13", "@typescript-eslint/eslint-plugin": "^5.59.7", "@typescript-eslint/parser": "^5.59.7", "dotenv": "^16.4.5", diff --git a/src/scraper.ts b/src/scraper.ts index 29c9030..c3f8075 100644 --- a/src/scraper.ts +++ b/src/scraper.ts @@ -71,6 +71,13 @@ import { sendDirectMessage, SendDirectMessageResponse, } from './messages'; +import { + fetchAudioSpaceById, + fetchAuthenticatePeriscope, + fetchBrowseSpaceTopics, + fetchCommunitySelectQuery, fetchLiveVideoStreamStatus, fetchLoginTwitterToken +} from './spaces'; +import {AudioSpace, Community, LiveVideoStreamStatus, LoginTwitterTokenResponse, Subtopic} from './types/spaces'; const twUrl = 'https://twitter.com'; const UserTweetsUrl = @@ -899,4 +906,97 @@ export class Scraper { return res.value; } + + /** + * Retrieves the details of an Audio Space by its ID. + * @param id The ID of the Audio Space. + * @returns The details of the Audio Space. + */ + public async getAudioSpaceById(id: string): Promise { + const variables = { + id, + isMetatagsQuery: false, + withReplays: true, + withListeners: true, + }; + + return await fetchAudioSpaceById(variables, this.auth); + } + + /** + * Retrieves available space topics. + * @returns An array of space topics. + */ + public async browseSpaceTopics(): Promise { + return await fetchBrowseSpaceTopics(this.auth); + } + + /** + * Retrieves available communities. + * @returns An array of communities. + */ + public async communitySelectQuery(): Promise { + return await fetchCommunitySelectQuery(this.auth); + } + + /** + * Retrieves the status of an Audio Space stream by its media key. + * @param mediaKey The media key of the Audio Space. + * @returns The status of the Audio Space stream. + */ + public async getAudioSpaceStreamStatus( + mediaKey: string, + ): Promise { + return await fetchLiveVideoStreamStatus(mediaKey, this.auth); + } + + /** + * Retrieves the status of an Audio Space by its ID. + * This method internally fetches the Audio Space to obtain the media key, + * then retrieves the stream status using the media key. + * @param audioSpaceId The ID of the Audio Space. + * @returns The status of the Audio Space stream. + */ + public async getAudioSpaceStatus( + audioSpaceId: string, + ): Promise { + const audioSpace = await this.getAudioSpaceById(audioSpaceId); + + const mediaKey = audioSpace.metadata.media_key; + if (!mediaKey) { + throw new Error('Media Key not found in Audio Space metadata.'); + } + + return await this.getAudioSpaceStreamStatus(mediaKey); + } + + /** + * Authenticates Periscope to obtain a token. + * @returns The Periscope authentication token. + */ + public async authenticatePeriscope(): Promise { + return await fetchAuthenticatePeriscope(this.auth); + } + + /** + * Logs in to Twitter via Proxsee using the Periscope JWT. + * @param jwt The JWT obtained from AuthenticatePeriscope. + * @returns The response containing the cookie and user information. + */ + public async loginTwitterToken( + jwt: string, + ): Promise { + return await fetchLoginTwitterToken(jwt, this.auth); + } + + /** + * Orchestrates the flow: get token -> login -> return Periscope cookie + */ + public async getPeriscopeCookie(): Promise { + const periscopeToken = await this.authenticatePeriscope(); + + const loginResponse = await this.loginTwitterToken(periscopeToken); + + return loginResponse.cookie; + } } diff --git a/src/spaces.ts b/src/spaces.ts new file mode 100644 index 0000000..5d0acfb --- /dev/null +++ b/src/spaces.ts @@ -0,0 +1,450 @@ +import { TwitterAuth } from './auth'; +import { updateCookieJar } from './requests'; +import { + AudioSpace, + AudioSpaceByIdResponse, + AudioSpaceByIdVariables, + AuthenticatePeriscopeResponse, + BrowseSpaceTopicsResponse, + Community, + CommunitySelectQueryResponse, + LiveVideoStreamStatus, + LoginTwitterTokenResponse, + Subtopic, +} from './types/spaces'; + +/** + * Generates a random string that mimics a UUID v4. + */ +// TODO: install and replace with uuidv4 +function generateRandomId(): string { + return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, c => { + const r = (Math.random() * 16) | 0; + const v = c === 'x' ? r : (r & 0x3) | 0x8; + return v.toString(16); + }); +} + +/** + * Fetches details of an Audio Space by its ID. + * @param variables The variables required for the GraphQL query. + * @param auth The authentication object. + * @returns The details of the Audio Space. + */ +export async function fetchAudioSpaceById( + variables: AudioSpaceByIdVariables, + auth: TwitterAuth, +): Promise { + const queryId = 'Tvv_cNXCbtTcgdy1vWYPMw'; // Specific to the AudioSpaceById GraphQL query + const operationName = 'AudioSpaceById'; + + // URL encode the variables and features + const variablesEncoded = encodeURIComponent(JSON.stringify(variables)); + const features = { + spaces_2022_h2_spaces_communities: true, + spaces_2022_h2_clipping: true, + creator_subscriptions_tweet_preview_api_enabled: true, + profile_label_improvements_pcf_label_in_post_enabled: false, + rweb_tipjar_consumption_enabled: true, + responsive_web_graphql_exclude_directive_enabled: true, + verified_phone_label_enabled: false, + premium_content_api_read_enabled: false, + communities_web_enable_tweet_community_results_fetch: true, + c9s_tweet_anatomy_moderator_badge_enabled: true, + responsive_web_grok_analyze_button_fetch_trends_enabled: true, + articles_preview_enabled: true, + responsive_web_graphql_skip_user_profile_image_extensions_enabled: false, + responsive_web_edit_tweet_api_enabled: true, + graphql_is_translatable_rweb_tweet_is_translatable_enabled: true, + view_counts_everywhere_api_enabled: true, + longform_notetweets_consumption_enabled: true, + responsive_web_twitter_article_tweet_consumption_enabled: true, + tweet_awards_web_tipping_enabled: false, + creator_subscriptions_quote_tweet_preview_enabled: false, + freedom_of_speech_not_reach_fetch_enabled: true, + standardized_nudges_misinfo: true, + tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled: + true, + rweb_video_timestamps_enabled: true, + longform_notetweets_rich_text_read_enabled: true, + longform_notetweets_inline_media_enabled: true, + responsive_web_graphql_timeline_navigation_enabled: true, + responsive_web_enhance_cards_enabled: false, + }; + const featuresEncoded = encodeURIComponent(JSON.stringify(features)); + + const url = `https://x.com/i/api/graphql/${queryId}/${operationName}?variables=${variablesEncoded}&features=${featuresEncoded}`; + + const onboardingTaskUrl = 'https://api.twitter.com/1.1/onboarding/task.json'; + + // Retrieve necessary cookies and tokens + const cookies = await auth.cookieJar().getCookies(onboardingTaskUrl); + const xCsrfToken = cookies.find((cookie) => cookie.key === 'ct0'); + + const headers = new Headers({ + Accept: '*/*', + Authorization: `Bearer ${(auth as any).bearerToken}`, + 'Content-Type': 'application/json', + Cookie: await auth.cookieJar().getCookieString(onboardingTaskUrl), + 'User-Agent': + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', + 'x-guest-token': (auth as any).guestToken, + 'x-twitter-auth-type': 'OAuth2Client', + 'x-twitter-active-user': 'yes', + 'x-csrf-token': xCsrfToken?.value as string, + }); + + const response = await auth.fetch(url, { + headers, + method: 'GET', + }); + + // Update the cookie jar with any new cookies from the response + await updateCookieJar(auth.cookieJar(), response.headers); + + // Check for errors in the response + if (!response.ok) { + throw new Error(`Failed to fetch Audio Space: ${await response.text()}`); + } + + const data: AudioSpaceByIdResponse = await response.json(); + + if (data.errors && data.errors.length > 0) { + throw new Error(`API Errors: ${JSON.stringify(data.errors)}`); + } + + return data.data.audioSpace; +} + +/** + * Fetches available space topics from Twitter. + * @param auth The authentication object. + * @returns An array of space topics. + */ +export async function fetchBrowseSpaceTopics( + auth: TwitterAuth, +): Promise { + const queryId = 'TYpVV9QioZfViHqEqRZxJA'; + const operationName = 'BrowseSpaceTopics'; + + const variables = {}; + const features = {}; + + const variablesEncoded = encodeURIComponent(JSON.stringify(variables)); + const featuresEncoded = encodeURIComponent(JSON.stringify(features)); + + const url = `https://x.com/i/api/graphql/${queryId}/${operationName}?variables=${variablesEncoded}&features=${featuresEncoded}`; + + const onboardingTaskUrl = 'https://api.twitter.com/1.1/onboarding/task.json'; + + // Retrieve necessary cookies and tokens + const cookies = await auth.cookieJar().getCookies(onboardingTaskUrl); + const xCsrfToken = cookies.find((cookie) => cookie.key === 'ct0'); + + const headers = new Headers({ + Accept: '*/*', + Authorization: `Bearer ${(auth as any).bearerToken}`, + 'Content-Type': 'application/json', + Cookie: await auth.cookieJar().getCookieString(onboardingTaskUrl), + 'User-Agent': + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', + 'x-guest-token': (auth as any).guestToken, + 'x-twitter-auth-type': 'OAuth2Client', + 'x-twitter-active-user': 'yes', + 'x-csrf-token': xCsrfToken?.value as string, + }); + + const response = await auth.fetch(url, { + headers, + method: 'GET', + }); + + // Update the cookie jar with any new cookies from the response + await updateCookieJar(auth.cookieJar(), response.headers); + + // Check for errors in the response + if (!response.ok) { + throw new Error(`Failed to fetch Space Topics: ${await response.text()}`); + } + + const data: BrowseSpaceTopicsResponse = await response.json(); + + if (data.errors && data.errors.length > 0) { + throw new Error(`API Errors: ${JSON.stringify(data.errors)}`); + } + + // Flatten the subtopics from all categories into a single array + return data.data.browse_space_topics.categories.flatMap( + (category) => category.subtopics, + ); +} + +/** + * Fetches available communities from Twitter. + * @param auth The authentication object. + * @returns An array of communities. + */ +export async function fetchCommunitySelectQuery( + auth: TwitterAuth, +): Promise { + const queryId = 'Lue1DfmoW2cc0225t_8z1w'; // Specific to the CommunitySelectQuery GraphQL query + const operationName = 'CommunitySelectQuery'; + + const variables = {}; + const features = {}; + + const variablesEncoded = encodeURIComponent(JSON.stringify(variables)); + const featuresEncoded = encodeURIComponent(JSON.stringify(features)); + + const url = `https://x.com/i/api/graphql/${queryId}/${operationName}?variables=${variablesEncoded}&features=${featuresEncoded}`; + + const onboardingTaskUrl = 'https://api.twitter.com/1.1/onboarding/task.json'; + + // Retrieve necessary cookies and tokens + const cookies = await auth.cookieJar().getCookies(onboardingTaskUrl); + const xCsrfToken = cookies.find((cookie) => cookie.key === 'ct0'); + + const headers = new Headers({ + Accept: '*/*', + Authorization: `Bearer ${(auth as any).bearerToken}`, + 'Content-Type': 'application/json', + Cookie: await auth.cookieJar().getCookieString(onboardingTaskUrl), + 'User-Agent': + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', + 'x-guest-token': (auth as any).guestToken, + 'x-twitter-auth-type': 'OAuth2Client', + 'x-twitter-active-user': 'yes', + 'x-csrf-token': xCsrfToken?.value as string, + }); + + const response = await auth.fetch(url, { + headers, + method: 'GET', + }); + + // Update the cookie jar with any new cookies from the response + await updateCookieJar(auth.cookieJar(), response.headers); + + // Check for errors in the response + if (!response.ok) { + throw new Error( + `Failed to fetch Community Select Query: ${await response.text()}`, + ); + } + + const data: CommunitySelectQueryResponse = await response.json(); + + if (data.errors && data.errors.length > 0) { + throw new Error(`API Errors: ${JSON.stringify(data.errors)}`); + } + + // Return the space_hostable_communities array, which may be empty + return data.data.space_hostable_communities; +} + +/** + * Fetches the status of an Audio Space stream by its media key. + * @param mediaKey The media key of the Audio Space. + * @param auth The authentication object. + * @returns The status of the Audio Space stream. + */ +export async function fetchLiveVideoStreamStatus( + mediaKey: string, + auth: TwitterAuth, +): Promise { + const baseUrl = `https://x.com/i/api/1.1/live_video_stream/status/${mediaKey}`; + const queryParams = new URLSearchParams({ + client: 'web', + use_syndication_guest_id: 'false', + cookie_set_host: 'x.com', + }); + + const url = `${baseUrl}?${queryParams.toString()}`; + + const onboardingTaskUrl = 'https://api.twitter.com/1.1/onboarding/task.json'; + + // Retrieve necessary cookies and tokens + const cookies = await auth.cookieJar().getCookies(onboardingTaskUrl); + const xCsrfToken = cookies.find((cookie) => cookie.key === 'ct0'); + + const headers = new Headers({ + Accept: '*/*', + Authorization: `Bearer ${(auth as any).bearerToken}`, + 'Content-Type': 'application/json', + Cookie: await auth.cookieJar().getCookieString(onboardingTaskUrl), + 'User-Agent': + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', + 'x-guest-token': (auth as any).guestToken, + 'x-twitter-auth-type': 'OAuth2Client', + 'x-twitter-active-user': 'yes', + 'x-csrf-token': xCsrfToken?.value as string, + }); + + try { + const response = await auth.fetch(url, { + method: 'GET', + headers: headers, + }); + + // Update the cookie jar with any new cookies from the response + await updateCookieJar(auth.cookieJar(), response.headers); + + // Check for errors in the response + if (!response.ok) { + throw new Error( + `Failed to fetch live video stream status: ${await response.text()}`, + ); + } + + return await response.json(); + } catch (error) { + console.error( + `Error fetching live video stream status for mediaKey ${mediaKey}:`, + error, + ); + throw error; + } +} + +/** + * Authenticates Periscope to obtain a token. + * @param auth The authentication object. + * @returns The Periscope authentication token. + */ +export async function fetchAuthenticatePeriscope( + auth: TwitterAuth, +): Promise { + const queryId = 'r7VUmxbfqNkx7uwjgONSNw'; + const operationName = 'AuthenticatePeriscope'; + + const variables = {}; + const features = {}; + + const variablesEncoded = encodeURIComponent(JSON.stringify(variables)); + const featuresEncoded = encodeURIComponent(JSON.stringify(features)); + + const url = `https://x.com/i/api/graphql/${queryId}/${operationName}?variables=${variablesEncoded}&features=${featuresEncoded}`; + + const onboardingTaskUrl = 'https://api.twitter.com/1.1/onboarding/task.json'; + + const cookies = await auth.cookieJar().getCookies(onboardingTaskUrl); + const xCsrfToken = cookies.find((cookie) => cookie.key === 'ct0'); + + if (!xCsrfToken) { + throw new Error('CSRF Token (ct0) not found in cookies.'); + } + + const clientTransactionId = generateRandomId(); + + const headers = new Headers({ + Accept: '*/*', + Authorization: `Bearer ${(auth as any).bearerToken}`, + 'Content-Type': 'application/json', + Cookie: await auth.cookieJar().getCookieString(onboardingTaskUrl), + 'User-Agent': + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', + 'x-guest-token': (auth as any).guestToken, + 'x-twitter-auth-type': 'OAuth2Session', + 'x-twitter-active-user': 'yes', + 'x-csrf-token': xCsrfToken.value, + 'x-client-transaction-id': clientTransactionId, + 'sec-ch-ua-platform': '"Windows"', + 'sec-ch-ua': + '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"', + 'x-twitter-client-language': 'en', + 'sec-ch-ua-mobile': '?0', + Referer: 'https://x.com/i/spaces/start', + }); + + try { + const response = await auth.fetch(url, { + method: 'GET', + headers: headers, + }); + + await updateCookieJar(auth.cookieJar(), response.headers); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`Error ${response.status}: ${errorText}`); + } + + const data: AuthenticatePeriscopeResponse = await response.json(); + + if (data.errors && data.errors.length > 0) { + throw new Error(`API Errors: ${JSON.stringify(data.errors)}`); + } + + if (!data.data.authenticate_periscope) { + throw new Error('Periscope authentication failed, no data returned.'); + } + + return data.data.authenticate_periscope; + } catch (error) { + console.error('Error during Periscope authentication:', error); + throw error; + } +} + +/** + * Logs in to Twitter via Proxsee using the Periscope JWT to obtain a login cookie. + * @param jwt The JWT obtained via AuthenticatePeriscope. + * @param auth The authentication object. + * @returns The response containing the cookie and user information. + */ +export async function fetchLoginTwitterToken( + jwt: unknown, + auth: TwitterAuth, +): Promise { + const url = 'https://proxsee.pscp.tv/api/v2/loginTwitterToken'; + + const idempotenceKey = generateRandomId(); + + const payload = { + jwt: jwt, + vendor_id: 'm5-proxsee-login-a2011357b73e', + create_user: true, + }; + + const headers = new Headers({ + 'Content-Type': 'application/json', + 'User-Agent': + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', + Referer: 'https://x.com/', + 'sec-ch-ua': + '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"', + 'sec-ch-ua-platform': '"Windows"', + 'sec-ch-ua-mobile': '?0', + 'X-Periscope-User-Agent': 'Twitter/m5', + 'X-Idempotence': idempotenceKey, + 'X-Attempt': '1', + }); + + try { + const response = await auth.fetch(url, { + method: 'POST', + headers: headers, + body: JSON.stringify(payload), + }); + + // Update the cookie jar with any new cookies from the response + await updateCookieJar(auth.cookieJar(), response.headers); + + // Check if the response is successful + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`Error ${response.status}: ${errorText}`); + } + + const data: LoginTwitterTokenResponse = await response.json(); + + if (!data.cookie || !data.user) { + throw new Error('Twitter authentication failed, missing data.'); + } + + return data; + } catch (error) { + console.error('Error logging into Twitter via Proxsee:', error); + throw error; + } +} \ No newline at end of file diff --git a/src/spaces/core/ChatClient.ts b/src/spaces/core/ChatClient.ts new file mode 100644 index 0000000..c2ea89e --- /dev/null +++ b/src/spaces/core/ChatClient.ts @@ -0,0 +1,149 @@ +// src/core/ChatClient.ts + +import WebSocket from 'ws'; +import { EventEmitter } from 'events'; +import type { SpeakerRequest, OccupancyUpdate } from '../types'; + +export class ChatClient extends EventEmitter { + private ws?: WebSocket; + private connected = false; + + constructor( + private readonly spaceId: string, + private readonly accessToken: string, + private readonly endpoint: string, + ) { + super(); + } + + async connect() { + const wsUrl = `${this.endpoint}/chatapi/v1/chatnow`.replace( + 'https://', + 'wss://', + ); + console.log('[ChatClient] Connecting =>', wsUrl); + + this.ws = new WebSocket(wsUrl, { + headers: { + Origin: 'https://x.com', + 'User-Agent': 'Mozilla/5.0', + }, + }); + + await this.setupHandlers(); + } + + private setupHandlers(): Promise { + if (!this.ws) throw new Error('No WebSocket instance'); + + return new Promise((resolve, reject) => { + this.ws!.on('open', () => { + console.log('[ChatClient] Connected'); + this.connected = true; + this.sendAuthAndJoin(); + resolve(); + }); + + this.ws!.on('message', (data) => { + this.handleMessage(data.toString()); + }); + + this.ws!.on('close', () => { + console.log('[ChatClient] Closed'); + this.connected = false; + this.emit('disconnected'); + }); + + this.ws!.on('error', (err) => { + console.error('[ChatClient] Error =>', err); + reject(err); + }); + }); + } + + private sendAuthAndJoin() { + if (!this.ws) return; + // Auth + this.ws.send( + JSON.stringify({ + payload: JSON.stringify({ access_token: this.accessToken }), + kind: 3, + }), + ); + // Join + this.ws.send( + JSON.stringify({ + payload: JSON.stringify({ + body: JSON.stringify({ room: this.spaceId }), + kind: 1, + }), + kind: 2, + }), + ); + } + + private handleMessage(raw: string) { + let msg: any; + try { + msg = JSON.parse(raw); + } catch { + return; + } + if (!msg.payload) return; + + const payload = safeJson(msg.payload); + if (!payload?.body) return; + + const body = safeJson(payload.body); + + // Example of speaker request detection + if (body.guestBroadcastingEvent === 1) { + const req: SpeakerRequest = { + userId: body.guestRemoteID, + username: body.guestUsername, + displayName: payload.sender?.display_name || body.guestUsername, + sessionUUID: body.sessionUUID, + }; + this.emit('speakerRequest', req); + } + + // Example of occupancy update + if (typeof body.occupancy === 'number') { + const update: OccupancyUpdate = { + occupancy: body.occupancy, + totalParticipants: body.total_participants || 0, + }; + this.emit('occupancyUpdate', update); + } + + // Example of mute state + if (body.guestBroadcastingEvent === 16) { + this.emit('muteStateChanged', { + userId: body.guestRemoteID, + muted: true, + }); + } + if (body.guestBroadcastingEvent === 17) { + this.emit('muteStateChanged', { + userId: body.guestRemoteID, + muted: false, + }); + } + } + + async disconnect() { + if (this.ws) { + this.ws.close(); + this.ws = undefined; + this.connected = false; + } + } +} + +function safeJson(text: string): any { + try { + return JSON.parse(text); + } catch { + return null; + } +} diff --git a/src/spaces/core/JanusAudioSource.ts b/src/spaces/core/JanusAudioSource.ts new file mode 100644 index 0000000..ccee7ab --- /dev/null +++ b/src/spaces/core/JanusAudioSource.ts @@ -0,0 +1,58 @@ +// src/core/audio.ts + +import { EventEmitter } from 'events'; +import { nonstandard } from '@roamhq/wrtc'; +const { RTCAudioSource, RTCAudioSink } = nonstandard; + +export class JanusAudioSource extends EventEmitter { + private source: any; + private track: MediaStreamTrack; + + constructor() { + super(); + this.source = new RTCAudioSource(); + this.track = this.source.createTrack(); + } + + getTrack() { + return this.track; + } + + pushPcmData(samples: Int16Array, sampleRate: number, channels = 1) { + this.source.onData({ + samples, + sampleRate, + bitsPerSample: 16, + channelCount: channels, + numberOfFrames: samples.length / channels, + }); + } +} + +export class JanusAudioSink extends EventEmitter { + private sink: any; + private active = true; + + constructor(track: MediaStreamTrack) { + super(); + if (track.kind !== 'audio') + throw new Error('JanusAudioSink must be an audio track'); + + this.sink = new RTCAudioSink(track); + + this.sink.ondata = (frame: { + samples: Int16Array; + sampleRate: number; + bitsPerSample: number; + channelCount: number; + }) => { + if (!this.active) return; + this.emit('audioData', frame); + }; + } + + stop() { + this.active = false; + this.sink?.stop(); + } +} diff --git a/src/spaces/core/JanusClient.ts b/src/spaces/core/JanusClient.ts new file mode 100644 index 0000000..bc24d4b --- /dev/null +++ b/src/spaces/core/JanusClient.ts @@ -0,0 +1,679 @@ +// src/core/JanusClient.ts + +import { EventEmitter } from 'events'; +import { RTCPeerConnection, MediaStream } from '@roamhq/wrtc'; +import { JanusAudioSink, JanusAudioSource } from './JanusAudioSource'; +import type { AudioDataWithUser, TurnServersInfo } from '../types'; + +interface JanusConfig { + webrtcUrl: string; + roomId: string; + credential: string; + userId: string; + streamName: string; + turnServers: TurnServersInfo; +} + +/** + * This class is in charge of the Janus session, handle, + * joining the videoroom, and polling events. + */ +export class JanusClient extends EventEmitter { + private sessionId?: number; + private handleId?: number; + private publisherId?: number; + private pc?: RTCPeerConnection; + private localAudioSource?: JanusAudioSource; + private pollActive = false; + private eventWaiters: Array<{ + predicate: (evt: any) => boolean; + resolve: (value: any) => void; + reject: (error: Error) => void; + }> = []; + private subscribers = new Map< + string, + { + handleId: number; + pc: RTCPeerConnection; + } + >(); + + constructor(private readonly config: JanusConfig) { + super(); + } + + async initialize() { + // 1) Create session + this.sessionId = await this.createSession(); + + // 2) Attach plugin + this.handleId = await this.attachPlugin(); + + // 3) Start polling events right now + this.pollActive = true; + this.startPolling(); + + // 4) Create room + await this.createRoom(); + + // 3) Join room + this.publisherId = await this.joinRoom(); + + // 4) Create local RTCPeerConnection + this.pc = new RTCPeerConnection({ + iceServers: [ + { + urls: this.config.turnServers.uris, + username: this.config.turnServers.username, + credential: this.config.turnServers.password, + }, + ], + }); + this.setupPeerEvents(); + + this.enableLocalAudio(); + // 6) Do the initial configure -> generate Offer -> setLocalDesc -> send -> setRemoteDesc + await this.configurePublisher(); + + console.log('[JanusClient] Initialization complete'); + } + + public async subscribeSpeaker(userId: string): Promise { + console.log('[JanusClient] subscribeSpeaker => userId=', userId); + + // 1) Attach plugin as subscriber + const subscriberHandleId = await this.attachPlugin(); + console.log('[JanusClient] subscriber handle =>', subscriberHandleId); + + // 2) Wait for an event with "publishers" to discover feedId + // We do *not* check sender === subscriberHandleId because Hydra + // might send it from the main handle or another handle. + const publishersEvt = await this.waitForJanusEvent( + (e) => + e.janus === 'event' && + e.plugindata?.plugin === 'janus.plugin.videoroom' && + e.plugindata?.data?.videoroom === 'event' && + Array.isArray(e.plugindata?.data?.publishers) && + e.plugindata?.data?.publishers.length > 0, + 8000, + 'discover feed_id from "publishers"', + ); + + // Extract the feedId from the first publisher whose 'display' or 'periscope_user_id' = userId + // (in your logs, 'display' or 'periscope_user_id' is the actual user) + const list = publishersEvt.plugindata.data.publishers as any[]; + const pub = list.find( + (p) => p.display === userId || p.periscope_user_id === userId, + ); + if (!pub) { + throw new Error( + `[JanusClient] subscribeSpeaker => No publisher found for userId=${userId}`, + ); + } + const feedId = pub.id; + console.log('[JanusClient] found feedId =>', feedId); + + // 3) "join" as subscriber with "streams: [{ feed, mid: '0', send: true }]" + const joinBody = { + request: 'join', + room: this.config.roomId, + periscope_user_id: this.config.userId, + ptype: 'subscriber', + streams: [ + { + feed: feedId, + mid: '0', + send: true, + }, + ], + }; + await this.sendJanusMessage(subscriberHandleId, joinBody); + + // 4) Wait for "attached" + jsep.offer from *this subscriber handle* + // Now we do filter on e.sender === subscriberHandleId + const attachedEvt = await this.waitForJanusEvent( + (e) => + e.janus === 'event' && + e.sender === subscriberHandleId && + e.plugindata?.plugin === 'janus.plugin.videoroom' && + e.plugindata?.data?.videoroom === 'attached' && + e.jsep?.type === 'offer', + 8000, + 'subscriber attached + offer', + ); + console.log('[JanusClient] subscriber => "attached" with offer'); + + const offer = attachedEvt.jsep; + + // 5) Create subPc, setRemoteDescription(offer), createAnswer, setLocalDescription(answer) + const subPc = new RTCPeerConnection({ + iceServers: [ + { + urls: this.config.turnServers.uris, + username: this.config.turnServers.username, + credential: this.config.turnServers.password, + }, + ], + }); + + // Ontrack => parse PCM via JanusAudioSink + subPc.ontrack = (evt) => { + console.log('[JanusClient] subscriber track =>', evt.track.kind); + + // TODO: REMOVE DEBUG + console.log( + '[JanusClient] subscriber track => kind=', + evt.track.kind, + 'readyState=', + evt.track.readyState, + 'muted=', + evt.track.muted, + ); + + const sink = new JanusAudioSink(evt.track); + sink.on('audioData', (frame) => { + // TODO: REMOVE DEBUG + // console.log( + // '[AudioSink] got an audio frame => sampleRate=', + // frame.sampleRate, + // 'length=', + // frame.samples.length, + // ); + // console.log( + // '[AudioSink] sampleRate=', + // frame.sampleRate, + // 'channels=', + // frame.channelCount, + // ); + // const { samples } = frame; // Int16Array + // let maxVal = 0; + // for (let i = 0; i < samples.length; i++) { + // const val = Math.abs(samples[i]); + // if (val > maxVal) maxVal = val; + // } + // console.log(`[AudioSink] userId=${userId}, maxAmplitude=${maxVal}`); + + this.emit('audioDataFromSpeaker', { + userId, + bitsPerSample: frame.bitsPerSample, + sampleRate: frame.sampleRate, + numberOfFrames: frame.numberOfFrames, + channelCount: frame.channelCount, + samples: frame.samples, + } as AudioDataWithUser); + }); + }; + + await subPc.setRemoteDescription(offer); + const answer = await subPc.createAnswer(); + await subPc.setLocalDescription(answer); + + // 6) Send "start" with jsep=answer + await this.sendJanusMessage( + subscriberHandleId, + { + request: 'start', + room: this.config.roomId, + periscope_user_id: this.config.userId, + }, + answer, + ); + console.log('[JanusClient] subscriber => done (user=', userId, ')'); + + // Store for potential cleanup + this.subscribers.set(userId, { handleId: subscriberHandleId, pc: subPc }); + } + + pushLocalAudio(samples: Int16Array, sampleRate: number, channels = 1) { + if (!this.localAudioSource) { + console.warn('[JanusClient] No localAudioSource; enabling now...'); + this.enableLocalAudio(); + } + this.localAudioSource?.pushPcmData(samples, sampleRate, channels); + } + + enableLocalAudio() { + if (!this.pc) { + console.warn('[JanusClient] No RTCPeerConnection'); + return; + } + if (this.localAudioSource) { + console.log('[JanusClient] localAudioSource already active'); + return; + } + this.localAudioSource = new JanusAudioSource(); + const track = this.localAudioSource.getTrack(); + const localStream = new MediaStream(); + localStream.addTrack(track); + this.pc.addTrack(track, localStream); + } + + async stop() { + console.log('[JanusClient] Stopping...'); + this.pollActive = false; + // leave the room, etc. + // close PC + if (this.pc) { + this.pc.close(); + this.pc = undefined; + } + } + + getSessionId() { + return this.sessionId; + } + getHandleId() { + return this.handleId; + } + getPublisherId() { + return this.publisherId; + } + + // ------------------- Private Methods -------------------- + + private async createSession(): Promise { + const transaction = this.randomTid(); + const resp = await fetch(this.config.webrtcUrl, { + method: 'POST', + headers: { + Authorization: this.config.credential, + 'Content-Type': 'application/json', + Referer: 'https://x.com', + }, + body: JSON.stringify({ + janus: 'create', + transaction, + }), + }); + if (!resp.ok) throw new Error('[JanusClient] createSession failed'); + const json = await resp.json(); + if (json.janus !== 'success') + throw new Error('[JanusClient] createSession invalid response'); + return json.data.id; // sessionId + } + + private async attachPlugin(): Promise { + if (!this.sessionId) throw new Error('[JanusClient] no sessionId'); + + const transaction = this.randomTid(); + const resp = await fetch(`${this.config.webrtcUrl}/${this.sessionId}`, { + method: 'POST', + headers: { + Authorization: this.config.credential, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + janus: 'attach', + plugin: 'janus.plugin.videoroom', + transaction, + }), + }); + if (!resp.ok) throw new Error('[JanusClient] attachPlugin failed'); + const json = await resp.json(); + if (json.janus !== 'success') + throw new Error('[JanusClient] attachPlugin invalid response'); + return json.data.id; + } + + private async createRoom() { + if (!this.sessionId || !this.handleId) { + throw new Error('[JanusClient] No session/handle'); + } + + const transaction = this.randomTid(); + const body = { + request: 'create', + room: this.config.roomId, + periscope_user_id: this.config.userId, + audiocodec: 'opus', + videocodec: 'h264', + transport_wide_cc_ext: true, + app_component: 'audio-room', + h264_profile: '42e01f', + dummy_publisher: false, + }; + + // Send the "create" request + const resp = await fetch( + `${this.config.webrtcUrl}/${this.sessionId}/${this.handleId}`, + { + method: 'POST', + headers: { + Authorization: this.config.credential, + 'Content-Type': 'application/json', + Referer: 'https://x.com', + }, + body: JSON.stringify({ + janus: 'message', + transaction, + body, + }), + }, + ); + + if (!resp.ok) { + throw new Error(`[JanusClient] createRoom failed => ${resp.status}`); + } + + const json = await resp.json(); + console.log('[JanusClient] createRoom =>', JSON.stringify(json)); + + // Check if Janus responded with videoroom:"created" + if (json.janus === 'error') { + throw new Error( + `[JanusClient] createRoom error => ${ + json.error?.reason || 'Unknown error' + }`, + ); + } + + if (json.plugindata?.data?.videoroom !== 'created') { + throw new Error( + `[JanusClient] unexpected createRoom response => ${JSON.stringify( + json, + )}`, + ); + } + + console.log( + `[JanusClient] Room '${this.config.roomId}' created successfully`, + ); + } + + private async joinRoom(): Promise { + if (!this.sessionId || !this.handleId) + throw new Error('[JanusClient] no session/handle'); + + // Wait for the event that indicates we joined + // Typically: evt.janus === 'event' && evt.plugindata?.data?.videoroom === 'joined' + const evtPromise = this.waitForJanusEvent( + (e) => { + return ( + e.janus === 'event' && + e.plugindata?.plugin === 'janus.plugin.videoroom' && + e.plugindata?.data?.videoroom === 'joined' + ); + }, + 12000, + 'Host Joined Event', + ); + + const body = { + request: 'join', + room: this.config.roomId, + ptype: 'publisher', + display: this.config.userId, + periscope_user_id: this.config.userId, + }; + await this.sendJanusMessage(this.handleId, body); + + const evt = await evtPromise; + + const publisherId = evt.plugindata.data.id; + console.log('[JanusClient] joined room => publisherId=', publisherId); + return publisherId; + } + + private async configurePublisher() { + if (!this.pc || !this.sessionId || !this.handleId) return; + + console.log('[JanusClient] createOffer...'); + const offer = await this.pc.createOffer({ + offerToReceiveAudio: true, + offerToReceiveVideo: false, + }); + await this.pc.setLocalDescription(offer); + + console.log('[JanusClient] sending configure with JSEP...'); + await this.sendJanusMessage( + this.handleId, + { + request: 'configure', + room: this.config.roomId, + periscope_user_id: this.config.userId, + session_uuid: '', + stream_name: this.config.streamName, + vidman_token: this.config.credential, + }, + offer, + ); + + console.log('[JanusClient] waiting for answer...'); + // In a real scenario, we do an event-based wait for jsep.type === 'answer'. + // For MVP, let's do a poll in handleJanusEvent for that "answer" and setRemoteDesc + } + + private async sendJanusMessage( + handleId: number, + body: any, + jsep?: any, + ): Promise { + if (!this.sessionId) { + throw new Error('[JanusClient] No session'); + } + const transaction = this.randomTid(); + + const resp = await fetch( + `${this.config.webrtcUrl}/${this.sessionId}/${handleId}`, + { + method: 'POST', + headers: { + Authorization: this.config.credential, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + janus: 'message', + transaction, + body, + jsep, + }), + }, + ); + + if (!resp.ok) { + throw new Error( + '[JanusClient] sendJanusMessage failed => ' + resp.status, + ); + } + } + + private startPolling() { + console.log('[JanusClient] Starting polling...'); + const doPoll = async () => { + if (!this.pollActive || !this.sessionId) { + console.log('[JanusClient] Polling stopped'); + return; + } + try { + const url = `${this.config.webrtcUrl}/${ + this.sessionId + }?maxev=1&_=${Date.now()}`; + const resp = await fetch(url, { + headers: { Authorization: this.config.credential }, + }); + if (resp.ok) { + const event = await resp.json(); + this.handleJanusEvent(event); + } else { + console.log('[JanusClient] poll error =>', resp.status); + } + } catch (err) { + console.error('[JanusClient] poll exception =>', err); + } + setTimeout(doPoll, 500); + }; + doPoll(); + } + + private handleJanusEvent(evt: any) { + console.log('[JanusClient] handleJanusEvent =>', JSON.stringify(evt)); + + if (!evt.janus) return; + if (evt.janus === 'keepalive') { + return; + } + if (evt.janus === 'webrtcup') { + console.log('[JanusClient] webrtcup =>', evt.sender); + } + if (evt.jsep && evt.jsep.type === 'answer') { + this.onReceivedAnswer(evt.jsep); + } + if (evt.plugindata?.data?.id) { + // e.g. publisherId + this.publisherId = evt.plugindata.data.id; + } + if (evt.error) { + console.error('[JanusClient] Janus error =>', evt.error.reason); + this.emit('error', new Error(evt.error.reason)); + } + + for (let i = 0; i < this.eventWaiters.length; i++) { + const waiter = this.eventWaiters[i]; + if (waiter.predicate(evt)) { + // remove from the array + this.eventWaiters.splice(i, 1); + // resolve the promise + waiter.resolve(evt); + break; // important: only resolve one waiter + } + } + // Add more logic if needed + } + + private async onReceivedAnswer(answer: any) { + if (!this.pc) return; + console.log('[JanusClient] got answer => setRemoteDescription'); + await this.pc.setRemoteDescription(answer); + } + + private setupPeerEvents() { + if (!this.pc) return; + + this.pc.addEventListener('iceconnectionstatechange', () => { + console.log('[JanusClient] ICE state =>', this.pc?.iceConnectionState); + if (this.pc?.iceConnectionState === 'failed') { + this.emit('error', new Error('ICE connection failed')); + } + }); + + this.pc.addEventListener('track', (evt) => { + console.log('[JanusClient] track =>', evt.track.kind); + // Here you can attach a JanusAudioSink to parse PCM frames + }); + } + + private randomTid() { + return Math.random().toString(36).slice(2, 10); + } + + /** + * Allows code to wait for a specific Janus event that matches a predicate + */ + private async waitForJanusEvent( + predicate: (evt: any) => boolean, + timeoutMs = 5000, + description = 'some event', + ): Promise { + return new Promise((resolve, reject) => { + const waiter = { + predicate, + resolve, + reject, + }; + this.eventWaiters.push(waiter); + + setTimeout(() => { + const idx = this.eventWaiters.indexOf(waiter); + if (idx !== -1) { + this.eventWaiters.splice(idx, 1); + console.log( + `[JanusClient] waitForJanusEvent => timed out waiting for: ${description}`, + ); + reject( + new Error( + `[JanusClient] waitForJanusEvent (expecting "${description}") timed out after ${timeoutMs}ms`, + ), + ); + } + }, timeoutMs); + }); + } + + public async destroyRoom(): Promise { + if (!this.sessionId || !this.handleId) { + console.warn('[JanusClient] destroyRoom => no session/handle'); + return; + } + if (!this.config.roomId || !this.config.userId) { + console.warn('[JanusClient] destroyRoom => no roomId/userId'); + return; + } + + const transaction = this.randomTid(); + const body = { + request: 'destroy', + room: this.config.roomId, + periscope_user_id: this.config.userId, + }; + + console.log('[JanusClient] destroying room =>', body); + const resp = await fetch( + `${this.config.webrtcUrl}/${this.sessionId}/${this.handleId}`, + { + method: 'POST', + headers: { + Authorization: this.config.credential, + 'Content-Type': 'application/json', + Referer: 'https://x.com', + }, + body: JSON.stringify({ + janus: 'message', + transaction, + body, + }), + }, + ); + + if (!resp.ok) { + throw new Error(`[JanusClient] destroyRoom failed => ${resp.status}`); + } + + const json = await resp.json(); + console.log('[JanusClient] destroyRoom =>', JSON.stringify(json)); + } + + public async leaveRoom(): Promise { + if (!this.sessionId || !this.handleId) { + console.warn('[JanusClient] leaveRoom => no session/handle'); + return; + } + const transaction = this.randomTid(); + const body = { + request: 'leave', + room: this.config.roomId, + periscope_user_id: this.config.userId, + }; + + console.log('[JanusClient] leaving room =>', body); + const resp = await fetch( + `${this.config.webrtcUrl}/${this.sessionId}/${this.handleId}`, + { + method: 'POST', + headers: { + Authorization: this.config.credential, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + janus: 'message', + transaction, + body, + }), + }, + ); + if (!resp.ok) { + throw new Error(`[JanusClient] leaveRoom => error code ${resp.status}`); + } + const json = await resp.json(); + console.log('[JanusClient] leaveRoom =>', JSON.stringify(json)); + } +} diff --git a/src/spaces/core/Space.ts b/src/spaces/core/Space.ts new file mode 100644 index 0000000..42c50df --- /dev/null +++ b/src/spaces/core/Space.ts @@ -0,0 +1,334 @@ +// src/core/Space.ts + +import { EventEmitter } from 'events'; +import { ChatClient } from './ChatClient'; +import { JanusClient } from './JanusClient'; +import { + getTurnServers, + createBroadcast, + publishBroadcast, + authorizeToken, + getRegion, +} from '../utils'; +import type { + SpaceConfig, + BroadcastCreated, + SpeakerRequest, + OccupancyUpdate, + Plugin, + AudioDataWithUser, + PluginRegistration, +} from '../types'; +import { Scraper } from '../../scraper'; + +/** + * This class orchestrates: + * 1) Creation of the broadcast + * 2) Instantiation of Janus + Chat + * 3) Approve speakers, push audio, etc. + */ +export class Space extends EventEmitter { + private janusClient?: JanusClient; + private chatClient?: ChatClient; + private authToken?: string; + private broadcastInfo?: BroadcastCreated; + private isInitialized = false; + private plugins = new Set(); + + constructor(private readonly scraper: Scraper) { + super(); + } + + public use(plugin: Plugin, config?: Record) { + const registration: PluginRegistration = { plugin, config }; + this.plugins.add(registration); + + console.log('[Space] Plugin added =>', plugin.constructor.name); + + plugin.onAttach?.(this); + + if (this.isInitialized && plugin.init) { + plugin.init({ + space: this, + pluginConfig: config, + }); + } + + return this; + } + + /** + * Main entry point + */ + async initialize(config: SpaceConfig) { + console.log('[Space] Initializing...'); + + // 1) get Periscope cookie + const cookie = await this.scraper.getPeriscopeCookie(); + + // 2) get region + const region = await getRegion(); + console.log('[Space] Got region =>', region); + + // 3) create broadcast + console.log('[Space] Creating broadcast...'); + const broadcast = await createBroadcast({ + description: config.description, + languages: config.languages, + cookie, + region, + }); + this.broadcastInfo = broadcast; + + // 4) Authorize token if needed + console.log('[Space] Authorizing token...'); + this.authToken = await authorizeToken(cookie); + + // 5) Get TURN servers + console.log('[Space] Getting turn servers...'); + const turnServers = await getTurnServers(cookie); + + // 6) Create Janus client + this.janusClient = new JanusClient({ + webrtcUrl: broadcast.webrtc_gw_url, + roomId: broadcast.room_id, + credential: broadcast.credential, + userId: broadcast.broadcast.user_id, + streamName: broadcast.stream_name, + turnServers, + }); + await this.janusClient.initialize(); + + this.janusClient.on('audioDataFromSpeaker', (data: AudioDataWithUser) => { + // console.log('[Space] Received PCM from speaker =>', data.userId); + this.handleAudioData(data); + // You can store or forward to a plugin, run STT, etc. + }); + + // 7) Publish the broadcast + console.log('[Space] Publishing broadcast...'); + await publishBroadcast({ + title: config.title || '', + broadcast, + cookie, + janusSessionId: this.janusClient.getSessionId(), + janusHandleId: this.janusClient.getHandleId(), + janusPublisherId: this.janusClient.getPublisherId(), + }); + + // 8) If interactive, open chat + if (config.mode === 'INTERACTIVE') { + console.log('[Space] Connecting chat...'); + this.chatClient = new ChatClient( + broadcast.room_id, + broadcast.access_token, + broadcast.endpoint, + ); + await this.chatClient.connect(); + this.setupChatEvents(); + } + + this.isInitialized = true; + console.log('[Space] Initialized =>', broadcast.share_url); + + for (const { plugin, config: pluginConfig } of this.plugins) { + if (plugin.init) { + plugin.init({ + space: this, + pluginConfig, + }); + } + } + + console.log('[Space] All plugins initialized'); + return broadcast; + } + + private setupChatEvents() { + if (!this.chatClient) return; + + this.chatClient.on('speakerRequest', (req: SpeakerRequest) => { + console.log('[Space] Speaker request =>', req); + this.emit('speakerRequest', req); + }); + this.chatClient.on('occupancyUpdate', (update: OccupancyUpdate) => { + this.emit('occupancyUpdate', update); + }); + this.chatClient.on('muteStateChanged', (evt) => { + this.emit('muteStateChanged', evt); + }); + } + + /** + * Approves a speaker on Periscope side, then subscribes on Janus side + */ + async approveSpeaker(userId: string, sessionUUID: string) { + if (!this.isInitialized || !this.broadcastInfo) { + throw new Error('[Space] Not initialized or no broadcastInfo'); + } + + if (!this.authToken) { + throw new Error('[Space] No auth token available'); + } + + // 1) Call the "request/approve" endpoint + await this.callApproveEndpoint( + this.broadcastInfo, + this.authToken, + userId, + sessionUUID, + ); + + // 2) Subscribe in Janus => receive speaker's audio + await this.janusClient?.subscribeSpeaker(userId); + } + + private async callApproveEndpoint( + broadcast: BroadcastCreated, + authorizationToken: string, + userId: string, + sessionUUID: string, + ): Promise { + const endpoint = 'https://guest.pscp.tv/api/v1/audiospace/request/approve'; + + const headers = { + 'Content-Type': 'application/json', + Referer: 'https://x.com/', + Authorization: authorizationToken, + }; + + const body = { + ntpForBroadcasterFrame: '2208988800024000300', + ntpForLiveFrame: '2208988800024000300', + chat_token: broadcast.access_token, + session_uuid: sessionUUID, + }; + + console.log('[Space] Approving speaker =>', endpoint, body); + const resp = await fetch(endpoint, { + method: 'POST', + headers, + body: JSON.stringify(body), + }); + + if (!resp.ok) { + const error = await resp.text(); + throw new Error( + `[Space] Failed to approve speaker => ${resp.status}: ${error}`, + ); + } + + console.log('[Space] Speaker approved =>', userId); + } + + pushAudio(samples: Int16Array, sampleRate: number) { + this.janusClient?.pushLocalAudio(samples, sampleRate); + } + + /** + * This method is called by JanusClient on 'audioDataFromSpeaker' + * or we do it from the 'initialize(...)' once Janus is set up. + */ + private handleAudioData(data: AudioDataWithUser) { + // Forward to plugins + for (const { plugin } of this.plugins) { + plugin.onAudioData?.(data); + } + } + + /** + * Gracefully end the Space (stop broadcast, destroy Janus room, etc.) + */ + public async finalizeSpace(): Promise { + console.log('[Space] finalizeSpace => stopping broadcast gracefully'); + + const tasks: Array> = []; + + if (this.janusClient) { + tasks.push( + this.janusClient.destroyRoom().catch((err) => { + console.error('[Space] destroyRoom error =>', err); + }), + ); + } + + if (this.broadcastInfo) { + tasks.push( + this.endAudiospace({ + broadcastId: this.broadcastInfo.room_id, + chatToken: this.broadcastInfo.access_token, + }).catch((err) => { + console.error('[Space] endAudiospace error =>', err); + }), + ); + } + + if (this.janusClient) { + tasks.push( + this.janusClient.leaveRoom().catch((err) => { + console.error('[Space] leaveRoom error =>', err); + }), + ); + } + + await Promise.all(tasks); + console.log('[Space] finalizeSpace => done.'); + } + + /** + * Calls the endAudiospace endpoint from Twitter + */ + private async endAudiospace(params: { + broadcastId: string; + chatToken: string; + }): Promise { + const url = 'https://guest.pscp.tv/api/v1/audiospace/admin/endAudiospace'; + const headers = { + 'Content-Type': 'application/json', + Referer: 'https://x.com/', + Authorization: this.authToken || '', + }; + + const body = { + broadcast_id: params.broadcastId, + chat_token: params.chatToken, + }; + + console.log('[Space] endAudiospace =>', body); + const resp = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(body), + }); + + if (!resp.ok) { + const errText = await resp.text(); + throw new Error(`[Space] endAudiospace => ${resp.status} ${errText}`); + } + const json = await resp.json(); + console.log('[Space] endAudiospace => success =>', json); + } + + async stop() { + console.log('[Space] Stopping...'); + + await this.finalizeSpace().catch((err) => { + console.error('[Space] finalizeBroadcast error =>', err); + }); + + if (this.chatClient) { + await this.chatClient.disconnect(); + this.chatClient = undefined; + } + if (this.janusClient) { + await this.janusClient.stop(); + this.janusClient = undefined; + } + for (const { plugin } of this.plugins) { + plugin.cleanup?.(); + } + this.plugins.clear(); + + this.isInitialized = false; + } +} diff --git a/src/spaces/plugins/MonitorAudioPlugin.ts b/src/spaces/plugins/MonitorAudioPlugin.ts new file mode 100644 index 0000000..ddc6337 --- /dev/null +++ b/src/spaces/plugins/MonitorAudioPlugin.ts @@ -0,0 +1,67 @@ +// src/plugins/MonitorAudioPlugin.ts + +import { spawn, ChildProcessWithoutNullStreams } from 'child_process'; +import { Plugin, AudioDataWithUser } from '../types'; + +export class MonitorAudioPlugin implements Plugin { + private ffplay?: ChildProcessWithoutNullStreams; + + constructor(private readonly sampleRate = 48000) { + // spawn ffplay reading raw PCM s16le from stdin + // "-nodisp" hides any video window, "-loglevel quiet" reduces console spam + this.ffplay = spawn('ffplay', [ + '-f', + 's16le', + '-ar', + this.sampleRate.toString(), // e.g. "16000" + '-ac', + '1', // mono + '-nodisp', + '-loglevel', + 'quiet', + '-i', + 'pipe:0', + ]); + + this.ffplay.on('error', (err) => { + console.error('[MonitorAudioPlugin] ffplay error =>', err); + }); + this.ffplay.on('close', (code) => { + console.log('[MonitorAudioPlugin] ffplay closed => code=', code); + this.ffplay = undefined; + }); + + console.log('[MonitorAudioPlugin] Started ffplay for real-time monitoring'); + } + + onAudioData(data: AudioDataWithUser): void { + // TODO: REMOVE DEBUG + console.log( + '[MonitorAudioPlugin] onAudioData => user=', + data.userId, + 'samples=', + data.samples.length, + 'sampleRate=', + data.sampleRate, + ); + + // Check sampleRate if needed + if (!this.ffplay?.stdin.writable) return; + + // Suppose data.sampleRate = this.sampleRate + // Convert Int16Array => Buffer + const buf = Buffer.from(data.samples.buffer); + + // Write raw 16-bit PCM samples to ffplay stdin + this.ffplay.stdin.write(buf); + } + + cleanup(): void { + console.log('[MonitorAudioPlugin] Cleanup => stopping ffplay'); + if (this.ffplay) { + this.ffplay.stdin.end(); // close the pipe + this.ffplay.kill(); // kill ffplay process + this.ffplay = undefined; + } + } +} diff --git a/src/spaces/plugins/RecordToDiskPlugin.ts b/src/spaces/plugins/RecordToDiskPlugin.ts new file mode 100644 index 0000000..024f1d7 --- /dev/null +++ b/src/spaces/plugins/RecordToDiskPlugin.ts @@ -0,0 +1,16 @@ +import * as fs from 'fs'; +import { AudioDataWithUser, Plugin } from '../types'; + +export class RecordToDiskPlugin implements Plugin { + private outStream = fs.createWriteStream('/tmp/speaker_audio.raw'); + + onAudioData(data: AudioDataWithUser): void { + // Convert Int16Array -> Buffer + const buf = Buffer.from(data.samples.buffer); + this.outStream.write(buf); + } + + cleanup(): void { + this.outStream.end(); + } +} diff --git a/src/spaces/plugins/SttTtsPlugin.ts b/src/spaces/plugins/SttTtsPlugin.ts new file mode 100644 index 0000000..4888bdd --- /dev/null +++ b/src/spaces/plugins/SttTtsPlugin.ts @@ -0,0 +1,381 @@ +// src/plugins/SttTtsPlugin.ts + +import fs from 'fs'; +import path from 'path'; +import { spawn } from 'child_process'; +import { Plugin, AudioDataWithUser } from '../types'; +import { Space } from '../core/Space'; +import { JanusClient } from '../core/JanusClient'; +import OpenAI from 'openai'; + +interface PluginConfig { + openAiApiKey?: string; // for STT & ChatGPT + elevenLabsApiKey?: string; // for TTS + sttLanguage?: string; // e.g. "en" for Whisper + gptModel?: string; // e.g. "gpt-3.5-turbo" + silenceThreshold?: number; // amplitude threshold for ignoring silence +} + +/** + * MVP plugin for speech-to-text (OpenAI) + conversation + TTS (ElevenLabs) + * Approach: + * - Collect each speaker's unmuted PCM in a memory buffer (only if above silence threshold) + * - On speaker mute -> flush STT -> GPT -> TTS -> push to Janus + */ +export class SttTtsPlugin implements Plugin { + private space?: Space; + private janus?: JanusClient; + + // OpenAI + ElevenLabs + private openAiApiKey?: string; + private openAiClient?: OpenAI; + private elevenLabsApiKey?: string; + + private sttLanguage = 'en'; + private gptModel = 'gpt-3.5-turbo'; + + /** + * userId => arrayOfChunks (PCM Int16) + */ + private pcmBuffers = new Map(); + + /** + * Track mute states: userId => boolean (true=unmuted) + */ + private speakerUnmuted = new Map(); + + /** + * For ignoring near-silence frames (if amplitude < threshold) + */ + private silenceThreshold = 50; // default amplitude threshold + + onAttach(space: Space) { + console.log('[SttTtsPlugin] onAttach => space was attached'); + } + + init(params: { space: Space; pluginConfig?: Record }): void { + console.log( + '[SttTtsPlugin] init => Space fully ready. Subscribing to events.', + ); + + this.space = params.space; + this.janus = (this.space as any)?.janusClient as JanusClient | undefined; + + const config = params.pluginConfig as PluginConfig; + this.openAiApiKey = config?.openAiApiKey; + this.elevenLabsApiKey = config?.elevenLabsApiKey; + if (config?.sttLanguage) this.sttLanguage = config.sttLanguage; + if (config?.gptModel) this.gptModel = config.gptModel; + if (typeof config?.silenceThreshold === 'number') { + this.silenceThreshold = config.silenceThreshold; + } + console.log('[SttTtsPlugin] Plugin config =>', config); + + // Create official OpenAI client if we have an API key + if (this.openAiApiKey) { + this.openAiClient = new OpenAI({ apiKey: this.openAiApiKey }); + console.log('[SttTtsPlugin] OpenAI client initialized'); + } + + // Listen for mute state changes to trigger STT flush + this.space.on( + 'muteStateChanged', + (evt: { userId: string; muted: boolean }) => { + console.log('[SttTtsPlugin] Speaker muteStateChanged =>', evt); + if (evt.muted) { + // speaker just got muted => flush STT + this.handleMute(evt.userId).catch((err) => + console.error('[SttTtsPlugin] handleMute error =>', err), + ); + } else { + // unmuted => start buffering + this.speakerUnmuted.set(evt.userId, true); + if (!this.pcmBuffers.has(evt.userId)) { + this.pcmBuffers.set(evt.userId, []); + } + } + }, + ); + } + + /** + * Called whenever we receive PCM from a speaker + */ + onAudioData(data: AudioDataWithUser): void { + // Skip if speaker is muted or not tracked + if (!this.speakerUnmuted.get(data.userId)) return; + + // Optional: detect silence + let maxVal = 0; + for (let i = 0; i < data.samples.length; i++) { + const val = Math.abs(data.samples[i]); + if (val > maxVal) maxVal = val; + } + if (maxVal < this.silenceThreshold) { + // It's near-silence => skip + return; + } + + // Add chunk + let arr = this.pcmBuffers.get(data.userId); + if (!arr) { + arr = []; + this.pcmBuffers.set(data.userId, arr); + } + arr.push(data.samples); + } + + /** + * On speaker mute => flush STT => GPT => TTS => push to Janus + */ + private async handleMute(userId: string): Promise { + this.speakerUnmuted.set(userId, false); + const chunks = this.pcmBuffers.get(userId) || []; + this.pcmBuffers.set(userId, []); // reset + + if (!chunks.length) { + console.log('[SttTtsPlugin] No audio chunks for user =>', userId); + return; + } + console.log( + `[SttTtsPlugin] Flushing STT buffer for user=${userId}, total chunks=${chunks.length}`, + ); + + // 1) Merge chunks + const totalLen = chunks.reduce((acc, c) => acc + c.length, 0); + const merged = new Int16Array(totalLen); + let offset = 0; + for (const c of chunks) { + merged.set(c, offset); + offset += c.length; + } + + // 2) Convert PCM -> WAV (48kHz) for STT + const wavPath = await this.convertPcmToWav(merged, 48000); + console.log('[SttTtsPlugin] WAV ready =>', wavPath); + + // 3) STT with OpenAI Whisper + const sttText = await this.transcribeWithOpenAI(wavPath, this.sttLanguage); + fs.unlinkSync(wavPath); + if (!sttText.trim()) { + console.log('[SttTtsPlugin] No speech recognized for user =>', userId); + return; + } + console.log(`[SttTtsPlugin] STT => user=${userId}, text="${sttText}"`); + + // 4) GPT + const replyText = await this.askChatGPT(sttText); + console.log(`[SttTtsPlugin] GPT => user=${userId}, reply="${replyText}"`); + + // 5) TTS => returns MP3 + const ttsAudio = await this.elevenLabsTts(replyText); + console.log('[SttTtsPlugin] TTS => got MP3 size=', ttsAudio.length); + + // 6) Convert MP3 -> PCM (48kHz, mono) + const pcm = await this.convertMp3ToPcm(ttsAudio, 48000); + console.log( + '[SttTtsPlugin] TTS => converted to PCM => frames=', + pcm.length, + ); + + // 7) Push frames to Janus + if (this.janus) { + await this.streamToJanus(pcm, 48000); + console.log('[SttTtsPlugin] TTS => done streaming to space'); + } + } + + /** + * Convert Int16 PCM -> WAV using ffmpeg + */ + private convertPcmToWav( + samples: Int16Array, + sampleRate: number, + ): Promise { + return new Promise((resolve, reject) => { + const tmpPath = path.resolve('/tmp', `stt-${Date.now()}.wav`); + const ff = spawn('ffmpeg', [ + '-f', + 's16le', + '-ar', + sampleRate.toString(), + '-ac', + '1', + '-i', + 'pipe:0', + '-y', + tmpPath, + ]); + ff.stdin.write(Buffer.from(samples.buffer)); + ff.stdin.end(); + ff.on('close', (code) => { + if (code === 0) resolve(tmpPath); + else reject(new Error(`ffmpeg error code=${code}`)); + }); + }); + } + + /** + * OpenAI Whisper STT + */ + private async transcribeWithOpenAI(wavPath: string, language: string) { + if (!this.openAiClient) { + throw new Error('[SttTtsPlugin] No OpenAI client available'); + } + try { + console.log('[SttTtsPlugin] Transcribe =>', wavPath); + const fileStream = fs.createReadStream(wavPath); + + const resp = await this.openAiClient.audio.transcriptions.create({ + file: fileStream, + model: 'whisper-1', + language: language, + temperature: 0, + }); + + const text = resp.text?.trim() || ''; + console.log('[SttTtsPlugin] Transcription =>', text); + return text; + } catch (err) { + console.error('[SttTtsPlugin] OpenAI STT Error =>', err); + throw new Error('OpenAI STT failed'); + } + } + + /** + * Simple ChatGPT call + */ + private async askChatGPT(userText: string): Promise { + if (!this.openAiApiKey) { + throw new Error('[SttTtsPlugin] No OpenAI API key for ChatGPT'); + } + const url = 'https://api.openai.com/v1/chat/completions'; + const resp = await fetch(url, { + method: 'POST', + headers: { + Authorization: `Bearer ${this.openAiApiKey}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + model: this.gptModel, + messages: [ + { role: 'system', content: 'You are a helpful AI assistant.' }, + { role: 'user', content: userText }, + ], + }), + }); + if (!resp.ok) { + const errText = await resp.text(); + throw new Error( + `[SttTtsPlugin] ChatGPT error => ${resp.status} ${errText}`, + ); + } + const json = await resp.json(); + const reply = json.choices?.[0]?.message?.content || ''; + return reply.trim(); + } + + /** + * ElevenLabs TTS => returns MP3 Buffer + */ + private async elevenLabsTts(text: string): Promise { + if (!this.elevenLabsApiKey) { + throw new Error('[SttTtsPlugin] No ElevenLabs API key'); + } + const voiceId = '21m00Tcm4TlvDq8ikWAM'; // example + const url = `https://api.elevenlabs.io/v1/text-to-speech/${voiceId}`; + const resp = await fetch(url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'xi-api-key': this.elevenLabsApiKey, + }, + body: JSON.stringify({ + text, + voice_settings: { stability: 0.4, similarity_boost: 0.8 }, + }), + }); + if (!resp.ok) { + const errText = await resp.text(); + throw new Error( + `[SttTtsPlugin] ElevenLabs TTS error => ${resp.status} ${errText}`, + ); + } + const arrayBuf = await resp.arrayBuffer(); + return Buffer.from(arrayBuf); + } + + /** + * Convert MP3 => PCM via ffmpeg + */ + private convertMp3ToPcm( + mp3Buf: Buffer, + outRate: number, + ): Promise { + return new Promise((resolve, reject) => { + const ff = spawn('ffmpeg', [ + '-i', + 'pipe:0', + '-f', + 's16le', + '-ar', + outRate.toString(), + '-ac', + '1', + 'pipe:1', + ]); + let raw = Buffer.alloc(0); + ff.stdout.on('data', (chunk: Buffer) => { + raw = Buffer.concat([raw, chunk]); + }); + ff.stderr.on('data', () => { + /* ignore ffmpeg logs */ + }); + ff.on('close', (code) => { + if (code !== 0) { + reject(new Error(`ffmpeg error code=${code}`)); + return; + } + const samples = new Int16Array( + raw.buffer, + raw.byteOffset, + raw.byteLength / 2, + ); + resolve(samples); + }); + ff.stdin.write(mp3Buf); + ff.stdin.end(); + }); + } + + /** + * Push PCM back to Janus in small frames + * We'll do 10ms @48k => 960 samples per frame + */ + private async streamToJanus( + samples: Int16Array, + sampleRate: number, + ): Promise { + // 10 ms => 480 samples @48k + const FRAME_SIZE = 480; + + for ( + let offset = 0; + offset + FRAME_SIZE <= samples.length; + offset += FRAME_SIZE + ) { + // Option 1: subarray + .set + const frame = new Int16Array(FRAME_SIZE); + frame.set(samples.subarray(offset, offset + FRAME_SIZE)); + + this.janus?.pushLocalAudio(frame, sampleRate, 1); + await new Promise((r) => setTimeout(r, 10)); + } + } + + cleanup(): void { + console.log('[SttTtsPlugin] cleanup => releasing resources'); + this.pcmBuffers.clear(); + this.speakerUnmuted.clear(); + } +} diff --git a/src/spaces/test.ts b/src/spaces/test.ts new file mode 100644 index 0000000..ab93281 --- /dev/null +++ b/src/spaces/test.ts @@ -0,0 +1,130 @@ +// src/test.ts + +import 'dotenv/config'; +import { Space } from './core/Space'; +import { Scraper } from '../scraper'; // Adjust the path if needed +import { SpaceConfig } from './types'; +import { MonitorAudioPlugin } from './plugins/MonitorAudioPlugin'; +import { RecordToDiskPlugin } from './plugins/RecordToDiskPlugin'; +import { SttTtsPlugin } from './plugins/SttTtsPlugin'; + +/** + * Main test entry point + */ +async function main() { + console.log('[Test] Starting...'); + + // 1) Twitter login with your scraper + const scraper = new Scraper(); + await scraper.login( + process.env.TWITTER_USERNAME!, + process.env.TWITTER_PASSWORD!, + ); + + // 2) Create Space instance + const space = new Space(scraper); + + // const monitorPlugin = new MonitorAudioPlugin(1600); + // space.use(monitorPlugin); + const recordPlugin = new RecordToDiskPlugin(); + space.use(recordPlugin); + const sttTtsPlugin = new SttTtsPlugin(); + space.use(sttTtsPlugin, { + openAiApiKey: process.env.OPENAI_API_KEY, + elevenLabsApiKey: process.env.ELEVENLABS_API_KEY, + }); + + const config: SpaceConfig = { + mode: 'INTERACTIVE', + title: 'Chunked beep test', + description: 'Proper chunked beep to avoid .byteLength error', + languages: ['en'], + }; + + + // 3) Initialize the Space + const broadcastInfo = await space.initialize(config); + const spaceUrl = broadcastInfo.share_url.replace('broadcasts', 'spaces'); + console.log( + '[Test] Space created =>', + spaceUrl, + ); + + await scraper.sendTweet(`${config.title} ${spaceUrl}`); + console.log('[Test] Tweet sent'); + + // 4) Listen to events + space.on('occupancyUpdate', (upd) => { + console.log( + '[Test] Occupancy =>', + upd.occupancy, + 'participants =>', + upd.totalParticipants, + ); + }); + space.on('speakerRequest', async (req) => { + console.log('[Test] Speaker request =>', req); + await space.approveSpeaker(req.userId, req.sessionUUID); + }); + space.on('error', (err) => { + console.error('[Test] Space Error =>', err); + }); + + // ================================================== + // BEEP GENERATION (500 ms) @16kHz => 8000 samples + // ================================================== + const beepDurationMs = 500; + const sampleRate = 16000; + const totalSamples = (sampleRate * beepDurationMs) / 1000; // 8000 + const beepFull = new Int16Array(totalSamples); + + // Sine wave: 440Hz, amplitude ~12000 + const freq = 440; + const amplitude = 12000; + for (let i = 0; i < beepFull.length; i++) { + const t = i / sampleRate; + beepFull[i] = amplitude * Math.sin(2 * Math.PI * freq * t); + } + + const FRAME_SIZE = 160; + /** + * Send a beep by slicing beepFull into frames of 160 samples + */ + async function sendBeep() { + console.log('[Test] Starting beep...'); + for (let offset = 0; offset < beepFull.length; offset += FRAME_SIZE) { + // subarray => simple "view" + const portion = beepFull.subarray(offset, offset + FRAME_SIZE); + + // Make a real copy + const frame = new Int16Array(FRAME_SIZE); + frame.set(portion); + + // Now frame.length = 160, and frame.byteLength = 320 + space.pushAudio(frame, sampleRate); + + await new Promise((r) => setTimeout(r, 10)); + } + console.log('[Test] Finished beep'); + } + + // 5) Send beep every 5s + // setInterval(() => { + // sendBeep().catch((err) => console.error('[Test] beep error =>', err)); + // }, 5000); + + console.log('[Test] Space is running... press Ctrl+C to exit.'); + + // Graceful shutdown + process.on('SIGINT', async () => { + console.log('\n[Test] Caught interrupt signal, stopping...'); + await space.stop(); + console.log('[Test] Space stopped. Bye!'); + process.exit(0); + }); +} + +main().catch((err) => { + console.error('[Test] Unhandled main error =>', err); + process.exit(1); +}); diff --git a/src/spaces/types.ts b/src/spaces/types.ts new file mode 100644 index 0000000..55a0416 --- /dev/null +++ b/src/spaces/types.ts @@ -0,0 +1,79 @@ +// src/types.ts + +import { Space } from './core/Space'; + +export interface AudioData { + bitsPerSample: number; // e.g., 16 + sampleRate: number; // e.g., 48000 + channelCount: number; // e.g., 1 for mono, 2 for stereo + numberOfFrames: number; // how many samples per channel + samples: Int16Array; // the raw PCM data +} + +export interface AudioDataWithUser extends AudioData { + userId: string; // The ID of the speaker or user +} + +export interface SpeakerRequest { + userId: string; + username: string; + displayName: string; + sessionUUID: string; +} + +export interface OccupancyUpdate { + occupancy: number; + totalParticipants: number; +} + +export interface SpaceConfig { + mode: 'BROADCAST' | 'LISTEN' | 'INTERACTIVE'; + title?: string; + description?: string; + languages?: string[]; +} + +export interface BroadcastCreated { + room_id: string; + credential: string; + stream_name: string; + webrtc_gw_url: string; + broadcast: { + user_id: string; + twitter_id: string; + media_key: string; // often needed for stream status + }; + access_token: string; + endpoint: string; + share_url: string; + stream_url: string; +} + +export interface TurnServersInfo { + ttl: string; + username: string; + password: string; + uris: string[]; +} + +export interface Plugin { + /** + * onAttach is called immediately when .use(plugin) is invoked, + * passing the Space instance (if needed for immediate usage). + */ + onAttach?(space: Space): void; + + /** + * init is called once the Space has *fully* initialized (Janus, broadcast, etc.) + * so the plugin can get references to Janus or final config, etc. + */ + init?(params: { space: Space; pluginConfig?: Record }): void; + + onAudioData(data: AudioDataWithUser): void; + cleanup?(): void; +} + +export interface PluginRegistration { + plugin: Plugin; + config?: Record; +} diff --git a/src/spaces/utils.ts b/src/spaces/utils.ts new file mode 100644 index 0000000..815e58c --- /dev/null +++ b/src/spaces/utils.ts @@ -0,0 +1,149 @@ +// src/utils.ts + +import { Headers } from 'headers-polyfill'; +import type { BroadcastCreated, TurnServersInfo } from './types'; + +export async function authorizeToken(cookie: string): Promise { + const headers = new Headers({ + 'X-Periscope-User-Agent': 'Twitter/m5', + 'Content-Type': 'application/json', + 'X-Idempotence': Date.now().toString(), + Referer: 'https://x.com/', + 'X-Attempt': '1', + }); + + const resp = await fetch('https://proxsee.pscp.tv/api/v2/authorizeToken', { + method: 'POST', + headers, + body: JSON.stringify({ + service: 'guest', + cookie: cookie, + }), + }); + + if (!resp.ok) { + throw new Error(`Failed to authorize token => ${resp.status}`); + } + + const data = (await resp.json()) as { authorization_token: string }; + if (!data.authorization_token) { + throw new Error('authorizeToken: Missing authorization_token in response'); + } + + return data.authorization_token; +} + +export async function publishBroadcast(params: { + title: string; + broadcast: BroadcastCreated; + cookie: string; + janusSessionId?: number; + janusHandleId?: number; + janusPublisherId?: number; +}) { + const headers = new Headers({ + 'X-Periscope-User-Agent': 'Twitter/m5', + 'Content-Type': 'application/json', + Referer: 'https://x.com/', + 'X-Idempotence': Date.now().toString(), + 'X-Attempt': '1', + }); + + await fetch('https://proxsee.pscp.tv/api/v2/publishBroadcast', { + method: 'POST', + headers, + body: JSON.stringify({ + accept_guests: true, + broadcast_id: params.broadcast.room_id, + webrtc_handle_id: params.janusHandleId, + webrtc_session_id: params.janusSessionId, + janus_publisher_id: params.janusPublisherId, + janus_room_id: params.broadcast.room_id, + cookie: params.cookie, + status: params.title, + conversation_controls: 0, + }), + }); +} + +export async function getTurnServers(cookie: string): Promise { + const headers = new Headers({ + 'X-Periscope-User-Agent': 'Twitter/m5', + 'Content-Type': 'application/json', + Referer: 'https://x.com/', + 'X-Idempotence': Date.now().toString(), + 'X-Attempt': '1', + }); + + const resp = await fetch('https://proxsee.pscp.tv/api/v2/turnServers', { + method: 'POST', + headers, + body: JSON.stringify({ cookie }), + }); + if (!resp.ok) throw new Error('Failed to get turn servers => ' + resp.status); + return resp.json(); +} + +/** + * Get region from signer.pscp.tv + */ +export async function getRegion(): Promise { + const resp = await fetch('https://signer.pscp.tv/region', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Referer: 'https://x.com', + }, + body: JSON.stringify({}), + }); + if (!resp.ok) { + throw new Error(`Failed to get region => ${resp.status}`); + } + const data = (await resp.json()) as { region: string }; + return data.region; +} + +/** + * Create broadcast on Periscope + */ +export async function createBroadcast(params: { + description?: string; + languages?: string[]; + cookie: string; + region: string; +}): Promise { + const headers = new Headers({ + 'X-Periscope-User-Agent': 'Twitter/m5', + 'Content-Type': 'application/json', + 'X-Idempotence': Date.now().toString(), + Referer: 'https://x.com/', + 'X-Attempt': '1', + }); + + const resp = await fetch('https://proxsee.pscp.tv/api/v2/createBroadcast', { + method: 'POST', + headers, + body: JSON.stringify({ + app_component: 'audio-room', + content_type: 'visual_audio', + cookie: params.cookie, + conversation_controls: 0, + description: params.description || '', + height: 1080, + is_360: false, + is_space_available_for_replay: false, + is_webrtc: true, + languages: params.languages ?? [], + region: params.region, + width: 1920, + }), + }); + + if (!resp.ok) { + const text = await resp.text(); + throw new Error(`Failed to create broadcast => ${resp.status} ${text}`); + } + + const data = await resp.json(); + return data as BroadcastCreated; +} diff --git a/src/types/spaces.ts b/src/types/spaces.ts new file mode 100644 index 0000000..ff2731b --- /dev/null +++ b/src/types/spaces.ts @@ -0,0 +1,263 @@ +/** + * Represents a Community that can host Spaces. + */ +export interface Community { + id: string; + name: string; + rest_id: string; +} + +/** + * Represents the response structure for the CommunitySelectQuery. + */ +export interface CommunitySelectQueryResponse { + data: { + space_hostable_communities: Community[]; + }; + errors?: any[]; +} + +/** + * Represents a Subtopic within a Category. + */ +export interface Subtopic { + icon_url: string; + name: string; + topic_id: string; +} + +/** + * Represents a Category containing multiple Subtopics. + */ +export interface Category { + icon: string; + name: string; + semantic_core_entity_id: string; + subtopics: Subtopic[]; +} + +/** + * Represents the data structure for BrowseSpaceTopics. + */ +export interface BrowseSpaceTopics { + categories: Category[]; +} + +/** + * Represents the response structure for the BrowseSpaceTopics query. + */ +export interface BrowseSpaceTopicsResponse { + data: { + browse_space_topics: BrowseSpaceTopics; + }; + errors?: any[]; +} + +/** + * Represents the result details of a Creator. + */ +export interface CreatorResult { + __typename: string; + id: string; + rest_id: string; + affiliates_highlighted_label: Record; + has_graduated_access: boolean; + is_blue_verified: boolean; + profile_image_shape: string; + legacy: { + following: boolean; + can_dm: boolean; + can_media_tag: boolean; + created_at: string; + default_profile: boolean; + default_profile_image: boolean; + description: string; + entities: { + description: { + urls: any[]; + }; + }; + fast_followers_count: number; + favourites_count: number; + followers_count: number; + friends_count: number; + has_custom_timelines: boolean; + is_translator: boolean; + listed_count: number; + location: string; + media_count: number; + name: string; + needs_phone_verification: boolean; + normal_followers_count: number; + pinned_tweet_ids_str: string[]; + possibly_sensitive: boolean; + profile_image_url_https: string; + profile_interstitial_type: string; + screen_name: string; + statuses_count: number; + translator_type: string; + verified: boolean; + want_retweets: boolean; + withheld_in_countries: string[]; + }; + tipjar_settings: Record; +} + +/** + * Represents user results within an Admin. + */ +export interface UserResults { + rest_id: string; + result: { + __typename: string; + identity_profile_labels_highlighted_label: Record; + is_blue_verified: boolean; + legacy: Record; + }; +} + +/** + * Represents an Admin participant in an Audio Space. + */ +export interface Admin { + periscope_user_id: string; + start: number; + twitter_screen_name: string; + display_name: string; + avatar_url: string; + is_verified: boolean; + is_muted_by_admin: boolean; + is_muted_by_guest: boolean; + user_results: UserResults; +} + +/** + * Represents Participants in an Audio Space. + */ +export interface Participants { + total: number; + admins: Admin[]; + speakers: any[]; + listeners: any[]; +} + +/** + * Represents Metadata of an Audio Space. + */ +export interface Metadata { + rest_id: string; + state: string; + media_key: string; + created_at: number; + started_at: number; + ended_at: string; + updated_at: number; + content_type: string; + creator_results: { + result: CreatorResult; + }; + conversation_controls: number; + disallow_join: boolean; + is_employee_only: boolean; + is_locked: boolean; + is_muted: boolean; + is_space_available_for_clipping: boolean; + is_space_available_for_replay: boolean; + narrow_cast_space_type: number; + no_incognito: boolean; + total_replay_watched: number; + total_live_listeners: number; + tweet_results: Record; + max_guest_sessions: number; + max_admin_capacity: number; +} + +/** + * Represents Sharings within an Audio Space. + */ +export interface Sharings { + items: any[]; + slice_info: Record; +} + +/** + * Represents an Audio Space. + */ +export interface AudioSpace { + metadata: Metadata; + is_subscribed: boolean; + participants: Participants; + sharings: Sharings; +} + +/** + * Represents the response structure for the AudioSpaceById query. + */ +export interface AudioSpaceByIdResponse { + data: { + audioSpace: AudioSpace; + }; + errors?: any[]; +} + +/** + * Represents the variables required for the AudioSpaceById query. + */ +export interface AudioSpaceByIdVariables { + id: string; + isMetatagsQuery: boolean; + withReplays: boolean; + withListeners: boolean; +} + +export interface LiveVideoSource { + location: string; + noRedirectPlaybackUrl: string; + status: string; + streamType: string; +} + +export interface LiveVideoStreamStatus { + source: LiveVideoSource; + sessionId: string; + chatToken: string; + lifecycleToken: string; + shareUrl: string; + chatPermissionType: string; +} + +export interface AuthenticatePeriscopeResponse { + data: { + authenticate_periscope: string; + }; + errors?: any[]; +} + +export interface LoginTwitterTokenResponse { + cookie: string; + user: { + class_name: string; + id: string; + created_at: string; + is_beta_user: boolean; + is_employee: boolean; + is_twitter_verified: boolean; + verified_type: number; + is_bluebird_user: boolean; + twitter_screen_name: string; + username: string; + display_name: string; + description: string; + profile_image_urls: { + url: string; + ssl_url: string; + width: number; + height: number; + }[]; + twitter_id: string; + initials: string; + n_followers: number; + n_following: number; + }; + type: string; +}