diff --git a/cmd/saga/start/service/service.go b/cmd/saga/start/service/service.go index 20ae35f..9903358 100644 --- a/cmd/saga/start/service/service.go +++ b/cmd/saga/start/service/service.go @@ -2,10 +2,13 @@ package service import ( "context" + "fmt" "github.com/bufbuild/connect-go" "github.com/kevinmichaelchen/temporal-saga-grpc/pkg/saga" + "github.com/kevinmichaelchen/temporal-saga-grpc/pkg/temporal/ctxpropagation" "github.com/sirupsen/logrus" temporalv1beta1 "go.buf.build/bufbuild/connect-go/kevinmichaelchen/temporalapis/temporal/v1beta1" + "go.opentelemetry.io/otel/baggage" "go.temporal.io/sdk/client" ) @@ -26,6 +29,11 @@ func (s *Service) CreateLicense( // The business identifier of the workflow execution workflowID := req.Msg.GetWorkflowOptions().GetWorkflowId() + ctx, err := injectBaggage(ctx, workflowID) + if err != nil { + return nil, err + } + options := client.StartWorkflowOptions{ ID: workflowID, TaskQueue: saga.CreateLicenseTaskQueue, @@ -68,3 +76,21 @@ func printResults(args saga.CreateLicenseInputArgs, workflowID, runID string) { "temporal.run_id": runID, }).Info("Successfully completed Workflow") } + +func injectBaggage(ctx context.Context, workflowID string) (context.Context, error) { + // Inject the workflow ID as OTel baggage, so it appears on all spans + bgMem, err := baggage.NewMember("foo", "bar") + if err != nil { + return nil, fmt.Errorf("failed to create otel baggage member: %w", err) + } + bg, err := baggage.New(bgMem) + if err != nil { + return nil, fmt.Errorf("failed to create otel baggage: %w", err) + } + + // INJECT BAGGAGE!! + //ctx = baggage.ContextWithBaggage(ctx, bg) + ctx = context.WithValue(ctx, ctxpropagation.PropagateKey, bg) + + return ctx, nil +} diff --git a/cmd/saga/worker/app/worker/worker.go b/cmd/saga/worker/app/worker/worker.go index abd0d6a..d149be3 100644 --- a/cmd/saga/worker/app/worker/worker.go +++ b/cmd/saga/worker/app/worker/worker.go @@ -4,8 +4,10 @@ import ( "context" "fmt" "github.com/kevinmichaelchen/temporal-saga-grpc/pkg/saga" + "github.com/kevinmichaelchen/temporal-saga-grpc/pkg/temporal/ctxpropagation" "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/baggage" "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" "go.uber.org/fx" @@ -70,7 +72,21 @@ func NewConnToProfile() (*grpc.ClientConn, error) { func dial(addr string) (*grpc.ClientConn, error) { return grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), + grpc.WithChainUnaryInterceptor( + // interceptor to extract Baggage from workflow.Context and inject into new context + func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + bgv, ok := ctx.Value(ctxpropagation.PropagateKey).(ctxpropagation.Values) + + if ok { + // INJECT! + ctx = baggage.ContextWithBaggage(ctx, baggage.Baggage(bgv)) + } + + return invoker(ctx, method, req, reply, cc, opts...) + }, + // create span, do context propagation + otelgrpc.UnaryClientInterceptor(), + ), ) } diff --git a/pkg/connect/interceptors.go b/pkg/connect/interceptors.go index 8bf2633..29ddcba 100644 --- a/pkg/connect/interceptors.go +++ b/pkg/connect/interceptors.go @@ -3,7 +3,9 @@ package connect import ( "context" "github.com/bufbuild/connect-go" + "github.com/sirupsen/logrus" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/baggage" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" grpc_codes "google.golang.org/grpc/codes" @@ -39,6 +41,8 @@ func connectInterceptorForSpan() connect.UnaryInterceptorFunc { ) defer span.End() + logBaggage(ctx) + resp, err := next(ctx, req) if err != nil { s, _ := status.FromError(err) @@ -53,3 +57,13 @@ func connectInterceptorForSpan() connect.UnaryInterceptorFunc { } return connect.UnaryInterceptorFunc(interceptor) } + +func logBaggage(ctx context.Context) { + // EXTRACT BAGGAGE!! + bg := baggage.FromContext(ctx) + members := bg.Members() + logrus.WithField("otel.baggage.num_members", len(members)).Info("OTel baggage") + //for _, m := range bg.Members() { + // span.SetAttributes(attribute.String(m.Key(), m.Value())) + //} +} diff --git a/pkg/fxmod/temporal/temporal.go b/pkg/fxmod/temporal/temporal.go index 8ad22af..5d46418 100644 --- a/pkg/fxmod/temporal/temporal.go +++ b/pkg/fxmod/temporal/temporal.go @@ -2,8 +2,12 @@ package temporal import ( "context" - "github.com/kevinmichaelchen/temporal-saga-grpc/pkg/temporal" + "fmt" + "github.com/kevinmichaelchen/temporal-saga-grpc/pkg/temporal/ctxpropagation" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/contrib/opentelemetry" + "go.temporal.io/sdk/interceptor" + "go.temporal.io/sdk/workflow" "go.uber.org/fx" ) @@ -14,13 +18,14 @@ var Module = fx.Module("temporal", ) func NewClient(lc fx.Lifecycle) (client.Client, error) { - interceptors, err := temporal.ClientInterceptors() + interceptors, err := clientInterceptors() if err != nil { return nil, err } c, err := client.Dial(client.Options{ - Interceptors: interceptors, + Interceptors: interceptors, + ContextPropagators: []workflow.ContextPropagator{ctxpropagation.NewContextPropagator()}, }) if err != nil { return nil, err @@ -35,3 +40,13 @@ func NewClient(lc fx.Lifecycle) (client.Client, error) { return c, nil } + +func clientInterceptors() ([]interceptor.ClientInterceptor, error) { + i, err := opentelemetry.NewTracingInterceptor(opentelemetry.TracerOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to create OTEL tracing interceptor: %w", err) + } + return []interceptor.ClientInterceptor{ + i, + }, nil +} diff --git a/pkg/temporal/ctxpropagation/propagator.go b/pkg/temporal/ctxpropagation/propagator.go new file mode 100644 index 0000000..564c5b2 --- /dev/null +++ b/pkg/temporal/ctxpropagation/propagator.go @@ -0,0 +1,81 @@ +package ctxpropagation + +import ( + "context" + "go.opentelemetry.io/otel/baggage" + + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/workflow" +) + +type ( + // contextKey is an unexported type used as key for items stored in the + // Context object + contextKey struct{} + + // propagator implements the custom context propagator + propagator struct{} +) + +type Values baggage.Baggage + +// PropagateKey is the key used to store the value in the Context object +var PropagateKey = contextKey{} + +// propagationKey is the key used by the propagator to pass values through the +// Temporal server headers +const propagationKey = "custom-header" + +// NewContextPropagator returns a context propagator that propagates a set of +// string key-value pairs across a workflow +func NewContextPropagator() workflow.ContextPropagator { + return &propagator{} +} + +// Inject injects values from context into headers for propagation +func (s *propagator) Inject(ctx context.Context, writer workflow.HeaderWriter) error { + value := ctx.Value(PropagateKey) + payload, err := converter.GetDefaultDataConverter().ToPayload(value) + if err != nil { + return err + } + writer.Set(propagationKey, payload) + return nil +} + +// InjectFromWorkflow injects values from context into headers for propagation +func (s *propagator) InjectFromWorkflow(ctx workflow.Context, writer workflow.HeaderWriter) error { + value := ctx.Value(PropagateKey) + payload, err := converter.GetDefaultDataConverter().ToPayload(value) + if err != nil { + return err + } + writer.Set(propagationKey, payload) + return nil +} + +// Extract extracts values from headers and puts them into context +func (s *propagator) Extract(ctx context.Context, reader workflow.HeaderReader) (context.Context, error) { + if value, ok := reader.Get(propagationKey); ok { + var values Values + if err := converter.GetDefaultDataConverter().FromPayload(value, &values); err != nil { + return ctx, nil + } + ctx = context.WithValue(ctx, PropagateKey, values) + } + + return ctx, nil +} + +// ExtractToWorkflow extracts values from headers and puts them into context +func (s *propagator) ExtractToWorkflow(ctx workflow.Context, reader workflow.HeaderReader) (workflow.Context, error) { + if value, ok := reader.Get(propagationKey); ok { + var values Values + if err := converter.GetDefaultDataConverter().FromPayload(value, &values); err != nil { + return ctx, nil + } + ctx = workflow.WithValue(ctx, PropagateKey, values) + } + + return ctx, nil +} diff --git a/pkg/temporal/interceptors.go b/pkg/temporal/interceptors.go deleted file mode 100644 index 9682e58..0000000 --- a/pkg/temporal/interceptors.go +++ /dev/null @@ -1,17 +0,0 @@ -package temporal - -import ( - "fmt" - "go.temporal.io/sdk/contrib/opentelemetry" - "go.temporal.io/sdk/interceptor" -) - -func ClientInterceptors() ([]interceptor.ClientInterceptor, error) { - i, err := opentelemetry.NewTracingInterceptor(opentelemetry.TracerOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to create OTEL tracing interceptor: %w", err) - } - return []interceptor.ClientInterceptor{ - i, - }, nil -}