5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3622,6 +3622,11 @@ wal:
# CLI flag: -ingester.wal-replay-memory-ceiling
[replay_memory_ceiling: <int> | default = 4GB]

# Threshold for disk usage (0.0 to 1.0) at which the WAL will throttle
# incoming writes. Set to 0 to disable throttling.
# CLI flag: -ingester.wal-disk-full-threshold
[disk_full_threshold: <float> | default = 0.9]

# Shard factor used in the ingesters for the in process reverse index. This MUST
# be evenly divisible by ALL schema shard factors or Loki will not start.
# CLI flag: -ingester.index-shards
1 change: 1 addition & 0 deletions pkg/ingester/flush_test.go
@@ -74,6 +74,7 @@ type fullWAL struct{}
func (fullWAL) Log(_ *wal.Record) error { return &os.PathError{Err: syscall.ENOSPC} }
func (fullWAL) Start() {}
func (fullWAL) Stop() error { return nil }
func (fullWAL) IsDiskThrottled() bool { return false }

func Benchmark_FlushLoop(b *testing.B) {
var (
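The fullWAL stub above reports the disk as never throttled, which keeps the existing flush tests on the happy path. To exercise the new rejection path in Push, a complementary stub could force throttling; a minimal sketch for illustration (the throttledWAL type is an assumption, not part of this PR):

type throttledWAL struct{}

// throttledWAL accepts writes but always reports the disk as throttled.
func (throttledWAL) Log(_ *wal.Record) error { return nil }
func (throttledWAL) Start()                  {}
func (throttledWAL) Stop() error             { return nil }
func (throttledWAL) IsDiskThrottled() bool   { return true }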
5 changes: 5 additions & 0 deletions pkg/ingester/ingester.go
@@ -1000,6 +1000,11 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
return nil, ErrReadOnly
}

// Check if disk is too full and throttle writes if needed
if i.wal.IsDiskThrottled() {
return nil, ErrReadOnly
}

// Set profiling tags
defer pprof.SetGoroutineLabels(ctx)
ctx = pprof.WithLabels(ctx, pprof.Labels("path", "write"))
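With this guard, Push refuses writes early whenever the WAL reports a full disk, returning ErrReadOnly just as it does when the ingester is read-only. A hedged sketch of how this could be verified in a package-level test, assuming the throttledWAL stub sketched earlier and a hypothetical newTestIngester helper (neither is part of this PR):

func Test_PushRejectedWhenDiskThrottled(t *testing.T) {
	i := newTestIngester(t) // hypothetical helper that constructs a test ingester
	i.wal = throttledWAL{}  // force IsDiskThrottled() to return true

	_, err := i.Push(context.Background(), &logproto.PushRequest{})
	if !errors.Is(err, ErrReadOnly) {
		t.Fatalf("expected ErrReadOnly while the WAL disk is throttled, got %v", err)
	}
}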
5 changes: 5 additions & 0 deletions pkg/ingester/metrics.go
@@ -18,6 +18,7 @@ type ingesterMetrics struct {
checkpointLoggedBytesTotal prometheus.Counter

walDiskFullFailures prometheus.Counter
walDiskUsagePercent prometheus.Gauge
walReplayActive prometheus.Gauge
walReplayDuration prometheus.Gauge
walReplaySamplesDropped *prometheus.CounterVec
@@ -95,6 +96,10 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
Name: "loki_ingester_wal_disk_full_failures_total",
Help: "Total number of wal write failures due to full disk.",
}),
walDiskUsagePercent: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_wal_disk_usage_percent",
Help: "Current disk usage percentage (0.0 to 1.0) for the WAL directory.",
}),
walReplayActive: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_wal_replay_active",
Help: "Whether the WAL is replaying",
Expand Down
90 changes: 88 additions & 2 deletions pkg/ingester/wal.go
@@ -4,8 +4,11 @@ import (
"flag"
"fmt"
"sync"
"syscall"
"time"

"go.uber.org/atomic"

"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -31,12 +34,16 @@ type WALConfig struct {
CheckpointDuration time.Duration `yaml:"checkpoint_duration"`
FlushOnShutdown bool `yaml:"flush_on_shutdown"`
ReplayMemoryCeiling flagext.ByteSize `yaml:"replay_memory_ceiling"`
DiskFullThreshold float64 `yaml:"disk_full_threshold"`
}

func (cfg *WALConfig) Validate() error {
if cfg.Enabled && cfg.CheckpointDuration < 1 {
return fmt.Errorf("invalid checkpoint duration: %v", cfg.CheckpointDuration)
}
if cfg.DiskFullThreshold < 0 || cfg.DiskFullThreshold > 1 {
Contributor: Cannot use 0 to disable it (but is mentioned in the docs).

Collaborator (author): How so? This is validating the number is less than zero or greater than 1.

Contributor: Misread like an idiot 😆
return fmt.Errorf("invalid disk full threshold: %v (must be between 0 and 1)", cfg.DiskFullThreshold)
}
return nil
}

@@ -46,6 +53,7 @@ func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, "ingester.wal-enabled", true, "Enable writing of ingested data into WAL.")
f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 5*time.Minute, "Interval at which checkpoints should be created.")
f.BoolVar(&cfg.FlushOnShutdown, "ingester.flush-on-shutdown", false, "When WAL is enabled, should chunks be flushed to long-term storage on shutdown.")
f.Float64Var(&cfg.DiskFullThreshold, "ingester.wal-disk-full-threshold", 0.90, "Threshold for disk usage (0.0 to 1.0) at which the WAL will throttle incoming writes. Set to 0 to disable throttling.")

// Need to set default here
cfg.ReplayMemoryCeiling = flagext.ByteSize(defaultCeiling)
@@ -59,22 +67,26 @@ type WAL interface {
Log(*wal.Record) error
// Stop stops all the WAL operations.
Stop() error
// IsDiskThrottled returns true if the disk is too full and writes should be throttled.
IsDiskThrottled() bool
}

type noopWAL struct{}

func (noopWAL) Start() {}
func (noopWAL) Log(*wal.Record) error { return nil }
func (noopWAL) Stop() error { return nil }
func (noopWAL) IsDiskThrottled() bool { return false }

type walWrapper struct {
cfg WALConfig
wal *wlog.WL
metrics *ingesterMetrics
seriesIter SeriesIter

wait sync.WaitGroup
quit chan struct{}
wait sync.WaitGroup
quit chan struct{}
diskThrottled atomic.Bool
}

// newWAL creates a WAL object. If the WAL is disabled, then the returned WAL is a no-op WAL.
@@ -147,17 +159,43 @@ func (w *walWrapper) Stop() error {
return err
}

func (w *walWrapper) IsDiskThrottled() bool {
return w.diskThrottled.Load()
}

func (w *walWrapper) checkpointWriter() *WALCheckpointWriter {
return &WALCheckpointWriter{
metrics: w.metrics,
segmentWAL: w.wal,
}
}

// checkDiskUsage returns the disk usage percentage (0.0 to 1.0) for the WAL directory.
func (w *walWrapper) checkDiskUsage() (float64, error) {
var stat syscall.Statfs_t
if err := syscall.Statfs(w.cfg.Dir, &stat); err != nil {
return 0, err
}

// Calculate usage percentage
total := stat.Blocks * uint64(stat.Bsize)
free := stat.Bfree * uint64(stat.Bsize)
used := total - free
usagePercent := float64(used) / float64(total)

return usagePercent, nil
}

func (w *walWrapper) run() {
level.Info(util_log.Logger).Log("msg", "started", "component", "wal")
defer w.wait.Done()

// Start disk monitoring if throttling is enabled
if w.cfg.DiskFullThreshold > 0 {
w.wait.Add(1)
go w.monitorDisk()
}

checkpointer := NewCheckpointer(
w.cfg.CheckpointDuration,
w.seriesIter,
@@ -168,3 +206,51 @@ func (w *walWrapper) run() {
checkpointer.Run()

}

// monitorDisk periodically checks disk usage and sets the throttle flag
func (w *walWrapper) monitorDisk() {
defer w.wait.Done()

ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
usage, err := w.checkDiskUsage()
if err != nil {
level.Warn(util_log.Logger).Log("msg", "failed to check disk usage", "err", err, "component", "wal")
continue
}

wasThrottled := w.diskThrottled.Load()
isThrottled := usage >= w.cfg.DiskFullThreshold

if isThrottled != wasThrottled {
w.diskThrottled.Store(isThrottled)
if isThrottled {
level.Warn(util_log.Logger).Log(
"msg", "disk usage exceeded threshold, throttling writes",
"usage_percent", fmt.Sprintf("%.2f%%", usage*100),
"threshold_percent", fmt.Sprintf("%.2f%%", w.cfg.DiskFullThreshold*100),
"component", "wal",
)
w.metrics.walDiskFullFailures.Inc()
} else {
level.Info(util_log.Logger).Log(
"msg", "disk usage below threshold, resuming writes",
"usage_percent", fmt.Sprintf("%.2f%%", usage*100),
"threshold_percent", fmt.Sprintf("%.2f%%", w.cfg.DiskFullThreshold*100),
"component", "wal",
)
}
}

// Update metrics with current disk usage
w.metrics.walDiskUsagePercent.Set(usage)

case <-w.quit:
return
}
}
}
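The throttle decision is driven by the used/total ratio that checkDiskUsage computes above. For reference, the same Statfs arithmetic can be run in isolation; a self-contained, Linux-only sketch (syscall.Statfs and the Statfs_t field layout are platform-specific), not part of this PR:

package main

import (
	"fmt"
	"syscall"
)

// diskUsage mirrors checkDiskUsage: fraction of the filesystem in use, 0.0 to 1.0.
// Note: Bfree counts blocks free to the superuser; Bavail would count only blocks
// available to unprivileged processes, so using it would report slightly higher usage.
func diskUsage(dir string) (float64, error) {
	var stat syscall.Statfs_t
	if err := syscall.Statfs(dir, &stat); err != nil {
		return 0, err
	}
	total := stat.Blocks * uint64(stat.Bsize)
	free := stat.Bfree * uint64(stat.Bsize)
	return float64(total-free) / float64(total), nil
}

func main() {
	usage, err := diskUsage("/tmp")
	if err != nil {
		panic(err)
	}
	fmt.Printf("disk usage: %.2f%%\n", usage*100)
}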