diff --git a/cmd/cefasdb/main.go b/cmd/cefasdb/main.go index 307e4b6..2ed3dcc 100644 --- a/cmd/cefasdb/main.go +++ b/cmd/cefasdb/main.go @@ -94,20 +94,22 @@ func main() { storageLaneWriteQueue = flag.Int("storage-lane-write-queue", 0, "Write lane queue capacity. 0 inherits default.") // Adaptive write backpressure. - backpressureEnabled = flag.Bool("storage-backpressure", false, "Enable write backpressure from Pebble pressure metrics.") - backpressureReject = flag.Bool("storage-backpressure-reject-critical", false, "Reject writes instead of only sleeping when pressure is critical.") - backpressureWarnL0 = flag.Int64("storage-backpressure-warning-l0-files", 0, "Warning L0 file threshold. 0 uses default.") - backpressureCriticalL0 = flag.Int64("storage-backpressure-critical-l0-files", 0, "Critical L0 file threshold. 0 uses default.") - backpressureWarnDebt = flag.Uint64("storage-backpressure-warning-debt", 0, "Warning compaction debt threshold in bytes. 0 uses default.") - backpressureCriticalDebt = flag.Uint64("storage-backpressure-critical-debt", 0, "Critical compaction debt threshold in bytes. 0 uses default.") - backpressureWarnReadAmp = flag.Int("storage-backpressure-warning-read-amp", 0, "Warning Pebble read amplification threshold. 0 uses default.") - backpressureCritReadAmp = flag.Int("storage-backpressure-critical-read-amp", 0, "Critical Pebble read amplification threshold. 0 uses default.") - backpressureWarnDelay = flag.Duration("storage-backpressure-warning-delay", 0, "Delay applied to writes in warning state. 0 uses default.") - backpressureCritDelay = flag.Duration("storage-backpressure-critical-delay", 0, "Delay applied to writes in critical state. 0 uses default.") - streamRetention = flag.Duration("storage-stream-retention", 0, "DynamoDB Streams retention window. 0 inherits config/default 24h.") - streamRetentionMaxBytes = flag.Int64("storage-stream-retention-max-bytes", 0, "Maximum logical DynamoDB Streams retained bytes per table. 0 disables byte cap.") - storageChangeLogMode = flag.String("storage-changelog-mode", "", "Physical changelog mode: always, streams-only, or off. Empty inherits config/default.") - storageAdaptiveMode = flag.Bool("storage-adaptive-mode", false, "Enable workload-mode adaptive tuner (read/write ratio observer adjusts commitLoop merge cap and retention interval). Off by default.") + backpressureEnabled = flag.Bool("storage-backpressure", false, "Enable write backpressure from Pebble pressure metrics.") + backpressureReject = flag.Bool("storage-backpressure-reject-critical", false, "Reject writes instead of only sleeping when pressure is critical.") + backpressureWarnL0 = flag.Int64("storage-backpressure-warning-l0-files", 0, "Warning L0 file threshold. 0 uses default.") + backpressureCriticalL0 = flag.Int64("storage-backpressure-critical-l0-files", 0, "Critical L0 file threshold. 0 uses default.") + backpressureWarnDebt = flag.Uint64("storage-backpressure-warning-debt", 0, "Warning compaction debt threshold in bytes. 0 uses default.") + backpressureCriticalDebt = flag.Uint64("storage-backpressure-critical-debt", 0, "Critical compaction debt threshold in bytes. 0 uses default.") + backpressureWarnReadAmp = flag.Int("storage-backpressure-warning-read-amp", 0, "Warning Pebble read amplification threshold. 0 uses default.") + backpressureCritReadAmp = flag.Int("storage-backpressure-critical-read-amp", 0, "Critical Pebble read amplification threshold. 0 uses default.") + backpressureWarnDelay = flag.Duration("storage-backpressure-warning-delay", 0, "Delay applied to writes in warning state. 0 uses default.") + backpressureCritDelay = flag.Duration("storage-backpressure-critical-delay", 0, "Delay applied to writes in critical state. 0 uses default.") + streamRetention = flag.Duration("storage-stream-retention", 0, "DynamoDB Streams retention window. 0 inherits config/default 24h.") + streamRetentionInterval = flag.Duration("storage-stream-retention-interval", 0, "Background CDC retention cleanup interval. Positive enables; negative disables. 0 inherits config/default.") + streamRetentionMaxBytes = flag.Int64("storage-stream-retention-max-bytes", 0, "Deprecated compatibility knob; physical CDC retention is time-window based.") + streamRetentionCleanupBatch = flag.Int("storage-stream-retention-cleanup-batch-size", 0, "Maximum expired CDC records removed per cleanup tick. 0 inherits config/default.") + storageChangeLogMode = flag.String("storage-changelog-mode", "", "Physical changelog mode: always, streams-only, or off. Empty inherits config/default.") + storageAdaptiveMode = flag.Bool("storage-adaptive-mode", false, "Enable workload-mode adaptive tuner (read/write ratio observer adjusts commitLoop merge cap and retention interval). Off by default.") // Identity/auth flags. Empty -identity-jwks-url keeps the // server open (single-node dev mode). @@ -200,7 +202,7 @@ func main() { *backpressureEnabled, *backpressureReject, *backpressureWarnL0, *backpressureCriticalL0, *backpressureWarnDebt, *backpressureCriticalDebt, *backpressureWarnReadAmp, *backpressureCritReadAmp, *backpressureWarnDelay, *backpressureCritDelay, - *streamRetention, *streamRetentionMaxBytes, *storageChangeLogMode, + *streamRetention, *streamRetentionInterval, *streamRetentionMaxBytes, *streamRetentionCleanupBatch, *storageChangeLogMode, *identityJwks, *identityIssuer, *identityAudience, *identityClockSkew, *shardsN, *replicationFactor, *muxAddr, *grpcAddr, *grpcReflection, *tlsCert, *tlsKey, *mtlsCA, @@ -368,7 +370,7 @@ func main() { // for any stream-enabled table that declares // StreamSpecification.RetentionSeconds. Zero means "inherit // cluster default". - db.AttachStreamRetentionResolver(func(table string) int64 { + streamRetentionResolver := func(table string) int64 { if cat == nil { return 0 } @@ -377,7 +379,15 @@ func main() { return 0 } return td.StreamSpecification.RetentionSeconds - }) + } + db.AttachStreamRetentionResolver(streamRetentionResolver) + if mgr != nil { + for _, sh := range mgr.Shards() { + if sh != nil && sh.Storage != nil { + sh.Storage.AttachStreamRetentionResolver(streamRetentionResolver) + } + } + } var raftStore *pebble.DB if mgr == nil && cfg.Raft.Bind != "" { diff --git a/dist/helm/cefas/templates/configmap.yaml b/dist/helm/cefas/templates/configmap.yaml index e091960..d84008c 100644 --- a/dist/helm/cefas/templates/configmap.yaml +++ b/dist/helm/cefas/templates/configmap.yaml @@ -18,6 +18,8 @@ data: reflection: true storage: fsyncOnCommit: {{ .Values.cluster.fsyncOnCommit }} + streamRetentionInterval: {{ .Values.storage.streamRetentionInterval | quote }} + streamRetentionCleanupBatchSize: {{ .Values.storage.streamRetentionCleanupBatchSize }} cluster: shards: {{ .Values.cluster.shards }} replicationFactor: {{ include "cefas.resolvedReplicationFactor" . }} diff --git a/dist/helm/cefas/values.yaml b/dist/helm/cefas/values.yaml index af6585e..ac8d4eb 100644 --- a/dist/helm/cefas/values.yaml +++ b/dist/helm/cefas/values.yaml @@ -110,6 +110,11 @@ cluster: bootstrap: true fsyncOnCommit: false +storage: + # Physical CDC retention cleanup. Negative interval disables the cleaner. + streamRetentionInterval: 30s + streamRetentionCleanupBatchSize: 65536 + # Guards each raft-id with a persistent lease so a stale process cannot # keep serving after Kubernetes starts a replacement with the same id. raftIdentity: diff --git a/internal/bootstrap/server/flags.go b/internal/bootstrap/server/flags.go index 1f5f139..00c283f 100644 --- a/internal/bootstrap/server/flags.go +++ b/internal/bootstrap/server/flags.go @@ -33,7 +33,8 @@ func OverlayFlags( backpressureWarnDebt, backpressureCriticalDebt uint64, backpressureWarnReadAmp, backpressureCriticalReadAmp int, backpressureWarnDelay, backpressureCriticalDelay time.Duration, - streamRetention time.Duration, streamRetentionMaxBytes int64, + streamRetention, streamRetentionInterval time.Duration, streamRetentionMaxBytes int64, + streamRetentionCleanupBatch int, storageChangeLogMode string, identityJwks, identityIssuer, identityAudience string, identityClockSkew time.Duration, shardsN, replicationFactor int, muxAddr string, @@ -200,9 +201,15 @@ func OverlayFlags( if streamRetention > 0 { cfg.Storage.StreamRetention = streamRetention } + if streamRetentionInterval != 0 { + cfg.Storage.StreamRetentionInterval = streamRetentionInterval + } if streamRetentionMaxBytes > 0 { cfg.Storage.StreamRetentionMaxBytes = streamRetentionMaxBytes } + if streamRetentionCleanupBatch > 0 { + cfg.Storage.StreamRetentionCleanupBatch = streamRetentionCleanupBatch + } if storageChangeLogMode != "" { cfg.Storage.ChangeLogMode = storageChangeLogMode } diff --git a/internal/bootstrap/server/flags_test.go b/internal/bootstrap/server/flags_test.go index 049d114..1a46bfb 100644 --- a/internal/bootstrap/server/flags_test.go +++ b/internal/bootstrap/server/flags_test.go @@ -90,9 +90,11 @@ type overlayArgs struct { backpressureWarnReadAmp, backpressureCritRA int backpressureWarnDelay, backpressureCritDelay time.Duration - streamRetention time.Duration - streamRetentionMaxBytes int64 - storageChangeLogMode string + streamRetention time.Duration + streamRetentionInterval time.Duration + streamRetentionMaxBytes int64 + streamRetentionCleanupBatch int + storageChangeLogMode string identityJwks, identityIssuer, identityAudience string identityClockSkew time.Duration @@ -157,7 +159,8 @@ func runOverlay(cfg *config.Config, a overlayArgs) { a.backpressureWarnDebt, a.backpressureCriticalDbt, a.backpressureWarnReadAmp, a.backpressureCritRA, a.backpressureWarnDelay, a.backpressureCritDelay, - a.streamRetention, a.streamRetentionMaxBytes, a.storageChangeLogMode, + a.streamRetention, a.streamRetentionInterval, a.streamRetentionMaxBytes, + a.streamRetentionCleanupBatch, a.storageChangeLogMode, a.identityJwks, a.identityIssuer, a.identityAudience, a.identityClockSkew, a.shardsN, a.replicationFactor, a.muxAddr, a.grpcAddr, a.grpcRefl, a.tlsCert, a.tlsKey, a.mCA, @@ -390,7 +393,9 @@ func TestOverlayFlags_RetentionAndBackup(t *testing.T) { cfg := baseCfg() args := zeroArgs() args.streamRetention = 12 * time.Hour + args.streamRetentionInterval = 5 * time.Minute args.streamRetentionMaxBytes = 256 << 20 + args.streamRetentionCleanupBatch = 12345 args.storageChangeLogMode = "streams-only" args.backupSchedulerEnabled = true args.backupSchedulerInterval = 2 * time.Hour @@ -407,6 +412,12 @@ func TestOverlayFlags_RetentionAndBackup(t *testing.T) { if cfg.Storage.StreamRetentionMaxBytes != 256<<20 { t.Errorf("StreamRetentionMaxBytes = %d", cfg.Storage.StreamRetentionMaxBytes) } + if cfg.Storage.StreamRetentionInterval != 5*time.Minute { + t.Errorf("StreamRetentionInterval = %v", cfg.Storage.StreamRetentionInterval) + } + if cfg.Storage.StreamRetentionCleanupBatch != 12345 { + t.Errorf("StreamRetentionCleanupBatch = %d", cfg.Storage.StreamRetentionCleanupBatch) + } if cfg.Storage.ChangeLogMode != "streams-only" { t.Errorf("ChangeLogMode = %q", cfg.Storage.ChangeLogMode) } diff --git a/internal/bootstrap/server/options.go b/internal/bootstrap/server/options.go index dee035a..c236ffc 100644 --- a/internal/bootstrap/server/options.go +++ b/internal/bootstrap/server/options.go @@ -137,6 +137,8 @@ func StreamRetentionOptions(cfg config.Config) pebble.StreamRetentionOptions { return pebble.StreamRetentionOptions{ Retention: cfg.Storage.StreamRetention, MaxBytes: cfg.Storage.StreamRetentionMaxBytes, + Interval: cfg.Storage.StreamRetentionInterval, + BatchSize: cfg.Storage.StreamRetentionCleanupBatch, } } diff --git a/internal/bootstrap/server/options_test.go b/internal/bootstrap/server/options_test.go index e1d2b53..7ba5eb6 100644 --- a/internal/bootstrap/server/options_test.go +++ b/internal/bootstrap/server/options_test.go @@ -45,6 +45,8 @@ func fixtureConfig() config.Config { c.Storage.StreamRetention = 12 * time.Hour c.Storage.StreamRetentionMaxBytes = 1 << 30 + c.Storage.StreamRetentionInterval = 5 * time.Minute + c.Storage.StreamRetentionCleanupBatch = 12345 c.Storage.ChangeLogMode = "streams-only" c.Metrics.HotspotBuckets = 32 @@ -103,6 +105,12 @@ func TestStorageOptions(t *testing.T) { if opts.StreamRetention.Retention != 12*time.Hour { t.Errorf("StreamRetention.Retention = %v", opts.StreamRetention.Retention) } + if opts.StreamRetention.Interval != 5*time.Minute { + t.Errorf("StreamRetention.Interval = %v", opts.StreamRetention.Interval) + } + if opts.StreamRetention.BatchSize != 12345 { + t.Errorf("StreamRetention.BatchSize = %d", opts.StreamRetention.BatchSize) + } if opts.ChangeLogMode != "streams-only" { t.Errorf("ChangeLogMode = %q", opts.ChangeLogMode) } @@ -325,6 +333,12 @@ func TestStreamRetentionOptions(t *testing.T) { if got.MaxBytes != 1<<30 { t.Errorf("MaxBytes = %d", got.MaxBytes) } + if got.Interval != 5*time.Minute { + t.Errorf("Interval = %v", got.Interval) + } + if got.BatchSize != 12345 { + t.Errorf("BatchSize = %d", got.BatchSize) + } } func TestParsePeers(t *testing.T) { diff --git a/internal/config/config.go b/internal/config/config.go index de7c88a..e2197a6 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -68,6 +68,8 @@ type Config struct { BackpressureCriticalDelay time.Duration `yaml:"backpressureCriticalDelay"` StreamRetention time.Duration `yaml:"streamRetention"` StreamRetentionMaxBytes int64 `yaml:"streamRetentionMaxBytes"` + StreamRetentionInterval time.Duration `yaml:"streamRetentionInterval"` + StreamRetentionCleanupBatch int `yaml:"streamRetentionCleanupBatchSize"` ChangeLogMode string `yaml:"changeLogMode"` } `yaml:"storage"` Cluster struct { @@ -187,6 +189,8 @@ func Defaults() Config { c.BackupScheduler.NameTemplate = "scheduled-{{timestamp}}" c.Storage.Lanes = "auto" c.Storage.StreamRetention = 24 * time.Hour + c.Storage.StreamRetentionInterval = 30 * time.Second + c.Storage.StreamRetentionCleanupBatch = 65536 c.Raft.HeartbeatTimeout = 2 * time.Second c.Raft.ElectionTimeout = 10 * time.Second c.Raft.LeaderLeaseTimeout = 2 * time.Second @@ -331,6 +335,8 @@ func ApplyEnv(cfg *Config) error { cfg.Storage.BackpressureCriticalDelay = dur("STORAGE_BACKPRESSURE_CRITICAL_DELAY", cfg.Storage.BackpressureCriticalDelay) cfg.Storage.StreamRetention = dur("STORAGE_STREAM_RETENTION", cfg.Storage.StreamRetention) cfg.Storage.StreamRetentionMaxBytes = integer64("STORAGE_STREAM_RETENTION_MAX_BYTES", cfg.Storage.StreamRetentionMaxBytes) + cfg.Storage.StreamRetentionInterval = dur("STORAGE_STREAM_RETENTION_INTERVAL", cfg.Storage.StreamRetentionInterval) + cfg.Storage.StreamRetentionCleanupBatch = integer("STORAGE_STREAM_RETENTION_CLEANUP_BATCH_SIZE", cfg.Storage.StreamRetentionCleanupBatch) cfg.Storage.ChangeLogMode = str("STORAGE_CHANGELOG_MODE", cfg.Storage.ChangeLogMode) cfg.Cluster.Shards = integer("CLUSTER_SHARDS", cfg.Cluster.Shards) diff --git a/internal/config/config_test.go b/internal/config/config_test.go index e94a856..40edede 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -53,6 +53,12 @@ func TestDefaultsPopulated(t *testing.T) { if d.Storage.Lanes != "auto" { t.Errorf("storage lanes default = %q", d.Storage.Lanes) } + if d.Storage.StreamRetentionInterval != 30*time.Second { + t.Errorf("storage stream retention interval default = %v, want 30s", d.Storage.StreamRetentionInterval) + } + if d.Storage.StreamRetentionCleanupBatch != 65536 { + t.Errorf("storage stream retention cleanup batch default = %d, want 65536", d.Storage.StreamRetentionCleanupBatch) + } } func TestLoadFileMissingReturnsDefaults(t *testing.T) { @@ -85,6 +91,8 @@ cluster: n2: 10.0.0.2:9100 storage: changeLogMode: streams-only + streamRetentionInterval: 5m + streamRetentionCleanupBatchSize: 12345 lanes: off laneReadWorkers: 4 laneWriteWorkers: 3 @@ -154,6 +162,12 @@ backupScheduler: if cfg.Storage.ChangeLogMode != "streams-only" { t.Fatalf("storage changelog mode config not loaded: %+v", cfg.Storage) } + if cfg.Storage.StreamRetentionInterval != 5*time.Minute { + t.Fatalf("storage stream retention interval config not loaded: %+v", cfg.Storage) + } + if cfg.Storage.StreamRetentionCleanupBatch != 12345 { + t.Fatalf("storage stream retention cleanup batch config not loaded: %+v", cfg.Storage) + } if cfg.Storage.Lanes != "off" || cfg.Storage.LaneReadWorkers != 4 || cfg.Storage.LaneWriteWorkers != 3 || cfg.Storage.LaneReadQueue != 128 || cfg.Storage.LaneWriteQueue != 64 { t.Fatalf("storage lanes config not loaded: %+v", cfg.Storage) } @@ -235,6 +249,8 @@ func TestApplyEnv(t *testing.T) { t.Setenv("CEFAS_BACKUP_SCHEDULER_RETENTION_MAX_AGE", "168h") t.Setenv("CEFAS_BACKUP_SCHEDULER_RETENTION_DRY_RUN", "true") t.Setenv("CEFAS_STORAGE_CHANGELOG_MODE", "off") + t.Setenv("CEFAS_STORAGE_STREAM_RETENTION_INTERVAL", "10m") + t.Setenv("CEFAS_STORAGE_STREAM_RETENTION_CLEANUP_BATCH_SIZE", "54321") t.Setenv("CEFAS_STORAGE_LANES", "on") t.Setenv("CEFAS_STORAGE_LANE_READ_WORKERS", "5") t.Setenv("CEFAS_STORAGE_LANE_WRITE_WORKERS", "4") @@ -281,6 +297,12 @@ func TestApplyEnv(t *testing.T) { if cfg.Storage.ChangeLogMode != "off" { t.Errorf("storage changelog mode env not applied: %+v", cfg.Storage) } + if cfg.Storage.StreamRetentionInterval != 10*time.Minute { + t.Errorf("storage stream retention interval env not applied: %+v", cfg.Storage) + } + if cfg.Storage.StreamRetentionCleanupBatch != 54321 { + t.Errorf("storage stream retention cleanup batch env not applied: %+v", cfg.Storage) + } if cfg.Storage.Lanes != "on" || cfg.Storage.LaneReadWorkers != 5 || cfg.Storage.LaneWriteWorkers != 4 || cfg.Storage.LaneReadQueue != 256 || cfg.Storage.LaneWriteQueue != 128 { t.Errorf("storage lanes env not applied: %+v", cfg.Storage) } diff --git a/internal/metrics/collector_test.go b/internal/metrics/collector_test.go index f912489..95f9899 100644 --- a/internal/metrics/collector_test.go +++ b/internal/metrics/collector_test.go @@ -62,7 +62,10 @@ func TestRunStorageCollectorExposesPebbleAndLeaderMetrics(t *testing.T) { func TestRunStorageCollectorExposesStreamRetentionMetrics(t *testing.T) { m := New() - db, err := pebble.Open(pebble.Options{Path: t.TempDir()}) + db, err := pebble.Open(pebble.Options{ + Path: t.TempDir(), + StreamRetention: pebble.StreamRetentionOptions{Retention: time.Millisecond}, + }) if err != nil { t.Fatal(err) } @@ -80,6 +83,7 @@ func TestRunStorageCollectorExposesStreamRetentionMetrics(t *testing.T) { }, pebble.PutOptions{}); err != nil { t.Fatal(err) } + wait.For(5 * time.Millisecond) if _, err := db.ApplyStreamRetention(td.Name, time.Now()); err != nil { t.Fatal(err) } diff --git a/internal/storage/adapter/pebble/changelog.go b/internal/storage/adapter/pebble/changelog.go index 371e36c..521d5dc 100644 --- a/internal/storage/adapter/pebble/changelog.go +++ b/internal/storage/adapter/pebble/changelog.go @@ -57,9 +57,9 @@ type ChangeRecord struct { OpKind string `json:"opKind,omitempty"` } -// StreamRetentionStats is the persisted logical retention state for one table -// stream. OldestSequence is the first readable stream sequence. Sequences below -// it are considered trimmed even though the physical changelog remains for PITR. +// StreamRetentionStats is the persisted retention state for one table stream. +// OldestSequence is the first readable stream sequence. Sequences below it were +// physically removed by the CDC retention cleaner and are considered trimmed. type StreamRetentionStats struct { Table string `json:"table"` OldestSequence uint64 `json:"oldestSequence,omitempty"` @@ -70,11 +70,7 @@ type StreamRetentionStats struct { LastTrimUnixNano int64 `json:"lastTrimUnixNano,omitempty"` } -type streamRetentionRecord struct { - Index uint64 - UnixNano int64 - Bytes int64 -} +const maxUnixNano = int64(1<<63 - 1) func (d *DB) shouldAppendChangeRecord(td types.TableDescriptor) bool { switch d.changeLogMode { @@ -105,21 +101,51 @@ func (d *DB) appendChangeRecord(b *pebbledb.Batch, rec ChangeRecord) (ChangeReco if err != nil { return rec, fmt.Errorf("encode change record: %w", err) } - // The persisted index lives implicitly in the largest KeyChangeLog - // suffix. seedChangeIndex recovers from that scan on Open, so the - // old hot-key write of storage.ChangeCounterKey is unnecessary and - // only added one rewrite of the same key per mutation. Old - // deployments still keep the key on disk; loadPersistedChangeIndex - // reads it for forward compatibility and the max with the scan wins. if err := b.Set(storage.KeyChangeLog(rec.Index), raw, nil); err != nil { return rec, err } + if err := d.persistChangeHighWater(b, rec.Index); err != nil { + return rec, err + } + if expireUnixNano, ok := d.changeLogExpireUnixNano(rec); ok { + if err := b.Set(storage.KeyChangeLogExpiration(expireUnixNano, rec.Index), nil, nil); err != nil { + return rec, err + } + } if rec.StreamRecord { d.trackStreamTable(rec.Table) } return rec, nil } +func (d *DB) persistChangeHighWater(b *pebbledb.Batch, index uint64) error { + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], index) + return b.Set(storage.ChangeCounterKey, buf[:], nil) +} + +func (d *DB) streamRetentionForTable(table string) time.Duration { + retention := d.streamRetention.Retention + if d.streamRetentionResolver != nil { + if secs := d.streamRetentionResolver(table); secs > 0 { + retention = time.Duration(secs) * time.Second + } + } + return retention +} + +func (d *DB) changeLogExpireUnixNano(rec ChangeRecord) (int64, bool) { + retention := d.streamRetentionForTable(rec.Table) + if retention <= 0 || rec.UnixNano <= 0 { + return 0, false + } + delta := int64(retention) + if rec.UnixNano > maxUnixNano-delta { + return maxUnixNano, true + } + return rec.UnixNano + delta, true +} + func newChangeRecord(td types.TableDescriptor, op ChangeOp, key, oldItem, newItem types.Item) ChangeRecord { rec := ChangeRecord{ Op: op, @@ -521,9 +547,9 @@ func (d *DB) CurrentChangeIndex() (uint64, error) { return d.changeIndex.Load(), nil } -// ApplyStreamRetention advances the logical stream trim point for table using -// the configured retention policy. It preserves physical changelog entries so -// PITR and backups keep seeing the full change history. +// ApplyStreamRetention runs one bounded physical CDC retention pass, then +// returns the persisted trim state for table. It is safe to call explicitly; +// the background loop uses the same cleaner path. func (d *DB) ApplyStreamRetention(table string, now time.Time) (StreamRetentionStats, error) { if table == "" { return StreamRetentionStats{}, fmt.Errorf("stream retention table required") @@ -531,53 +557,48 @@ func (d *DB) ApplyStreamRetention(table string, now time.Time) (StreamRetentionS if now.IsZero() { now = time.Now() } - b := d.Batch() - defer b.Close() - stats, err := d.applyStreamRetentionLocked(b, table, now, nil) - if err != nil { - return StreamRetentionStats{}, err - } - if err := d.CommitBatch(b); err != nil { + if _, err := d.applyExpiredChangeLogRetention(now); err != nil { return StreamRetentionStats{}, err } - return stats, nil + return d.StreamRetentionStats(table) } // StreamRetentionStats returns the latest persisted logical retention state. -// If the state is missing (for stores created before Streams retention), it is -// reconstructed from the preserved changelog without mutating storage. +// Missing state means no records have been physically trimmed for the table +// yet; callers must not backfill by scanning the full changelog. func (d *DB) StreamRetentionStats(table string) (StreamRetentionStats, error) { if table == "" { return StreamRetentionStats{}, fmt.Errorf("stream retention table required") } if stats, ok, err := d.loadStreamRetentionState(table); err != nil || ok { - return stats, err + if err != nil { + return stats, err + } + current, currentErr := d.CurrentChangeIndex() + if currentErr != nil { + return StreamRetentionStats{}, currentErr + } + if current > stats.NewestSequence { + stats.NewestSequence = current + } + return stats, nil } - records, err := d.scanStreamRetentionRecords(table, nil) + current, err := d.CurrentChangeIndex() if err != nil { return StreamRetentionStats{}, err } - return d.computeStreamRetentionStats(table, records, StreamRetentionStats{}, time.Now()), nil + return StreamRetentionStats{Table: table, NewestSequence: current}, nil } -// PreviewStreamRetention computes the trim state as of now without writing it. -// Read paths use this so stream polling stays safe on Raft followers. +// PreviewStreamRetention returns the persisted trim state without doing cleanup. +// Read paths intentionally avoid retention work so GetRecords/GetShardIterator +// cannot turn into a changelog scan under load. func (d *DB) PreviewStreamRetention(table string, now time.Time) (StreamRetentionStats, error) { if table == "" { return StreamRetentionStats{}, fmt.Errorf("stream retention table required") } - if now.IsZero() { - now = time.Now() - } - previous, _, err := d.loadStreamRetentionState(table) - if err != nil { - return StreamRetentionStats{}, err - } - records, err := d.scanStreamRetentionRecords(table, nil) - if err != nil { - return StreamRetentionStats{}, err - } - return d.computeStreamRetentionStats(table, records, previous, now), nil + _ = now + return d.StreamRetentionStats(table) } // ListStreamRetentionStats returns every persisted table stream retention @@ -604,26 +625,6 @@ func (d *DB) ListStreamRetentionStats() ([]StreamRetentionStats, error) { return out, nil } -func (d *DB) applyStreamRetentionLocked(b *pebbledb.Batch, table string, now time.Time, extra *ChangeRecord) (StreamRetentionStats, error) { - previous, _, err := d.loadStreamRetentionState(table) - if err != nil { - return StreamRetentionStats{}, err - } - records, err := d.scanStreamRetentionRecords(table, extra) - if err != nil { - return StreamRetentionStats{}, err - } - stats := d.computeStreamRetentionStats(table, records, previous, now) - raw, err := json.Marshal(stats) - if err != nil { - return StreamRetentionStats{}, fmt.Errorf("marshal stream retention state: %w", err) - } - if err := b.Set(storage.KeyStreamRetention(table), raw, nil); err != nil { - return StreamRetentionStats{}, err - } - return stats, nil -} - func (d *DB) loadStreamRetentionState(table string) (StreamRetentionStats, bool, error) { raw, err := d.Get(storage.KeyStreamRetention(table)) if errors.Is(err, ErrNotFound) { @@ -639,116 +640,149 @@ func (d *DB) loadStreamRetentionState(table string) (StreamRetentionStats, bool, return stats, true, nil } -func (d *DB) scanStreamRetentionRecords(table string, extra *ChangeRecord) ([]streamRetentionRecord, error) { - lower, upper := storage.PrefixChangeLog() +type expiredChangeLogEntry struct { + expireKey []byte + index uint64 + rawBytes int64 + record ChangeRecord + hasRecord bool +} + +type streamTrimUpdate struct { + trimmed uint64 + newestDeleted uint64 + bytesDeleted int64 +} + +func (d *DB) applyExpiredChangeLogRetention(now time.Time) (int, error) { + if now.IsZero() { + now = time.Now() + } + if d.repl != nil && !d.repl.IsLeader() { + return 0, nil + } + before := now.UnixNano() + if before < maxUnixNano { + before++ + } + lower, upper := storage.PrefixChangeLogExpirationBefore(before) it, err := d.Iter(lower, upper) if err != nil { - return nil, err + return 0, err } defer it.Close() - var records []streamRetentionRecord - for valid := it.First(); valid; valid = it.Next() { - rec, err := decodeChangeRecord(it.Value()) - if err != nil { - return nil, fmt.Errorf("decode change record at %x: %w", it.Key(), err) + + limit := d.streamRetention.BatchSize + if limit <= 0 { + limit = DefaultStreamRetentionCleanupBatch + } + expired := make([]expiredChangeLogEntry, 0, minInt(limit, 1024)) + for valid := it.First(); valid && len(expired) < limit; valid = it.Next() { + expireKey := append([]byte(nil), it.Key()...) + _, index, ok := storage.ParseChangeLogExpirationKey(expireKey) + if !ok { + continue + } + entry := expiredChangeLogEntry{ + expireKey: expireKey, + index: index, } - if rec.Table == table && rec.StreamRecord { - records = append(records, retentionRecordFromChange(rec)) + raw, err := d.getNoLane(storage.KeyChangeLog(index)) + if err != nil && !errors.Is(err, ErrNotFound) { + return 0, err } + if raw != nil { + rec, err := decodeChangeRecord(raw) + if err != nil { + return 0, fmt.Errorf("decode expired change record at index %d: %w", index, err) + } + entry.record = rec + entry.rawBytes = int64(len(raw)) + entry.hasRecord = true + } + expired = append(expired, entry) } if err := it.Error(); err != nil { - return nil, err + return 0, err } - if extra != nil && extra.Table == table && extra.StreamRecord { - records = append(records, retentionRecordFromChange(*extra)) + if len(expired) == 0 { + return 0, nil } - return records, nil -} -func retentionRecordFromChange(rec ChangeRecord) streamRetentionRecord { - size := rec.SizeBytes - if size <= 0 { - size = approximateChangeRecordSize(rec) - } - return streamRetentionRecord{ - Index: rec.Index, - UnixNano: rec.UnixNano, - Bytes: size, + b := d.Batch() + defer b.Close() + updates := map[string]streamTrimUpdate{} + for _, entry := range expired { + if err := b.Delete(entry.expireKey, nil); err != nil { + return 0, err + } + if !entry.hasRecord { + continue + } + if err := b.Delete(storage.KeyChangeLog(entry.index), nil); err != nil { + return 0, err + } + if entry.record.StreamRecord && entry.record.Table != "" { + u := updates[entry.record.Table] + u.trimmed++ + if entry.record.Index > u.newestDeleted { + u.newestDeleted = entry.record.Index + } + u.bytesDeleted += entry.rawBytes + updates[entry.record.Table] = u + } } -} - -func (d *DB) computeStreamRetentionStats(table string, records []streamRetentionRecord, previous StreamRetentionStats, now time.Time) StreamRetentionStats { - stats := StreamRetentionStats{ - Table: table, - RecordsTrimmed: previous.RecordsTrimmed, - LastTrimUnixNano: previous.LastTrimUnixNano, + if err := d.persistStreamTrimUpdates(b, updates, now); err != nil { + return 0, err } - if len(records) == 0 { - return stats + if err := d.CommitBatch(b); err != nil { + return 0, err } + return len(expired), nil +} - stats.RecordsAppended = uint64(len(records)) - stats.NewestSequence = records[len(records)-1].Index - - start := 0 - retention := d.streamRetention.Retention - // Per-table override from #521. The resolver returns the - // StreamSpecification.RetentionSeconds for table or 0 when no - // override is set; non-zero replaces the cluster default. - if d.streamRetentionResolver != nil { - if secs := d.streamRetentionResolver(table); secs > 0 { - retention = time.Duration(secs) * time.Second +func (d *DB) persistStreamTrimUpdates(b *pebbledb.Batch, updates map[string]streamTrimUpdate, now time.Time) error { + for table, update := range updates { + if update.trimmed == 0 { + continue } - } - if retention > 0 { - cutoff := now.Add(-retention).UnixNano() - for start < len(records) && records[start].UnixNano < cutoff { - start++ + stats, _, err := d.loadStreamRetentionState(table) + if err != nil { + return err } - } - if d.streamRetention.MaxBytes > 0 && start < len(records) { - byteStart := len(records) - var retained int64 - for i := len(records) - 1; i >= start; i-- { - if byteStart < len(records) && retained+records[i].Bytes > d.streamRetention.MaxBytes { - break - } - retained += records[i].Bytes - byteStart = i + stats.Table = table + if floor := update.newestDeleted + 1; floor > stats.OldestSequence { + stats.OldestSequence = floor } - if byteStart == len(records) { - byteStart = len(records) - 1 + if update.newestDeleted > stats.NewestSequence { + stats.NewestSequence = update.newestDeleted } - if byteStart > start { - start = byteStart + stats.RecordsTrimmed += update.trimmed + if stats.RecordsAppended < stats.RecordsTrimmed { + stats.RecordsAppended = stats.RecordsTrimmed } - } - if previous.OldestSequence > 0 { - for start < len(records) && records[start].Index < previous.OldestSequence { - start++ + if update.bytesDeleted >= stats.RetainedBytes { + stats.RetainedBytes = 0 + } else { + stats.RetainedBytes -= update.bytesDeleted } - } - - if start < len(records) { - stats.OldestSequence = records[start].Index - for _, rec := range records[start:] { - stats.RetainedBytes += rec.Bytes + stats.LastTrimUnixNano = now.UnixNano() + raw, err := json.Marshal(stats) + if err != nil { + return fmt.Errorf("marshal stream retention state: %w", err) } - } else { - stats.OldestSequence = stats.NewestSequence + 1 - } - - var trimmed uint64 - for _, rec := range records { - if rec.Index < stats.OldestSequence { - trimmed++ + if err := b.Set(storage.KeyStreamRetention(table), raw, nil); err != nil { + return err } } - if trimmed > stats.RecordsTrimmed { - stats.RecordsTrimmed = trimmed - stats.LastTrimUnixNano = now.UnixNano() + return nil +} + +func minInt(a, b int) int { + if a < b { + return a } - return stats + return b } // ChangeRecordsAfter exposes changeRecordsAfter for FAST refresh diff --git a/internal/storage/adapter/pebble/changelog_stream_test.go b/internal/storage/adapter/pebble/changelog_stream_test.go index df320b6..ff41d47 100644 --- a/internal/storage/adapter/pebble/changelog_stream_test.go +++ b/internal/storage/adapter/pebble/changelog_stream_test.go @@ -61,9 +61,6 @@ func appendStreamChangeAt(t *testing.T, db *DB, td types.TableDescriptor, id str if err := db.CommitBatch(b); err != nil { t.Fatalf("commit change: %v", err) } - if _, err := db.ApplyStreamRetention(td.Name, ts); err != nil { - t.Fatalf("apply retention: %v", err) - } } type streamCatalog struct { @@ -317,7 +314,7 @@ func TestTTLReaperEmitsRemoveStreamRecord(t *testing.T) { } } -func TestStreamRetentionTrimsOldRecordsLogically(t *testing.T) { +func TestStreamRetentionDeletesOldRecordsPhysically(t *testing.T) { db := openChangeLogTestDBWithOptions(t, Options{ Path: t.TempDir(), StreamRetention: StreamRetentionOptions{Retention: time.Hour}, @@ -333,9 +330,7 @@ func TestStreamRetentionTrimsOldRecordsLogically(t *testing.T) { } if stats.OldestSequence != 2 || stats.NewestSequence != 2 || - stats.RecordsAppended != 2 || - stats.RecordsTrimmed != 1 || - stats.RetainedBytes <= 0 { + stats.RecordsTrimmed != 1 { t.Fatalf("stats = %+v", stats) } @@ -354,8 +349,8 @@ func TestStreamRetentionTrimsOldRecordsLogically(t *testing.T) { if err != nil { t.Fatalf("pitr records: %v", err) } - if len(all) != 2 { - t.Fatalf("physical changelog records = %d, want 2", len(all)) + if len(all) != 1 { + t.Fatalf("physical changelog records = %d, want 1", len(all)) } } @@ -658,7 +653,7 @@ func TestStreamRetentionMetadataSurvivesRestart(t *testing.T) { } } -func TestStreamRetentionMaxBytesBoundsRetainedRecords(t *testing.T) { +func TestStreamRetentionMaxBytesDoesNotTriggerFullScan(t *testing.T) { db := openChangeLogTestDBWithOptions(t, Options{ Path: t.TempDir(), StreamRetention: StreamRetentionOptions{Retention: 24 * time.Hour, MaxBytes: 1}, @@ -672,7 +667,14 @@ func TestStreamRetentionMaxBytesBoundsRetainedRecords(t *testing.T) { if err != nil { t.Fatalf("retention: %v", err) } - if stats.OldestSequence != 2 || stats.RecordsTrimmed != 1 { - t.Fatalf("stats = %+v, want only newest retained under byte cap", stats) + if stats.RecordsTrimmed != 0 || stats.OldestSequence != 0 { + t.Fatalf("stats = %+v, want no byte-cap trim without expired records", stats) + } + records, _, err := db.StreamRecords(td.Name, 1, 0, 10, 0) + if err != nil { + t.Fatalf("stream records: %v", err) + } + if len(records) != 2 { + t.Fatalf("records = %d, want both records retained", len(records)) } } diff --git a/internal/storage/adapter/pebble/db.go b/internal/storage/adapter/pebble/db.go index 0de396b..eb811de 100644 --- a/internal/storage/adapter/pebble/db.go +++ b/internal/storage/adapter/pebble/db.go @@ -87,12 +87,13 @@ type Options struct { // Backpressure slows or rejects caller-facing writes when Pebble // LSM pressure crosses configured thresholds. Backpressure BackpressureOptions - // StreamRetention bounds the logical DynamoDB Streams retention - // window. The physical changelog is preserved for PITR/backup. + // StreamRetention bounds the DynamoDB Streams / CDC retention window. + // Expired changelog records are physically removed by the bounded cleanup + // loop; PITR and FAST materialized views depend on the retained window. StreamRetention StreamRetentionOptions - // ChangeLogMode controls physical changelog writes. "always" preserves - // PITR/backup records for every write; "streams-only" only writes records - // for stream-enabled tables; "off" disables changelog writes. + // ChangeLogMode controls physical changelog writes. "always" writes + // retention-bound PITR/backup records for every write; "streams-only" only + // writes records for stream-enabled tables; "off" disables changelog writes. ChangeLogMode string // Lanes configures the read/write worker lanes above this Pebble handle. Lanes LaneOptions diff --git a/internal/storage/adapter/pebble/options.go b/internal/storage/adapter/pebble/options.go index 3a51344..a7a82e5 100644 --- a/internal/storage/adapter/pebble/options.go +++ b/internal/storage/adapter/pebble/options.go @@ -66,23 +66,25 @@ type BackpressureOptions struct { } const ( - DefaultStreamRetention = 24 * time.Hour - DefaultStreamRetentionInterval = 30 * time.Second + DefaultStreamRetention = 24 * time.Hour + DefaultStreamRetentionLoopInterval = 30 * time.Second + DefaultStreamRetentionCleanupBatch = 65536 ) -// StreamRetentionOptions controls logical DynamoDB Streams retention. -// Retention defaults to 24h for DynamoDB parity. MaxBytes <= 0 means -// only the time window is enforced. Trimming advances per-table stream -// high-water marks but keeps the physical changelog for PITR. +// StreamRetentionOptions controls DynamoDB Streams / CDC retention. +// Retention defaults to 24h for DynamoDB parity. Expiration is physical and +// lazy: each changelog entry receives a time-ordered expiration pointer on +// append, and the background cleaner deletes only expired pointers in bounded +// batches. // -// Interval controls how often the background loop scans stream-enabled -// tables and applies retention. Defaults to 30s. Set to a negative -// duration to disable the loop (callers can still invoke -// ApplyStreamRetention explicitly). Zero inherits the default. +// Interval controls the cleaner cadence. Positive enables the cleaner, +// negative disables it, and zero inherits the default. MaxBytes is retained for +// config compatibility; the physical cleaner is time-window based. type StreamRetentionOptions struct { Retention time.Duration MaxBytes int64 Interval time.Duration + BatchSize int } func normalizeStreamRetentionOptions(o StreamRetentionOptions) StreamRetentionOptions { @@ -93,7 +95,10 @@ func normalizeStreamRetentionOptions(o StreamRetentionOptions) StreamRetentionOp o.MaxBytes = 0 } if o.Interval == 0 { - o.Interval = DefaultStreamRetentionInterval + o.Interval = DefaultStreamRetentionLoopInterval + } + if o.BatchSize <= 0 { + o.BatchSize = DefaultStreamRetentionCleanupBatch } return o } @@ -114,8 +119,8 @@ func normalizeProfile(profile string) string { } // normalizeChangeLogMode resolves the wire string into the canonical -// mode. Empty input maps to "streams-only" — only tables with stream -// enabled pay the per-mutation changelog write. Operators who need +// mode. Empty input maps to "streams-only" — only tables with stream enabled +// pay the per-mutation changelog write. Operators who need retention-bound // PITR over every table opt in explicitly with "always". func normalizeChangeLogMode(mode string) string { switch strings.ToLower(strings.TrimSpace(mode)) { diff --git a/internal/storage/adapter/pebble/retention_loop.go b/internal/storage/adapter/pebble/retention_loop.go index 7b89085..ed2c641 100644 --- a/internal/storage/adapter/pebble/retention_loop.go +++ b/internal/storage/adapter/pebble/retention_loop.go @@ -5,25 +5,12 @@ import ( ) // startRetentionLoop launches the background goroutine that periodically -// invokes ApplyStreamRetention for every stream-enabled table that has -// produced at least one change record. It is a no-op when the configured +// removes expired CDC/changelog entries. It is a no-op when the configured // interval is negative. // -// Tables are discovered lazily via trackStreamTable, called from -// appendChangeRecord when the record carries StreamRecord == true. The -// loop never blocks foreground writes — it acquires only the read side -// of streamTablesMu to snapshot the table set, then drives -// ApplyStreamRetention serially on the snapshot. ApplyStreamRetention -// takes changeMu like any other writer, so its overhead is bounded by -// the normal write-coalescer. -// -// Hot writes used to call refreshStreamRetentionAfterWrite at the end -// of every PutItem / DeleteItem / BatchWrite / Atomic / TTL evict path, -// which scanned the entire changelog prefix and committed an extra -// batch per write — O(N) on the live changelog per write. The -// background loop replaces that with an O(stream-tables) sweep every -// Interval, paid amortized across every write that fired during the -// window. +// The loop is bounded by StreamRetentionOptions.BatchSize. It scans only the +// time-ordered expiration prefix, so an idle database with no expired CDC +// records performs a tiny range probe instead of walking the changelog. func (d *DB) startRetentionLoop() { if d == nil { return @@ -59,20 +46,11 @@ func (d *DB) runRetentionLoop(interval time.Duration) { } } -// tickRetention snapshots the known stream-enabled tables and applies -// retention to each. Errors are intentionally swallowed: a transient -// failure on one table must not block the others, and there is no -// foreground caller to return the error to. The next tick will retry. +// tickRetention applies one bounded physical cleanup pass. Errors are +// intentionally swallowed because there is no foreground caller to return them +// to; the next tick retries. func (d *DB) tickRetention(now time.Time) { - d.streamTablesMu.RLock() - tables := make([]string, 0, len(d.streamTables)) - for name := range d.streamTables { - tables = append(tables, name) - } - d.streamTablesMu.RUnlock() - for _, name := range tables { - _, _ = d.ApplyStreamRetention(name, now) - } + _, _ = d.applyExpiredChangeLogRetention(now) } func (d *DB) stopRetentionLoop() { diff --git a/internal/storage/adapter/pebble/retention_loop_test.go b/internal/storage/adapter/pebble/retention_loop_test.go index 010f639..26281c1 100644 --- a/internal/storage/adapter/pebble/retention_loop_test.go +++ b/internal/storage/adapter/pebble/retention_loop_test.go @@ -1,33 +1,49 @@ package pebble import ( + "fmt" "testing" "time" "github.com/CefasDb/cefasdb/pkg/types" ) -// TestRetentionLoopFiresOnTick exercises the happy path: writes to a -// stream-enabled table without calling ApplyStreamRetention explicitly, -// then waits for one tick and asserts persisted state appeared. -func TestRetentionLoopFiresOnTick(t *testing.T) { +func TestRetentionLoopEnabledByDefault(t *testing.T) { + db := openChangeLogTestDBWithOptions(t, Options{ + ChangeLogMode: ChangeLogModeStreamsOnly, + }) + if db.retentionStopCh == nil { + t.Fatalf("loop should start with the default positive cleanup interval") + } + td := streamTestTable() + if err := db.PutItemWith(td, types.Item{ + "id": streamS("k"), + "status": streamS("v"), + }, PutOptions{}); err != nil { + t.Fatalf("put: %v", err) + } + if _, ok, _ := db.loadStreamRetentionState(td.Name); ok { + t.Fatalf("retention state should not exist without explicit apply") + } +} + +// TestRetentionLoopFiresOnExplicitInterval exercises the opt-in path: writes to +// a stream-enabled table without calling ApplyStreamRetention explicitly, then +// waits for one tick and asserts expired CDC state was trimmed. +func TestRetentionLoopFiresOnExplicitInterval(t *testing.T) { db := openChangeLogTestDBWithOptions(t, Options{ ChangeLogMode: ChangeLogModeStreamsOnly, StreamRetention: StreamRetentionOptions{ - Interval: 50 * time.Millisecond, + Retention: 10 * time.Millisecond, + Interval: 50 * time.Millisecond, }, }) td := streamTestTable() for i := 0; i < 5; i++ { - if err := db.PutItemWith(td, types.Item{ - "id": streamS("k"), - "status": streamS("v"), - }, PutOptions{}); err != nil { - t.Fatalf("put: %v", err) - } + appendStreamChangeAt(t, db, td, fmt.Sprintf("k-%d", i), time.Now().Add(-time.Second)) } - // Before the tick, nothing was persisted (per-write refresh is gone). + // Before the tick, nothing was persisted. if _, ok, err := db.loadStreamRetentionState(td.Name); err != nil { t.Fatalf("load: %v", err) } else if ok { @@ -62,6 +78,7 @@ func TestRetentionLoopDisabledWhenIntervalNegative(t *testing.T) { if _, ok, _ := db.loadStreamRetentionState(td.Name); ok { t.Fatalf("retention state should not exist without explicit apply") } + appendStreamChangeAt(t, db, td, "old", time.Now().Add(-48*time.Hour)) if _, err := db.ApplyStreamRetention(td.Name, time.Now()); err != nil { t.Fatalf("apply: %v", err) } diff --git a/internal/storage/keys.go b/internal/storage/keys.go index 563f886..fc01cf8 100644 --- a/internal/storage/keys.go +++ b/internal/storage/keys.go @@ -41,6 +41,8 @@ const ( pGlobalIndex = pInternal + "global-index/" pStreams = pAdmin + "streams/by-arn/" pStreamTrim = pAdmin + "streams/retention/" + pChangeLog = pAdmin + "change/log/" + pChangeExpire = pAdmin + "change/expire/" SegPrimary = "/p/" segGSI = "/gsi/" @@ -53,7 +55,7 @@ const ( var ChangeCounterKey = []byte(pAdmin + "change/counter") func KeyChangeLog(index uint64) []byte { - base := []byte(pAdmin + "change/log/") + base := []byte(pChangeLog) var b [8]byte binary.BigEndian.PutUint64(b[:], index) out := make([]byte, 0, len(base)+len(b)) @@ -63,7 +65,7 @@ func KeyChangeLog(index uint64) []byte { } func PrefixChangeLog() (lower, upper []byte) { - p := []byte(pAdmin + "change/log/") + p := []byte(pChangeLog) return p, prefixUpper(p) } @@ -71,7 +73,7 @@ func PrefixChangeLog() (lower, upper []byte) { // a key produced by KeyChangeLog. Returns an error on malformed keys so // callers can flag a corrupt iterator entry instead of silently zeroing. func ChangeLogIndexFromKey(key []byte) (uint64, error) { - prefix := []byte(pAdmin + "change/log/") + prefix := []byte(pChangeLog) if len(key) != len(prefix)+8 { return 0, fmt.Errorf("change log key length %d, want %d", len(key), len(prefix)+8) } @@ -83,6 +85,44 @@ func ChangeLogIndexFromKey(key []byte) (uint64, error) { return binary.BigEndian.Uint64(key[len(prefix):]), nil } +// KeyChangeLogExpiration builds the time-ordered pointer used by the CDC +// retention cleaner. The changelog record itself remains addressed by +// KeyChangeLog(index); this pointer lets the cleaner visit only expired +// records instead of scanning the full changelog. +// +// cefas/admin/change/expire// +func KeyChangeLogExpiration(expireUnixNano int64, index uint64) []byte { + base := []byte(pChangeExpire) + var exp, idx [8]byte + binary.BigEndian.PutUint64(exp[:], uint64(expireUnixNano)) + binary.BigEndian.PutUint64(idx[:], index) + out := make([]byte, 0, len(base)+len(exp)+len(idx)) + out = append(out, base...) + out = append(out, exp[:]...) + out = append(out, idx[:]...) + return out +} + +// PrefixChangeLogExpirationBefore returns the range covering expiration +// pointers with expire_unix_nano < beforeUnixNano. +func PrefixChangeLogExpirationBefore(beforeUnixNano int64) (lower, upper []byte) { + p := []byte(pChangeExpire) + lower = p + upper = make([]byte, 0, len(p)+8) + upper = append(upper, p...) + upper = append(upper, be8(uint64(beforeUnixNano))...) + return lower, upper +} + +func ParseChangeLogExpirationKey(key []byte) (expireUnixNano int64, index uint64, ok bool) { + prefix := []byte(pChangeExpire) + if len(key) != len(prefix)+16 || !bytes.HasPrefix(key, prefix) { + return 0, 0, false + } + rest := key[len(prefix):] + return int64(binary.BigEndian.Uint64(rest[:8])), binary.BigEndian.Uint64(rest[8:]), true +} + // KeyStreamDescriptor stores persisted DynamoDB Streams metadata by ARN. func KeyStreamDescriptor(streamArn string) []byte { return []byte(pStreams + escapeKeySegment(streamArn))