Skip to content

Commit

Permalink
Space out the starts of parallel tasks via the rate limiter
Browse files: browse the repository at this point in the history
  • Loading branch information
hfxbse committed May 4, 2024
1 parent 52b7a35 commit 7ab363a
Showing 1 changed file with 29 additions and 25 deletions.
54 changes: 29 additions & 25 deletions src/instagram/follower.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,16 @@ function randomDelay(limit: RandomDelayLimit) {
}


async function rateLimiter({graph, user, phase, batchCount, limits, controller}: {
async function rateLimiter({graph, user, phase, taskCount, limits, controller}: {
graph: UnsettledUserGraph,
user: UnsettledUser,
phase: number,
batchCount: number
taskCount: number
limits: Limits,
controller: ReadableStreamDefaultController<FollowerFetcherEvent>
}) {
const phaseProgression = Math.floor(
Object.entries(graph).length / (limits.rate.batch.size - batchCount * 25)
Object.entries(graph).length / (limits.rate.batch.size - taskCount * 25)
)

if (phase < phaseProgression) {
Expand Down Expand Up @@ -206,22 +206,33 @@ async function createFollowerGraph({controller, limits, graph, session, includeF
const done: Set<number> = new Set()
let phase = 0

for (let i = 0; i <= limits.depth.generations && !graph.canceled; ++i) {
for (let gen = 0; gen <= limits.depth.generations && !graph.canceled; ++gen) {
const open = Object.values(graph)
.filter(user => !done.has(user.id))
.map(user => user.id)

if (open.length < 1 || graph.canceled) break; // no open task, skip remaining generations

while (open.length > 0 && !graph.canceled) {
const batchSize = Math.min(Math.floor(limits.rate.batch.size / 100), limits.rate.parallelTasks)
const batch = open.splice(0, batchSize < 1 ? 1 : batchSize).map(async task => {
const taskCount = Math.min(Math.floor(limits.rate.batch.size / 100), limits.rate.parallelTasks)
const tasks = open.splice(0, taskCount < 1 ? 1 : taskCount).map(async task => {
graph[task].followerIds = graph[task].followerIds ?? []

const followers = async () => {
let nextPage = undefined

while (nextPage !== null && !graph.canceled) {
if (gen > 0) {
phase = await rateLimiter({
graph,
user: graph[task],
phase,
limits: limits,
taskCount: taskCount,
controller,
})
}

const followers = await fetchFollowers({
session,
targetUser: graph[task],
Expand All @@ -234,15 +245,6 @@ async function createFollowerGraph({controller, limits, graph, session, includeF

nextPage = followers.nextPage

phase = await rateLimiter({
graph,
user: graph[task],
phase,
limits: limits,
batchCount: batch.length,
controller,
})

const userFollowerCount = graph[task].followerIds.length;
if (limits.depth.followers > 0 && userFollowerCount >= limits.depth.followers) {
excess(userFollowerCount, limits.depth.followers, followers.page)
Expand All @@ -264,6 +266,17 @@ async function createFollowerGraph({controller, limits, graph, session, includeF
let followingCount = 0

while (nextPage !== null && !graph.canceled) {
if (gen > 0) {
phase = await rateLimiter({
graph,
user: graph[task],
phase,
taskCount: taskCount,
limits,
controller
})
}

const following = await fetchFollowers({
session,
targetUser: graph[task],
Expand All @@ -280,15 +293,6 @@ async function createFollowerGraph({controller, limits, graph, session, includeF
task: graph[task].id
})

phase = await rateLimiter({
graph,
user: graph[task],
phase,
batchCount: batch.length,
limits,
controller
})

followingCount += following.page.length

if (limits.depth.followers > 0 && followingCount >= limits.depth.followers) {
Expand All @@ -313,7 +317,7 @@ async function createFollowerGraph({controller, limits, graph, session, includeF
done.add(task);
});

await Promise.all(batch)
await Promise.all(tasks)
}
}

Expand Down

0 comments on commit 7ab363a

Please sign in to comment.