From 53f4eabb6114519be11fe5ddc9aa2c939f87eac1 Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Wed, 24 Aug 2022 23:01:25 -0400
Subject: [PATCH] [8.4] Fix event loss during reload of TBS processor
 (backport #8809) (#8877)

* Fix event loss during reload of TBS processor (#8809)

* Maintain TBS read writers outside processor lifecycle

The configuration reload logic for the TBS processor may lead to multiple
processors existing at the same time, which introduces a race between the
two processors over flushing a shard (traceID). Additionally, badger uses
serializable snapshot isolation, which might cause a newly created
read-write shard (based on traceID) to not see data committed by the older
shard for the same traceID.

* Finalize traces before stopping the TBS processor

(cherry picked from commit 269216e0275f30a1c5f3bf68bef482c37bfcf9be)

# Conflicts:
#	changelogs/head.asciidoc

* Fix conflicts

Co-authored-by: Vishal Raj
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
---
 x-pack/apm-server/main.go                     |  34 +++-
 x-pack/apm-server/sampling/config.go          |  10 ++
 x-pack/apm-server/sampling/config_test.go     |   4 +
 .../sampling/eventstorage/sharded.go          |  24 +--
 .../eventstorage/sharded_bench_test.go        |  18 +-
 .../sampling/eventstorage/storage.go          |  55 +++---
 .../eventstorage/storage_bench_test.go        |  32 ++--
 .../sampling/eventstorage/storage_test.go     |  41 +++--
 x-pack/apm-server/sampling/processor.go       | 163 +++++++++++-------
 .../sampling/processor_bench_test.go          |   2 +-
 x-pack/apm-server/sampling/processor_test.go  | 100 +++++++++--
 11 files changed, 323 insertions(+), 160 deletions(-)

diff --git a/x-pack/apm-server/main.go b/x-pack/apm-server/main.go
index 9a50ad4c449..ddaab53cceb 100644
--- a/x-pack/apm-server/main.go
+++ b/x-pack/apm-server/main.go
@@ -40,6 +40,9 @@ var (
 	// badgerDB holds the badger database to use when tail-based sampling is configured.
 	badgerMu sync.Mutex
 	badgerDB *badger.DB
+
+	storageMu sync.Mutex
+	storage   *eventstorage.ShardedReadWriter
 )
 
 type namedProcessor struct {
@@ -108,6 +111,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to get Badger database")
 	}
+	readWriters := getStorage(badgerDB)
 
 	policies := make([]sampling.Policy, len(tailSamplingConfig.Policies))
 	for i, in := range tailSamplingConfig.Policies {
@@ -121,6 +125,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
 			SampleRate: in.SampleRate,
 		}
 	}
+
 	return sampling.NewProcessor(sampling.Config{
 		BeatID:         args.Info.ID.String(),
 		BatchProcessor: args.BatchProcessor,
@@ -141,6 +146,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
 		},
 		StorageConfig: sampling.StorageConfig{
 			DB:                badgerDB,
+			Storage:           readWriters,
 			StorageDir:        storageDir,
 			StorageGCInterval: tailSamplingConfig.StorageGCInterval,
 			StorageLimit:      tailSamplingConfig.StorageLimitParsed,
@@ -162,6 +168,16 @@ func getBadgerDB(storageDir string) (*badger.DB, error) {
 	return badgerDB, nil
 }
 
+func getStorage(db *badger.DB) *eventstorage.ShardedReadWriter {
+	storageMu.Lock()
+	defer storageMu.Unlock()
+	if storage == nil {
+		eventCodec := eventstorage.JSONCodec{}
+		storage = eventstorage.New(db, eventCodec).NewShardedReadWriter()
+	}
+	return storage
+}
+
 // runServerWithProcessors runs the APM Server and the given list of processors.
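The crux of the fix is visible in getStorage above: the read-writers now live for the life of the process, not the life of one processor. A minimal sketch of the property this buys — a hypothetical harness, not part of the patch; the on-disk path is an assumption and getStorage is the accessor from main.go:

package main

import (
	"log"
	"time"

	"github.com/dgraph-io/badger/v2"

	"github.com/elastic/apm-server/internal/model"
	"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
)

func main() {
	db, err := badger.Open(badger.DefaultOptions("/tmp/tbs-demo")) // hypothetical path
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Two processor "generations" created across a config reload both
	// resolve to the same process-wide ShardedReadWriter.
	oldGen := getStorage(db)
	newGen := getStorage(db) // same pointer as oldGen

	event := model.APMEvent{Transaction: &model.Transaction{ID: "tx1"}}
	opts := eventstorage.WriterOpts{TTL: time.Minute}
	if err := oldGen.WriteTraceEvent("trace1", "tx1", &event, opts); err != nil {
		log.Fatal(err)
	}

	// Even before a flush, the new generation sees the buffered event:
	// "trace1" hashes to the same shard, hence the same badger transaction,
	// and badger transactions can read their own uncommitted writes. With
	// per-processor storage, this read could miss the event entirely.
	var batch model.Batch
	if err := newGen.ReadTraceEvents("trace1", &batch); err != nil {
		log.Fatal(err)
	}
	log.Printf("events visible across generations: %d", len(batch))
}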
// // newProcessors returns a list of processors which will process events in @@ -230,6 +246,22 @@ func closeBadger() error { return nil } +func closeStorage() { + if storage != nil { + storage.Close() + } +} + +func cleanup() (result error) { + // Close the underlying storage, the storage will be flushed on processor stop. + closeStorage() + + if err := closeBadger(); err != nil { + result = multierror.Append(result, err) + } + return result +} + func Main() error { rootCmd := newXPackRootCommand( beater.NewCreator(beater.CreatorParams{ @@ -237,7 +269,7 @@ func Main() error { }), ) result := rootCmd.Execute() - if err := closeBadger(); err != nil { + if err := cleanup(); err != nil { result = multierror.Append(result, err) } return result diff --git a/x-pack/apm-server/sampling/config.go b/x-pack/apm-server/sampling/config.go index 584844c97d7..a1c64237eb9 100644 --- a/x-pack/apm-server/sampling/config.go +++ b/x-pack/apm-server/sampling/config.go @@ -12,6 +12,7 @@ import ( "github.com/elastic/apm-server/internal/elasticsearch" "github.com/elastic/apm-server/internal/model" + "github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage" "github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub" ) @@ -94,6 +95,12 @@ type StorageConfig struct { // DB will not be closed when the processor is closed. DB *badger.DB + // Storage holds the read writers which provide sharded, locked access to storage. + // + // Storage lives outside processor lifecycle and will not be closed when processor + // is closed + Storage *eventstorage.ShardedReadWriter + // StorageDir holds the directory in which event storage will be maintained. StorageDir string @@ -225,6 +232,9 @@ func (config StorageConfig) validate() error { if config.DB == nil { return errors.New("DB unspecified") } + if config.Storage == nil { + return errors.New("Storage unspecified") + } if config.StorageDir == "" { return errors.New("StorageDir unspecified") } diff --git a/x-pack/apm-server/sampling/config_test.go b/x-pack/apm-server/sampling/config_test.go index 5215f62b755..f3fddd763d0 100644 --- a/x-pack/apm-server/sampling/config_test.go +++ b/x-pack/apm-server/sampling/config_test.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/apm-server/internal/elasticsearch" "github.com/elastic/apm-server/internal/model" "github.com/elastic/apm-server/x-pack/apm-server/sampling" + "github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage" ) func TestNewProcessorConfigInvalid(t *testing.T) { @@ -75,6 +76,9 @@ func TestNewProcessorConfigInvalid(t *testing.T) { assertInvalidConfigError("invalid storage config: DB unspecified") config.DB = &badger.DB{} + assertInvalidConfigError("invalid storage config: Storage unspecified") + config.Storage = &eventstorage.ShardedReadWriter{} + assertInvalidConfigError("invalid storage config: StorageDir unspecified") config.StorageDir = "tbs" diff --git a/x-pack/apm-server/sampling/eventstorage/sharded.go b/x-pack/apm-server/sampling/eventstorage/sharded.go index af9558e0d24..663054e2965 100644 --- a/x-pack/apm-server/sampling/eventstorage/sharded.go +++ b/x-pack/apm-server/sampling/eventstorage/sharded.go @@ -41,10 +41,10 @@ func (s *ShardedReadWriter) Close() { } // Flush flushes all sharded storage readWriters. 
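Storage is now a hard dependency of the processor, so misconfiguration surfaces at construction time rather than as silent event loss. A small sketch of what the validation above enforces; newValidConfig is a hypothetical helper along the lines of newTempdirConfig in processor_test.go:

cfg := newValidConfig() // hypothetical: a fully populated sampling.Config
cfg.Storage = nil       // omit the shared read-writers
if _, err := sampling.NewProcessor(cfg); err != nil {
	// err: "invalid tail-sampling config: invalid storage config: Storage unspecified"
}
cfg.Storage = getStorage(cfg.DB) // the process-wide read-writers from main.go
processor, err := sampling.NewProcessor(cfg)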
-func (s *ShardedReadWriter) Flush() error { +func (s *ShardedReadWriter) Flush(limit int64) error { var result error for i := range s.readWriters { - if err := s.readWriters[i].Flush(); err != nil { + if err := s.readWriters[i].Flush(limit); err != nil { result = multierror.Append(result, err) } } @@ -57,13 +57,13 @@ func (s *ShardedReadWriter) ReadTraceEvents(traceID string, out *model.Batch) er } // WriteTraceEvent calls Writer.WriteTraceEvent, using a sharded, locked, Writer. -func (s *ShardedReadWriter) WriteTraceEvent(traceID, id string, event *model.APMEvent) error { - return s.getWriter(traceID).WriteTraceEvent(traceID, id, event) +func (s *ShardedReadWriter) WriteTraceEvent(traceID, id string, event *model.APMEvent, opts WriterOpts) error { + return s.getWriter(traceID).WriteTraceEvent(traceID, id, event, opts) } // WriteTraceSampled calls Writer.WriteTraceSampled, using a sharded, locked, Writer. -func (s *ShardedReadWriter) WriteTraceSampled(traceID string, sampled bool) error { - return s.getWriter(traceID).WriteTraceSampled(traceID, sampled) +func (s *ShardedReadWriter) WriteTraceSampled(traceID string, sampled bool, opts WriterOpts) error { + return s.getWriter(traceID).WriteTraceSampled(traceID, sampled, opts) } // IsTraceSampled calls Writer.IsTraceSampled, using a sharded, locked, Writer. @@ -98,10 +98,10 @@ func (rw *lockedReadWriter) Close() { rw.rw.Close() } -func (rw *lockedReadWriter) Flush() error { +func (rw *lockedReadWriter) Flush(limit int64) error { rw.mu.Lock() defer rw.mu.Unlock() - return rw.rw.Flush() + return rw.rw.Flush(limit) } func (rw *lockedReadWriter) ReadTraceEvents(traceID string, out *model.Batch) error { @@ -110,16 +110,16 @@ func (rw *lockedReadWriter) ReadTraceEvents(traceID string, out *model.Batch) er return rw.rw.ReadTraceEvents(traceID, out) } -func (rw *lockedReadWriter) WriteTraceEvent(traceID, id string, event *model.APMEvent) error { +func (rw *lockedReadWriter) WriteTraceEvent(traceID, id string, event *model.APMEvent, opts WriterOpts) error { rw.mu.Lock() defer rw.mu.Unlock() - return rw.rw.WriteTraceEvent(traceID, id, event) + return rw.rw.WriteTraceEvent(traceID, id, event, opts) } -func (rw *lockedReadWriter) WriteTraceSampled(traceID string, sampled bool) error { +func (rw *lockedReadWriter) WriteTraceSampled(traceID string, sampled bool, opts WriterOpts) error { rw.mu.Lock() defer rw.mu.Unlock() - return rw.rw.WriteTraceSampled(traceID, sampled) + return rw.rw.WriteTraceSampled(traceID, sampled, opts) } func (rw *lockedReadWriter) IsTraceSampled(traceID string) (bool, error) { diff --git a/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go b/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go index 0f76cacbfb5..9c2b5217dc1 100644 --- a/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go +++ b/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go @@ -16,10 +16,13 @@ import ( func BenchmarkShardedWriteTransactionUncontended(b *testing.B) { db := newBadgerDB(b, badgerOptions) - ttl := time.Minute - store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl, 0) + store := eventstorage.New(db, eventstorage.JSONCodec{}) sharded := store.NewShardedReadWriter() defer sharded.Close() + wOpts := eventstorage.WriterOpts{ + TTL: time.Minute, + StorageLimitInBytes: 0, + } b.RunParallel(func(pb *testing.PB) { traceID := uuid.Must(uuid.NewV4()).String() @@ -27,7 +30,7 @@ func BenchmarkShardedWriteTransactionUncontended(b *testing.B) { Transaction: &model.Transaction{ID: traceID}, } for pb.Next() { - if err 
:= sharded.WriteTraceEvent(traceID, traceID, transaction); err != nil { + if err := sharded.WriteTraceEvent(traceID, traceID, transaction, wOpts); err != nil { b.Fatal(err) } } @@ -36,10 +39,13 @@ func BenchmarkShardedWriteTransactionUncontended(b *testing.B) { func BenchmarkShardedWriteTransactionContended(b *testing.B) { db := newBadgerDB(b, badgerOptions) - ttl := time.Minute - store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl, 0) + store := eventstorage.New(db, eventstorage.JSONCodec{}) sharded := store.NewShardedReadWriter() defer sharded.Close() + wOpts := eventstorage.WriterOpts{ + TTL: time.Minute, + StorageLimitInBytes: 0, + } // Use a single trace ID, causing all events to go through // the same sharded writer, contending for a single lock. @@ -51,7 +57,7 @@ func BenchmarkShardedWriteTransactionContended(b *testing.B) { Transaction: &model.Transaction{ID: transactionID}, } for pb.Next() { - if err := sharded.WriteTraceEvent(traceID, transactionID, transaction); err != nil { + if err := sharded.WriteTraceEvent(traceID, transactionID, transaction, wOpts); err != nil { b.Fatal(err) } } diff --git a/x-pack/apm-server/sampling/eventstorage/storage.go b/x-pack/apm-server/sampling/eventstorage/storage.go index 1148f6a6319..150d945bb96 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage.go +++ b/x-pack/apm-server/sampling/eventstorage/storage.go @@ -23,10 +23,6 @@ const ( entryMetaTraceEvent = 'e' ) -const ( - storageLimitThreshold = 0.90 // Allow 90% of the quota to be used. -) - var ( // ErrNotFound is returned by by the Storage.IsTraceSampled method, // for non-existing trace IDs. @@ -42,8 +38,6 @@ var ( type Storage struct { db *badger.DB codec Codec - ttl time.Duration - limit int64 } // Codec provides methods for encoding and decoding events. @@ -53,17 +47,8 @@ type Codec interface { } // New returns a new Storage using db and codec. -// -// Storage entries expire after ttl. -// The amount of storage that can be consumed can be limited by passing in a -// limit value greater than zero. The hard limit on storage is set to 90% of -// the limit to account for delay in the size reporting by badger. -// https://github.com/dgraph-io/badger/blob/82b00f27e3827022082225221ae05c03f0d37620/db.go#L1302-L1319. -func New(db *badger.DB, codec Codec, ttl time.Duration, limit int64) *Storage { - if limit > 1 { - limit = int64(float64(limit) * storageLimitThreshold) - } - return &Storage{db: db, codec: codec, ttl: ttl, limit: limit} +func New(db *badger.DB, codec Codec) *Storage { + return &Storage{db: db, codec: codec} } // NewShardedReadWriter returns a new ShardedReadWriter, for sharded @@ -86,8 +71,8 @@ func (s *Storage) NewReadWriter() *ReadWriter { } } -func (s *Storage) limitReached() bool { - if s.limit == 0 { +func (s *Storage) limitReached(limit int64) bool { + if limit == 0 { return false } // The badger database has an async size reconciliation, with a 1 minute @@ -96,7 +81,13 @@ func (s *Storage) limitReached() bool { // lookup is cheap. lsm, vlog := s.db.Size() current := lsm + vlog - return current >= s.limit + return current >= limit +} + +// WriterOpts provides configuration options for writes to storage +type WriterOpts struct { + TTL time.Duration + StorageLimitInBytes int64 } // ReadWriter provides a means of reading events from storage, and batched @@ -134,8 +125,8 @@ const flushErrFmt = "flush pending writes: %w" // may be lost. 
// Flush returns ErrLimitReached when the StorageLimiter reports that // the size of LSM and Vlog files exceeds the configured threshold. -func (rw *ReadWriter) Flush() error { - if rw.s.limitReached() { +func (rw *ReadWriter) Flush(limit int64) error { + if rw.s.limitReached(limit) { return fmt.Errorf(flushErrFmt, ErrLimitReached) } err := rw.txn.Commit() @@ -148,14 +139,13 @@ func (rw *ReadWriter) Flush() error { } // WriteTraceSampled records the tail-sampling decision for the given trace ID. -func (rw *ReadWriter) WriteTraceSampled(traceID string, sampled bool) error { +func (rw *ReadWriter) WriteTraceSampled(traceID string, sampled bool, opts WriterOpts) error { key := []byte(traceID) var meta uint8 = entryMetaTraceUnsampled if sampled { meta = entryMetaTraceSampled } - entry := badger.NewEntry(key[:], nil).WithMeta(meta) - return rw.writeEntry(entry.WithTTL(rw.s.ttl)) + return rw.writeEntry(badger.NewEntry(key[:], nil).WithMeta(meta), opts) } // IsTraceSampled reports whether traceID belongs to a trace that is sampled @@ -177,21 +167,18 @@ func (rw *ReadWriter) IsTraceSampled(traceID string) (bool, error) { // // WriteTraceEvent may return before the write is committed to storage. // Call Flush to ensure the write is committed. -func (rw *ReadWriter) WriteTraceEvent(traceID string, id string, event *model.APMEvent) error { +func (rw *ReadWriter) WriteTraceEvent(traceID string, id string, event *model.APMEvent, opts WriterOpts) error { key := append(append([]byte(traceID), ':'), id...) data, err := rw.s.codec.EncodeEvent(event) if err != nil { return err } - return rw.writeEntry(badger.NewEntry(key[:], data). - WithMeta(entryMetaTraceEvent). - WithTTL(rw.s.ttl), - ) + return rw.writeEntry(badger.NewEntry(key[:], data).WithMeta(entryMetaTraceEvent), opts) } -func (rw *ReadWriter) writeEntry(e *badger.Entry) error { +func (rw *ReadWriter) writeEntry(e *badger.Entry, opts WriterOpts) error { rw.pendingWrites++ - err := rw.txn.SetEntry(e) + err := rw.txn.SetEntry(e.WithTTL(opts.TTL)) // Attempt to flush if there are 200 or more uncommitted writes. 
// This ensures calls to ReadTraceEvents are not slowed down; // ReadTraceEvents uses an iterator, which must sort all keys @@ -199,7 +186,7 @@ func (rw *ReadWriter) writeEntry(e *badger.Entry) error { // The 200 value yielded a good balance between read and write speed: // https://github.com/elastic/apm-server/pull/8407#issuecomment-1162994643 if rw.pendingWrites >= 200 { - if err := rw.Flush(); err != nil { + if err := rw.Flush(opts.StorageLimitInBytes); err != nil { return err } } @@ -209,7 +196,7 @@ func (rw *ReadWriter) writeEntry(e *badger.Entry) error { if err != badger.ErrTxnTooBig { return err } - if err := rw.Flush(); err != nil { + if err := rw.Flush(opts.StorageLimitInBytes); err != nil { return err } return rw.txn.SetEntry(e) diff --git a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go index a0e8d39c22b..224b437523b 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go +++ b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go @@ -20,8 +20,7 @@ import ( func BenchmarkWriteTransaction(b *testing.B) { test := func(b *testing.B, codec eventstorage.Codec, bigTX bool) { db := newBadgerDB(b, badgerOptions) - ttl := time.Minute - store := eventstorage.New(db, codec, ttl, 0) + store := eventstorage.New(db, codec) readWriter := store.NewReadWriter() defer readWriter.Close() @@ -39,12 +38,17 @@ func BenchmarkWriteTransaction(b *testing.B) { } b.ResetTimer() + + wOpts := eventstorage.WriterOpts{ + TTL: time.Minute, + StorageLimitInBytes: 0, + } for i := 0; i < b.N; i++ { - if err := readWriter.WriteTraceEvent(traceID, transactionID, transaction); err != nil { + if err := readWriter.WriteTraceEvent(traceID, transactionID, transaction, wOpts); err != nil { b.Fatal(err) } } - assert.NoError(b, readWriter.Flush()) + assert.NoError(b, readWriter.Flush(wOpts.StorageLimitInBytes)) } type testCase struct { @@ -84,10 +88,13 @@ func BenchmarkReadEvents(b *testing.B) { for _, count := range counts { b.Run(fmt.Sprintf("%d events", count), func(b *testing.B) { db := newBadgerDB(b, badgerOptions) - ttl := time.Minute - store := eventstorage.New(db, codec, ttl, 0) + store := eventstorage.New(db, codec) readWriter := store.NewReadWriter() defer readWriter.Close() + wOpts := eventstorage.WriterOpts{ + TTL: time.Minute, + StorageLimitInBytes: 0, + } for i := 0; i < count; i++ { transactionID := uuid.Must(uuid.NewV4()).String() @@ -101,7 +108,7 @@ func BenchmarkReadEvents(b *testing.B) { }, } } - if err := readWriter.WriteTraceEvent(traceID, transactionID, transaction); err != nil { + if err := readWriter.WriteTraceEvent(traceID, transactionID, transaction, wOpts); err != nil { b.Fatal(err) } } @@ -161,15 +168,18 @@ func BenchmarkIsTraceSampled(b *testing.B) { // Test with varying numbers of events in the trace. 
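All of these call sites follow the same new convention: the TTL and storage limit travel with each write as WriterOpts, and Flush receives the byte limit explicitly. A condensed sketch of the round trip, assuming an open *badger.DB named db and the imports used in the benchmarks above:

store := eventstorage.New(db, eventstorage.JSONCodec{})
rw := store.NewReadWriter()
defer rw.Close()

opts := eventstorage.WriterOpts{
	TTL:                 30 * time.Minute,
	StorageLimitInBytes: 0, // 0 disables the size check
}
event := model.APMEvent{Transaction: &model.Transaction{ID: "tx1"}}
if err := rw.WriteTraceEvent("trace1", "tx1", &event, opts); err != nil {
	log.Fatal(err)
}
// Flush commits the pending badger transaction; it fails with
// eventstorage.ErrLimitReached if the DB has outgrown the limit.
if err := rw.Flush(opts.StorageLimitInBytes); err != nil {
	log.Fatal(err)
}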
db := newBadgerDB(b, badgerOptions) - ttl := time.Minute - store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl, 0) + store := eventstorage.New(db, eventstorage.JSONCodec{}) readWriter := store.NewReadWriter() defer readWriter.Close() + wOpts := eventstorage.WriterOpts{ + TTL: time.Minute, + StorageLimitInBytes: 0, + } - if err := readWriter.WriteTraceSampled(sampledTraceUUID.String(), true); err != nil { + if err := readWriter.WriteTraceSampled(sampledTraceUUID.String(), true, wOpts); err != nil { b.Fatal(err) } - if err := readWriter.WriteTraceSampled(unsampledTraceUUID.String(), false); err != nil { + if err := readWriter.WriteTraceSampled(unsampledTraceUUID.String(), false, wOpts); err != nil { b.Fatal(err) } diff --git a/x-pack/apm-server/sampling/eventstorage/storage_test.go b/x-pack/apm-server/sampling/eventstorage/storage_test.go index e6462263117..d2d09d38413 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage_test.go +++ b/x-pack/apm-server/sampling/eventstorage/storage_test.go @@ -34,8 +34,7 @@ func TestWriteEvents(t *testing.T) { func testWriteEvents(t *testing.T, numSpans int) { db := newBadgerDB(t, badgerOptions) - ttl := time.Minute - store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl, 0) + store := eventstorage.New(db, eventstorage.JSONCodec{}) readWriter := store.NewShardedReadWriter() defer readWriter.Close() @@ -45,7 +44,11 @@ func testWriteEvents(t *testing.T, numSpans int) { transaction := model.APMEvent{ Transaction: &model.Transaction{ID: transactionID}, } - assert.NoError(t, readWriter.WriteTraceEvent(traceID, transactionID, &transaction)) + wOpts := eventstorage.WriterOpts{ + TTL: time.Minute, + StorageLimitInBytes: 0, + } + assert.NoError(t, readWriter.WriteTraceEvent(traceID, transactionID, &transaction, wOpts)) var spanEvents []model.APMEvent for i := 0; i < numSpans; i++ { @@ -53,7 +56,7 @@ func testWriteEvents(t *testing.T, numSpans int) { span := model.APMEvent{ Span: &model.Span{ID: spanID}, } - assert.NoError(t, readWriter.WriteTraceEvent(traceID, spanID, &span)) + assert.NoError(t, readWriter.WriteTraceEvent(traceID, spanID, &span, wOpts)) spanEvents = append(spanEvents, span) } afterWrite := time.Now() @@ -64,7 +67,7 @@ func testWriteEvents(t *testing.T, numSpans int) { assert.ElementsMatch(t, append(spanEvents, transaction), batch) // Flush in order for the writes to be visible to other readers. - assert.NoError(t, readWriter.Flush()) + assert.NoError(t, readWriter.Flush(wOpts.StorageLimitInBytes)) var recorded []model.APMEvent assert.NoError(t, db.View(func(txn *badger.Txn) error { @@ -81,8 +84,8 @@ func testWriteEvents(t *testing.T, numSpans int) { // started and finished writing + the TTL. The expiry time // is recorded as seconds since the Unix epoch, hence the // truncation. 
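The bound check that follows relies on badger recording expiry as whole seconds since the Unix epoch. A tiny sketch of reading an entry's expiry back, assuming db is an open *badger.DB and "some-key" (a hypothetical key) was written with WithTTL:

err := db.View(func(txn *badger.Txn) error {
	item, err := txn.Get([]byte("some-key"))
	if err != nil {
		return err
	}
	// ExpiresAt returns unix seconds, so an entry written with TTL t between
	// beforeWrite and afterWrite expires within
	// [beforeWrite.Add(t).Truncate(time.Second), afterWrite.Add(t)].
	expiryTime := time.Unix(int64(item.ExpiresAt()), 0)
	fmt.Println("expires at:", expiryTime)
	return nil
})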
- lowerBound := beforeWrite.Add(ttl).Truncate(time.Second) - upperBound := afterWrite.Add(ttl).Truncate(time.Second) + lowerBound := beforeWrite.Add(wOpts.TTL).Truncate(time.Second) + upperBound := afterWrite.Add(wOpts.TTL).Truncate(time.Second) assert.Condition(t, func() bool { return !lowerBound.After(expiryTime) }, "expiry time %s is before %s", expiryTime, lowerBound) @@ -104,14 +107,17 @@ func testWriteEvents(t *testing.T, numSpans int) { func TestWriteTraceSampled(t *testing.T) { db := newBadgerDB(t, badgerOptions) - ttl := time.Minute - store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl, 0) + store := eventstorage.New(db, eventstorage.JSONCodec{}) readWriter := store.NewShardedReadWriter() defer readWriter.Close() + wOpts := eventstorage.WriterOpts{ + TTL: time.Minute, + StorageLimitInBytes: 0, + } before := time.Now() - assert.NoError(t, readWriter.WriteTraceSampled("sampled_trace_id", true)) - assert.NoError(t, readWriter.WriteTraceSampled("unsampled_trace_id", false)) + assert.NoError(t, readWriter.WriteTraceSampled("sampled_trace_id", true, wOpts)) + assert.NoError(t, readWriter.WriteTraceSampled("unsampled_trace_id", false, wOpts)) // We can read our writes without flushing. isSampled, err := readWriter.IsTraceSampled("sampled_trace_id") @@ -119,7 +125,7 @@ func TestWriteTraceSampled(t *testing.T) { assert.True(t, isSampled) // Flush in order for the writes to be visible to other readers. - assert.NoError(t, readWriter.Flush()) + assert.NoError(t, readWriter.Flush(wOpts.StorageLimitInBytes)) sampled := make(map[string]bool) assert.NoError(t, db.View(func(txn *badger.Txn) error { @@ -130,7 +136,7 @@ func TestWriteTraceSampled(t *testing.T) { expiresAt := item.ExpiresAt() expiryTime := time.Unix(int64(expiresAt), 0) assert.Condition(t, func() bool { - return !before.After(expiryTime) && !expiryTime.After(before.Add(ttl)) + return !before.After(expiryTime) && !expiryTime.After(before.Add(wOpts.TTL)) }) key := string(item.Key()) @@ -154,8 +160,7 @@ func TestWriteTraceSampled(t *testing.T) { func TestReadTraceEvents(t *testing.T) { db := newBadgerDB(t, badgerOptions) - ttl := time.Minute - store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl, 0) + store := eventstorage.New(db, eventstorage.JSONCodec{}) traceID := [...]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} require.NoError(t, db.Update(func(txn *badger.Txn) error { @@ -201,8 +206,7 @@ func TestReadTraceEvents(t *testing.T) { func TestReadTraceEventsDecodeError(t *testing.T) { db := newBadgerDB(t, badgerOptions) - ttl := time.Minute - store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl, 0) + store := eventstorage.New(db, eventstorage.JSONCodec{}) traceID := [...]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} require.NoError(t, db.Update(func(txn *badger.Txn) error { @@ -224,8 +228,7 @@ func TestReadTraceEventsDecodeError(t *testing.T) { func TestIsTraceSampled(t *testing.T) { db := newBadgerDB(t, badgerOptions) - ttl := time.Minute - store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl, 0) + store := eventstorage.New(db, eventstorage.JSONCodec{}) require.NoError(t, db.Update(func(txn *badger.Txn) error { if err := txn.SetEntry(badger.NewEntry([]byte("sampled_trace_id"), nil).WithMeta('s')); err != nil { diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index 8b037a6b074..f40666094e2 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -33,10 +33,11 @@ const ( // loggerRateLimit 
is the maximum frequency at which "too many groups" and // "write failure" log messages are logged. loggerRateLimit = time.Minute -) -// ErrStopped is returned when calling ProcessBatch on a stopped Processor. -var ErrStopped = errors.New("processor is stopped") + // shutdownGracePeriod is the time that the processor has to gracefully + // terminate after the stop method is called. + shutdownGracePeriod = 5 * time.Second +) // Processor is a tail-sampling event processor. type Processor struct { @@ -45,8 +46,7 @@ type Processor struct { rateLimitedLogger *logp.Logger groups *traceGroups - storageMu sync.RWMutex - storage *eventstorage.ShardedReadWriter + eventStore *wrappedRW eventMetrics *eventMetrics // heap-allocated for 64-bit alignment stopMu sync.Mutex @@ -71,17 +71,13 @@ func NewProcessor(config Config) (*Processor, error) { return nil, errors.Wrap(err, "invalid tail-sampling config") } - eventCodec := eventstorage.JSONCodec{} - storage := eventstorage.New(config.DB, eventCodec, config.TTL, int64(config.StorageLimit)) logger := logp.NewLogger(logs.Sampling) - readWriter := storage.NewShardedReadWriter() - p := &Processor{ config: config, logger: logger, rateLimitedLogger: logger.WithOptions(logs.WithRateLimit(loggerRateLimit)), groups: newTraceGroups(config.Policies, config.MaxDynamicServices, config.IngestRateDecayFactor), - storage: readWriter, + eventStore: newWrappedRW(config.Storage, config.TTL, int64(config.StorageLimit)), eventMetrics: &eventMetrics{}, stopping: make(chan struct{}), stopped: make(chan struct{}), @@ -116,8 +112,6 @@ func (p *Processor) CollectMonitoring(_ monitoring.Mode, V monitoring.Visitor) { monitoring.ReportInt(V, "dynamic_service_groups", int64(numDynamicGroups)) monitoring.ReportNamespace(V, "storage", func() { - p.storageMu.RLock() - defer p.storageMu.RUnlock() lsmSize, valueLogSize := p.config.DB.Size() monitoring.ReportInt(V, "lsm_size", int64(lsmSize)) monitoring.ReportInt(V, "value_log_size", int64(valueLogSize)) @@ -144,11 +138,6 @@ func (p *Processor) CollectMonitoring(_ monitoring.Mode, V monitoring.Visitor) { // All other trace events will either be dropped (e.g. known to not // be tail-sampled), or stored for possible later publication. func (p *Processor) ProcessBatch(ctx context.Context, batch *model.Batch) error { - p.storageMu.RLock() - defer p.storageMu.RUnlock() - if p.storage == nil { - return ErrStopped - } events := *batch for i := 0; i < len(events); i++ { event := &events[i] @@ -221,7 +210,7 @@ func (p *Processor) processTransaction(event *model.APMEvent) (report, stored bo return true, false, nil } - traceSampled, err := p.storage.IsTraceSampled(event.Trace.ID) + traceSampled, err := p.eventStore.IsTraceSampled(event.Trace.ID) switch err { case nil: // Tail-sampling decision has been made: report the transaction @@ -241,7 +230,7 @@ func (p *Processor) processTransaction(event *model.APMEvent) (report, stored bo if event.Parent.ID != "" { // Non-root transaction: write to local storage while we wait // for a sampling decision. - return false, true, p.storage.WriteTraceEvent( + return false, true, p.eventStore.WriteTraceEvent( event.Trace.ID, event.Transaction.ID, event, ) } @@ -271,25 +260,21 @@ sampling policies without service name specified. // This is a local optimisation only. To avoid creating network // traffic and load on Elasticsearch for uninteresting root // transactions, we do not propagate this to other APM Servers. 
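The decision logic in processTransaction (and in processSpan, below) reduces to a three-way outcome per event. A schematic condensation, not the literal implementation (which also special-cases RUM agents and pre-sampled transactions); eventStore stands for the processor's wrapped read-writer:

sampled, err := eventStore.IsTraceSampled(event.Trace.ID)
switch {
case err == nil && sampled:
	// Decision already made: report the event to the libbeat pipeline.
case err == nil && !sampled:
	// Decision already made: drop the event.
case err == eventstorage.ErrNotFound:
	// No decision yet: buffer the event locally until one arrives.
	err = eventStore.WriteTraceEvent(event.Trace.ID, event.Transaction.ID, event)
default:
	// Storage failure: propagate the error.
}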
- return false, false, p.storage.WriteTraceSampled(event.Trace.ID, false) + return false, false, p.eventStore.WriteTraceSampled(event.Trace.ID, false) } // The root transaction was admitted to the sampling reservoir, so we // can proceed to write the transaction to storage; we may index it later, // after finalising the sampling decision. - return false, true, p.storage.WriteTraceEvent( - event.Trace.ID, event.Transaction.ID, event, - ) + return false, true, p.eventStore.WriteTraceEvent(event.Trace.ID, event.Transaction.ID, event) } func (p *Processor) processSpan(event *model.APMEvent) (report, stored bool, _ error) { - traceSampled, err := p.storage.IsTraceSampled(event.Trace.ID) + traceSampled, err := p.eventStore.IsTraceSampled(event.Trace.ID) if err != nil { if err == eventstorage.ErrNotFound { // Tail-sampling decision has not yet been made, write event to local storage. - return false, true, p.storage.WriteTraceEvent( - event.Trace.ID, event.Span.ID, event, - ) + return false, true, p.eventStore.WriteTraceEvent(event.Trace.ID, event.Span.ID, event) } return false, false, err } @@ -304,14 +289,10 @@ func (p *Processor) processSpan(event *model.APMEvent) (report, stored bool, _ e // badger.DB must be closed independently to ensure writes are synced to disk. func (p *Processor) Stop(ctx context.Context) error { p.stopMu.Lock() - if p.storage == nil { - // Already fully stopped. - p.stopMu.Unlock() - return nil - } select { + case <-p.stopped: case <-p.stopping: - // already stopping + // already stopped or stopping default: close(p.stopping) } @@ -324,17 +305,8 @@ func (p *Processor) Stop(ctx context.Context) error { case <-p.stopped: } - // Lock storage before stopping, to prevent closing storage while - // ProcessBatch is using it. - p.storageMu.Lock() - defer p.storageMu.Unlock() - - if err := p.storage.Flush(); err != nil { - return err - } - p.storage.Close() - p.storage = nil - return nil + // Flush event store and the underlying read writers + return p.eventStore.Flush() } // Run runs the tail-sampling processor. This method is responsible for: @@ -346,8 +318,6 @@ func (p *Processor) Stop(ctx context.Context) error { // // Run returns when a fatal error occurs or the Stop method is invoked. 
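With storage hoisted out of the processor, the lifecycle contract changes: Stop (above) signals Run to finalize pending sampling decisions (Run grants itself the five-second shutdownGracePeriod, below) and flushes, but it closes neither the shared read-writers nor badger. A sketch of the intended ordering, mirroring cleanup() in main.go; cfg, storage, and db are assumed to come from the surrounding setup:

processor, err := sampling.NewProcessor(cfg)
if err != nil {
	log.Fatal(err)
}
go processor.Run()
// ... process batches ...

// Stop publishes any pending local sampling decisions and flushes the
// shared read-writers; they stay open for the next processor generation.
if err := processor.Stop(context.Background()); err != nil {
	log.Println("stop:", err)
}

// Only at process exit are the shared resources torn down, in this order:
storage.Close() // the process-wide ShardedReadWriter
db.Close()      // the badger DB, syncing writes to disk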
func (p *Processor) Run() error { - p.storageMu.RLock() - defer p.storageMu.RUnlock() defer func() { p.stopMu.Lock() defer p.stopMu.Unlock() @@ -397,6 +367,11 @@ func (p *Processor) Run() error { case <-ctx.Done(): return ctx.Err() case <-p.stopping: + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(shutdownGracePeriod): + } return context.Canceled } }) @@ -429,23 +404,33 @@ func (p *Processor) Run() error { ticker := time.NewTicker(p.config.FlushInterval) defer ticker.Stop() var traceIDs []string + + publishDecisions := func() error { + p.logger.Debug("finalizing local sampling reservoirs") + traceIDs = p.groups.finalizeSampledTraces(traceIDs) + if len(traceIDs) == 0 { + return nil + } + var g errgroup.Group + g.Go(func() error { return sendTraceIDs(ctx, publishSampledTraceIDs, traceIDs) }) + g.Go(func() error { return sendTraceIDs(ctx, localSampledTraceIDs, traceIDs) }) + if err := g.Wait(); err != nil { + return err + } + traceIDs = traceIDs[:0] + return nil + } + for { select { case <-ctx.Done(): return ctx.Err() + case <-p.stopping: + return publishDecisions() case <-ticker.C: - p.logger.Debug("finalizing local sampling reservoirs") - traceIDs = p.groups.finalizeSampledTraces(traceIDs) - if len(traceIDs) == 0 { - continue - } - var g errgroup.Group - g.Go(func() error { return sendTraceIDs(ctx, publishSampledTraceIDs, traceIDs) }) - g.Go(func() error { return sendTraceIDs(ctx, localSampledTraceIDs, traceIDs) }) - if err := g.Wait(); err != nil { + if err := publishDecisions(); err != nil { return err } - traceIDs = traceIDs[:0] } } }) @@ -465,13 +450,13 @@ func (p *Processor) Run() error { remoteDecision = true case traceID = <-localSampledTraceIDs: } - if err := p.storage.WriteTraceSampled(traceID, true); err != nil { + if err := p.eventStore.WriteTraceSampled(traceID, true); err != nil { p.rateLimitedLogger.Warnf( "received error writing sampled trace: %s", err, ) } var events model.Batch - if err := p.storage.ReadTraceEvents(traceID, &events); err != nil { + if err := p.eventStore.ReadTraceEvents(traceID, &events); err != nil { p.rateLimitedLogger.Warnf( "received error reading trace events: %s", err, ) @@ -489,11 +474,11 @@ func (p *Processor) Run() error { for _, event := range events { switch event.Processor { case model.TransactionProcessor: - if err := p.storage.DeleteTraceEvent(event.Trace.ID, event.Transaction.ID); err != nil { + if err := p.eventStore.DeleteTraceEvent(event.Trace.ID, event.Transaction.ID); err != nil { return errors.Wrap(err, "failed to delete transaction from local storage") } case model.SpanProcessor: - if err := p.storage.DeleteTraceEvent(event.Trace.ID, event.Span.ID); err != nil { + if err := p.eventStore.DeleteTraceEvent(event.Trace.ID, event.Span.ID); err != nil { return errors.Wrap(err, "failed to delete span from local storage") } } @@ -561,3 +546,61 @@ func sendTraceIDs(ctx context.Context, out chan<- string, traceIDs []string) err } return nil } + +const ( + storageLimitThreshold = 0.90 // Allow 90% of the quota to be used. +) + +// wrappedRW wraps configurable write options for global ShardedReadWriter +type wrappedRW struct { + rw *eventstorage.ShardedReadWriter + writerOpts eventstorage.WriterOpts +} + +// Stored entries expire after ttl. +// The amount of storage that can be consumed can be limited by passing in a +// limit value greater than zero. The hard limit on storage is set to 90% of +// the limit to account for delay in the size reporting by badger. 
+// https://github.com/dgraph-io/badger/blob/82b00f27e3827022082225221ae05c03f0d37620/db.go#L1302-L1319.
+func newWrappedRW(rw *eventstorage.ShardedReadWriter, ttl time.Duration, limit int64) *wrappedRW {
+	if limit > 1 {
+		limit = int64(float64(limit) * storageLimitThreshold)
+	}
+	return &wrappedRW{
+		rw: rw,
+		writerOpts: eventstorage.WriterOpts{
+			TTL:                 ttl,
+			StorageLimitInBytes: limit,
+		},
+	}
+}
+
+// ReadTraceEvents calls ShardedReadWriter.ReadTraceEvents
+func (s *wrappedRW) ReadTraceEvents(traceID string, out *model.Batch) error {
+	return s.rw.ReadTraceEvents(traceID, out)
+}
+
+// WriteTraceEvent calls ShardedReadWriter.WriteTraceEvent using the configured WriterOpts
+func (s *wrappedRW) WriteTraceEvent(traceID, id string, event *model.APMEvent) error {
+	return s.rw.WriteTraceEvent(traceID, id, event, s.writerOpts)
+}
+
+// WriteTraceSampled calls ShardedReadWriter.WriteTraceSampled using the configured WriterOpts
+func (s *wrappedRW) WriteTraceSampled(traceID string, sampled bool) error {
+	return s.rw.WriteTraceSampled(traceID, sampled, s.writerOpts)
+}
+
+// IsTraceSampled calls ShardedReadWriter.IsTraceSampled
+func (s *wrappedRW) IsTraceSampled(traceID string) (bool, error) {
+	return s.rw.IsTraceSampled(traceID)
+}
+
+// DeleteTraceEvent calls ShardedReadWriter.DeleteTraceEvent
+func (s *wrappedRW) DeleteTraceEvent(traceID, id string) error {
+	return s.rw.DeleteTraceEvent(traceID, id)
+}
+
+// Flush calls ShardedReadWriter.Flush
+func (s *wrappedRW) Flush() error {
+	return s.rw.Flush(s.writerOpts.StorageLimitInBytes)
+}
diff --git a/x-pack/apm-server/sampling/processor_bench_test.go b/x-pack/apm-server/sampling/processor_bench_test.go
index 35f88c81bf9..10ec2b3feb8 100644
--- a/x-pack/apm-server/sampling/processor_bench_test.go
+++ b/x-pack/apm-server/sampling/processor_bench_test.go
@@ -23,7 +23,7 @@ func BenchmarkProcess(b *testing.B) {
 	processor, err := sampling.NewProcessor(newTempdirConfig(b))
 	require.NoError(b, err)
 	go processor.Run()
-	defer processor.Stop(context.Background())
+	b.Cleanup(func() { processor.Stop(context.Background()) })
 
 	b.RunParallel(func(pb *testing.PB) {
 		var seed int64
diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go
index f8eb70fe394..3d7d8882d3b 100644
--- a/x-pack/apm-server/sampling/processor_test.go
+++ b/x-pack/apm-server/sampling/processor_test.go
@@ -60,18 +60,29 @@ func TestProcessAlreadyTailSampled(t *testing.T) {
 	// subsequent events in the trace will be reported immediately.
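Note the derating in newWrappedRW above: the configured limit is not applied verbatim, because badger reports its size with up to a minute of lag. A worked example, assuming an operator-configured limit of 3 GiB:

configured := int64(3 << 30)                                    // 3 GiB
effective := int64(float64(configured) * storageLimitThreshold) // 90% ≈ 2.7 GiB
// Flush fails with ErrLimitReached once badger's lsm+vlog size reaches
// effective, leaving headroom for writes badger has not yet accounted for.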
trace1 := model.Trace{ID: "0102030405060708090a0b0c0d0e0f10"} trace2 := model.Trace{ID: "0102030405060708090a0b0c0d0e0f11"} - storage := eventstorage.New(config.DB, eventstorage.JSONCodec{}, time.Minute, 0) + storage := eventstorage.New(config.DB, eventstorage.JSONCodec{}) writer := storage.NewReadWriter() - assert.NoError(t, writer.WriteTraceSampled(trace1.ID, true)) - assert.NoError(t, writer.Flush()) + wOpts := eventstorage.WriterOpts{ + TTL: time.Minute, + StorageLimitInBytes: 0, + } + assert.NoError(t, writer.WriteTraceSampled(trace1.ID, true, wOpts)) + assert.NoError(t, writer.Flush(wOpts.StorageLimitInBytes)) writer.Close() - storage = eventstorage.New(config.DB, eventstorage.JSONCodec{}, -1, 0) // expire immediately + wOpts.TTL = -1 // expire immediately + storage = eventstorage.New(config.DB, eventstorage.JSONCodec{}) writer = storage.NewReadWriter() - assert.NoError(t, writer.WriteTraceSampled(trace2.ID, true)) - assert.NoError(t, writer.Flush()) + assert.NoError(t, writer.WriteTraceSampled(trace2.ID, true, wOpts)) + assert.NoError(t, writer.Flush(wOpts.StorageLimitInBytes)) writer.Close() + // Badger transactions created globally before committing the above writes + // will not see them due to SSI (Serializable Snapshot Isolation). Flush + // the storage so that new transactions are created for the underlying + // writer shards that can list all the events committed so far. + require.NoError(t, config.Storage.Flush(0)) + processor, err := sampling.NewProcessor(config) require.NoError(t, err) go processor.Run() @@ -126,8 +137,9 @@ func TestProcessAlreadyTailSampled(t *testing.T) { expectedMonitoring.Ints["sampling.events.failed_writes"] = 0 assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`) - // Stop the processor so we can access the database. + // Stop the processor and flush global storage so we can access the database. assert.NoError(t, processor.Stop(context.Background())) + assert.NoError(t, config.Storage.Flush(0)) reader := storage.NewReadWriter() defer reader.Close() @@ -232,9 +244,10 @@ func TestProcessLocalTailSampling(t *testing.T) { expectedMonitoring.Ints["sampling.events.failed_writes"] = 0 assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`) - // Stop the processor so we can access the database. + // Stop the processor and flush global storage so we can access the database. assert.NoError(t, processor.Stop(context.Background())) - storage := eventstorage.New(config.DB, eventstorage.JSONCodec{}, time.Minute, 0) + assert.NoError(t, config.Storage.Flush(0)) + storage := eventstorage.New(config.DB, eventstorage.JSONCodec{}) reader := storage.NewReadWriter() defer reader.Close() @@ -289,7 +302,8 @@ func TestProcessLocalTailSamplingUnsampled(t *testing.T) { // Stop the processor so we can access the database. assert.NoError(t, processor.Stop(context.Background())) - storage := eventstorage.New(config.DB, eventstorage.JSONCodec{}, time.Minute, 0) + assert.NoError(t, config.Storage.Flush(0)) + storage := eventstorage.New(config.DB, eventstorage.JSONCodec{}) reader := storage.NewReadWriter() defer reader.Close() @@ -300,9 +314,10 @@ func TestProcessLocalTailSamplingUnsampled(t *testing.T) { // No sampling decision made yet. 
} else { assert.NoError(t, err) - assert.False(t, sampled) - anyUnsampled = true - break + if !sampled { + anyUnsampled = true + break + } } } assert.True(t, anyUnsampled) @@ -439,8 +454,9 @@ func TestProcessRemoteTailSampling(t *testing.T) { case <-time.After(50 * time.Millisecond): } - // Stop the processor so we can access the database. + // Stop the processor and flush global storage so we can access the database. assert.NoError(t, processor.Stop(context.Background())) + assert.NoError(t, config.Storage.Flush(0)) assert.Empty(t, published) // remote decisions don't get republished expectedMonitoring := monitoring.MakeFlatSnapshot() @@ -454,7 +470,7 @@ func TestProcessRemoteTailSampling(t *testing.T) { assert.Equal(t, trace1Events, events) - storage := eventstorage.New(config.DB, eventstorage.JSONCodec{}, time.Minute, 0) + storage := eventstorage.New(config.DB, eventstorage.JSONCodec{}) reader := storage.NewReadWriter() defer reader.Close() @@ -563,6 +579,10 @@ func TestStorageGC(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { badgerDB.Close() }) config.DB = badgerDB + config.Storage = eventstorage. + New(config.DB, eventstorage.JSONCodec{}). + NewShardedReadWriter() + t.Cleanup(func() { config.Storage.Close() }) writeBatch := func(n int) { config.StorageGCInterval = time.Minute // effectively disable @@ -660,7 +680,9 @@ func TestStorageLimit(t *testing.T) { // Write 5K span events and close the DB to persist to disk the storage // size and assert that none are reported immediately. writeBatch(5000, config, func(b model.Batch) { assert.Empty(t, b) }) - config.DB.Close() + assert.NoError(t, config.Storage.Flush(0)) + config.Storage.Close() + assert.NoError(t, config.DB.Close()) // Open a new instance of the badgerDB and check the size. 
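The on-disk size probed by this test is the same signal the new limitReached check consults on every Flush. A sketch, assuming an open *badger.DB and a populated WriterOpts:

lsm, vlog := db.Size() // badger updates these asynchronously (~1 minute lag)
if limit := opts.StorageLimitInBytes; limit > 0 && lsm+vlog >= limit {
	// Subsequent Flush calls will fail with eventstorage.ErrLimitReached
	// until the TTL/GC cycle reclaims space.
}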
var err error @@ -712,6 +734,47 @@ func TestProcessRemoteTailSamplingPersistence(t *testing.T) { assert.Equal(t, `{"index_name":1}`, string(data)) } +func TestGracefulShutdown(t *testing.T) { + config := newTempdirConfig(t) + sampleRate := 0.5 + config.Policies = []sampling.Policy{{SampleRate: sampleRate}} + config.FlushInterval = time.Minute // disable finalize + + processor, err := sampling.NewProcessor(config) + require.NoError(t, err) + go processor.Run() + + totalTraces := 100 + traceIDGen := func(i int) string { return fmt.Sprintf("trace%d", i) } + + var batch model.Batch + for i := 0; i < totalTraces; i++ { + batch = append(batch, model.APMEvent{ + Processor: model.TransactionProcessor, + Trace: model.Trace{ID: traceIDGen(i)}, + Transaction: &model.Transaction{ + ID: fmt.Sprintf("tx%d", i), + Sampled: true, + }, + }) + } + + assert.NoError(t, processor.ProcessBatch(context.Background(), &batch)) + assert.Empty(t, batch) + assert.NoError(t, processor.Stop(context.Background())) + assert.NoError(t, config.Storage.Flush(0)) + + reader := eventstorage.New(config.DB, eventstorage.JSONCodec{}).NewReadWriter() + + var count int + for i := 0; i < totalTraces; i++ { + if ok, _ := reader.IsTraceSampled(traceIDGen(i)); ok { + count++ + } + } + assert.Equal(t, int(sampleRate*float64(totalTraces)), count) +} + func newTempdirConfig(tb testing.TB) sampling.Config { tempdir, err := os.MkdirTemp("", "samplingtest") require.NoError(tb, err) @@ -721,6 +784,10 @@ func newTempdirConfig(tb testing.TB) sampling.Config { require.NoError(tb, err) tb.Cleanup(func() { badgerDB.Close() }) + eventCodec := eventstorage.JSONCodec{} + storage := eventstorage.New(badgerDB, eventCodec).NewShardedReadWriter() + tb.Cleanup(func() { storage.Close() }) + return sampling.Config{ BeatID: "local-apm-server", BatchProcessor: model.ProcessBatchFunc(func(context.Context, *model.Batch) error { return nil }), @@ -742,6 +809,7 @@ func newTempdirConfig(tb testing.TB) sampling.Config { }, StorageConfig: sampling.StorageConfig{ DB: badgerDB, + Storage: storage, StorageDir: tempdir, StorageGCInterval: time.Second, TTL: 30 * time.Minute,