diff --git a/cloud/db/traces.test.ts b/cloud/db/traces.test.ts index 556a8911b2..660c783d6b 100644 --- a/cloud/db/traces.test.ts +++ b/cloud/db/traces.test.ts @@ -541,6 +541,69 @@ describe("Traces", () => { ), ); + it.effect("rejects span when outbox insert fails", () => + Effect.gen(function* () { + const db = yield* Database; + + const resourceSpans = [ + { + resource: { attributes: [] }, + scopeSpans: [ + { + scope: { name: "test" }, + spans: [ + { + traceId: "trace-outbox-error", + spanId: "span-outbox-error", + name: "test-span", + startTimeUnixNano: "1000000000", + endTimeUnixNano: "2000000000", + }, + ], + }, + ], + }, + ]; + + const result = + yield* db.organizations.projects.environments.traces.create({ + userId: "owner-id", + organizationId: "org-id", + projectId: "project-id", + environmentId: "env-id", + data: { resourceSpans }, + }); + + expect(result.rejectedSpans).toBe(1); + expect(result.acceptedSpans).toBe(0); + }).pipe( + Effect.provide( + new MockDrizzleORM() + .select([ + { + role: "OWNER", + organizationId: "org-id", + memberId: "owner-id", + createdAt: new Date(), + }, + ]) + .select([ + { + role: "OWNER", + organizationId: "org-id", + memberId: "owner-id", + createdAt: new Date(), + }, + ]) + .select([{ id: "project-id" }]) + .insert([{ id: "trace-id" }]) // Trace upsert succeeds + .insert([{ id: "span-id" }]) // Span insert succeeds + .insert(new Error("Outbox insert failed")) // Outbox insert fails + .build(), + ), + ), + ); + it.effect("handles empty resourceSpans", () => Effect.gen(function* () { const { environment, project, org, owner } = diff --git a/cloud/db/traces.ts b/cloud/db/traces.ts index 618a341e1b..09fd9b3a17 100644 --- a/cloud/db/traces.ts +++ b/cloud/db/traces.ts @@ -81,6 +81,7 @@ import { type CreateTraceResponse, } from "@/db/schema/traces"; import { spans, type NewSpan } from "@/db/schema/spans"; +import { spansOutbox } from "@/db/schema/spansOutbox"; import type { ProjectRole } from "@/db/schema"; import type { ResourceSpans, KeyValue } from "@/api/traces.schemas"; @@ -419,6 +420,32 @@ export class Traces extends BaseAuthenticatedEffectService< ), ); + // Write to outbox for ClickHouse sync (all inserted spans). + // This is intentionally not wrapped in a transaction so that + // a failed outbox write does not roll back the span insert. + // If the outbox insert fails we mark the span as rejected below. + if (insertedSpans.length > 0) { + const outboxRows = insertedSpans.map((s) => ({ + spanId: s.id, + operation: "INSERT" as const, + })); + yield* client + .insert(spansOutbox) + .values(outboxRows) + .onConflictDoNothing({ + target: [spansOutbox.spanId, spansOutbox.operation], + }) + .pipe( + Effect.mapError( + (e) => + new DatabaseError({ + message: "Failed to write to outbox", + cause: e, + }), + ), + ); + } + return insertedSpans.length > 0; }).pipe(Effect.catchAll(() => Effect.succeed(false)));