Skip to content

Commit 13d60e0

Browse files
committedMar 31, 2025·
feat(core): Add Supabase Queues support
1 parent b630367 commit 13d60e0

File tree

5 files changed

+283
-10
lines changed

5 files changed

+283
-10
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import * as Sentry from '@sentry/browser';
2+
3+
import { createClient } from '@supabase/supabase-js';
4+
window.Sentry = Sentry;
5+
6+
const queues = createClient('https://test.supabase.co', 'test-key', {
7+
db: {
8+
schema: 'pgmq_public',
9+
},
10+
});
11+
12+
Sentry.init({
13+
dsn: 'https://public@dsn.ingest.sentry.io/1337',
14+
integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration(queues)],
15+
tracesSampleRate: 1.0,
16+
});
17+
18+
// Simulate queue operations
19+
async function performQueueOperations() {
20+
try {
21+
await queues.rpc('enqueue', {
22+
queue_name: 'todos',
23+
msg: { title: 'Test Todo' },
24+
});
25+
26+
await queues.rpc('dequeue', {
27+
queue_name: 'todos',
28+
});
29+
} catch (error) {
30+
Sentry.captureException(error);
31+
}
32+
}
33+
34+
performQueueOperations();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import type { Page} from '@playwright/test';
2+
import { expect } from '@playwright/test';
3+
import type { Event } from '@sentry/core';
4+
5+
import { sentryTest } from '../../../../utils/fixtures';
6+
import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';
7+
8+
async function mockSupabaseRoute(page: Page) {
9+
await page.route('**/rest/v1/rpc**', route => {
10+
return route.fulfill({
11+
status: 200,
12+
body: JSON.stringify({
13+
foo: ['bar', 'baz'],
14+
}),
15+
headers: {
16+
'Content-Type': 'application/json',
17+
},
18+
});
19+
});
20+
}
21+
22+
sentryTest('should capture Supabase queue spans from client.rpc', async ({ getLocalTestUrl, page }) => {
23+
await mockSupabaseRoute(page);
24+
25+
if (shouldSkipTracingTest()) {
26+
return;
27+
}
28+
29+
const url = await getLocalTestUrl({ testDir: __dirname });
30+
31+
const event = await getFirstSentryEnvelopeRequest<Event>(page, url);
32+
const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue'));
33+
34+
expect(queueSpans).toHaveLength(2);
35+
36+
expect(queueSpans![0]).toMatchObject({
37+
description: 'supabase.db.rpc',
38+
parent_span_id: event.contexts?.trace?.span_id,
39+
span_id: expect.any(String),
40+
start_timestamp: expect.any(Number),
41+
timestamp: expect.any(Number),
42+
trace_id: event.contexts?.trace?.trace_id,
43+
data: expect.objectContaining({
44+
'sentry.op': 'queue.publish',
45+
'sentry.origin': 'auto.db.supabase',
46+
'messaging.destination.name': 'todos',
47+
'messaging.message.id': 'Test Todo',
48+
}),
49+
});
50+
51+
expect(queueSpans![1]).toMatchObject({
52+
description: 'supabase.db.rpc',
53+
parent_span_id: event.contexts?.trace?.span_id,
54+
span_id: expect.any(String),
55+
start_timestamp: expect.any(Number),
56+
timestamp: expect.any(Number),
57+
trace_id: event.contexts?.trace?.trace_id,
58+
data: expect.objectContaining({
59+
'sentry.op': 'queue.process',
60+
'sentry.origin': 'auto.db.supabase',
61+
'messaging.destination.name': 'todos',
62+
}),
63+
});
64+
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import * as Sentry from '@sentry/browser';
2+
3+
import { createClient } from '@supabase/supabase-js';
4+
window.Sentry = Sentry;
5+
6+
const queues = createClient('https://test.supabase.co', 'test-key', {
7+
db: {
8+
schema: 'pgmq_public',
9+
},
10+
});
11+
12+
Sentry.init({
13+
dsn: 'https://public@dsn.ingest.sentry.io/1337',
14+
integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration(queues)],
15+
tracesSampleRate: 1.0,
16+
});
17+
18+
// Simulate queue operations
19+
async function performQueueOperations() {
20+
try {
21+
await queues
22+
.schema('pgmq_public')
23+
.rpc('enqueue', {
24+
queue_name: 'todos',
25+
msg: { title: 'Test Todo' },
26+
});
27+
28+
await queues
29+
.schema('pgmq_public')
30+
.rpc('dequeue', {
31+
queue_name: 'todos',
32+
});
33+
} catch (error) {
34+
Sentry.captureException(error);
35+
}
36+
}
37+
38+
performQueueOperations();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { type Page, expect } from '@playwright/test';
2+
import type { Event } from '@sentry/core';
3+
4+
import { sentryTest } from '../../../../utils/fixtures';
5+
import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';
6+
7+
async function mockSupabaseRoute(page: Page) {
8+
await page.route('**/rest/v1/rpc**', route => {
9+
return route.fulfill({
10+
status: 200,
11+
body: JSON.stringify({
12+
foo: ['bar', 'baz'],
13+
}),
14+
headers: {
15+
'Content-Type': 'application/json',
16+
},
17+
});
18+
});
19+
}
20+
21+
sentryTest('should capture Supabase queue spans from client.schema(...).rpc', async ({ getLocalTestUrl, page }) => {
22+
await mockSupabaseRoute(page);
23+
24+
if (shouldSkipTracingTest()) {
25+
return;
26+
}
27+
28+
const url = await getLocalTestUrl({ testDir: __dirname });
29+
30+
const event = await getFirstSentryEnvelopeRequest<Event>(page, url);
31+
const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue'));
32+
33+
expect(queueSpans).toHaveLength(2);
34+
35+
expect(queueSpans![0]).toMatchObject({
36+
description: 'supabase.db.rpc',
37+
parent_span_id: event.contexts?.trace?.span_id,
38+
span_id: expect.any(String),
39+
start_timestamp: expect.any(Number),
40+
timestamp: expect.any(Number),
41+
trace_id: event.contexts?.trace?.trace_id,
42+
data: expect.objectContaining({
43+
'sentry.op': 'queue.publish',
44+
'sentry.origin': 'auto.db.supabase',
45+
'messaging.destination.name': 'todos',
46+
'messaging.message.id': 'Test Todo',
47+
}),
48+
});
49+
50+
expect(queueSpans![1]).toMatchObject({
51+
description: 'supabase.db.rpc',
52+
parent_span_id: event.contexts?.trace?.span_id,
53+
span_id: expect.any(String),
54+
start_timestamp: expect.any(Number),
55+
timestamp: expect.any(Number),
56+
trace_id: event.contexts?.trace?.trace_id,
57+
data: expect.objectContaining({
58+
'sentry.op': 'queue.process',
59+
'sentry.origin': 'auto.db.supabase',
60+
'messaging.destination.name': 'todos',
61+
}),
62+
});
63+
});

‎packages/core/src/integrations/supabase.ts

+84-10
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@ import { SEMANTIC_ATTRIBUTE_SENTRY_OP, SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '
1212
import { captureException } from '../exports';
1313
import { SPAN_STATUS_ERROR, SPAN_STATUS_OK } from '../tracing';
1414

15+
export interface SupabaseClientConstructor {
16+
prototype: {
17+
from: (table: string) => PostgrestQueryBuilder;
18+
schema: (schema: string) => { rpc: (...args: unknown[]) => Promise<unknown> };
19+
};
20+
rpc: (fn: string, params: Record<string, unknown>) => Promise<unknown>;
21+
}
22+
1523
const AUTH_OPERATIONS_TO_INSTRUMENT = [
1624
'reauthenticate',
1725
'signInAnonymously',
@@ -114,12 +122,6 @@ export interface SupabaseBreadcrumb {
114122
};
115123
}
116124

117-
export interface SupabaseClientConstructor {
118-
prototype: {
119-
from: (table: string) => PostgrestQueryBuilder;
120-
};
121-
}
122-
123125
export interface PostgrestProtoThenable {
124126
then: <T>(
125127
onfulfilled?: ((value: T) => T | PromiseLike<T>) | null,
@@ -197,6 +199,76 @@ export function translateFiltersIntoMethods(key: string, query: string): string
197199
return `${method}(${key}, ${value.join('.')})`;
198200
}
199201

202+
function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void {
203+
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy(
204+
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema,
205+
{
206+
apply(target, thisArg, argumentsList) {
207+
const rv = Reflect.apply(target, thisArg, argumentsList);
208+
209+
return instrumentRpc(rv);
210+
},
211+
},
212+
);
213+
}
214+
215+
function instrumentRpc(SupabaseClient: unknown): unknown {
216+
(SupabaseClient as unknown as SupabaseClientConstructor).rpc = new Proxy(
217+
(SupabaseClient as unknown as SupabaseClientConstructor).rpc,
218+
{
219+
apply(target, thisArg, argumentsList) {
220+
const isProducerSpan = argumentsList[0] === 'enqueue';
221+
const isConsumerSpan = argumentsList[0] === 'dequeue';
222+
223+
const maybeQueueParams = argumentsList[1];
224+
225+
// If the second argument is not an object, it's not a queue operation
226+
if (!isPlainObject(maybeQueueParams)) {
227+
return Reflect.apply(target, thisArg, argumentsList);
228+
}
229+
230+
const msg = maybeQueueParams?.msg as { title: string };
231+
232+
const messageId = msg?.title;
233+
const queueName = maybeQueueParams?.queue_name as string;
234+
235+
const op = isProducerSpan ? 'queue.publish' : isConsumerSpan ? 'queue.process' : '';
236+
237+
// If the operation is not a queue operation, return the original function
238+
if (!op) {
239+
return Reflect.apply(target, thisArg, argumentsList);
240+
}
241+
242+
return startSpan(
243+
{
244+
name: 'supabase.db.rpc',
245+
attributes: {
246+
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase',
247+
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: op,
248+
},
249+
},
250+
async span => {
251+
return (Reflect.apply(target, thisArg, argumentsList) as Promise<unknown>).then((res: unknown) => {
252+
if (messageId) {
253+
span.setAttribute('messaging.message.id', messageId);
254+
}
255+
256+
if (queueName) {
257+
span.setAttribute('messaging.destination.name', queueName);
258+
}
259+
260+
span.end();
261+
return res;
262+
});
263+
},
264+
);
265+
},
266+
},
267+
);
268+
269+
return SupabaseClient;
270+
}
271+
200272
function instrumentAuthOperation(operation: AuthOperationFn, isAdmin = false): AuthOperationFn {
201273
return new Proxy(operation, {
202274
apply(target, thisArg, argumentsList) {
@@ -266,13 +338,13 @@ function instrumentSupabaseAuthClient(supabaseClientInstance: SupabaseClientInst
266338
});
267339
}
268340

269-
function instrumentSupabaseClientConstructor(SupabaseClient: unknown): void {
270-
if (instrumented.has(SupabaseClient)) {
341+
function instrumentSupabaseClientConstructor(SupabaseClientConstructor: unknown): void {
342+
if (instrumented.has(SupabaseClientConstructor)) {
271343
return;
272344
}
273345

274-
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.from = new Proxy(
275-
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.from,
346+
(SupabaseClientConstructor as unknown as SupabaseClientConstructor).prototype.from = new Proxy(
347+
(SupabaseClientConstructor as unknown as SupabaseClientConstructor).prototype.from,
276348
{
277349
apply(target, thisArg, argumentsList) {
278350
const rv = Reflect.apply(target, thisArg, argumentsList);
@@ -466,6 +538,8 @@ const instrumentSupabase = (supabaseClientInstance: unknown): void => {
466538
supabaseClientInstance.constructor === Function ? supabaseClientInstance : supabaseClientInstance.constructor;
467539

468540
instrumentSupabaseClientConstructor(SupabaseClientConstructor);
541+
instrumentRpcReturnedFromSchemaCall(SupabaseClientConstructor);
542+
instrumentRpc(supabaseClientInstance as SupabaseClientInstance);
469543
instrumentSupabaseAuthClient(supabaseClientInstance as SupabaseClientInstance);
470544
};
471545

0 commit comments

Comments
 (0)
Please sign in to comment.