[BE][drci] Async fetch PR info during reorganizeWorkflows for speed #6380

Merged 5 commits on Mar 18, 2025.
torchci/pages/api/drci/drci.ts (115 changes: 56 additions, 59 deletions)
```diff
@@ -1110,69 +1110,63 @@ export async function reorganizeWorkflows(
       workflow.name,
     ])
   );
-  const workflowsByPR: Map<number, PRandJobs> = new Map();
-  const headShaTimestamps: Map<string, string> = new Map();
-
-  for (const workflow of dedupedRecentWorkflows) {
-    const prNumber = workflow.pr_number;
-    if (!workflowsByPR.has(prNumber)) {
-      let headShaTimestamp = workflow.head_sha_timestamp;
-      // NB: The head SHA timestamp is currently used as the end date when
-      // searching for similar failures. However, it's not available on CH for
-      // commits from forked PRs before a ciflow ref is pushed. In such case,
-      // the head SHA timestamp will be undefined and we will make an additional
-      // query to GitHub to get the value
-      if (octokit && isTime0(headShaTimestamp)) {
-        headShaTimestamp = await fetchCommitTimestamp(
-          octokit,
-          owner,
-          repo,
-          workflow.head_sha
-        );
-        headShaTimestamps.set(workflow.head_sha, headShaTimestamp);
-      }
-
-      let prTitle = "";
-      let prBody = "";
-      let prShas: { sha: string; title: string }[] = [];
-      // Gate this to PyTorch as disabled tests feature is only available there
-      if (octokit && repo === "pytorch") {
-        const prData = await fetchPR(owner, repo, `${prNumber}`, octokit);
-        prTitle = prData.title;
-        prBody = prData.body;
-        prShas = prData.shas;
-      }
-
-      workflowsByPR.set(prNumber, {
-        pr_number: prNumber,
-        head_sha: workflow.head_sha,
-        head_sha_timestamp: headShaTimestamp,
-        jobs: [],
-        merge_base: "",
-        merge_base_date: "",
-        owner: owner,
-        repo: repo,
-        title: prTitle,
-        body: prBody,
-        shas: prShas,
-      });
-    }
-
-    const headShaTimestamp = headShaTimestamps.get(workflow.head_sha);
-    if (
-      isTime0(workflow.head_sha_timestamp) &&
-      headShaTimestamp &&
-      !isTime0(headShaTimestamp)
-    ) {
-      workflow.head_sha_timestamp = headShaTimestamp;
-    }
-
-    workflowsByPR.get(prNumber)!.jobs.push(workflow);
-  }
+  const workflowsByPR: PRandJobs[] = await Promise.all(
+    _(dedupedRecentWorkflows)
+      .groupBy("pr_number")
+      .map(async (workflows, prNumber) => {
+        // NB: The head SHA timestamp is currently used as the end date when
+        // searching for similar failures. However, it's not available on CH for
+        // commits from forked PRs before a ciflow ref is pushed. In such case,
+        // the head SHA timestamp will be undefined and we will make an additional
+        // query to GitHub to get the value
+        let headShaTimestamp = workflows.find(
+          (workflow) => !isTime0(workflow.head_sha_timestamp)
+        )?.head_sha_timestamp;
+        if (octokit && headShaTimestamp === undefined) {
+          headShaTimestamp = await fetchCommitTimestamp(
+            octokit,
+            owner,
+            repo,
+            workflows[0].head_sha
+          );
+        }
+        workflows.forEach((workflow) => {
+          if (isTime0(workflow.head_sha_timestamp) && headShaTimestamp) {
+            workflow.head_sha_timestamp = headShaTimestamp;
+          }
+        });
+
+        let prTitle = "";
+        let prBody = "";
+        let prShas: { sha: string; title: string }[] = [];
+        // Gate this to PyTorch as disabled tests feature is only available there
+        if (octokit && repo === "pytorch") {
+          const prData = await fetchPR(owner, repo, `${prNumber}`, octokit);
+          prTitle = prData.title;
+          prBody = prData.body;
+          prShas = prData.shas;
+        }
+
+        return {
+          pr_number: parseInt(prNumber),
+          head_sha: workflows[0].head_sha,
+          head_sha_timestamp: headShaTimestamp ?? "",
+          jobs: workflows,
+          merge_base: "",
+          merge_base_date: "",
+          owner: owner,
+          repo: repo,
+          title: prTitle,
+          body: prBody,
+          shas: prShas,
+        };
+      })
+      .value()
+  );
```

Review thread on this hunk:

Reviewer (Contributor): Why did the old version check for the prNumber and only apply the logic to the entries missing it, while the new version doesn't need that check and runs the code against all entries?

clee2000 (Author), Mar 17, 2025: The PR regroups and reorders things so that the slow fetches can happen outside of the sequential for loop. The same number of API calls should happen; they just happen at a different time. Here's some pseudocode that explains what's going on.

```
Goal: construct map<pr number, pr info and job list>

Old:
for each workflow (sequential because of for loop without async modifier):
  add to map
  if workflow's pr hasn't been seen yet (new key), fetch pr info and maybe info about the commit (slow)

New:
for each workflow (sequential):
  add to map
for each key (pr number) in map (technically also sequential, but the mapped function is async, creating promises which are all awaited together):
  fetch pr info and maybe info about commit
```
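The shape of the change, group first, then fire one async fetch per group and await them all together, can be sketched in isolation. This is a simplified illustration, not the PR's actual code: `Job`, `fetchInfo`, and `groupAndFetch` are hypothetical stand-ins for the real workflow records and GitHub calls.

```typescript
// Sketch of the pattern: group records by key synchronously, then resolve
// one async fetch per group concurrently via Promise.all, instead of
// awaiting inside a sequential for loop.
interface Job {
  pr: number;
  name: string;
}

async function fetchInfo(pr: number): Promise<string> {
  // Stand-in for the slow per-PR GitHub API call.
  return `info-for-${pr}`;
}

async function groupAndFetch(
  jobs: Job[]
): Promise<Map<number, { info: string; jobs: Job[] }>> {
  // Grouping is cheap and stays sequential.
  const groups = new Map<number, Job[]>();
  for (const job of jobs) {
    if (!groups.has(job.pr)) groups.set(job.pr, []);
    groups.get(job.pr)!.push(job);
  }
  // One fetch per PR; the promises are created eagerly and run concurrently.
  const entries = await Promise.all(
    Array.from(groups.entries()).map(async ([pr, prJobs]) => {
      const info = await fetchInfo(pr); // same call count as before, different timing
      return [pr, { info, jobs: prJobs }] as const;
    })
  );
  return new Map(entries);
}
```

One wrinkle the PR has to handle that this sketch does not: lodash's `groupBy` keys are strings, which is why the new code calls `parseInt(prNumber)` before returning.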

```diff
   // clean up the workflows - remove retries, remove workflows that have jobs,
   // remove cancelled jobs with weird names
-  for (const [, prInfo] of workflowsByPR) {
+  for (const prInfo of workflowsByPR) {
     const [workflows, jobs] = _.partition(
       prInfo.jobs,
       (job) => job.workflowId === 0
@@ -1220,5 +1214,8 @@ export async function reorganizeWorkflows(
     // Remove cancelled jobs with weird names
     prInfo.jobs = removeCancelledJobAfterRetry<RecentWorkflowsData>(allJobs);
   }
-  return workflowsByPR;
+  return workflowsByPR.reduce((acc, prInfo) => {
+    acc.set(prInfo.pr_number, prInfo);
+    return acc;
+  }, new Map<number, PRandJobs>());
 }
```
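Because the function's callers still expect a `Map<number, PRandJobs>`, the new return statement folds the intermediate array back into a Map with `reduce`. A minimal standalone sketch of that conversion, using a hypothetical `PRInfo` in place of the real `PRandJobs` type:

```typescript
// Sketch: collapse an array of keyed records into a Map via reduce,
// mirroring the final return statement of reorganizeWorkflows.
interface PRInfo {
  pr_number: number;
  title: string;
}

function toMap(items: PRInfo[]): Map<number, PRInfo> {
  return items.reduce((acc, item) => {
    acc.set(item.pr_number, item); // later duplicates would overwrite earlier ones
    return acc;
  }, new Map<number, PRInfo>());
}
```

`new Map(items.map((i) => [i.pr_number, i] as const))` would be an equivalent, slightly terser alternative.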