Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions cmd/cefasdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,22 @@ func main() {
storageLaneWriteQueue = flag.Int("storage-lane-write-queue", 0, "Write lane queue capacity. 0 inherits default.")

// Adaptive write backpressure.
backpressureEnabled = flag.Bool("storage-backpressure", false, "Enable write backpressure from Pebble pressure metrics.")
backpressureReject = flag.Bool("storage-backpressure-reject-critical", false, "Reject writes instead of only sleeping when pressure is critical.")
backpressureWarnL0 = flag.Int64("storage-backpressure-warning-l0-files", 0, "Warning L0 file threshold. 0 uses default.")
backpressureCriticalL0 = flag.Int64("storage-backpressure-critical-l0-files", 0, "Critical L0 file threshold. 0 uses default.")
backpressureWarnDebt = flag.Uint64("storage-backpressure-warning-debt", 0, "Warning compaction debt threshold in bytes. 0 uses default.")
backpressureCriticalDebt = flag.Uint64("storage-backpressure-critical-debt", 0, "Critical compaction debt threshold in bytes. 0 uses default.")
backpressureWarnReadAmp = flag.Int("storage-backpressure-warning-read-amp", 0, "Warning Pebble read amplification threshold. 0 uses default.")
backpressureCritReadAmp = flag.Int("storage-backpressure-critical-read-amp", 0, "Critical Pebble read amplification threshold. 0 uses default.")
backpressureWarnDelay = flag.Duration("storage-backpressure-warning-delay", 0, "Delay applied to writes in warning state. 0 uses default.")
backpressureCritDelay = flag.Duration("storage-backpressure-critical-delay", 0, "Delay applied to writes in critical state. 0 uses default.")
streamRetention = flag.Duration("storage-stream-retention", 0, "DynamoDB Streams retention window. 0 inherits config/default 24h.")
streamRetentionMaxBytes = flag.Int64("storage-stream-retention-max-bytes", 0, "Maximum logical DynamoDB Streams retained bytes per table. 0 disables byte cap.")
storageChangeLogMode = flag.String("storage-changelog-mode", "", "Physical changelog mode: always, streams-only, or off. Empty inherits config/default.")
storageAdaptiveMode = flag.Bool("storage-adaptive-mode", false, "Enable workload-mode adaptive tuner (read/write ratio observer adjusts commitLoop merge cap and retention interval). Off by default.")
backpressureEnabled = flag.Bool("storage-backpressure", false, "Enable write backpressure from Pebble pressure metrics.")
backpressureReject = flag.Bool("storage-backpressure-reject-critical", false, "Reject writes instead of only sleeping when pressure is critical.")
backpressureWarnL0 = flag.Int64("storage-backpressure-warning-l0-files", 0, "Warning L0 file threshold. 0 uses default.")
backpressureCriticalL0 = flag.Int64("storage-backpressure-critical-l0-files", 0, "Critical L0 file threshold. 0 uses default.")
backpressureWarnDebt = flag.Uint64("storage-backpressure-warning-debt", 0, "Warning compaction debt threshold in bytes. 0 uses default.")
backpressureCriticalDebt = flag.Uint64("storage-backpressure-critical-debt", 0, "Critical compaction debt threshold in bytes. 0 uses default.")
backpressureWarnReadAmp = flag.Int("storage-backpressure-warning-read-amp", 0, "Warning Pebble read amplification threshold. 0 uses default.")
backpressureCritReadAmp = flag.Int("storage-backpressure-critical-read-amp", 0, "Critical Pebble read amplification threshold. 0 uses default.")
backpressureWarnDelay = flag.Duration("storage-backpressure-warning-delay", 0, "Delay applied to writes in warning state. 0 uses default.")
backpressureCritDelay = flag.Duration("storage-backpressure-critical-delay", 0, "Delay applied to writes in critical state. 0 uses default.")
streamRetention = flag.Duration("storage-stream-retention", 0, "DynamoDB Streams retention window. 0 inherits config/default 24h.")
streamRetentionInterval = flag.Duration("storage-stream-retention-interval", 0, "Background CDC retention cleanup interval. Positive enables; negative disables. 0 inherits config/default.")
streamRetentionMaxBytes = flag.Int64("storage-stream-retention-max-bytes", 0, "Deprecated compatibility knob; physical CDC retention is time-window based.")
streamRetentionCleanupBatch = flag.Int("storage-stream-retention-cleanup-batch-size", 0, "Maximum expired CDC records removed per cleanup tick. 0 inherits config/default.")
storageChangeLogMode = flag.String("storage-changelog-mode", "", "Physical changelog mode: always, streams-only, or off. Empty inherits config/default.")
storageAdaptiveMode = flag.Bool("storage-adaptive-mode", false, "Enable workload-mode adaptive tuner (read/write ratio observer adjusts commitLoop merge cap and retention interval). Off by default.")

// Identity/auth flags. Empty -identity-jwks-url keeps the
// server open (single-node dev mode).
Expand Down Expand Up @@ -200,7 +202,7 @@ func main() {
*backpressureEnabled, *backpressureReject, *backpressureWarnL0, *backpressureCriticalL0,
*backpressureWarnDebt, *backpressureCriticalDebt, *backpressureWarnReadAmp,
*backpressureCritReadAmp, *backpressureWarnDelay, *backpressureCritDelay,
*streamRetention, *streamRetentionMaxBytes, *storageChangeLogMode,
*streamRetention, *streamRetentionInterval, *streamRetentionMaxBytes, *streamRetentionCleanupBatch, *storageChangeLogMode,
*identityJwks, *identityIssuer, *identityAudience, *identityClockSkew,
*shardsN, *replicationFactor, *muxAddr,
*grpcAddr, *grpcReflection, *tlsCert, *tlsKey, *mtlsCA,
Expand Down Expand Up @@ -368,7 +370,7 @@ func main() {
// for any stream-enabled table that declares
// StreamSpecification.RetentionSeconds. Zero means "inherit
// cluster default".
db.AttachStreamRetentionResolver(func(table string) int64 {
streamRetentionResolver := func(table string) int64 {
if cat == nil {
return 0
}
Expand All @@ -377,7 +379,15 @@ func main() {
return 0
}
return td.StreamSpecification.RetentionSeconds
})
}
db.AttachStreamRetentionResolver(streamRetentionResolver)
if mgr != nil {
for _, sh := range mgr.Shards() {
if sh != nil && sh.Storage != nil {
sh.Storage.AttachStreamRetentionResolver(streamRetentionResolver)
}
}
}

var raftStore *pebble.DB
if mgr == nil && cfg.Raft.Bind != "" {
Expand Down
2 changes: 2 additions & 0 deletions dist/helm/cefas/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ data:
reflection: true
storage:
fsyncOnCommit: {{ .Values.cluster.fsyncOnCommit }}
streamRetentionInterval: {{ .Values.storage.streamRetentionInterval | quote }}
streamRetentionCleanupBatchSize: {{ .Values.storage.streamRetentionCleanupBatchSize }}
cluster:
shards: {{ .Values.cluster.shards }}
replicationFactor: {{ include "cefas.resolvedReplicationFactor" . }}
Expand Down
5 changes: 5 additions & 0 deletions dist/helm/cefas/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ cluster:
bootstrap: true
fsyncOnCommit: false

storage:
# Physical CDC retention cleanup. Negative interval disables the cleaner.
streamRetentionInterval: 30s
streamRetentionCleanupBatchSize: 65536

# Guards each raft-id with a persistent lease so a stale process cannot
# keep serving after Kubernetes starts a replacement with the same id.
raftIdentity:
Expand Down
9 changes: 8 additions & 1 deletion internal/bootstrap/server/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func OverlayFlags(
backpressureWarnDebt, backpressureCriticalDebt uint64,
backpressureWarnReadAmp, backpressureCriticalReadAmp int,
backpressureWarnDelay, backpressureCriticalDelay time.Duration,
streamRetention time.Duration, streamRetentionMaxBytes int64,
streamRetention, streamRetentionInterval time.Duration, streamRetentionMaxBytes int64,
streamRetentionCleanupBatch int,
storageChangeLogMode string,
identityJwks, identityIssuer, identityAudience string, identityClockSkew time.Duration,
shardsN, replicationFactor int, muxAddr string,
Expand Down Expand Up @@ -200,9 +201,15 @@ func OverlayFlags(
if streamRetention > 0 {
cfg.Storage.StreamRetention = streamRetention
}
if streamRetentionInterval != 0 {
cfg.Storage.StreamRetentionInterval = streamRetentionInterval
}
if streamRetentionMaxBytes > 0 {
cfg.Storage.StreamRetentionMaxBytes = streamRetentionMaxBytes
}
if streamRetentionCleanupBatch > 0 {
cfg.Storage.StreamRetentionCleanupBatch = streamRetentionCleanupBatch
}
if storageChangeLogMode != "" {
cfg.Storage.ChangeLogMode = storageChangeLogMode
}
Expand Down
19 changes: 15 additions & 4 deletions internal/bootstrap/server/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@ type overlayArgs struct {
backpressureWarnReadAmp, backpressureCritRA int
backpressureWarnDelay, backpressureCritDelay time.Duration

streamRetention time.Duration
streamRetentionMaxBytes int64
storageChangeLogMode string
streamRetention time.Duration
streamRetentionInterval time.Duration
streamRetentionMaxBytes int64
streamRetentionCleanupBatch int
storageChangeLogMode string

identityJwks, identityIssuer, identityAudience string
identityClockSkew time.Duration
Expand Down Expand Up @@ -157,7 +159,8 @@ func runOverlay(cfg *config.Config, a overlayArgs) {
a.backpressureWarnDebt, a.backpressureCriticalDbt,
a.backpressureWarnReadAmp, a.backpressureCritRA,
a.backpressureWarnDelay, a.backpressureCritDelay,
a.streamRetention, a.streamRetentionMaxBytes, a.storageChangeLogMode,
a.streamRetention, a.streamRetentionInterval, a.streamRetentionMaxBytes,
a.streamRetentionCleanupBatch, a.storageChangeLogMode,
a.identityJwks, a.identityIssuer, a.identityAudience, a.identityClockSkew,
a.shardsN, a.replicationFactor, a.muxAddr,
a.grpcAddr, a.grpcRefl, a.tlsCert, a.tlsKey, a.mCA,
Expand Down Expand Up @@ -390,7 +393,9 @@ func TestOverlayFlags_RetentionAndBackup(t *testing.T) {
cfg := baseCfg()
args := zeroArgs()
args.streamRetention = 12 * time.Hour
args.streamRetentionInterval = 5 * time.Minute
args.streamRetentionMaxBytes = 256 << 20
args.streamRetentionCleanupBatch = 12345
args.storageChangeLogMode = "streams-only"
args.backupSchedulerEnabled = true
args.backupSchedulerInterval = 2 * time.Hour
Expand All @@ -407,6 +412,12 @@ func TestOverlayFlags_RetentionAndBackup(t *testing.T) {
if cfg.Storage.StreamRetentionMaxBytes != 256<<20 {
t.Errorf("StreamRetentionMaxBytes = %d", cfg.Storage.StreamRetentionMaxBytes)
}
if cfg.Storage.StreamRetentionInterval != 5*time.Minute {
t.Errorf("StreamRetentionInterval = %v", cfg.Storage.StreamRetentionInterval)
}
if cfg.Storage.StreamRetentionCleanupBatch != 12345 {
t.Errorf("StreamRetentionCleanupBatch = %d", cfg.Storage.StreamRetentionCleanupBatch)
}
if cfg.Storage.ChangeLogMode != "streams-only" {
t.Errorf("ChangeLogMode = %q", cfg.Storage.ChangeLogMode)
}
Expand Down
2 changes: 2 additions & 0 deletions internal/bootstrap/server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ func StreamRetentionOptions(cfg config.Config) pebble.StreamRetentionOptions {
return pebble.StreamRetentionOptions{
Retention: cfg.Storage.StreamRetention,
MaxBytes: cfg.Storage.StreamRetentionMaxBytes,
Interval: cfg.Storage.StreamRetentionInterval,
BatchSize: cfg.Storage.StreamRetentionCleanupBatch,
}
}

Expand Down
14 changes: 14 additions & 0 deletions internal/bootstrap/server/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func fixtureConfig() config.Config {

c.Storage.StreamRetention = 12 * time.Hour
c.Storage.StreamRetentionMaxBytes = 1 << 30
c.Storage.StreamRetentionInterval = 5 * time.Minute
c.Storage.StreamRetentionCleanupBatch = 12345
c.Storage.ChangeLogMode = "streams-only"

c.Metrics.HotspotBuckets = 32
Expand Down Expand Up @@ -103,6 +105,12 @@ func TestStorageOptions(t *testing.T) {
if opts.StreamRetention.Retention != 12*time.Hour {
t.Errorf("StreamRetention.Retention = %v", opts.StreamRetention.Retention)
}
if opts.StreamRetention.Interval != 5*time.Minute {
t.Errorf("StreamRetention.Interval = %v", opts.StreamRetention.Interval)
}
if opts.StreamRetention.BatchSize != 12345 {
t.Errorf("StreamRetention.BatchSize = %d", opts.StreamRetention.BatchSize)
}
if opts.ChangeLogMode != "streams-only" {
t.Errorf("ChangeLogMode = %q", opts.ChangeLogMode)
}
Expand Down Expand Up @@ -325,6 +333,12 @@ func TestStreamRetentionOptions(t *testing.T) {
if got.MaxBytes != 1<<30 {
t.Errorf("MaxBytes = %d", got.MaxBytes)
}
if got.Interval != 5*time.Minute {
t.Errorf("Interval = %v", got.Interval)
}
if got.BatchSize != 12345 {
t.Errorf("BatchSize = %d", got.BatchSize)
}
}

func TestParsePeers(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type Config struct {
BackpressureCriticalDelay time.Duration `yaml:"backpressureCriticalDelay"`
StreamRetention time.Duration `yaml:"streamRetention"`
StreamRetentionMaxBytes int64 `yaml:"streamRetentionMaxBytes"`
StreamRetentionInterval time.Duration `yaml:"streamRetentionInterval"`
StreamRetentionCleanupBatch int `yaml:"streamRetentionCleanupBatchSize"`
ChangeLogMode string `yaml:"changeLogMode"`
} `yaml:"storage"`
Cluster struct {
Expand Down Expand Up @@ -187,6 +189,8 @@ func Defaults() Config {
c.BackupScheduler.NameTemplate = "scheduled-{{timestamp}}"
c.Storage.Lanes = "auto"
c.Storage.StreamRetention = 24 * time.Hour
c.Storage.StreamRetentionInterval = 30 * time.Second
c.Storage.StreamRetentionCleanupBatch = 65536
c.Raft.HeartbeatTimeout = 2 * time.Second
c.Raft.ElectionTimeout = 10 * time.Second
c.Raft.LeaderLeaseTimeout = 2 * time.Second
Expand Down Expand Up @@ -331,6 +335,8 @@ func ApplyEnv(cfg *Config) error {
cfg.Storage.BackpressureCriticalDelay = dur("STORAGE_BACKPRESSURE_CRITICAL_DELAY", cfg.Storage.BackpressureCriticalDelay)
cfg.Storage.StreamRetention = dur("STORAGE_STREAM_RETENTION", cfg.Storage.StreamRetention)
cfg.Storage.StreamRetentionMaxBytes = integer64("STORAGE_STREAM_RETENTION_MAX_BYTES", cfg.Storage.StreamRetentionMaxBytes)
cfg.Storage.StreamRetentionInterval = dur("STORAGE_STREAM_RETENTION_INTERVAL", cfg.Storage.StreamRetentionInterval)
cfg.Storage.StreamRetentionCleanupBatch = integer("STORAGE_STREAM_RETENTION_CLEANUP_BATCH_SIZE", cfg.Storage.StreamRetentionCleanupBatch)
cfg.Storage.ChangeLogMode = str("STORAGE_CHANGELOG_MODE", cfg.Storage.ChangeLogMode)

cfg.Cluster.Shards = integer("CLUSTER_SHARDS", cfg.Cluster.Shards)
Expand Down
22 changes: 22 additions & 0 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ func TestDefaultsPopulated(t *testing.T) {
if d.Storage.Lanes != "auto" {
t.Errorf("storage lanes default = %q", d.Storage.Lanes)
}
if d.Storage.StreamRetentionInterval != 30*time.Second {
t.Errorf("storage stream retention interval default = %v, want 30s", d.Storage.StreamRetentionInterval)
}
if d.Storage.StreamRetentionCleanupBatch != 65536 {
t.Errorf("storage stream retention cleanup batch default = %d, want 65536", d.Storage.StreamRetentionCleanupBatch)
}
}

func TestLoadFileMissingReturnsDefaults(t *testing.T) {
Expand Down Expand Up @@ -85,6 +91,8 @@ cluster:
n2: 10.0.0.2:9100
storage:
changeLogMode: streams-only
streamRetentionInterval: 5m
streamRetentionCleanupBatchSize: 12345
lanes: off
laneReadWorkers: 4
laneWriteWorkers: 3
Expand Down Expand Up @@ -154,6 +162,12 @@ backupScheduler:
if cfg.Storage.ChangeLogMode != "streams-only" {
t.Fatalf("storage changelog mode config not loaded: %+v", cfg.Storage)
}
if cfg.Storage.StreamRetentionInterval != 5*time.Minute {
t.Fatalf("storage stream retention interval config not loaded: %+v", cfg.Storage)
}
if cfg.Storage.StreamRetentionCleanupBatch != 12345 {
t.Fatalf("storage stream retention cleanup batch config not loaded: %+v", cfg.Storage)
}
if cfg.Storage.Lanes != "off" || cfg.Storage.LaneReadWorkers != 4 || cfg.Storage.LaneWriteWorkers != 3 || cfg.Storage.LaneReadQueue != 128 || cfg.Storage.LaneWriteQueue != 64 {
t.Fatalf("storage lanes config not loaded: %+v", cfg.Storage)
}
Expand Down Expand Up @@ -235,6 +249,8 @@ func TestApplyEnv(t *testing.T) {
t.Setenv("CEFAS_BACKUP_SCHEDULER_RETENTION_MAX_AGE", "168h")
t.Setenv("CEFAS_BACKUP_SCHEDULER_RETENTION_DRY_RUN", "true")
t.Setenv("CEFAS_STORAGE_CHANGELOG_MODE", "off")
t.Setenv("CEFAS_STORAGE_STREAM_RETENTION_INTERVAL", "10m")
t.Setenv("CEFAS_STORAGE_STREAM_RETENTION_CLEANUP_BATCH_SIZE", "54321")
t.Setenv("CEFAS_STORAGE_LANES", "on")
t.Setenv("CEFAS_STORAGE_LANE_READ_WORKERS", "5")
t.Setenv("CEFAS_STORAGE_LANE_WRITE_WORKERS", "4")
Expand Down Expand Up @@ -281,6 +297,12 @@ func TestApplyEnv(t *testing.T) {
if cfg.Storage.ChangeLogMode != "off" {
t.Errorf("storage changelog mode env not applied: %+v", cfg.Storage)
}
if cfg.Storage.StreamRetentionInterval != 10*time.Minute {
t.Errorf("storage stream retention interval env not applied: %+v", cfg.Storage)
}
if cfg.Storage.StreamRetentionCleanupBatch != 54321 {
t.Errorf("storage stream retention cleanup batch env not applied: %+v", cfg.Storage)
}
if cfg.Storage.Lanes != "on" || cfg.Storage.LaneReadWorkers != 5 || cfg.Storage.LaneWriteWorkers != 4 || cfg.Storage.LaneReadQueue != 256 || cfg.Storage.LaneWriteQueue != 128 {
t.Errorf("storage lanes env not applied: %+v", cfg.Storage)
}
Expand Down
6 changes: 5 additions & 1 deletion internal/metrics/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ func TestRunStorageCollectorExposesPebbleAndLeaderMetrics(t *testing.T) {

func TestRunStorageCollectorExposesStreamRetentionMetrics(t *testing.T) {
m := New()
db, err := pebble.Open(pebble.Options{Path: t.TempDir()})
db, err := pebble.Open(pebble.Options{
Path: t.TempDir(),
StreamRetention: pebble.StreamRetentionOptions{Retention: time.Millisecond},
})
if err != nil {
t.Fatal(err)
}
Expand All @@ -80,6 +83,7 @@ func TestRunStorageCollectorExposesStreamRetentionMetrics(t *testing.T) {
}, pebble.PutOptions{}); err != nil {
t.Fatal(err)
}
wait.For(5 * time.Millisecond)
if _, err := db.ApplyStreamRetention(td.Name, time.Now()); err != nil {
t.Fatal(err)
}
Expand Down
Loading
Loading