From 43d8999003aa762d3f0ddf10f523f3242759004f Mon Sep 17 00:00:00 2001 From: Osvaldo Andrade Date: Wed, 24 Jun 2026 23:48:46 -0300 Subject: [PATCH] Add functional health probes and safe lifecycle --- cmd/cefasdb/main.go | 128 ++++++++- dist/helm/cefas/templates/configmap.yaml | 4 + dist/helm/cefas/templates/service.yaml | 2 +- dist/helm/cefas/templates/statefulset.yaml | 4 +- dist/helm/cefas/values.yaml | 24 +- internal/bootstrap/server/flags.go | 17 ++ internal/bootstrap/server/flags_test.go | 15 + internal/config/config.go | 11 + internal/config/config_test.go | 16 ++ internal/server/grpc_server.go | 19 +- internal/server/health.go | 256 ++++++++++++++++++ internal/server/health_test.go | 122 +++++++++ internal/server/lifecycle.go | 154 +++++++++++ internal/server/server.go | 15 +- internal/storage/adapter/pebble/db.go | 65 +++++ .../storage/adapter/pebble/storage_test.go | 31 +++ 16 files changed, 862 insertions(+), 21 deletions(-) create mode 100644 internal/server/health.go create mode 100644 internal/server/health_test.go create mode 100644 internal/server/lifecycle.go diff --git a/cmd/cefasdb/main.go b/cmd/cefasdb/main.go index 72b9b9f..307e4b6 100644 --- a/cmd/cefasdb/main.go +++ b/cmd/cefasdb/main.go @@ -6,6 +6,7 @@ package main import ( "context" + "errors" "flag" "fmt" "log/slog" @@ -13,6 +14,7 @@ import ( "net/http" "os" "os/signal" + "sync/atomic" "syscall" "time" @@ -127,10 +129,13 @@ func main() { mtlsCA = flag.String("mtls-ca", "", "Path to a client-CA bundle. When set, the gRPC listener requires mTLS.") // Observability + config. - configPath = flag.String("config", "", "Path to YAML config file. Flag/env values override the file.") - metricsOff = flag.Bool("metrics-disabled", false, "Disable the /metrics Prometheus endpoint.") - tracingURL = flag.String("tracing-endpoint", "", "OTLP/gRPC collector endpoint (e.g. 'jaeger:4317'). Empty disables tracing.") - tracingIns = flag.Bool("tracing-insecure", true, "Disable TLS to the OTLP collector.") + configPath = flag.String("config", "", "Path to YAML config file. Flag/env values override the file.") + metricsOff = flag.Bool("metrics-disabled", false, "Disable the /metrics Prometheus endpoint.") + tracingURL = flag.String("tracing-endpoint", "", "OTLP/gRPC collector endpoint (e.g. 'jaeger:4317'). Empty disables tracing.") + tracingIns = flag.Bool("tracing-insecure", true, "Disable TLS to the OTLP collector.") + shutdownGracePeriod = flag.Duration("shutdown-grace-period", 0, "Grace period for HTTP/gRPC/Raft/storage shutdown. 0 inherits config/default.") + shutdownDrainDelay = flag.Duration("shutdown-drain-delay", -1, "Delay after readiness is dropped before listener shutdown. Negative inherits config/default; 0 disables delay.") + shutdownLeadershipTransfer = flag.Duration("shutdown-leadership-transfer-timeout", 0, "Per-shard leadership transfer timeout during shutdown. 0 inherits config/default.") // pprof debug listener. Default empty = disabled. Recommended // bind '127.0.0.1:6060' for benchmark runs; binding to a @@ -210,6 +215,7 @@ func main() { *raftIdentityLeaseBackend, *raftIdentityLeasePath, *raftIdentityLeaseName, *raftIdentityLeaseNamespace, *raftIdentityLeaseAPIURL, *raftIdentityLeaseTTL, *raftIdentityLeaseRenew) + bootstrapserver.OverlayLifecycleFlags(&cfg, *shutdownGracePeriod, *shutdownDrainDelay, *shutdownLeadershipTransfer) // Initialise tracing first so subsequent setup gets spans on // failure. tracingShutdown is a no-op when no endpoint is set. @@ -247,6 +253,7 @@ func main() { leaseCtx, leaseCancel := context.WithCancel(context.Background()) defer leaseCancel() leaseLost := make(chan error, 1) + var leaseHealthy atomic.Bool var raftIdentityGuard *identitylease.Guard if raftIdentityLeaseRequired(cfg) { raftIdentityGuard, err = identitylease.Acquire(context.Background(), identitylease.Options{ @@ -274,9 +281,11 @@ func main() { logger.Error("release raft identity lease", "err", err, "raftID", cfg.Cluster.SelfID) } }() + leaseHealthy.Store(true) rec := raftIdentityGuard.Record() logger.Info("raft identity lease acquired", "raftID", rec.NodeID, "holder", rec.HolderID, "epoch", rec.Epoch, "backend", rec.Backend, "resource", rec.Resource, "expires", rec.ExpiresAt) go raftIdentityGuard.RenewLoop(leaseCtx, func(err error) { + leaseHealthy.Store(false) logger.Error("raft identity lease lost", "err", err, "raftID", cfg.Cluster.SelfID) select { case leaseLost <- err: @@ -481,6 +490,21 @@ func main() { } } } + if raftIdentityGuard != nil { + apiSrv.AddReadinessCheck("raft_identity_lease", func(context.Context) error { + if !leaseHealthy.Load() { + return fmt.Errorf("raft identity lease not healthy") + } + rec := raftIdentityGuard.Record() + if rec.NodeID == "" || rec.HolderID == "" || rec.Epoch == 0 { + return fmt.Errorf("raft identity lease incomplete") + } + if !rec.ExpiresAt.IsZero() && time.Now().UTC().After(rec.ExpiresAt) { + return fmt.Errorf("raft identity lease expired at %s", rec.ExpiresAt.Format(time.RFC3339Nano)) + } + return nil + }) + } if cfg.BackupScheduler.Enabled { go backupScheduler.Run(runtimeCtx) logger.Info("scheduled backups enabled", "interval", cfg.BackupScheduler.Interval, "dryRun", cfg.BackupScheduler.DryRun, "template", cfg.BackupScheduler.NameTemplate, "tables", cfg.BackupScheduler.Tables) @@ -519,6 +543,7 @@ func main() { // gRPC listener (optional). var gsrv *grpc.Server + var gsrvImpl *server.GRPCServer if cfg.GRPC.Addr != "" { // Workload prioritization (#499): wire the per-SL quota // controller against the catalog. Hot reload via @@ -540,7 +565,8 @@ func main() { clu = sh.Raft } } - gsrvImpl := server.NewGRPCServer(db, cat, clu) + gsrvImpl = server.NewGRPCServer(db, cat, clu) + gsrvImpl.AttachLifecycle(apiSrv.Lifecycle()) if mgr != nil { gsrvImpl.AttachManager(mgr) } @@ -569,6 +595,7 @@ func main() { } }() } + apiSrv.MarkStarted() stop := make(chan os.Signal, 1) signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) @@ -580,15 +607,35 @@ func main() { shutdownReason = "raft identity lease lost: " + err.Error() } logger.Info("shutting down", "reason", shutdownReason) + apiSrv.StartDraining(shutdownReason) + rejectWrites(db, mgr, shutdownReason) + if cfg.Lifecycle.DrainDelay > 0 { + time.Sleep(cfg.Lifecycle.DrainDelay) + } + if err := transferLeadershipOnShutdown(context.Background(), raftDB, mgr, cfg.Cluster.SelfID, cfg.Cluster.Peers, cfg.Lifecycle.LeadershipTransferTimeout); err != nil { + logger.Error("shutdown leadership transfer", "err", err) + } leaseCancel() runtimeCancel() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), cfg.Lifecycle.ShutdownGracePeriod) defer cancel() _ = srv.Shutdown(ctx) _ = server.ShutdownPprof(ctx, pprofSrv) + if gsrvImpl != nil { + gsrvImpl.StopMVScheduler() + } if gsrv != nil { - gsrv.GracefulStop() + done := make(chan struct{}) + go func() { + gsrv.GracefulStop() + close(done) + }() + select { + case <-done: + case <-ctx.Done(): + gsrv.Stop() + } } } @@ -598,3 +645,70 @@ func raftIdentityLeaseRequired(cfg config.Config) bool { } return cfg.Cluster.Shards > 0 || cfg.Raft.Bind != "" } + +func rejectWrites(db *pebble.DB, mgr *cluster.Manager, reason string) { + if mgr == nil { + if db != nil { + db.RejectWrites(reason) + } + return + } + for _, sh := range mgr.Shards() { + if sh != nil && sh.Storage != nil { + sh.Storage.RejectWrites(reason) + } + } +} + +func transferLeadershipOnShutdown(ctx context.Context, raftDB *craft.DB, mgr *cluster.Manager, selfID string, peers map[string]string, timeout time.Duration) error { + if timeout <= 0 { + return nil + } + if mgr == nil { + if raftDB == nil || !raftDB.IsLeader() { + return nil + } + targetID, targetAddr := firstPeerTarget(selfID, peers, nil) + if targetID == "" { + return nil + } + return raftDB.TransferLeadership(targetID, targetAddr, timeout) + } + var errs []error + for _, sh := range mgr.Shards() { + if err := ctx.Err(); err != nil { + return err + } + if sh == nil || sh.Raft == nil || !sh.Raft.IsLeader() { + continue + } + targetID, targetAddr := firstPeerTarget(selfID, peers, sh.Voters) + if targetID == "" { + continue + } + if err := sh.Raft.TransferLeadership(targetID, targetAddr, timeout); err != nil { + errs = append(errs, fmt.Errorf("shard %d: %w", sh.ID, err)) + } + } + return errors.Join(errs...) +} + +func firstPeerTarget(selfID string, peers map[string]string, voters []string) (string, string) { + if len(voters) > 0 { + for _, id := range voters { + if id == "" || id == selfID { + continue + } + if addr := peers[id]; addr != "" { + return id, addr + } + } + return "", "" + } + for id, addr := range peers { + if id != "" && id != selfID && addr != "" { + return id, addr + } + } + return "", "" +} diff --git a/dist/helm/cefas/templates/configmap.yaml b/dist/helm/cefas/templates/configmap.yaml index ca51154..27fe1aa 100644 --- a/dist/helm/cefas/templates/configmap.yaml +++ b/dist/helm/cefas/templates/configmap.yaml @@ -9,6 +9,10 @@ data: data: /var/lib/cefas http: addr: ":{{ .Values.service.http }}" + lifecycle: + shutdownGracePeriod: {{ .Values.lifecycle.shutdownGracePeriod | quote }} + drainDelay: {{ .Values.lifecycle.drainDelay | quote }} + leadershipTransferTimeout: {{ .Values.lifecycle.leadershipTransferTimeout | quote }} grpc: addr: ":{{ .Values.service.grpc }}" reflection: true diff --git a/dist/helm/cefas/templates/service.yaml b/dist/helm/cefas/templates/service.yaml index 9cc2290..91be2b4 100644 --- a/dist/helm/cefas/templates/service.yaml +++ b/dist/helm/cefas/templates/service.yaml @@ -27,7 +27,7 @@ metadata: spec: type: ClusterIP clusterIP: None - publishNotReadyAddresses: true + publishNotReadyAddresses: {{ .Values.service.headless.publishNotReadyAddresses }} selector: {{- include "cefas.selectorLabels" . | nindent 4 }} ports: diff --git a/dist/helm/cefas/templates/statefulset.yaml b/dist/helm/cefas/templates/statefulset.yaml index b7babe4..6f425aa 100644 --- a/dist/helm/cefas/templates/statefulset.yaml +++ b/dist/helm/cefas/templates/statefulset.yaml @@ -21,7 +21,7 @@ spec: serviceAccountName: {{ include "cefas.serviceAccountName" . }} securityContext: {{- toYaml .Values.securityContext | nindent 8 }} - terminationGracePeriodSeconds: 30 + terminationGracePeriodSeconds: {{ .Values.lifecycle.terminationGracePeriodSeconds }} containers: - name: cefas image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" @@ -66,6 +66,8 @@ spec: {{- toYaml .Values.livenessProbe | nindent 12 }} readinessProbe: {{- toYaml .Values.readinessProbe | nindent 12 }} + startupProbe: + {{- toYaml .Values.startupProbe | nindent 12 }} volumeMounts: - name: data mountPath: /var/lib/cefas diff --git a/dist/helm/cefas/values.yaml b/dist/helm/cefas/values.yaml index ae95e7d..cdac4e9 100644 --- a/dist/helm/cefas/values.yaml +++ b/dist/helm/cefas/values.yaml @@ -33,6 +33,17 @@ service: http: 8080 grpc: 9090 mux: 9100 + headless: + # Required for StatefulSet peer bootstrap: Pods can resolve each + # other before readiness passes. Client traffic uses the regular + # service above, which does not publish non-ready endpoints. + publishNotReadyAddresses: true + +lifecycle: + terminationGracePeriodSeconds: 30 + shutdownGracePeriod: 25s + drainDelay: 2s + leadershipTransferTimeout: 5s # Multi-Raft sharding. shards>1 turns on the mux transport. cluster: @@ -85,17 +96,24 @@ securityContext: runAsNonRoot: true runAsUser: 65532 -# Probes target the cheap /v1/Health endpoint. livenessProbe: httpGet: - path: /v1/Health + path: /livez port: 8080 initialDelaySeconds: 15 periodSeconds: 30 readinessProbe: httpGet: - path: /v1/Health + path: /readyz port: 8080 initialDelaySeconds: 5 periodSeconds: 10 + failureThreshold: 1 + +startupProbe: + httpGet: + path: /startupz + port: 8080 + failureThreshold: 30 + periodSeconds: 2 diff --git a/internal/bootstrap/server/flags.go b/internal/bootstrap/server/flags.go index 16b3b7a..1f5f139 100644 --- a/internal/bootstrap/server/flags.go +++ b/internal/bootstrap/server/flags.go @@ -341,6 +341,23 @@ func OverlayRaftIdentityLeaseFlags( } } +// OverlayLifecycleFlags keeps shutdown/drain tuning separate from the +// large historical OverlayFlags signature. +func OverlayLifecycleFlags( + cfg *config.Config, + shutdownGracePeriod, drainDelay, leadershipTransferTimeout time.Duration, +) { + if shutdownGracePeriod > 0 { + cfg.Lifecycle.ShutdownGracePeriod = shutdownGracePeriod + } + if drainDelay >= 0 { + cfg.Lifecycle.DrainDelay = drainDelay + } + if leadershipTransferTimeout > 0 { + cfg.Lifecycle.LeadershipTransferTimeout = leadershipTransferTimeout + } +} + // SplitCSVFlag splits a comma-separated CLI flag value into a trimmed // slice. Blank entries are dropped, so "a, ,b" yields ["a","b"]. func SplitCSVFlag(in string) []string { diff --git a/internal/bootstrap/server/flags_test.go b/internal/bootstrap/server/flags_test.go index 2a59c37..049d114 100644 --- a/internal/bootstrap/server/flags_test.go +++ b/internal/bootstrap/server/flags_test.go @@ -315,6 +315,21 @@ func TestOverlayRaftIdentityLeaseFlags(t *testing.T) { } } +func TestOverlayLifecycleFlags(t *testing.T) { + cfg := baseCfg() + OverlayLifecycleFlags(&cfg, 20*time.Second, 500*time.Millisecond, 3*time.Second) + + if cfg.Lifecycle.ShutdownGracePeriod != 20*time.Second { + t.Errorf("ShutdownGracePeriod = %v", cfg.Lifecycle.ShutdownGracePeriod) + } + if cfg.Lifecycle.DrainDelay != 500*time.Millisecond { + t.Errorf("DrainDelay = %v", cfg.Lifecycle.DrainDelay) + } + if cfg.Lifecycle.LeadershipTransferTimeout != 3*time.Second { + t.Errorf("LeadershipTransferTimeout = %v", cfg.Lifecycle.LeadershipTransferTimeout) + } +} + func TestOverlayFlags_PeerSetGroup(t *testing.T) { cfg := baseCfg() args := zeroArgs() diff --git a/internal/config/config.go b/internal/config/config.go index b211587..de7c88a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -25,6 +25,11 @@ type Config struct { HTTP struct { Addr string `yaml:"addr"` } `yaml:"http"` + Lifecycle struct { + ShutdownGracePeriod time.Duration `yaml:"shutdownGracePeriod"` + DrainDelay time.Duration `yaml:"drainDelay"` + LeadershipTransferTimeout time.Duration `yaml:"leadershipTransferTimeout"` + } `yaml:"lifecycle"` GRPC struct { Addr string `yaml:"addr"` Reflection bool `yaml:"reflection"` @@ -157,6 +162,9 @@ func Defaults() Config { var c Config c.Data = "./cefas-data" c.HTTP.Addr = ":8080" + c.Lifecycle.ShutdownGracePeriod = 25 * time.Second + c.Lifecycle.DrainDelay = 2 * time.Second + c.Lifecycle.LeadershipTransferTimeout = 5 * time.Second c.Identity.ClockSkew = 30 * time.Second c.Metrics.Enabled = true c.Metrics.HotspotBuckets = 64 @@ -284,6 +292,9 @@ func ApplyEnv(cfg *Config) error { cfg.Data = str("DATA", cfg.Data) cfg.HTTP.Addr = str("HTTP_ADDR", cfg.HTTP.Addr) + cfg.Lifecycle.ShutdownGracePeriod = dur("LIFECYCLE_SHUTDOWN_GRACE_PERIOD", cfg.Lifecycle.ShutdownGracePeriod) + cfg.Lifecycle.DrainDelay = dur("LIFECYCLE_DRAIN_DELAY", cfg.Lifecycle.DrainDelay) + cfg.Lifecycle.LeadershipTransferTimeout = dur("LIFECYCLE_LEADERSHIP_TRANSFER_TIMEOUT", cfg.Lifecycle.LeadershipTransferTimeout) cfg.GRPC.Addr = str("GRPC_ADDR", cfg.GRPC.Addr) cfg.GRPC.Reflection = boolean("GRPC_REFLECTION", cfg.GRPC.Reflection) cfg.GRPC.TLSCertPath = str("GRPC_TLS_CERT", cfg.GRPC.TLSCertPath) diff --git a/internal/config/config_test.go b/internal/config/config_test.go index bc87534..e94a856 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -17,6 +17,9 @@ func TestDefaultsPopulated(t *testing.T) { if d.Identity.ClockSkew != 30*time.Second { t.Errorf("clock skew default = %v", d.Identity.ClockSkew) } + if d.Lifecycle.ShutdownGracePeriod != 25*time.Second || d.Lifecycle.DrainDelay != 2*time.Second || d.Lifecycle.LeadershipTransferTimeout != 5*time.Second { + t.Errorf("lifecycle defaults not populated: %+v", d.Lifecycle) + } if !d.Metrics.Enabled { t.Errorf("metrics should default on") } @@ -69,6 +72,10 @@ func TestLoadFileYAML(t *testing.T) { data: /var/lib/cefas-test http: addr: ":18080" +lifecycle: + shutdownGracePeriod: 20s + drainDelay: 1500ms + leadershipTransferTimeout: 4s cluster: shards: 3 replicationFactor: 2 @@ -141,6 +148,9 @@ backupScheduler: if cfg.Identity.ClockSkew != 45*time.Second { t.Fatalf("clock skew = %v", cfg.Identity.ClockSkew) } + if cfg.Lifecycle.ShutdownGracePeriod != 20*time.Second || cfg.Lifecycle.DrainDelay != 1500*time.Millisecond || cfg.Lifecycle.LeadershipTransferTimeout != 4*time.Second { + t.Fatalf("lifecycle config not loaded: %+v", cfg.Lifecycle) + } if cfg.Storage.ChangeLogMode != "streams-only" { t.Fatalf("storage changelog mode config not loaded: %+v", cfg.Storage) } @@ -187,6 +197,9 @@ backupScheduler: func TestApplyEnv(t *testing.T) { t.Setenv("CEFAS_HTTP_ADDR", ":19090") + t.Setenv("CEFAS_LIFECYCLE_SHUTDOWN_GRACE_PERIOD", "21s") + t.Setenv("CEFAS_LIFECYCLE_DRAIN_DELAY", "750ms") + t.Setenv("CEFAS_LIFECYCLE_LEADERSHIP_TRANSFER_TIMEOUT", "6s") t.Setenv("CEFAS_CLUSTER_SHARDS", "4") t.Setenv("CEFAS_CLUSTER_REPLICATION_FACTOR", "3") t.Setenv("CEFAS_RAFT_HEARTBEAT_TIMEOUT", "2500ms") @@ -235,6 +248,9 @@ func TestApplyEnv(t *testing.T) { if cfg.HTTP.Addr != ":19090" { t.Errorf("http addr override: %q", cfg.HTTP.Addr) } + if cfg.Lifecycle.ShutdownGracePeriod != 21*time.Second || cfg.Lifecycle.DrainDelay != 750*time.Millisecond || cfg.Lifecycle.LeadershipTransferTimeout != 6*time.Second { + t.Errorf("lifecycle env not applied: %+v", cfg.Lifecycle) + } if cfg.Cluster.Shards != 4 { t.Errorf("shards override: %d", cfg.Cluster.Shards) } diff --git a/internal/server/grpc_server.go b/internal/server/grpc_server.go index 56e0c57..e819199 100644 --- a/internal/server/grpc_server.go +++ b/internal/server/grpc_server.go @@ -36,14 +36,15 @@ type GRPCServer struct { cefaspb.UnimplementedCefasServer cefaspb.UnimplementedReplicaServer - db *pebble.DB - cat *catalog.Catalog - cluster Cluster // nil in single-node mode - stream ChangeStream // nil when no CDC source attached - manager *cluster.Manager // nil in single-shard mode - plugins *plugin.Registry // nil → uses plugin.Default - metrics *metrics.Metrics // nil when metrics disabled - backups BackupSchedulerStatusProvider + db *pebble.DB + cat *catalog.Catalog + cluster Cluster // nil in single-node mode + stream ChangeStream // nil when no CDC source attached + manager *cluster.Manager // nil in single-shard mode + plugins *plugin.Registry // nil → uses plugin.Default + metrics *metrics.Metrics // nil when metrics disabled + backups BackupSchedulerStatusProvider + lifecycle *Lifecycle mvScheduler *mvScheduler } @@ -2330,6 +2331,8 @@ func mapStorageErr(err error) error { return status.Error(codes.FailedPrecondition, err.Error()) case errors.Is(err, craft.ErrNotLeader): return status.Error(codes.FailedPrecondition, err.Error()) + case errors.Is(err, pebble.ErrDraining): + return status.Error(codes.Unavailable, err.Error()) case errors.Is(err, pebble.ErrThrottled): return status.Error(codes.ResourceExhausted, err.Error()) } diff --git a/internal/server/health.go b/internal/server/health.go new file mode 100644 index 0000000..7c72bf9 --- /dev/null +++ b/internal/server/health.go @@ -0,0 +1,256 @@ +package server + +import ( + "context" + "fmt" + "net/http" + "time" +) + +const probeTimeout = time.Second + +type probeComponent struct { + Name string `json:"name"` + OK bool `json:"ok"` + Error string `json:"error,omitempty"` +} + +type probeResponse struct { + Status string `json:"status"` + Lifecycle LifecycleSnapshot `json:"lifecycle"` + Components []probeComponent `json:"components,omitempty"` + Raft *raftProbeResponse `json:"raft,omitempty"` +} + +type raftProbeResponse struct { + Mode string `json:"mode"` + SelfID string `json:"selfId,omitempty"` + BindAddr string `json:"bindAddr,omitempty"` + IsLeader bool `json:"isLeader,omitempty"` + LeaderID string `json:"leaderId,omitempty"` + LeaderAddr string `json:"leaderAddr,omitempty"` + Shards []raftShardStatus `json:"shards,omitempty"` +} + +type raftShardStatus struct { + ID uint32 `json:"id"` + LocalVoter bool `json:"localVoter"` + LocalNonVoter bool `json:"localNonVoter"` + Voters []string `json:"voters,omitempty"` + NonVoters []string `json:"nonVoters,omitempty"` + IsLeader bool `json:"isLeader"` + LeaderID string `json:"leaderId,omitempty"` + LeaderAddr string `json:"leaderAddr,omitempty"` + LeaderKnown bool `json:"leaderKnown"` + StorageOpen bool `json:"storageOpen"` + RaftInitialized bool `json:"raftInitialized"` +} + +func (s *Server) handleLivez(w http.ResponseWriter, r *http.Request) { + components := s.storageProbeComponents(r.Context()) + if !componentsOK(components) { + writeJSON(w, http.StatusServiceUnavailable, probeResponse{ + Status: "unhealthy", + Lifecycle: s.Lifecycle().Snapshot(), + Components: components, + }) + return + } + writeJSON(w, http.StatusOK, probeResponse{ + Status: "ok", + Lifecycle: s.Lifecycle().Snapshot(), + Components: components, + }) +} + +func (s *Server) handleStartupz(w http.ResponseWriter, r *http.Request) { + snap := s.Lifecycle().Snapshot() + if !snap.Started { + writeJSON(w, http.StatusServiceUnavailable, probeResponse{ + Status: "starting", + Lifecycle: snap, + }) + return + } + writeJSON(w, http.StatusOK, probeResponse{ + Status: "ok", + Lifecycle: snap, + }) +} + +func (s *Server) handleReadyz(w http.ResponseWriter, r *http.Request) { + resp, ok := s.readinessReport(r.Context()) + status := http.StatusOK + if !ok { + status = http.StatusServiceUnavailable + } + writeJSON(w, status, resp) +} + +func (s *Server) handleRaftz(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, s.raftProbe()) +} + +func (s *Server) readinessReport(ctx context.Context) (probeResponse, bool) { + resp := probeResponse{ + Status: "ok", + Lifecycle: s.Lifecycle().Snapshot(), + Raft: s.raftProbe(), + } + add := func(name string, err error) { + c := probeComponent{Name: name, OK: err == nil} + if err != nil { + c.Error = err.Error() + resp.Status = "unhealthy" + } + resp.Components = append(resp.Components, c) + } + + add("lifecycle", s.Lifecycle().servingError()) + for _, c := range s.storageProbeComponents(ctx) { + if !c.OK { + resp.Status = "unhealthy" + } + resp.Components = append(resp.Components, c) + } + add("raft", s.raftReadinessError()) + for _, check := range s.Lifecycle().readinessChecks() { + add(check.name, check.fn(ctx)) + } + return resp, resp.Status == "ok" +} + +func (s *Server) storageProbeComponents(ctx context.Context) []probeComponent { + ctx, cancel := context.WithTimeout(ctx, probeTimeout) + defer cancel() + if s.manager == nil { + return []probeComponent{probeComponentFromError("storage", s.db.Health(ctx))} + } + shards := s.manager.Shards() + out := make([]probeComponent, 0, len(shards)) + for _, sh := range shards { + name := "storage" + if sh != nil { + name = fmt.Sprintf("storage/shard-%d", sh.ID) + } + if sh == nil || sh.Storage == nil { + out = append(out, probeComponentFromError(name, fmt.Errorf("storage not open"))) + continue + } + out = append(out, probeComponentFromError(name, sh.Storage.Health(ctx))) + } + return out +} + +func probeComponentFromError(name string, err error) probeComponent { + c := probeComponent{Name: name, OK: err == nil} + if err != nil { + c.Error = err.Error() + } + return c +} + +func componentsOK(components []probeComponent) bool { + for _, c := range components { + if !c.OK { + return false + } + } + return true +} + +func (s *Server) raftProbe() *raftProbeResponse { + if s.manager != nil { + shards := s.manager.Shards() + out := &raftProbeResponse{Mode: "multi-shard-local"} + for _, sh := range shards { + if sh != nil && sh.Raft != nil { + out.Mode = "multi-raft" + break + } + } + for _, sh := range shards { + if sh == nil { + continue + } + st := raftShardStatus{ + ID: sh.ID, + LocalVoter: sh.IsLocalVoter, + LocalNonVoter: sh.IsLocalNonVoter, + Voters: append([]string(nil), sh.Voters...), + NonVoters: append([]string(nil), sh.NonVoters...), + StorageOpen: sh.Storage != nil, + } + if sh.Raft != nil { + st.RaftInitialized = true + st.IsLeader = sh.Raft.IsLeader() + st.LeaderID, st.LeaderAddr = sh.Raft.LeaderInfo() + st.LeaderKnown = st.LeaderID != "" + } + out.Shards = append(out.Shards, st) + } + return out + } + if s.cluster == nil { + return &raftProbeResponse{Mode: "single-node"} + } + leaderID, leaderAddr := s.cluster.LeaderInfo() + return &raftProbeResponse{ + Mode: "raft", + SelfID: s.cluster.SelfID(), + BindAddr: s.cluster.BindAddr(), + IsLeader: s.cluster.IsLeader(), + LeaderID: leaderID, + LeaderAddr: leaderAddr, + } +} + +func (s *Server) raftReadinessError() error { + if s.manager != nil { + shards := s.manager.Shards() + raftConfigured := false + for _, sh := range shards { + if sh != nil && sh.Raft != nil { + raftConfigured = true + break + } + } + if !raftConfigured { + return nil + } + var first error + for _, sh := range shards { + if sh == nil { + if first == nil { + first = fmt.Errorf("missing shard") + } + continue + } + if sh.Storage == nil { + if first == nil { + first = fmt.Errorf("shard %d storage not open", sh.ID) + } + continue + } + if sh.Raft == nil { + if first == nil { + first = fmt.Errorf("shard %d raft not initialized", sh.ID) + } + continue + } + leaderID, _ := sh.Raft.LeaderInfo() + if leaderID == "" && first == nil { + first = fmt.Errorf("shard %d has no known leader", sh.ID) + } + } + return first + } + if s.cluster == nil { + return nil + } + leaderID, _ := s.cluster.LeaderInfo() + if leaderID == "" { + return fmt.Errorf("raft has no known leader") + } + return nil +} diff --git a/internal/server/health_test.go b/internal/server/health_test.go new file mode 100644 index 0000000..b7660d5 --- /dev/null +++ b/internal/server/health_test.go @@ -0,0 +1,122 @@ +package server_test + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/CefasDb/cefasdb/internal/catalog" + apiserver "github.com/CefasDb/cefasdb/internal/server" + pebble "github.com/CefasDb/cefasdb/internal/storage/adapter/pebble" +) + +func newProbeMux(t *testing.T) (*apiserver.Server, *http.ServeMux) { + t.Helper() + db, err := pebble.Open(pebble.Options{Path: t.TempDir()}) + if err != nil { + t.Fatalf("open db: %v", err) + } + t.Cleanup(func() { _ = db.Close() }) + cat, err := catalog.New(db) + if err != nil { + t.Fatalf("catalog: %v", err) + } + srv := apiserver.New(db, cat) + mux := http.NewServeMux() + srv.Routes(mux) + return srv, mux +} + +func probeStatus(mux http.Handler, path string) int { + req := httptest.NewRequest(http.MethodGet, path, nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + return rec.Code +} + +func TestStartupAndReadinessLifecycle(t *testing.T) { + srv, mux := newProbeMux(t) + + if got := probeStatus(mux, "/startupz"); got != http.StatusServiceUnavailable { + t.Fatalf("startup before MarkStarted = %d, want 503", got) + } + if got := probeStatus(mux, "/readyz"); got != http.StatusServiceUnavailable { + t.Fatalf("ready before MarkStarted = %d, want 503", got) + } + + srv.MarkStarted() + if got := probeStatus(mux, "/startupz"); got != http.StatusOK { + t.Fatalf("startup after MarkStarted = %d, want 200", got) + } + if got := probeStatus(mux, "/readyz"); got != http.StatusOK { + t.Fatalf("ready after MarkStarted = %d, want 200", got) + } + + srv.StartDraining("test") + if got := probeStatus(mux, "/livez"); got != http.StatusOK { + t.Fatalf("live while draining = %d, want 200", got) + } + if got := probeStatus(mux, "/readyz"); got != http.StatusServiceUnavailable { + t.Fatalf("ready while draining = %d, want 503", got) + } +} + +func TestReadinessCheckFailure(t *testing.T) { + srv, mux := newProbeMux(t) + srv.MarkStarted() + srv.AddReadinessCheck("lease", func(context.Context) error { + return fmt.Errorf("lost") + }) + + if got := probeStatus(mux, "/readyz"); got != http.StatusServiceUnavailable { + t.Fatalf("ready with failing check = %d, want 503", got) + } +} + +func TestRaftReadinessRequiresKnownLeader(t *testing.T) { + srv, mux := newProbeMux(t) + cluster := &fakeProbeCluster{selfID: "n1", bindAddr: "127.0.0.1:9001"} + srv.AttachCluster(cluster) + srv.MarkStarted() + + if got := probeStatus(mux, "/readyz"); got != http.StatusServiceUnavailable { + t.Fatalf("ready without raft leader = %d, want 503", got) + } + if got := probeStatus(mux, "/raftz"); got != http.StatusOK { + t.Fatalf("raftz without raft leader = %d, want 200", got) + } + + cluster.leaderID = "n2" + cluster.leaderAddr = "127.0.0.1:9002" + if got := probeStatus(mux, "/readyz"); got != http.StatusOK { + t.Fatalf("ready with raft leader = %d, want 200", got) + } +} + +type fakeProbeCluster struct { + leader bool + selfID string + bindAddr string + leaderID string + leaderAddr string + leaderHTTP string +} + +func (f *fakeProbeCluster) IsLeader() bool { return f.leader } + +func (f *fakeProbeCluster) LeaderInfo() (string, string) { return f.leaderID, f.leaderAddr } + +func (f *fakeProbeCluster) LeaderHTTPAddr() string { return f.leaderHTTP } + +func (f *fakeProbeCluster) AddVoter(string, string, time.Duration) error { return nil } + +func (f *fakeProbeCluster) RemoveServer(string, time.Duration) error { return nil } + +func (f *fakeProbeCluster) Barrier(time.Duration) error { return nil } + +func (f *fakeProbeCluster) SelfID() string { return f.selfID } + +func (f *fakeProbeCluster) BindAddr() string { return f.bindAddr } diff --git a/internal/server/lifecycle.go b/internal/server/lifecycle.go new file mode 100644 index 0000000..3708d4f --- /dev/null +++ b/internal/server/lifecycle.go @@ -0,0 +1,154 @@ +package server + +import ( + "context" + "fmt" + "sync" + "time" +) + +type LifecycleState string + +const ( + LifecycleStarting LifecycleState = "starting" + LifecycleServing LifecycleState = "serving" + LifecycleDraining LifecycleState = "draining" +) + +type LifecycleSnapshot struct { + State LifecycleState `json:"state"` + Started bool `json:"started"` + Draining bool `json:"draining"` + Reason string `json:"reason,omitempty"` + UpdatedAt time.Time `json:"updatedAt"` +} + +type ReadinessCheck func(context.Context) error + +type namedReadinessCheck struct { + name string + fn ReadinessCheck +} + +type Lifecycle struct { + mu sync.RWMutex + checks []namedReadinessCheck + + started bool + draining bool + reason string + updatedAt time.Time +} + +func NewLifecycle() *Lifecycle { + return &Lifecycle{updatedAt: time.Now().UTC()} +} + +func (l *Lifecycle) MarkStarted() { + if l == nil { + return + } + l.mu.Lock() + defer l.mu.Unlock() + l.started = true + l.updatedAt = time.Now().UTC() +} + +func (l *Lifecycle) StartDraining(reason string) { + if l == nil { + return + } + if reason == "" { + reason = "draining" + } + l.mu.Lock() + defer l.mu.Unlock() + l.draining = true + l.reason = reason + l.updatedAt = time.Now().UTC() +} + +func (l *Lifecycle) Snapshot() LifecycleSnapshot { + if l == nil { + return LifecycleSnapshot{State: LifecycleStarting, UpdatedAt: time.Now().UTC()} + } + l.mu.RLock() + defer l.mu.RUnlock() + state := LifecycleStarting + if l.draining { + state = LifecycleDraining + } else if l.started { + state = LifecycleServing + } + return LifecycleSnapshot{ + State: state, + Started: l.started, + Draining: l.draining, + Reason: l.reason, + UpdatedAt: l.updatedAt, + } +} + +func (l *Lifecycle) AddReadinessCheck(name string, fn ReadinessCheck) { + if l == nil || fn == nil { + return + } + if name == "" { + name = "readiness" + } + l.mu.Lock() + defer l.mu.Unlock() + l.checks = append(l.checks, namedReadinessCheck{name: name, fn: fn}) +} + +func (l *Lifecycle) readinessChecks() []namedReadinessCheck { + if l == nil { + return nil + } + l.mu.RLock() + defer l.mu.RUnlock() + return append([]namedReadinessCheck(nil), l.checks...) +} + +func (l *Lifecycle) servingError() error { + snap := l.Snapshot() + if !snap.Started { + return fmt.Errorf("startup not complete") + } + if snap.Draining { + if snap.Reason != "" { + return fmt.Errorf("draining: %s", snap.Reason) + } + return fmt.Errorf("draining") + } + return nil +} + +func (s *Server) Lifecycle() *Lifecycle { + if s.lifecycle == nil { + s.lifecycle = NewLifecycle() + } + return s.lifecycle +} + +func (s *Server) AttachLifecycle(l *Lifecycle) { + if l == nil { + l = NewLifecycle() + } + s.lifecycle = l +} + +func (s *Server) MarkStarted() { s.Lifecycle().MarkStarted() } + +func (s *Server) StartDraining(reason string) { s.Lifecycle().StartDraining(reason) } + +func (s *Server) AddReadinessCheck(name string, fn ReadinessCheck) { + s.Lifecycle().AddReadinessCheck(name, fn) +} + +func (s *GRPCServer) AttachLifecycle(l *Lifecycle) { + if l == nil { + l = NewLifecycle() + } + s.lifecycle = l +} diff --git a/internal/server/server.go b/internal/server/server.go index b5237ac..4fc0fc8 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -42,6 +42,7 @@ import ( // when the server runs in Raft mode. nil means single-node. type Cluster interface { IsLeader() bool + LeaderInfo() (id, addr string) LeaderHTTPAddr() string AddVoter(id, addr string, timeout time.Duration) error RemoveServer(id string, timeout time.Duration) error @@ -81,6 +82,7 @@ type Server struct { validator *auth.Validator // nil when auth disabled (dev mode) metrics *metrics.Metrics // nil when metrics disabled backups BackupSchedulerStatusProvider + lifecycle *Lifecycle } type BackupSchedulerStatusProvider interface { @@ -130,7 +132,7 @@ func (s *Server) AttachBackupScheduler(p BackupSchedulerStatusProvider) { s.back func (s *Server) AttachMetrics(m *metrics.Metrics) { s.metrics = m } func New(db *pebble.DB, cat *catalog.Catalog) *Server { - return &Server{db: db, cat: cat} + return &Server{db: db, cat: cat, lifecycle: NewLifecycle()} } // AttachManager wires the multi-shard manager. Pass nil to keep the @@ -237,6 +239,10 @@ func (s *Server) AttachAuth(v *auth.Validator) { s.validator = v } var publicPaths = map[string]bool{ "/v1/Health": true, "/v1/cluster/status": true, + "/livez": true, + "/readyz": true, + "/startupz": true, + "/raftz": true, } // Routes attaches cefas HTTP endpoints onto mux. Path layout follows @@ -303,6 +309,10 @@ func (s *Server) Routes(mux *http.ServeMux) { register("/v1/DeleteBackup", backupHandlers.HandleDeleteBackup) register("/v1/ApplyBackupRetention", backupHandlers.HandleApplyBackupRetention) register("/v1/Health", s.handleHealth) + register("/livez", s.handleLivez) + register("/readyz", s.handleReadyz) + register("/startupz", s.handleStartupz) + register("/raftz", s.handleRaftz) clusterHandlers := clusterhttp.New(s.cluster, s.manager, writeWriteErr, s.clusterStatusExtras) register("/v1/cluster/status", clusterHandlers.HandleStatus) register("/v1/cluster/AddVoter", clusterHandlers.HandleAddVoter) @@ -672,6 +682,9 @@ func mapWriteErr(err error) int { if errors.Is(err, craft.ErrNotLeader) { return http.StatusServiceUnavailable } + if errors.Is(err, pebble.ErrDraining) { + return http.StatusServiceUnavailable + } if errors.Is(err, pebble.ErrThrottled) { return http.StatusTooManyRequests } diff --git a/internal/storage/adapter/pebble/db.go b/internal/storage/adapter/pebble/db.go index 59ef707..0de396b 100644 --- a/internal/storage/adapter/pebble/db.go +++ b/internal/storage/adapter/pebble/db.go @@ -36,6 +36,11 @@ var _ storage.Engine = (*DB)(nil) // carrying the leader's HTTP URL when known. var ErrNotLeader = errors.New("cefas/storage: not leader") +// ErrDraining is returned by caller-facing write APIs after the +// process has entered shutdown drain. Raft FSM apply paths bypass this +// gate so already-committed entries can still flush cleanly. +var ErrDraining = errors.New("cefas/storage: writes disabled") + // ErrNotFound is the sentinel for missing keys — re-exported from Pebble. var ErrNotFound = pebbledb.ErrNotFound @@ -53,6 +58,19 @@ func (e *NotLeaderError) Error() string { func (e *NotLeaderError) LeaderHTTPAddr() string { return e.LeaderURL } func (e *NotLeaderError) Is(target error) bool { return target == ErrNotLeader } +type DrainingError struct { + Reason string +} + +func (e *DrainingError) Error() string { + if e.Reason == "" { + return ErrDraining.Error() + } + return ErrDraining.Error() + ": " + e.Reason +} + +func (e *DrainingError) Is(target error) bool { return target == ErrDraining } + // Options configures the engine. Mirrors codeq's Options for parity. type Options struct { Path string @@ -103,6 +121,9 @@ type DB struct { bp backpressureController lanes *dbLanes + writeDraining atomic.Bool + writeDrainingReason atomic.Value // string + slSharesMu sync.RWMutex slSharesResolver ServiceLevelSharesResolver slSharesCache sync.Map @@ -229,6 +250,38 @@ func (d *DB) AttachStreamRetentionResolver(fn func(table string) int64) { // Must be called before any concurrent writes are in flight. func (d *DB) AttachReplicator(r Replicator) { d.repl = r } +// RejectWrites flips this handle into drain mode. New caller-facing +// writes fail fast, while ApplyCommittedBatch continues to accept +// already-committed Raft entries during shutdown. +func (d *DB) RejectWrites(reason string) { + if d == nil { + return + } + if reason == "" { + reason = "draining" + } + d.writeDrainingReason.Store(reason) + d.writeDraining.Store(true) +} + +// AllowWrites clears drain mode. Tests use it to prove the gate is +// reversible; production normally only calls RejectWrites during exit. +func (d *DB) AllowWrites() { + if d == nil { + return + } + d.writeDraining.Store(false) + d.writeDrainingReason.Store("") +} + +func (d *DB) rejectWriteErr() error { + if d == nil || !d.writeDraining.Load() { + return nil + } + reason, _ := d.writeDrainingReason.Load().(string) + return &DrainingError{Reason: reason} +} + func (d *DB) Close() error { if d == nil || d.db == nil { return nil @@ -321,6 +374,9 @@ func (d *DB) hasNoLane(key []byte) (bool, error) { // Set writes a single key/value. With a replicator attached, it flows // through Replicate as a 1-op batch. func (d *DB) Set(key, value []byte) error { + if err := d.rejectWriteErr(); err != nil { + return err + } if d.repl != nil { if !d.repl.IsLeader() { return &NotLeaderError{LeaderURL: d.repl.LeaderHTTPAddr()} @@ -339,6 +395,9 @@ func (d *DB) Set(key, value []byte) error { // Delete removes a key. func (d *DB) Delete(key []byte) error { + if err := d.rejectWriteErr(); err != nil { + return err + } if d.repl != nil { if !d.repl.IsLeader() { return &NotLeaderError{LeaderURL: d.repl.LeaderHTTPAddr()} @@ -370,6 +429,9 @@ func (d *DB) Batch() *pebbledb.Batch { return d.db.NewBatch() } // commitLoop's maxMergeBatch coalescer. Callers pay only the cheap // goroutine park on <-req.done, which scales freely. func (d *DB) CommitBatch(b *pebbledb.Batch) error { + if err := d.rejectWriteErr(); err != nil { + return err + } if d.workload != nil { d.workload.recordWrite() } @@ -388,6 +450,9 @@ func (d *DB) CommitBatch(b *pebbledb.Batch) error { // responsible for ensuring the receiving node is the right owner — // followers will not see the resulting state. func (d *DB) CommitBatchLocal(b *pebbledb.Batch) error { + if err := d.rejectWriteErr(); err != nil { + return err + } if d.workload != nil { d.workload.recordWrite() } diff --git a/internal/storage/adapter/pebble/storage_test.go b/internal/storage/adapter/pebble/storage_test.go index 2fb8098..346b025 100644 --- a/internal/storage/adapter/pebble/storage_test.go +++ b/internal/storage/adapter/pebble/storage_test.go @@ -90,6 +90,37 @@ func TestPutGetDelete(t *testing.T) { } } +func TestRejectWritesBlocksCallerWritesButAllowsCommittedApply(t *testing.T) { + db := openTestDB(t) + + b := db.Batch() + if err := b.Set([]byte("cefas/data/applied"), []byte("ok"), nil); err != nil { + t.Fatalf("batch set: %v", err) + } + repr := append([]byte(nil), b.Repr()...) + _ = b.Close() + + db.RejectWrites("shutdown") + if err := db.Set([]byte("cefas/data/direct"), []byte("blocked")); !errors.Is(err, pebble.ErrDraining) { + t.Fatalf("Set while draining error = %v, want ErrDraining", err) + } + if err := db.ApplyCommittedBatch(repr); err != nil { + t.Fatalf("ApplyCommittedBatch while draining: %v", err) + } + got, err := db.Get([]byte("cefas/data/applied")) + if err != nil { + t.Fatalf("Get applied: %v", err) + } + if string(got) != "ok" { + t.Fatalf("applied value = %q, want ok", got) + } + + db.AllowWrites() + if err := db.Set([]byte("cefas/data/direct"), []byte("allowed")); err != nil { + t.Fatalf("Set after AllowWrites: %v", err) + } +} + func TestQueryByPK(t *testing.T) { db := openTestDB(t) ks := types.KeySchema{PK: "user_id", SK: "ts"}