Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 121 additions & 7 deletions cmd/cefasdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -127,10 +129,13 @@ func main() {
mtlsCA = flag.String("mtls-ca", "", "Path to a client-CA bundle. When set, the gRPC listener requires mTLS.")

// Observability + config.
configPath = flag.String("config", "", "Path to YAML config file. Flag/env values override the file.")
metricsOff = flag.Bool("metrics-disabled", false, "Disable the /metrics Prometheus endpoint.")
tracingURL = flag.String("tracing-endpoint", "", "OTLP/gRPC collector endpoint (e.g. 'jaeger:4317'). Empty disables tracing.")
tracingIns = flag.Bool("tracing-insecure", true, "Disable TLS to the OTLP collector.")
configPath = flag.String("config", "", "Path to YAML config file. Flag/env values override the file.")
metricsOff = flag.Bool("metrics-disabled", false, "Disable the /metrics Prometheus endpoint.")
tracingURL = flag.String("tracing-endpoint", "", "OTLP/gRPC collector endpoint (e.g. 'jaeger:4317'). Empty disables tracing.")
tracingIns = flag.Bool("tracing-insecure", true, "Disable TLS to the OTLP collector.")
shutdownGracePeriod = flag.Duration("shutdown-grace-period", 0, "Grace period for HTTP/gRPC/Raft/storage shutdown. 0 inherits config/default.")
shutdownDrainDelay = flag.Duration("shutdown-drain-delay", -1, "Delay after readiness is dropped before listener shutdown. Negative inherits config/default; 0 disables delay.")
shutdownLeadershipTransfer = flag.Duration("shutdown-leadership-transfer-timeout", 0, "Per-shard leadership transfer timeout during shutdown. 0 inherits config/default.")

// pprof debug listener. Default empty = disabled. Recommended
// bind '127.0.0.1:6060' for benchmark runs; binding to a
Expand Down Expand Up @@ -210,6 +215,7 @@ func main() {
*raftIdentityLeaseBackend, *raftIdentityLeasePath, *raftIdentityLeaseName,
*raftIdentityLeaseNamespace, *raftIdentityLeaseAPIURL,
*raftIdentityLeaseTTL, *raftIdentityLeaseRenew)
bootstrapserver.OverlayLifecycleFlags(&cfg, *shutdownGracePeriod, *shutdownDrainDelay, *shutdownLeadershipTransfer)

// Initialise tracing first so subsequent setup gets spans on
// failure. tracingShutdown is a no-op when no endpoint is set.
Expand Down Expand Up @@ -247,6 +253,7 @@ func main() {
leaseCtx, leaseCancel := context.WithCancel(context.Background())
defer leaseCancel()
leaseLost := make(chan error, 1)
var leaseHealthy atomic.Bool
var raftIdentityGuard *identitylease.Guard
if raftIdentityLeaseRequired(cfg) {
raftIdentityGuard, err = identitylease.Acquire(context.Background(), identitylease.Options{
Expand Down Expand Up @@ -274,9 +281,11 @@ func main() {
logger.Error("release raft identity lease", "err", err, "raftID", cfg.Cluster.SelfID)
}
}()
leaseHealthy.Store(true)
rec := raftIdentityGuard.Record()
logger.Info("raft identity lease acquired", "raftID", rec.NodeID, "holder", rec.HolderID, "epoch", rec.Epoch, "backend", rec.Backend, "resource", rec.Resource, "expires", rec.ExpiresAt)
go raftIdentityGuard.RenewLoop(leaseCtx, func(err error) {
leaseHealthy.Store(false)
logger.Error("raft identity lease lost", "err", err, "raftID", cfg.Cluster.SelfID)
select {
case leaseLost <- err:
Expand Down Expand Up @@ -481,6 +490,21 @@ func main() {
}
}
}
if raftIdentityGuard != nil {
apiSrv.AddReadinessCheck("raft_identity_lease", func(context.Context) error {
if !leaseHealthy.Load() {
return fmt.Errorf("raft identity lease not healthy")
}
rec := raftIdentityGuard.Record()
if rec.NodeID == "" || rec.HolderID == "" || rec.Epoch == 0 {
return fmt.Errorf("raft identity lease incomplete")
}
if !rec.ExpiresAt.IsZero() && time.Now().UTC().After(rec.ExpiresAt) {
return fmt.Errorf("raft identity lease expired at %s", rec.ExpiresAt.Format(time.RFC3339Nano))
}
return nil
})
}
if cfg.BackupScheduler.Enabled {
go backupScheduler.Run(runtimeCtx)
logger.Info("scheduled backups enabled", "interval", cfg.BackupScheduler.Interval, "dryRun", cfg.BackupScheduler.DryRun, "template", cfg.BackupScheduler.NameTemplate, "tables", cfg.BackupScheduler.Tables)
Expand Down Expand Up @@ -519,6 +543,7 @@ func main() {

// gRPC listener (optional).
var gsrv *grpc.Server
var gsrvImpl *server.GRPCServer
if cfg.GRPC.Addr != "" {
// Workload prioritization (#499): wire the per-SL quota
// controller against the catalog. Hot reload via
Expand All @@ -540,7 +565,8 @@ func main() {
clu = sh.Raft
}
}
gsrvImpl := server.NewGRPCServer(db, cat, clu)
gsrvImpl = server.NewGRPCServer(db, cat, clu)
gsrvImpl.AttachLifecycle(apiSrv.Lifecycle())
if mgr != nil {
gsrvImpl.AttachManager(mgr)
}
Expand Down Expand Up @@ -569,6 +595,7 @@ func main() {
}
}()
}
apiSrv.MarkStarted()

stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -580,15 +607,35 @@ func main() {
shutdownReason = "raft identity lease lost: " + err.Error()
}
logger.Info("shutting down", "reason", shutdownReason)
apiSrv.StartDraining(shutdownReason)
rejectWrites(db, mgr, shutdownReason)
if cfg.Lifecycle.DrainDelay > 0 {
time.Sleep(cfg.Lifecycle.DrainDelay)
}
if err := transferLeadershipOnShutdown(context.Background(), raftDB, mgr, cfg.Cluster.SelfID, cfg.Cluster.Peers, cfg.Lifecycle.LeadershipTransferTimeout); err != nil {
logger.Error("shutdown leadership transfer", "err", err)
}
leaseCancel()
runtimeCancel()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), cfg.Lifecycle.ShutdownGracePeriod)
defer cancel()
_ = srv.Shutdown(ctx)
_ = server.ShutdownPprof(ctx, pprofSrv)
if gsrvImpl != nil {
gsrvImpl.StopMVScheduler()
}
if gsrv != nil {
gsrv.GracefulStop()
done := make(chan struct{})
go func() {
gsrv.GracefulStop()
close(done)
}()
select {
case <-done:
case <-ctx.Done():
gsrv.Stop()
}
}
}

Expand All @@ -598,3 +645,70 @@ func raftIdentityLeaseRequired(cfg config.Config) bool {
}
return cfg.Cluster.Shards > 0 || cfg.Raft.Bind != ""
}

func rejectWrites(db *pebble.DB, mgr *cluster.Manager, reason string) {
if mgr == nil {
if db != nil {
db.RejectWrites(reason)
}
return
}
for _, sh := range mgr.Shards() {
if sh != nil && sh.Storage != nil {
sh.Storage.RejectWrites(reason)
}
}
}

func transferLeadershipOnShutdown(ctx context.Context, raftDB *craft.DB, mgr *cluster.Manager, selfID string, peers map[string]string, timeout time.Duration) error {
if timeout <= 0 {
return nil
}
if mgr == nil {
if raftDB == nil || !raftDB.IsLeader() {
return nil
}
targetID, targetAddr := firstPeerTarget(selfID, peers, nil)
if targetID == "" {
return nil
}
return raftDB.TransferLeadership(targetID, targetAddr, timeout)
}
var errs []error
for _, sh := range mgr.Shards() {
if err := ctx.Err(); err != nil {
return err
}
if sh == nil || sh.Raft == nil || !sh.Raft.IsLeader() {
continue
}
targetID, targetAddr := firstPeerTarget(selfID, peers, sh.Voters)
if targetID == "" {
continue
}
if err := sh.Raft.TransferLeadership(targetID, targetAddr, timeout); err != nil {
errs = append(errs, fmt.Errorf("shard %d: %w", sh.ID, err))
}
}
return errors.Join(errs...)
}

func firstPeerTarget(selfID string, peers map[string]string, voters []string) (string, string) {
if len(voters) > 0 {
for _, id := range voters {
if id == "" || id == selfID {
continue
}
if addr := peers[id]; addr != "" {
return id, addr
}
}
return "", ""
}
for id, addr := range peers {
if id != "" && id != selfID && addr != "" {
return id, addr
}
}
return "", ""
}
4 changes: 4 additions & 0 deletions dist/helm/cefas/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ data:
data: /var/lib/cefas
http:
addr: ":{{ .Values.service.http }}"
lifecycle:
shutdownGracePeriod: {{ .Values.lifecycle.shutdownGracePeriod | quote }}
drainDelay: {{ .Values.lifecycle.drainDelay | quote }}
leadershipTransferTimeout: {{ .Values.lifecycle.leadershipTransferTimeout | quote }}
grpc:
addr: ":{{ .Values.service.grpc }}"
reflection: true
Expand Down
2 changes: 1 addition & 1 deletion dist/helm/cefas/templates/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ metadata:
spec:
type: ClusterIP
clusterIP: None
publishNotReadyAddresses: true
publishNotReadyAddresses: {{ .Values.service.headless.publishNotReadyAddresses }}
selector:
{{- include "cefas.selectorLabels" . | nindent 4 }}
ports:
Expand Down
4 changes: 3 additions & 1 deletion dist/helm/cefas/templates/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ spec:
serviceAccountName: {{ include "cefas.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.securityContext | nindent 8 }}
terminationGracePeriodSeconds: 30
terminationGracePeriodSeconds: {{ .Values.lifecycle.terminationGracePeriodSeconds }}
containers:
- name: cefas
image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
Expand Down Expand Up @@ -66,6 +66,8 @@ spec:
{{- toYaml .Values.livenessProbe | nindent 12 }}
readinessProbe:
{{- toYaml .Values.readinessProbe | nindent 12 }}
startupProbe:
{{- toYaml .Values.startupProbe | nindent 12 }}
volumeMounts:
- name: data
mountPath: /var/lib/cefas
Expand Down
24 changes: 21 additions & 3 deletions dist/helm/cefas/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ service:
http: 8080
grpc: 9090
mux: 9100
headless:
# Required for StatefulSet peer bootstrap: Pods can resolve each
# other before readiness passes. Client traffic uses the regular
# service above, which does not publish non-ready endpoints.
publishNotReadyAddresses: true

lifecycle:
terminationGracePeriodSeconds: 30
shutdownGracePeriod: 25s
drainDelay: 2s
leadershipTransferTimeout: 5s

# Multi-Raft sharding. shards>1 turns on the mux transport.
cluster:
Expand Down Expand Up @@ -85,17 +96,24 @@ securityContext:
runAsNonRoot: true
runAsUser: 65532

# Probes target the cheap /v1/Health endpoint.
livenessProbe:
httpGet:
path: /v1/Health
path: /livez
port: 8080
initialDelaySeconds: 15
periodSeconds: 30

readinessProbe:
httpGet:
path: /v1/Health
path: /readyz
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
failureThreshold: 1

startupProbe:
httpGet:
path: /startupz
port: 8080
failureThreshold: 30
periodSeconds: 2
17 changes: 17 additions & 0 deletions internal/bootstrap/server/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,23 @@ func OverlayRaftIdentityLeaseFlags(
}
}

// OverlayLifecycleFlags keeps shutdown/drain tuning separate from the
// large historical OverlayFlags signature.
func OverlayLifecycleFlags(
cfg *config.Config,
shutdownGracePeriod, drainDelay, leadershipTransferTimeout time.Duration,
) {
if shutdownGracePeriod > 0 {
cfg.Lifecycle.ShutdownGracePeriod = shutdownGracePeriod
}
if drainDelay >= 0 {
cfg.Lifecycle.DrainDelay = drainDelay
}
if leadershipTransferTimeout > 0 {
cfg.Lifecycle.LeadershipTransferTimeout = leadershipTransferTimeout
}
}

// SplitCSVFlag splits a comma-separated CLI flag value into a trimmed
// slice. Blank entries are dropped, so "a, ,b" yields ["a","b"].
func SplitCSVFlag(in string) []string {
Expand Down
15 changes: 15 additions & 0 deletions internal/bootstrap/server/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,21 @@ func TestOverlayRaftIdentityLeaseFlags(t *testing.T) {
}
}

func TestOverlayLifecycleFlags(t *testing.T) {
cfg := baseCfg()
OverlayLifecycleFlags(&cfg, 20*time.Second, 500*time.Millisecond, 3*time.Second)

if cfg.Lifecycle.ShutdownGracePeriod != 20*time.Second {
t.Errorf("ShutdownGracePeriod = %v", cfg.Lifecycle.ShutdownGracePeriod)
}
if cfg.Lifecycle.DrainDelay != 500*time.Millisecond {
t.Errorf("DrainDelay = %v", cfg.Lifecycle.DrainDelay)
}
if cfg.Lifecycle.LeadershipTransferTimeout != 3*time.Second {
t.Errorf("LeadershipTransferTimeout = %v", cfg.Lifecycle.LeadershipTransferTimeout)
}
}

func TestOverlayFlags_PeerSetGroup(t *testing.T) {
cfg := baseCfg()
args := zeroArgs()
Expand Down
11 changes: 11 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type Config struct {
HTTP struct {
Addr string `yaml:"addr"`
} `yaml:"http"`
Lifecycle struct {
ShutdownGracePeriod time.Duration `yaml:"shutdownGracePeriod"`
DrainDelay time.Duration `yaml:"drainDelay"`
LeadershipTransferTimeout time.Duration `yaml:"leadershipTransferTimeout"`
} `yaml:"lifecycle"`
GRPC struct {
Addr string `yaml:"addr"`
Reflection bool `yaml:"reflection"`
Expand Down Expand Up @@ -157,6 +162,9 @@ func Defaults() Config {
var c Config
c.Data = "./cefas-data"
c.HTTP.Addr = ":8080"
c.Lifecycle.ShutdownGracePeriod = 25 * time.Second
c.Lifecycle.DrainDelay = 2 * time.Second
c.Lifecycle.LeadershipTransferTimeout = 5 * time.Second
c.Identity.ClockSkew = 30 * time.Second
c.Metrics.Enabled = true
c.Metrics.HotspotBuckets = 64
Expand Down Expand Up @@ -284,6 +292,9 @@ func ApplyEnv(cfg *Config) error {

cfg.Data = str("DATA", cfg.Data)
cfg.HTTP.Addr = str("HTTP_ADDR", cfg.HTTP.Addr)
cfg.Lifecycle.ShutdownGracePeriod = dur("LIFECYCLE_SHUTDOWN_GRACE_PERIOD", cfg.Lifecycle.ShutdownGracePeriod)
cfg.Lifecycle.DrainDelay = dur("LIFECYCLE_DRAIN_DELAY", cfg.Lifecycle.DrainDelay)
cfg.Lifecycle.LeadershipTransferTimeout = dur("LIFECYCLE_LEADERSHIP_TRANSFER_TIMEOUT", cfg.Lifecycle.LeadershipTransferTimeout)
cfg.GRPC.Addr = str("GRPC_ADDR", cfg.GRPC.Addr)
cfg.GRPC.Reflection = boolean("GRPC_REFLECTION", cfg.GRPC.Reflection)
cfg.GRPC.TLSCertPath = str("GRPC_TLS_CERT", cfg.GRPC.TLSCertPath)
Expand Down
Loading
Loading