Skip to content

Commit e501113

Browse files
authored
fix: refill release concurrency token bucket queue when runs resume before checkpoints are created (#1933)
* fix: refill release concurrency token bucket queue when runs resume before checkpoints are created
* Fix the engine package.json tests
* Remove log
1 parent 5435405 commit e501113

8 files changed

+704
-1
lines changed

Diff for: internal-packages/run-engine/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,4 @@
4141
"build": "pnpm run clean && tsc -p tsconfig.build.json",
4242
"dev": "tsc --watch -p tsconfig.build.json"
4343
}
44-
}
44+
}

Diff for: internal-packages/run-engine/src/engine/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,7 @@ export class RunEngine {
305305
executionSnapshotSystem: this.executionSnapshotSystem,
306306
runAttemptSystem: this.runAttemptSystem,
307307
machines: this.options.machines,
308+
releaseConcurrencySystem: this.releaseConcurrencySystem,
308309
});
309310
}
310311

Diff for: internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts

+109
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,59 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
259259
});
260260
}
261261

262+
/**
 * Read the current state of a single release queue from Redis.
 *
 * The two values are fetched with separate commands, so they are not an
 * atomic snapshot of the queue.
 *
 * @param releaseQueueDescriptor - Descriptor that `this.keys.fromDescriptor`
 *   converts into the release queue name.
 * @returns `currentTokens`: the numeric value stored under the bucket key, or
 *   `undefined` when the key is missing or empty. `queueLength`: the
 *   cardinality of the queue sorted set.
 */
public async getReleaseQueueMetrics(
  releaseQueueDescriptor: T
): Promise<{ currentTokens: number | undefined; queueLength: number }> {
  const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);

  // The reads are independent of each other, so issue them together instead
  // of awaiting one before starting the next.
  const [currentTokensRaw, queueLength] = await Promise.all([
    this.redis.get(this.#bucketKey(releaseQueue)),
    this.redis.zcard(this.#queueKey(releaseQueue)),
  ]);

  // GET yields null for a missing key; report that as "unknown" rather than 0.
  const currentTokens = currentTokensRaw ? Number(currentTokensRaw) : undefined;

  return { currentTokens, queueLength };
}
271+
272+
/**
 * Hand one token back to the bucket, unless `releaserId` is still waiting in
 * the release queue.
 *
 * @param releaseQueueDescriptor - Descriptor identifying the release queue.
 * @param releaserId - Identifier looked up in the release queue.
 * @returns `true` when the Redis command replied "true" (token refilled);
 *   `false` when the max token count is 0 or the command replied otherwise
 *   (the releaserId was found in the queue).
 */
public async refillTokenIfNotInQueue(
  releaseQueueDescriptor: T,
  releaserId: string
): Promise<boolean> {
  const maxTokens = await this.#callMaxTokens(releaseQueueDescriptor);
  const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);

  // Fields shared by both debug messages below.
  const logContext = { releaseQueueDescriptor, releaserId, maxTokens, releaseQueue };

  // A bucket with no capacity has nothing to refill.
  if (maxTokens === 0) {
    this.logger.debug("No tokens available, skipping refill", logContext);

    return false;
  }

  const masterQueuesKey = this.masterQueuesKey;
  const bucketKey = this.#bucketKey(releaseQueue);
  const queueKey = this.#queueKey(releaseQueue);
  const metadataKey = this.#metadataKey(releaseQueue);

  // The custom Redis command performs the queue check and the refill in one script.
  const result = await this.redis.refillTokenIfNotInQueue(
    masterQueuesKey,
    bucketKey,
    queueKey,
    metadataKey,
    releaseQueue,
    releaserId,
    String(maxTokens)
  );

  this.logger.debug("Attempted to refill token if not in queue", { ...logContext, result });

  // The script answers with the status string "true" or "false".
  const refilled = result === "true";
  return refilled;
}
314+
262315
/**
263316
* Get the next queue that has available capacity and process one item from it
264317
* Returns true if an item was processed, false if no items were available
@@ -783,6 +836,51 @@ end
783836
return true
784837
`,
785838
});
839+
840+
this.redis.defineCommand("refillTokenIfNotInQueue", {
841+
numberOfKeys: 4,
842+
lua: `
843+
local masterQueuesKey = KEYS[1]
844+
local bucketKey = KEYS[2]
845+
local queueKey = KEYS[3]
846+
local metadataKey = KEYS[4]
847+
848+
local releaseQueue = ARGV[1]
849+
local releaserId = ARGV[2]
850+
local maxTokens = tonumber(ARGV[3])
851+
852+
-- Check if the releaserId is in the queue
853+
local score = redis.call("ZSCORE", queueKey, releaserId)
854+
if score then
855+
-- Item is in queue, don't refill token
856+
return redis.status_reply("false")
857+
end
858+
859+
-- Return the token to the bucket
860+
local currentTokens = tonumber(redis.call("GET", bucketKey) or maxTokens)
861+
local remainingTokens = currentTokens + 1
862+
863+
-- Don't exceed maxTokens
864+
if remainingTokens > maxTokens then
865+
remainingTokens = maxTokens
866+
end
867+
868+
redis.call("SET", bucketKey, remainingTokens)
869+
870+
-- Clean up any metadata just in case
871+
redis.call("HDEL", metadataKey, releaserId)
872+
873+
-- Update the master queue based on remaining queue length
874+
local queueLength = redis.call("ZCARD", queueKey)
875+
if queueLength > 0 then
876+
redis.call("ZADD", masterQueuesKey, remainingTokens, releaseQueue)
877+
else
878+
redis.call("ZREM", masterQueuesKey, releaseQueue)
879+
end
880+
881+
return redis.status_reply("true")
882+
`,
883+
});
786884
}
787885
}
788886

@@ -839,6 +937,17 @@ declare module "@internal/redis" {
839937
releaserId: string,
840938
callback?: Callback<void>
841939
): Result<void, Context>;
940+
941+
refillTokenIfNotInQueue(
942+
masterQueuesKey: string,
943+
bucketKey: string,
944+
queueKey: string,
945+
metadataKey: string,
946+
releaseQueue: string,
947+
releaserId: string,
948+
maxTokens: string,
949+
callback?: Callback<string>
950+
): Result<string, Context>;
842951
}
843952
}
844953

Diff for: internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

+10
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,27 @@ import { ExecutionSnapshotSystem, getLatestExecutionSnapshot } from "./execution
1111
import { RunAttemptSystem } from "./runAttemptSystem.js";
1212
import { SystemResources } from "./systems.js";
1313
import { sendNotificationToWorker } from "../eventBus.js";
14+
import { ReleaseConcurrencySystem } from "./releaseConcurrencySystem.js";
1415

1516
export type DequeueSystemOptions = {
1617
resources: SystemResources;
1718
machines: RunEngineOptions["machines"];
1819
executionSnapshotSystem: ExecutionSnapshotSystem;
1920
runAttemptSystem: RunAttemptSystem;
21+
releaseConcurrencySystem: ReleaseConcurrencySystem;
2022
};
2123

2224
export class DequeueSystem {
2325
private readonly $: SystemResources;
2426
private readonly executionSnapshotSystem: ExecutionSnapshotSystem;
2527
private readonly runAttemptSystem: RunAttemptSystem;
28+
private readonly releaseConcurrencySystem: ReleaseConcurrencySystem;
2629

2730
constructor(private readonly options: DequeueSystemOptions) {
2831
this.$ = options.resources;
2932
this.executionSnapshotSystem = options.executionSnapshotSystem;
3033
this.runAttemptSystem = options.runAttemptSystem;
34+
this.releaseConcurrencySystem = options.releaseConcurrencySystem;
3135
}
3236

3337
/**
@@ -158,6 +162,12 @@ export class DequeueSystem {
158162
}
159163
);
160164

165+
if (snapshot.previousSnapshotId) {
166+
await this.releaseConcurrencySystem.refillTokensForSnapshot(
167+
snapshot.previousSnapshotId
168+
);
169+
}
170+
161171
await sendNotificationToWorker({
162172
runId,
163173
snapshot: newSnapshot,

Diff for: internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts

+54
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,52 @@ export class ReleaseConcurrencySystem {
6969
await this.releaseConcurrencyQueue.quit();
7070
}
7171

72+
/**
 * Refill a release-concurrency token on behalf of a snapshot.
 *
 * Accepts either a snapshot record or a snapshot id (which is looked up via
 * Prisma). Does nothing when the release concurrency queue is not configured,
 * when no id is given, when the snapshot cannot be found, or when the
 * snapshot's execution status is not `EXECUTING_WITH_WAITPOINTS`.
 */
public async refillTokensForSnapshot(snapshotId: string | undefined): Promise<void>;
public async refillTokensForSnapshot(snapshot: TaskRunExecutionSnapshot): Promise<void>;
public async refillTokensForSnapshot(
  snapshotOrId: TaskRunExecutionSnapshot | string | undefined
) {
  // Nothing to do without a queue or without a snapshot reference.
  if (!this.releaseConcurrencyQueue || snapshotOrId === undefined) {
    return;
  }

  // Resolve an id into the stored snapshot; a snapshot object is used as-is.
  const snapshot =
    typeof snapshotOrId !== "string"
      ? snapshotOrId
      : await this.$.prisma.taskRunExecutionSnapshot.findFirst({
          where: { id: snapshotOrId },
        });

  if (!snapshot) {
    this.$.logger.error("Snapshot not found", { snapshotId: snapshotOrId });

    return;
  }

  // Only snapshots in this status are eligible for a refill.
  const isRefillable = snapshot.executionStatus === "EXECUTING_WITH_WAITPOINTS";

  if (!isRefillable) {
    this.$.logger.debug("Snapshot is not in a valid state to refill tokens", { snapshot });

    return;
  }

  const releaseQueueDescriptor = {
    orgId: snapshot.organizationId,
    projectId: snapshot.projectId,
    envId: snapshot.environmentId,
  };

  // The snapshot id doubles as the releaser id inside the token bucket queue.
  await this.releaseConcurrencyQueue.refillTokenIfNotInQueue(releaseQueueDescriptor, snapshot.id);
}
117+
72118
public async checkpointCreatedOnEnvironment(environment: RuntimeEnvironment) {
73119
if (!this.releaseConcurrencyQueue) {
74120
return;
@@ -86,11 +132,19 @@ export class ReleaseConcurrencySystem {
86132

87133
public async releaseConcurrencyForSnapshot(snapshot: TaskRunExecutionSnapshot) {
88134
if (!this.releaseConcurrencyQueue) {
135+
this.$.logger.debug("Release concurrency queue not enabled, skipping release", {
136+
snapshotId: snapshot.id,
137+
});
138+
89139
return;
90140
}
91141

92142
// Go ahead and release concurrency immediately if the run is in a development environment
93143
if (snapshot.environmentType === "DEVELOPMENT") {
144+
this.$.logger.debug("Immediate release of concurrency for development environment", {
145+
snapshotId: snapshot.id,
146+
});
147+
94148
return await this.executeReleaseConcurrencyForSnapshot(snapshot.id);
95149
}
96150

Diff for: internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

+2
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,8 @@ export class WaitpointSystem {
543543
}
544544
);
545545

546+
await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);
547+
546548
await sendNotificationToWorker({
547549
runId,
548550
snapshot: newSnapshot,

0 commit comments

Comments
 (0)