diff --git a/packages/opencode/src/altimate/telemetry/index.ts b/packages/opencode/src/altimate/telemetry/index.ts index 4ab8fa2a7..023879bfb 100644 --- a/packages/opencode/src/altimate/telemetry/index.ts +++ b/packages/opencode/src/altimate/telemetry/index.ts @@ -644,8 +644,14 @@ export namespace Telemetry { session_id: string /** skipped = no cache or stale, passed = valid SQL, blocked = invalid SQL caught, error = validation itself failed */ outcome: "skipped" | "passed" | "blocked" | "error" - /** why: no_cache, stale_cache, empty_cache, valid, non_structural, structural_error, validation_exception */ + /** why: no_cache, stale_cache, empty_cache, valid, non_structural, structural_error, dispatcher_failed, validation_exception */ reason: string + /** warehouse driver type (postgres, snowflake, bigquery, ...) — enables per-warehouse catch-rate analysis */ + warehouse_type: string + /** read / write / unknown — enables per-query-type analysis */ + query_type: string + /** SHA-256 prefix of masked SQL — join key to sql_execute_failure events for same query */ + masked_sql_hash: string schema_columns: number /** true when schema scan hit the column-scan cap — flags samples biased by large-warehouse truncation */ schema_truncated: boolean diff --git a/packages/opencode/src/altimate/tools/sql-execute.ts b/packages/opencode/src/altimate/tools/sql-execute.ts index 3bcb77d21..a0588b413 100644 --- a/packages/opencode/src/altimate/tools/sql-execute.ts +++ b/packages/opencode/src/altimate/tools/sql-execute.ts @@ -43,7 +43,7 @@ export const SqlExecuteTool = Tool.define("sql_execute", { // but does NOT block execution. Used to measure catch rate before deciding // whether to enable blocking in a future release. Fire-and-forget so it // doesn't add latency to the sql_execute hot path. - preValidateSql(args.query, args.warehouse).catch(() => {}) + preValidateSql(args.query, args.warehouse, queryType).catch(() => {}) // altimate_change end try { @@ -115,8 +115,16 @@ interface PreValidationResult { error?: string } -async function preValidateSql(sql: string, warehouse?: string): Promise { +async function preValidateSql(sql: string, warehouse: string | undefined, queryType: string): Promise { const startTime = Date.now() + // Yield the event loop before heavy synchronous SQLite work so concurrent + // tasks aren't blocked. Bun's sqlite API is sync and listColumns can touch + // hundreds of thousands of rows for large warehouses. + await new Promise((resolve) => setImmediate(resolve)) + + // Precompute correlation fields used in every telemetry event this function emits. + const maskedSqlHash = Telemetry.hashError(Telemetry.maskString(sql)) + try { // Resolve the warehouse the same way sql.execute's fallback path does: // when caller omits `warehouse`, sql.execute uses Registry.list()[0]. @@ -124,13 +132,22 @@ async function preValidateSql(sql: string, warehouse?: string): Promise w.name === warehouseName) + const warehouseType = warehouseInfo?.type ?? "unknown" + + const ctx: TrackCtx = { + warehouse_type: warehouseType, + query_type: queryType, + masked_sql_hash: maskedSqlHash, + } + if (!warehouseName) { - trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false) + trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false, ctx) return { blocked: false } } @@ -139,14 +156,14 @@ async function preValidateSql(sql: string, warehouse?: string): Promise w.name === warehouseName) if (!warehouseStatus?.last_indexed) { - trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false) + trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false, ctx) return { blocked: false } } // Check cache freshness const cacheAge = Date.now() - new Date(warehouseStatus.last_indexed).getTime() if (cacheAge > CACHE_TTL_MS) { - trackPreValidation("skipped", "stale_cache", 0, Date.now() - startTime, false) + trackPreValidation("skipped", "stale_cache", 0, Date.now() - startTime, false, ctx) return { blocked: false } } @@ -154,16 +171,24 @@ async function preValidateSql(sql: string, warehouse?: string): Promise= COLUMN_SCAN_LIMIT if (columns.length === 0) { - trackPreValidation("skipped", "empty_cache", 0, Date.now() - startTime, false) + trackPreValidation("skipped", "empty_cache", 0, Date.now() - startTime, false, ctx) return { blocked: false } } - const schemaContext: Record = {} + // Build schema context keyed by fully-qualified name (database.schema.table) + // so multi-database warehouses don't collide on schema+table alone. + // Dedupe columns per table to defend against residual collisions. + const schemaContext: Record = {} + const seenColumns: Record> = {} for (const col of columns) { - const tableName = col.schema_name ? `${col.schema_name}.${col.table}` : col.table + const tableName = [col.database, col.schema_name, col.table].filter(Boolean).join(".") + if (!tableName) continue if (!schemaContext[tableName]) { schemaContext[tableName] = [] + seenColumns[tableName] = new Set() } + if (seenColumns[tableName].has(col.name)) continue + seenColumns[tableName].add(col.name) schemaContext[tableName].push({ name: col.name, type: col.data_type || "VARCHAR", @@ -178,60 +203,61 @@ async function preValidateSql(sql: string, warehouse?: string): Promise const errors = Array.isArray(data.errors) ? data.errors : [] const isValid = data.valid !== false && errors.length === 0 if (isValid) { - trackPreValidation("passed", "valid", columns.length, Date.now() - startTime, schemaTruncated) + trackPreValidation("passed", "valid", columns.length, Date.now() - startTime, schemaTruncated, ctx) return { blocked: false } } // Only block on high-confidence structural errors const structuralErrors = errors.filter((e: any) => { const msg = (e.message ?? "").toLowerCase() - return msg.includes("column") || msg.includes("table") || msg.includes("not found") || msg.includes("does not exist") + return /\b(column|table|view|relation|identifier|not found|does not exist)\b/.test(msg) }) if (structuralErrors.length === 0) { // Non-structural errors (ambiguous cases) — let them through - trackPreValidation("passed", "non_structural", columns.length, Date.now() - startTime, schemaTruncated) + trackPreValidation("passed", "non_structural", columns.length, Date.now() - startTime, schemaTruncated, ctx) return { blocked: false } } - // Build helpful error with available columns const errorMsgs = structuralErrors.map((e: any) => e.message).join("\n") - const referencedTables = Object.keys(schemaContext).slice(0, 10) - const availableColumns = referencedTables - .map((t) => `${t}: ${schemaContext[t].map((c: any) => c.name).join(", ")}`) - .join("\n") - - const errorOutput = [ - `Pre-execution validation failed (validated against cached schema):`, - ``, - errorMsgs, - ``, - `Available tables and columns:`, - availableColumns, - ``, - `Fix the query and retry. If the schema cache is outdated, run schema_index to refresh it.`, - ].join("\n") - - trackPreValidation("blocked", "structural_error", columns.length, Date.now() - startTime, schemaTruncated, errorMsgs) - return { blocked: true, error: errorOutput } + trackPreValidation("blocked", "structural_error", columns.length, Date.now() - startTime, schemaTruncated, ctx, errorMsgs) + // Shadow mode: caller discards the result. When blocking is enabled in the + // future, build errorOutput here with the structural errors and + // schemaContext keys for user-facing guidance. + return { blocked: false } } catch { // Validation failure should never block execution - trackPreValidation("error", "validation_exception", 0, Date.now() - startTime, false) + const ctx: TrackCtx = { warehouse_type: "unknown", query_type: queryType, masked_sql_hash: maskedSqlHash } + trackPreValidation("error", "validation_exception", 0, Date.now() - startTime, false, ctx) return { blocked: false } } } +interface TrackCtx { + warehouse_type: string + query_type: string + masked_sql_hash: string +} + function trackPreValidation( outcome: "skipped" | "passed" | "blocked" | "error", reason: string, schema_columns: number, duration_ms: number, schema_truncated: boolean, + ctx: TrackCtx, error_message?: string, ) { // Mask schema identifiers (table / column names, paths, user IDs) from the @@ -244,6 +270,9 @@ function trackPreValidation( session_id: Telemetry.getContext().sessionId, outcome, reason, + warehouse_type: ctx.warehouse_type, + query_type: ctx.query_type, + masked_sql_hash: ctx.masked_sql_hash, schema_columns, schema_truncated, duration_ms, diff --git a/packages/opencode/test/telemetry/telemetry.test.ts b/packages/opencode/test/telemetry/telemetry.test.ts index 10aef0aa9..8500ac6c3 100644 --- a/packages/opencode/test/telemetry/telemetry.test.ts +++ b/packages/opencode/test/telemetry/telemetry.test.ts @@ -245,11 +245,12 @@ const ALL_EVENT_TYPES: Telemetry.Event["type"][] = [ "sql_execute_failure", "feature_suggestion", "core_failure", + "sql_pre_validation", ] describe("telemetry.event-types", () => { test("all event types are valid", () => { - expect(ALL_EVENT_TYPES.length).toBe(42) + expect(ALL_EVENT_TYPES.length).toBe(43) }) })