pubsub: Publish latency spikes in certain messages (low throughput publisher) #11449

Open
agis opened this issue Jan 14, 2025 · 2 comments
Labels: api: pubsub, triage me

agis commented Jan 14, 2025

Client

cloud.google.com/go/pubsub v1.44.0

Environment

Cloud Run
go 1.23.4

Code and Dependencies

var (
  wg             sync.WaitGroup
  publishResults sync.Map
)

for _, job := range jobs {
  result := pubsub.Publish(ctx, topic, job)

  wg.Add(1)
  go func(res *pubsub.PublishResult) {
    defer wg.Done()

    publishStart := d.clock.Now().UTC()

    // PublishResult.Get returns (serverID, err); the server ID is not needed here.
    _, err := res.Get(ctx)
    if err != nil {
      // report to external monitoring system
      return
    }

    publishResults.Store(job.ID, true)

    _ = statsd.Distribution("publish_latency", time.Now().UTC().Sub(publishStart).Seconds(), 1)
  }(result)
}

wg.Wait()

Expected behavior

We expect all messages to be published within 100ms.

Actual behavior

We have a few outliers: while most messages are published within 20ms, a few of them take up to 5s.

Screenshots

The following are traces from Datadog, gathered via https://github.com/DataDog/dd-trace-go/tree/main/contrib/cloud.google.com/go/pubsub.v1:

[Image: Datadog trace screenshot]

This is consistent with what GCP's topic.send_request_latencies.avg metric reports at roughly the same time:

[Image: chart of topic.send_request_latencies.avg]

Additional context

hongalex (Member) commented

Could you show more of the code, specifically how the topic object is passed around? Historically, one cause of higher latency on Cloud Run (or Cloud Functions) has been that the object isn't properly cached.
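
For readers unfamiliar with what "properly cached" means here, the idea is to build the expensive handle once per process and reuse it across requests. The following is a rough sketch only: `topicHandle`, `getTopic`, and `initCount` are hypothetical stand-ins (the real object would be a `*pubsub.Topic`), and `sync.Once` is just one possible way to do it.

```go
package main

import (
	"fmt"
	"sync"
)

// topicHandle is a hypothetical stand-in for *pubsub.Topic. Building the real
// object is assumed to be costly, so it should happen once per process.
type topicHandle struct {
	name string
}

var (
	topicOnce   sync.Once
	cachedTopic *topicHandle
	initCount   int // how many times the handle was actually built
)

// getTopic returns the process-wide handle, building it on first use only.
// Later calls ignore name and return the already cached handle.
func getTopic(name string) *topicHandle {
	topicOnce.Do(func() {
		initCount++
		cachedTopic = &topicHandle{name: name}
	})
	return cachedTopic
}

func main() {
	// Two "requests" end up sharing the same handle.
	a := getTopic("jobs")
	b := getTopic("jobs")
	fmt.Println("same handle:", a == b)
	fmt.Println("initializations:", initCount)
}
```

The anti-pattern this guards against is constructing a new client or topic inside each request handler, which would pay the setup cost on every call.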

agis commented Jan 15, 2025

We have a thin client wrapper that keeps a reference to the created topic. We use that same reference throughout the code and initialize the client once in our main, so I believe we are using it as intended.

import (
	"context"
	"encoding/json"
	"errors"

	"cloud.google.com/go/pubsub"
	pubsubtrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/cloud.google.com/go/pubsub.v1"
)


type Client struct {
	pubsubClient *pubsub.Client
	topic        *pubsub.Topic
}

func NewClient(ctx context.Context, topicName string) *Client {
	client := &Client{}
	client.pubsubClient = createPubsubClient(ctx)

	client.topic = client.pubsubClient.Topic(topicName)
	client.topic.PublishSettings = pubsub.PublishSettings{
		ByteThreshold:  DefaultByteThreshold,
		CountThreshold: DefaultCountThreshold,
	}
	
	return client
}

func (c *Client) PublishAsync(ctx context.Context, message Message) *AsyncPublishResult {
	messageJSON, err := json.Marshal(message)
	if err != nil {
		// Carry the marshalling error in the result so that Get reports it.
		return &AsyncPublishResult{err: err}
	}

	msg := &pubsub.Message{
		Data:       messageJSON,
		Attributes: message.Attributes(),
	}

	// Publish the message
	result := pubsubtrace.Publish(ctx, c.topic, msg)
	return &AsyncPublishResult{
		result: result,
	}
}

type AsyncPublishResult struct {
	err    error
	result *pubsubtrace.PublishResult
}

func (r *AsyncPublishResult) Get(ctx context.Context) error {
	if r.err != nil {
		return r.err
	}
	if r.result != nil {
		_, err := r.result.Get(ctx)
		if err != nil {
			return err
		}
	}
	return nil
}

Note that it's the call to result.Get that is slow.
