Skip to content

Commit

Permalink
link an otel span to apm parent span
Browse files Browse the repository at this point in the history
  • Loading branch information
rubvs committed Jan 24, 2025
1 parent 99f84c0 commit 5f3e279
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions input/elasticapm/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/rumv3"
v2 "github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/v2"
"github.com/elastic/apm-data/model/modelpb"
"go.opentelemetry.io/otel/trace"
)

var (
Expand Down Expand Up @@ -234,6 +235,7 @@ func (p *Processor) readBatch(
// Callers must not access result concurrently with HandleStream.
func (p *Processor) HandleStream(
ctx context.Context,
tp trace.TracerProvider,
baseEvent *modelpb.APMEvent,
reader io.Reader,
batchSize int,
Expand All @@ -246,7 +248,7 @@ func (p *Processor) HandleStream(
//
// The semaphore defaults to 200 (N), only allowing N requests to read
// an cache Y events (determined by batchSize) from the batch.
if err := p.semAcquire(ctx); err != nil {
if err := p.semAcquire(ctx, tp); err != nil {
return fmt.Errorf("unable to service request: %w", err)
}
sr := p.getStreamReader(reader)
Expand Down Expand Up @@ -325,9 +327,10 @@ func (p *Processor) getStreamReader(r io.Reader) *streamReader {
}
}

func (p *Processor) semAcquire(ctx context.Context) error {
sp, ctx := apm.StartSpan(ctx, "Semaphore.Acquire", "Reporter")
defer sp.End()
func (p *Processor) semAcquire(ctx context.Context, tp trace.TracerProvider) error {
tracer := tp.Tracer("internal/server/http/elastic")
ctx, span := tracer.Start(ctx, "Semaphore.Acquire")
defer span.End()

return p.sem.Acquire(ctx, 1)
}
Expand Down

0 comments on commit 5f3e279

Please sign in to comment.