Skip to content

Commit a42c9ac

Browse files
authored
v4: Improved run locking (#2173)
1 parent f7f8bc2 commit a42c9ac

File tree

14 files changed

+1335
-170
lines changed

14 files changed

+1335
-170
lines changed

apps/webapp/app/env.server.ts

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -429,6 +429,15 @@ const EnvironmentSchema = z.object({
429429
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
430430
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(500),
431431

432+
RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000),
433+
RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000),
434+
RUN_ENGINE_RUN_LOCK_MAX_RETRIES: z.coerce.number().int().default(10),
435+
RUN_ENGINE_RUN_LOCK_BASE_DELAY: z.coerce.number().int().default(100),
436+
RUN_ENGINE_RUN_LOCK_MAX_DELAY: z.coerce.number().int().default(3000),
437+
RUN_ENGINE_RUN_LOCK_BACKOFF_MULTIPLIER: z.coerce.number().default(1.8),
438+
RUN_ENGINE_RUN_LOCK_JITTER_FACTOR: z.coerce.number().default(0.15),
439+
RUN_ENGINE_RUN_LOCK_MAX_TOTAL_WAIT_TIME: z.coerce.number().int().default(15000),
440+
432441
RUN_ENGINE_WORKER_REDIS_HOST: z
433442
.string()
434443
.optional()

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -75,6 +75,16 @@ function createRunEngine() {
7575
enableAutoPipelining: true,
7676
...(env.RUN_ENGINE_RUN_LOCK_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
7777
},
78+
duration: env.RUN_ENGINE_RUN_LOCK_DURATION,
79+
automaticExtensionThreshold: env.RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD,
80+
retryConfig: {
81+
maxAttempts: env.RUN_ENGINE_RUN_LOCK_MAX_RETRIES,
82+
baseDelay: env.RUN_ENGINE_RUN_LOCK_BASE_DELAY,
83+
maxDelay: env.RUN_ENGINE_RUN_LOCK_MAX_DELAY,
84+
backoffMultiplier: env.RUN_ENGINE_RUN_LOCK_BACKOFF_MULTIPLIER,
85+
jitterFactor: env.RUN_ENGINE_RUN_LOCK_JITTER_FACTOR,
86+
maxTotalWaitTime: env.RUN_ENGINE_RUN_LOCK_MAX_TOTAL_WAIT_TIME,
87+
},
7888
},
7989
tracer,
8090
meter,

internal-packages/run-engine/src/engine/index.ts

Lines changed: 54 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -98,6 +98,17 @@ export class RunEngine {
9898
logger: this.logger,
9999
tracer: trace.getTracer("RunLocker"),
100100
meter: options.meter,
101+
duration: options.runLock.duration ?? 5000,
102+
automaticExtensionThreshold: options.runLock.automaticExtensionThreshold ?? 1000,
103+
retryConfig: {
104+
maxAttempts: 10,
105+
baseDelay: 100,
106+
maxDelay: 3000,
107+
backoffMultiplier: 1.8,
108+
jitterFactor: 0.15,
109+
maxTotalWaitTime: 15000,
110+
...options.runLock.retryConfig,
111+
},
101112
});
102113

103114
const keys = new RunQueueFullKeyProducer();
@@ -486,56 +497,52 @@ export class RunEngine {
486497

487498
span.setAttribute("runId", taskRun.id);
488499

489-
await this.runLock.lock("trigger", [taskRun.id], 5000, async (signal) => {
490-
//create associated waitpoint (this completes when the run completes)
491-
const associatedWaitpoint = await this.waitpointSystem.createRunAssociatedWaitpoint(
492-
prisma,
493-
{
494-
projectId: environment.project.id,
495-
environmentId: environment.id,
496-
completedByTaskRunId: taskRun.id,
497-
}
498-
);
499-
500-
//triggerAndWait or batchTriggerAndWait
501-
if (resumeParentOnCompletion && parentTaskRunId) {
502-
//this will block the parent run from continuing until this waitpoint is completed (and removed)
503-
await this.waitpointSystem.blockRunWithWaitpoint({
504-
runId: parentTaskRunId,
505-
waitpoints: associatedWaitpoint.id,
506-
projectId: associatedWaitpoint.projectId,
507-
organizationId: environment.organization.id,
508-
batch,
509-
workerId,
510-
runnerId,
511-
tx: prisma,
512-
releaseConcurrency,
513-
});
500+
//create associated waitpoint (this completes when the run completes)
501+
const associatedWaitpoint = await this.waitpointSystem.createRunAssociatedWaitpoint(
502+
prisma,
503+
{
504+
projectId: environment.project.id,
505+
environmentId: environment.id,
506+
completedByTaskRunId: taskRun.id,
514507
}
508+
);
515509

516-
//Make sure lock extension succeeded
517-
signal.throwIfAborted();
518-
519-
if (taskRun.delayUntil) {
520-
// Schedule the run to be enqueued at the delayUntil time
521-
await this.delayedRunSystem.scheduleDelayedRunEnqueuing({
522-
runId: taskRun.id,
523-
delayUntil: taskRun.delayUntil,
524-
});
525-
} else {
526-
await this.enqueueSystem.enqueueRun({
527-
run: taskRun,
528-
env: environment,
529-
workerId,
530-
runnerId,
531-
tx: prisma,
532-
});
510+
//triggerAndWait or batchTriggerAndWait
511+
if (resumeParentOnCompletion && parentTaskRunId) {
512+
//this will block the parent run from continuing until this waitpoint is completed (and removed)
513+
await this.waitpointSystem.blockRunWithWaitpoint({
514+
runId: parentTaskRunId,
515+
waitpoints: associatedWaitpoint.id,
516+
projectId: associatedWaitpoint.projectId,
517+
organizationId: environment.organization.id,
518+
batch,
519+
workerId,
520+
runnerId,
521+
tx: prisma,
522+
releaseConcurrency,
523+
});
524+
}
533525

534-
if (taskRun.ttl) {
535-
await this.ttlSystem.scheduleExpireRun({ runId: taskRun.id, ttl: taskRun.ttl });
536-
}
526+
if (taskRun.delayUntil) {
527+
// Schedule the run to be enqueued at the delayUntil time
528+
await this.delayedRunSystem.scheduleDelayedRunEnqueuing({
529+
runId: taskRun.id,
530+
delayUntil: taskRun.delayUntil,
531+
});
532+
} else {
533+
if (taskRun.ttl) {
534+
await this.ttlSystem.scheduleExpireRun({ runId: taskRun.id, ttl: taskRun.ttl });
537535
}
538-
});
536+
537+
await this.enqueueSystem.enqueueRun({
538+
run: taskRun,
539+
env: environment,
540+
workerId,
541+
runnerId,
542+
tx: prisma,
543+
skipRunLock: true,
544+
});
545+
}
539546

540547
this.eventBus.emit("runCreated", {
541548
time: new Date(),
@@ -1155,7 +1162,7 @@ export class RunEngine {
11551162
tx?: PrismaClientOrTransaction;
11561163
}) {
11571164
const prisma = tx ?? this.prisma;
1158-
return await this.runLock.lock("handleStalledSnapshot", [runId], 5_000, async () => {
1165+
return await this.runLock.lock("handleStalledSnapshot", [runId], async () => {
11591166
const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
11601167
if (latestSnapshot.id !== snapshotId) {
11611168
this.logger.log(

0 commit comments

Comments
 (0)