Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions packages/plugins/opentelemetry/src/plugin-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,23 @@ export function withState<
function addStateGetters(src: any) {
const result: any = {};
for (const [hookName, hook] of Object.entries(src) as any) {
result[hookName] =
typeof hook !== 'function'
? hook
: (payload: any, ...args: any[]) =>
hook(
{
...payload,
get state() {
return getState(payload);
},
if (typeof hook !== 'function') {
result[hookName] = hook;
} else {
result[hookName] = {
[hook.name](payload: any, ...args: any[]) {
return hook(
{
...payload,
get state() {
return getState(payload);
},
...args,
);
},
...args,
);
},
}[hook.name];
}
}
return result;
}
Expand Down
43 changes: 42 additions & 1 deletion packages/plugins/opentelemetry/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ import {
createHttpSpan,
startSubgraphExecuteFetchSpan as createSubgraphExecuteFetchSpan,
createUpstreamHttpFetchSpan,
recordCacheError,
recordCacheEvent,
registerException,
setExecutionAttributesOnOperationSpan,
setExecutionResultAttributes,
setGraphQLExecutionAttributes,
setGraphQLExecutionResultAttributes,
setGraphQLParseAttributes,
Expand Down Expand Up @@ -204,6 +207,10 @@ export type OpenTelemetryGatewayPluginOptions =
* Enable/disable upstream HTTP fetch calls spans (default: true).
*/
upstreamFetch?: BooleanOrPredicate<ExecutionRequest | undefined>;
/**
* Enable/Disable cache related span events (default: true).
*/
cache?: BooleanOrPredicate<{ key: string; action: 'read' | 'write' }>;
};
};

Expand Down Expand Up @@ -662,6 +669,25 @@ export function useOpenTelemetry(
});
},

onCacheGet: (payload) =>
shouldTrace(options.spans?.cache, { key: payload.key, action: 'read' })
? {
onCacheMiss: () => recordCacheEvent('miss', payload),
onCacheHit: () => recordCacheEvent('hit', payload),
onCacheGetError: ({ error }) =>
recordCacheError('read', error, payload),
}
: undefined,

onCacheSet: (payload) =>
shouldTrace(options.spans?.cache, { key: payload.key, action: 'write' })
? {
onCacheSetDone: () => recordCacheEvent('write', payload),
onCacheSetError: ({ error }) =>
recordCacheError('write', error, payload),
}
: undefined,

onResponse({ response, state }) {
try {
state.forRequest.otel &&
Expand All @@ -671,7 +697,7 @@ export function useOpenTelemetry(
}
},

onParams({ state, context: gqlCtx, params }) {
onParams: function onParamsOTEL({ state, context: gqlCtx, params }) {
if (
!isParentEnabled(state) ||
!shouldTrace(options.spans?.graphql, gqlCtx)
Expand All @@ -683,6 +709,21 @@ export function useOpenTelemetry(
setParamsAttributes({ ctx, params });
},

onExecutionResult: function onExeResOTEL({
result,
context: gqlCtx,
state,
}) {
if (
!isParentEnabled(state) ||
!shouldTrace(options.spans?.graphql, gqlCtx)
) {
return;
}

setExecutionResultAttributes({ ctx: getContext(state), result });
},

onParse({ state, context: gqlCtx }) {
if (
!isParentEnabled(state) ||
Expand Down
50 changes: 50 additions & 0 deletions packages/plugins/opentelemetry/src/spans.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { OnCacheGetHookEventPayload } from '@graphql-hive/gateway-runtime';
import { defaultPrintFn } from '@graphql-mesh/transport-common';
import {
getOperationASTFromDocument,
Expand All @@ -12,6 +13,11 @@ import {
type Context,
type Tracer,
} from '@opentelemetry/api';
import {
SEMATTRS_EXCEPTION_MESSAGE,
SEMATTRS_EXCEPTION_STACKTRACE,
SEMATTRS_EXCEPTION_TYPE,
} from '@opentelemetry/semantic-conventions';
import type { ExecutionArgs } from 'graphql';
import type { GraphQLParams } from 'graphql-yoga';
import {
Expand Down Expand Up @@ -78,6 +84,10 @@ export function setResponseAttributes(ctx: Context, response: Response) {
const span = trace.getSpan(ctx);
if (span) {
span.setAttribute(SEMATTRS_HTTP_STATUS_CODE, response.status);
span.setAttribute(
'gateway.cache.response_cache',
response.status === 304 && response.headers.get('ETag') ? 'hit' : 'miss',
);
span.setStatus({
code: response.ok ? SpanStatusCode.OK : SpanStatusCode.ERROR,
message: response.ok ? undefined : response.statusText,
Expand Down Expand Up @@ -385,6 +395,46 @@ export function setUpstreamFetchResponseAttributes(input: {
});
}

export function recordCacheEvent(
event: string,
payload: OnCacheGetHookEventPayload,
) {
trace.getActiveSpan()?.addEvent('gateway.cache.' + event, {
'gateway.cache.key': payload.key,
'gateway.cache.ttl': payload.ttl,
});
}

export function recordCacheError(
action: 'read' | 'write',
error: Error,
payload: OnCacheGetHookEventPayload,
) {
trace.getActiveSpan()?.addEvent('gateway.cache.error', {
'gateway.cache.key': payload.key,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good naming!

nit: can also be graphql.cache.key to make it more unique in a full architecture,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or graphql.response_cache.key

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not just response caching. Cache storage can be used in other places.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, not sure about graphql.cache.key, the cache instance can actually be used to cache anything (like the schema)

'gateway.cache.ttl': payload.ttl,
'gateway.cache.action': action,
[SEMATTRS_EXCEPTION_TYPE]:
'code' in error ? (error.code as string) : error.message,
[SEMATTRS_EXCEPTION_MESSAGE]: error.message,
[SEMATTRS_EXCEPTION_STACKTRACE]: error.stack,
});
}

const responseCacheSymbol = Symbol.for('servedFromResponseCache');
export function setExecutionResultAttributes(input: {
ctx: Context;
result?: any; // We don't need a proper type here because we rely on Symbol mark from response cache plugin
}) {
const span = trace.getSpan(input.ctx);
if (input.result && span) {
span.setAttribute(
'gateway.cache.response_cache',
input.result[responseCacheSymbol] ? 'hit' : 'miss',
);
}
}

export function registerException(ctx: Context | undefined, error: any) {
const span = ctx && trace.getSpan(ctx);
if (!span) {
Expand Down
54 changes: 54 additions & 0 deletions packages/plugins/opentelemetry/tests/useOpenTelemetry.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -555,5 +555,59 @@ describe('useOpenTelemetry', () => {
children.forEach(spanTree.expectChild);
}
});

it('should have a response cache attribute', async () => {
function checkCacheAttributes(attrs: {
http: 'hit' | 'miss';
operation?: 'hit' | 'miss';
}) {
const { span: httpSpan } = spanExporter.assertRoot('POST /graphql');
const operationSpan = spanExporter.spans.find(({ name }) =>
name.startsWith('graphql.operation'),
);

expect(httpSpan.attributes['gateway.cache.response_cache']).toBe(
attrs.http,
);
if (attrs.operation) {
expect(operationSpan).toBeDefined();
expect(
operationSpan!.attributes['gateway.cache.response_cache'],
).toBe(attrs.operation);
}
}
await using gateway = await buildTestGateway({
gatewayOptions: {
cache: await import('@graphql-mesh/cache-localforage').then(
({ default: Cache }) => new Cache(),
),
responseCaching: {
session: () => '1',
},
},
});
await gateway.query();

checkCacheAttributes({ http: 'miss', operation: 'miss' });

spanExporter.reset();
await gateway.query();

checkCacheAttributes({ http: 'miss', operation: 'hit' });

spanExporter.reset();
const response = await gateway.fetch('http://gateway/graphql', {
method: 'POST',
headers: {
'content-type': 'application/json',
'If-None-Match':
'c2f6fb105ef60ccc99dd6725b55939742e69437d4f85d52bf4664af3799c49fa',
'If-Modified-Since': new Date(),
},
});
expect(response.status).toBe(304);

checkCacheAttributes({ http: 'hit' }); // There is no graphql operation span when cached by HTTP
});
});
});
23 changes: 19 additions & 4 deletions packages/plugins/opentelemetry/tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,39 @@ export async function buildTestGateway(

return {
otelPlugin: otelPlugin!,
query: async (
body: GraphQLParams = {
query: async ({
shouldReturnErrors,
body = {
query: /* GraphQL */ `
query {
hello
}
`,
},
) => {
}: {
body?: GraphQLParams;
shouldReturnErrors?: boolean;
} = {}) => {
const response = await gateway.fetch('http://localhost:4000/graphql', {
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: JSON.stringify(body),
});
return response.json();

const result = await response.json();
if (shouldReturnErrors) {
expect(result.errors).toBeDefined();
} else {
if (result.errors) {
console.error(result.errors);
}
expect(result.errors).not.toBeDefined();
}
return result;
},
fetch: gateway.fetch,
[Symbol.asyncDispose]: () => {
diag.disable();
return stack.disposeAsync();
Expand Down
Loading