diff --git a/interceptor/tracing_interceptor.go b/interceptor/tracing_interceptor.go index 91fa489c0..f32601ea4 100644 --- a/interceptor/tracing_interceptor.go +++ b/interceptor/tracing_interceptor.go @@ -63,7 +63,7 @@ type Tracer interface { // BaseTracer is a default implementation of Tracer meant for embedding. type BaseTracer struct{} -func (BaseTracer) GetLogger(logger log.Logger, ref TracerSpanRef) log.Logger { +func (BaseTracer) GetLogger(logger log.Logger, _ TracerSpanRef) log.Logger { return logger } func (BaseTracer) SpanName(options *TracerStartSpanOptions) string { @@ -81,7 +81,7 @@ type TracerOptions struct { // // This is used internally to set the span on contexts not natively supported // by tracing systems such as [workflow.Context]. - SpanContextKey interface{} + SpanContextKey any // HeaderKey is the key name on the Temporal header to serialize the span to. // This should never be empty. @@ -194,7 +194,7 @@ func (t *tracingInterceptor) InterceptClient(next ClientOutboundInterceptor) Cli } func (t *tracingInterceptor) InterceptActivity( - ctx context.Context, + _ context.Context, next ActivityInboundInterceptor, ) ActivityInboundInterceptor { i := &tracingActivityInboundInterceptor{root: t} @@ -212,7 +212,7 @@ func (t *tracingInterceptor) InterceptWorkflow( } func (t *tracingInterceptor) InterceptNexusOperation( - ctx context.Context, + _ context.Context, next NexusOperationInboundInterceptor, ) NexusOperationInboundInterceptor { i := &tracingNexusOperationInboundInterceptor{root: t} @@ -420,7 +420,7 @@ func (t *tracingActivityInboundInterceptor) Init(outbound ActivityOutboundInterc func (t *tracingActivityInboundInterceptor) ExecuteActivity( ctx context.Context, in *ExecuteActivityInput, -) (interface{}, error) { +) (any, error) { // Start span reading from header info := activity.GetInfo(ctx) span, ctx, err := t.root.startSpanFromContext(ctx, &TracerStartSpanOptions{ @@ -473,19 +473,18 @@ func (t *tracingWorkflowInboundInterceptor) Init(outbound WorkflowOutboundInterc func (t *tracingWorkflowInboundInterceptor) ExecuteWorkflow( ctx workflow.Context, in *ExecuteWorkflowInput, -) (interface{}, error) { +) (any, error) { // Start span reading from header - span, ctx, err := t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ - Operation: "RunWorkflow", - Name: t.info.WorkflowType.Name, - Tags: map[string]string{ - workflowIDTagKey: t.info.WorkflowExecution.ID, - runIDTagKey: t.info.WorkflowExecution.RunID, - }, - FromHeader: true, - Time: t.info.WorkflowStartTime, - IdempotencyKey: t.newIdempotencyKey(), - }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) + span, ctx, err := startNonReplaySpan(ctx, func() (TracerSpan, workflow.Context, error) { + return t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ + Operation: "RunWorkflow", + Name: t.info.WorkflowType.Name, + Tags: spanTagsFromWorkflowCtx(ctx), + FromHeader: true, + Time: t.info.WorkflowStartTime, + IdempotencyKey: t.newIdempotencyKey(), + }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) + }) if err != nil { return nil, err } @@ -499,22 +498,22 @@ func (t *tracingWorkflowInboundInterceptor) ExecuteWorkflow( func (t *tracingWorkflowInboundInterceptor) HandleSignal(ctx workflow.Context, in *HandleSignalInput) error { // Only add tracing if enabled and not replaying - if t.root.options.DisableSignalTracing || workflow.IsReplaying(ctx) { + if t.root.options.DisableSignalTracing { return t.Next.HandleSignal(ctx, in) } + // Start span reading from header - info := workflow.GetInfo(ctx) - span, ctx, err := t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ - Operation: "HandleSignal", - Name: in.SignalName, - Tags: map[string]string{ - workflowIDTagKey: info.WorkflowExecution.ID, - runIDTagKey: info.WorkflowExecution.RunID, - }, - FromHeader: true, - Time: time.Now(), - IdempotencyKey: t.newIdempotencyKey(), - }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) + span, ctx, err := startNonReplaySpan(ctx, func() (TracerSpan, workflow.Context, error) { + return t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ + Operation: "HandleSignal", + Name: in.SignalName, + Tags: spanTagsFromWorkflowCtx(ctx), + FromHeader: true, + Time: time.Now(), + IdempotencyKey: t.newIdempotencyKey(), + }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) + }) + if err != nil { return err } @@ -529,27 +528,26 @@ func (t *tracingWorkflowInboundInterceptor) HandleSignal(ctx workflow.Context, i func (t *tracingWorkflowInboundInterceptor) HandleQuery( ctx workflow.Context, in *HandleQueryInput, -) (interface{}, error) { - // Only add tracing if enabled and not replaying - if t.root.options.DisableQueryTracing || workflow.IsReplaying(ctx) { +) (any, error) { + // Only add tracing if enabled + if t.root.options.DisableQueryTracing { return t.Next.HandleQuery(ctx, in) } // Start span reading from header - info := workflow.GetInfo(ctx) - span, ctx, err := t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ - Operation: "HandleQuery", - Name: in.QueryType, - Tags: map[string]string{ - workflowIDTagKey: info.WorkflowExecution.ID, - runIDTagKey: info.WorkflowExecution.RunID, - }, - FromHeader: true, - Time: time.Now(), - // We intentionally do not set IdempotencyKey here because queries are not recorded in - // workflow history. When the tracing interceptor's span counter is reset between workflow - // replays, old queries will not be processed which could result in idempotency key - // collisions with other queries or signals. - }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) + span, ctx, err := startNonReplaySpan(ctx, func() (TracerSpan, workflow.Context, error) { + return t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ + Operation: "HandleQuery", + Name: in.QueryType, + Tags: spanTagsFromWorkflowCtx(ctx), + FromHeader: true, + Time: time.Now(), + // We intentionally do not set IdempotencyKey here because queries are not recorded in + // workflow history. When the tracing interceptor's span counter is reset between workflow + // replays, old queries will not be processed which could result in idempotency key + // collisions with other queries or signals. + }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) + }) + if err != nil { return nil, err } @@ -570,23 +568,19 @@ func (t *tracingWorkflowInboundInterceptor) ValidateUpdate( return t.Next.ValidateUpdate(ctx, in) } // Start span reading from header - info := workflow.GetInfo(ctx) - currentUpdateInfo := workflow.GetCurrentUpdateInfo(ctx) - span, ctx, err := t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ - Operation: "ValidateUpdate", - Name: in.Name, - Tags: map[string]string{ - workflowIDTagKey: info.WorkflowExecution.ID, - runIDTagKey: info.WorkflowExecution.RunID, - updateIDTagKey: currentUpdateInfo.ID, - }, - FromHeader: true, - Time: time.Now(), - // We intentionally do not set IdempotencyKey here because validation is not run on - // replay. When the tracing interceptor's span counter is reset between workflow - // replays, the validator will not be processed which could result in impotency key - // collisions with other requests. - }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) + span, ctx, err := startNonReplaySpan(ctx, func() (TracerSpan, workflow.Context, error) { + return t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ + Operation: "ValidateUpdate", + Name: in.Name, + Tags: spanTagsFromWorkflowCtx(ctx), + FromHeader: true, + Time: time.Now(), + // We intentionally do not set IdempotencyKey here because validation is not run on + // replay. When the tracing interceptor's span counter is reset between workflow + // replays, the validator will not be processed which could result in impotency key + // collisions with other requests. + }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) + }) if err != nil { return err } @@ -601,27 +595,23 @@ func (t *tracingWorkflowInboundInterceptor) ValidateUpdate( func (t *tracingWorkflowInboundInterceptor) ExecuteUpdate( ctx workflow.Context, in *UpdateInput, -) (interface{}, error) { +) (any, error) { // Only add tracing if enabled and not replaying if t.root.options.DisableUpdateTracing { return t.Next.ExecuteUpdate(ctx, in) } // Start span reading from header - info := workflow.GetInfo(ctx) - currentUpdateInfo := workflow.GetCurrentUpdateInfo(ctx) - span, ctx, err := t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ - // Using operation name "HandleUpdate" to match other SDKs and by consistence with other operations - Operation: "HandleUpdate", - Name: in.Name, - Tags: map[string]string{ - workflowIDTagKey: info.WorkflowExecution.ID, - runIDTagKey: info.WorkflowExecution.RunID, - updateIDTagKey: currentUpdateInfo.ID, - }, - FromHeader: true, - Time: time.Now(), - IdempotencyKey: t.newIdempotencyKey(), - }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) + span, ctx, err := startNonReplaySpan(ctx, func() (TracerSpan, workflow.Context, error) { + return t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ + // Using operation name "HandleUpdate" to match other SDKs and by consistence with other operations + Operation: "HandleUpdate", + Name: in.Name, + Tags: spanTagsFromWorkflowCtx(ctx), + FromHeader: true, + Time: time.Now(), + IdempotencyKey: t.newIdempotencyKey(), + }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) + }) if err != nil { return nil, err } @@ -641,10 +631,19 @@ type tracingWorkflowOutboundInterceptor struct { func (t *tracingWorkflowOutboundInterceptor) ExecuteActivity( ctx workflow.Context, activityType string, - args ...interface{}, + args ...any, ) workflow.Future { // Start span writing to header - span, ctx, err := t.startNonReplaySpan(ctx, "StartActivity", activityType, true, t.root.workflowHeaderWriter(ctx)) + span, ctx, err := startNonReplaySpanWithFuture(ctx, func() (TracerSpan, workflow.Context, error) { + return t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ + Operation: "StartActivity", + Name: activityType, + Time: time.Now(), + DependedOn: true, + ToHeader: true, + Tags: spanTagsFromWorkflowCtx(ctx), + }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) + }) if err != nil { return err } @@ -653,13 +652,35 @@ func (t *tracingWorkflowOutboundInterceptor) ExecuteActivity( return t.Next.ExecuteActivity(ctx, activityType, args...) } +func spanTagsFromWorkflowCtx(ctx workflow.Context) map[string]string { + info := workflow.GetInfo(ctx) + currentUpdateInfo := workflow.GetCurrentUpdateInfo(ctx) + tags := map[string]string{ + workflowIDTagKey: info.WorkflowExecution.ID, + runIDTagKey: info.WorkflowExecution.RunID, + } + if currentUpdateInfo != nil { + tags[updateIDTagKey] = currentUpdateInfo.ID + } + return tags +} + func (t *tracingWorkflowOutboundInterceptor) ExecuteLocalActivity( ctx workflow.Context, activityType string, - args ...interface{}, + args ...any, ) workflow.Future { // Start span writing to header - span, ctx, err := t.startNonReplaySpan(ctx, "StartActivity", activityType, true, t.root.workflowHeaderWriter(ctx)) + span, ctx, err := startNonReplaySpanWithFuture(ctx, func() (TracerSpan, workflow.Context, error) { + return t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ + Operation: "StartActivity", + Name: activityType, + Time: time.Now(), + DependedOn: true, + ToHeader: true, + Tags: spanTagsFromWorkflowCtx(ctx), + }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) + }) if err != nil { return err } @@ -678,10 +699,18 @@ func (t *tracingWorkflowOutboundInterceptor) GetLogger(ctx workflow.Context) log func (t *tracingWorkflowOutboundInterceptor) ExecuteChildWorkflow( ctx workflow.Context, childWorkflowType string, - args ...interface{}, + args ...any, ) workflow.ChildWorkflowFuture { // Start span writing to header - span, ctx, errFut := t.startNonReplaySpan(ctx, "StartChildWorkflow", childWorkflowType, false, t.root.workflowHeaderWriter(ctx)) + span, ctx, errFut := startNonReplaySpanWithFuture(ctx, func() (TracerSpan, workflow.Context, error) { + return t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ + Operation: "StartChildWorkflow", + Name: childWorkflowType, + Time: time.Now(), + ToHeader: true, + Tags: spanTagsFromWorkflowCtx(ctx), + }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) + }) if errFut != nil { return childWorkflowFuture{errFut} } @@ -695,13 +724,21 @@ func (t *tracingWorkflowOutboundInterceptor) SignalExternalWorkflow( workflowID string, runID string, signalName string, - arg interface{}, + arg any, ) workflow.Future { // Start span writing to header if enabled - if !t.root.options.DisableSignalTracing { + if t.root.options.DisableSignalTracing { var span TracerSpan var futErr workflow.Future - span, ctx, futErr = t.startNonReplaySpan(ctx, "SignalExternalWorkflow", signalName, false, t.root.workflowHeaderWriter(ctx)) + span, ctx, futErr = startNonReplaySpanWithFuture(ctx, func() (TracerSpan, workflow.Context, error) { + return t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ + Operation: "SignalExternalWorkflow", + Name: signalName, + Time: time.Now(), + ToHeader: true, + Tags: spanTagsFromWorkflowCtx(ctx), + }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) + }) if futErr != nil { return futErr } @@ -715,19 +752,29 @@ func (t *tracingWorkflowOutboundInterceptor) SignalChildWorkflow( ctx workflow.Context, workflowID string, signalName string, - arg interface{}, + arg any, ) workflow.Future { // Start span writing to header if enabled - if !t.root.options.DisableSignalTracing { - var span TracerSpan - var futErr workflow.Future - span, ctx, futErr = t.startNonReplaySpan(ctx, "SignalChildWorkflow", signalName, false, t.root.workflowHeaderWriter(ctx)) - if futErr != nil { - return futErr - } - defer span.Finish(&TracerFinishSpanOptions{}) + if t.root.options.DisableSignalTracing { + return t.Next.SignalChildWorkflow(ctx, workflowID, signalName, arg) } + var span TracerSpan + var futErr workflow.Future + span, ctx, futErr = startNonReplaySpanWithFuture(ctx, func() (TracerSpan, workflow.Context, error) { + return t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ + Operation: "SignalChildWorkflow", + Name: signalName, + Time: time.Now(), + ToHeader: true, + Tags: spanTagsFromWorkflowCtx(ctx), + }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) + }) + if futErr != nil { + return futErr + } + defer span.Finish(&TracerFinishSpanOptions{}) + return t.Next.SignalChildWorkflow(ctx, workflowID, signalName, arg) } @@ -741,7 +788,15 @@ func (t *tracingWorkflowOutboundInterceptor) ExecuteNexusOperation(ctx workflow. } else { return nexusOperationFuture{workflowFutureFromErr(ctx, fmt.Errorf("unexpected operation type: %v", input.Operation))} } - span, ctx, futErr := t.startNonReplaySpan(ctx, "StartNexusOperation", input.Client.Service()+"/"+operationName, false, t.root.nexusHeaderWriter(input.NexusHeader)) + span, ctx, futErr := startNonReplaySpanWithFuture(ctx, func() (TracerSpan, workflow.Context, error) { + return t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ + Operation: "StartNexusOperation", + Name: input.Client.Service() + "/" + operationName, + Time: time.Now(), + ToHeader: true, + Tags: spanTagsFromWorkflowCtx(ctx), + }, t.root.nexusHeaderReader(input.NexusHeader), t.root.nexusHeaderWriter(input.NexusHeader)) + }) if futErr != nil { return nexusOperationFuture{futErr} } @@ -752,8 +807,8 @@ func (t *tracingWorkflowOutboundInterceptor) ExecuteNexusOperation(ctx workflow. func (t *tracingWorkflowOutboundInterceptor) NewContinueAsNewError( ctx workflow.Context, - wfn interface{}, - args ...interface{}, + wfn any, + args ...any, ) error { err := t.Next.NewContinueAsNewError(ctx, wfn, args...) if !workflow.IsReplaying(ctx) { @@ -773,32 +828,32 @@ type nopSpan struct{} func (nopSpan) Finish(*TracerFinishSpanOptions) {} +type startSpanFunc func() (TracerSpan, workflow.Context, error) + // Span always returned, even in replay. futErr is non-nil on error. -func (t *tracingWorkflowOutboundInterceptor) startNonReplaySpan( +func startNonReplaySpan( ctx workflow.Context, - operation string, - name string, - dependedOn bool, - headerWriter func(TracerSpan) error, -) (span TracerSpan, newCtx workflow.Context, futErr workflow.Future) { + startRealSpanFunc startSpanFunc, +) (span TracerSpan, newCtx workflow.Context, err error) { // Noop span if replaying if workflow.IsReplaying(ctx) { return nopSpan{}, ctx, nil } - info := workflow.GetInfo(ctx) - span, newCtx, err := t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{ - Operation: operation, - Name: name, - DependedOn: dependedOn, - Tags: map[string]string{ - workflowIDTagKey: info.WorkflowExecution.ID, - runIDTagKey: info.WorkflowExecution.RunID, - }, - ToHeader: true, - Time: time.Now(), - }, t.root.workflowHeaderReader(ctx), headerWriter) + span, newCtx, err = startRealSpanFunc() + if err != nil { + return nopSpan{}, ctx, err + } + return span, newCtx, nil +} + +func startNonReplaySpanWithFuture( + ctx workflow.Context, + startRealSpanFunc startSpanFunc, +) (span TracerSpan, newCtx workflow.Context, future workflow.Future) { + var err error + span, newCtx, err = startNonReplaySpan(ctx, startRealSpanFunc) if err != nil { - return nopSpan{}, ctx, workflowFutureFromErr(ctx, err) + return span, newCtx, workflowFutureFromErr(ctx, err) } return span, newCtx, nil } @@ -882,7 +937,7 @@ func (t *tracingInterceptor) startSpanFromWorkflowContext( // Note, this does not put the span on the context func (t *tracingInterceptor) startSpan( - ctx interface{ Value(interface{}) interface{} }, + ctx interface{ Value(any) any }, options *TracerStartSpanOptions, headerReader func() (TracerSpanRef, error), headerWriter func(span TracerSpan) error, @@ -1018,6 +1073,6 @@ type childWorkflowFuture struct{ workflow.Future } func (e childWorkflowFuture) GetChildWorkflowExecution() workflow.Future { return e } -func (e childWorkflowFuture) SignalChildWorkflow(ctx workflow.Context, signalName string, data interface{}) workflow.Future { +func (e childWorkflowFuture) SignalChildWorkflow(_ workflow.Context, _ string, _ any) workflow.Future { return e }