Skip to content

fix: release concurrency system only consumes tokens when releasings are executed successfully #1883

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
"type": "node-terminal",
"request": "launch",
"name": "Debug RunEngine tests",
"command": "pnpm run test ./src/engine/tests/releaseConcurrencyQueue.test.ts -t 'Should manage token bucket and queue correctly'",
"command": "pnpm run test ./src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts -t 'Should retrieve metrics for all queues via getQueueMetrics'",
"cwd": "${workspaceFolder}/internal-packages/run-engine",
"sourceMaps": true
},
Expand Down
2 changes: 1 addition & 1 deletion internal-packages/redis/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const defaultOptions: Partial<RedisOptions> = {
const delay = Math.min(times * 50, 1000);
return delay;
},
maxRetriesPerRequest: process.env.GITHUB_ACTIONS ? 50 : process.env.VITEST ? 1 : 20,
maxRetriesPerRequest: process.env.GITHUB_ACTIONS ? 50 : process.env.VITEST ? 5 : 20,
};

const logger = new Logger("Redis", "debug");
Expand Down
2 changes: 1 addition & 1 deletion internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ export class RunEngine {
pollInterval: options.releaseConcurrency?.pollInterval ?? 1000,
batchSize: options.releaseConcurrency?.batchSize ?? 10,
executor: async (descriptor, snapshotId) => {
await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot(
return await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot(
snapshotId
);
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Callback, createRedisClient, Redis, Result, type RedisOptions } from "@internal/redis";
import { Tracer } from "@internal/tracing";
import { startSpan, Tracer } from "@internal/tracing";
import { Logger } from "@trigger.dev/core/logger";
import { setInterval } from "node:timers/promises";
import { z } from "zod";
import { setInterval } from "node:timers/promises";
import { flattenAttributes } from "@trigger.dev/core/v3";

export type ReleaseConcurrencyQueueRetryOptions = {
maxRetries?: number;
Expand All @@ -15,7 +16,10 @@ export type ReleaseConcurrencyQueueRetryOptions = {

export type ReleaseConcurrencyQueueOptions<T> = {
redis: RedisOptions;
executor: (releaseQueue: T, releaserId: string) => Promise<void>;
/**
* @returns true if the run was successful, false if the token should be returned to the bucket
*/
executor: (releaseQueue: T, releaserId: string) => Promise<boolean>;
keys: {
fromDescriptor: (releaseQueue: T) => string;
toDescriptor: (releaseQueue: string) => T;
Expand Down Expand Up @@ -78,6 +82,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {

if (!options.disableConsumers) {
this.#startConsumers();
this.#startMetricsProducer();
}
}

Expand Down Expand Up @@ -119,7 +124,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
String(Date.now())
);

this.logger.debug("Consumed token in attemptToRelease", {
this.logger.info("Consumed token in attemptToRelease", {
releaseQueueDescriptor,
releaserId,
maxTokens,
Expand Down Expand Up @@ -270,7 +275,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
return false;
}

await Promise.all(
await Promise.allSettled(
result.map(([queue, releaserId, metadata]) => {
const itemMetadata = QueueItemMetadata.parse(JSON.parse(metadata));
const releaseQueueDescriptor = this.keys.toDescriptor(queue);
Expand All @@ -283,9 +288,29 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {

async #callExecutor(releaseQueueDescriptor: T, releaserId: string, metadata: QueueItemMetadata) {
try {
this.logger.info("Executing run:", { releaseQueueDescriptor, releaserId });
this.logger.info("Calling executor for release", { releaseQueueDescriptor, releaserId });

const released = await this.options.executor(releaseQueueDescriptor, releaserId);

await this.options.executor(releaseQueueDescriptor, releaserId);
if (released) {
this.logger.info("Executor released concurrency", { releaseQueueDescriptor, releaserId });
} else {
this.logger.info("Executor did not release concurrency", {
releaseQueueDescriptor,
releaserId,
});

// Return the token but don't requeue
const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);
await this.redis.returnTokenOnly(
this.masterQueuesKey,
this.#bucketKey(releaseQueue),
this.#queueKey(releaseQueue),
this.#metadataKey(releaseQueue),
releaseQueue,
releaserId
);
}
} catch (error) {
this.logger.error("Error executing run:", { error });

Expand Down Expand Up @@ -374,6 +399,30 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
}
}

async #startMetricsProducer() {
try {
// Produce metrics every 60 seconds, using a tracer span
for await (const _ of setInterval(60_000)) {
const metrics = await this.getQueueMetrics();
this.logger.info("Queue metrics:", { metrics });

await startSpan(
this.options.tracer,
"ReleaseConcurrencyTokenBucketQueue.metrics",
async (span) => {},
{
attributes: {
...flattenAttributes(metrics, "queues"),
forceRecording: true,
},
}
);
}
} catch (error) {
this.logger.error("Error starting metrics producer:", { error });
}
}

#calculateBackoffScore(item: QueueItemMetadata): string {
const delay = Math.min(
this.backoff.maxDelay,
Expand All @@ -382,6 +431,137 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
return String(Date.now() + delay);
}

async getQueueMetrics(): Promise<
Array<{ releaseQueue: string; currentTokens: number; queueLength: number }>
> {
const streamRedis = this.redis.duplicate();
const queuePattern = `${this.keyPrefix}*:queue`;
const stream = streamRedis.scanStream({
match: queuePattern,
type: "zset",
count: 100,
});

let resolvePromise: (
value: Array<{ releaseQueue: string; currentTokens: number; queueLength: number }>
) => void;
let rejectPromise: (reason?: any) => void;

const promise = new Promise<
Array<{ releaseQueue: string; currentTokens: number; queueLength: number }>
>((resolve, reject) => {
resolvePromise = resolve;
rejectPromise = reject;
});

const metrics: Map<
string,
{ releaseQueue: string; currentTokens: number; queueLength: number }
> = new Map();

async function getMetricsForKeys(queueKeys: string[]) {
if (queueKeys.length === 0) {
return [];
}

const pipeline = streamRedis.pipeline();

queueKeys.forEach((queueKey) => {
const releaseQueue = queueKey
.replace(":queue", "")
.replace(streamRedis.options.keyPrefix ?? "", "");
const bucketKey = `${releaseQueue}:bucket`;

pipeline.get(bucketKey);
pipeline.zcard(`${releaseQueue}:queue`);
});

const result = await pipeline.exec();

if (!result) {
return [];
}

const results = result.map(([resultError, queueLengthOrCurrentTokens]) => {
if (resultError) {
return null;
}

return queueLengthOrCurrentTokens ? Number(queueLengthOrCurrentTokens) : 0;
});

// Now zip the results with the queue keys
const zippedResults = queueKeys.map((queueKey, index) => {
const releaseQueue = queueKey
.replace(":queue", "")
.replace(streamRedis.options.keyPrefix ?? "", "");

// Current tokens are at indexes 0, 2, 4, 6, etc.
// Queue length are at indexes 1, 3, 5, 7, etc.

const currentTokens = results[index * 2];
const queueLength = results[index * 2 + 1];

if (typeof currentTokens !== "number" || typeof queueLength !== "number") {
return null;
}

return {
releaseQueue,
currentTokens: currentTokens,
queueLength: queueLength,
};
});

return zippedResults.filter((result) => result !== null);
}

stream.on("end", () => {
streamRedis.quit();
resolvePromise(Array.from(metrics.values()));
});

stream.on("error", (error) => {
this.logger.error("Error getting queue metrics:", { error });

stream.pause();
streamRedis.quit();
rejectPromise(error);
});

stream.on("data", async (keys) => {
stream.pause();

const uniqueKeys = Array.from(new Set<string>(keys));

if (uniqueKeys.length === 0) {
stream.resume();
return;
}

const unresolvedKeys = uniqueKeys.filter((key) => !metrics.has(key));

if (unresolvedKeys.length === 0) {
stream.resume();
return;
}

this.logger.debug("Fetching queue metrics for keys", { keys: uniqueKeys });

await getMetricsForKeys(unresolvedKeys).then((results) => {
results.forEach((result) => {
if (result) {
metrics.set(result.releaseQueue, result);
}
});

stream.resume();
});
});

return promise;
}

#registerCommands() {
this.redis.defineCommand("consumeToken", {
numberOfKeys: 4,
Expand All @@ -401,7 +581,9 @@ local currentTokens = tonumber(redis.call("GET", bucketKey) or maxTokens)

-- If we have enough tokens, then consume them
if currentTokens >= 1 then
redis.call("SET", bucketKey, currentTokens - 1)
local newCurrentTokens = currentTokens - 1

redis.call("SET", bucketKey, newCurrentTokens)
redis.call("ZREM", queueKey, releaserId)

-- Clean up metadata when successfully consuming
Expand All @@ -411,8 +593,8 @@ if currentTokens >= 1 then
local queueLength = redis.call("ZCARD", queueKey)

-- If we still have tokens and items in queue, update available queues
if currentTokens > 0 and queueLength > 0 then
redis.call("ZADD", masterQueuesKey, currentTokens, releaseQueue)
if newCurrentTokens > 0 and queueLength > 0 then
redis.call("ZADD", masterQueuesKey, newCurrentTokens, releaseQueue)
else
redis.call("ZREM", masterQueuesKey, releaseQueue)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ export class ReleaseConcurrencySystem {
);
}

public async executeReleaseConcurrencyForSnapshot(snapshotId: string) {
public async executeReleaseConcurrencyForSnapshot(snapshotId: string): Promise<boolean> {
if (!this.releaseConcurrencyQueue) {
return;
return false;
}

this.$.logger.debug("Executing released concurrency", {
Expand Down Expand Up @@ -136,14 +136,14 @@ export class ReleaseConcurrencySystem {
snapshotId,
});

return;
return false;
}

// - Runlock the run
// - Get latest snapshot
// - If the run is non suspended or going to be, then bail
// - If the run is suspended or going to be, then release the concurrency
await this.$.runLock.lock([snapshot.runId], 5_000, async () => {
return await this.$.runLock.lock([snapshot.runId], 5_000, async () => {
const latestSnapshot = await getLatestExecutionSnapshot(this.$.prisma, snapshot.runId);

const isValidSnapshot =
Expand All @@ -159,7 +159,7 @@ export class ReleaseConcurrencySystem {
snapshot,
});

return;
return false;
}

if (!canReleaseConcurrency(latestSnapshot.executionStatus)) {
Expand All @@ -168,20 +168,21 @@ export class ReleaseConcurrencySystem {
snapshot: latestSnapshot,
});

return;
return false;
}

const metadata = this.#parseMetadata(snapshot.metadata);

if (typeof metadata.releaseConcurrency === "boolean") {
if (metadata.releaseConcurrency) {
return await this.$.runQueue.releaseAllConcurrency(
snapshot.organizationId,
snapshot.runId
);
await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId);

return true;
}

return await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);
await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);

return true;
}

// Get the locked queue
Expand All @@ -198,10 +199,14 @@ export class ReleaseConcurrencySystem {
(typeof taskQueue.concurrencyLimit === "undefined" ||
taskQueue.releaseConcurrencyOnWaitpoint)
) {
return await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId);
await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId);

return true;
}

return await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);
await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);

return true;
});
}

Expand Down
Loading