Skip to content

Commit 0cc5604

Browse files
authored
batchTriggerAndWait checkpoint race condition when at max concurrency (#1296)
* Ignore /packages/cli-v3/src/package.json * Added more logs when resuming a dependency, added the runId * A task for reproducing a race condition with checkpoints * Fix for doing remote image build when not self-hosting * Set team members, alerts and schedule limits to 100m for self-hosting * Import fix * Set the checkpointEventId in marqs when the checkpoint is created for batchTriggerAndWait This should fix a horrible race condition when at max concurrency
1 parent 67547d2 commit 0cc5604

9 files changed

+66
-10
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,5 @@ apps/**/public/build
5656
.trigger
5757
.tshy*
5858
.yarn
59-
*.tsbuildinfo
59+
*.tsbuildinfo
60+
/packages/cli-v3/src/package.json

apps/webapp/app/presenters/TeamPresenter.server.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { getTeamMembersAndInvites } from "~/models/member.server";
2-
import { BasePresenter } from "./v3/basePresenter.server";
32
import { getLimit } from "~/services/platform.v3.server";
3+
import { BasePresenter } from "./v3/basePresenter.server";
44

55
export class TeamPresenter extends BasePresenter {
66
public async call({ userId, organizationId }: { userId: string; organizationId: string }) {
@@ -13,7 +13,7 @@ export class TeamPresenter extends BasePresenter {
1313
return;
1414
}
1515

16-
const limit = await getLimit(organizationId, "teamMembers", 25);
16+
const limit = await getLimit(organizationId, "teamMembers", 100_000_000);
1717

1818
return {
1919
...result,

apps/webapp/app/presenters/v3/AlertChannelListPresenter.server.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export class AlertChannelListPresenter extends BasePresenter {
4343
throw new Error(`Project not found: ${projectId}`);
4444
}
4545

46-
const limit = await getLimit(organization.organizationId, "alerts", 25);
46+
const limit = await getLimit(organization.organizationId, "alerts", 100_000_000);
4747

4848
return {
4949
alertChannels: await Promise.all(

apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ export class ScheduleListPresenter extends BasePresenter {
256256
};
257257
});
258258

259-
const limit = await getLimit(project.organizationId, "schedules", 500);
259+
const limit = await getLimit(project.organizationId, "schedules", 100_000_000);
260260

261261
return {
262262
currentPage: page,

apps/webapp/app/v3/services/checkSchedule.server.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ export class CheckScheduleService extends BaseService {
7676
throw new ServiceValidationError("Project not found");
7777
}
7878

79-
const limit = await getLimit(project.organizationId, "schedules", 500);
79+
const limit = await getLimit(project.organizationId, "schedules", 100_000_000);
8080
const schedulesCount = await this._prisma.taskSchedule.count({
8181
where: {
8282
projectId,

apps/webapp/app/v3/services/createCheckpoint.server.ts

+10
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,16 @@ export class CreateCheckpointService extends BaseService {
258258
};
259259
}
260260

261+
//if there's a message in the queue, we make sure the checkpoint event is on it
262+
await marqs?.replaceMessage(
263+
attempt.taskRun.id,
264+
{
265+
checkpointEventId: checkpointEvent.id,
266+
},
267+
undefined,
268+
true
269+
);
270+
261271
await ResumeBatchRunService.enqueue(batchRun.id, this._prisma);
262272

263273
return {

apps/webapp/app/v3/services/initializeDeployment.server.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ export class InitializeDeploymentService extends BaseService {
2929
const nextVersion = calculateNextBuildVersion(latestDeployment?.version);
3030

3131
// Try and create a depot build and get back the external build data
32-
const externalBuildData = !!payload.selfHosted
33-
? await createRemoteImageBuild(environment.project)
34-
: undefined;
32+
const externalBuildData = payload.selfHosted
33+
? undefined
34+
: await createRemoteImageBuild(environment.project);
3535

3636
const triggeredBy = payload.userId
3737
? await this._prisma.user.findUnique({

apps/webapp/app/v3/services/resumeTaskDependency.server.ts

+11
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@ export class ResumeTaskDependencyService extends BaseService {
3939
const dependentRun = dependency.dependentAttempt.taskRun;
4040

4141
if (dependency.dependentAttempt.status === "PAUSED" && dependency.checkpointEventId) {
42+
logger.debug(
43+
"Task dependency resume: Attempt is paused and there's a checkpoint. Enqueuing resume with checkpoint.",
44+
{
45+
attemptId: dependency.id,
46+
dependentAttempt: dependency.dependentAttempt,
47+
checkpointEventId: dependency.checkpointEventId,
48+
hasCheckpointEvent: !!dependency.checkpointEventId,
49+
runId: dependentRun.id,
50+
}
51+
);
4252
await marqs?.enqueueMessage(
4353
dependency.taskRun.runtimeEnvironment,
4454
dependentRun.queue,
@@ -61,6 +71,7 @@ export class ResumeTaskDependencyService extends BaseService {
6171
dependentAttempt: dependency.dependentAttempt,
6272
checkpointEventId: dependency.checkpointEventId,
6373
hasCheckpointEvent: !!dependency.checkpointEventId,
74+
runId: dependentRun.id,
6475
});
6576

6677
if (dependency.dependentAttempt.status === "PAUSED" && !dependency.checkpointEventId) {

references/v3-catalog/src/trigger/checkpoints.ts

+35-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { logger, task, wait } from "@trigger.dev/sdk/v3";
1+
import { logger, queue, task, wait } from "@trigger.dev/sdk/v3";
22

33
type Payload = {
44
count?: number;
@@ -70,6 +70,7 @@ export const nestedDependencies = task({
7070
maxDepth,
7171
waitSeconds,
7272
failAttemptChance,
73+
batchSize,
7374
});
7475
logger.log(`Triggered complete ${i + 1}/${batchSize}`);
7576

@@ -153,3 +154,36 @@ export const bulkPermanentlyFrozen = task({
153154
);
154155
},
155156
});
157+
158+
const oneAtATime = queue({
159+
name: "race-condition",
160+
concurrencyLimit: 1,
161+
});
162+
163+
export const raceConditionCheckpointDequeue = task({
164+
id: "race-condition-checkpoint-dequeue",
165+
queue: oneAtATime,
166+
run: async ({ isBatch = true }: { isBatch?: boolean }) => {
167+
await holdConcurrency.trigger({ waitSeconds: 45 });
168+
169+
if (isBatch) {
170+
await fixedLengthTask.batchTriggerAndWait(
171+
Array.from({ length: 1 }, (_, i) => ({
172+
payload: { waitSeconds: 5 },
173+
}))
174+
);
175+
} else {
176+
await fixedLengthTask.triggerAndWait({ waitSeconds: 5 });
177+
}
178+
179+
logger.log(`Successfully completed task`);
180+
},
181+
});
182+
183+
export const holdConcurrency = task({
184+
id: "hold-concurrency",
185+
queue: oneAtATime,
186+
run: async ({ waitSeconds = 60 }: { waitSeconds?: number }) => {
187+
await new Promise((resolve) => setTimeout(resolve, waitSeconds * 1000));
188+
},
189+
});

0 commit comments

Comments
 (0)