Skip to content

Commit

Permalink
TBS: make storage_limit follow processor lifecycle; update TBS proces…
Browse files Browse the repository at this point in the history
…sor config (#15488) (#15491)

Fix a regression from #15235 where storage_limit does not follow processor lifecycle.
Remove storage limit from processor config.
Add storage to processor config validation.

(cherry picked from commit dcb08ac)

Co-authored-by: Carson Ip <[email protected]>
  • Loading branch information
mergify[bot] and carsonip authored Jan 30, 2025
1 parent a18acb9 commit 8077005
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 64 deletions.
3 changes: 1 addition & 2 deletions x-pack/apm-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
},
StorageConfig: sampling.StorageConfig{
DB: db,
Storage: db.NewReadWriter(),
StorageLimit: tailSamplingConfig.StorageLimitParsed,
Storage: db.NewReadWriter(tailSamplingConfig.StorageLimitParsed),
TTL: tailSamplingConfig.TTL,
DiscardOnWriteFailure: tailSamplingConfig.DiscardOnWriteFailure,
},
Expand Down
6 changes: 3 additions & 3 deletions x-pack/apm-server/sampling/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ type StorageConfig struct {
// Storage is the read writer to DB.
Storage eventstorage.RW

// StorageLimit for the TBS database, in bytes.
StorageLimit uint64

// TTL holds the amount of time before events and sampling decisions
// are expired from local storage.
TTL time.Duration
Expand Down Expand Up @@ -238,6 +235,9 @@ func (config StorageConfig) validate() error {
if config.DB == nil {
return errors.New("DB unspecified")
}
if config.Storage == nil {
return errors.New("Storage unspecified")
}
if config.TTL <= 0 {
return errors.New("TTL unspecified or negative")
}
Expand Down
3 changes: 3 additions & 0 deletions x-pack/apm-server/sampling/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func TestNewProcessorConfigInvalid(t *testing.T) {
assertInvalidConfigError("invalid storage config: DB unspecified")
config.DB = &eventstorage.StorageManager{}

assertInvalidConfigError("invalid storage config: Storage unspecified")
config.Storage = &eventstorage.SplitReadWriter{}

assertInvalidConfigError("invalid storage config: TTL unspecified or negative")
config.TTL = 1
}
25 changes: 23 additions & 2 deletions x-pack/apm-server/sampling/eventstorage/rw.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,36 @@ type storageLimitChecker interface {
StorageLimit() uint64
}

// storageLimitCheckerFunc wraps two plain functions and exposes them as the
// DiskUsage and StorageLimit methods, so a checker can be assembled from
// closures instead of a dedicated type.
type storageLimitCheckerFunc struct {
	diskUsage    func() uint64
	storageLimit func() uint64
}

// NewStorageLimitCheckerFunc builds a storageLimitCheckerFunc from the given
// functions. Neither function is invoked here; they are called each time the
// corresponding method is called.
func NewStorageLimitCheckerFunc(diskUsage, storageLimit func() uint64) storageLimitCheckerFunc {
	var checker storageLimitCheckerFunc
	checker.diskUsage = diskUsage
	checker.storageLimit = storageLimit
	return checker
}

// DiskUsage returns the result of the wrapped diskUsage function.
func (c storageLimitCheckerFunc) DiskUsage() uint64 {
	usage := c.diskUsage()
	return usage
}

// StorageLimit returns the result of the wrapped storageLimit function.
func (c storageLimitCheckerFunc) StorageLimit() uint64 {
	limit := c.storageLimit()
	return limit
}

// StorageLimitReadWriter is a RW that forbids Write* method calls based on disk usage and limit from storageLimitChecker.
// If there is no limit or limit is not reached, method calls are passed through to nextRW.
type StorageLimitReadWriter struct {
// name identifies this limiter; it prefixes the error returned when the limit is reached.
name string
// checker reports the current disk usage and the storage limit to compare it against.
checker storageLimitChecker
// nextRW is the wrapped RW that receives the calls that are allowed through.
nextRW RW
}

func NewStorageLimitReadWriter(checker storageLimitChecker, nextRW RW) StorageLimitReadWriter {
func NewStorageLimitReadWriter(name string, checker storageLimitChecker, nextRW RW) StorageLimitReadWriter {
return StorageLimitReadWriter{
name: name,
checker: checker,
nextRW: nextRW,
}
Expand All @@ -81,7 +102,7 @@ func (s StorageLimitReadWriter) checkStorageLimit() error {
if limit != 0 { // unlimited storage
usage := s.checker.DiskUsage()
if usage >= limit {
return fmt.Errorf("%w (current: %d, limit %d)", ErrLimitReached, usage, limit)
return fmt.Errorf("%s: %w (current: %d, limit %d)", s.name, ErrLimitReached, usage, limit)
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion x-pack/apm-server/sampling/eventstorage/rw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestStorageLimitReadWriter(t *testing.T) {
t.Run(fmt.Sprintf("limit=%d,usage=%d", tt.limit, tt.usage), func(t *testing.T) {
checker := mockChecker{limit: tt.limit, usage: tt.usage}
var callCount int
rw := eventstorage.NewStorageLimitReadWriter(checker, mockRW{
rw := eventstorage.NewStorageLimitReadWriter("foo_storage_limiter", checker, mockRW{
callback: func() {
callCount++
},
Expand Down
10 changes: 5 additions & 5 deletions x-pack/apm-server/sampling/eventstorage/storage_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
func BenchmarkWriteTransaction(b *testing.B) {
test := func(b *testing.B, codec eventstorage.Codec, bigTX bool) {
sm := newStorageManager(b, eventstorage.WithCodec(codec))
readWriter := sm.NewReadWriter()
readWriter := sm.NewUnlimitedReadWriter()

traceID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
transactionID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8})
Expand Down Expand Up @@ -79,7 +79,7 @@ func BenchmarkReadEvents(b *testing.B) {
for _, count := range counts {
b.Run(fmt.Sprintf("%d events", count), func(b *testing.B) {
sm := newStorageManager(b, eventstorage.WithCodec(codec))
readWriter := sm.NewReadWriter()
readWriter := sm.NewUnlimitedReadWriter()

for i := 0; i < count; i++ {
transactionID := uuid.Must(uuid.NewV4()).String()
Expand Down Expand Up @@ -154,7 +154,7 @@ func BenchmarkReadEventsHit(b *testing.B) {
for _, hit := range []bool{false, true} {
b.Run(fmt.Sprintf("hit=%v", hit), func(b *testing.B) {
sm := newStorageManager(b)
readWriter := sm.NewReadWriter()
readWriter := sm.NewUnlimitedReadWriter()

traceIDs := make([]string, b.N)

Expand Down Expand Up @@ -185,7 +185,7 @@ func BenchmarkReadEventsHit(b *testing.B) {
}
}

readWriter = sm.NewReadWriter()
readWriter = sm.NewUnlimitedReadWriter()

b.ResetTimer()
var batch modelpb.Batch
Expand Down Expand Up @@ -224,7 +224,7 @@ func BenchmarkIsTraceSampled(b *testing.B) {

// Test with varying numbers of events in the trace.
sm := newStorageManager(b)
readWriter := sm.NewReadWriter()
readWriter := sm.NewUnlimitedReadWriter()

if err := readWriter.WriteTraceSampled(sampledTraceUUID.String(), true); err != nil {
b.Fatal(err)
Expand Down
54 changes: 34 additions & 20 deletions x-pack/apm-server/sampling/eventstorage/storage_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,13 @@ type StorageManager struct {

partitioner *Partitioner

storageLimit atomic.Uint64

codec Codec

// subscriberPosMu protects the subscriber file from concurrent RW.
subscriberPosMu sync.Mutex

// cachedDiskUsage is a cached result of DiskUsage
cachedDiskUsage atomic.Uint64
// cachedDBSize is a cached result of db size.
cachedDBSize atomic.Uint64

// runCh acts as a mutex to ensure only 1 Run is actively running per StorageManager,
// as it is possible that 2 separate Runs are started by 2 TBS processors during a hot reload.
Expand Down Expand Up @@ -202,17 +200,17 @@ func (sm *StorageManager) Size() (lsm, vlog int64) {
// Also remember to update
// - x-pack/apm-server/sampling/processor.go:CollectMonitoring
// - systemtest/benchtest/expvar/metrics.go
return int64(sm.DiskUsage()), 0
return int64(sm.dbSize()), 0
}

// DiskUsage returns the disk usage of databases in bytes.
func (sm *StorageManager) DiskUsage() uint64 {
// dbSize returns the disk usage of databases in bytes.
func (sm *StorageManager) dbSize() uint64 {
// pebble DiskSpaceUsage overhead is not high, but it adds up when performed per-event.
return sm.cachedDiskUsage.Load()
return sm.cachedDBSize.Load()
}

func (sm *StorageManager) updateDiskUsage() {
sm.cachedDiskUsage.Store(sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage())
sm.cachedDBSize.Store(sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage())
}

// runDiskUsageLoop runs a loop that updates cached disk usage regularly.
Expand All @@ -229,10 +227,6 @@ func (sm *StorageManager) runDiskUsageLoop(stopping <-chan struct{}) error {
}
}

func (sm *StorageManager) StorageLimit() uint64 {
return sm.storageLimit.Load()
}

func (sm *StorageManager) Flush() error {
return errors.Join(
wrapNonNilErr("event db flush error: %w", sm.eventDB.Flush()),
Expand Down Expand Up @@ -260,7 +254,7 @@ func (sm *StorageManager) close() error {

// Reload flushes out pending disk writes to disk by reloading the database.
// For testing only.
// Read writers created prior to Reload cannot be used and will need to be recreated via NewReadWriter.
// Read writers created prior to Reload cannot be used and will need to be recreated via NewUnlimitedReadWriter.
func (sm *StorageManager) Reload() error {
if err := sm.close(); err != nil {
return err
Expand All @@ -269,7 +263,7 @@ func (sm *StorageManager) Reload() error {
}

// Run has the same lifecycle as the TBS processor as opposed to StorageManager to facilitate EA hot reload.
func (sm *StorageManager) Run(stopping <-chan struct{}, ttl time.Duration, storageLimit uint64) error {
func (sm *StorageManager) Run(stopping <-chan struct{}, ttl time.Duration) error {
select {
case <-stopping:
return nil
Expand All @@ -279,8 +273,6 @@ func (sm *StorageManager) Run(stopping <-chan struct{}, ttl time.Duration, stora
<-sm.runCh
}()

sm.storageLimit.Store(storageLimit)

g := errgroup.Group{}
g.Go(func() error {
return sm.runTTLGCLoop(stopping, ttl)
Expand Down Expand Up @@ -346,11 +338,33 @@ func (sm *StorageManager) WriteSubscriberPosition(data []byte) error {
return os.WriteFile(filepath.Join(sm.storageDir, subscriberPositionFile), data, 0644)
}

func (sm *StorageManager) NewReadWriter() StorageLimitReadWriter {
return NewStorageLimitReadWriter(sm, SplitReadWriter{
// NewUnlimitedReadWriter creates a read writer via NewReadWriter with a
// storage limit of 0, which is treated as unlimited.
// For testing only.
func (sm *StorageManager) NewUnlimitedReadWriter() StorageLimitReadWriter {
	const noLimit uint64 = 0
	return sm.NewReadWriter(noLimit)
}

// NewReadWriter returns a read writer with storage limit.
func (sm *StorageManager) NewReadWriter(storageLimit uint64) StorageLimitReadWriter {
splitRW := SplitReadWriter{
eventRW: sm.eventStorage.NewReadWriter(),
decisionRW: sm.decisionStorage.NewReadWriter(),
})
}

dbStorageLimit := func() uint64 {
return storageLimit
}
if storageLimit == 0 {
sm.logger.Infof("setting database storage limit to unlimited")
} else {
sm.logger.Infof("setting database storage limit to %.1fgb", float64(storageLimit))
}

// To limit db size to storage_limit
dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit)
dbStorageLimitRW := NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, splitRW)

return dbStorageLimitRW
}

// wrapNonNilErr only wraps an error with format if the error is not nil.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (
"github.com/stretchr/testify/require"
)

func BenchmarkStorageManager_DiskUsage(b *testing.B) {
func BenchmarkStorageManager_Size(b *testing.B) {
stopping := make(chan struct{})
defer close(stopping)
sm := newStorageManager(b)
go sm.Run(stopping, time.Second, 0)
rw := sm.NewReadWriter()
go sm.Run(stopping, time.Second)
rw := sm.NewUnlimitedReadWriter()
for i := 0; i < 1000; i++ {
traceID := uuid.Must(uuid.NewV4()).String()
txnID := uuid.Must(uuid.NewV4()).String()
Expand All @@ -29,7 +29,7 @@ func BenchmarkStorageManager_DiskUsage(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = sm.DiskUsage()
_, _ = sm.Size()
}
b.StopTimer()
}
57 changes: 45 additions & 12 deletions x-pack/apm-server/sampling/eventstorage/storage_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func newStorageManagerNoCleanup(tb testing.TB, path string, opts ...eventstorage

func TestStorageManager_samplingDecisionTTL(t *testing.T) {
sm := newStorageManager(t)
rw := sm.NewReadWriter()
rw := sm.NewUnlimitedReadWriter()
traceID := uuid.Must(uuid.NewV4()).String()
err := rw.WriteTraceSampled(traceID, true)
assert.NoError(t, err)
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestStorageManager_samplingDecisionTTL(t *testing.T) {

func TestStorageManager_eventTTL(t *testing.T) {
sm := newStorageManager(t)
rw := sm.NewReadWriter()
rw := sm.NewUnlimitedReadWriter()
traceID := uuid.Must(uuid.NewV4()).String()
txnID1 := uuid.Must(uuid.NewV4()).String()
txn1 := makeTransaction(txnID1, traceID)
Expand Down Expand Up @@ -119,15 +119,15 @@ func TestStorageManager_partitionID(t *testing.T) {
assert.NoError(t, sm.RotatePartitions())

// write to partition 1
err := sm.NewReadWriter().WriteTraceSampled(traceID, true)
err := sm.NewUnlimitedReadWriter().WriteTraceSampled(traceID, true)
assert.NoError(t, err)

assert.NoError(t, sm.Close())

// it should read directly from partition 1 on startup instead of 0
sm = newStorageManagerNoCleanup(t, tmpDir)
defer sm.Close()
sampled, err := sm.NewReadWriter().IsTraceSampled(traceID)
sampled, err := sm.NewUnlimitedReadWriter().IsTraceSampled(traceID)
assert.NoError(t, err)
assert.True(t, sampled)
}
Expand All @@ -136,29 +136,36 @@ func TestStorageManager_DiskUsage(t *testing.T) {
stopping := make(chan struct{})
defer close(stopping)
sm := newStorageManager(t)
go sm.Run(stopping, time.Second, 0)
old := sm.DiskUsage()
go sm.Run(stopping, time.Second)

err := sm.NewReadWriter().WriteTraceSampled("foo", true)
lsm, vlog := sm.Size()
oldSize := lsm + vlog

err := sm.NewUnlimitedReadWriter().WriteTraceSampled("foo", true)
require.NoError(t, err)

err = sm.Flush()
require.NoError(t, err)

assert.Eventually(t, func() bool {
return sm.DiskUsage() > old
lsm, vlog := sm.Size()
newSize := lsm + vlog
return newSize > oldSize
}, 10*time.Second, 100*time.Millisecond)

old = sm.DiskUsage()
lsm, vlog = sm.Size()
oldSize = lsm + vlog

err = sm.NewReadWriter().WriteTraceEvent("foo", "bar", makeTransaction("bar", "foo"))
err = sm.NewUnlimitedReadWriter().WriteTraceEvent("foo", "bar", makeTransaction("bar", "foo"))
require.NoError(t, err)

err = sm.Flush()
require.NoError(t, err)

assert.Eventually(t, func() bool {
return sm.DiskUsage() > old
lsm, vlog := sm.Size()
newSize := lsm + vlog
return newSize > oldSize
}, 10*time.Second, 100*time.Millisecond)
}

Expand All @@ -167,9 +174,35 @@ func TestStorageManager_Run(t *testing.T) {
stopping := make(chan struct{})
sm := newStorageManager(t)
go func() {
assert.NoError(t, sm.Run(stopping, time.Second, 0))
assert.NoError(t, sm.Run(stopping, time.Second))
close(done)
}()
close(stopping)
<-done
}

// TestStorageManager_StorageLimit verifies that the storage limit is applied
// per read writer: a read writer created with a limit below the current
// database size rejects writes with ErrLimitReached, while one created with a
// larger limit on the same StorageManager accepts them.
func TestStorageManager_StorageLimit(t *testing.T) {
	done := make(chan struct{})
	stopping := make(chan struct{})
	sm := newStorageManager(t)
	go func() {
		defer close(done)
		assert.NoError(t, sm.Run(stopping, time.Second))
	}()
	// Stop Run and wait for it to return on every exit path, including when a
	// require below aborts the test, so the goroutine never outlives the test.
	defer func() {
		close(stopping)
		<-done
	}()

	require.NoError(t, sm.Flush())
	lsm, _ := sm.Size()
	// The database must already be larger than the 1 byte limit used below.
	assert.Greater(t, lsm, int64(1))

	traceID := uuid.Must(uuid.NewV4()).String()
	txnID := uuid.Must(uuid.NewV4()).String()
	txn := makeTransaction(txnID, traceID)

	small := sm.NewReadWriter(1)
	assert.ErrorIs(t, small.WriteTraceEvent(traceID, txnID, txn), eventstorage.ErrLimitReached)

	big := sm.NewReadWriter(10 << 10)
	assert.NoError(t, big.WriteTraceEvent(traceID, txnID, txn))
}
Loading

0 comments on commit 8077005

Please sign in to comment.