From f787dd22f9a9a104df2a4a86fa32dbeac1e27306 Mon Sep 17 00:00:00 2001 From: Koudai Aono Date: Tue, 6 Jan 2026 18:12:21 +0900 Subject: [PATCH] chore(format): apply prettier fixes --- cloud/clickhouse/search.test.ts | 539 ++++++++++++++++++ cloud/clickhouse/search.ts | 976 ++++++++++++++++++++++++++++++++ cloud/tests/clickhouse.ts | 6 +- 3 files changed, 1518 insertions(+), 3 deletions(-) create mode 100644 cloud/clickhouse/search.test.ts create mode 100644 cloud/clickhouse/search.ts diff --git a/cloud/clickhouse/search.test.ts b/cloud/clickhouse/search.test.ts new file mode 100644 index 0000000000..09e60f4f6f --- /dev/null +++ b/cloud/clickhouse/search.test.ts @@ -0,0 +1,539 @@ +import { readFileSync } from "node:fs"; +import { + describe, + it, + expect, + TestClickHouse, + clickHouseAvailable, +} from "@/tests/clickhouse"; +import { Effect, Layer } from "effect"; +import { ClickHouse } from "@/clickhouse/client"; +import { ClickHouseSearch } from "@/clickhouse/search"; +import { beforeAll, afterAll } from "vitest"; + +// Test data IDs +const TEST_ENVIRONMENT_ID = "00000000-0000-0000-0000-000000000001"; +const TEST_PROJECT_ID = "00000000-0000-0000-0000-000000000002"; +const TEST_ORG_ID = "00000000-0000-0000-0000-000000000003"; +const TEST_TRACE_ID = "abc123def456"; +const TEST_SPAN_ID_1 = "span001"; +const TEST_SPAN_ID_2 = "span002"; + +const CLICKHOUSE_DATABASE = + process.env.CLICKHOUSE_DATABASE ?? 
"mirascope_analytics"; + +const MIGRATION_SQL = readFileSync( + new URL("./migrations/00001_create_spans_analytics.up.sql", import.meta.url), + "utf8", +); + +const renderMigrationSql = (database: string) => + MIGRATION_SQL.replaceAll("{{database}}", database); + +const splitStatements = (sql: string): string[] => + sql + .split(";") + .map((statement) => statement.trim()) + .filter((statement) => statement.length > 0); + +const TestSearchLayer = ClickHouseSearch.Default.pipe( + Layer.provide(TestClickHouse), +); + +const provideSearchLayer = (effect: Effect.Effect) => + effect.pipe(Effect.provide(TestSearchLayer)); + +// Setup and teardown for test data +const setupSchema = Effect.gen(function* () { + const client = yield* ClickHouse; + const statements = splitStatements(renderMigrationSql(CLICKHOUSE_DATABASE)); + + for (const statement of statements) { + yield* client.command(statement); + } + + const tables = yield* client.unsafeQuery<{ name: string }>( + `SELECT name FROM system.tables + WHERE database = {database:String} + AND name IN ('spans_analytics', 'annotations_analytics')`, + { database: CLICKHOUSE_DATABASE }, + ); + + if (tables.length < 2) { + throw new Error("ClickHouse test schema is missing required tables"); + } +}); + +const testSpans = [ + { + id: "11111111-1111-1111-1111-111111111111", + trace_db_id: "22222222-2222-2222-2222-222222222222", + trace_id: TEST_TRACE_ID, + span_id: TEST_SPAN_ID_1, + parent_span_id: null, + environment_id: TEST_ENVIRONMENT_ID, + project_id: TEST_PROJECT_ID, + organization_id: TEST_ORG_ID, + start_time: "2024-01-15 10:00:00.000000000", + end_time: "2024-01-15 10:00:01.000000000", + duration_ms: 1000, + name: "llm call openai", + kind: 1, + status_code: 0, + status_message: null, + model: "gpt-4", + provider: "openai", + input_tokens: 100, + output_tokens: 50, + total_tokens: 150, + cost_usd: 0.01, + function_id: null, + function_name: "my_function", + function_version: "v1", + error_type: null, + error_message: null, 
+ attributes: "{}", + events: null, + links: null, + service_name: "test-service", + service_version: "1.0.0", + resource_attributes: null, + created_at: "2024-01-15 10:00:00.000", + _version: Date.now(), + }, + { + id: "33333333-3333-3333-3333-333333333333", + trace_db_id: "22222222-2222-2222-2222-222222222222", + trace_id: TEST_TRACE_ID, + span_id: TEST_SPAN_ID_2, + parent_span_id: TEST_SPAN_ID_1, + environment_id: TEST_ENVIRONMENT_ID, + project_id: TEST_PROJECT_ID, + organization_id: TEST_ORG_ID, + start_time: "2024-01-15 10:00:00.100000000", + end_time: "2024-01-15 10:00:00.500000000", + duration_ms: 400, + name: "embedding call", + kind: 1, + status_code: 0, + status_message: null, + model: "text-embedding-3-small", + provider: "openai", + input_tokens: 50, + output_tokens: 0, + total_tokens: 50, + cost_usd: 0.001, + function_id: null, + function_name: null, + function_version: null, + error_type: null, + error_message: null, + attributes: "{}", + events: null, + links: null, + service_name: "test-service", + service_version: "1.0.0", + resource_attributes: null, + created_at: "2024-01-15 10:00:00.100", + _version: Date.now(), + }, +]; + +const setupTestData = Effect.gen(function* () { + const client = yield* ClickHouse; + yield* client.command( + `TRUNCATE TABLE ${CLICKHOUSE_DATABASE}.spans_analytics`, + ); + yield* client.insert("spans_analytics", testSpans); +}); + +const cleanupTestData = Effect.gen(function* () { + const client = yield* ClickHouse; + yield* client.command( + `TRUNCATE TABLE ${CLICKHOUSE_DATABASE}.spans_analytics`, + ); +}); + +beforeAll(async () => { + if (!clickHouseAvailable) return; + try { + await Effect.runPromise( + Effect.gen(function* () { + yield* setupSchema; + yield* setupTestData; + }).pipe(Effect.provide(TestClickHouse)), + ); + } catch (error) { + console.warn("ClickHouse test setup failed:", error); + throw error; + } +}); + +afterAll(async () => { + if (!clickHouseAvailable) return; + await 
Effect.runPromise(cleanupTestData.pipe(Effect.provide(TestClickHouse))); +}); + +describe("ClickHouseSearch", () => { + describe("search", () => { + it.effect("returns spans matching query", () => + Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + const result = yield* searchService.search({ + environmentId: TEST_ENVIRONMENT_ID, + startTime: new Date("2024-01-01"), + endTime: new Date("2024-01-31"), + query: "llm", + }); + + expect(result.spans.length).toBeGreaterThan(0); + expect(result.spans[0]?.name).toContain("llm"); + }).pipe(provideSearchLayer), + ); + + it.effect("returns empty array for non-matching query", () => + Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + const result = yield* searchService.search({ + environmentId: TEST_ENVIRONMENT_ID, + startTime: new Date("2024-01-01"), + endTime: new Date("2024-01-31"), + query: "nonexistentqueryterm", + }); + + expect(result.spans).toHaveLength(0); + }).pipe(provideSearchLayer), + ); + + it.effect("filters by model", () => + Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + const result = yield* searchService.search({ + environmentId: TEST_ENVIRONMENT_ID, + startTime: new Date("2024-01-01"), + endTime: new Date("2024-01-31"), + model: ["gpt-4"], + }); + + expect(result.spans.length).toBeGreaterThan(0); + for (const span of result.spans) { + expect(span.model).toBe("gpt-4"); + } + }).pipe(provideSearchLayer), + ); + + it.effect("filters by provider", () => + Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + const result = yield* searchService.search({ + environmentId: TEST_ENVIRONMENT_ID, + startTime: new Date("2024-01-01"), + endTime: new Date("2024-01-31"), + provider: ["openai"], + }); + + expect(result.spans.length).toBeGreaterThan(0); + for (const span of result.spans) { + expect(span.provider).toBe("openai"); + } + }).pipe(provideSearchLayer), + ); + + it.effect("respects limit and offset", 
() => + Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + const result = yield* searchService.search({ + environmentId: TEST_ENVIRONMENT_ID, + startTime: new Date("2024-01-01"), + endTime: new Date("2024-01-31"), + limit: 1, + offset: 0, + }); + + expect(result.spans.length).toBeLessThanOrEqual(1); + }).pipe(provideSearchLayer), + ); + + it.effect("sorts by start_time descending by default", () => + Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + const result = yield* searchService.search({ + environmentId: TEST_ENVIRONMENT_ID, + startTime: new Date("2024-01-01"), + endTime: new Date("2024-01-31"), + }); + + if (result.spans.length >= 2) { + const first = new Date(result.spans[0].startTime).getTime(); + const second = new Date(result.spans[1].startTime).getTime(); + expect(first).toBeGreaterThanOrEqual(second); + } + }).pipe(provideSearchLayer), + ); + + it.effect("returns total count and hasMore", () => + Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + const result = yield* searchService.search({ + environmentId: TEST_ENVIRONMENT_ID, + startTime: new Date("2024-01-01"), + endTime: new Date("2024-01-31"), + limit: 1, + }); + + expect(typeof result.total).toBe("number"); + expect(typeof result.hasMore).toBe("boolean"); + }).pipe(provideSearchLayer), + ); + }); + + describe("getTraceDetail", () => { + it.effect("returns all spans for a trace", () => + Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + const result = yield* searchService.getTraceDetail({ + environmentId: TEST_ENVIRONMENT_ID, + traceId: TEST_TRACE_ID, + }); + + expect(result.traceId).toBe(TEST_TRACE_ID); + expect(result.spans.length).toBeGreaterThan(0); + }).pipe(provideSearchLayer), + ); + + it.effect("identifies root span", () => + Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + const result = yield* searchService.getTraceDetail({ + environmentId: 
TEST_ENVIRONMENT_ID, + traceId: TEST_TRACE_ID, + }); + + expect(result.rootSpanId).toBe(TEST_SPAN_ID_1); + }).pipe(provideSearchLayer), + ); + + it.effect("calculates total duration", () => + Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + const result = yield* searchService.getTraceDetail({ + environmentId: TEST_ENVIRONMENT_ID, + traceId: TEST_TRACE_ID, + }); + + expect(result.totalDurationMs).toBeGreaterThan(0); + }).pipe(provideSearchLayer), + ); + + it.effect("returns empty for non-existent trace", () => + Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + const result = yield* searchService.getTraceDetail({ + environmentId: TEST_ENVIRONMENT_ID, + traceId: "non-existent-trace-id", + }); + + expect(result.spans).toHaveLength(0); + expect(result.rootSpanId).toBeNull(); + expect(result.totalDurationMs).toBeNull(); + }).pipe(provideSearchLayer), + ); + }); + + describe("getAnalyticsSummary", () => { + it.effect("returns analytics summary", () => + Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + const result = yield* searchService.getAnalyticsSummary({ + environmentId: TEST_ENVIRONMENT_ID, + startTime: new Date("2024-01-01"), + endTime: new Date("2024-01-31"), + }); + + expect(typeof result.totalSpans).toBe("number"); + expect(typeof result.errorRate).toBe("number"); + expect(typeof result.totalTokens).toBe("number"); + expect(Array.isArray(result.topModels)).toBe(true); + expect(Array.isArray(result.topFunctions)).toBe(true); + }).pipe(provideSearchLayer), + ); + + it.effect("returns top models", () => + Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + const result = yield* searchService.getAnalyticsSummary({ + environmentId: TEST_ENVIRONMENT_ID, + startTime: new Date("2024-01-01"), + endTime: new Date("2024-01-31"), + }); + + if (result.topModels.length > 0) { + expect(result.topModels[0]?.model).toBeDefined(); + 
expect(result.topModels[0]?.count).toBeGreaterThan(0); + } + }).pipe(provideSearchLayer), + ); + }); + + describe("validation", () => { + it("throws on time range exceeding 30 days for search", async () => { + const program = Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + yield* searchService.search({ + environmentId: TEST_ENVIRONMENT_ID, + startTime: new Date("2024-01-01"), + endTime: new Date("2024-03-01"), // 60 days + }); + }).pipe(provideSearchLayer); + + await expect(Effect.runPromise(program)).rejects.toThrow( + /Time range exceeds maximum/, + ); + }); + + it("throws on query exceeding max length", async () => { + const longQuery = "a".repeat(501); + + const program = Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + yield* searchService.search({ + environmentId: TEST_ENVIRONMENT_ID, + startTime: new Date("2024-01-01"), + endTime: new Date("2024-01-31"), + query: longQuery, + }); + }).pipe(provideSearchLayer); + + await expect(Effect.runPromise(program)).rejects.toThrow( + /Query exceeds maximum length/, + ); + }); + + it("throws on offset exceeding max", async () => { + const program = Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + yield* searchService.search({ + environmentId: TEST_ENVIRONMENT_ID, + startTime: new Date("2024-01-01"), + endTime: new Date("2024-01-31"), + offset: 10001, + }); + }).pipe(provideSearchLayer); + + await expect(Effect.runPromise(program)).rejects.toThrow( + /Offset exceeds maximum/, + ); + }); + + it("throws on too many attribute filters", async () => { + const tooManyFilters = Array.from({ length: 11 }, (_, i) => ({ + key: `attr${i}`, + operator: "eq" as const, + value: "test", + })); + + const program = Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + yield* searchService.search({ + environmentId: TEST_ENVIRONMENT_ID, + startTime: new Date("2024-01-01"), + endTime: new Date("2024-01-31"), + 
attributeFilters: tooManyFilters, + }); + }).pipe(provideSearchLayer); + + await expect(Effect.runPromise(program)).rejects.toThrow( + /Too many attribute filters/, + ); + }); + + it("throws on too many model values", async () => { + const tooManyModels = Array.from({ length: 21 }, (_, i) => `model${i}`); + + const program = Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + yield* searchService.search({ + environmentId: TEST_ENVIRONMENT_ID, + startTime: new Date("2024-01-01"), + endTime: new Date("2024-01-31"), + model: tooManyModels, + }); + }).pipe(provideSearchLayer); + + await expect(Effect.runPromise(program)).rejects.toThrow( + /Too many model values/, + ); + }); + }); + + describe("transformations", () => { + it.effect("transforms snake_case to camelCase in search results", () => + Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + const result = yield* searchService.search({ + environmentId: TEST_ENVIRONMENT_ID, + startTime: new Date("2024-01-01"), + endTime: new Date("2024-01-31"), + }); + + if (result.spans.length > 0) { + const span = result.spans[0]; + // Verify camelCase keys exist + expect("traceId" in span).toBe(true); + expect("spanId" in span).toBe(true); + expect("startTime" in span).toBe(true); + expect("durationMs" in span).toBe(true); + expect("totalTokens" in span).toBe(true); + expect("functionId" in span).toBe(true); + expect("functionName" in span).toBe(true); + } + }).pipe(provideSearchLayer), + ); + + it.effect("transforms snake_case to camelCase in trace detail", () => + Effect.gen(function* () { + const searchService = yield* ClickHouseSearch; + + const result = yield* searchService.getTraceDetail({ + environmentId: TEST_ENVIRONMENT_ID, + traceId: TEST_TRACE_ID, + }); + + if (result.spans.length > 0) { + const span = result.spans[0]; + // Verify camelCase keys exist + expect("traceDbId" in span).toBe(true); + expect("parentSpanId" in span).toBe(true); + expect("environmentId" in 
span).toBe(true); + expect("projectId" in span).toBe(true); + expect("organizationId" in span).toBe(true); + expect("statusCode" in span).toBe(true); + expect("inputTokens" in span).toBe(true); + expect("outputTokens" in span).toBe(true); + expect("costUsd" in span).toBe(true); + expect("errorType" in span).toBe(true); + expect("serviceName" in span).toBe(true); + } + }).pipe(provideSearchLayer), + ); + }); +}); diff --git a/cloud/clickhouse/search.ts b/cloud/clickhouse/search.ts new file mode 100644 index 0000000000..77fbc41b89 --- /dev/null +++ b/cloud/clickhouse/search.ts @@ -0,0 +1,976 @@ +/** + * @fileoverview Effect-native ClickHouse search service for spans analytics. + * + * Provides read-only search and analytics operations against ClickHouse. + * Uses ClickHouse for database access with automatic connection pooling. + * + * ## Architecture + * + * ``` + * ClickHouseSearch (read-only search operations) + * └── data access via ClickHouse + * ``` + * + * ## Query Strategies + * + * - **search()**: Uses `argMax` for deduplication (fast, eventual consistency) + * - **getTraceDetail()**: Uses `FINAL` for exact deduplication (accurate, lower frequency) + * - **getAnalyticsSummary()**: Uses `argMax` (aggregates need only latest values) + * + * ## Query Constraints + * + * - `limit`: max 1000 (default 50) + * - `offset`: max 10000 (cursor-based pagination recommended for deeper pages) + * - `time_range`: max 30 days (search), max 90 days (analytics) + * - `query`: max 500 characters + * - `attributeFilters`: max 10 + * - `model[]`/`provider[]`: max 20 items + * - Required: environmentId + startTime + endTime + * + * @example + * ```ts + * const searchService = yield* ClickHouseSearch; + * + * // Search spans with filters + * const results = yield* searchService.search({ + * environmentId: "env-123", + * startTime: new Date("2024-01-01"), + * endTime: new Date("2024-01-02"), + * query: "llm.call", + * }); + * + * // Get trace details with all spans + * const trace 
= yield* searchService.getTraceDetail({ + * environmentId: "env-123", + * traceId: "abc123", + * }); + * ``` + */ + +import { Context, Effect, Layer } from "effect"; +import { ClickHouse } from "@/clickhouse/client"; +import { ClickHouseError } from "@/errors"; + +// ============================================================================= +// Date Formatting Utilities +// ============================================================================= + +/** + * Formats a Date object for ClickHouse DateTime64(9) query parameters. + * ClickHouse expects format: "2024-01-01 00:00:00.000000000" (space separator, 9 decimal places). + * JavaScript toISOString() returns "2024-01-01T00:00:00.000Z" which is incompatible. + */ +function formatDateTime64(date: Date): string { + const iso = date.toISOString(); + // Replace 'T' with space, remove 'Z', and pad milliseconds to nanoseconds + return iso.replace("T", " ").replace("Z", "") + "000000"; +} + +// ============================================================================= +// Query Constraints +// ============================================================================= + +const QUERY_CONSTRAINTS = { + /** Maximum limit for search results */ + MAX_LIMIT: 1000, + /** Default limit for search results */ + DEFAULT_LIMIT: 50, + /** Maximum offset for pagination (prevents deep pagination) */ + MAX_OFFSET: 10000, + /** Maximum time range in days for search */ + MAX_TIME_RANGE_DAYS_SEARCH: 30, + /** Maximum time range in days for analytics */ + MAX_TIME_RANGE_DAYS_ANALYTICS: 90, + /** Maximum query string length */ + MAX_QUERY_LENGTH: 500, + /** Maximum attribute filters */ + MAX_ATTRIBUTE_FILTERS: 10, + /** Maximum items in model/provider arrays */ + MAX_ARRAY_ITEMS: 20, +} as const; + +// ============================================================================= +// Input Types +// ============================================================================= + +/** Attribute filter for ad-hoc queries */ +export 
interface AttributeFilter { + /** Attribute key path (e.g., "gen_ai.request.model") */ + key: string; + /** Comparison operator */ + operator: "eq" | "neq" | "contains" | "exists"; + /** Value to compare against (optional for "exists") */ + value?: string; +} + +/** Input type for span search with required filters. */ +export interface SpanSearchInput { + /** Environment ID to scope the search (required). */ + environmentId: string; + /** Start of time range (required). */ + startTime: Date; + /** End of time range (required). */ + endTime: Date; + /** Optional text search query (token-based matching). */ + query?: string; + /** Filter by trace ID. */ + traceId?: string; + /** Filter by span ID. */ + spanId?: string; + /** Filter by model names. */ + model?: string[]; + /** Filter by provider names. */ + provider?: string[]; + /** Filter by function ID. */ + functionId?: string; + /** Filter by function name. */ + functionName?: string; + /** Filter by error presence. */ + hasError?: boolean; + /** Filter by minimum total tokens. */ + minTokens?: number; + /** Filter by maximum total tokens. */ + maxTokens?: number; + /** Filter by minimum duration (ms). */ + minDuration?: number; + /** Filter by maximum duration (ms). */ + maxDuration?: number; + /** Custom attribute filters. */ + attributeFilters?: AttributeFilter[]; + /** Number of results to return (max 1000). */ + limit?: number; + /** Offset for pagination (max 10000). */ + offset?: number; + /** Sort field. */ + sortBy?: "start_time" | "duration_ms" | "total_tokens"; + /** Sort direction. */ + sortOrder?: "asc" | "desc"; +} + +/** Input type for trace detail retrieval. */ +export interface TraceDetailInput { + /** Environment ID to scope the query (required). */ + environmentId: string; + /** Trace ID to retrieve (required). */ + traceId: string; +} + +/** Input type for analytics summary. */ +export interface AnalyticsSummaryInput { + /** Environment ID to scope the query (required). 
*/ + environmentId: string; + /** Start of time range (required). */ + startTime: Date; + /** End of time range (required). */ + endTime: Date; + /** Optional function ID filter. */ + functionId?: string; +} + +// ============================================================================= +// Output Types +// ============================================================================= + +/** + * Span search result (summary mode). + * + * Note: attributes, events, links, resource_attributes are intentionally + * excluded for query cost reduction. Use getTraceDetail() for full span data. + */ +export interface SpanSearchResult { + id: string; + traceId: string; + spanId: string; + name: string; + startTime: string; + durationMs: number | null; + model: string | null; + provider: string | null; + totalTokens: number | null; + functionId: string | null; + functionName: string | null; +} + +/** Search response with pagination info. */ +export interface SearchResponse { + spans: SpanSearchResult[]; + total: number; + hasMore: boolean; +} + +/** Full span data for trace detail view. 
*/ +export interface SpanDetail { + id: string; + traceDbId: string; + traceId: string; + spanId: string; + parentSpanId: string | null; + environmentId: string; + projectId: string; + organizationId: string; + startTime: string; + endTime: string; + durationMs: number | null; + name: string; + kind: number; + statusCode: number | null; + statusMessage: string | null; + model: string | null; + provider: string | null; + inputTokens: number | null; + outputTokens: number | null; + totalTokens: number | null; + costUsd: number | null; + functionId: string | null; + functionName: string | null; + functionVersion: string | null; + errorType: string | null; + errorMessage: string | null; + attributes: string; + events: string | null; + links: string | null; + serviceName: string | null; + serviceVersion: string | null; + resourceAttributes: string | null; +} + +/** Trace detail response with all spans. */ +export interface TraceDetailResponse { + traceId: string; + spans: SpanDetail[]; + rootSpanId: string | null; + totalDurationMs: number | null; +} + +/** Analytics summary response. 
*/ +export interface AnalyticsSummaryResponse { + totalSpans: number; + avgDurationMs: number | null; + p50DurationMs: number | null; + p95DurationMs: number | null; + p99DurationMs: number | null; + errorRate: number; + totalTokens: number; + totalCostUsd: number; + topModels: Array<{ model: string; count: number }>; + topFunctions: Array<{ functionName: string; count: number }>; +} + +// ============================================================================= +// Internal Row Types (ClickHouse query results) +// ============================================================================= + +interface SpanSummaryRow { + environment_id: string; + trace_id: string; + span_id: string; + id: string; + name: string; + start_time: string; + duration_ms: number | null; + model: string | null; + provider: string | null; + total_tokens: number | null; + function_id: string | null; + function_name: string | null; +} + +interface SpanDetailRow { + id: string; + trace_db_id: string; + trace_id: string; + span_id: string; + parent_span_id: string | null; + environment_id: string; + project_id: string; + organization_id: string; + start_time: string; + end_time: string; + duration_ms: number | null; + name: string; + kind: number; + status_code: number | null; + status_message: string | null; + model: string | null; + provider: string | null; + input_tokens: number | null; + output_tokens: number | null; + total_tokens: number | null; + cost_usd: number | null; + function_id: string | null; + function_name: string | null; + function_version: string | null; + error_type: string | null; + error_message: string | null; + attributes: string; + events: string | null; + links: string | null; + service_name: string | null; + service_version: string | null; + resource_attributes: string | null; +} + +interface CountRow { + count: number; +} + +interface AnalyticsRow { + total_spans: number; + avg_duration_ms: number | null; + p50_duration_ms: number | null; + p95_duration_ms: 
number | null; + p99_duration_ms: number | null; + error_count: number; + total_tokens: number; + total_cost_usd: number; +} + +interface TopModelRow { + model_value: string; + count: number; +} + +interface TopFunctionRow { + function_name_value: string; + count: number; +} + +// ============================================================================= +// Service Interface +// ============================================================================= + +export interface ClickHouseSearchClient { + /** Search spans with filters and pagination. */ + readonly search: ( + input: SpanSearchInput, + ) => Effect.Effect; + + /** Get full trace detail with all spans. */ + readonly getTraceDetail: ( + input: TraceDetailInput, + ) => Effect.Effect; + + /** Get analytics summary for a time range. */ + readonly getAnalyticsSummary: ( + input: AnalyticsSummaryInput, + ) => Effect.Effect; +} + +// ============================================================================= +// Service Definition +// ============================================================================= + +export class ClickHouseSearch extends Context.Tag("ClickHouseSearch")< + ClickHouseSearch, + ClickHouseSearchClient +>() { + static Default = Layer.effect( + ClickHouseSearch, + Effect.gen(function* () { + const client = yield* ClickHouse; + + return { + search: (input: SpanSearchInput) => + Effect.gen(function* () { + // Validate input constraints + const validatedInput = validateSearchInput(input); + + // Build WHERE clause and params + const { clause: whereClause, params } = + buildSearchWhereClause(validatedInput); + + // Build ORDER BY clause (validated) + const orderBy = buildOrderByClause( + validatedInput.sortBy ?? "start_time", + validatedInput.sortOrder ?? "desc", + ); + + const limit = + validatedInput.limit ?? QUERY_CONSTRAINTS.DEFAULT_LIMIT; + const offset = validatedInput.offset ?? 
0; + + // Summary query using argMax for deduplication (hot path) + // Use table alias 's' to avoid column/alias name conflicts + const rows = yield* client.unsafeQuery( + ` + SELECT * + FROM ( + SELECT + s.environment_id, + s.trace_id, + s.span_id, + argMax(s.id, s._version) as id, + argMax(s.name, s._version) as name, + argMax(s.start_time, s._version) as start_time, + argMax(s.duration_ms, s._version) as duration_ms, + argMax(s.model, s._version) as model, + argMax(s.provider, s._version) as provider, + argMax(s.total_tokens, s._version) as total_tokens, + argMax(s.function_id, s._version) as function_id, + argMax(s.function_name, s._version) as function_name + FROM spans_analytics AS s + WHERE ${whereClause} + GROUP BY s.environment_id, s.trace_id, s.span_id + ) + ORDER BY ${orderBy} + LIMIT ${limit} OFFSET ${offset} + `, + params, + ); + + // Count query for pagination (uses same table alias 's') + const countRows = yield* client.unsafeQuery( + ` + SELECT count(DISTINCT (s.environment_id, s.trace_id, s.span_id)) as count + FROM spans_analytics AS s + WHERE ${whereClause} + `, + params, + ); + const total = Number(countRows[0]?.count ?? 
0); + + // Transform to API response format + const spans = rows.map(transformToSearchResult); + + return { + spans, + total, + hasMore: offset + spans.length < total, + }; + }), + + getTraceDetail: (input: TraceDetailInput) => + Effect.gen(function* () { + // Use FINAL for exact deduplication (accuracy priority) + const rows = yield* client.unsafeQuery( + ` + SELECT + id, + trace_db_id, + trace_id, + span_id, + parent_span_id, + environment_id, + project_id, + organization_id, + start_time, + end_time, + duration_ms, + name, + kind, + status_code, + status_message, + model, + provider, + input_tokens, + output_tokens, + total_tokens, + cost_usd, + function_id, + function_name, + function_version, + error_type, + error_message, + attributes, + events, + links, + service_name, + service_version, + resource_attributes + FROM spans_analytics FINAL + WHERE environment_id = toUUID({environmentId:String}) + AND trace_id = {traceId:String} + ORDER BY start_time ASC + `, + { + environmentId: input.environmentId, + traceId: input.traceId, + }, + ); + + if (rows.length === 0) { + return { + traceId: input.traceId, + spans: [], + rootSpanId: null, + totalDurationMs: null, + }; + } + + // Transform rows + const spans = rows.map(transformToSpanDetail); + + // Find root span (no parent) and calculate total duration + const rootSpan = spans.find((s) => s.parentSpanId === null); + const rootSpanId = rootSpan?.spanId ?? null; + + // Calculate total duration from earliest start to latest end + let minStart: number | null = null; + let maxEnd: number | null = null; + + for (const span of spans) { + const start = new Date(span.startTime).getTime(); + const end = new Date(span.endTime).getTime(); + + if (minStart === null || start < minStart) minStart = start; + if (maxEnd === null || end > maxEnd) maxEnd = end; + } + + const totalDurationMs = + minStart !== null && maxEnd !== null ? 
maxEnd - minStart : null; + + return { + traceId: input.traceId, + spans, + rootSpanId, + totalDurationMs, + }; + }), + + getAnalyticsSummary: (input: AnalyticsSummaryInput) => + Effect.gen(function* () { + // Validate time range (max 90 days for analytics) + const timeDiff = + input.endTime.getTime() - input.startTime.getTime(); + const daysDiff = timeDiff / (1000 * 60 * 60 * 24); + if (daysDiff > QUERY_CONSTRAINTS.MAX_TIME_RANGE_DAYS_ANALYTICS) { + return yield* Effect.fail( + new ClickHouseError({ + message: `Time range exceeds maximum of ${QUERY_CONSTRAINTS.MAX_TIME_RANGE_DAYS_ANALYTICS} days`, + }), + ); + } + + // Build base WHERE clause for analytics + // Use toUUID() for explicit UUID type conversion + const { clause: baseWhere, params: baseParams } = + buildAnalyticsBaseWhere(input); + + // Main analytics query using argMax in subquery for deduplication + // Outer query aggregates the deduplicated values + // Use table alias 's' to avoid column/alias conflicts + const analyticsRows = yield* client.unsafeQuery( + ` + SELECT + count() as total_spans, + avg(duration_ms) as avg_duration_ms, + quantile(0.5)(duration_ms) as p50_duration_ms, + quantile(0.95)(duration_ms) as p95_duration_ms, + quantile(0.99)(duration_ms) as p99_duration_ms, + countIf(error_type IS NOT NULL) as error_count, + sum(total_tokens) as total_tokens, + sum(cost_usd) as total_cost_usd + FROM ( + SELECT + argMax(s.duration_ms, s._version) as duration_ms, + argMax(s.error_type, s._version) as error_type, + argMax(s.total_tokens, s._version) as total_tokens, + argMax(s.cost_usd, s._version) as cost_usd + FROM spans_analytics AS s + WHERE ${baseWhere} + GROUP BY s.environment_id, s.trace_id, s.span_id + ) + `, + baseParams, + ); + const analytics = analyticsRows[0]; + + // Top models query: deduplicate then group by model + // Use table alias to avoid column/alias conflicts + const modelWhere = `${baseWhere} AND s.model IS NOT NULL`; + + const topModelRows = yield* client.unsafeQuery( + ` + 
SELECT model_value, count() as count + FROM ( + SELECT argMax(s.model, s._version) as model_value + FROM spans_analytics AS s + WHERE ${modelWhere} + GROUP BY s.environment_id, s.trace_id, s.span_id + ) + WHERE model_value != '' + GROUP BY model_value + ORDER BY count DESC + LIMIT 10 + `, + baseParams, + ); + + // Top functions query: deduplicate then group by function_name + // Use table alias to avoid column/alias conflicts + const functionWhere = `${baseWhere} AND s.function_name IS NOT NULL`; + + const topFunctionRows = yield* client.unsafeQuery( + ` + SELECT function_name_value, count() as count + FROM ( + SELECT argMax(s.function_name, s._version) as function_name_value + FROM spans_analytics AS s + WHERE ${functionWhere} + GROUP BY s.environment_id, s.trace_id, s.span_id + ) + WHERE function_name_value != '' + GROUP BY function_name_value + ORDER BY count DESC + LIMIT 10 + `, + baseParams, + ); + + const totalSpans = Number(analytics?.total_spans ?? 0); + const errorCount = Number(analytics?.error_count ?? 0); + + return { + totalSpans, + avgDurationMs: + analytics?.avg_duration_ms != null + ? Number(analytics.avg_duration_ms) + : null, + p50DurationMs: + analytics?.p50_duration_ms != null + ? Number(analytics.p50_duration_ms) + : null, + p95DurationMs: + analytics?.p95_duration_ms != null + ? Number(analytics.p95_duration_ms) + : null, + p99DurationMs: + analytics?.p99_duration_ms != null + ? Number(analytics.p99_duration_ms) + : null, + errorRate: totalSpans > 0 ? errorCount / totalSpans : 0, + totalTokens: Number(analytics?.total_tokens ?? 0), + totalCostUsd: Number(analytics?.total_cost_usd ?? 
/**
 * Validate search input constraints and normalise the page size.
 *
 * Checks, in order: the time range (both dates valid, span no longer than
 * `MAX_TIME_RANGE_DAYS_SEARCH` days), query length, limit, offset, number of
 * attribute filters, and number of model / provider values.
 *
 * @param input - Raw search input.
 * @returns A copy of `input` whose `limit` is the requested value (or
 *   `DEFAULT_LIMIT` when omitted) capped at `MAX_LIMIT`.
 * @throws ClickHouseError when any constraint is violated.
 */
function validateSearchInput(input: SpanSearchInput): SpanSearchInput {
  // Validate time range. getTime() on an Invalid Date is NaN, and every
  // comparison against NaN is false, so an invalid date would otherwise slip
  // straight past the maximum-range check below.
  const timeDiff = input.endTime.getTime() - input.startTime.getTime();
  if (Number.isNaN(timeDiff)) {
    throw new ClickHouseError({
      message: "Invalid time range: startTime and endTime must be valid dates",
    });
  }
  const daysDiff = timeDiff / (1000 * 60 * 60 * 24);
  if (daysDiff > QUERY_CONSTRAINTS.MAX_TIME_RANGE_DAYS_SEARCH) {
    throw new ClickHouseError({
      message: `Time range exceeds maximum of ${QUERY_CONSTRAINTS.MAX_TIME_RANGE_DAYS_SEARCH} days`,
    });
  }

  // Validate query length
  if (input.query && input.query.length > QUERY_CONSTRAINTS.MAX_QUERY_LENGTH) {
    throw new ClickHouseError({
      message: `Query exceeds maximum length of ${QUERY_CONSTRAINTS.MAX_QUERY_LENGTH} characters`,
    });
  }

  // Validate limit: reject values that can never be a row count (negative,
  // fractional, NaN) instead of forwarding them to the database, then cap.
  const requestedLimit = input.limit ?? QUERY_CONSTRAINTS.DEFAULT_LIMIT;
  if (!Number.isInteger(requestedLimit) || requestedLimit < 0) {
    throw new ClickHouseError({
      message: "Limit must be a non-negative integer",
    });
  }
  const limit = Math.min(requestedLimit, QUERY_CONSTRAINTS.MAX_LIMIT);

  // Validate offset: same shape check as limit, then the depth cap.
  const offset = input.offset ?? 0;
  if (!Number.isInteger(offset) || offset < 0) {
    throw new ClickHouseError({
      message: "Offset must be a non-negative integer",
    });
  }
  if (offset > QUERY_CONSTRAINTS.MAX_OFFSET) {
    throw new ClickHouseError({
      message: `Offset exceeds maximum of ${QUERY_CONSTRAINTS.MAX_OFFSET}. Use cursor-based pagination for deeper results.`,
    });
  }

  // Validate attribute filters
  if (
    input.attributeFilters &&
    input.attributeFilters.length > QUERY_CONSTRAINTS.MAX_ATTRIBUTE_FILTERS
  ) {
    throw new ClickHouseError({
      message: `Too many attribute filters. Maximum is ${QUERY_CONSTRAINTS.MAX_ATTRIBUTE_FILTERS}.`,
    });
  }

  // Validate model/provider array lengths
  if (input.model && input.model.length > QUERY_CONSTRAINTS.MAX_ARRAY_ITEMS) {
    throw new ClickHouseError({
      message: `Too many model values. Maximum is ${QUERY_CONSTRAINTS.MAX_ARRAY_ITEMS}.`,
    });
  }
  if (
    input.provider &&
    input.provider.length > QUERY_CONSTRAINTS.MAX_ARRAY_ITEMS
  ) {
    throw new ClickHouseError({
      message: `Too many provider values. Maximum is ${QUERY_CONSTRAINTS.MAX_ARRAY_ITEMS}.`,
    });
  }

  return { ...input, limit };
}

/**
 * Escape the LIKE metacharacters (`%`, `_`) and the escape character itself
 * (`\`) so that `value` is matched literally inside a LIKE pattern.
 */
function escapeLikePattern(value: string): string {
  return value.replace(/[\\%_]/g, "\\$&");
}

/**
 * Build WHERE clause for search query.
 *
 * Every user-supplied value is bound through a named placeholder
 * (`{name_N:Type}`); only fixed SQL fragments are concatenated. Placeholder
 * names carry a running index so the same logical name can be bound more
 * than once (one per search token, one pair per attribute filter).
 *
 * @param input - Search input; `environmentId`, `startTime` and `endTime`
 *   always produce conditions, every other field only when present.
 * @returns `clause`, the conditions joined with `AND` (all columns use the
 *   `s.` table alias), and `params`, the placeholder-name -> value map.
 */
function buildSearchWhereClause(input: SpanSearchInput): {
  clause: string;
  params: Record<string, unknown>;
} {
  const conditions: string[] = [];
  const params: Record<string, unknown> = {};
  let paramIndex = 0;

  // Register a bound value under a unique placeholder name and return it.
  const addParam = (name: string, value: unknown): string => {
    const key = `${name}_${paramIndex++}`;
    params[key] = value;
    return key;
  };

  // Required filters (all column references use 's.' prefix)
  // Use toUUID() for explicit type conversion (ClickHouse UUID type)
  const environmentIdKey = addParam("environmentId", input.environmentId);
  conditions.push(`s.environment_id = toUUID({${environmentIdKey}:String})`);

  const startTimeKey = addParam("startTime", formatDateTime64(input.startTime));
  conditions.push(`s.start_time >= toDateTime64({${startTimeKey}:String}, 9)`);

  const endTimeKey = addParam("endTime", formatDateTime64(input.endTime));
  conditions.push(`s.start_time <= toDateTime64({${endTimeKey}:String}, 9)`);

  // Optional text search using hasToken for tokenized matching. The query is
  // lower-cased and split on anything that is not [a-z0-9]; every resulting
  // token must be present in s.name_lower.
  if (input.query) {
    const tokens = input.query
      .trim()
      .toLowerCase()
      .split(/[^a-z0-9]+/g)
      .filter((token) => token.length > 0);
    for (const token of tokens) {
      const tokenKey = addParam("token", token);
      conditions.push(`hasToken(s.name_lower, {${tokenKey}:String})`);
    }
  }

  // Optional filters
  if (input.traceId) {
    const traceIdKey = addParam("traceId", input.traceId);
    conditions.push(`s.trace_id = {${traceIdKey}:String}`);
  }

  if (input.spanId) {
    const spanIdKey = addParam("spanId", input.spanId);
    conditions.push(`s.span_id = {${spanIdKey}:String}`);
  }

  if (input.model && input.model.length > 0) {
    const modelKey = addParam("model", input.model);
    conditions.push(`s.model IN {${modelKey}:Array(String)}`);
  }

  if (input.provider && input.provider.length > 0) {
    const providerKey = addParam("provider", input.provider);
    conditions.push(`s.provider IN {${providerKey}:Array(String)}`);
  }

  if (input.functionId) {
    const functionIdKey = addParam("functionId", input.functionId);
    conditions.push(`s.function_id = toUUID({${functionIdKey}:String})`);
  }

  if (input.functionName) {
    const functionNameKey = addParam("functionName", input.functionName);
    conditions.push(`s.function_name = {${functionNameKey}:String}`);
  }

  // hasError is tri-state: undefined means "do not filter on errors".
  if (input.hasError === true) {
    conditions.push("s.error_type IS NOT NULL");
  } else if (input.hasError === false) {
    conditions.push("s.error_type IS NULL");
  }

  // Range filters use `!== undefined` so that 0 is a usable bound.
  if (input.minTokens !== undefined) {
    const minTokensKey = addParam("minTokens", input.minTokens);
    conditions.push(`s.total_tokens >= {${minTokensKey}:Int64}`);
  }

  if (input.maxTokens !== undefined) {
    const maxTokensKey = addParam("maxTokens", input.maxTokens);
    conditions.push(`s.total_tokens <= {${maxTokensKey}:Int64}`);
  }

  if (input.minDuration !== undefined) {
    const minDurationKey = addParam("minDuration", input.minDuration);
    conditions.push(`s.duration_ms >= {${minDurationKey}:Int64}`);
  }

  if (input.maxDuration !== undefined) {
    const maxDurationKey = addParam("maxDuration", input.maxDuration);
    conditions.push(`s.duration_ms <= {${maxDurationKey}:Int64}`);
  }

  // Attribute filters. Operators other than the four below add no condition.
  if (input.attributeFilters) {
    for (const filter of input.attributeFilters) {
      switch (filter.operator) {
        case "eq": {
          const keyParam = addParam("attrKey", filter.key);
          const valueParam = addParam("attrValue", filter.value ?? "");
          conditions.push(
            `JSONExtractString(s.attributes, {${keyParam}:String}) = {${valueParam}:String}`,
          );
          break;
        }
        case "neq": {
          const keyParam = addParam("attrKey", filter.key);
          const valueParam = addParam("attrValue", filter.value ?? "");
          conditions.push(
            `JSONExtractString(s.attributes, {${keyParam}:String}) != {${valueParam}:String}`,
          );
          break;
        }
        case "contains": {
          const keyParam = addParam("attrKey", filter.key);
          // The value is user text, not a pattern: escape LIKE
          // metacharacters so "%" and "_" match themselves instead of
          // acting as wildcards, then wrap in % for substring matching.
          const valueParam = addParam(
            "attrValue",
            `%${escapeLikePattern(filter.value ?? "")}%`,
          );
          conditions.push(
            `JSONExtractString(s.attributes, {${keyParam}:String}) LIKE {${valueParam}:String}`,
          );
          break;
        }
        case "exists": {
          const keyParam = addParam("attrKey", filter.key);
          conditions.push(`JSONHas(s.attributes, {${keyParam}:String})`);
          break;
        }
      }
    }
  }

  return {
    clause: conditions.join(" AND "),
    params,
  };
}
/**
 * Build WHERE clause for analytics queries.
 *
 * Always constrains `environment_id` and the `start_time` window; adds a
 * `function_id` condition when `input.functionId` is set. Values are bound
 * through indexed placeholders (`{name_N:Type}`), never concatenated.
 *
 * @param input - Analytics summary input.
 * @returns `clause`, the conditions joined with `AND` (columns use the `s.`
 *   table alias), and `params`, the placeholder-name -> value map.
 */
function buildAnalyticsBaseWhere(input: AnalyticsSummaryInput): {
  clause: string;
  params: Record<string, unknown>;
} {
  const params: Record<string, unknown> = {};
  let paramIndex = 0;

  // Register a bound value under a unique placeholder name and return it.
  const addParam = (name: string, value: unknown): string => {
    const key = `${name}_${paramIndex++}`;
    params[key] = value;
    return key;
  };

  const environmentIdKey = addParam("environmentId", input.environmentId);
  const startTimeKey = addParam("startTime", formatDateTime64(input.startTime));
  const endTimeKey = addParam("endTime", formatDateTime64(input.endTime));

  const conditions = [
    `s.environment_id = toUUID({${environmentIdKey}:String})`,
    `s.start_time >= toDateTime64({${startTimeKey}:String}, 9)`,
    `s.start_time <= toDateTime64({${endTimeKey}:String}, 9)`,
  ];

  if (input.functionId) {
    const functionIdKey = addParam("functionId", input.functionId);
    conditions.push(`s.function_id = toUUID({${functionIdKey}:String})`);
  }

  return {
    clause: conditions.join(" AND "),
    params,
  };
}

/**
 * Build ORDER BY clause for search query.
 *
 * Column names and sort direction cannot be bound as query parameters, so
 * they are interpolated into the SQL text. The parameter unions only exist at
 * compile time; both values are therefore re-checked against a fixed
 * allowlist here so nothing outside it can ever reach the query.
 *
 * @param sortBy - Column to sort on. Anything outside the allowlist falls
 *   back to `start_time`.
 * @param sortOrder - Sort direction, matched case-insensitively. Anything
 *   other than `asc` yields `DESC`.
 * @returns `"<column> <ASC|DESC>"`, without the `ORDER BY` keyword.
 */
function buildOrderByClause(
  sortBy: "start_time" | "duration_ms" | "total_tokens",
  sortOrder: "asc" | "desc",
): string {
  const sortableColumns: readonly string[] = [
    "start_time",
    "duration_ms",
    "total_tokens",
  ];
  const column = sortableColumns.includes(sortBy) ? sortBy : "start_time";
  const direction = String(sortOrder).toLowerCase() === "asc" ? "ASC" : "DESC";

  // Use column alias - this works because ORDER BY is on the outer query
  // which references the deduplicated subquery results
  return `${column} ${direction}`;
}

/**
 * Transform ClickHouse row to API search result (snake_case -> camelCase).
 *
 * @param row - Summary row as selected by the search query.
 * @returns The same values under the camelCase API field names.
 */
function transformToSearchResult(row: SpanSummaryRow): SpanSearchResult {
  return {
    id: row.id,
    traceId: row.trace_id,
    spanId: row.span_id,
    name: row.name,
    startTime: row.start_time,
    durationMs: row.duration_ms,
    model: row.model,
    provider: row.provider,
    totalTokens: row.total_tokens,
    functionId: row.function_id,
    functionName: row.function_name,
  };
}
/**
 * Transform ClickHouse row to API span detail (snake_case -> camelCase).
 *
 * Pure field renaming: every value is passed through untouched.
 *
 * @param row - Full span row as selected by the trace-detail query.
 * @returns The same values under the camelCase API field names.
 */
function transformToSpanDetail(row: SpanDetailRow): SpanDetail {
  // Pull each column out under its API name in one place...
  const {
    id,
    trace_db_id: traceDbId,
    trace_id: traceId,
    span_id: spanId,
    parent_span_id: parentSpanId,
    environment_id: environmentId,
    project_id: projectId,
    organization_id: organizationId,
    start_time: startTime,
    end_time: endTime,
    duration_ms: durationMs,
    name,
    kind,
    status_code: statusCode,
    status_message: statusMessage,
    model,
    provider,
    input_tokens: inputTokens,
    output_tokens: outputTokens,
    total_tokens: totalTokens,
    cost_usd: costUsd,
    function_id: functionId,
    function_name: functionName,
    function_version: functionVersion,
    error_type: errorType,
    error_message: errorMessage,
    attributes,
    events,
    links,
    service_name: serviceName,
    service_version: serviceVersion,
    resource_attributes: resourceAttributes,
  } = row;

  // ...then assemble the result in the established field order.
  return {
    id,
    traceDbId,
    traceId,
    spanId,
    parentSpanId,
    environmentId,
    projectId,
    organizationId,
    startTime,
    endTime,
    durationMs,
    name,
    kind,
    statusCode,
    statusMessage,
    model,
    provider,
    inputTokens,
    outputTokens,
    totalTokens,
    costUsd,
    functionId,
    functionName,
    functionVersion,
    errorType,
    errorMessage,
    attributes,
    events,
    links,
    serviceName,
    serviceVersion,
    resourceAttributes,
  };
}
@@ -67,7 +67,7 @@ const wrapEffectTest = // eslint-disable-next-line @typescript-eslint/no-explicit-any (name: any, fn: any, timeout?: any) => { // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - const runner = clickhouseAvailable ? original : vitestIt.effect.skip; + const runner = clickHouseAvailable ? original : vitestIt.effect.skip; // eslint-disable-next-line @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-return return runner( name,