diff --git a/app/api/prices/route.ts b/app/api/prices/route.ts new file mode 100644 index 00000000..b2502f65 --- /dev/null +++ b/app/api/prices/route.ts @@ -0,0 +1,188 @@ +/** + * GET /api/prices — cached USD prices for supported Stacks tokens. + * + * Data comes from KV (`tenero:price:{tokenId}`), which the `SchedulerDO` + * refreshes every ~5 min by calling Tenero. This route is a pure read + * surface — it never calls Tenero itself, so its cost scales with KV + * reads, not upstream API quota. + * + * Shapes: + * - `GET /api/prices` with `Accept: application/json` → all cached prices + * - `GET /api/prices?token={tokenId}` with `Accept: application/json` → single token + * - `GET /api/prices` without `application/json` in `Accept` → self-doc + * + * Rate-limited via the existing `RATE_LIMIT_READ` binding (300 req / 60 s + * per IP). Fails open on binding errors in local dev, closed otherwise — + * matches the project convention (#666). + * + * Adding a new priceable token: edit `STATIC_TOKEN_IDS` in + * `lib/external/tenero/tokens.ts` AND `TOKEN_DECIMALS` in + * `app/leaderboard/page.tsx`. Run a Tenero probe first to confirm the + * contract id has a non-null `price_usd`. + */ + +import { NextRequest, NextResponse } from "next/server"; +import { getCloudflareContext } from "@opennextjs/cloudflare"; +import { + getCachedTokenPrice, + getCachedTokenPrices, +} from "@/lib/external/tenero/kv-cache"; +import { STATIC_TOKEN_IDS } from "@/lib/external/tenero/tokens"; +import { createConsoleLogger, createLogger, isLogsRPC } from "@/lib/logging"; + +const RATE_LIMIT_RETRY_AFTER = 60; + +/** + * In production / preview, fail closed when the rate-limit binding errors + * (matches #666 convention via `DEPLOY_ENV !== undefined`). + */ +function shouldFailClosed(env: CloudflareEnv): boolean { + return env.DEPLOY_ENV !== undefined; +} + +function acceptsJson(request: NextRequest): boolean { + const accept = request.headers.get("Accept") ?? ""; + return accept.toLowerCase().includes("application/json"); +} + +function selfDoc(): NextResponse { + return NextResponse.json( + { + endpoint: "/api/prices", + description: + "USD prices for supported Stacks tokens. Cached by the SchedulerDO " + + "(~5 min refresh cadence) from Tenero. Read-only — no upstream calls.", + methods: { + "GET /api/prices": { + accept: "application/json", + description: "Return cached USD prices for all supported tokens.", + response: { + prices: { + "{tokenId}": { + priceUsd: + "number | null — USD price; null when Tenero confirmed no published price", + fetchedAt: "number — unix millis when the cache entry was written", + }, + }, + supportedTokens: + "string[] — full list of tokenIds the scheduler refreshes", + }, + }, + "GET /api/prices?token={tokenId}": { + accept: "application/json", + description: "Return a single token's cached price.", + response: { + tokenId: "string", + priceUsd: "number | null", + fetchedAt: "number | null — null when no cache entry exists yet", + }, + }, + }, + supportedTokens: STATIC_TOKEN_IDS, + addingATokenRequires: [ + "Adding to STATIC_TOKEN_IDS in lib/external/tenero/tokens.ts", + "Adding to TOKEN_DECIMALS in app/leaderboard/page.tsx", + "Probing https://api.tenero.io/v1/stacks/tokens/{contract_id} for a non-null price_usd", + ], + rateLimit: "300 req / 60 s per IP (RATE_LIMIT_READ binding)", + }, + { + headers: { + "Cache-Control": "public, max-age=300, s-maxage=3600", + }, + } + ); +} + +export async function GET(request: NextRequest) { + const { env, ctx } = await getCloudflareContext({ async: true }); + const rayId = request.headers.get("cf-ray") ?? crypto.randomUUID(); + const logger = isLogsRPC(env.LOGS) + ? createLogger(env.LOGS, ctx, { rayId, path: "/api/prices" }) + : createConsoleLogger({ rayId, path: "/api/prices" }); + + if (!acceptsJson(request)) { + return selfDoc(); + } + + // Rate-limit by IP. RATE_LIMIT_READ is a 300/60s bucket; KV reads are + // cheap so this is the right size. Fails closed in deployed envs. + const ip = + request.headers.get("cf-connecting-ip") || + request.headers.get("x-forwarded-for") || + "unknown"; + let limited = false; + try { + const result = await env.RATE_LIMIT_READ.limit({ key: `prices:${ip}` }); + limited = !result.success; + } catch (err) { + const failClosed = shouldFailClosed(env); + logger.warn("prices.rate_limit_binding_error", { + error: String(err), + failClosed, + }); + if (failClosed) limited = true; + } + if (limited) { + return NextResponse.json( + { + error: "Too many requests. Slow down.", + retryAfter: RATE_LIMIT_RETRY_AFTER, + }, + { + status: 429, + headers: { "Retry-After": String(RATE_LIMIT_RETRY_AFTER) }, + } + ); + } + + const kv = env.VERIFIED_AGENTS; + if (!kv) { + return NextResponse.json( + { error: "Price cache unavailable in this environment." }, + { status: 503 } + ); + } + + const url = new URL(request.url); + const token = url.searchParams.get("token"); + + // Single-token lookup + if (token) { + const cached = await getCachedTokenPrice(kv, token); + return NextResponse.json( + { + tokenId: token, + priceUsd: cached?.priceUsd ?? null, + fetchedAt: cached?.fetchedAt ?? null, + }, + { + headers: { + "Cache-Control": "public, max-age=30, s-maxage=60", + }, + } + ); + } + + // Full set + const cached = await getCachedTokenPrices(kv, STATIC_TOKEN_IDS); + const prices: Record = + {}; + for (const [tokenId, entry] of cached) { + prices[tokenId] = { + priceUsd: entry.priceUsd, + fetchedAt: entry.fetchedAt, + }; + } + return NextResponse.json( + { + prices, + supportedTokens: STATIC_TOKEN_IDS, + }, + { + headers: { + "Cache-Control": "public, max-age=30, s-maxage=60", + }, + } + ); +} diff --git a/app/components/Navbar.tsx b/app/components/Navbar.tsx index 86eb3fe2..4c65cc44 100644 --- a/app/components/Navbar.tsx +++ b/app/components/Navbar.tsx @@ -139,6 +139,7 @@ export default function Navbar() { {[ { href: "/agents", label: "Agent Network" }, + { href: "/leaderboard", label: "Leaderboard" }, { href: "/activity", label: "Activity Feed" }, { href: "/bounty", label: "Bounties" }, { href: "/skills", label: "Skills" }, @@ -146,7 +147,7 @@ export default function Navbar() { {link.label} @@ -173,6 +174,7 @@ export default function Navbar() { > {[ { href: "/agents", label: "Agent Network" }, + { href: "/leaderboard", label: "Leaderboard" }, { href: "/activity", label: "Activity Feed" }, { href: "/bounty", label: "Bounties" }, { href: "/skills", label: "Skills" }, diff --git a/app/leaderboard/LeaderboardClient.tsx b/app/leaderboard/LeaderboardClient.tsx new file mode 100644 index 00000000..970dd434 --- /dev/null +++ b/app/leaderboard/LeaderboardClient.tsx @@ -0,0 +1,217 @@ +"use client"; + +import Link from "next/link"; +import { generateName } from "@/lib/name-generator"; +import { truncateAddress, formatRelativeTime } from "@/lib/utils"; + +export interface LeaderboardRow { + stxAddress: string; + btcAddress: string | null; + displayName: string | null; + bnsName: string | null; + erc8004AgentId: number | null; + tradeCount: number; + latestTradeAt: number; + /** + * USD volume computed server-side from KV-cached Tenero prices. Comes in + * as a plain number so this component stays presentational — no fetch, + * no localStorage, no useEffect. + */ + volumeUsd: number; + /** + * False if any token in the agent's volume couldn't be priced from KV + * (cold scheduler, paused, or token has no published price). Lets the UI + * footnote the row instead of misleadingly under-reporting. + */ + allPriced: boolean; +} + +function formatUsd(value: number | null): string { + if (value == null || !Number.isFinite(value)) return "—"; + const abs = Math.abs(value); + const fractionDigits = abs < 10_000 ? 2 : 0; + const formatted = abs.toLocaleString("en-US", { + minimumFractionDigits: fractionDigits, + maximumFractionDigits: fractionDigits, + }); + const sign = value < 0 ? "-" : ""; + return `${sign}$${formatted}`; +} + +function rowDisplayName(row: LeaderboardRow): string { + return ( + row.displayName?.trim() || + row.bnsName?.trim() || + (row.btcAddress ? generateName(row.btcAddress) : truncateAddress(row.stxAddress)) + ); +} + +function renderVolumeCell(row: LeaderboardRow): React.ReactNode { + if (row.volumeUsd > 0) { + const label = formatUsd(row.volumeUsd); + return row.allPriced ? ( + {label} + ) : ( + + {label}* + + ); + } + return ; +} + +export default function LeaderboardClient({ rows }: { rows: LeaderboardRow[] }) { + if (rows.length === 0) { + return ( +
+

+ No agents have submitted trades yet. Once swaps land via{" "} + POST /api/competition/trades + , they'll appear here. +

+
+ ); + } + + return ( +
+ {/* Desktop / tablet table */} +
+ + + + + + + + + + + + {rows.map((row, idx) => ( + + + + + + + + ))} + +
RankAgentTradesVolume (USD)Latest Trade
#{idx + 1} + {row.btcAddress ? ( + + {/* eslint-disable-next-line @next/next/no-img-element */} + {rowDisplayName(row)} { e.currentTarget.style.display = "none"; }} + /> + + + {rowDisplayName(row)} + + + {truncateAddress(row.stxAddress)} + + + + ) : ( +
+ + )} +
+ {row.tradeCount} + {renderVolumeCell(row)} + {row.latestTradeAt > 0 + ? formatRelativeTime(new Date(row.latestTradeAt * 1000).toISOString()) + : "—"} +
+
+ + {/* Mobile list */} + +
+ ); +} diff --git a/app/leaderboard/page.tsx b/app/leaderboard/page.tsx index 6ca9207c..04d0809e 100644 --- a/app/leaderboard/page.tsx +++ b/app/leaderboard/page.tsx @@ -1,20 +1,269 @@ import type { Metadata } from "next"; -import { redirect } from "next/navigation"; +import { getCloudflareContext } from "@opennextjs/cloudflare"; +import Navbar from "../components/Navbar"; +import AnimatedBackground from "../components/AnimatedBackground"; +import LeaderboardClient, { type LeaderboardRow } from "./LeaderboardClient"; +import { getCachedTokenPrices } from "@/lib/external/tenero/kv-cache"; + +// Reads live Cloudflare bindings (D1, KV, SchedulerDO). Keep this dynamic so +// Next's build-time prerender never needs a Wrangler platform proxy. +export const dynamic = "force-dynamic"; export const metadata: Metadata = { - title: "Agent Registry - AIBTC", + title: "Trading Leaderboard - AIBTC", description: - "AIBTC agent registry with sortable list ranked by level: Genesis, Registered", + "Agents ranked by the number of swaps they've done via aibtc mcp.", openGraph: { - title: "AIBTC Agent Registry", - description: "See all registered AI agents in the Bitcoin economy", + title: "AIBTC Trading Leaderboard", + description: "MCP-submitted swap rankings across the AIBTC agent network.", }, other: { - "aibtc:page-type": "agent-registry", - "aibtc:api-endpoint": "/api/agents", + "aibtc:page-type": "trading-leaderboard", + "aibtc:api-endpoint": "/api/competition/trades", }, }; -export default function LeaderboardPage() { - redirect("/agents"); +/** + * Stacks-canonical decimals for tokens we know how to value. Adding a + * new token requires probing Tenero's `/v1/stacks/tokens/{contract_id}` + * first and confirming a 200 with a non-null price_usd — silently + * shipping the wrong contract id makes that token render as $0 forever. + * + * Keep in sync with `STATIC_TOKEN_IDS` in `lib/external/tenero/tokens.ts` + * (consumed by `SchedulerDO` in `worker.ts`) so the scheduler refreshes + * every token the leaderboard knows how to value. + * + * The unknown-token default is 6 (SIP-10 convention). Volume from + * those legs stays $0 (no price in KV), which is the honest read — we'd + * rather under-report than impute a number. + */ +const TOKEN_DECIMALS: Readonly> = { + stx: 6, + "SM3VDXK3WZZSA84XXFKAFAF15NNZX32CTSG82JFQ4.sbtc-token::sbtc": 8, + "SP4SZE494VC2YC5JYG7AYFQ44F5Q4PYV7DVMDPBG.ststx-token::ststx": 6, +}; + +interface LeaderboardJoinedRow { + sender: string; + token_in: string; + cnt: number; + // D1 returns SUM of an INTEGER column as a JS number, but the runtime + // boundary isn't tightly typed — Cloudflare's docs leave room for + // string returns on very large aggregates. Type defensively here. + sum_in: number | string | null; + latest_at: number; + btc_address: string | null; + display_name: string | null; + bns_name: string | null; + erc8004_agent_id: number | null; +} + +/** + * Parse a D1 aggregate into a safe JS number. Handles: + * - native number (the common case) — passes through if finite, else 0 + * - decimal string (defensive — D1 may return very large sums as strings) + * - non-finite / non-parseable / negative — returns 0 + * + * For the token decimals we support today (6 / 8) and the comp's expected + * volume range, the SUM stays well under `Number.MAX_SAFE_INTEGER` (sBTC + * caps at ~21M * 1e8 ≈ 2.1e15; safe-int boundary ≈ 9e15). The BigInt + * round-trip preserves precision exactly inside that range and clamps at + * the safe-int boundary if a future high-decimal token enters scope — + * an under-report at the ceiling is preferable to silent rounding errors. + */ +function safeAggregateNumber(raw: number | string | null | undefined): number { + if (typeof raw === "number") return Number.isFinite(raw) && raw > 0 ? raw : 0; + if (typeof raw !== "string") return 0; + let big: bigint; + try { + big = BigInt(raw); + } catch { + return 0; + } + // Use `BigInt(0)` rather than `0n` — tsconfig target is below ES2020. + if (big <= BigInt(0)) return 0; + const ceiling = BigInt(Number.MAX_SAFE_INTEGER); + return big > ceiling ? Number.MAX_SAFE_INTEGER : Number(big); +} + +async function fetchLeaderboard(): Promise { + const { env, ctx } = await getCloudflareContext(); + const db = env.DB as D1Database | undefined; + const kv = env.VERIFIED_AGENTS as KVNamespace | undefined; + + // Opportunistic SchedulerDO kick. A DO instance doesn't exist until + // something calls a method on it — the constructor (which arms the + // first alarm) only runs on first invocation. Fire-and-forget here so + // SSR isn't blocked; `ctx.waitUntil` keeps the RPC alive past response + // teardown. Idempotent — subsequent renders just touch a live instance. + // Wrapped in a guard so a missing/misbehaving DO binding never blocks + // the leaderboard render path. + try { + if (env.SCHEDULER) { + ctx.waitUntil( + env.SCHEDULER.get(env.SCHEDULER.idFromName("v1")) + .status() + .then(() => undefined) + .catch(() => undefined) + ); + } + } catch { + // Binding access threw — render proceeds without the kick. + } + + if (!db) return []; + + // Single round-trip: aggregate `swaps` per (sender, token_in) and + // LEFT JOIN the four display fields from `agents`. The agent columns + // are functionally dependent on `sender` (in the GROUP BY) so SQLite + // returns them consistently across the per-token rows for one sender. + let rows: LeaderboardJoinedRow[] = []; + try { + const sql = ` + SELECT s.sender, s.token_in, + COUNT(*) AS cnt, + SUM(s.amount_in) AS sum_in, + MAX(s.burn_block_time) AS latest_at, + a.btc_address, a.display_name, a.bns_name, a.erc8004_agent_id + FROM swaps s + LEFT JOIN agents a ON a.stx_address = s.sender + WHERE s.source = 'agent' + GROUP BY s.sender, s.token_in + `; + const result = await db.prepare(sql).all(); + rows = result.results ?? []; + } catch { + return []; + } + + if (rows.length === 0) return []; + + // Aggregate per sender — sum count, keep max(latest_at), preserve + // per-token breakdown for the client-side volume calculation, and + // capture display fields once (they're identical across per-token + // rows of the same sender). + const bySender = new Map< + string, + { + count: number; + latestAt: number; + tokens: Array<{ tokenId: string; sumAmountIn: number; decimals: number }>; + display: { + btcAddress: string | null; + displayName: string | null; + bnsName: string | null; + erc8004AgentId: number | null; + }; + } + >(); + for (const r of rows) { + const existing = bySender.get(r.sender) ?? { + count: 0, + latestAt: 0, + tokens: [] as Array<{ + tokenId: string; + sumAmountIn: number; + decimals: number; + }>, + display: { + btcAddress: r.btc_address, + displayName: r.display_name, + bnsName: r.bns_name, + erc8004AgentId: r.erc8004_agent_id, + }, + }; + existing.count += r.cnt; + if (r.latest_at > existing.latestAt) existing.latestAt = r.latest_at; + existing.tokens.push({ + tokenId: r.token_in, + sumAmountIn: safeAggregateNumber(r.sum_in), + decimals: TOKEN_DECIMALS[r.token_in] ?? 6, + }); + bySender.set(r.sender, existing); + } + + // Read every distinct tokenId from KV in parallel. SchedulerDO writes + // these on its 5-min alarm tick; SSR is a pure consumer here. Missing + // entries (cold start, scheduler paused) render as "—" downstream. + const distinctTokenIds = new Set(); + for (const agg of bySender.values()) { + for (const t of agg.tokens) distinctTokenIds.add(t.tokenId); + } + const priceMap = kv + ? await getCachedTokenPrices(kv, Array.from(distinctTokenIds)) + : new Map(); + + const ranked: LeaderboardRow[] = Array.from(bySender.entries()) + .map(([sender, agg]) => { + let volumeUsd = 0; + let allPriced = true; + for (const t of agg.tokens) { + const cached = priceMap.get(t.tokenId); + const price = cached?.priceUsd ?? null; + if (price == null) { + allPriced = false; + continue; + } + volumeUsd += (t.sumAmountIn / 10 ** t.decimals) * price; + } + return { + stxAddress: sender, + btcAddress: agg.display.btcAddress, + displayName: agg.display.displayName, + bnsName: agg.display.bnsName, + erc8004AgentId: agg.display.erc8004AgentId, + tradeCount: agg.count, + latestTradeAt: agg.latestAt, + volumeUsd, + allPriced, + }; + }) + .sort((a, b) => { + // Primary: count desc. Tiebreak: latest trade desc. + if (b.tradeCount !== a.tradeCount) return b.tradeCount - a.tradeCount; + return b.latestTradeAt - a.latestTradeAt; + }); + + return ranked; +} + +export default async function LeaderboardPage() { + const rows = await fetchLeaderboard(); + + return ( + <> + {/* + AIBTC Trading Leaderboard — Machine-readable endpoints: + - GET /api/competition/trades?address=… — Per-agent trade list (cursor paginated) + - POST /api/competition/trades — Submit a txid via the MCP (PR #738 / #510) + - Full docs: /llms-full.txt | OpenAPI: /api/openapi.json + */} + + + +
+
+
+
+
+

+ Leaderboard +

+

+ Agents ranked by the number of swaps they've done via aibtc mcp. +

+
+ + +
+
+ + ); } diff --git a/cloudflare-env.d.ts b/cloudflare-env.d.ts index 29c82b84..afc54fd3 100644 --- a/cloudflare-env.d.ts +++ b/cloudflare-env.d.ts @@ -18,4 +18,31 @@ interface CloudflareEnv { X402_RELAY_URL?: string; // x402 relay URL for all payment settlement (default: https://x402-relay.aibtc.com) X402_RELAY?: import("./lib/inbox/relay-rpc").RelayRPC; // x402 sponsor relay RPC service binding (undefined in local dev) INBOX_RECONCILIATION_QUEUE?: Queue; + // Inline stub interface (not `import("./worker").SchedulerDO`) so this + // d.ts doesn't pull worker.ts into Next.js's type-check pass, where + // `./.open-next/worker.js` doesn't resolve until after OpenNext runs. + // + // The `Rpc.DurableObjectBranded` intersection is load-bearing: + // `DurableObjectNamespace` requires T to extend that brand for the + // stub returned from `.get(id)` to surface T's RPC methods (per + // @cloudflare/workers-types — DurableObjectStub = Fetcher which + // only expands T's methods when T is branded). Without it, callers + // see `.fetch()` / `.connect()` only, never `.status()` / etc. + // + // Keep these method signatures in sync with the SchedulerDO class in + // worker.ts. + // Returns are typed as `Promise` not `Promise` because the + // RPC `Result` type only accepts Serializable returns — `unknown` + // falls through to `never` and the call site can't even chain `.then()`. + // Callers that need richer return types should tighten these here when + // they wire up real consumers. + SCHEDULER: DurableObjectNamespace< + Rpc.DurableObjectBranded & { + status(): Promise; + refreshNow(task: "tenero" | "all"): Promise; + pauseUntil(timestamp: number): Promise; + resume(): Promise; + } + >; + TENERO_API_KEY?: string; // Optional Tenero API key (x-api-key header); raises rate limits above the shared web-ui-ip tier } diff --git a/lib/balances/btc.ts b/lib/balances/btc.ts index cde1cb9f..a98cd446 100644 --- a/lib/balances/btc.ts +++ b/lib/balances/btc.ts @@ -20,15 +20,45 @@ export interface BtcBalance { const SBTC_ASSET_ID = `${SBTC_CONTRACTS.mainnet.address}.${SBTC_CONTRACTS.mainnet.name}::${SBTC_CONTRACTS.mainnet.name}`; +/** + * Parse a satoshi string (Stacks `/balances` returns decimal strings) into + * a JS number safely. BigInt round-trip preserves precision exactly within + * `Number.MAX_SAFE_INTEGER` (≈9.007e15) and clamps above it. + * + * Both sBTC and L1 BTC are bounded by ~21M BTC × 1e8 ≈ 2.1e15 sats — well + * inside safe-int range — so the clamp is purely defensive against + * malformed upstream responses. + */ +function parseSatsString(raw: string): number { + let big: bigint; + try { + big = BigInt(raw); + } catch { + return 0; + } + // Use `BigInt(0)` rather than `0n` — tsconfig target is below ES2020. + if (big <= BigInt(0)) return 0; + const ceiling = BigInt(Number.MAX_SAFE_INTEGER); + return big > ceiling ? Number.MAX_SAFE_INTEGER : Number(big); +} + async function fetchL1Sats(btcAddress: string): Promise { const url = `https://mempool.space/api/address/${btcAddress}`; const res = await fetch(url, { headers: { Accept: "application/json" } }); if (!res.ok) return 0; + // mempool.space returns funded_txo_sum / spent_txo_sum as JSON numbers. + // JSON.parse loses precision past 2^53 silently, so re-narrow with BigInt + // via String(...) — preserves the parsed value when within safe range and + // signals overflow (returns 0 via the catch) for malformed responses. const body = (await res.json()) as { chain_stats?: { funded_txo_sum?: number; spent_txo_sum?: number }; }; - const funded = body.chain_stats?.funded_txo_sum ?? 0; - const spent = body.chain_stats?.spent_txo_sum ?? 0; + const fundedRaw = body.chain_stats?.funded_txo_sum; + const spentRaw = body.chain_stats?.spent_txo_sum; + const funded = + typeof fundedRaw === "number" && Number.isFinite(fundedRaw) ? fundedRaw : 0; + const spent = + typeof spentRaw === "number" && Number.isFinite(spentRaw) ? spentRaw : 0; return Math.max(0, funded - spent); } @@ -42,8 +72,7 @@ async function fetchL2Sats(stxAddress: string, hiroApiKey?: string): Promise; }; const raw = body.fungible_tokens?.[SBTC_ASSET_ID]?.balance ?? "0"; - const parsed = Number(raw); - return Number.isFinite(parsed) ? parsed : 0; + return parseSatsString(raw); } export async function fetchBtcBalance( diff --git a/lib/external/tenero-fetch.ts b/lib/external/tenero-fetch.ts new file mode 100644 index 00000000..90f2da79 --- /dev/null +++ b/lib/external/tenero-fetch.ts @@ -0,0 +1,258 @@ +/** + * Shared Tenero API fetch wrapper with bounded retry + structured logging. + * + * Modeled on `lib/stacks-api-fetch.ts` (Hiro wrapper). Differences vs. Hiro: + * - Smaller default 429 retry budget. Tenero's unauthenticated tier is + * web-ui-ip / 100-per-minute / 50k-per-month, and from a Worker the source + * IP is a shared CF-datacenter egress IP. Aggressive retry on 429 only + * speeds up the lockout. The SchedulerDO's next alarm tick is the recovery + * path, not in-attempt retry. + * - Parses `x-ratelimit-*` rate-limit headers (minute + month remaining) so + * callers can surface them up to the scheduler for adaptive cadence. + * - Optional `TENERO_API_KEY` is threaded through as `x-api-key`. The header + * name is the common Tenero convention; if their auth header turns out to + * be different, change it here only. + * + * Observability: callers thread a {@link Logger} from the worker-logs + * pipeline (via `isLogsRPC(env.LOGS) ? createLogger : createConsoleLogger` + * — see `lib/logging.ts`). The wrapper is silent when no logger is given; + * it never falls back to `console.*`, which would bypass the telemetry sink + * and recreate the observability bug the rev'd revert was trying to fix. + */ + +import type { Logger } from "../logging"; + +const TENERO_API_BASE = "https://api.tenero.io/v1/stacks"; + +/** Build headers for Tenero API requests, optionally including an API key. */ +export function buildTeneroHeaders(apiKey?: string): Record { + const headers: Record = { + Accept: "application/json", + "User-Agent": "aibtc-landing-page/1.0 (+https://aibtc.com)", + }; + if (apiKey) { + headers["x-api-key"] = apiKey; + } + return headers; +} + +/** Parsed Tenero rate-limit headers. */ +export interface TeneroRateLimit { + /** Remaining requests this minute (x-ratelimit-minute-remaining). */ + minuteRemaining: number | null; + /** Remaining requests this month (x-ratelimit-month-remaining). */ + monthRemaining: number | null; + /** Rate-limit tier label (x-ratelimit-type, e.g. "web-ui-ip"). */ + type: string | null; +} + +function parseIntHeader(response: Response, name: string): number | null { + const val = response.headers.get(name); + if (!val) return null; + const n = parseInt(val, 10); + return Number.isFinite(n) ? n : null; +} + +export function extractTeneroRateLimit(response: Response): TeneroRateLimit { + return { + minuteRemaining: parseIntHeader(response, "x-ratelimit-minute-remaining"), + monthRemaining: parseIntHeader(response, "x-ratelimit-month-remaining"), + type: response.headers.get("x-ratelimit-type"), + }; +} + +function extractPath(url: string): string { + try { + return new URL(url).pathname; + } catch { + return url; + } +} + +/** Per-attempt fetch timeout. */ +const PER_ATTEMPT_TIMEOUT_MS = 8_000; + +/** Cap Retry-After at 30s — the alarm tick handles longer recovery. */ +const MAX_RETRY_AFTER_MS = 30_000; + +/** Default retry budget: small on purpose — see file header. */ +const DEFAULT_429_RETRIES = 2; +const DEFAULT_5XX_RETRIES = 2; +const DEFAULT_429_BASE_DELAY_MS = 1_500; +const DEFAULT_5XX_BASE_DELAY_MS = 500; + +function isRetryableStatus(status: number): boolean { + return status === 429 || (status >= 500 && status < 600); +} + +function parseRetryAfterMs(response: Response): number | null { + const headerValue = response.headers.get("Retry-After"); + if (!headerValue) return null; + const seconds = parseInt(headerValue, 10); + if (!Number.isFinite(seconds) || seconds <= 0) return null; + return Math.min(seconds * 1000, MAX_RETRY_AFTER_MS); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +export interface TeneroFetchConfig { + /** Max attempts for 5xx errors (default: 2). */ + retries?: number; + /** Base delay for 5xx exponential backoff in ms (default: 500). */ + baseDelayMs?: number; + /** Max attempts for 429 rate-limit errors (default: 2). */ + retries429?: number; + /** + * Optional Logger; when provided, emits `tenero.*` telemetry events. + * Silent when omitted — we do not fall back to console.*, which would + * bypass the worker-logs pipeline. + */ + logger?: Logger; + /** Optional Tenero API key (sent as x-api-key). */ + apiKey?: string; +} + +/** + * Fetch a Tenero API path with bounded retry on 429/5xx. + * + * Pass a relative path like `/tokens/SP...sbtc-token` — the base URL is + * concatenated internally so the wrapper owns the host. Each attempt has an + * independent {@link PER_ATTEMPT_TIMEOUT_MS} timeout. After retries are + * exhausted, the final Response is returned for the caller to inspect. + * + * @returns Final Response (status may be 2xx, 429, 5xx, or other 4xx) + * @throws Only on network-level errors after all retries are exhausted + */ +export async function teneroFetch( + path: string, + config: TeneroFetchConfig = {} +): Promise { + const { + retries = DEFAULT_5XX_RETRIES, + baseDelayMs = DEFAULT_5XX_BASE_DELAY_MS, + retries429 = DEFAULT_429_RETRIES, + logger, + apiKey, + } = config; + + const url = `${TENERO_API_BASE}${path.startsWith("/") ? path : `/${path}`}`; + const logPath = extractPath(url); + const headers = buildTeneroHeaders(apiKey); + + let attempts429 = 0; + let attempts5xx = 0; + const maxTotal = retries429 + retries + 1; + let total = 0; + + while (total < maxTotal) { + total++; + try { + const response = await fetch(url, { + headers, + signal: AbortSignal.timeout(PER_ATTEMPT_TIMEOUT_MS), + }); + + const rl = extractTeneroRateLimit(response); + if (rl.minuteRemaining !== null && rl.minuteRemaining <= 5) { + logger?.warn("tenero.minute_quota_low", { + path: logPath, + rlMinuteRemaining: rl.minuteRemaining, + rlType: rl.type, + }); + } + if (rl.monthRemaining !== null && rl.monthRemaining <= 5_000) { + logger?.warn("tenero.month_quota_low", { + path: logPath, + rlMonthRemaining: rl.monthRemaining, + rlType: rl.type, + }); + } + + if (!isRetryableStatus(response.status)) { + return response; + } + + const cfRay = response.headers.get("cf-ray"); + const is429 = response.status === 429; + + if (is429) { + attempts429++; + if (attempts429 > retries429) { + logger?.warn("tenero.retry_budget_exhausted", { + path: logPath, + status: 429, + attempts: attempts429, + budget: "429", + rlMinuteRemaining: rl.minuteRemaining, + rlMonthRemaining: rl.monthRemaining, + ...(cfRay ? { cfRay } : {}), + }); + return response; + } + + const retryAfterMs = + parseRetryAfterMs(response) ?? + DEFAULT_429_BASE_DELAY_MS * Math.pow(2, attempts429 - 1); + const delayMs = Math.min(retryAfterMs, MAX_RETRY_AFTER_MS); + logger?.warn("tenero.retrying", { + path: logPath, + status: 429, + attempt: attempts429, + maxAttempts: retries429, + delayMs, + ...(cfRay ? { cfRay } : {}), + }); + await sleep(delayMs); + } else { + attempts5xx++; + if (attempts5xx > retries) { + logger?.warn("tenero.retry_budget_exhausted", { + path: logPath, + status: response.status, + attempts: attempts5xx, + budget: "5xx", + ...(cfRay ? { cfRay } : {}), + }); + return response; + } + + const delayMs = baseDelayMs * Math.pow(2, attempts5xx - 1); + logger?.warn("tenero.retrying", { + path: logPath, + status: response.status, + attempt: attempts5xx, + maxAttempts: retries, + delayMs, + ...(cfRay ? { cfRay } : {}), + }); + await sleep(delayMs); + } + } catch (error) { + attempts5xx++; + if (attempts5xx > retries) { + logger?.warn("tenero.retry_budget_exhausted", { + path: logPath, + attempts: attempts5xx, + budget: "network", + error: String(error), + }); + throw error; + } + + const delayMs = baseDelayMs * Math.pow(2, attempts5xx - 1); + logger?.warn("tenero.retrying", { + path: logPath, + attempt: attempts5xx, + maxAttempts: retries, + delayMs, + budget: "network", + error: String(error), + }); + await sleep(delayMs); + } + } + + throw new Error(`[teneroFetch] Unexpected: retry loop exited without return`); +} diff --git a/lib/external/tenero/__tests__/kv-cache.test.ts b/lib/external/tenero/__tests__/kv-cache.test.ts new file mode 100644 index 00000000..4cea4713 --- /dev/null +++ b/lib/external/tenero/__tests__/kv-cache.test.ts @@ -0,0 +1,114 @@ +import { describe, it, expect, vi } from "vitest"; +import { + getCachedTokenPrice, + setCachedTokenPrice, + TENERO_PRICE_KV_PREFIX, + TENERO_PRICE_KV_TTL_SECONDS, +} from "../kv-cache"; + +/** + * Hand-rolled KV double — just enough surface for `get("...", "json")` and + * `put("...", string, options)` to round-trip. Mirrors the inline-double + * pattern used in `lib/__tests__/edge-cache.test.ts` rather than miniflare. + */ +function createFakeKv() { + const store = new Map(); + const puts: Array<{ + key: string; + value: string; + options?: KVNamespacePutOptions; + }> = []; + + const kv = { + get: vi.fn(async (key: string, type?: "json") => { + const raw = store.get(key); + if (raw === undefined) return null; + if (type === "json") { + try { + return JSON.parse(raw); + } catch { + return null; + } + } + return raw; + }), + put: vi.fn( + async (key: string, value: string, options?: KVNamespacePutOptions) => { + store.set(key, value); + puts.push({ key, value, options }); + } + ), + }; + + return { kv, store, puts }; +} + +describe("getCachedTokenPrice", () => { + it("returns null for a tokenId that hasn't been cached", async () => { + const { kv } = createFakeKv(); + const result = await getCachedTokenPrice( + kv as unknown as KVNamespace, + "stx" + ); + expect(result).toBeNull(); + }); + + it("round-trips a written entry through setCachedTokenPrice", async () => { + const { kv, puts } = createFakeKv(); + const tokenId = "stx"; + const now = 1_715_000_000_000; + + await setCachedTokenPrice(kv as unknown as KVNamespace, tokenId, { + priceUsd: 1.85, + fetchedAt: now, + minuteRemaining: 47, + monthRemaining: 12_345, + }); + + expect(puts).toHaveLength(1); + expect(puts[0].key).toBe(`${TENERO_PRICE_KV_PREFIX}${tokenId}`); + expect(puts[0].options?.expirationTtl).toBe(TENERO_PRICE_KV_TTL_SECONDS); + + const read = await getCachedTokenPrice( + kv as unknown as KVNamespace, + tokenId + ); + expect(read).toEqual({ + priceUsd: 1.85, + fetchedAt: now, + minuteRemaining: 47, + monthRemaining: 12_345, + }); + }); + + it("returns null when the cached value is shape-incompatible", async () => { + const { kv, store } = createFakeKv(); + // Simulate a stale or hand-edited entry: missing `fetchedAt` is the + // only required field, so the reader treats it as a miss rather than + // throwing or returning garbage to consumers. + store.set(`${TENERO_PRICE_KV_PREFIX}stx`, JSON.stringify({ priceUsd: 1.85 })); + const result = await getCachedTokenPrice( + kv as unknown as KVNamespace, + "stx" + ); + expect(result).toBeNull(); + }); + + it("treats a non-finite priceUsd as null without throwing", async () => { + const { kv, store } = createFakeKv(); + store.set( + `${TENERO_PRICE_KV_PREFIX}stx`, + JSON.stringify({ priceUsd: "not a number", fetchedAt: 123 }) + ); + const result = await getCachedTokenPrice( + kv as unknown as KVNamespace, + "stx" + ); + expect(result).toEqual({ + priceUsd: null, + fetchedAt: 123, + minuteRemaining: null, + monthRemaining: null, + }); + }); +}); diff --git a/lib/external/tenero/__tests__/prices.test.ts b/lib/external/tenero/__tests__/prices.test.ts new file mode 100644 index 00000000..9d58c950 --- /dev/null +++ b/lib/external/tenero/__tests__/prices.test.ts @@ -0,0 +1,21 @@ +import { describe, it, expect } from "vitest"; +import { tokenIdToTeneroAddress } from "../prices"; + +describe("tokenIdToTeneroAddress", () => { + it("passes the literal 'stx' through unchanged", () => { + expect(tokenIdToTeneroAddress("stx")).toBe("stx"); + }); + + it("strips the ::asset suffix from a fully-qualified asset id", () => { + const sbtc = + "SM3VDXK3WZZSA84XXFKAFAF15NNZX32CTSG82JFQ4.sbtc-token::sbtc"; + expect(tokenIdToTeneroAddress(sbtc)).toBe( + "SM3VDXK3WZZSA84XXFKAFAF15NNZX32CTSG82JFQ4.sbtc-token" + ); + }); + + it("returns a bare contract id unchanged when there's no asset suffix", () => { + const bare = "SP4SZE494VC2YC5JYG7AYFQ44F5Q4PYV7DVMDPBG.ststx-token"; + expect(tokenIdToTeneroAddress(bare)).toBe(bare); + }); +}); diff --git a/lib/external/tenero/index.ts b/lib/external/tenero/index.ts new file mode 100644 index 00000000..0daf6cd3 --- /dev/null +++ b/lib/external/tenero/index.ts @@ -0,0 +1,14 @@ +export { + fetchTokenPriceUsd, + tokenIdToTeneroAddress, + type TeneroPriceResult, +} from "./prices"; +export { STATIC_TOKEN_IDS } from "./tokens"; +export { + getCachedTokenPrice, + getCachedTokenPrices, + setCachedTokenPrice, + TENERO_PRICE_KV_PREFIX, + TENERO_PRICE_KV_TTL_SECONDS, + type CachedTokenPrice, +} from "./kv-cache"; diff --git a/lib/external/tenero/kv-cache.ts b/lib/external/tenero/kv-cache.ts new file mode 100644 index 00000000..85725667 --- /dev/null +++ b/lib/external/tenero/kv-cache.ts @@ -0,0 +1,81 @@ +/** + * KV cache layer for Tenero token prices. + * + * Single key per token (`tenero:price:{tokenId}`) with the price and the + * timestamp it was written. The SchedulerDO's Tenero refresh task is the + * only writer; SSR routes and `/api/prices` are read-only consumers. + * + * Writes use a generous TTL ceiling (24h) so a paused scheduler still leaves + * a usable-but-stale value rather than nothing — the reader is responsible + * for deciding whether a stale value is acceptable using `fetchedAt`. + */ + +export const TENERO_PRICE_KV_PREFIX = "tenero:price:"; + +/** TTL ceiling for KV entries. Refresh cadence is 5min so this is just a safety net. */ +export const TENERO_PRICE_KV_TTL_SECONDS = 24 * 60 * 60; + +export interface CachedTokenPrice { + /** USD price; null means Tenero confirmed no published price (vs. fetch failure). */ + priceUsd: number | null; + /** Unix millis when this value was written. */ + fetchedAt: number; + /** Optional: minute-remaining at write time, for adaptive cadence inspection. */ + minuteRemaining: number | null; + /** Optional: month-remaining at write time. */ + monthRemaining: number | null; +} + +function kvKey(tokenId: string): string { + return `${TENERO_PRICE_KV_PREFIX}${tokenId}`; +} + +export async function getCachedTokenPrice( + kv: KVNamespace, + tokenId: string +): Promise { + const raw = await kv.get(kvKey(tokenId), "json"); + if (!raw) return null; + // Light shape check — anything unrecognized is treated as a cache miss + // rather than throwing, since we read this from SSR paths that must + // always render. + const obj = raw as Partial; + if (typeof obj.fetchedAt !== "number") return null; + return { + priceUsd: + typeof obj.priceUsd === "number" && Number.isFinite(obj.priceUsd) + ? obj.priceUsd + : null, + fetchedAt: obj.fetchedAt, + minuteRemaining: + typeof obj.minuteRemaining === "number" ? obj.minuteRemaining : null, + monthRemaining: + typeof obj.monthRemaining === "number" ? obj.monthRemaining : null, + }; +} + +export async function setCachedTokenPrice( + kv: KVNamespace, + tokenId: string, + value: CachedTokenPrice +): Promise { + await kv.put(kvKey(tokenId), JSON.stringify(value), { + expirationTtl: TENERO_PRICE_KV_TTL_SECONDS, + }); +} + +/** Read many token prices in parallel — useful for SSR paths that need a Map. */ +export async function getCachedTokenPrices( + kv: KVNamespace, + tokenIds: readonly string[] +): Promise> { + const out = new Map(); + if (tokenIds.length === 0) return out; + const results = await Promise.all( + tokenIds.map(async (id) => [id, await getCachedTokenPrice(kv, id)] as const) + ); + for (const [id, cached] of results) { + if (cached) out.set(id, cached); + } + return out; +} diff --git a/lib/external/tenero/prices.ts b/lib/external/tenero/prices.ts new file mode 100644 index 00000000..01684ec7 --- /dev/null +++ b/lib/external/tenero/prices.ts @@ -0,0 +1,95 @@ +/** + * Token price fetcher built on the Tenero wrapper. + * + * Single responsibility: given a token id (in the form `stx`, + * `SP...contract::asset`, or a bare contract id), call Tenero's + * `/v1/stacks/tokens/{contract_id}` endpoint and return a numeric USD price + * or null if the token isn't priced. + * + * Tenero's contract-id form drops the `::asset` suffix; native STX passes + * through as the literal `"stx"`. The mapping lives here (not in the wrapper) + * because it's price-endpoint-specific — other Tenero endpoints may want a + * different shape. + */ + +import { teneroFetch, extractTeneroRateLimit, type TeneroRateLimit } from "../tenero-fetch"; +import type { Logger } from "../../logging"; + +export interface TeneroPriceResult { + /** Parsed USD price, or null when the token has no published price or the fetch failed. */ + priceUsd: number | null; + /** Final response status (0 when the fetch threw before getting one). */ + status: number; + /** Rate-limit headers from the final response — surfaced for the scheduler's adaptive logic. */ + rateLimit: TeneroRateLimit; +} + +/** Strip the `::asset` suffix; native STX passes through as `"stx"`. */ +export function tokenIdToTeneroAddress(tokenId: string): string { + if (tokenId === "stx") return "stx"; + const idx = tokenId.indexOf("::"); + return idx >= 0 ? tokenId.slice(0, idx) : tokenId; +} + +/** + * Fetch a single token's USD price from Tenero. Never throws on non-2xx — + * callers (the scheduler refresh task) want to keep going on partial failure + * and have the rate-limit info surfaced for cadence decisions. + */ +export async function fetchTokenPriceUsd( + tokenId: string, + logger?: Logger, + apiKey?: string +): Promise { + const addr = tokenIdToTeneroAddress(tokenId); + const path = `/tokens/${encodeURIComponent(addr)}`; + + let response: Response; + try { + response = await teneroFetch(path, { logger, apiKey }); + } catch (error) { + logger?.warn("tenero.price_fetch_network_error", { + tokenId, + teneroAddress: addr, + error: String(error), + }); + return { + priceUsd: null, + status: 0, + rateLimit: { minuteRemaining: null, monthRemaining: null, type: null }, + }; + } + + const rateLimit = extractTeneroRateLimit(response); + + if (!response.ok) { + logger?.warn("tenero.price_fetch_non_2xx", { + tokenId, + teneroAddress: addr, + status: response.status, + rlMinuteRemaining: rateLimit.minuteRemaining, + rlMonthRemaining: rateLimit.monthRemaining, + }); + return { priceUsd: null, status: response.status, rateLimit }; + } + + let priceUsd: number | null = null; + try { + const body = (await response.json()) as { data?: { price_usd?: number | string | null } }; + const raw = body.data?.price_usd; + const parsed = typeof raw === "string" ? parseFloat(raw) : raw; + priceUsd = + typeof parsed === "number" && Number.isFinite(parsed) && parsed > 0 + ? parsed + : null; + } catch (error) { + logger?.warn("tenero.price_fetch_parse_error", { + tokenId, + teneroAddress: addr, + error: String(error), + }); + priceUsd = null; + } + + return { priceUsd, status: response.status, rateLimit }; +} diff --git a/lib/external/tenero/tokens.ts b/lib/external/tenero/tokens.ts new file mode 100644 index 00000000..a51246ce --- /dev/null +++ b/lib/external/tenero/tokens.ts @@ -0,0 +1,22 @@ +/** + * Active token set for Tenero price refresh. Locked to this static list + * (not dynamically discovered from `swaps.token_in`) because the + * leaderboard's `TOKEN_DECIMALS` table is the authority on what's + * priceable — discovering a token here that the leaderboard doesn't know + * the decimals for would fall back to `?? 6` and silently render the + * wrong USD figure with `allPriced: true`. + * + * Adding a new priceable token is a deliberate two-step edit: add to + * this list AND to `TOKEN_DECIMALS` in `app/leaderboard/page.tsx`, plus + * a Tenero probe to confirm `/v1/stacks/tokens/{contract_id}` returns + * 200 with a non-null `price_usd`. + * + * Future work (per #768 review): if this grows past ~30 tokens, consider + * splitting Tenero refresh into per-tick chunks so a slow run can't blow + * the alarm budget. + */ +export const STATIC_TOKEN_IDS: readonly string[] = [ + "stx", + "SM3VDXK3WZZSA84XXFKAFAF15NNZX32CTSG82JFQ4.sbtc-token::sbtc", + "SP4SZE494VC2YC5JYG7AYFQ44F5Q4PYV7DVMDPBG.ststx-token::ststx", +]; diff --git a/lib/inbox/d1-dual-write.ts b/lib/inbox/d1-dual-write.ts index fc71fb91..1c023d70 100644 --- a/lib/inbox/d1-dual-write.ts +++ b/lib/inbox/d1-dual-write.ts @@ -36,9 +36,18 @@ import type { AgentRecord } from "@/lib/types"; * payment_txid partial index (idx_inbox_payment_txid). * * @cloudflare/workers-types does not surface SQLite constraint codes — only - * the wrapped message string. We match the full constraint string verbatim - * (per Copilot review on #756) to avoid false positives if future schema - * changes introduce other tables/columns whose names contain `payment_txid`. + * the wrapped message string. We substring-match the fully-qualified + * `.` identifier inside the SQLite error message, which + * narrows to the exact constraint without requiring the surrounding + * wrapper text to stay byte-identical across runtime versions. + * + * False-positive risk: another error whose message coincidentally contains + * the literal `UNIQUE constraint failed: inbox_messages.payment_txid` would + * match. In practice SQLite only emits that phrasing for this specific + * constraint, so the risk is bounded by SQLite's own behavior — schema + * changes can't introduce a collision unless they reuse the + * `inbox_messages.payment_txid` column name. The Copilot review on #756 + * raised the concern; the substring approach is the deliberate trade-off. * * Re-check periodically against `@cloudflare/workers-types` releases — when * D1 introduces structured error codes, switch to those. diff --git a/lib/scheduler/__tests__/tenero-task.test.ts b/lib/scheduler/__tests__/tenero-task.test.ts new file mode 100644 index 00000000..ebdf2f03 --- /dev/null +++ b/lib/scheduler/__tests__/tenero-task.test.ts @@ -0,0 +1,201 @@ +import { + describe, + it, + expect, + vi, + beforeEach, + afterEach, +} from "vitest"; +import { runTeneroTask } from "../tenero-task"; + +/** Minimal logger double — captures events without console noise. */ +function createCapturingLogger() { + const events: Array<{ level: string; msg: string; ctx?: unknown }> = []; + const make = (level: string) => + (msg: string, ctx?: Record) => { + events.push({ level, msg, ctx }); + }; + const logger = { + debug: make("debug"), + info: make("info"), + warn: make("warn"), + error: make("error"), + child: () => logger, + }; + return { logger, events }; +} + +/** KV double — Map-backed; records every put for assertions. */ +function createFakeKv() { + const store = new Map(); + const puts: Array<{ key: string; value: string }> = []; + return { + kv: { + get: vi.fn(async (key: string, type?: "json") => { + const raw = store.get(key); + if (raw === undefined) return null; + return type === "json" ? JSON.parse(raw) : raw; + }), + put: vi.fn(async (key: string, value: string) => { + store.set(key, value); + puts.push({ key, value }); + }), + } as unknown as KVNamespace, + puts, + store, + }; +} + +/** + * Stub a Tenero response. Returns the global-fetch implementation the + * test should install for one specific request. + */ +function teneroResponse( + status: number, + opts: { + priceUsd?: number | string | null; + minuteRemaining?: number; + monthRemaining?: number; + } = {} +): Response { + const headers = new Headers({ "Content-Type": "application/json" }); + if (opts.minuteRemaining !== undefined) { + headers.set("x-ratelimit-minute-remaining", String(opts.minuteRemaining)); + } + if (opts.monthRemaining !== undefined) { + headers.set("x-ratelimit-month-remaining", String(opts.monthRemaining)); + } + const body = + opts.priceUsd === undefined + ? "{}" + : JSON.stringify({ data: { price_usd: opts.priceUsd } }); + return new Response(body, { status, headers }); +} + +let originalFetch: typeof globalThis.fetch; + +beforeEach(() => { + originalFetch = globalThis.fetch; +}); + +afterEach(() => { + globalThis.fetch = originalFetch; + vi.restoreAllMocks(); +}); + +describe("runTeneroTask", () => { + it("happy path: writes a cache entry and bumps `succeeded`", async () => { + const { logger, events } = createCapturingLogger(); + const { kv, puts } = createFakeKv(); + + globalThis.fetch = vi.fn(async () => + teneroResponse(200, { + priceUsd: 1.85, + minuteRemaining: 99, + monthRemaining: 49_000, + }) + ) as unknown as typeof fetch; + + const fixedNow = 1_715_000_000_000; + const { result, rateLimited } = await runTeneroTask({ + logger, + kv, + tokenIds: ["stx"], + now: () => fixedNow, + }); + + expect(rateLimited).toBe(false); + expect(result.succeeded).toBe(1); + expect(result.failed).toBe(0); + expect(result.tokensAttempted).toBe(1); + expect(result.minuteRemaining).toBe(99); + expect(result.monthRemaining).toBe(49_000); + + expect(puts).toHaveLength(1); + expect(puts[0].key).toBe("tenero:price:stx"); + const written = JSON.parse(puts[0].value); + expect(written.priceUsd).toBe(1.85); + expect(written.fetchedAt).toBe(fixedNow); + expect(written.minuteRemaining).toBe(99); + + // Sanity: structured log events landed on the logger. + expect(events.some((e) => e.msg === "tenero.refresh_started")).toBe(true); + expect(events.some((e) => e.msg === "tenero.refresh_completed")).toBe(true); + }); + + it("5xx response: bumps `failed`, no KV write", async () => { + const { logger } = createCapturingLogger(); + const { kv, puts } = createFakeKv(); + + // teneroFetch retries 5xx once before giving up, so respond consistently. + globalThis.fetch = vi.fn(async () => + teneroResponse(503) + ) as unknown as typeof fetch; + + const { result, rateLimited } = await runTeneroTask({ + logger, + kv, + tokenIds: ["stx"], + }); + + expect(rateLimited).toBe(false); + expect(result.succeeded).toBe(0); + expect(result.failed).toBe(1); + expect(puts).toHaveLength(0); + }); + + it("429: flags rateLimited and bumps `failed`", async () => { + const { logger } = createCapturingLogger(); + const { kv, puts } = createFakeKv(); + + globalThis.fetch = vi.fn(async () => + teneroResponse(429) + ) as unknown as typeof fetch; + + const { result, rateLimited } = await runTeneroTask({ + logger, + kv, + tokenIds: ["stx"], + }); + + expect(rateLimited).toBe(true); + expect(result.succeeded).toBe(0); + expect(result.failed).toBe(1); + expect(puts).toHaveLength(0); + }); + + it("minuteRemaining <= 0 on a 200 response: flags rateLimited and breaks early", async () => { + const { logger, events } = createCapturingLogger(); + const { kv, puts } = createFakeKv(); + + // First call returns 200 but with the minute quota exhausted; the + // task should break out of the loop before hitting subsequent tokens. + globalThis.fetch = vi.fn(async () => + teneroResponse(200, { + priceUsd: 1.0, + minuteRemaining: 0, + monthRemaining: 30_000, + }) + ) as unknown as typeof fetch; + + const { result, rateLimited } = await runTeneroTask({ + logger, + kv, + tokenIds: [ + "stx", + "SM3VDXK3WZZSA84XXFKAFAF15NNZX32CTSG82JFQ4.sbtc-token::sbtc", + "SP4SZE494VC2YC5JYG7AYFQ44F5Q4PYV7DVMDPBG.ststx-token::ststx", + ], + }); + + expect(rateLimited).toBe(true); + // First token wrote successfully before the break. + expect(result.succeeded).toBe(1); + expect(puts).toHaveLength(1); + // Loop broke before processing tokens 2 + 3. + expect((globalThis.fetch as ReturnType).mock.calls).toHaveLength(1); + expect( + events.some((e) => e.msg === "tenero.minute_quota_exhausted_mid_run") + ).toBe(true); + }); +}); diff --git a/lib/scheduler/tenero-task.ts b/lib/scheduler/tenero-task.ts new file mode 100644 index 00000000..c2808865 --- /dev/null +++ b/lib/scheduler/tenero-task.ts @@ -0,0 +1,126 @@ +/** + * Tenero refresh task — pure orchestration of fetch + KV writes for the + * scheduler's per-tick price refresh. + * + * Extracted out of `SchedulerDO` (in `worker.ts`) so it can be tested + * without spinning up a Durable Object harness. The DO method becomes + * a thin wrapper that wires the dependencies and persists the result + * to DO storage; the actual fetch/cache/rate-limit logic lives here. + * + * Pattern follows `x402-sponsor-relay`'s split between durable-object + * orchestration and pure task functions. + */ + +import { fetchTokenPriceUsd } from "../external/tenero"; +import { setCachedTokenPrice } from "../external/tenero/kv-cache"; +import type { Logger } from "../logging"; + +export interface TeneroRunResult { + startedAt: number; + durationMs: number; + tokensAttempted: number; + succeeded: number; + failed: number; + minuteRemaining: number | null; + monthRemaining: number | null; +} + +export interface TeneroTaskDeps { + logger: Logger; + kv: KVNamespace; + tokenIds: readonly string[]; + apiKey?: string; + /** Test injection point. Defaults to `Date.now`. */ + now?: () => number; +} + +export interface TeneroTaskOutcome { + result: TeneroRunResult; + /** + * True when Tenero signalled rate-limiting during the run (HTTP 429 OR + * `x-ratelimit-minute-remaining <= 0`). Caller writes this to + * DO-storage `nextRunAfter.tenero` for adaptive backoff. + */ + rateLimited: boolean; +} + +export async function runTeneroTask( + deps: TeneroTaskDeps +): Promise { + const { logger, kv, tokenIds, apiKey } = deps; + const now = deps.now ?? Date.now; + const startedAt = now(); + + logger.info("tenero.refresh_started", { tokenCount: tokenIds.length }); + + let succeeded = 0; + let failed = 0; + let lastMinuteRemaining: number | null = null; + let lastMonthRemaining: number | null = null; + let rateLimited = false; + + for (const tokenId of tokenIds) { + const r = await fetchTokenPriceUsd(tokenId, logger, apiKey); + lastMinuteRemaining = r.rateLimit.minuteRemaining ?? lastMinuteRemaining; + lastMonthRemaining = r.rateLimit.monthRemaining ?? lastMonthRemaining; + + if (r.status === 0 || r.status >= 500) { + failed++; + } else if (r.status === 429) { + failed++; + rateLimited = true; + } else if (r.status === 200) { + try { + await setCachedTokenPrice(kv, tokenId, { + priceUsd: r.priceUsd, + fetchedAt: now(), + minuteRemaining: r.rateLimit.minuteRemaining, + monthRemaining: r.rateLimit.monthRemaining, + }); + succeeded++; + } catch (error) { + logger.warn("tenero.kv_write_failed", { + tokenId, + error: String(error), + }); + failed++; + } + } else { + failed++; + } + + if ( + r.rateLimit.minuteRemaining !== null && + r.rateLimit.minuteRemaining <= 0 + ) { + rateLimited = true; + logger.warn("tenero.minute_quota_exhausted_mid_run", { + rlMinuteRemaining: r.rateLimit.minuteRemaining, + processed: succeeded + failed, + remaining: tokenIds.length - (succeeded + failed), + }); + break; + } + } + + const result: TeneroRunResult = { + startedAt, + durationMs: now() - startedAt, + tokensAttempted: tokenIds.length, + succeeded, + failed, + minuteRemaining: lastMinuteRemaining, + monthRemaining: lastMonthRemaining, + }; + + logger.info("tenero.refresh_completed", { + succeeded, + failed, + durationMs: result.durationMs, + rlMinuteRemaining: lastMinuteRemaining, + rlMonthRemaining: lastMonthRemaining, + rateLimited, + }); + + return { result, rateLimited }; +} diff --git a/worker.ts b/worker.ts index 86c516b5..bd3c5540 100644 --- a/worker.ts +++ b/worker.ts @@ -3,9 +3,283 @@ import openNextWorker, { DOQueueHandler, DOShardedTagCache, } from "./.open-next/worker.js"; -import { createConsoleLogger, createLogger, isLogsRPC } from "./lib/logging"; +import { DurableObject } from "cloudflare:workers"; +import { + createConsoleLogger, + createLogger, + isLogsRPC, + type Logger, +} from "./lib/logging"; import { getPaymentRepoVersion } from "./lib/inbox/payment-logging"; import { processInboxReconciliationQueue } from "./lib/inbox/reconciliation-queue"; +import { STATIC_TOKEN_IDS } from "./lib/external/tenero"; +import { + runTeneroTask, + type TeneroRunResult, +} from "./lib/scheduler/tenero-task"; + +// ─────────────────────────── SchedulerDO ─────────────────────────── +// +// Defined inline at the worker entry (not imported from a separate file) +// so OpenNext + wrangler's esbuild bundle includes the class body. When +// SchedulerDO was imported from `./lib/scheduler/scheduler-do`, the class +// was dropped from the deployed bundle even though it appeared in `export +// { SchedulerDO }` — workerd then refused to start with "no such actor +// class; c = SchedulerDO" and every route returned 404 with +// x-preview-user-error: true. See PR #743 build logs at 07:28Z and 07:34Z, +// and OpenNext issue #502 for the broader context on custom-DO bundling +// failures with this adapter. +// +// Storage layout (this.ctx.storage): +// - lastTeneroRunAt — unix millis when the Tenero task last completed +// - lastTeneroResult — { succeeded, failed, minuteRemaining, monthRemaining } +// - consecutiveFailures — { tenero: number } +// - pausedUntil — unix millis; alarm() is a no-op until this passes +// - nextRunAfter — { tenero?: number }; adaptive backoff per task +// +// Long-lived cursors stay in D1 per issue #768 — the DO holds only its +// own bookkeeping. + +const TENERO_INTERVAL_MS = 5 * 60 * 1000; +const ALARM_TICK_MS = TENERO_INTERVAL_MS; +const TENERO_RATELIMIT_BACKOFF_MS = 5 * 60 * 1000; + +export interface SchedulerStatus { + now: number; + pausedUntil: number | null; + lastTeneroRunAt: number | null; + lastTeneroResult: TeneroRunResult | null; + consecutiveFailures: { tenero: number }; + nextRunAfter: { tenero: number | null }; + nextAlarmAt: number | null; +} + +type StoredScheduler = { + lastTeneroRunAt?: number; + lastTeneroResult?: TeneroRunResult; + consecutiveFailures?: { tenero: number }; + pausedUntil?: number; + nextRunAfter?: { tenero?: number }; +}; + +export class SchedulerDO extends DurableObject { + constructor(state: DurableObjectState, env: CloudflareEnv) { + super(state, env); + // Ensure an alarm is always armed. Idempotent — getAlarm() returns + // null if none is set. + this.ctx.blockConcurrencyWhile(async () => { + const current = await this.ctx.storage.getAlarm(); + if (current === null) { + await this.ctx.storage.setAlarm(Date.now() + ALARM_TICK_MS); + } + }); + } + + async status(): Promise { + const s = await this.readStored(); + const nextAlarmAt = await this.ctx.storage.getAlarm(); + return { + now: Date.now(), + pausedUntil: s.pausedUntil ?? null, + lastTeneroRunAt: s.lastTeneroRunAt ?? null, + lastTeneroResult: s.lastTeneroResult ?? null, + consecutiveFailures: { tenero: s.consecutiveFailures?.tenero ?? 0 }, + nextRunAfter: { tenero: s.nextRunAfter?.tenero ?? null }, + nextAlarmAt, + }; + } + + async refreshNow(task: "tenero" | "all"): Promise<{ tenero?: TeneroRunResult }> { + const logger = this.makeLogger({ trigger: "refreshNow", task }); + const out: { tenero?: TeneroRunResult } = {}; + if (task === "tenero" || task === "all") { + out.tenero = await this.runTenero(logger); + } + return out; + } + + async pauseUntil(timestamp: number): Promise { + await this.ctx.storage.put("pausedUntil", timestamp); + this.makeLogger({ trigger: "pauseUntil" }).warn("scheduler.paused", { + pausedUntil: timestamp, + }); + } + + async resume(): Promise { + await this.ctx.storage.delete("pausedUntil"); + this.makeLogger({ trigger: "resume" }).info("scheduler.resumed", {}); + } + + async alarm(): Promise { + const tickStartedAt = Date.now(); + const logger = this.makeLogger({ trigger: "alarm", tickStartedAt }); + + try { + const stored = await this.readStored(); + + if (stored.pausedUntil && stored.pausedUntil > tickStartedAt) { + logger.info("scheduler.alarm_skipped_paused", { + pausedUntil: stored.pausedUntil, + }); + return; + } + + // TODO(#768 follow-up): once a second task lands (competition Hiro + // sweep, then balance snapshots), this branching shape becomes a + // copy-paste smell. Refactor to a task registry: + // for (const task of TASKS) if (task.isDue(stored, tickStartedAt)) + // await task.run(logger, this.ctx); + // Each task owns its own cadence, persist helper, and failure key. + const teneroNextRunAfter = stored.nextRunAfter?.tenero ?? 0; + const teneroDue = + teneroNextRunAfter <= tickStartedAt && + (stored.lastTeneroRunAt ?? 0) + TENERO_INTERVAL_MS <= + tickStartedAt + 1_000; + + if (teneroDue) { + try { + await this.runTenero(logger); + } catch (error) { + logger.error("scheduler.tenero_unexpected_error", { + error: String(error), + stack: error instanceof Error ? error.stack : undefined, + }); + await this.bumpFailures("tenero"); + } + } else { + logger.debug("scheduler.tenero_not_due", { + lastRunAt: stored.lastTeneroRunAt ?? null, + nextRunAfter: teneroNextRunAfter || null, + }); + } + } finally { + await this.ctx.storage.setAlarm(Date.now() + ALARM_TICK_MS); + } + } + + // TODO(#768 follow-up): when the balance task ships, give each task a + // bounded slice of the tick (e.g. AbortSignal.timeout(30_000) per + // task) so a slow Hiro response can't starve Tenero refresh and vice + // versa. Today Tenero is the only task and the static token set + // bounds it implicitly; revisit when there's contention. + // + // The orchestration body is `runTeneroTask` in + // `lib/scheduler/tenero-task.ts` — kept testable without a DO harness. + // This wrapper only wires DO-scoped dependencies and persists the run + // result + failure counters / backoff to DO storage. + private async runTenero(parentLogger: Logger): Promise { + const logger = parentLogger.child + ? parentLogger.child({ task: "tenero" }) + : parentLogger; + + const { result, rateLimited } = await runTeneroTask({ + logger, + kv: this.env.VERIFIED_AGENTS, + tokenIds: STATIC_TOKEN_IDS, + apiKey: this.lookupTeneroApiKey(), + }); + + await this.persistTeneroResult(result, { rateLimited }); + return result; + } + + /** + * Read the DO's bookkeeping in a single parallel batch of targeted gets. + * `storage.list({ prefix: "" })` scans every stored key — fine at today's + * 5 keys, but the *point* of this DO is to grow more tasks (each with + * its own cursors and `lastResult`), so targeted `get` calls keep + * read cost bounded by the schema, not the storage size. + * + * Pattern mirrors `x402-sponsor-relay/src/durable-objects/nonce-do.ts`. + */ + private async readStored(): Promise { + const [ + lastTeneroRunAt, + lastTeneroResult, + consecutiveFailures, + pausedUntil, + nextRunAfter, + ] = await Promise.all([ + this.ctx.storage.get("lastTeneroRunAt"), + this.ctx.storage.get("lastTeneroResult"), + this.ctx.storage.get<{ tenero: number }>("consecutiveFailures"), + this.ctx.storage.get("pausedUntil"), + this.ctx.storage.get<{ tenero?: number }>("nextRunAfter"), + ]); + return { + ...(typeof lastTeneroRunAt === "number" ? { lastTeneroRunAt } : {}), + ...(lastTeneroResult ? { lastTeneroResult } : {}), + ...(consecutiveFailures ? { consecutiveFailures } : {}), + ...(typeof pausedUntil === "number" ? { pausedUntil } : {}), + ...(nextRunAfter ? { nextRunAfter } : {}), + }; + } + + private async persistTeneroResult( + result: TeneroRunResult, + opts: { rateLimited: boolean } + ): Promise { + await this.ctx.storage.put("lastTeneroRunAt", Date.now()); + await this.ctx.storage.put("lastTeneroResult", result); + + if (result.succeeded > 0 && result.failed === 0 && !opts.rateLimited) { + await this.clearFailures("tenero"); + const nextRunAfter = ((await this.ctx.storage.get<{ tenero?: number }>( + "nextRunAfter" + )) ?? {}) as { tenero?: number }; + if (nextRunAfter.tenero) { + delete nextRunAfter.tenero; + await this.ctx.storage.put("nextRunAfter", nextRunAfter); + } + } else if (result.failed > 0 || opts.rateLimited) { + await this.bumpFailures("tenero"); + } + + if (opts.rateLimited) { + const nextRunAfter = ((await this.ctx.storage.get<{ tenero?: number }>( + "nextRunAfter" + )) ?? {}) as { tenero?: number }; + nextRunAfter.tenero = Date.now() + TENERO_RATELIMIT_BACKOFF_MS; + await this.ctx.storage.put("nextRunAfter", nextRunAfter); + } + } + + private async bumpFailures(task: "tenero"): Promise { + const cur = ((await this.ctx.storage.get<{ tenero: number }>( + "consecutiveFailures" + )) ?? { tenero: 0 }) as { tenero: number }; + cur[task] = (cur[task] ?? 0) + 1; + await this.ctx.storage.put("consecutiveFailures", cur); + } + + private async clearFailures(task: "tenero"): Promise { + const cur = ((await this.ctx.storage.get<{ tenero: number }>( + "consecutiveFailures" + )) ?? { tenero: 0 }) as { tenero: number }; + if (cur[task] === 0) return; + cur[task] = 0; + await this.ctx.storage.put("consecutiveFailures", cur); + } + + private makeLogger(extra: Record): Logger { + const ctxBase = { + path: "/__do/scheduler", + doName: "scheduler", + ...extra, + }; + return isLogsRPC(this.env.LOGS) + ? createLogger(this.env.LOGS, this.ctx, ctxBase) + : createConsoleLogger(ctxBase); + } + + private lookupTeneroApiKey(): string | undefined { + const key = (this.env as unknown as { TENERO_API_KEY?: string }) + .TENERO_API_KEY; + return typeof key === "string" && key.length > 0 ? key : undefined; + } +} + +// ─────────────────────────── Exports ─────────────────────────── export { BucketCachePurge, DOQueueHandler, DOShardedTagCache }; diff --git a/wrangler.jsonc b/wrangler.jsonc index 41fd1948..86a8398b 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -82,18 +82,27 @@ ] }, - // Durable Object migration history. - // - // v1 was applied to the `landing-page` worker out-of-band during PR - // #743 experimentation (2026-05-12). Main does not reference the - // SchedulerDO class — so v2 deletes it, restoring the worker to a - // clean no-DO state. Both tags must be declared so wrangler can - // diff against CF's current migration history. Once v2 lands, do - // not remove these entries: future deploys still need to see the - // full history to deploy successfully. + // SchedulerDO: single instance coordinating periodic background work. + // Initial scope: Tenero price refresh (every ~5 min) writing to + // VERIFIED_AGENTS KV under `tenero:price:{tokenId}`. Competition Hiro + // sweep + balance snapshots land in follow-up PRs. See issue #768. + // Singleton resolved via env.SCHEDULER.idFromName("v1") — the instance + // name is independent of the migration tag, so it stays "v1" even + // though the migration history is up to v3. + "durable_objects": { + "bindings": [ + { "name": "SCHEDULER", "class_name": "SchedulerDO" } + ] + }, + // Migration history reflects what's been applied to CF, not just what + // this PR introduces. v1 + v2 happened during the original #743 + // experimentation (registered, then deleted via hotfix #772). v3 + // reintroduces the class cleanly. Keep all three declared — wrangler + // needs the full history to deploy without errors. "migrations": [ { "tag": "v1", "new_sqlite_classes": ["SchedulerDO"] }, - { "tag": "v2", "deleted_classes": ["SchedulerDO"] } + { "tag": "v2", "deleted_classes": ["SchedulerDO"] }, + { "tag": "v3", "new_sqlite_classes": ["SchedulerDO"] } ], /** @@ -181,9 +190,15 @@ ] }, + "durable_objects": { + "bindings": [ + { "name": "SCHEDULER", "class_name": "SchedulerDO" } + ] + }, "migrations": [ { "tag": "v1", "new_sqlite_classes": ["SchedulerDO"] }, - { "tag": "v2", "deleted_classes": ["SchedulerDO"] } + { "tag": "v2", "deleted_classes": ["SchedulerDO"] }, + { "tag": "v3", "new_sqlite_classes": ["SchedulerDO"] } ] }, @@ -266,9 +281,15 @@ ] }, + "durable_objects": { + "bindings": [ + { "name": "SCHEDULER", "class_name": "SchedulerDO" } + ] + }, "migrations": [ { "tag": "v1", "new_sqlite_classes": ["SchedulerDO"] }, - { "tag": "v2", "deleted_classes": ["SchedulerDO"] } + { "tag": "v2", "deleted_classes": ["SchedulerDO"] }, + { "tag": "v3", "new_sqlite_classes": ["SchedulerDO"] } ] } }