Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions lib/scheduler/__tests__/tenero-task.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import {
beforeEach,
afterEach,
} from "vitest";
import { runTeneroTask } from "../tenero-task";
import {
runTeneroTask,
TENERO_MONTH_QUOTA_BACKOFF_MS,
} from "../tenero-task";

/** Minimal logger double — captures events without console noise. */
function createCapturingLogger() {
Expand Down Expand Up @@ -178,7 +181,7 @@ describe("runTeneroTask", () => {
})
) as unknown as typeof fetch;

const { result, rateLimited } = await runTeneroTask({
const { result, rateLimited, rateLimitBackoffMs } = await runTeneroTask({
logger,
kv,
tokenIds: [
Expand All @@ -189,6 +192,7 @@ describe("runTeneroTask", () => {
});

expect(rateLimited).toBe(true);
expect(rateLimitBackoffMs).toBe(5 * 60 * 1000);
// First token wrote successfully before the break.
Comment thread
whoabuddy marked this conversation as resolved.
expect(result.succeeded).toBe(1);
expect(puts).toHaveLength(1);
Expand All @@ -198,4 +202,36 @@ describe("runTeneroTask", () => {
events.some((e) => e.msg === "tenero.minute_quota_exhausted_mid_run")
).toBe(true);
});

it("monthRemaining <= 0: flags rateLimited and backs off for a day", async () => {
const { logger, events } = createCapturingLogger();
const { kv, puts } = createFakeKv();

globalThis.fetch = vi.fn(async () =>
teneroResponse(429, {
minuteRemaining: 80,
monthRemaining: 0,
})
) as unknown as typeof fetch;

const { result, rateLimited, rateLimitBackoffMs } = await runTeneroTask({
logger,
kv,
tokenIds: [
"stx",
"SM3VDXK3WZZSA84XXFKAFAF15NNZX32CTSG82JFQ4.sbtc-token::sbtc",
],
});

expect(rateLimited).toBe(true);
expect(rateLimitBackoffMs).toBe(TENERO_MONTH_QUOTA_BACKOFF_MS);
expect(result.succeeded).toBe(0);
expect(result.failed).toBe(1);
expect(result.monthRemaining).toBe(0);
expect(puts).toHaveLength(0);
expect((globalThis.fetch as ReturnType<typeof vi.fn>).mock.calls).toHaveLength(3);
expect(
events.some((e) => e.msg === "tenero.month_quota_exhausted_mid_run")
).toBe(true);
});
});
26 changes: 25 additions & 1 deletion lib/scheduler/tenero-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,16 @@ export interface TeneroTaskOutcome {
* DO-storage `nextRunAfter.tenero` for adaptive backoff.
*/
rateLimited: boolean;
/**
* Suggested scheduler backoff. Monthly quota exhaustion needs a much
* longer pause than minute-level throttling.
*/
rateLimitBackoffMs?: number;
}

export const TENERO_MINUTE_QUOTA_BACKOFF_MS = 5 * 60 * 1000;
export const TENERO_MONTH_QUOTA_BACKOFF_MS = 24 * 60 * 60 * 1000;

export async function runTeneroTask(
deps: TeneroTaskDeps
): Promise<TeneroTaskOutcome> {
Expand All @@ -58,6 +66,7 @@ export async function runTeneroTask(
let lastMinuteRemaining: number | null = null;
let lastMonthRemaining: number | null = null;
let rateLimited = false;
let rateLimitBackoffMs: number | undefined;

for (const tokenId of tokenIds) {
const r = await fetchTokenPriceUsd(tokenId, logger, apiKey);
Expand Down Expand Up @@ -89,11 +98,26 @@ export async function runTeneroTask(
failed++;
}

if (
r.rateLimit.monthRemaining !== null &&
r.rateLimit.monthRemaining <= 0
) {
rateLimited = true;
rateLimitBackoffMs = TENERO_MONTH_QUOTA_BACKOFF_MS;
logger.warn("tenero.month_quota_exhausted_mid_run", {
rlMonthRemaining: r.rateLimit.monthRemaining,
processed: succeeded + failed,
remaining: tokenIds.length - (succeeded + failed),
});
break;
}

if (
r.rateLimit.minuteRemaining !== null &&
r.rateLimit.minuteRemaining <= 0
) {
rateLimited = true;
rateLimitBackoffMs ??= TENERO_MINUTE_QUOTA_BACKOFF_MS;
logger.warn("tenero.minute_quota_exhausted_mid_run", {
rlMinuteRemaining: r.rateLimit.minuteRemaining,
processed: succeeded + failed,
Expand Down Expand Up @@ -122,5 +146,5 @@ export async function runTeneroTask(
rateLimited,
});

return { result, rateLimited };
return { result, rateLimited, rateLimitBackoffMs };
}
11 changes: 6 additions & 5 deletions worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { STATIC_TOKEN_IDS } from "./lib/external/tenero";
import {
runTeneroTask,
type TeneroRunResult,
TENERO_MINUTE_QUOTA_BACKOFF_MS,
} from "./lib/scheduler/tenero-task";

// ─────────────────────────── SchedulerDO ───────────────────────────
Expand All @@ -42,7 +43,6 @@ import {

const TENERO_INTERVAL_MS = 5 * 60 * 1000;
const ALARM_TICK_MS = TENERO_INTERVAL_MS;
const TENERO_RATELIMIT_BACKOFF_MS = 5 * 60 * 1000;

export interface SchedulerStatus {
now: number;
Expand Down Expand Up @@ -172,14 +172,14 @@ export class SchedulerDO extends DurableObject<CloudflareEnv> {
? parentLogger.child({ task: "tenero" })
: parentLogger;

const { result, rateLimited } = await runTeneroTask({
const { result, rateLimited, rateLimitBackoffMs } = await runTeneroTask({
logger,
kv: this.env.VERIFIED_AGENTS,
tokenIds: STATIC_TOKEN_IDS,
apiKey: this.lookupTeneroApiKey(),
});

await this.persistTeneroResult(result, { rateLimited });
await this.persistTeneroResult(result, { rateLimited, rateLimitBackoffMs });
return result;
}

Expand Down Expand Up @@ -217,7 +217,7 @@ export class SchedulerDO extends DurableObject<CloudflareEnv> {

private async persistTeneroResult(
result: TeneroRunResult,
opts: { rateLimited: boolean }
opts: { rateLimited: boolean; rateLimitBackoffMs?: number }
): Promise<void> {
await this.ctx.storage.put("lastTeneroRunAt", Date.now());
await this.ctx.storage.put("lastTeneroResult", result);
Expand All @@ -239,7 +239,8 @@ export class SchedulerDO extends DurableObject<CloudflareEnv> {
const nextRunAfter = ((await this.ctx.storage.get<{ tenero?: number }>(
"nextRunAfter"
)) ?? {}) as { tenero?: number };
nextRunAfter.tenero = Date.now() + TENERO_RATELIMIT_BACKOFF_MS;
nextRunAfter.tenero =
Date.now() + (opts.rateLimitBackoffMs ?? TENERO_MINUTE_QUOTA_BACKOFF_MS);
await this.ctx.storage.put("nextRunAfter", nextRunAfter);
}
}
Expand Down
Loading