From 9acde57626c29ad97c127e291c4bb96cefe308e1 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Fri, 17 Oct 2025 13:26:59 -0400 Subject: [PATCH 1/4] Add WAL throttling for partition ingesters --- pkg/ingester/flush_test.go | 1 + pkg/ingester/ingester.go | 5 + pkg/ingester/metrics.go | 5 + pkg/ingester/wal.go | 89 +++- pkg/ingester/wal_disk_throttle_test.go | 554 +++++++++++++++++++++++++ 5 files changed, 652 insertions(+), 2 deletions(-) create mode 100644 pkg/ingester/wal_disk_throttle_test.go diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 514f95314835e..e4f715383c030 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -74,6 +74,7 @@ type fullWAL struct{} func (fullWAL) Log(_ *wal.Record) error { return &os.PathError{Err: syscall.ENOSPC} } func (fullWAL) Start() {} func (fullWAL) Stop() error { return nil } +func (fullWAL) IsDiskThrottled() bool { return false } func Benchmark_FlushLoop(b *testing.B) { var ( diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index dfdd33fe418f5..7d5311ee31c42 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1000,6 +1000,11 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro return nil, ErrReadOnly } + // Check if disk is too full and throttle writes if needed + if i.wal.IsDiskThrottled() { + return nil, ErrReadOnly + } + // Set profiling tags defer pprof.SetGoroutineLabels(ctx) ctx = pprof.WithLabels(ctx, pprof.Labels("path", "write")) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index d71e0cbfe247f..7bb15904ecc1b 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -18,6 +18,7 @@ type ingesterMetrics struct { checkpointLoggedBytesTotal prometheus.Counter walDiskFullFailures prometheus.Counter + walDiskUsagePercent prometheus.Gauge walReplayActive prometheus.Gauge walReplayDuration prometheus.Gauge walReplaySamplesDropped *prometheus.CounterVec @@ -95,6 +96,10 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges Name: "loki_ingester_wal_disk_full_failures_total", Help: "Total number of wal write failures due to full disk.", }), + walDiskUsagePercent: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Name: "loki_ingester_wal_disk_usage_percent", + Help: "Current disk usage percentage (0.0 to 1.0) for the WAL directory.", + }), walReplayActive: promauto.With(r).NewGauge(prometheus.GaugeOpts{ Name: "loki_ingester_wal_replay_active", Help: "Whether the WAL is replaying", diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go index 429f50388f118..c71ffbd45544b 100644 --- a/pkg/ingester/wal.go +++ b/pkg/ingester/wal.go @@ -4,6 +4,8 @@ import ( "flag" "fmt" "sync" + "sync/atomic" + "syscall" "time" "github.com/go-kit/log/level" @@ -31,12 +33,16 @@ type WALConfig struct { CheckpointDuration time.Duration `yaml:"checkpoint_duration"` FlushOnShutdown bool `yaml:"flush_on_shutdown"` ReplayMemoryCeiling flagext.ByteSize `yaml:"replay_memory_ceiling"` + DiskFullThreshold float64 `yaml:"disk_full_threshold"` } func (cfg *WALConfig) Validate() error { if cfg.Enabled && cfg.CheckpointDuration < 1 { return fmt.Errorf("invalid checkpoint duration: %v", cfg.CheckpointDuration) } + if cfg.DiskFullThreshold < 0 || cfg.DiskFullThreshold > 1 { + return fmt.Errorf("invalid disk full threshold: %v (must be between 0 and 1)", cfg.DiskFullThreshold) + } return nil } @@ -46,6 +52,7 @@ func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.Enabled, "ingester.wal-enabled", true, "Enable writing of ingested data into WAL.") f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 5*time.Minute, "Interval at which checkpoints should be created.") f.BoolVar(&cfg.FlushOnShutdown, "ingester.flush-on-shutdown", false, "When WAL is enabled, should chunks be flushed to long-term storage on shutdown.") + f.Float64Var(&cfg.DiskFullThreshold, "ingester.wal-disk-full-threshold", 0.90, "Threshold for disk usage (0.0 to 1.0) at which the WAL will throttle incoming writes. Set to 0 to disable throttling.") // Need to set default here cfg.ReplayMemoryCeiling = flagext.ByteSize(defaultCeiling) @@ -59,6 +66,8 @@ type WAL interface { Log(*wal.Record) error // Stop stops all the WAL operations. Stop() error + // IsDiskThrottled returns true if the disk is too full and writes should be throttled. + IsDiskThrottled() bool } type noopWAL struct{} @@ -66,6 +75,7 @@ type noopWAL struct{} func (noopWAL) Start() {} func (noopWAL) Log(*wal.Record) error { return nil } func (noopWAL) Stop() error { return nil } +func (noopWAL) IsDiskThrottled() bool { return false } type walWrapper struct { cfg WALConfig @@ -73,8 +83,9 @@ type walWrapper struct { metrics *ingesterMetrics seriesIter SeriesIter - wait sync.WaitGroup - quit chan struct{} + wait sync.WaitGroup + quit chan struct{} + diskThrottled atomic.Bool } // newWAL creates a WAL object. If the WAL is disabled, then the returned WAL is a no-op WAL. @@ -147,6 +158,10 @@ func (w *walWrapper) Stop() error { return err } +func (w *walWrapper) IsDiskThrottled() bool { + return w.diskThrottled.Load() +} + func (w *walWrapper) checkpointWriter() *WALCheckpointWriter { return &WALCheckpointWriter{ metrics: w.metrics, @@ -154,10 +169,32 @@ func (w *walWrapper) checkpointWriter() *WALCheckpointWriter { } } +// checkDiskUsage returns the disk usage percentage (0.0 to 1.0) for the WAL directory. +func (w *walWrapper) checkDiskUsage() (float64, error) { + var stat syscall.Statfs_t + if err := syscall.Statfs(w.cfg.Dir, &stat); err != nil { + return 0, err + } + + // Calculate usage percentage + total := stat.Blocks * uint64(stat.Bsize) + free := stat.Bfree * uint64(stat.Bsize) + used := total - free + usagePercent := float64(used) / float64(total) + + return usagePercent, nil +} + func (w *walWrapper) run() { level.Info(util_log.Logger).Log("msg", "started", "component", "wal") defer w.wait.Done() + // Start disk monitoring if throttling is enabled + if w.cfg.DiskFullThreshold > 0 { + w.wait.Add(1) + go w.monitorDisk() + } + checkpointer := NewCheckpointer( w.cfg.CheckpointDuration, w.seriesIter, @@ -168,3 +205,51 @@ func (w *walWrapper) run() { checkpointer.Run() } + +// monitorDisk periodically checks disk usage and sets the throttle flag +func (w *walWrapper) monitorDisk() { + defer w.wait.Done() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + usage, err := w.checkDiskUsage() + if err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to check disk usage", "err", err, "component", "wal") + continue + } + + wasThrottled := w.diskThrottled.Load() + isThrottled := usage >= w.cfg.DiskFullThreshold + + if isThrottled != wasThrottled { + w.diskThrottled.Store(isThrottled) + if isThrottled { + level.Warn(util_log.Logger).Log( + "msg", "disk usage exceeded threshold, throttling writes", + "usage_percent", fmt.Sprintf("%.2f%%", usage*100), + "threshold_percent", fmt.Sprintf("%.2f%%", w.cfg.DiskFullThreshold*100), + "component", "wal", + ) + w.metrics.walDiskFullFailures.Inc() + } else { + level.Info(util_log.Logger).Log( + "msg", "disk usage below threshold, resuming writes", + "usage_percent", fmt.Sprintf("%.2f%%", usage*100), + "threshold_percent", fmt.Sprintf("%.2f%%", w.cfg.DiskFullThreshold*100), + "component", "wal", + ) + } + } + + // Update metrics with current disk usage + w.metrics.walDiskUsagePercent.Set(usage) + + case <-w.quit: + return + } + } +} diff --git a/pkg/ingester/wal_disk_throttle_test.go b/pkg/ingester/wal_disk_throttle_test.go new file mode 100644 index 0000000000000..0146ac0bfd008 --- /dev/null +++ b/pkg/ingester/wal_disk_throttle_test.go @@ -0,0 +1,554 @@ +package ingester + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/grafana/dskit/services" + "github.com/grafana/dskit/user" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/ingester/wal" + "github.com/grafana/loki/v3/pkg/logproto" +) + +// mockThrottledWAL is a WAL implementation that can be controlled for testing throttling +type mockThrottledWAL struct { + throttled atomic.Bool + logCalled atomic.Int32 +} + +func (m *mockThrottledWAL) Start() {} + +func (m *mockThrottledWAL) Log(_ *wal.Record) error { + m.logCalled.Add(1) + return nil +} + +func (m *mockThrottledWAL) Stop() error { + return nil +} + +func (m *mockThrottledWAL) IsDiskThrottled() bool { + return m.throttled.Load() +} + +func (m *mockThrottledWAL) SetThrottled(throttled bool) { + m.throttled.Store(throttled) +} + +func (m *mockThrottledWAL) GetLogCallCount() int32 { + return m.logCalled.Load() +} + +// TestWALDiskThrottleInterface verifies that the WAL interface includes IsDiskThrottled +func TestWALDiskThrottleInterface(t *testing.T) { + t.Run("noopWAL returns false", func(t *testing.T) { + w := noopWAL{} + require.False(t, w.IsDiskThrottled()) + }) + + t.Run("mockThrottledWAL can be controlled", func(t *testing.T) { + w := &mockThrottledWAL{} + require.False(t, w.IsDiskThrottled()) + + w.SetThrottled(true) + require.True(t, w.IsDiskThrottled()) + + w.SetThrottled(false) + require.False(t, w.IsDiskThrottled()) + }) +} + +// TestIngesterPushWithDiskThrottle verifies that Push returns ErrReadOnly when disk is throttled +func TestIngesterPushWithDiskThrottle(t *testing.T) { + mockWAL := &mockThrottledWAL{} + store, ing := newTestStore(t, defaultIngesterTestConfig(t), mockWAL) + defer store.Stop() + defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + + ctx := user.InjectOrgID(context.Background(), "test-user") + req := &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{job="test"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Now(), Line: "test log line"}, + }, + }, + }, + } + + t.Run("push succeeds when not throttled", func(t *testing.T) { + mockWAL.SetThrottled(false) + _, err := ing.Push(ctx, req) + require.NoError(t, err) + }) + + t.Run("push returns ErrReadOnly when throttled", func(t *testing.T) { + mockWAL.SetThrottled(true) + _, err := ing.Push(ctx, req) + require.Error(t, err) + require.Equal(t, ErrReadOnly, err) + }) + + t.Run("push succeeds again after throttle is released", func(t *testing.T) { + mockWAL.SetThrottled(false) + _, err := ing.Push(ctx, req) + require.NoError(t, err) + }) +} + +// TestWALWrapperDiskThrottle tests the walWrapper's disk throttling functionality +func TestWALWrapperDiskThrottle(t *testing.T) { + walDir := t.TempDir() + cfg := WALConfig{ + Enabled: true, + Dir: walDir, + CheckpointDuration: 5 * time.Minute, + DiskFullThreshold: 0.90, + } + + metrics := newIngesterMetrics(prometheus.NewRegistry(), "test") + w, err := newWAL(cfg, prometheus.NewRegistry(), metrics, newIngesterSeriesIter(nil)) + require.NoError(t, err) + require.NotNil(t, w) + + // Verify it's a walWrapper + wrapper, ok := w.(*walWrapper) + require.True(t, ok, "expected walWrapper type") + + t.Run("starts not throttled", func(t *testing.T) { + require.False(t, wrapper.IsDiskThrottled()) + }) + + t.Run("can be manually throttled", func(t *testing.T) { + // Simulate disk becoming full by manually setting the flag + wrapper.diskThrottled.Store(true) + require.True(t, wrapper.IsDiskThrottled()) + + // Simulate disk freeing up + wrapper.diskThrottled.Store(false) + require.False(t, wrapper.IsDiskThrottled()) + }) + + // Clean up + require.NoError(t, w.Stop()) +} + +// TestWALConfigValidation tests the validation of DiskFullThreshold +func TestWALConfigValidation(t *testing.T) { + tests := []struct { + name string + threshold float64 + wantErr bool + }{ + { + name: "valid threshold 0.90", + threshold: 0.90, + wantErr: false, + }, + { + name: "valid threshold 0.0 (disabled)", + threshold: 0.0, + wantErr: false, + }, + { + name: "valid threshold 1.0", + threshold: 1.0, + wantErr: false, + }, + { + name: "invalid threshold -0.1", + threshold: -0.1, + wantErr: true, + }, + { + name: "invalid threshold 1.1", + threshold: 1.1, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := WALConfig{ + Enabled: true, + CheckpointDuration: 5 * time.Minute, + DiskFullThreshold: tt.threshold, + } + err := cfg.Validate() + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +// TestDiskThrottleWithMultiplePushes verifies throttling behavior under load +func TestDiskThrottleWithMultiplePushes(t *testing.T) { + mockWAL := &mockThrottledWAL{} + store, ing := newTestStore(t, defaultIngesterTestConfig(t), mockWAL) + defer store.Stop() + defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + + ctx := user.InjectOrgID(context.Background(), "test-user") + + // Start with throttle disabled + mockWAL.SetThrottled(false) + + // Push multiple times successfully + for i := 0; i < 5; i++ { + req := &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{job="test"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Now(), Line: "test log line"}, + }, + }, + }, + } + _, err := ing.Push(ctx, req) + require.NoError(t, err, "push %d should succeed", i) + } + + // Enable throttle + mockWAL.SetThrottled(true) + + // All subsequent pushes should fail with ErrReadOnly + for i := 0; i < 5; i++ { + req := &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{job="test"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Now(), Line: "test log line"}, + }, + }, + }, + } + _, err := ing.Push(ctx, req) + require.Error(t, err, "push %d should fail when throttled", i) + require.Equal(t, ErrReadOnly, err, "push %d should return ErrReadOnly", i) + } + + // Disable throttle + mockWAL.SetThrottled(false) + + // Pushes should succeed again + for i := 0; i < 5; i++ { + req := &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{job="test"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Now(), Line: "test log line"}, + }, + }, + }, + } + _, err := ing.Push(ctx, req) + require.NoError(t, err, "push %d should succeed after throttle released", i) + } +} + +// TestDiskThrottleDoesNotBlockOtherOperations verifies that throttling only affects Push +func TestDiskThrottleDoesNotBlockOtherOperations(t *testing.T) { + mockWAL := &mockThrottledWAL{} + store, ing := newTestStore(t, defaultIngesterTestConfig(t), mockWAL) + defer store.Stop() + defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + + ctx := user.InjectOrgID(context.Background(), "test-user") + + // Push some data first + mockWAL.SetThrottled(false) + req := &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{job="test"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Now(), Line: "test log line"}, + }, + }, + }, + } + _, err := ing.Push(ctx, req) + require.NoError(t, err) + + // Enable throttle + mockWAL.SetThrottled(true) + + // Verify Push is blocked + _, err = ing.Push(ctx, req) + require.Equal(t, ErrReadOnly, err) + + // Verify throttling only affects Push operations, not the overall ingester state + // The fact that we could successfully push data before throttling proves the ingester is operational + require.False(t, ing.readonly, "ingester should not be in readonly mode (shutdown) when disk throttled") +} + +// TestDiskUsageCalculation tests the disk usage percentage calculation math +func TestDiskUsageCalculation(t *testing.T) { + tests := []struct { + name string + totalBlocks uint64 + blockSize uint64 + freeBlocks uint64 + expectedUsage float64 + }{ + { + name: "empty disk (0% used)", + totalBlocks: 1000, + blockSize: 4096, + freeBlocks: 1000, + expectedUsage: 0.0, + }, + { + name: "half full disk (50% used)", + totalBlocks: 1000, + blockSize: 4096, + freeBlocks: 500, + expectedUsage: 0.5, + }, + { + name: "90% full disk (exactly at threshold)", + totalBlocks: 1000, + blockSize: 4096, + freeBlocks: 100, + expectedUsage: 0.9, + }, + { + name: "95% full disk (above threshold)", + totalBlocks: 1000, + blockSize: 4096, + freeBlocks: 50, + expectedUsage: 0.95, + }, + { + name: "completely full disk (100% used)", + totalBlocks: 1000, + blockSize: 4096, + freeBlocks: 0, + expectedUsage: 1.0, + }, + { + name: "10% used disk", + totalBlocks: 10000, + blockSize: 512, + freeBlocks: 9000, + expectedUsage: 0.1, + }, + { + name: "89% used (just below threshold)", + totalBlocks: 10000, + blockSize: 1024, + freeBlocks: 1100, + expectedUsage: 0.89, + }, + { + name: "91% used (just above threshold)", + totalBlocks: 10000, + blockSize: 1024, + freeBlocks: 900, + expectedUsage: 0.91, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Simulate the calculation from walWrapper.checkDiskUsage() + total := tt.totalBlocks * tt.blockSize + free := tt.freeBlocks * tt.blockSize + used := total - free + usagePercent := float64(used) / float64(total) + + // Verify the calculation matches expected + require.InDelta(t, tt.expectedUsage, usagePercent, 0.001, + "usage calculation incorrect: got %.4f, want %.4f", usagePercent, tt.expectedUsage) + + // Verify threshold comparison logic (>= 0.90) + shouldThrottle := usagePercent >= 0.90 + expectedToThrottle := tt.expectedUsage >= 0.90 + require.Equal(t, expectedToThrottle, shouldThrottle, + "throttle decision incorrect for %.2f%% usage", usagePercent*100) + }) + } +} + +// TestThresholdBoundaryConditions tests edge cases around the threshold +func TestThresholdBoundaryConditions(t *testing.T) { + tests := []struct { + name string + usage float64 + threshold float64 + shouldThrottle bool + description string + }{ + { + name: "usage exactly at threshold", + usage: 0.90, + threshold: 0.90, + shouldThrottle: true, + description: "should throttle when usage == threshold", + }, + { + name: "usage just below threshold", + usage: 0.8999, + threshold: 0.90, + shouldThrottle: false, + description: "should not throttle when usage < threshold", + }, + { + name: "usage just above threshold", + usage: 0.9001, + threshold: 0.90, + shouldThrottle: true, + description: "should throttle when usage > threshold", + }, + { + name: "usage well below threshold", + usage: 0.50, + threshold: 0.90, + shouldThrottle: false, + description: "should not throttle at 50% usage", + }, + { + name: "usage well above threshold", + usage: 0.99, + threshold: 0.90, + shouldThrottle: true, + description: "should throttle at 99% usage", + }, + { + name: "threshold at 0.80, usage at 0.79", + usage: 0.79, + threshold: 0.80, + shouldThrottle: false, + description: "should not throttle below 80% threshold", + }, + { + name: "threshold at 0.80, usage at 0.80", + usage: 0.80, + threshold: 0.80, + shouldThrottle: true, + description: "should throttle at exactly 80% threshold", + }, + { + name: "threshold at 0.95, usage at 0.90", + usage: 0.90, + threshold: 0.95, + shouldThrottle: false, + description: "should not throttle with higher threshold", + }, + { + name: "full disk", + usage: 1.0, + threshold: 0.90, + shouldThrottle: true, + description: "should throttle at 100% usage", + }, + { + name: "empty disk", + usage: 0.0, + threshold: 0.90, + shouldThrottle: false, + description: "should not throttle at 0% usage", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // This is the comparison logic from walWrapper.monitorDisk() + isThrottled := tt.usage >= tt.threshold + + require.Equal(t, tt.shouldThrottle, isThrottled, + "%s: usage=%.4f, threshold=%.4f", tt.description, tt.usage, tt.threshold) + }) + } +} + +// TestDiskUsageRealWorld tests with realistic filesystem values +func TestDiskUsageRealWorld(t *testing.T) { + tests := []struct { + name string + totalGB float64 + usedGB float64 + threshold float64 + shouldThrottle bool + }{ + { + name: "1TB disk, 800GB used (80%)", + totalGB: 1000, + usedGB: 800, + threshold: 0.90, + shouldThrottle: false, + }, + { + name: "1TB disk, 900GB used (90%)", + totalGB: 1000, + usedGB: 900, + threshold: 0.90, + shouldThrottle: true, + }, + { + name: "1TB disk, 950GB used (95%)", + totalGB: 1000, + usedGB: 950, + threshold: 0.90, + shouldThrottle: true, + }, + { + name: "500GB disk, 450GB used (90%)", + totalGB: 500, + usedGB: 450, + threshold: 0.90, + shouldThrottle: true, + }, + { + name: "100GB disk, 89GB used (89%)", + totalGB: 100, + usedGB: 89, + threshold: 0.90, + shouldThrottle: false, + }, + { + name: "10TB disk, 9.5TB used (95%)", + totalGB: 10000, + usedGB: 9500, + threshold: 0.90, + shouldThrottle: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Convert GB to bytes (simulating real disk usage) + const bytesPerGB = 1024 * 1024 * 1024 + totalBytes := uint64(tt.totalGB * bytesPerGB) + usedBytes := uint64(tt.usedGB * bytesPerGB) + freeBytes := totalBytes - usedBytes + + // Calculate usage percentage (same logic as checkDiskUsage) + usagePercent := float64(usedBytes) / float64(totalBytes) + isThrottled := usagePercent >= tt.threshold + + require.Equal(t, tt.shouldThrottle, isThrottled, + "usage=%.2f%%, threshold=%.2f%%", usagePercent*100, tt.threshold*100) + + // Verify the math is correct + expectedUsage := tt.usedGB / tt.totalGB + require.InDelta(t, expectedUsage, usagePercent, 0.001, + "calculated usage %.4f doesn't match expected %.4f", usagePercent, expectedUsage) + + t.Logf("Total: %.0fGB, Used: %.0fGB, Free: %.0fGB, Usage: %.2f%%, Throttle: %v", + tt.totalGB, tt.usedGB, float64(freeBytes)/bytesPerGB, usagePercent*100, isThrottled) + }) + } +} From 90b9776fc79fc101a6f4d5e25109fca33f836eb9 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Fri, 17 Oct 2025 13:34:16 -0400 Subject: [PATCH 2/4] make format --- pkg/ingester/wal_disk_throttle_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/ingester/wal_disk_throttle_test.go b/pkg/ingester/wal_disk_throttle_test.go index 0146ac0bfd008..5689764cf042d 100644 --- a/pkg/ingester/wal_disk_throttle_test.go +++ b/pkg/ingester/wal_disk_throttle_test.go @@ -106,10 +106,10 @@ func TestIngesterPushWithDiskThrottle(t *testing.T) { func TestWALWrapperDiskThrottle(t *testing.T) { walDir := t.TempDir() cfg := WALConfig{ - Enabled: true, - Dir: walDir, + Enabled: true, + Dir: walDir, CheckpointDuration: 5 * time.Minute, - DiskFullThreshold: 0.90, + DiskFullThreshold: 0.90, } metrics := newIngesterMetrics(prometheus.NewRegistry(), "test") From f6e0044e3ffd8b469be38988f2874be4524ebd2f Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Fri, 17 Oct 2025 13:44:01 -0400 Subject: [PATCH 3/4] make lint --- pkg/ingester/wal.go | 3 ++- pkg/ingester/wal_disk_throttle_test.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go index c71ffbd45544b..f9e122ea31e3c 100644 --- a/pkg/ingester/wal.go +++ b/pkg/ingester/wal.go @@ -4,10 +4,11 @@ import ( "flag" "fmt" "sync" - "sync/atomic" "syscall" "time" + "go.uber.org/atomic" + "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/ingester/wal_disk_throttle_test.go b/pkg/ingester/wal_disk_throttle_test.go index 5689764cf042d..b1323cad3b918 100644 --- a/pkg/ingester/wal_disk_throttle_test.go +++ b/pkg/ingester/wal_disk_throttle_test.go @@ -2,10 +2,11 @@ package ingester import ( "context" - "sync/atomic" "testing" "time" + "go.uber.org/atomic" + "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" "github.com/prometheus/client_golang/prometheus" From b795e48103bbd78e75fca3e5f955ff85ed3d0d9f Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Fri, 17 Oct 2025 13:49:20 -0400 Subject: [PATCH 4/4] make docs --- docs/sources/shared/configuration.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index c80d1debbdde0..e4841857df4a2 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3622,6 +3622,11 @@ wal: # CLI flag: -ingester.wal-replay-memory-ceiling [replay_memory_ceiling: | default = 4GB] + # Threshold for disk usage (0.0 to 1.0) at which the WAL will throttle + # incoming writes. Set to 0 to disable throttling. + # CLI flag: -ingester.wal-disk-full-threshold + [disk_full_threshold: | default = 0.9] + # Shard factor used in the ingesters for the in process reverse index. This MUST # be evenly divisible by ALL schema shard factors or Loki will not start. # CLI flag: -ingester.index-shards