Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[9.0] TBS: make storage_limit follow processor lifecycle; update TBS processor config (backport #15488) #15491

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions x-pack/apm-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
},
StorageConfig: sampling.StorageConfig{
DB: db,
Storage: db.NewReadWriter(),
StorageLimit: tailSamplingConfig.StorageLimitParsed,
Storage: db.NewReadWriter(tailSamplingConfig.StorageLimitParsed),
TTL: tailSamplingConfig.TTL,
DiscardOnWriteFailure: tailSamplingConfig.DiscardOnWriteFailure,
},
Expand Down
6 changes: 3 additions & 3 deletions x-pack/apm-server/sampling/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ type StorageConfig struct {
// Storage is the read writer to DB.
Storage eventstorage.RW

// StorageLimit for the TBS database, in bytes.
StorageLimit uint64

// TTL holds the amount of time before events and sampling decisions
// are expired from local storage.
TTL time.Duration
Expand Down Expand Up @@ -238,6 +235,9 @@ func (config StorageConfig) validate() error {
if config.DB == nil {
return errors.New("DB unspecified")
}
if config.Storage == nil {
return errors.New("Storage unspecified")
}
if config.TTL <= 0 {
return errors.New("TTL unspecified or negative")
}
Expand Down
3 changes: 3 additions & 0 deletions x-pack/apm-server/sampling/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func TestNewProcessorConfigInvalid(t *testing.T) {
assertInvalidConfigError("invalid storage config: DB unspecified")
config.DB = &eventstorage.StorageManager{}

assertInvalidConfigError("invalid storage config: Storage unspecified")
config.Storage = &eventstorage.SplitReadWriter{}

assertInvalidConfigError("invalid storage config: TTL unspecified or negative")
config.TTL = 1
}
25 changes: 23 additions & 2 deletions x-pack/apm-server/sampling/eventstorage/rw.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,36 @@ type storageLimitChecker interface {
StorageLimit() uint64
}

// storageLimitCheckerFunc is a storage limit checker backed by two plain
// functions: one reporting disk usage and one reporting the storage limit.
type storageLimitCheckerFunc struct {
	// diskUsage is invoked on every DiskUsage call.
	diskUsage func() uint64
	// storageLimit is invoked on every StorageLimit call.
	storageLimit func() uint64
}

// NewStorageLimitCheckerFunc returns a checker whose DiskUsage and
// StorageLimit methods delegate to the supplied functions. The functions are
// stored as given and are only called when the corresponding method is called.
func NewStorageLimitCheckerFunc(diskUsage, storageLimit func() uint64) storageLimitCheckerFunc {
	var checker storageLimitCheckerFunc
	checker.diskUsage = diskUsage
	checker.storageLimit = storageLimit
	return checker
}

// DiskUsage returns the value produced by the wrapped diskUsage function.
func (c storageLimitCheckerFunc) DiskUsage() uint64 {
	usage := c.diskUsage()
	return usage
}

// StorageLimit returns the value produced by the wrapped storageLimit function.
func (c storageLimitCheckerFunc) StorageLimit() uint64 {
	limit := c.storageLimit()
	return limit
}

// StorageLimitReadWriter is a RW that forbids Write* method calls based on disk usage and limit from storageLimitChecker.
// If there is no limit or limit is not reached, method calls are passed through to nextRW.
type StorageLimitReadWriter struct {
// name identifies this limiter; it prefixes the error returned when the limit is reached.
name string
// checker supplies the current disk usage and the storage limit it is compared against.
checker storageLimitChecker
// nextRW is the wrapped RW that method calls are passed through to.
nextRW RW
}

func NewStorageLimitReadWriter(checker storageLimitChecker, nextRW RW) StorageLimitReadWriter {
func NewStorageLimitReadWriter(name string, checker storageLimitChecker, nextRW RW) StorageLimitReadWriter {
return StorageLimitReadWriter{
name: name,
checker: checker,
nextRW: nextRW,
}
Expand All @@ -81,7 +102,7 @@ func (s StorageLimitReadWriter) checkStorageLimit() error {
if limit != 0 { // unlimited storage
usage := s.checker.DiskUsage()
if usage >= limit {
return fmt.Errorf("%w (current: %d, limit %d)", ErrLimitReached, usage, limit)
return fmt.Errorf("%s: %w (current: %d, limit %d)", s.name, ErrLimitReached, usage, limit)
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion x-pack/apm-server/sampling/eventstorage/rw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestStorageLimitReadWriter(t *testing.T) {
t.Run(fmt.Sprintf("limit=%d,usage=%d", tt.limit, tt.usage), func(t *testing.T) {
checker := mockChecker{limit: tt.limit, usage: tt.usage}
var callCount int
rw := eventstorage.NewStorageLimitReadWriter(checker, mockRW{
rw := eventstorage.NewStorageLimitReadWriter("foo_storage_limiter", checker, mockRW{
callback: func() {
callCount++
},
Expand Down
10 changes: 5 additions & 5 deletions x-pack/apm-server/sampling/eventstorage/storage_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
func BenchmarkWriteTransaction(b *testing.B) {
test := func(b *testing.B, codec eventstorage.Codec, bigTX bool) {
sm := newStorageManager(b, eventstorage.WithCodec(codec))
readWriter := sm.NewReadWriter()
readWriter := sm.NewUnlimitedReadWriter()

traceID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
transactionID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8})
Expand Down Expand Up @@ -79,7 +79,7 @@ func BenchmarkReadEvents(b *testing.B) {
for _, count := range counts {
b.Run(fmt.Sprintf("%d events", count), func(b *testing.B) {
sm := newStorageManager(b, eventstorage.WithCodec(codec))
readWriter := sm.NewReadWriter()
readWriter := sm.NewUnlimitedReadWriter()

for i := 0; i < count; i++ {
transactionID := uuid.Must(uuid.NewV4()).String()
Expand Down Expand Up @@ -154,7 +154,7 @@ func BenchmarkReadEventsHit(b *testing.B) {
for _, hit := range []bool{false, true} {
b.Run(fmt.Sprintf("hit=%v", hit), func(b *testing.B) {
sm := newStorageManager(b)
readWriter := sm.NewReadWriter()
readWriter := sm.NewUnlimitedReadWriter()

traceIDs := make([]string, b.N)

Expand Down Expand Up @@ -185,7 +185,7 @@ func BenchmarkReadEventsHit(b *testing.B) {
}
}

readWriter = sm.NewReadWriter()
readWriter = sm.NewUnlimitedReadWriter()

b.ResetTimer()
var batch modelpb.Batch
Expand Down Expand Up @@ -224,7 +224,7 @@ func BenchmarkIsTraceSampled(b *testing.B) {

// Test with varying numbers of events in the trace.
sm := newStorageManager(b)
readWriter := sm.NewReadWriter()
readWriter := sm.NewUnlimitedReadWriter()

if err := readWriter.WriteTraceSampled(sampledTraceUUID.String(), true); err != nil {
b.Fatal(err)
Expand Down
54 changes: 34 additions & 20 deletions x-pack/apm-server/sampling/eventstorage/storage_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,13 @@ type StorageManager struct {

partitioner *Partitioner

storageLimit atomic.Uint64

codec Codec

// subscriberPosMu protects the subscriber file from concurrent RW.
subscriberPosMu sync.Mutex

// cachedDiskUsage is a cached result of DiskUsage
cachedDiskUsage atomic.Uint64
// cachedDBSize is a cached result of db size.
cachedDBSize atomic.Uint64

// runCh acts as a mutex to ensure only 1 Run is actively running per StorageManager,
// as it is possible that 2 separate Run calls are started by 2 TBS processors during a hot reload.
Expand Down Expand Up @@ -202,17 +200,17 @@ func (sm *StorageManager) Size() (lsm, vlog int64) {
// Also remember to update
// - x-pack/apm-server/sampling/processor.go:CollectMonitoring
// - systemtest/benchtest/expvar/metrics.go
return int64(sm.DiskUsage()), 0
return int64(sm.dbSize()), 0
}

// DiskUsage returns the disk usage of databases in bytes.
func (sm *StorageManager) DiskUsage() uint64 {
// dbSize returns the disk usage of databases in bytes.
func (sm *StorageManager) dbSize() uint64 {
// pebble DiskSpaceUsage overhead is not high, but it adds up when performed per-event.
return sm.cachedDiskUsage.Load()
return sm.cachedDBSize.Load()
}

func (sm *StorageManager) updateDiskUsage() {
sm.cachedDiskUsage.Store(sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage())
sm.cachedDBSize.Store(sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage())
}

// runDiskUsageLoop runs a loop that updates cached disk usage regularly.
Expand All @@ -229,10 +227,6 @@ func (sm *StorageManager) runDiskUsageLoop(stopping <-chan struct{}) error {
}
}

func (sm *StorageManager) StorageLimit() uint64 {
return sm.storageLimit.Load()
}

func (sm *StorageManager) Flush() error {
return errors.Join(
wrapNonNilErr("event db flush error: %w", sm.eventDB.Flush()),
Expand Down Expand Up @@ -260,7 +254,7 @@ func (sm *StorageManager) close() error {

// Reload flushes out pending disk writes to disk by reloading the database.
// For testing only.
// Read writers created prior to Reload cannot be used and will need to be recreated via NewReadWriter.
// Read writers created prior to Reload cannot be used and will need to be recreated via NewUnlimitedReadWriter.
func (sm *StorageManager) Reload() error {
if err := sm.close(); err != nil {
return err
Expand All @@ -269,7 +263,7 @@ func (sm *StorageManager) Reload() error {
}

// Run has the same lifecycle as the TBS processor as opposed to StorageManager to facilitate EA hot reload.
func (sm *StorageManager) Run(stopping <-chan struct{}, ttl time.Duration, storageLimit uint64) error {
func (sm *StorageManager) Run(stopping <-chan struct{}, ttl time.Duration) error {
select {
case <-stopping:
return nil
Expand All @@ -279,8 +273,6 @@ func (sm *StorageManager) Run(stopping <-chan struct{}, ttl time.Duration, stora
<-sm.runCh
}()

sm.storageLimit.Store(storageLimit)

g := errgroup.Group{}
g.Go(func() error {
return sm.runTTLGCLoop(stopping, ttl)
Expand Down Expand Up @@ -346,11 +338,33 @@ func (sm *StorageManager) WriteSubscriberPosition(data []byte) error {
return os.WriteFile(filepath.Join(sm.storageDir, subscriberPositionFile), data, 0644)
}

func (sm *StorageManager) NewReadWriter() StorageLimitReadWriter {
return NewStorageLimitReadWriter(sm, SplitReadWriter{
// NewUnlimitedReadWriter returns a read writer that is not subject to a
// storage limit. It is equivalent to calling NewReadWriter with a limit of 0.
// For testing only.
func (sm *StorageManager) NewUnlimitedReadWriter() StorageLimitReadWriter {
	const noLimit uint64 = 0
	return sm.NewReadWriter(noLimit)
}

// NewReadWriter returns a read writer with storage limit.
func (sm *StorageManager) NewReadWriter(storageLimit uint64) StorageLimitReadWriter {
splitRW := SplitReadWriter{
eventRW: sm.eventStorage.NewReadWriter(),
decisionRW: sm.decisionStorage.NewReadWriter(),
})
}

dbStorageLimit := func() uint64 {
return storageLimit
}
if storageLimit == 0 {
sm.logger.Infof("setting database storage limit to unlimited")
} else {
sm.logger.Infof("setting database storage limit to %.1fgb", float64(storageLimit))
}

// To limit db size to storage_limit
dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit)
dbStorageLimitRW := NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, splitRW)

return dbStorageLimitRW
}

// wrapNonNilErr only wraps an error with format if the error is not nil.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (
"github.com/stretchr/testify/require"
)

func BenchmarkStorageManager_DiskUsage(b *testing.B) {
func BenchmarkStorageManager_Size(b *testing.B) {
stopping := make(chan struct{})
defer close(stopping)
sm := newStorageManager(b)
go sm.Run(stopping, time.Second, 0)
rw := sm.NewReadWriter()
go sm.Run(stopping, time.Second)
rw := sm.NewUnlimitedReadWriter()
for i := 0; i < 1000; i++ {
traceID := uuid.Must(uuid.NewV4()).String()
txnID := uuid.Must(uuid.NewV4()).String()
Expand All @@ -29,7 +29,7 @@ func BenchmarkStorageManager_DiskUsage(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = sm.DiskUsage()
_, _ = sm.Size()
}
b.StopTimer()
}
57 changes: 45 additions & 12 deletions x-pack/apm-server/sampling/eventstorage/storage_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func newStorageManagerNoCleanup(tb testing.TB, path string, opts ...eventstorage

func TestStorageManager_samplingDecisionTTL(t *testing.T) {
sm := newStorageManager(t)
rw := sm.NewReadWriter()
rw := sm.NewUnlimitedReadWriter()
traceID := uuid.Must(uuid.NewV4()).String()
err := rw.WriteTraceSampled(traceID, true)
assert.NoError(t, err)
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestStorageManager_samplingDecisionTTL(t *testing.T) {

func TestStorageManager_eventTTL(t *testing.T) {
sm := newStorageManager(t)
rw := sm.NewReadWriter()
rw := sm.NewUnlimitedReadWriter()
traceID := uuid.Must(uuid.NewV4()).String()
txnID1 := uuid.Must(uuid.NewV4()).String()
txn1 := makeTransaction(txnID1, traceID)
Expand Down Expand Up @@ -119,15 +119,15 @@ func TestStorageManager_partitionID(t *testing.T) {
assert.NoError(t, sm.RotatePartitions())

// write to partition 1
err := sm.NewReadWriter().WriteTraceSampled(traceID, true)
err := sm.NewUnlimitedReadWriter().WriteTraceSampled(traceID, true)
assert.NoError(t, err)

assert.NoError(t, sm.Close())

// it should read directly from partition 1 on startup instead of 0
sm = newStorageManagerNoCleanup(t, tmpDir)
defer sm.Close()
sampled, err := sm.NewReadWriter().IsTraceSampled(traceID)
sampled, err := sm.NewUnlimitedReadWriter().IsTraceSampled(traceID)
assert.NoError(t, err)
assert.True(t, sampled)
}
Expand All @@ -136,29 +136,36 @@ func TestStorageManager_DiskUsage(t *testing.T) {
stopping := make(chan struct{})
defer close(stopping)
sm := newStorageManager(t)
go sm.Run(stopping, time.Second, 0)
old := sm.DiskUsage()
go sm.Run(stopping, time.Second)

err := sm.NewReadWriter().WriteTraceSampled("foo", true)
lsm, vlog := sm.Size()
oldSize := lsm + vlog

err := sm.NewUnlimitedReadWriter().WriteTraceSampled("foo", true)
require.NoError(t, err)

err = sm.Flush()
require.NoError(t, err)

assert.Eventually(t, func() bool {
return sm.DiskUsage() > old
lsm, vlog := sm.Size()
newSize := lsm + vlog
return newSize > oldSize
}, 10*time.Second, 100*time.Millisecond)

old = sm.DiskUsage()
lsm, vlog = sm.Size()
oldSize = lsm + vlog

err = sm.NewReadWriter().WriteTraceEvent("foo", "bar", makeTransaction("bar", "foo"))
err = sm.NewUnlimitedReadWriter().WriteTraceEvent("foo", "bar", makeTransaction("bar", "foo"))
require.NoError(t, err)

err = sm.Flush()
require.NoError(t, err)

assert.Eventually(t, func() bool {
return sm.DiskUsage() > old
lsm, vlog := sm.Size()
newSize := lsm + vlog
return newSize > oldSize
}, 10*time.Second, 100*time.Millisecond)
}

Expand All @@ -167,9 +174,35 @@ func TestStorageManager_Run(t *testing.T) {
stopping := make(chan struct{})
sm := newStorageManager(t)
go func() {
assert.NoError(t, sm.Run(stopping, time.Second, 0))
assert.NoError(t, sm.Run(stopping, time.Second))
close(done)
}()
close(stopping)
<-done
}

// TestStorageManager_StorageLimit checks that the limit passed to
// NewReadWriter is enforced per read writer: with a 1 byte limit a write
// fails with ErrLimitReached, while with a 10 KiB limit the same write succeeds.
func TestStorageManager_StorageLimit(t *testing.T) {
	sm := newStorageManager(t)
	stopping := make(chan struct{})
	runReturned := make(chan struct{})
	go func() {
		assert.NoError(t, sm.Run(stopping, time.Second))
		close(runReturned)
	}()

	// The reported size must exceed 1 byte for the 1 byte limit below to be hit.
	require.NoError(t, sm.Flush())
	dbSize, _ := sm.Size()
	assert.Greater(t, dbSize, int64(1))

	traceID := uuid.Must(uuid.NewV4()).String()
	txnID := uuid.Must(uuid.NewV4()).String()
	txn := makeTransaction(txnID, traceID)

	// Limit already reached: the write must be rejected.
	tinyLimitRW := sm.NewReadWriter(1)
	rejected := tinyLimitRW.WriteTraceEvent(traceID, txnID, txn)
	assert.ErrorIs(t, rejected, eventstorage.ErrLimitReached)

	// Limit not reached: the same write must pass through.
	roomyLimitRW := sm.NewReadWriter(10 << 10)
	accepted := roomyLimitRW.WriteTraceEvent(traceID, txnID, txn)
	assert.NoError(t, accepted)

	// Stop Run and wait for it to return before the test ends.
	close(stopping)
	<-runReturned
}
Loading
Loading