Skip to content

Commit

Permalink
implement a fetch task queue
Browse files Browse the repository at this point in the history
  • Loading branch information
hfxbse committed May 4, 2024
1 parent 55b1acf commit de41946
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 119 deletions.
218 changes: 99 additions & 119 deletions src/instagram/follower.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,19 @@ function addFollowerToGraph({graph, followers, done, target, controller}: {
})
}

function addFollowingToGraph({graph, following, done, task, controller}: {
function addFollowingToGraph({graph, following, done, target, controller}: {
graph: UnsettledUserGraph,
following: UnsettledUser[],
done: Set<number>,
task: number,
target: number,
controller: ReadableStreamDefaultController<FollowerFetcherEvent>
},) {
if (!graph[target].followingCount) graph[target].followingCount = 0
graph[target].followingCount += following.length

following.filter(following => graph[following.id] !== undefined).forEach(user => addFollowerToGraph({
graph,
followers: [graph[task]],
followers: [graph[target]],
done,
controller,
target: user.id
Expand All @@ -133,14 +136,14 @@ function addFollowingToGraph({graph, following, done, task, controller}: {
following.filter(following => graph[following.id] === undefined).forEach(user => {
graph[user.id] = {
...user,
followerIds: [task]
followerIds: [target]
};

controller.enqueue({
graph: {...graph},
type: FollowerFetcherEventTypes.UPDATE,
user,
added: {users: [user], progress: {done: done.size}, followers: [task]}
added: {users: [user], progress: {done: done.size}, followers: [target]}
})
})
}
Expand Down Expand Up @@ -196,6 +199,8 @@ function excess(current: number, limit: number, addition: any[]) {
return addition.slice(addition.length - (current - limit))
}

/**
 * A unit of work in the fetch queue built by `createFollowerGraph`.
 *
 * - `job`: fetches one page of followers/following for `user`. It is `null` when a follow-up
 *   task is queued from a page that has no next page (`FollowerPage.next` is nullable); the
 *   queue skips tasks without a job.
 * - `user`: the user whose page the job fetches.
 * - `noWait`: when truthy, the job is started without awaiting the rate limiter first.
 */
type Task = { job: null | (() => Promise<FollowerPage>), user: UnsettledUser, noWait?: boolean }

async function createFollowerGraph({controller, limits, graph, session, includeFollowing}: {
controller: ReadableStreamDefaultController<FollowerFetcherEvent>,
graph: UnsettledUserGraph,
Expand All @@ -207,119 +212,79 @@ async function createFollowerGraph({controller, limits, graph, session, includeF
let phase = 0

for (let gen = 0; gen <= limits.depth.generations && !graph.canceled; ++gen) {
const open = Object.values(graph)
// create tasks for each uncompleted user, and put new jobs at the end of the queue, creating a more
// meaningful breadth-first algorithm
const taskQueue: Task[] = Object.values(graph)
.filter(user => !done.has(user.id))
.map(user => user.id)

if (open.length < 1 || graph.canceled) break; // no open task, skip remaining generations

while (open.length > 0 && !graph.canceled) {
const taskCount = Math.min(Math.floor(limits.rate.batch.size / 100), limits.rate.parallelTasks)
const tasks = open.splice(0, taskCount < 1 ? 1 : taskCount).map(async task => {
graph[task].followerIds = graph[task].followerIds ?? []

const followers = async () => {
let nextPage = undefined

while (nextPage !== null && !graph.canceled) {
const newPhase = gen === 0 ? 0 : await rateLimiter({
graph,
user: graph[task],
phase,
limits: limits,
taskCount: taskCount,
controller,
})

const followers = await fetchFollowers({
session,
targetUser: graph[task],
nextPage,
limits,
direction: FollowerDirection.FOLLOWER
})

addFollowerToGraph({graph, followers: followers.page, done, target: task, controller})

nextPage = followers.nextPage
phase = newPhase

const userFollowerCount = graph[task].followerIds.length;
if (limits.depth.followers > 0 && userFollowerCount >= limits.depth.followers) {
excess(userFollowerCount, limits.depth.followers, followers.page)
.forEach(user => done.add(user.id))

controller.enqueue({
type: FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWER,
user: graph[task],
graph,
amount: userFollowerCount
})
break;
}
}
.reduce((tasks, user): Task[] => {
tasks.push({
job: () => fetchFollowers({session, user, limits, direction: FollowerDirection.FOLLOWER}),
user,
noWait: true
})

if (includeFollowing) {
tasks.push({
job: () => fetchFollowers({session, user, limits, direction: FollowerDirection.FOLLOWING}),
user,
noWait: true
})
}

const following = async () => {
let nextPage = undefined
let followingCount = 0

while (nextPage !== null && !graph.canceled) {
const newPhase = gen === 0 ? 0 : await rateLimiter({
graph,
user: graph[task],
phase,
taskCount: taskCount,
limits,
controller
})

const following = await fetchFollowers({
session,
targetUser: graph[task],
nextPage,
limits,
direction: FollowerDirection.FOLLOWING
})

addFollowingToGraph({
graph,
following: following.page,
done,
controller,
task: graph[task].id
})

followingCount += following.page.length
phase = newPhase

if (limits.depth.followers > 0 && followingCount >= limits.depth.followers) {
excess(followingCount, limits.depth.followers, following.page)
.forEach(user => done.add(user.id))

controller.enqueue({
type: FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWING,
user: graph[task],
graph: {...graph},
amount: followingCount
})
break;
}
return tasks
}, [])

if (taskQueue.length < 1 || graph.canceled) break; // no open task, skip remaining generations

while (taskQueue.length > 0) {
// Users per response: followers = 25, following = 200
const maxParallel = Math.min(Math.floor(limits.rate.batch.size / 225), limits.rate.parallelTasks)

const running = taskQueue.splice(0, Math.max(maxParallel, 1)).map(async (task) => {
if (!task.job) return

if (!task.noWait) phase = await rateLimiter({
graph,
user: task.user,
phase,
limits,
controller,
taskCount: Math.max(Math.min(maxParallel, taskQueue.length), 1)
})

const result = await task.job()
const user = graph[task.user.id]

nextPage = following.nextPage;
if (result.direction === FollowerDirection.FOLLOWER) {
addFollowerToGraph({graph, followers: result.page, done, controller, target: task.user.id})

if (!limits.depth.followers || user.followerIds.length <= limits.depth.followers) {
taskQueue.push({user, job: result.next})
return
}
}
} else if (result.direction === FollowerDirection.FOLLOWING) {
addFollowingToGraph({graph, following: result.page, done, controller, target: task.user.id})

try {
await Promise.all([followers(), (includeFollowing ? following() : Promise.resolve())])
} catch (e) {
controller.error(e)
if (!limits.depth.followers || (user.followingCount ?? 0) <= limits.depth.followers) {
taskQueue.push({user, job: result.next})
return
}
}

done.add(task);
});
const followers = result.direction === FollowerDirection.FOLLOWER;
const amount = followers ? user.followerIds.length : user.followingCount

excess(amount, limits.depth.followers, result.page).forEach(user => done.add(user.id))

await Promise.all(tasks)
controller.enqueue({
type: followers ? FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWER : FollowerFetcherEventTypes.DEPTH_LIMIT_FOLLOWING,
user: user,
graph: {...graph},
amount
})
})

await Promise.all(running)
}
}

Expand All @@ -330,13 +295,19 @@ enum FollowerDirection {
FOLLOWER, FOLLOWING
}

async function fetchFollowers({session, targetUser, nextPage, direction, limits}: {
session: SessionData, targetUser: UnsettledUser, nextPage?: string, direction: FollowerDirection, limits: Limits
}): Promise<{ page: UnsettledUser[], nextPage: string }> {
const query = nextPage ? `?max_id=${nextPage}` : '';
/** One fetched page of a user's followers or following list. */
type FollowerPage = {
    /** The users contained in this response page. */
    page: UnsettledUser[],
    /** Fetches the page after this one; `null` when the response carried no continuation id. */
    next: (() => Promise<FollowerPage>) | null,
    /** The direction (follower or following) that was requested for this page. */
    direction: FollowerDirection
}

async function fetchFollowers({session, user, page, direction, limits}: {
session: SessionData,
user: UnsettledUser,
page?: undefined | string | null,
direction: FollowerDirection,
limits: Limits
}): Promise<FollowerPage> {
const query = page ? `?max_id=${page}` : '';
const directionPath = direction === FollowerDirection.FOLLOWING ? 'following' : 'followers'

const response = await fetch(`https://www.instagram.com/api/v1/friendships/${targetUser.id}/${directionPath}/${query}`, {
const response = await fetch(`https://www.instagram.com/api/v1/friendships/${user.id}/${directionPath}/${query}`, {
headers: {
"Sec-Fetch-Site": "same-origin",
"X-IG-App-ID": "936619743392459",
Expand All @@ -362,7 +333,7 @@ async function fetchFollowers({session, targetUser, nextPage, direction, limits}
}
}

const page = (await response.json()) as {
const result = (await response.json()) as {
users: {
id: string,
full_name: string,
Expand All @@ -374,9 +345,12 @@ async function fetchFollowers({session, targetUser, nextPage, direction, limits}
}

return {
page: page.users.map((user) => {
direction,
page: result.users.map((user) => {
const id = parseInt(user.id, 10)

return {
id: parseInt(user.id, 10),
id,
profile: {
username: user.username,
name: user.full_name,
Expand All @@ -386,9 +360,15 @@ async function fetchFollowers({session, targetUser, nextPage, direction, limits}
}).delay.then(() => downloadProfilePicture(user.profile_pic_url))
},
public: !user.is_private,
private: user.is_private && targetUser.id != session.user.id
private: user.is_private && id != session.user.id
}
}),
nextPage: page.next_max_id ?? null
next: result.next_max_id ? () => fetchFollowers({
session,
user: user,
page: result.next_max_id,
direction,
limits
}) : null
}
}
2 changes: 2 additions & 0 deletions src/instagram/user.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export interface User {
image: string
},
followerIds?: number[],
followingCount?: number
private?: boolean,
public: boolean,
personal?: boolean
Expand All @@ -21,6 +22,7 @@ export interface UnsettledUser {
image: Promise<Blob> | null,
}
followerIds?: number[],
followingCount?: number
private?: boolean,
public: boolean,
personal?: boolean
Expand Down

0 comments on commit de41946

Please sign in to comment.