From 5dde3d048d61746d12eb3acffa44f58acf8961b9 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 15 Jan 2025 15:55:48 +0000 Subject: [PATCH] Use a sync.Map for sampling decision to establish baseline perf --- .../sampling/eventstorage/storage.go | 46 +++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/x-pack/apm-server/sampling/eventstorage/storage.go b/x-pack/apm-server/sampling/eventstorage/storage.go index f7f4c586d3..b5d68320c0 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage.go +++ b/x-pack/apm-server/sampling/eventstorage/storage.go @@ -8,6 +8,7 @@ import ( "bytes" "errors" "fmt" + "sync" "sync/atomic" "time" @@ -48,6 +49,8 @@ type Storage struct { // pendingSize tracks the total size of pending writes across ReadWriters pendingSize *atomic.Int64 codec Codec + + sampled sync.Map } // Codec provides methods for encoding and decoding events. @@ -153,32 +156,39 @@ func (rw *ReadWriter) Flush() error { // WriteTraceSampled records the tail-sampling decision for the given trace ID. func (rw *ReadWriter) WriteTraceSampled(traceID string, sampled bool, opts WriterOpts) error { - rw.lazyInit() - - key := []byte(prefixSamplingDecision + traceID) - meta := entryMetaTraceUnsampled - if sampled { - meta = entryMetaTraceSampled - } - return rw.batch.Set(key, []byte{meta}, pebble.NoSync) + rw.s.sampled.Store(traceID, sampled) + return nil + //rw.lazyInit() + // + //key := []byte(prefixSamplingDecision + traceID) + //meta := entryMetaTraceUnsampled + //if sampled { + // meta = entryMetaTraceSampled + //} + //return rw.batch.Set(key, []byte{meta}, pebble.NoSync) } // IsTraceSampled reports whether traceID belongs to a trace that is sampled // or unsampled. If no sampling decision has been recorded, IsTraceSampled // returns ErrNotFound. func (rw *ReadWriter) IsTraceSampled(traceID string) (bool, error) { - rw.lazyInit() - - // FIXME: this needs to be fast, as it is in the hot path - // It should minimize disk IO on miss due to - // 1. (pubsub) remote sampling decision - // 2. (hot path) sampling decision not made yet - item, closer, err := rw.batch.Get([]byte(prefixSamplingDecision + traceID)) - if err == pebble.ErrNotFound { + if sampled, ok := rw.s.sampled.Load(traceID); !ok { return false, ErrNotFound + } else { + return sampled.(bool), nil } - defer closer.Close() - return item[0] == entryMetaTraceSampled, nil + //rw.lazyInit() + // + //// FIXME: this needs to be fast, as it is in the hot path + //// It should minimize disk IO on miss due to + //// 1. (pubsub) remote sampling decision + //// 2. (hot path) sampling decision not made yet + //item, closer, err := rw.batch.Get([]byte(prefixSamplingDecision + traceID)) + //if err == pebble.ErrNotFound { + // return false, ErrNotFound + //} + //defer closer.Close() + //return item[0] == entryMetaTraceSampled, nil } // WriteTraceEvent writes a trace event to storage.