TBS: set default sampling.tail.storage_limit to 0 but limit disk usage to 90% (#15467)

This is a breaking change to the default storage_limit, enabling more user-friendly TBS disk usage handling. The new default scales automatically with larger disks.

Change sampling.tail.storage_limit default to 0.
While 0 means unlimited local tail-sampling database size,
it now enforces a maximum of 90% disk usage on the disk where the data directory is located.
Any tail-sampling write after this threshold is reached will be rejected,
similar to what happens when the tail-sampling database size exceeds a non-zero storage limit.
Setting sampling.tail.storage_limit to a non-zero value preserves the existing behavior,
which limits the tail-sampling database size to sampling.tail.storage_limit
and skips the new disk usage threshold check.
carsonip authored Jan 31, 2025
1 parent 87b6a29 commit d019277
Showing 10 changed files with 308 additions and 56 deletions.
1 change: 1 addition & 0 deletions changelogs/9.0.asciidoc
@@ -14,6 +14,7 @@ https://github.com/elastic/apm-server/compare/v\...v9.0.0[View commits]

[float]
==== Breaking Changes
- Change `sampling.tail.storage_limit` default to `0`. While `0` means unlimited local tail-sampling database size, it now enforces a maximum of 90% disk usage on the disk where the data directory is located. Any tail-sampling write after this threshold is reached will be rejected, similar to what happens when the tail-sampling database size exceeds a non-zero storage limit. Setting `sampling.tail.storage_limit` to a non-zero value preserves the existing behavior, which limits the tail-sampling database size to `sampling.tail.storage_limit` and skips the new disk usage threshold check. {pull}15467[15467]

[float]
==== Deprecations
18 changes: 18 additions & 0 deletions changelogs/all-breaking-changes.asciidoc
@@ -7,6 +7,24 @@
This section describes the breaking changes and deprecations introduced in this release
and previous minor versions.

// tag::90-bc[]
[float]
[[breaking-changes-9.0]]
=== 9.0

The following breaking changes are introduced in APM version 9.0.0:

- Change `sampling.tail.storage_limit` default to `0`.
While `0` means unlimited local tail-sampling database size,
it now enforces a maximum of 90% disk usage on the disk where the data directory is located.
Any tail-sampling write after this threshold is reached will be rejected,
similar to what happens when the tail-sampling database size exceeds a non-zero storage limit.
Setting `sampling.tail.storage_limit` to a non-zero value preserves the existing behavior,
which limits the tail-sampling database size to `sampling.tail.storage_limit`
and skips the new disk usage threshold check.
For more details, see https://github.com/elastic/apm-server/pull/15467[PR #15467].
// end::90-bc[]

// tag::811-bc[]
[float]
[[breaking-changes-8.11]]
17 changes: 10 additions & 7 deletions internal/beater/config/config_test.go
@@ -362,8 +362,9 @@ func TestUnpackConfig(t *testing.T) {
ESConfig: elasticsearch.DefaultConfig(),
Interval: 1 * time.Minute,
IngestRateDecayFactor: 0.25,
StorageLimit: "3GB",
StorageLimitParsed: 3000000000,
StorageLimit: "0",
StorageLimitParsed: 0,
DiskUsageThreshold: 0.9,
TTL: 30 * time.Minute,
},
},
@@ -410,11 +411,12 @@
},
},
"sampling.tail": map[string]interface{}{
"enabled": false,
"policies": []map[string]interface{}{{"sample_rate": 0.5}},
"interval": "2m",
"ingest_rate_decay": 1.0,
"storage_limit": "1GB",
"enabled": false,
"policies": []map[string]interface{}{{"sample_rate": 0.5}},
"interval": "2m",
"ingest_rate_decay": 1.0,
"storage_limit": "1GB",
"disk_usage_threshold": 0.8,
},
"data_streams": map[string]interface{}{
"namespace": "foo",
@@ -495,6 +497,7 @@ func TestUnpackConfig(t *testing.T) {
IngestRateDecayFactor: 1.0,
StorageLimit: "1GB",
StorageLimitParsed: 1000000000,
DiskUsageThreshold: 0.8,
TTL: 30 * time.Minute,
},
},
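The new disk_usage_threshold key unpacks like any other go-ucfg field. A standalone sketch of the round trip, using elastic-agent-libs directly (the anonymous struct is a stripped-down stand-in for TailSamplingConfig, not the real type):

package main

import (
	"fmt"

	"github.com/elastic/elastic-agent-libs/config"
)

func main() {
	c := config.MustNewConfigFrom(map[string]interface{}{
		"storage_limit":        "1GB",
		"disk_usage_threshold": 0.8,
	})
	var cfg struct {
		StorageLimit       string  `config:"storage_limit"`
		DiskUsageThreshold float64 `config:"disk_usage_threshold" validate:"min=0, max=1"`
	}
	if err := c.Unpack(&cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {StorageLimit:1GB DiskUsageThreshold:0.8}
}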
18 changes: 14 additions & 4 deletions internal/beater/config/sampling.go
@@ -49,8 +49,18 @@ type TailSamplingConfig struct {
Interval time.Duration `config:"interval" validate:"min=1s"`
IngestRateDecayFactor float64 `config:"ingest_rate_decay" validate:"min=0, max=1"`
TTL time.Duration `config:"ttl" validate:"min=1s"`
StorageLimit string `config:"storage_limit"`
StorageLimitParsed uint64

// StorageLimit is the user-configured tail-sampling database size limit.
// 0 means unlimited storage with DiskUsageThreshold check enabled.
StorageLimit string `config:"storage_limit"`
StorageLimitParsed uint64

// DiskUsageThreshold controls the proportion of the disk to be filled at max, irrespective of db size.
// e.g. 0.9 means the last 10% of disk should not be written to.
//
// Any non-0 StorageLimit causes DiskUsageThreshold to be ignored.
DiskUsageThreshold float64 `config:"disk_usage_threshold" validate:"min=0, max=1"`

DiscardOnWriteFailure bool `config:"discard_on_write_failure"`

esConfigured bool
@@ -98,7 +108,6 @@ func (c *TailSamplingConfig) Unpack(in *config.C) error {
cfg.Enabled = in.Enabled()
*c = TailSamplingConfig(cfg)
c.esConfigured = in.HasField("elasticsearch")
c.StorageLimitParsed = limit
err = errors.Wrap(c.Validate(), "invalid config")
return nil
}
@@ -151,7 +160,8 @@ func defaultTailSamplingConfig() TailSamplingConfig {
Interval: 1 * time.Minute,
IngestRateDecayFactor: 0.25,
TTL: 30 * time.Minute,
StorageLimit: "3GB",
StorageLimit: "0",
DiskUsageThreshold: 0.9,
DiscardOnWriteFailure: false,
}
parsed, err := humanize.ParseBytes(cfg.StorageLimit)
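One subtlety: humanize.ParseBytes treats "GB" as decimal, which is why config_test.go above pairs "1GB" with StorageLimitParsed: 1000000000, and why the old "3GB" default parsed to 3000000000 bytes. The dbStorageLimitFallback of 3 << 30 introduced in storage_manager.go below is 3GiB (about 3.22GB), close to but not exactly the old default. A quick check (assuming the dustin/go-humanize module this file already imports):

package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	parsed, err := humanize.ParseBytes("3GB") // decimal: 3 * 1000^3
	if err != nil {
		panic(err)
	}
	fmt.Println(parsed)          // 3000000000
	fmt.Println(uint64(3 << 30)) // 3221225472, the new fallback (3GiB)
}

Likewise, the gb constant added below is float64(1 << 30), a gibibyte, so the "...gb" figures in the new log messages are binary units.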
2 changes: 1 addition & 1 deletion x-pack/apm-server/main.go
@@ -147,7 +147,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
},
StorageConfig: sampling.StorageConfig{
DB: db,
Storage: db.NewReadWriter(tailSamplingConfig.StorageLimitParsed),
Storage: db.NewReadWriter(tailSamplingConfig.StorageLimitParsed, tailSamplingConfig.DiskUsageThreshold),
TTL: tailSamplingConfig.TTL,
DiscardOnWriteFailure: tailSamplingConfig.DiscardOnWriteFailure,
},
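Note that NewReadWriter now returns the RW interface rather than the concrete StorageLimitReadWriter, which lets it wrap the underlying SplitReadWriter with whichever limit checker applies. The interface itself is not shown in this excerpt; a rough reconstruction from calls visible elsewhere in the diff (method set assumed, not verbatim; modelpb is github.com/elastic/apm-data/model/modelpb):

// RW is reconstructed from usage such as WriteTraceSampled and modelpb.Batch
// in the benchmarks below; the real eventstorage.RW may declare more methods.
type RW interface {
	ReadTraceEvents(traceID string, out *modelpb.Batch) error
	WriteTraceEvent(traceID, id string, event *modelpb.APMEvent) error
	WriteTraceSampled(traceID string, sampled bool) error
	IsTraceSampled(traceID string) (bool, error)
	DeleteTraceEvent(traceID, id string) error
}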
10 changes: 5 additions & 5 deletions x-pack/apm-server/sampling/eventstorage/storage_bench_test.go
@@ -18,7 +18,7 @@ import (
func BenchmarkWriteTransaction(b *testing.B) {
test := func(b *testing.B, codec eventstorage.Codec, bigTX bool) {
sm := newStorageManager(b, eventstorage.WithCodec(codec))
readWriter := sm.NewUnlimitedReadWriter()
readWriter := newUnlimitedReadWriter(sm)

traceID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
transactionID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8})
@@ -79,7 +79,7 @@ func BenchmarkReadEvents(b *testing.B) {
for _, count := range counts {
b.Run(fmt.Sprintf("%d events", count), func(b *testing.B) {
sm := newStorageManager(b, eventstorage.WithCodec(codec))
readWriter := sm.NewUnlimitedReadWriter()
readWriter := newUnlimitedReadWriter(sm)

for i := 0; i < count; i++ {
transactionID := uuid.Must(uuid.NewV4()).String()
@@ -154,7 +154,7 @@ func BenchmarkReadEventsHit(b *testing.B) {
for _, hit := range []bool{false, true} {
b.Run(fmt.Sprintf("hit=%v", hit), func(b *testing.B) {
sm := newStorageManager(b)
readWriter := sm.NewUnlimitedReadWriter()
readWriter := newUnlimitedReadWriter(sm)

traceIDs := make([]string, b.N)

@@ -185,7 +185,7 @@ func BenchmarkReadEventsHit(b *testing.B) {
}
}

readWriter = sm.NewUnlimitedReadWriter()
readWriter = newUnlimitedReadWriter(sm)

b.ResetTimer()
var batch modelpb.Batch
@@ -224,7 +224,7 @@ func BenchmarkIsTraceSampled(b *testing.B) {

// Test with varying numbers of events in the trace.
sm := newStorageManager(b)
readWriter := sm.NewUnlimitedReadWriter()
readWriter := newUnlimitedReadWriter(sm)

if err := readWriter.WriteTraceSampled(sampledTraceUUID.String(), true); err != nil {
b.Fatal(err)
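The benchmarks now obtain their read writer from a newUnlimitedReadWriter test helper instead of the removed StorageManager.NewUnlimitedReadWriter method. The helper's definition is not part of this excerpt; presumably it is a thin wrapper along these lines (name and threshold value assumed):

// newUnlimitedReadWriter is assumed to call the new two-argument NewReadWriter
// with storage limit 0 (unlimited db size) and a disk usage threshold of 1,
// so neither check can reject writes during benchmarks.
func newUnlimitedReadWriter(sm *eventstorage.StorageManager) eventstorage.RW {
	return sm.NewReadWriter(0, 1)
}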
134 changes: 113 additions & 21 deletions x-pack/apm-server/sampling/eventstorage/storage_manager.go
@@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/pebble/v2"
"github.com/cockroachdb/pebble/v2/vfs"
"go.opentelemetry.io/otel/metric"
"golang.org/x/sync/errgroup"

@@ -46,6 +47,12 @@

// diskUsageFetchInterval is how often disk usage is fetched which is equivalent to how long disk usage is cached.
diskUsageFetchInterval = 1 * time.Second

// dbStorageLimitFallback is the default fallback storage limit in bytes
// that applies when disk usage threshold cannot be enforced due to an error.
dbStorageLimitFallback = 3 << 30

gb = float64(1 << 30)
)

type StorageManagerOptions func(*StorageManager)
@@ -62,6 +69,27 @@ func WithMeterProvider(mp metric.MeterProvider) StorageManagerOptions {
}
}

// WithGetDBSize configures getDBSize function used by StorageManager.
// For testing only.
func WithGetDBSize(getDBSize func() uint64) StorageManagerOptions {
return func(sm *StorageManager) {
sm.getDBSize = getDBSize
}
}

// WithGetDiskUsage configures getDiskUsage function used by StorageManager.
// For testing only.
func WithGetDiskUsage(getDiskUsage func() (DiskUsage, error)) StorageManagerOptions {
return func(sm *StorageManager) {
sm.getDiskUsage = getDiskUsage
}
}

// DiskUsage is the struct returned by getDiskUsage.
type DiskUsage struct {
UsedBytes, TotalBytes uint64
}

// StorageManager encapsulates pebble.DB.
// It assumes exclusive access to pebble DB at storageDir.
type StorageManager struct {
@@ -80,9 +108,20 @@ type StorageManager struct {
// subscriberPosMu protects the subscriber file from concurrent RW.
subscriberPosMu sync.Mutex

// getDBSize returns the total size of databases in bytes.
getDBSize func() uint64
// cachedDBSize is a cached result of db size.
cachedDBSize atomic.Uint64

// getDiskUsage returns the disk / filesystem usage statistics of storageDir.
getDiskUsage func() (DiskUsage, error)
// getDiskUsageFailed indicates if getDiskUsage calls ever failed.
getDiskUsageFailed atomic.Bool
// cachedDiskStat is disk usage statistics about the disk only, not related to the databases.
cachedDiskStat struct {
used, total atomic.Uint64
}

// runCh acts as a mutex to ensure only 1 Run is actively running per StorageManager.
// as it is possible that 2 separate Run are created by 2 TBS processors during a hot reload.
runCh chan struct{}
@@ -100,6 +139,16 @@ func NewStorageManager(storageDir string, opts ...StorageManagerOptions) (*Stora
runCh: make(chan struct{}, 1),
logger: logp.NewLogger(logs.Sampling),
codec: ProtobufCodec{},
getDiskUsage: func() (DiskUsage, error) {
usage, err := vfs.Default.GetDiskUsage(storageDir)
return DiskUsage{
UsedBytes: usage.UsedBytes,
TotalBytes: usage.TotalBytes,
}, err
},
}
sm.getDBSize = func() uint64 {
return sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage()
}
for _, opt := range opts {
opt(sm)
@@ -210,7 +259,30 @@ func (sm *StorageManager) dbSize() uint64 {
}

func (sm *StorageManager) updateDiskUsage() {
sm.cachedDBSize.Store(sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage())
sm.cachedDBSize.Store(sm.getDBSize())

if sm.getDiskUsageFailed.Load() {
// Skip GetDiskUsage under the assumption that
// it will always get the same error if GetDiskUsage ever returns one,
// such that it does not keep logging GetDiskUsage errors.
return
}
usage, err := sm.getDiskUsage()
if err != nil {
sm.logger.With(logp.Error(err)).Warn("failed to get disk usage")
sm.getDiskUsageFailed.Store(true)
sm.cachedDiskStat.used.Store(0)
sm.cachedDiskStat.total.Store(0) // setting total to 0 to disable any running disk usage threshold checks
return
}
sm.cachedDiskStat.used.Store(usage.UsedBytes)
sm.cachedDiskStat.total.Store(usage.TotalBytes)
}

// diskUsed returns the actual used disk space in bytes.
// Not to be confused with dbSize which is specific to database.
func (sm *StorageManager) diskUsed() uint64 {
return sm.cachedDiskStat.used.Load()
}

// runDiskUsageLoop runs a loop that updates cached disk usage regularly.
@@ -338,33 +410,53 @@ func (sm *StorageManager) WriteSubscriberPosition(data []byte) error {
return os.WriteFile(filepath.Join(sm.storageDir, subscriberPositionFile), data, 0644)
}

// NewUnlimitedReadWriter returns a read writer with no storage limit.
// For testing only.
func (sm *StorageManager) NewUnlimitedReadWriter() StorageLimitReadWriter {
return sm.NewReadWriter(0)
}

// NewReadWriter returns a read writer with storage limit.
func (sm *StorageManager) NewReadWriter(storageLimit uint64) StorageLimitReadWriter {
splitRW := SplitReadWriter{
// NewReadWriter returns a read writer configured with storage limit and disk usage threshold.
func (sm *StorageManager) NewReadWriter(storageLimit uint64, diskUsageThreshold float64) RW {
var rw RW = SplitReadWriter{
eventRW: sm.eventStorage.NewReadWriter(),
decisionRW: sm.decisionStorage.NewReadWriter(),
}

dbStorageLimit := func() uint64 {
return storageLimit
}
if storageLimit == 0 {
sm.logger.Infof("setting database storage limit to unlimited")
} else {
sm.logger.Infof("setting database storage limit to %.1fgb", float64(storageLimit))
// If db storage limit is set, only enforce db storage limit.
if storageLimit > 0 {
// dbStorageLimit returns max size of db in bytes.
// If size of db exceeds dbStorageLimit, writes should be rejected.
dbStorageLimit := func() uint64 {
return storageLimit
}
sm.logger.Infof("setting database storage limit to %0.1fgb", float64(storageLimit)/gb)
dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit)
rw = NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, rw)
return rw
}

// To limit db size to storage_limit
dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit)
dbStorageLimitRW := NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, splitRW)
// DB storage limit is unlimited, enforce disk usage threshold if possible.
// Load whether getDiskUsage failed, as it was called during StorageManager initialization.
if sm.getDiskUsageFailed.Load() {
// Limit db size to fallback storage limit as getDiskUsage returned an error
dbStorageLimit := func() uint64 {
return dbStorageLimitFallback
}
sm.logger.Warnf("overriding database storage limit to fallback default of %0.1fgb as get disk usage failed", float64(dbStorageLimitFallback)/gb)
dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit)
rw = NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, rw)
return rw
}

return dbStorageLimitRW
// diskThreshold returns max used disk space in bytes, not in percentage.
// If size of used disk space exceeds diskThreshold, writes should be rejected.
diskThreshold := func() uint64 {
return uint64(float64(sm.cachedDiskStat.total.Load()) * diskUsageThreshold)
}
// the total disk space could change in runtime, but it is still useful to print it out in logs.
sm.logger.Infof("setting disk usage threshold to %.2f of total disk space of %0.1fgb", diskUsageThreshold, float64(sm.cachedDiskStat.total.Load())/gb)
diskThresholdChecker := NewStorageLimitCheckerFunc(sm.diskUsed, diskThreshold)
rw = NewStorageLimitReadWriter(
fmt.Sprintf("disk usage threshold %.2f", diskUsageThreshold),
diskThresholdChecker,
rw,
)
return rw
}

// wrapNonNilErr only wraps an error with format if the error is not nil.
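The WithGetDBSize and WithGetDiskUsage options exist so the new checks can be tested without filling a real disk. A hypothetical test fragment (assumes a *testing.T named t; the fake numbers are invented for illustration):

// Simulate a 100GiB disk that is 95% full; the cached disk stats are
// populated during NewStorageManager initialization via updateDiskUsage.
sm, err := eventstorage.NewStorageManager(t.TempDir(),
	eventstorage.WithGetDiskUsage(func() (eventstorage.DiskUsage, error) {
		return eventstorage.DiskUsage{UsedBytes: 95 << 30, TotalBytes: 100 << 30}, nil
	}),
	eventstorage.WithGetDBSize(func() uint64 { return 1 << 30 }),
)
if err != nil {
	t.Fatal(err)
}
// storage_limit 0 with threshold 0.9: 95% used is over the 90GiB cap,
// so writes through this read writer should be rejected.
rw := sm.NewReadWriter(0, 0.9)
_ = rw

Had the injected getDiskUsage returned an error instead, getDiskUsageFailed would be set and NewReadWriter would fall back to the fixed 3GiB database storage limit logged above. The final hunk below, presumably from the storage manager benchmark file, applies the same helper rename as the other benchmarks.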
@@ -17,7 +17,7 @@ func BenchmarkStorageManager_Size(b *testing.B) {
defer close(stopping)
sm := newStorageManager(b)
go sm.Run(stopping, time.Second)
rw := sm.NewUnlimitedReadWriter()
rw := newUnlimitedReadWriter(sm)
for i := 0; i < 1000; i++ {
traceID := uuid.Must(uuid.NewV4()).String()
txnID := uuid.Must(uuid.NewV4()).String()