Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion packages/opencode/src/altimate/telemetry/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -644,8 +644,14 @@ export namespace Telemetry {
session_id: string
/** skipped = no cache or stale, passed = valid SQL, blocked = invalid SQL caught, error = validation itself failed */
outcome: "skipped" | "passed" | "blocked" | "error"
/** why: no_cache, stale_cache, empty_cache, valid, non_structural, structural_error, validation_exception */
/** why: no_cache, stale_cache, empty_cache, valid, non_structural, structural_error, dispatcher_failed, validation_exception */
reason: string
/** warehouse driver type (postgres, snowflake, bigquery, ...) — enables per-warehouse catch-rate analysis */
warehouse_type: string
/** read / write / unknown — enables per-query-type analysis */
query_type: string
/** SHA-256 prefix of masked SQL — join key to sql_execute_failure events for same query */
masked_sql_hash: string
schema_columns: number
/** true when schema scan hit the column-scan cap — flags samples biased by large-warehouse truncation */
schema_truncated: boolean
Expand Down
93 changes: 61 additions & 32 deletions packages/opencode/src/altimate/tools/sql-execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export const SqlExecuteTool = Tool.define("sql_execute", {
// but does NOT block execution. Used to measure catch rate before deciding
// whether to enable blocking in a future release. Fire-and-forget so it
// doesn't add latency to the sql_execute hot path.
preValidateSql(args.query, args.warehouse).catch(() => {})
preValidateSql(args.query, args.warehouse, queryType).catch(() => {})
// altimate_change end

try {
Expand Down Expand Up @@ -115,22 +115,39 @@ interface PreValidationResult {
error?: string
}

async function preValidateSql(sql: string, warehouse?: string): Promise<PreValidationResult> {
async function preValidateSql(sql: string, warehouse: string | undefined, queryType: string): Promise<PreValidationResult> {
const startTime = Date.now()
// Yield the event loop before heavy synchronous SQLite work so concurrent
// tasks aren't blocked. Bun's sqlite API is sync and listColumns can touch
// hundreds of thousands of rows for large warehouses.
await new Promise<void>((resolve) => setImmediate(resolve))

// Precompute correlation fields used in every telemetry event this function emits.
const maskedSqlHash = Telemetry.hashError(Telemetry.maskString(sql))

try {
// Resolve the warehouse the same way sql.execute's fallback path does:
// when caller omits `warehouse`, sql.execute uses Registry.list()[0].
// Matching that here keeps the shadow validation aligned with actual
// execution (dbt-routed queries are a known gap — they short-circuit
// before this fallback, so validation may use a different warehouse
// than the one dbt selects).
const registered = Registry.list().warehouses
let warehouseName = warehouse
if (!warehouseName) {
const registered = Registry.list().warehouses
warehouseName = registered[0]?.name
}
const warehouseInfo = registered.find((w) => w.name === warehouseName)
const warehouseType = warehouseInfo?.type ?? "unknown"

const ctx: TrackCtx = {
warehouse_type: warehouseType,
query_type: queryType,
masked_sql_hash: maskedSqlHash,
}

if (!warehouseName) {
trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false)
trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false, ctx)
return { blocked: false }
}

Expand All @@ -139,31 +156,39 @@ async function preValidateSql(sql: string, warehouse?: string): Promise<PreValid

const warehouseStatus = status.warehouses.find((w) => w.name === warehouseName)
if (!warehouseStatus?.last_indexed) {
trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false)
trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false, ctx)
return { blocked: false }
}

// Check cache freshness
const cacheAge = Date.now() - new Date(warehouseStatus.last_indexed).getTime()
if (cacheAge > CACHE_TTL_MS) {
trackPreValidation("skipped", "stale_cache", 0, Date.now() - startTime, false)
trackPreValidation("skipped", "stale_cache", 0, Date.now() - startTime, false, ctx)
return { blocked: false }
}

// Build schema context from cached columns
const columns = cache.listColumns(warehouseName, COLUMN_SCAN_LIMIT)
const schemaTruncated = columns.length >= COLUMN_SCAN_LIMIT
if (columns.length === 0) {
trackPreValidation("skipped", "empty_cache", 0, Date.now() - startTime, false)
trackPreValidation("skipped", "empty_cache", 0, Date.now() - startTime, false, ctx)
return { blocked: false }
}

const schemaContext: Record<string, any> = {}
// Build schema context keyed by fully-qualified name (database.schema.table)
// so multi-database warehouses don't collide on schema+table alone.
// Dedupe columns per table to defend against residual collisions.
const schemaContext: Record<string, { name: string; type: string; nullable: boolean }[]> = {}
const seenColumns: Record<string, Set<string>> = {}
for (const col of columns) {
const tableName = col.schema_name ? `${col.schema_name}.${col.table}` : col.table
const tableName = [col.database, col.schema_name, col.table].filter(Boolean).join(".")
if (!tableName) continue
if (!schemaContext[tableName]) {
schemaContext[tableName] = []
seenColumns[tableName] = new Set()
}
if (seenColumns[tableName].has(col.name)) continue
seenColumns[tableName].add(col.name)
schemaContext[tableName].push({
name: col.name,
type: col.data_type || "VARCHAR",
Expand All @@ -178,60 +203,61 @@ async function preValidateSql(sql: string, warehouse?: string): Promise<PreValid
schema_context: schemaContext,
})

// If the dispatcher itself failed, don't treat missing data as "valid".
if (!validationResult.success) {
const errMsg = typeof validationResult.error === "string" ? validationResult.error : undefined
trackPreValidation("error", "dispatcher_failed", 0, Date.now() - startTime, false, ctx, errMsg)
return { blocked: false }
}

const data = (validationResult.data ?? {}) as Record<string, any>
const errors = Array.isArray(data.errors) ? data.errors : []
const isValid = data.valid !== false && errors.length === 0

if (isValid) {
trackPreValidation("passed", "valid", columns.length, Date.now() - startTime, schemaTruncated)
trackPreValidation("passed", "valid", columns.length, Date.now() - startTime, schemaTruncated, ctx)
return { blocked: false }
}

// Only block on high-confidence structural errors
const structuralErrors = errors.filter((e: any) => {
const msg = (e.message ?? "").toLowerCase()
return msg.includes("column") || msg.includes("table") || msg.includes("not found") || msg.includes("does not exist")
return /\b(column|table|relation|identifier|not found|does not exist)\b/.test(msg)
})

if (structuralErrors.length === 0) {
// Non-structural errors (ambiguous cases) — let them through
trackPreValidation("passed", "non_structural", columns.length, Date.now() - startTime, schemaTruncated)
trackPreValidation("passed", "non_structural", columns.length, Date.now() - startTime, schemaTruncated, ctx)
return { blocked: false }
}

// Build helpful error with available columns
const errorMsgs = structuralErrors.map((e: any) => e.message).join("\n")
const referencedTables = Object.keys(schemaContext).slice(0, 10)
const availableColumns = referencedTables
.map((t) => `${t}: ${schemaContext[t].map((c: any) => c.name).join(", ")}`)
.join("\n")

const errorOutput = [
`Pre-execution validation failed (validated against cached schema):`,
``,
errorMsgs,
``,
`Available tables and columns:`,
availableColumns,
``,
`Fix the query and retry. If the schema cache is outdated, run schema_index to refresh it.`,
].join("\n")

trackPreValidation("blocked", "structural_error", columns.length, Date.now() - startTime, schemaTruncated, errorMsgs)
return { blocked: true, error: errorOutput }
trackPreValidation("blocked", "structural_error", columns.length, Date.now() - startTime, schemaTruncated, ctx, errorMsgs)
// Shadow mode: caller discards the result. When blocking is enabled in the
// future, build errorOutput here with the structural errors and
// schemaContext keys for user-facing guidance.
return { blocked: false }
} catch {
// Validation failure should never block execution
trackPreValidation("error", "validation_exception", 0, Date.now() - startTime, false)
const ctx: TrackCtx = { warehouse_type: "unknown", query_type: queryType, masked_sql_hash: maskedSqlHash }
trackPreValidation("error", "validation_exception", 0, Date.now() - startTime, false, ctx)
return { blocked: false }
}
}

// Correlation fields threaded into every sql_pre_validation telemetry event
// emitted by preValidateSql, so outcomes can be sliced per warehouse and per
// query type and joined against other events for the same query.
interface TrackCtx {
// Warehouse driver type (e.g. postgres, snowflake); "unknown" when the
// warehouse could not be resolved or validation threw before resolution.
warehouse_type: string
// Query classification ("read" / "write" / "unknown") passed through from
// the sql_execute call site.
query_type: string
// Hash prefix of the masked SQL (SHA-256 per the event schema comment) —
// join key correlating pre-validation events with execution-failure events.
masked_sql_hash: string
}

function trackPreValidation(
outcome: "skipped" | "passed" | "blocked" | "error",
reason: string,
schema_columns: number,
duration_ms: number,
schema_truncated: boolean,
ctx: TrackCtx,
error_message?: string,
) {
// Mask schema identifiers (table / column names, paths, user IDs) from the
Expand All @@ -244,6 +270,9 @@ function trackPreValidation(
session_id: Telemetry.getContext().sessionId,
outcome,
reason,
warehouse_type: ctx.warehouse_type,
query_type: ctx.query_type,
masked_sql_hash: ctx.masked_sql_hash,
schema_columns,
schema_truncated,
duration_ms,
Expand Down
3 changes: 2 additions & 1 deletion packages/opencode/test/telemetry/telemetry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,12 @@ const ALL_EVENT_TYPES: Telemetry.Event["type"][] = [
"sql_execute_failure",
"feature_suggestion",
"core_failure",
"sql_pre_validation",
]

// Count tripwire: presumably intended to force a deliberate bump (and review)
// whenever an event type is added to ALL_EVENT_TYPES — confirm against the
// full test file, which is only partially visible here.
describe("telemetry.event-types", () => {
test("all event types are valid", () => {
// 42 -> 43 after adding "sql_pre_validation" to ALL_EVENT_TYPES above.
expect(ALL_EVENT_TYPES.length).toBe(43)
})
})

Expand Down
Loading