Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions cmd/saga/start/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package service

import (
"context"
"fmt"
"github.com/bufbuild/connect-go"
"github.com/kevinmichaelchen/temporal-saga-grpc/pkg/saga"
"github.com/kevinmichaelchen/temporal-saga-grpc/pkg/temporal/ctxpropagation"
"github.com/sirupsen/logrus"
temporalv1beta1 "go.buf.build/bufbuild/connect-go/kevinmichaelchen/temporalapis/temporal/v1beta1"
"go.opentelemetry.io/otel/baggage"
"go.temporal.io/sdk/client"
)

Expand All @@ -26,6 +29,11 @@ func (s *Service) CreateLicense(
// The business identifier of the workflow execution
workflowID := req.Msg.GetWorkflowOptions().GetWorkflowId()

ctx, err := injectBaggage(ctx, workflowID)
if err != nil {
return nil, err
}

options := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: saga.CreateLicenseTaskQueue,
Expand Down Expand Up @@ -68,3 +76,21 @@ func printResults(args saga.CreateLicenseInputArgs, workflowID, runID string) {
"temporal.run_id": runID,
}).Info("Successfully completed Workflow")
}

func injectBaggage(ctx context.Context, workflowID string) (context.Context, error) {
// Inject the workflow ID as OTel baggage, so it appears on all spans
bgMem, err := baggage.NewMember("foo", "bar")
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're creating some baggage with one element 😄

if err != nil {
return nil, fmt.Errorf("failed to create otel baggage member: %w", err)
}
bg, err := baggage.New(bgMem)
if err != nil {
return nil, fmt.Errorf("failed to create otel baggage: %w", err)
}

// INJECT BAGGAGE!!
//ctx = baggage.ContextWithBaggage(ctx, bg)
ctx = context.WithValue(ctx, ctxpropagation.PropagateKey, bg)

return ctx, nil
}
Copy link
Owner Author

@kevinmichaelchen kevinmichaelchen Nov 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we are in the Temporal workflow starter. We create some OTel baggage and inject it in context using our special PropagateKey.

18 changes: 17 additions & 1 deletion cmd/saga/worker/app/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"fmt"
"github.com/kevinmichaelchen/temporal-saga-grpc/pkg/saga"
"github.com/kevinmichaelchen/temporal-saga-grpc/pkg/temporal/ctxpropagation"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/baggage"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.uber.org/fx"
Expand Down Expand Up @@ -70,7 +72,21 @@ func NewConnToProfile() (*grpc.ClientConn, error) {
func dial(addr string) (*grpc.ClientConn, error) {
return grpc.Dial(addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithChainUnaryInterceptor(
// interceptor to extract Baggage from workflow.Context and inject into new context
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
bgv, ok := ctx.Value(ctxpropagation.PropagateKey).(ctxpropagation.Values)

if ok {
// INJECT!
ctx = baggage.ContextWithBaggage(ctx, baggage.Baggage(bgv))
}

return invoker(ctx, method, req, reply, cc, opts...)
},
// create span, do context propagation
otelgrpc.UnaryClientInterceptor(),
),
Copy link
Owner Author

@kevinmichaelchen kevinmichaelchen Nov 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we are in the Temporal worker. Before we invoke the next interceptor in the chain, we're going to pull baggage from context using our special PropagateKey and then re-set it in context.

)
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/connect/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package connect
import (
"context"
"github.com/bufbuild/connect-go"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
grpc_codes "google.golang.org/grpc/codes"
Expand Down Expand Up @@ -39,6 +41,8 @@ func connectInterceptorForSpan() connect.UnaryInterceptorFunc {
)
defer span.End()

logBaggage(ctx)

resp, err := next(ctx, req)
if err != nil {
s, _ := status.FromError(err)
Expand All @@ -53,3 +57,13 @@ func connectInterceptorForSpan() connect.UnaryInterceptorFunc {
}
return connect.UnaryInterceptorFunc(interceptor)
}

func logBaggage(ctx context.Context) {
// EXTRACT BAGGAGE!!
bg := baggage.FromContext(ctx)
members := bg.Members()
logrus.WithField("otel.baggage.num_members", len(members)).Info("OTel baggage")
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're seeing 0 baggage elements 😟

//for _, m := range bg.Members() {
// span.SetAttributes(attribute.String(m.Key(), m.Value()))
//}
Copy link
Owner Author

@kevinmichaelchen kevinmichaelchen Nov 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we are in the gRPC server interceptor. The server should be able to find baggage in its context.

}
21 changes: 18 additions & 3 deletions pkg/fxmod/temporal/temporal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package temporal

import (
"context"
"github.com/kevinmichaelchen/temporal-saga-grpc/pkg/temporal"
"fmt"
"github.com/kevinmichaelchen/temporal-saga-grpc/pkg/temporal/ctxpropagation"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/contrib/opentelemetry"
"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/workflow"
"go.uber.org/fx"
)

Expand All @@ -14,13 +18,14 @@ var Module = fx.Module("temporal",
)

func NewClient(lc fx.Lifecycle) (client.Client, error) {
interceptors, err := temporal.ClientInterceptors()
interceptors, err := clientInterceptors()
if err != nil {
return nil, err
}

c, err := client.Dial(client.Options{
Interceptors: interceptors,
Interceptors: interceptors,
ContextPropagators: []workflow.ContextPropagator{ctxpropagation.NewContextPropagator()},
Copy link
Owner Author

@kevinmichaelchen kevinmichaelchen Nov 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All Temporal clients use Temporal's OTel interceptor, and a special Context Propagator.

See https://github.com/temporalio/samples-go/blob/v1.3.0/ctxpropagation/starter/main.go#L24

// Optional: Sets ContextPropagators that allows users to control the context information passed through a workflow
// default: nil
ContextPropagators []ContextPropagator

})
if err != nil {
return nil, err
Expand All @@ -35,3 +40,13 @@ func NewClient(lc fx.Lifecycle) (client.Client, error) {

return c, nil
}

func clientInterceptors() ([]interceptor.ClientInterceptor, error) {
i, err := opentelemetry.NewTracingInterceptor(opentelemetry.TracerOptions{})
if err != nil {
return nil, fmt.Errorf("failed to create OTEL tracing interceptor: %w", err)
}
return []interceptor.ClientInterceptor{
i,
}, nil
}
81 changes: 81 additions & 0 deletions pkg/temporal/ctxpropagation/propagator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package ctxpropagation

import (
"context"
"go.opentelemetry.io/otel/baggage"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/workflow"
)

type (
// contextKey is an unexported type used as key for items stored in the
// Context object
contextKey struct{}

// propagator implements the custom context propagator
propagator struct{}
)

type Values baggage.Baggage
Copy link
Owner Author

@kevinmichaelchen kevinmichaelchen Nov 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is our special Context Propagator, pulled verbatim from temporalio/samples-go, except with just this line changed


// PropagateKey is the key used to store the value in the Context object
var PropagateKey = contextKey{}

// propagationKey is the key used by the propagator to pass values through the
// Temporal server headers
const propagationKey = "custom-header"

// NewContextPropagator returns a context propagator that propagates a set of
// string key-value pairs across a workflow
func NewContextPropagator() workflow.ContextPropagator {
return &propagator{}
}

// Inject injects values from context into headers for propagation
func (s *propagator) Inject(ctx context.Context, writer workflow.HeaderWriter) error {
value := ctx.Value(PropagateKey)
payload, err := converter.GetDefaultDataConverter().ToPayload(value)
if err != nil {
return err
}
writer.Set(propagationKey, payload)
return nil
}

// InjectFromWorkflow injects values from context into headers for propagation
func (s *propagator) InjectFromWorkflow(ctx workflow.Context, writer workflow.HeaderWriter) error {
value := ctx.Value(PropagateKey)
payload, err := converter.GetDefaultDataConverter().ToPayload(value)
if err != nil {
return err
}
writer.Set(propagationKey, payload)
return nil
}

// Extract extracts values from headers and puts them into context
func (s *propagator) Extract(ctx context.Context, reader workflow.HeaderReader) (context.Context, error) {
if value, ok := reader.Get(propagationKey); ok {
var values Values
if err := converter.GetDefaultDataConverter().FromPayload(value, &values); err != nil {
return ctx, nil
}
ctx = context.WithValue(ctx, PropagateKey, values)
}

return ctx, nil
}

// ExtractToWorkflow extracts values from headers and puts them into context
func (s *propagator) ExtractToWorkflow(ctx workflow.Context, reader workflow.HeaderReader) (workflow.Context, error) {
if value, ok := reader.Get(propagationKey); ok {
var values Values
if err := converter.GetDefaultDataConverter().FromPayload(value, &values); err != nil {
return ctx, nil
}
ctx = workflow.WithValue(ctx, PropagateKey, values)
}

return ctx, nil
}
17 changes: 0 additions & 17 deletions pkg/temporal/interceptors.go

This file was deleted.