diff --git a/src/index.ts b/src/index.ts index 291606e..86e9a81 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,9 +9,11 @@ import { VerificationData, verify2FA } from "./instagram/login"; -import {FollowerFetcherEventTypes, getFollowerGraph, printGraph} from "./instagram/follower"; +import {FollowerFetcherEvent, FollowerFetcherEventTypes, getFollowerGraph, printGraph} from "./instagram/follower"; import SessionData from "./instagram/session-data"; -import {fetchUser, UserGraph} from "./instagram/user"; +import {fetchUser, User, UserGraph} from "./instagram/user"; +import {writeFileSync} from "node:fs"; +import {ReadableStream} from "node:stream/web"; async function authenticate(): Promise { @@ -111,9 +113,9 @@ async function wholeNumberPrompt({message, defaultValue}: { message: string, def }).then(input => parseInt(input, 10)) } -let graph: UserGraph = {} +async function settleGraph(graph: UserGraph) { + delete graph["canceled"] -async function dumpGraph(graph: UserGraph) { const downloads = Object.values(graph).map(async user => { return { ...user, @@ -133,12 +135,84 @@ async function dumpGraph(graph: UserGraph) { } }) - const dump: UserGraph = (await Promise.all(downloads)).reduce((graph, user) => { + const settled: UserGraph = (await Promise.all(downloads)).reduce((graph, user) => { graph[user.id] = user return graph }, {}) - return JSON.stringify(dump) + return settled +} + +const writeGraphToFile = async (root: User, graph: UserGraph) => { + const filename = `${root.id}:${root.profile.username}:${new Date().toISOString()}.json` + const data = await settleGraph(graph) + + try { + writeFileSync(filename, JSON.stringify(data, null, 2)) + console.log(`Wrote graph into ${filename}.`) + } catch (error) { + console.error({message: `Cannot write graph into ${filename}. Using stdout instead.`, error}) + await new Promise(resolve => setTimeout(() => { + console.log(JSON.stringify(data)); + resolve(undefined); + }, 500)) + } + + return filename +} + +async function streamGraph(stream: ReadableStream) { + let graph: UserGraph = {} + let cancellation: Promise + + const reader = stream.getReader() + + process.on('SIGINT', () => { + console.info("Process will terminate as soon as it is cleanly possible.") + reader.releaseLock() + stream.cancel(); + }); + + try { + while (stream.locked) { + const {done, value} = await reader.read() + if (done) break; + + graph = value.graph + + const identifier = `(User: ${value.user.profile.username})` + + if (value.type === FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWER) { + console.log(`Reached the maximum amount of followers to include. Currently included are ${value.amount}. ${identifier}`) + } else if (value.type === FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWING) { + console.log(`Reached the maximum amount of followed users to include. Currently included are ${value.amount}. ${identifier}`) + } else if (value.type === FollowerFetcherEventTypes.RATE_LIMIT_BATCH) { + printGraph(value.graph) + console.log(`Reached follower batch limit. Resuming after ${value.delay} milliseconds. ${identifier}`) + } else if (value.type === FollowerFetcherEventTypes.RATE_LIMIT_DAILY) { + printGraph(value.graph) + console.log(`Reached follower daily limit. Resuming after ${value.delay} milliseconds. ${identifier}`) + } else if (value.type === FollowerFetcherEventTypes.UPDATE) { + const total = Object.entries(value.graph).length + const followers = value.added.followers.length; + const users = value.added.users.length + + console.log( + `Added ${followers > 0 ? followers : 'no'} follower${followers > 1 ? 's' : ''} to ${value.user.profile.username}. ` + + `Discovered ${users > 0 ? users : 'no'} new user${users > 1 ? 's' : ''}. ` + + `Total user count: ${total}, completely queried users ${value.added.progress.done}.` + ) + } + } + } catch (e) { + if (stream.locked) { + reader.releaseLock() + cancellation = stream.cancel() + console.error(e) + } + } + + return {graph, cancellation} } @@ -163,7 +237,7 @@ try { const includeFollowing = await prompt.confirm({message: "Include following?", default: true}) - const reader = getFollowerGraph({ + const stream = getFollowerGraph({ includeFollowing, root, session, @@ -191,51 +265,17 @@ try { } } } - }).getReader() - - process.on('SIGINT', async () => { - console.log(await dumpGraph(graph)) - printGraph(graph) - process.exit(0) - }); - - while (true) { - const {done, value} = await reader.read() - if (done) break; - - graph = value.graph - - const identifier = `(User: ${value.user.profile.username})` - - if (value.type === FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWER) { - console.log(`Reached the maximum amount of followers to include. Currently included are ${value.amount}. ${identifier}`) - } else if (value.type === FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWING) { - console.log(`Reached the maximum amount of followed users to include. Currently included are ${value.amount}. ${identifier}`) - } else if (value.type === FollowerFetcherEventTypes.RATE_LIMIT_BATCH) { - printGraph(value.graph) - console.log(`Reached follower batch limit. Resuming after ${value.delay} milliseconds. ${identifier}`) - } else if (value.type === FollowerFetcherEventTypes.RATE_LIMIT_DAILY) { - printGraph(value.graph) - console.log(`Reached follower daily limit. Resuming after ${value.delay} milliseconds. ${identifier}`) - } else if (value.type === FollowerFetcherEventTypes.UPDATE) { - const total = Object.entries(value.graph).length - const followers = value.added.followers.length; - const users = value.added.users.length - - console.log( - `Added ${followers > 0 ? followers : 'no'} follower${followers > 1 ? 's' : ''} to ${value.user.profile.username}. ` + - `Discovered ${users > 0 ? users : 'no'} new user${users > 1 ? 's' : ''}. ` + - `Total user count: ${total}, completely queried users ${value.added.progress.done}.` - ) - } - } + }) - printGraph(graph) + const {graph, cancellation} = await streamGraph(stream) + await Promise.all([writeGraphToFile(root, graph).then(() => { + console.info( + "The may process still needs to wait on the rate limiting timeouts to exit cleanly. " + + "Killing it should not cause any data lose." + ) + }), cancellation]) } catch (e) { if (!(e instanceof ExitPromptError)) { console.error(e) } } - -console.log(await dumpGraph(graph)) -printGraph(graph) diff --git a/src/instagram/follower.ts b/src/instagram/follower.ts index 2b06234..552c665 100644 --- a/src/instagram/follower.ts +++ b/src/instagram/follower.ts @@ -167,8 +167,12 @@ export function getFollowerGraph({root, session, limits, includeFollowing}: { }): ReadableStream { const graph: UserGraph = {[root.id]: root} + let controller: ReadableStreamDefaultController + return new ReadableStream({ - async start(controller: ReadableStreamDefaultController) { + start: async (c: ReadableStreamDefaultController) => { + controller = c + if (root.private) { controller.enqueue({ type: FollowerFetcherEventTypes.UPDATE, @@ -188,8 +192,11 @@ export function getFollowerGraph({root, session, limits, includeFollowing}: { } await createFollowerGraph({limits, graph, session, controller, includeFollowing}); - return controller.close(); + controller.close(); }, + cancel: async () => { + graph.canceled = true + } }) } @@ -207,14 +214,14 @@ async function createFollowerGraph({controller, limits, graph, session, includeF const done: Set = new Set() let phase = 0 - for (let i = 0; i <= limits.depth.generations; i++) { + for (let i = 0; i <= limits.depth.generations && !graph.canceled; ++i) { const open = Object.values(graph) .filter(user => !done.has(user.id)) .map(user => user.id) - if (open.length < 1) break; // no open task, skip remaining generations + if (open.length < 1 || graph.canceled) break; // no open task, skip remaining generations - while (open.length > 0) { + while (open.length > 0 && !graph.canceled) { const batchSize = Math.floor(limits.rate.batchSize / 100) const batch = open.splice(0, batchSize < 1 ? 1 : batchSize).map(async task => { graph[task].followerIds = graph[task].followerIds ?? [] @@ -222,7 +229,7 @@ async function createFollowerGraph({controller, limits, graph, session, includeF const followers = async () => { let nextPage = undefined - while (nextPage !== null) { + while (nextPage !== null && !graph.canceled) { const followers = await fetchFollowers({ session, targetUser: graph[task], @@ -264,7 +271,7 @@ async function createFollowerGraph({controller, limits, graph, session, includeF let nextPage = undefined let followingCount = 0 - while (nextPage !== null) { + while (nextPage !== null && !graph.canceled) { const following = await fetchFollowers({ session, targetUser: graph[task], @@ -281,7 +288,6 @@ async function createFollowerGraph({controller, limits, graph, session, includeF task: graph[task].id }) - phase = await rateLimiter({ graph, user: graph[task], diff --git a/src/instagram/user.ts b/src/instagram/user.ts index 4614918..55f81ec 100644 --- a/src/instagram/user.ts +++ b/src/instagram/user.ts @@ -13,7 +13,9 @@ export interface User { personal?: boolean } -export type UserGraph = Record; +export interface UserGraph extends Record { + canceled?: boolean +} export async function fetchUser(username: string, session?: SessionData): Promise { const response = await fetch(`https://www.instagram.com/api/v1/users/web_profile_info/?username=${username}`, {