Skip to content

Commit

Permalink
Use a sync.Map for sampling decision to establish baseline perf
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Jan 15, 2025
1 parent ea7b481 commit 5dde3d0
Showing 1 changed file with 28 additions and 18 deletions.
46 changes: 28 additions & 18 deletions x-pack/apm-server/sampling/eventstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -48,6 +49,8 @@ type Storage struct {
// pendingSize tracks the total size of pending writes across ReadWriters
pendingSize *atomic.Int64
codec Codec

sampled sync.Map
}

// Codec provides methods for encoding and decoding events.
Expand Down Expand Up @@ -153,32 +156,39 @@ func (rw *ReadWriter) Flush() error {

// WriteTraceSampled records the tail-sampling decision for the given trace ID.
func (rw *ReadWriter) WriteTraceSampled(traceID string, sampled bool, opts WriterOpts) error {
rw.lazyInit()

key := []byte(prefixSamplingDecision + traceID)
meta := entryMetaTraceUnsampled
if sampled {
meta = entryMetaTraceSampled
}
return rw.batch.Set(key, []byte{meta}, pebble.NoSync)
rw.s.sampled.Store(traceID, sampled)
return nil
//rw.lazyInit()
//
//key := []byte(prefixSamplingDecision + traceID)
//meta := entryMetaTraceUnsampled
//if sampled {
// meta = entryMetaTraceSampled
//}
//return rw.batch.Set(key, []byte{meta}, pebble.NoSync)
}

// IsTraceSampled reports whether traceID belongs to a trace that is sampled
// or unsampled. If no sampling decision has been recorded, IsTraceSampled
// returns ErrNotFound.
func (rw *ReadWriter) IsTraceSampled(traceID string) (bool, error) {
rw.lazyInit()

// FIXME: this needs to be fast, as it is in the hot path
// It should minimize disk IO on miss due to
// 1. (pubsub) remote sampling decision
// 2. (hot path) sampling decision not made yet
item, closer, err := rw.batch.Get([]byte(prefixSamplingDecision + traceID))
if err == pebble.ErrNotFound {
if sampled, ok := rw.s.sampled.Load(traceID); !ok {
return false, ErrNotFound
} else {
return sampled.(bool), nil
}
defer closer.Close()
return item[0] == entryMetaTraceSampled, nil
//rw.lazyInit()
//
//// FIXME: this needs to be fast, as it is in the hot path
//// It should minimize disk IO on miss due to
//// 1. (pubsub) remote sampling decision
//// 2. (hot path) sampling decision not made yet
//item, closer, err := rw.batch.Get([]byte(prefixSamplingDecision + traceID))
//if err == pebble.ErrNotFound {
// return false, ErrNotFound
//}
//defer closer.Close()
//return item[0] == entryMetaTraceSampled, nil
}

// WriteTraceEvent writes a trace event to storage.
Expand Down

0 comments on commit 5dde3d0

Please sign in to comment.