
Traceparent not forwarded via PubSub #355


Open
dkrizic opened this issue Jan 29, 2023 · 5 comments · May be fixed by #717
Labels
bug Something isn't working

Comments

@dkrizic

dkrizic commented Jan 29, 2023

I don't know if this is a bug or a feature request. I have two services, "server" and "echo". Server sends a message via pubsub to echo. Both services are fully instrumented with OpenTelemetry. A call to server gives me the following trace:

[Screenshot: trace of the call to server]

The last method, SendNotification, forwards the instrumented ctx to the Dapr client:

func (n *Sender) SendNotification(ctx context.Context, message []byte) error {
	ctx, span := otel.Tracer("sender").Start(ctx, "SendNotification")
	defer span.End()

	llog := log.WithFields(log.Fields{
		"pubsubName": n.PubSubName,
		"topicName":  n.TopicName,
		"message":    string(message),
	})
	llog.Debug("Sending sender")
	err := n.client.PublishEvent(ctx, n.PubSubName, n.TopicName, message)
	if err != nil {
		llog.WithError(err).Warn("Unable to send sender")
		return err
	}
	return nil
}

But the transported message has a new traceid:

{
    "data": "{\"Before\":{\"Id\":\"77\",\"Title\":\"Title\",\"Description\":\"The description\",\"Status\":\"\"},\"After\":{\"Id\":\"77\",\"Title\":\"Title\",\"Description\":\"The description\",\"Status\":\"\"},\"ChangeType\":\"CREATE\"}",
    "datacontenttype": "text/plain",
    "id": "b946b14e-1770-4316-8483-9eca55602318",
    "pubsubname": "todo-pubsub",
    "source": "todo",
    "specversion": "1.0",
    "time": "2023-01-29T11:22:03Z",
    "topic": "todo",
    "traceid": "00-f4905fb842564c59ca0905e18f408be7-24f05a94522b81d3-01",
    "traceparent": "00-f4905fb842564c59ca0905e18f408be7-24f05a94522b81d3-01",
    "tracestate": "",
    "type": "com.dapr.event.sent"
}
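One way to check whether the published event actually continues the publisher's trace is to pull the trace ID out of the envelope's traceparent field (W3C format: version-traceid-parentid-flags) and compare it with span.SpanContext().TraceID().String() on the sending side. A minimal, stdlib-only sketch; the envelope type and the traceIDFromEvent helper are hypothetical names, not part of the Dapr SDK:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// envelope holds only the CloudEvent field relevant to tracing (hypothetical type).
type envelope struct {
	TraceParent string `json:"traceparent"`
}

// traceIDFromEvent decodes a CloudEvent envelope and returns the
// 32-hex-character trace ID embedded in its W3C traceparent field.
func traceIDFromEvent(raw []byte) (string, error) {
	var ev envelope
	if err := json.Unmarshal(raw, &ev); err != nil {
		return "", err
	}
	// traceparent = version "-" trace-id "-" parent-id "-" flags
	parts := strings.Split(ev.TraceParent, "-")
	if len(parts) != 4 || len(parts[1]) != 32 {
		return "", fmt.Errorf("malformed traceparent %q", ev.TraceParent)
	}
	return parts[1], nil
}

func main() {
	raw := []byte(`{"traceparent":"00-f4905fb842564c59ca0905e18f408be7-24f05a94522b81d3-01"}`)
	id, err := traceIDFromEvent(raw)
	if err != nil {
		panic(err)
	}
	// Compare this with the sender's span.SpanContext().TraceID().String().
	fmt.Println(id)
}
```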

On the receiving side, tracing works again:

[Screenshot: trace on the receiving side]

But altogether I have two separate traces:

[Screenshot: the two separate traces]

Questions:

  • Is this feature missing?
  • Am I doing something wrong?
  • Does it make sense at all to have a single trace span asynchronous communication?
dkrizic added the bug (Something isn't working) label on Jan 29, 2023
dkrizic changed the title from "Transparent not forwarded via PubSub" to "Traceparent not forwarded via PubSub" on Jan 29, 2023
@dkrizic
Author

dkrizic commented Jan 29, 2023

This code does not forward the trace ID either:

func (n *Sender) SendNotification(ctx context.Context, message []byte) error {
	ctx, span := otel.Tracer("sender").Start(ctx, "SendNotification")
	defer span.End()

	llog := log.WithFields(log.Fields{
		"pubsubName": n.PubSubName,
		"topicName":  n.TopicName,
		"message":    string(message),
	})
	llog.Debug("Sending sender")
	client, err := dapr.NewClient()
	if err != nil {
		llog.WithError(err).Warn("Unable to create dapr client")
		span.RecordError(err)
		return err
	}
	// Defer Close only after a successful NewClient; client is nil on error.
	defer client.Close()
	ctx = client.WithTraceID(ctx, span.SpanContext().TraceID().String())
	err = client.PublishEvent(ctx, n.PubSubName, n.TopicName, message)
	if err != nil {
		llog.WithError(err).Warn("Unable to send sender")
		span.RecordError(err)
		return err
	}
	return nil
}

It uses WithTraceID like this:

ctx = client.WithTraceID(ctx, span.SpanContext().TraceID().String())
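Note that span.SpanContext().TraceID().String() yields only the 32-hex-character trace ID, while a W3C traceparent value has four dash-separated fields. If WithTraceID is meant to receive a full traceparent value (an assumption worth checking against the go-sdk source), one could be built as below. The parameter types mirror OpenTelemetry's trace.TraceID ([16]byte) and trace.SpanID ([8]byte); formatTraceParent is a hypothetical helper:

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// formatTraceParent renders a W3C Trace Context "traceparent" value:
// version ("00") - trace ID (32 hex) - parent span ID (16 hex) - flags (2 hex).
func formatTraceParent(traceID [16]byte, spanID [8]byte, flags byte) string {
	return fmt.Sprintf("00-%s-%s-%02x",
		hex.EncodeToString(traceID[:]),
		hex.EncodeToString(spanID[:]),
		flags)
}

func main() {
	var tid [16]byte
	var sid [8]byte
	if _, err := hex.Decode(tid[:], []byte("f4905fb842564c59ca0905e18f408be7")); err != nil {
		panic(err)
	}
	if _, err := hex.Decode(sid[:], []byte("24f05a94522b81d3")); err != nil {
		panic(err)
	}
	// Prints 00-f4905fb842564c59ca0905e18f408be7-24f05a94522b81d3-01
	fmt.Println(formatTraceParent(tid, sid, 0x01))
}
```

With OpenTelemetry this could be called as formatTraceParent(sc.TraceID(), sc.SpanID(), byte(sc.TraceFlags())), since both ID types have array underlying types and are assignable to the parameters.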

@linas-ipxo

It seems that it works if you pass the gRPC trace header (grpc-trace-bin) in the outgoing context. I'm not sure whether this is intended or a bug, but as a workaround the following works.

You will need either the full import of github.com/dapr/dapr/pkg/diagnostics/utils or a copy of the BinaryFromSpanContext function from https://github.com/dapr/dapr/blob/release-1.13/pkg/diagnostics/utils/trace_utils.go#L134, since it is needed to construct the binary traceparent.

Then, before calling .PublishEvent:

	sc := trace.SpanContextFromContext(ctx)
	tid := utils.BinaryFromSpanContext(sc)
	md := metadata.Pairs("grpc-trace-bin", string(tid))
	ctx = metadata.NewOutgoingContext(ctx, md)

Using the new context with the gRPC metadata then appears to propagate the traceparent correctly across Dapr, and the traces get connected.
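To confirm what actually goes out in the grpc-trace-bin header, the 29 bytes can be decoded back. The sketch below assumes exactly the layout that BinaryFromSpanContext produces (version byte 0, then field IDs 0, 1 and 2 for trace ID, span ID and trace flags); decodeTraceBin is a hypothetical helper, not a Dapr API:

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
)

// decodeTraceBin parses the 29-byte binary trace context:
// b[0] version, b[1]=0 + 16-byte trace ID, b[18]=1 + 8-byte span ID,
// b[27]=2 + 1-byte trace flags.
func decodeTraceBin(b []byte) (traceID [16]byte, spanID [8]byte, flags byte, err error) {
	if len(b) != 29 || b[0] != 0 || b[1] != 0 || b[18] != 1 || b[27] != 2 {
		return traceID, spanID, 0, errors.New("unexpected grpc-trace-bin layout")
	}
	copy(traceID[:], b[2:18])
	copy(spanID[:], b[19:27])
	return traceID, spanID, b[28], nil
}

func main() {
	// Build a sample buffer in the same layout binaryFromSpanContext uses.
	var b [29]byte
	tid, _ := hex.DecodeString("f4905fb842564c59ca0905e18f408be7")
	sid, _ := hex.DecodeString("24f05a94522b81d3")
	copy(b[2:18], tid)
	b[18] = 1
	copy(b[19:27], sid)
	b[27] = 2
	b[28] = 1 // sampled

	traceID, spanID, flags, err := decodeTraceBin(b[:])
	if err != nil {
		panic(err)
	}
	fmt.Println(hex.EncodeToString(traceID[:]), hex.EncodeToString(spanID[:]), flags)
}
```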

This applies specifically to the gRPC client.

An HTTP client (I'm not sure whether the SDK provides one; I tested by sending plain HTTP requests to a Dapr sidecar) appears to work as-is.

@mkruczek

@linas-ipxo I have done this and played around a bit with the trace, span and ctx itself, but nothing works... In the end I always get "traceid":"00-1234567890abcdef1234567890abcdef-1fa19ef014f128c8-01". Maybe you have a working example?

@mkruczek

I got somewhere using PublishEventWithMetadata; it looks like this:

	var metadata map[string]string
	if traceParentID != "" {
		metadata = map[string]string{
			"cloudevent.traceparent": traceParentID,
		}
	}

	if err := client.PublishEvent(ctx, p.publisher, p.topic, data, dapr.PublishEventWithMetadata(metadata)); err != nil {
		log.Errorf("Error publishing event: %v", err)
		return err
	}
	

More info here: https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-cloudevents/#replace-dapr-generated-cloudevents-values

@linas-ipxo

> @linas-ipxo i have done this, and play a little bit with trace, span and ctx it self, but nothing works... at the end i always got "traceid":"00-1234567890abcdef1234567890abcdef-1fa19ef014f128c8-01". maybe you have working example?

I'm not entirely sure what you mean or what you expected to see, but the following code is what I was using while testing this.
I'm not sure it works fully as-is: this is a minimized version of a bigger project I was working on, and I haven't tested this exact code, only checked the syntax. But it is mostly copy-pasted from the bigger project.
Maybe you can find answers in here.

package daprtest

import (
	"context"
	"encoding/json"
	"go-api/services/trace" // project-specific package (not shown) that exposes Tracer

	dapr "github.com/dapr/go-sdk/client"

	otel "go.opentelemetry.io/otel"
	sdktrace "go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc/metadata"

	"github.com/google/uuid"

	cloudevents "github.com/cloudevents/sdk-go/v2"
)

var daprClient dapr.Client

func DoTheThing() {

	var err error

	daprClient, err = dapr.NewClient()
	if err != nil {
		panic("cannot create dapr client")
	}

	// context would come from some other place, which has instantiated the parent span already
	// otherwise this will be the root span with new trace id
	ctx := context.Background()

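	// Keep the returned ctx: it carries the new span, and publishEvent reads
	// the span context from it. Discarding it would publish an empty span context.
	ctx, span := trace.Tracer.Start(ctx, "lalalal")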
	defer span.End()

	data := map[string]string{"something": "happened"}

	publishEvent("topic", ctx, data)
}

func publishEvent(topic string, ctx context.Context, data interface{}) {

	sc := sdktrace.SpanContextFromContext(ctx)
	tid := string(binaryFromSpanContext(sc))

	md := metadata.Pairs("grpc-trace-bin", tid)
	newCtx := metadata.NewOutgoingContext(ctx, md)

	err := daprClient.PublishEvent(
		newCtx, // this new context contains grpc metadata with "grpc-trace-bin"
		"pubsub",
		topic,
		formatRawCloudEvent(topic, ctx, data),
	)

	if err != nil {
		panic(err)
	}
}

// https://github.com/dapr/dapr/blob/release-1.13/pkg/diagnostics/utils/trace_utils.go#L134
func binaryFromSpanContext(sc sdktrace.SpanContext) []byte {
	traceID := sc.TraceID()
	spanID := sc.SpanID()
	traceFlags := sc.TraceFlags()

	var b [29]byte
	copy(b[2:18], traceID[:])
	b[18] = 1
	copy(b[19:27], spanID[:])
	b[27] = 2
	b[28] = uint8(traceFlags)

	return b[:]
}

func formatRawCloudEvent(topic string, ctx context.Context, data interface{}) string {
	event := cloudevents.NewEvent()
	event.SetID(uuid.NewString())
	event.SetSource("test-source")
	event.SetType(topic)

	carrier := CloudEventCarrier{Event: &event}

	if err := event.SetData("application/json", data); err != nil {
		panic(err)
	}

	otel.GetTextMapPropagator().Inject(ctx, carrier)

	j, err := json.Marshal(event)
	if err != nil {
		panic(err)
	}

	return string(j)
}

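// CloudEventCarrier adapts a CloudEvent to OpenTelemetry's propagation.TextMapCarrier
// interface, storing propagated fields (e.g. traceparent) as CloudEvent extensions.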

type CloudEventCarrier struct {
	Event *cloudevents.Event
}

// Get returns the value associated with the passed key.
func (c CloudEventCarrier) Get(key string) string {
	ext, err := c.Event.Context.GetExtension(key)
	if err != nil {
		return ""
	}

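	// Extensions are expected to be strings; return "" for anything else instead of panicking.
	s, _ := ext.(string)
	return s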
}

// Set stores the key-value pair.
func (c CloudEventCarrier) Set(key, value string) {
	c.Event.Context.SetExtension(key, value)
}

// Keys lists the keys stored in this carrier.
func (c CloudEventCarrier) Keys() []string {
	ext := c.Event.Context.GetExtensions()

	keys := make([]string, 0, len(ext))
	for k := range ext {
		keys = append(keys, k)
	}

	return keys
}
