Skip to content

Commit

Permalink
write graph into file
Browse files Browse the repository at this point in the history
  • Loading branch information
hfxbse committed May 1, 2024
1 parent 81857f4 commit 6fcff2a
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 58 deletions.
138 changes: 89 additions & 49 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import {
VerificationData,
verify2FA
} from "./instagram/login";
import {FollowerFetcherEventTypes, getFollowerGraph, printGraph} from "./instagram/follower";
import {FollowerFetcherEvent, FollowerFetcherEventTypes, getFollowerGraph, printGraph} from "./instagram/follower";
import SessionData from "./instagram/session-data";
import {fetchUser, UserGraph} from "./instagram/user";
import {fetchUser, User, UserGraph} from "./instagram/user";
import {writeFileSync} from "node:fs";
import {ReadableStream} from "node:stream/web";


async function authenticate(): Promise<SessionData> {
Expand Down Expand Up @@ -111,9 +113,9 @@ async function wholeNumberPrompt({message, defaultValue}: { message: string, def
}).then(input => parseInt(input, 10))
}

let graph: UserGraph = {}
async function settleGraph(graph: UserGraph) {
delete graph["canceled"]

async function dumpGraph(graph: UserGraph) {
const downloads = Object.values(graph).map(async user => {
return {
...user,
Expand All @@ -133,12 +135,84 @@ async function dumpGraph(graph: UserGraph) {
}
})

const dump: UserGraph = (await Promise.all(downloads)).reduce((graph, user) => {
const settled: UserGraph = (await Promise.all(downloads)).reduce((graph, user) => {
graph[user.id] = user
return graph
}, {})

return JSON.stringify(dump)
return settled
}

/**
 * Settles the graph and stores it as pretty-printed JSON in the working directory.
 *
 * The file name is built from the root user's id, its username and the current
 * ISO timestamp. If writing fails, the error is reported and the compact JSON is
 * printed to stdout after a short delay instead.
 *
 * @param root User whose id and username are used for the file name.
 * @param graph Graph to settle and persist.
 * @returns The file name that was targeted, whether or not the write succeeded.
 */
const writeGraphToFile = async (root: User, graph: UserGraph) => {
    const timestamp = new Date().toISOString()
    const filename = `${root.id}:${root.profile.username}:${timestamp}.json`
    const data = await settleGraph(graph)

    try {
        const pretty = JSON.stringify(data, null, 2)
        writeFileSync(filename, pretty)
        console.log(`Wrote graph into ${filename}.`)
    } catch (error) {
        console.error({message: `Cannot write graph into ${filename}. Using stdout instead.`, error})

        // Print the fallback dump only after a short pause, then continue.
        await new Promise<void>(resolve => {
            setTimeout(() => {
                console.log(JSON.stringify(data));
                resolve();
            }, 500)
        })
    }

    return filename
}

/**
 * Drains the follower-fetcher event stream, logging progress for every event and
 * remembering the most recent graph snapshot.
 *
 * A SIGINT handler is installed which releases the reader and cancels the stream, so
 * the read loop ends and the graph collected so far is handed back to the caller.
 *
 * @param stream Event stream to consume.
 * @returns `graph` — the last snapshot received (empty if no event arrived) — and
 * `cancellation` — the promise returned by `stream.cancel()` if the stream was
 * cancelled (by SIGINT or after a read error), otherwise `undefined`.
 */
async function streamGraph(stream: ReadableStream<FollowerFetcherEvent>) {
    let graph: UserGraph = {}
    // Stays undefined when the stream simply runs to completion.
    let cancellation: Promise<void> | undefined

    const reader = stream.getReader()

    process.on('SIGINT', () => {
        console.info("Process will terminate as soon as it is cleanly possible.")
        // The lock has to be released first; a locked stream cannot be cancelled directly.
        reader.releaseLock()
        // Keep the promise so the caller can await the cancellation and see its errors.
        cancellation = stream.cancel()
    });

    try {
        while (stream.locked) {
            const {done, value} = await reader.read()
            if (done) break;

            graph = value.graph

            const identifier = `(User: ${value.user.profile.username})`

            if (value.type === FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWER) {
                console.log(`Reached the maximum amount of followers to include. Currently included are ${value.amount}. ${identifier}`)
            } else if (value.type === FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWING) {
                console.log(`Reached the maximum amount of followed users to include. Currently included are ${value.amount}. ${identifier}`)
            } else if (value.type === FollowerFetcherEventTypes.RATE_LIMIT_BATCH) {
                printGraph(value.graph)
                console.log(`Reached follower batch limit. Resuming after ${value.delay} milliseconds. ${identifier}`)
            } else if (value.type === FollowerFetcherEventTypes.RATE_LIMIT_DAILY) {
                printGraph(value.graph)
                console.log(`Reached follower daily limit. Resuming after ${value.delay} milliseconds. ${identifier}`)
            } else if (value.type === FollowerFetcherEventTypes.UPDATE) {
                const total = Object.entries(value.graph).length
                const followers = value.added.followers.length;
                const users = value.added.users.length

                console.log(
                    `Added ${followers > 0 ? followers : 'no'} follower${followers > 1 ? 's' : ''} to ${value.user.profile.username}. ` +
                    `Discovered ${users > 0 ? users : 'no'} new user${users > 1 ? 's' : ''}. ` +
                    `Total user count: ${total}, completely queried users ${value.added.progress.done}.`
                )
            }
        }
    } catch (e) {
        // If the lock is already gone, the SIGINT handler released it and the failed
        // read is expected. Only errors raised while still locked are reported.
        if (stream.locked) {
            reader.releaseLock()
            cancellation = stream.cancel()
            console.error(e)
        }
    }

    return {graph, cancellation}
}


Expand All @@ -163,7 +237,7 @@ try {

const includeFollowing = await prompt.confirm({message: "Include following?", default: true})

const reader = getFollowerGraph({
const stream = getFollowerGraph({
includeFollowing,
root,
session,
Expand Down Expand Up @@ -191,51 +265,17 @@ try {
}
}
}
}).getReader()

process.on('SIGINT', async () => {
console.log(await dumpGraph(graph))
printGraph(graph)
process.exit(0)
});

while (true) {
const {done, value} = await reader.read()
if (done) break;

graph = value.graph

const identifier = `(User: ${value.user.profile.username})`

if (value.type === FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWER) {
console.log(`Reached the maximum amount of followers to include. Currently included are ${value.amount}. ${identifier}`)
} else if (value.type === FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWING) {
console.log(`Reached the maximum amount of followed users to include. Currently included are ${value.amount}. ${identifier}`)
} else if (value.type === FollowerFetcherEventTypes.RATE_LIMIT_BATCH) {
printGraph(value.graph)
console.log(`Reached follower batch limit. Resuming after ${value.delay} milliseconds. ${identifier}`)
} else if (value.type === FollowerFetcherEventTypes.RATE_LIMIT_DAILY) {
printGraph(value.graph)
console.log(`Reached follower daily limit. Resuming after ${value.delay} milliseconds. ${identifier}`)
} else if (value.type === FollowerFetcherEventTypes.UPDATE) {
const total = Object.entries(value.graph).length
const followers = value.added.followers.length;
const users = value.added.users.length

console.log(
`Added ${followers > 0 ? followers : 'no'} follower${followers > 1 ? 's' : ''} to ${value.user.profile.username}. ` +
`Discovered ${users > 0 ? users : 'no'} new user${users > 1 ? 's' : ''}. ` +
`Total user count: ${total}, completely queried users ${value.added.progress.done}.`
)
}
}
})

printGraph(graph)
// Consume the event stream until it finishes or is interrupted, then persist whatever
// graph was collected while waiting for a possible stream cancellation to settle.
const {graph, cancellation} = await streamGraph(stream)
await Promise.all([writeGraphToFile(root, graph).then(() => {
    console.info(
        "The process may still need to wait on the rate limiting timeouts to exit cleanly. " +
        "Killing it should not cause any data loss."
    )
}), cancellation])
} catch (e) {
if (!(e instanceof ExitPromptError)) {
console.error(e)
}
}

console.log(await dumpGraph(graph))
printGraph(graph)
22 changes: 14 additions & 8 deletions src/instagram/follower.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,12 @@ export function getFollowerGraph({root, session, limits, includeFollowing}: {
}): ReadableStream<FollowerFetcherEvent> {
const graph: UserGraph = {[root.id]: root}

let controller: ReadableStreamDefaultController<FollowerFetcherEvent>

return new ReadableStream<FollowerFetcherEvent>({
async start(controller: ReadableStreamDefaultController<FollowerFetcherEvent>) {
start: async (c: ReadableStreamDefaultController<FollowerFetcherEvent>) => {
controller = c

if (root.private) {
controller.enqueue({
type: FollowerFetcherEventTypes.UPDATE,
Expand All @@ -188,8 +192,11 @@ export function getFollowerGraph({root, session, limits, includeFollowing}: {
}

await createFollowerGraph({limits, graph, session, controller, includeFollowing});
return controller.close();
controller.close();
},
cancel: async () => {
graph.canceled = true
}
})
}

Expand All @@ -207,22 +214,22 @@ async function createFollowerGraph({controller, limits, graph, session, includeF
const done: Set<number> = new Set()
let phase = 0

for (let i = 0; i <= limits.depth.generations; i++) {
for (let i = 0; i <= limits.depth.generations && !graph.canceled; ++i) {
const open = Object.values(graph)
.filter(user => !done.has(user.id))
.map(user => user.id)

if (open.length < 1) break; // no open task, skip remaining generations
if (open.length < 1 || graph.canceled) break; // no open task, skip remaining generations

while (open.length > 0) {
while (open.length > 0 && !graph.canceled) {
const batchSize = Math.floor(limits.rate.batchSize / 100)
const batch = open.splice(0, batchSize < 1 ? 1 : batchSize).map(async task => {
graph[task].followerIds = graph[task].followerIds ?? []

const followers = async () => {
let nextPage = undefined

while (nextPage !== null) {
while (nextPage !== null && !graph.canceled) {
const followers = await fetchFollowers({
session,
targetUser: graph[task],
Expand Down Expand Up @@ -264,7 +271,7 @@ async function createFollowerGraph({controller, limits, graph, session, includeF
let nextPage = undefined
let followingCount = 0

while (nextPage !== null) {
while (nextPage !== null && !graph.canceled) {
const following = await fetchFollowers({
session,
targetUser: graph[task],
Expand All @@ -281,7 +288,6 @@ async function createFollowerGraph({controller, limits, graph, session, includeF
task: graph[task].id
})


phase = await rateLimiter({
graph,
user: graph[task],
Expand Down
4 changes: 3 additions & 1 deletion src/instagram/user.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ export interface User {
personal?: boolean
}

export type UserGraph = Record<number, User>;
/**
 * Users keyed by their numeric id.
 *
 * `canceled` is a control flag stored on the graph object itself rather than a user
 * entry: the follower stream's cancel callback sets it, the fetch loops check it to
 * stop early, and `settleGraph` deletes it before the graph is serialized.
 */
export interface UserGraph extends Record<number, User> {
canceled?: boolean
}

export async function fetchUser(username: string, session?: SessionData): Promise<User> {
const response = await fetch(`https://www.instagram.com/api/v1/users/web_profile_info/?username=${username}`, {
Expand Down

0 comments on commit 6fcff2a

Please sign in to comment.