[8.4] Fix event loss during reload of TBS processor (backport #8809) (#8877)

* Fix event loss during reload of TBS processor (#8809)

* Maintain TBS read writers outside processor lifecycle

Configuration reload logic for the TBS processor may lead
to multiple processors existing at the same time, which
introduces a race between the two processors to flush a
shard (traceID). Additionally, badger uses serializable
snapshot isolation, which can cause a newly created
read-write shard (keyed by traceID) to miss data committed
by the older shard for the same traceID.

* Finalize traces before stopping the TBS processor

(cherry picked from commit 269216e)

# Conflicts:
#	changelogs/head.asciidoc

* Fix conflicts

Co-authored-by: Vishal Raj <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
mergify[bot] and lahsivjar authored Aug 25, 2022
1 parent e2a604f commit 53f4eab
Showing 11 changed files with 323 additions and 160 deletions.
34 changes: 33 additions & 1 deletion x-pack/apm-server/main.go
@@ -40,6 +40,9 @@ var (
// badgerDB holds the badger database to use when tail-based sampling is configured.
badgerMu sync.Mutex
badgerDB *badger.DB

storageMu sync.Mutex
storage *eventstorage.ShardedReadWriter
)

type namedProcessor struct {
@@ -108,6 +111,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, error) {
if err != nil {
return nil, errors.Wrap(err, "failed to get Badger database")
}
readWriters := getStorage(badgerDB)

policies := make([]sampling.Policy, len(tailSamplingConfig.Policies))
for i, in := range tailSamplingConfig.Policies {
@@ -121,6 +125,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, error) {
SampleRate: in.SampleRate,
}
}

return sampling.NewProcessor(sampling.Config{
BeatID: args.Info.ID.String(),
BatchProcessor: args.BatchProcessor,
@@ -141,6 +146,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, error) {
},
StorageConfig: sampling.StorageConfig{
DB: badgerDB,
Storage: readWriters,
StorageDir: storageDir,
StorageGCInterval: tailSamplingConfig.StorageGCInterval,
StorageLimit: tailSamplingConfig.StorageLimitParsed,
@@ -162,6 +168,16 @@ func getBadgerDB(storageDir string) (*badger.DB, error) {
return badgerDB, nil
}

func getStorage(db *badger.DB) *eventstorage.ShardedReadWriter {
storageMu.Lock()
defer storageMu.Unlock()
if storage == nil {
eventCodec := eventstorage.JSONCodec{}
storage = eventstorage.New(db, eventCodec).NewShardedReadWriter()
}
return storage
}

// runServerWithProcessors runs the APM Server and the given list of processors.
//
// newProcessors returns a list of processors which will process events in
@@ -230,14 +246,30 @@ func closeBadger() error {
return nil
}

func closeStorage() {
if storage != nil {
storage.Close()
}
}

func cleanup() (result error) {
// Close the underlying storage; the storage is flushed when the processor stops.
closeStorage()

if err := closeBadger(); err != nil {
result = multierror.Append(result, err)
}
return result
}

func Main() error {
rootCmd := newXPackRootCommand(
beater.NewCreator(beater.CreatorParams{
WrapRunServer: wrapRunServer,
}),
)
result := rootCmd.Execute()
if err := closeBadger(); err != nil {
if err := cleanup(); err != nil {
result = multierror.Append(result, err)
}
return result
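The core of the fix above is that the ShardedReadWriter is now a process-wide singleton: getStorage is guarded by storageMu and lazily creates one shared instance, so a processor created on config reload reuses the same read writers rather than opening a second set of badger transactions. A minimal sketch of the same pattern with placeholder types (not the actual apm-server code):

```go
package main

import (
	"fmt"
	"sync"
)

// shardedRW stands in for eventstorage.ShardedReadWriter.
type shardedRW struct{}

var (
	mu     sync.Mutex
	shared *shardedRW
)

// getShared mirrors getStorage: the first caller creates the instance and
// every later caller (e.g. a processor built during reload) reuses it.
func getShared() *shardedRW {
	mu.Lock()
	defer mu.Unlock()
	if shared == nil {
		shared = &shardedRW{}
	}
	return shared
}

func main() {
	a := getShared()
	b := getShared() // the "reloaded processor" sees the same instance
	fmt.Println(a == b) // true
}
```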
10 changes: 10 additions & 0 deletions x-pack/apm-server/sampling/config.go
@@ -12,6 +12,7 @@ import (

"github.com/elastic/apm-server/internal/elasticsearch"
"github.com/elastic/apm-server/internal/model"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub"
)

@@ -94,6 +95,12 @@ type StorageConfig struct {
// DB will not be closed when the processor is closed.
DB *badger.DB

// Storage holds the read writers which provide sharded, locked access to storage.
//
// Storage lives outside the processor lifecycle and will not be closed when
// the processor is closed.
Storage *eventstorage.ShardedReadWriter

// StorageDir holds the directory in which event storage will be maintained.
StorageDir string

@@ -225,6 +232,9 @@ func (config StorageConfig) validate() error {
if config.DB == nil {
return errors.New("DB unspecified")
}
if config.Storage == nil {
return errors.New("Storage unspecified")
}
if config.StorageDir == "" {
return errors.New("StorageDir unspecified")
}
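For context, a hedged wiring fragment (the field names come from this diff; the values and surrounding setup are placeholders, and validate is unexported within the sampling package):

```go
// Hypothetical wiring, not from the patch: Storage must be set next to DB,
// otherwise validation fails with "Storage unspecified".
storageCfg := sampling.StorageConfig{
	DB:         badgerDB,             // shared *badger.DB
	Storage:    getStorage(badgerDB), // shared read writers (see main.go above)
	StorageDir: "tail_sampling",      // placeholder directory
	// StorageGCInterval and StorageLimit elided.
}
_ = storageCfg
```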
4 changes: 4 additions & 0 deletions x-pack/apm-server/sampling/config_test.go
@@ -14,6 +14,7 @@ import (
"github.com/elastic/apm-server/internal/elasticsearch"
"github.com/elastic/apm-server/internal/model"
"github.com/elastic/apm-server/x-pack/apm-server/sampling"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
)

func TestNewProcessorConfigInvalid(t *testing.T) {
@@ -75,6 +76,9 @@ func TestNewProcessorConfigInvalid(t *testing.T) {
assertInvalidConfigError("invalid storage config: DB unspecified")
config.DB = &badger.DB{}

assertInvalidConfigError("invalid storage config: Storage unspecified")
config.Storage = &eventstorage.ShardedReadWriter{}

assertInvalidConfigError("invalid storage config: StorageDir unspecified")
config.StorageDir = "tbs"

24 changes: 12 additions & 12 deletions x-pack/apm-server/sampling/eventstorage/sharded.go
@@ -41,10 +41,10 @@ func (s *ShardedReadWriter) Close() {
}

// Flush flushes all sharded storage readWriters.
func (s *ShardedReadWriter) Flush() error {
func (s *ShardedReadWriter) Flush(limit int64) error {
var result error
for i := range s.readWriters {
if err := s.readWriters[i].Flush(); err != nil {
if err := s.readWriters[i].Flush(limit); err != nil {
result = multierror.Append(result, err)
}
}
@@ -57,13 +57,13 @@ func (s *ShardedReadWriter) ReadTraceEvents(traceID string, out *model.Batch) error {
}

// WriteTraceEvent calls Writer.WriteTraceEvent, using a sharded, locked, Writer.
func (s *ShardedReadWriter) WriteTraceEvent(traceID, id string, event *model.APMEvent) error {
return s.getWriter(traceID).WriteTraceEvent(traceID, id, event)
func (s *ShardedReadWriter) WriteTraceEvent(traceID, id string, event *model.APMEvent, opts WriterOpts) error {
return s.getWriter(traceID).WriteTraceEvent(traceID, id, event, opts)
}

// WriteTraceSampled calls Writer.WriteTraceSampled, using a sharded, locked, Writer.
func (s *ShardedReadWriter) WriteTraceSampled(traceID string, sampled bool) error {
return s.getWriter(traceID).WriteTraceSampled(traceID, sampled)
func (s *ShardedReadWriter) WriteTraceSampled(traceID string, sampled bool, opts WriterOpts) error {
return s.getWriter(traceID).WriteTraceSampled(traceID, sampled, opts)
}

// IsTraceSampled calls Writer.IsTraceSampled, using a sharded, locked, Writer.
@@ -98,10 +98,10 @@ func (rw *lockedReadWriter) Close() {
rw.rw.Close()
}

func (rw *lockedReadWriter) Flush() error {
func (rw *lockedReadWriter) Flush(limit int64) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.Flush()
return rw.rw.Flush(limit)
}

func (rw *lockedReadWriter) ReadTraceEvents(traceID string, out *model.Batch) error {
@@ -110,16 +110,16 @@ func (rw *lockedReadWriter) ReadTraceEvents(traceID string, out *model.Batch) error {
return rw.rw.ReadTraceEvents(traceID, out)
}

func (rw *lockedReadWriter) WriteTraceEvent(traceID, id string, event *model.APMEvent) error {
func (rw *lockedReadWriter) WriteTraceEvent(traceID, id string, event *model.APMEvent, opts WriterOpts) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.WriteTraceEvent(traceID, id, event)
return rw.rw.WriteTraceEvent(traceID, id, event, opts)
}

func (rw *lockedReadWriter) WriteTraceSampled(traceID string, sampled bool) error {
func (rw *lockedReadWriter) WriteTraceSampled(traceID string, sampled bool, opts WriterOpts) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.WriteTraceSampled(traceID, sampled)
return rw.rw.WriteTraceSampled(traceID, sampled, opts)
}

func (rw *lockedReadWriter) IsTraceSampled(traceID string) (bool, error) {
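For readers skimming the diff: each trace ID is hashed to one mutex-guarded ReadWriter, so events for the same trace serialize on one lock while different traces proceed in parallel; the new WriterOpts parameter is simply threaded through to the underlying writer. A self-contained sketch of that routing (the hash function and shard count here are illustrative assumptions, not the actual implementation):

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// lockedWriter stands in for the mutex-guarded ReadWriter shards.
type lockedWriter struct {
	mu     sync.Mutex
	events int
}

type shardedWriter struct{ shards []lockedWriter }

// getWriter routes a trace ID to a shard; the same trace always lands on
// the same lock, mirroring ShardedReadWriter.getWriter.
func (s *shardedWriter) getWriter(traceID string) *lockedWriter {
	h := fnv.New32a()
	h.Write([]byte(traceID))
	return &s.shards[h.Sum32()%uint32(len(s.shards))]
}

func main() {
	s := &shardedWriter{shards: make([]lockedWriter, 16)}
	w := s.getWriter("0af7651916cd43dd8448eb211c80319c")
	w.mu.Lock()
	w.events++
	w.mu.Unlock()
	fmt.Println(s.getWriter("0af7651916cd43dd8448eb211c80319c") == w) // true
}
```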
18 changes: 12 additions & 6 deletions x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go
@@ -16,18 +16,21 @@

func BenchmarkShardedWriteTransactionUncontended(b *testing.B) {
db := newBadgerDB(b, badgerOptions)
ttl := time.Minute
store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl, 0)
store := eventstorage.New(db, eventstorage.JSONCodec{})
sharded := store.NewShardedReadWriter()
defer sharded.Close()
wOpts := eventstorage.WriterOpts{
TTL: time.Minute,
StorageLimitInBytes: 0,
}

b.RunParallel(func(pb *testing.PB) {
traceID := uuid.Must(uuid.NewV4()).String()
transaction := &model.APMEvent{
Transaction: &model.Transaction{ID: traceID},
}
for pb.Next() {
if err := sharded.WriteTraceEvent(traceID, traceID, transaction); err != nil {
if err := sharded.WriteTraceEvent(traceID, traceID, transaction, wOpts); err != nil {
b.Fatal(err)
}
}
@@ -36,10 +39,13 @@ func BenchmarkShardedWriteTransactionUncontended(b *testing.B) {

func BenchmarkShardedWriteTransactionContended(b *testing.B) {
db := newBadgerDB(b, badgerOptions)
ttl := time.Minute
store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl, 0)
store := eventstorage.New(db, eventstorage.JSONCodec{})
sharded := store.NewShardedReadWriter()
defer sharded.Close()
wOpts := eventstorage.WriterOpts{
TTL: time.Minute,
StorageLimitInBytes: 0,
}

// Use a single trace ID, causing all events to go through
// the same sharded writer, contending for a single lock.
Expand All @@ -51,7 +57,7 @@ func BenchmarkShardedWriteTransactionContended(b *testing.B) {
Transaction: &model.Transaction{ID: transactionID},
}
for pb.Next() {
if err := sharded.WriteTraceEvent(traceID, transactionID, transaction); err != nil {
if err := sharded.WriteTraceEvent(traceID, transactionID, transaction, wOpts); err != nil {
b.Fatal(err)
}
}
55 changes: 21 additions & 34 deletions x-pack/apm-server/sampling/eventstorage/storage.go
@@ -23,10 +23,6 @@ const (
entryMetaTraceEvent = 'e'
)

const (
storageLimitThreshold = 0.90 // Allow 90% of the quota to be used.
)

var (
// ErrNotFound is returned by the Storage.IsTraceSampled method,
// for non-existing trace IDs.
@@ -42,8 +38,6 @@
type Storage struct {
db *badger.DB
codec Codec
ttl time.Duration
limit int64
}

// Codec provides methods for encoding and decoding events.
@@ -53,17 +47,8 @@ type Codec interface {
}

// New returns a new Storage using db and codec.
//
// Storage entries expire after ttl.
// The amount of storage that can be consumed can be limited by passing in a
// limit value greater than zero. The hard limit on storage is set to 90% of
// the limit to account for delay in the size reporting by badger.
// https://github.com/dgraph-io/badger/blob/82b00f27e3827022082225221ae05c03f0d37620/db.go#L1302-L1319.
func New(db *badger.DB, codec Codec, ttl time.Duration, limit int64) *Storage {
if limit > 1 {
limit = int64(float64(limit) * storageLimitThreshold)
}
return &Storage{db: db, codec: codec, ttl: ttl, limit: limit}
func New(db *badger.DB, codec Codec) *Storage {
return &Storage{db: db, codec: codec}
}

// NewShardedReadWriter returns a new ShardedReadWriter, for sharded
@@ -86,8 +71,8 @@ func (s *Storage) NewReadWriter() *ReadWriter {
}
}

func (s *Storage) limitReached() bool {
if s.limit == 0 {
func (s *Storage) limitReached(limit int64) bool {
if limit == 0 {
return false
}
// The badger database has an async size reconciliation, with a 1 minute
@@ -96,7 +81,13 @@
// lookup is cheap.
lsm, vlog := s.db.Size()
current := lsm + vlog
return current >= s.limit
return current >= limit
}

// WriterOpts provides configuration options for writes to storage.
type WriterOpts struct {
TTL time.Duration
StorageLimitInBytes int64
}

// ReadWriter provides a means of reading events from storage, and batched
@@ -134,8 +125,8 @@ const flushErrFmt = "flush pending writes: %w"
// may be lost.
// Flush returns ErrLimitReached when the StorageLimiter reports that
// the size of LSM and Vlog files exceeds the configured threshold.
func (rw *ReadWriter) Flush() error {
if rw.s.limitReached() {
func (rw *ReadWriter) Flush(limit int64) error {
if rw.s.limitReached(limit) {
return fmt.Errorf(flushErrFmt, ErrLimitReached)
}
err := rw.txn.Commit()
@@ -148,14 +139,13 @@ }
}

// WriteTraceSampled records the tail-sampling decision for the given trace ID.
func (rw *ReadWriter) WriteTraceSampled(traceID string, sampled bool) error {
func (rw *ReadWriter) WriteTraceSampled(traceID string, sampled bool, opts WriterOpts) error {
key := []byte(traceID)
var meta uint8 = entryMetaTraceUnsampled
if sampled {
meta = entryMetaTraceSampled
}
entry := badger.NewEntry(key[:], nil).WithMeta(meta)
return rw.writeEntry(entry.WithTTL(rw.s.ttl))
return rw.writeEntry(badger.NewEntry(key[:], nil).WithMeta(meta), opts)
}

// IsTraceSampled reports whether traceID belongs to a trace that is sampled
@@ -177,29 +167,26 @@ func (rw *ReadWriter) IsTraceSampled(traceID string) (bool, error) {
//
// WriteTraceEvent may return before the write is committed to storage.
// Call Flush to ensure the write is committed.
func (rw *ReadWriter) WriteTraceEvent(traceID string, id string, event *model.APMEvent) error {
func (rw *ReadWriter) WriteTraceEvent(traceID string, id string, event *model.APMEvent, opts WriterOpts) error {
key := append(append([]byte(traceID), ':'), id...)
data, err := rw.s.codec.EncodeEvent(event)
if err != nil {
return err
}
return rw.writeEntry(badger.NewEntry(key[:], data).
WithMeta(entryMetaTraceEvent).
WithTTL(rw.s.ttl),
)
return rw.writeEntry(badger.NewEntry(key[:], data).WithMeta(entryMetaTraceEvent), opts)
}

func (rw *ReadWriter) writeEntry(e *badger.Entry) error {
func (rw *ReadWriter) writeEntry(e *badger.Entry, opts WriterOpts) error {
rw.pendingWrites++
err := rw.txn.SetEntry(e)
err := rw.txn.SetEntry(e.WithTTL(opts.TTL))
// Attempt to flush if there are 200 or more uncommitted writes.
// This ensures calls to ReadTraceEvents are not slowed down;
// ReadTraceEvents uses an iterator, which must sort all keys
// of uncommitted writes.
// The 200 value yielded a good balance between read and write speed:
// https://github.com/elastic/apm-server/pull/8407#issuecomment-1162994643
if rw.pendingWrites >= 200 {
if err := rw.Flush(); err != nil {
if err := rw.Flush(opts.StorageLimitInBytes); err != nil {
return err
}
}
@@ -209,7 +196,7 @@ func (rw *ReadWriter) writeEntry(e *badger.Entry, opts WriterOpts) error {
if err != badger.ErrTxnTooBig {
return err
}
if err := rw.Flush(); err != nil {
if err := rw.Flush(opts.StorageLimitInBytes); err != nil {
return err
}
return rw.txn.SetEntry(e)
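Taken together, the storage.go changes move the TTL and the storage limit from Storage construction into per-call WriterOpts, so the long-lived shared storage no longer bakes in processor-specific settings. A hedged usage sketch (imports and setup as in the benchmarks above; the trace and event values are placeholders):

```go
// Assumes an open *badger.DB named db, as in the benchmark setup.
store := eventstorage.New(db, eventstorage.JSONCodec{})
rw := store.NewShardedReadWriter()
defer rw.Close()

opts := eventstorage.WriterOpts{
	TTL:                 30 * time.Minute, // entries expire after the TTL
	StorageLimitInBytes: 0,                // 0 disables the limit check
}
event := &model.APMEvent{Transaction: &model.Transaction{ID: "tx-1"}}
if err := rw.WriteTraceEvent("trace-1", "tx-1", event, opts); err != nil {
	log.Fatal(err)
}
// Flush takes the limit explicitly and wraps ErrLimitReached on breach.
if err := rw.Flush(opts.StorageLimitInBytes); err != nil {
	log.Fatal(err)
}
```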
(The remaining 5 changed files are not shown.)