diff --git a/cmd/cefas-manager/main.go b/cmd/cefas-manager/main.go new file mode 100644 index 0000000..8f84453 --- /dev/null +++ b/cmd/cefas-manager/main.go @@ -0,0 +1,407 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "github.com/spf13/cobra" + + mgr "github.com/CefasDb/cefasdb/internal/manager" + "github.com/CefasDb/cefasdb/internal/placement" + "github.com/CefasDb/cefasdb/pkg/client" +) + +type config struct { + endpoint string + httpEndpoint string + token string + tokenFile string + insecure bool + timeout time.Duration + namespace string + selector string + kubeDisabled bool + output string + managerID string + leaderElection bool + leaderLease string + leaderTTL time.Duration + auditLog string + approveFencing bool + interval time.Duration + repairMode string +} + +func main() { + if err := rootCmd().Execute(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} + +func rootCmd() *cobra.Command { + cfg := defaultConfig() + cmd := &cobra.Command{Use: "cefas-manager", Short: "CefasDB health, repair, and reconciliation manager"} + cmd.PersistentPreRunE = func(_ *cobra.Command, _ []string) error { + if cfg.token == "" && cfg.tokenFile != "" { + data, err := os.ReadFile(cfg.tokenFile) + if err != nil { + return err + } + cfg.token = strings.TrimSpace(string(data)) + } + return nil + } + f := cmd.PersistentFlags() + f.StringVar(&cfg.endpoint, "endpoint", cfg.endpoint, "Cefas gRPC endpoint") + f.StringVar(&cfg.httpEndpoint, "http-endpoint", cfg.httpEndpoint, "Cefas HTTP endpoint for placement audit") + f.StringVar(&cfg.token, "token", cfg.token, "bearer token for cluster-admin APIs") + f.StringVar(&cfg.tokenFile, "token-file", cfg.tokenFile, "file containing bearer token") + f.BoolVar(&cfg.insecure, "insecure", cfg.insecure, "use plaintext gRPC") + f.DurationVar(&cfg.timeout, "timeout", cfg.timeout, "per-operation timeout") + f.StringVar(&cfg.namespace, "namespace", cfg.namespace, "Kubernetes namespace") + f.StringVar(&cfg.selector, "selector", cfg.selector, "Kubernetes label selector for Cefas resources") + f.BoolVar(&cfg.kubeDisabled, "kube-disabled", cfg.kubeDisabled, "skip Kubernetes snapshot") + f.StringVar(&cfg.output, "output", cfg.output, "output format: json or text") + f.StringVar(&cfg.managerID, "manager-id", cfg.managerID, "leader-election holder identity") + f.BoolVar(&cfg.leaderElection, "leader-election", cfg.leaderElection, "acquire Kubernetes leader lease before active operations") + f.StringVar(&cfg.leaderLease, "leader-lease-name", cfg.leaderLease, "Kubernetes lease name for manager leadership") + f.DurationVar(&cfg.leaderTTL, "leader-lease-ttl", cfg.leaderTTL, "Kubernetes leader lease TTL") + f.StringVar(&cfg.auditLog, "audit-log", cfg.auditLog, "JSONL audit log path for repair execution") + cmd.AddCommand(doctorCmd(&cfg), repairCmd(&cfg), controllerCmd(&cfg)) + return cmd +} + +func doctorCmd(cfg *config) *cobra.Command { + return &cobra.Command{ + Use: "doctor", + Short: "Report CefasDB and Kubernetes health", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, _ []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), cfg.timeout) + defer cancel() + clients, err := buildClients(ctx, cfg) + if err != nil { + return err + } + defer clients.close() + report, err := mgr.Doctor(ctx, mgr.DoctorOptions{ + Cefas: clients.cefas, + Kubernetes: clients.kube, + Kube: mgr.KubeSnapshotOptions{Namespace: cfg.namespace, Selector: cfg.selector}, + Audit: placement.PlacementAuditRequest{IncludeRepairPlan: true}, + }) + if err != nil { + return err + } + return writeOutput(cmd, cfg.output, report) + }, + } +} + +func repairCmd(cfg *config) *cobra.Command { + var dryRun bool + var execute bool + c := &cobra.Command{ + Use: "repair", + Short: "Plan or execute guarded repairs", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, _ []string) error { + if dryRun && execute { + return fmt.Errorf("--dry-run and --execute are mutually exclusive") + } + if !dryRun && !execute { + dryRun = true + } + ctx, cancel := context.WithTimeout(cmd.Context(), cfg.timeout) + defer cancel() + clients, err := buildClients(ctx, cfg) + if err != nil { + return err + } + defer clients.close() + report, err := mgr.Doctor(ctx, mgr.DoctorOptions{ + Cefas: clients.cefas, + Kubernetes: clients.kube, + Kube: mgr.KubeSnapshotOptions{Namespace: cfg.namespace, Selector: cfg.selector}, + Audit: placement.PlacementAuditRequest{IncludeRepairPlan: true}, + }) + if err != nil { + return err + } + leader := mgr.LeaderLease{} + if execute { + if !cfg.leaderElection { + return fmt.Errorf("repair --execute requires --leader-election") + } + leader, err = clients.acquireLeader(ctx) + if err != nil { + return err + } + } + opts := mgr.RepairOptions{Cefas: clients.cefas, Report: report, Leader: leader, ApproveFencing: cfg.approveFencing, AuditLogPath: cfg.auditLog, Timeout: cfg.timeout} + var result mgr.RepairResult + if execute { + result, err = mgr.ExecuteRepair(ctx, opts) + if err != nil { + _ = writeOutput(cmd, cfg.output, result) + return err + } + } else { + result = mgr.DryRunRepair(opts) + } + return writeOutput(cmd, cfg.output, result) + }, + } + c.Flags().BoolVar(&dryRun, "dry-run", false, "emit an ordered repair plan without mutations") + c.Flags().BoolVar(&execute, "execute", false, "execute supported repair actions after all guards pass") + c.Flags().BoolVar(&cfg.approveFencing, "approve-fencing", cfg.approveFencing, "confirm fencing/provider state for sensitive actions") + return c +} + +func controllerCmd(cfg *config) *cobra.Command { + c := &cobra.Command{ + Use: "controller", + Short: "Run the leader-elected manager loop", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, _ []string) error { + cfg.leaderElection = true + ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGTERM) + defer stop() + clients, err := buildClients(ctx, cfg) + if err != nil { + return err + } + defer clients.close() + ticker := time.NewTicker(cfg.interval) + defer ticker.Stop() + for { + if err := runControllerTick(ctx, cmd, cfg, clients); err != nil { + fmt.Fprintf(cmd.ErrOrStderr(), "controller tick: %v\n", err) + } + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + } + } + }, + } + c.Flags().DurationVar(&cfg.interval, "interval", cfg.interval, "controller reconciliation interval") + c.Flags().StringVar(&cfg.repairMode, "repair-mode", cfg.repairMode, "observe, dry-run, or execute") + c.Flags().BoolVar(&cfg.approveFencing, "approve-fencing", cfg.approveFencing, "confirm fencing/provider state for sensitive actions in execute mode") + return c +} + +func runControllerTick(ctx context.Context, cmd *cobra.Command, cfg *config, clients runtimeClients) error { + leader, err := clients.acquireLeader(ctx) + if err != nil { + return err + } + if !leader.Acquired { + return writeJSONLine(cmd.OutOrStdout(), map[string]any{"at": time.Now().UTC(), "event": "standby", "holder": leader.Holder}) + } + tickCtx, cancel := context.WithTimeout(ctx, cfg.timeout) + defer cancel() + report, err := mgr.Doctor(tickCtx, mgr.DoctorOptions{ + Cefas: clients.cefas, + Kubernetes: clients.kube, + Kube: mgr.KubeSnapshotOptions{Namespace: cfg.namespace, Selector: cfg.selector}, + Audit: placement.PlacementAuditRequest{IncludeRepairPlan: true}, + }) + if err != nil { + return err + } + switch cfg.repairMode { + case "observe": + return writeJSONLine(cmd.OutOrStdout(), map[string]any{"at": time.Now().UTC(), "event": "doctor", "classification": report.Classification, "signals": len(report.Signals)}) + case "dry-run": + return writeJSONLine(cmd.OutOrStdout(), mgr.DryRunRepair(mgr.RepairOptions{Report: report, Leader: leader, AuditLogPath: cfg.auditLog})) + case "execute": + result, err := mgr.ExecuteRepair(tickCtx, mgr.RepairOptions{Cefas: clients.cefas, Report: report, Leader: leader, ApproveFencing: cfg.approveFencing, AuditLogPath: cfg.auditLog, Timeout: cfg.timeout}) + _ = writeJSONLine(cmd.OutOrStdout(), result) + return err + default: + return fmt.Errorf("invalid --repair-mode %q", cfg.repairMode) + } +} + +type runtimeClients struct { + grpc *client.Client + cefas *mgr.SDKCefas + kube mgr.Kubernetes + lead mgr.LeaderElector +} + +func (c runtimeClients) close() { + if c.grpc != nil { + _ = c.grpc.Close() + } +} + +func (c runtimeClients) acquireLeader(ctx context.Context) (mgr.LeaderLease, error) { + if c.lead == nil { + return mgr.LeaderLease{}, fmt.Errorf("leader election is not configured") + } + return c.lead.Acquire(ctx) +} + +func buildClients(ctx context.Context, cfg *config) (runtimeClients, error) { + opts := []client.Option{} + if cfg.insecure { + opts = append(opts, client.WithPlaintext()) + } + if cfg.token != "" { + opts = append(opts, client.WithBearer(cfg.token)) + } + grpcClient, err := client.Dial(ctx, cfg.endpoint, opts...) + if err != nil { + return runtimeClients{}, err + } + audit, err := mgr.NewHTTPAuditClient(cfg.httpEndpoint, cfg.token, http.DefaultClient) + if err != nil { + _ = grpcClient.Close() + return runtimeClients{}, err + } + clients := runtimeClients{grpc: grpcClient, cefas: &mgr.SDKCefas{GRPC: grpcClient, Audit: audit}} + if !cfg.kubeDisabled || cfg.leaderElection { + kubeClient, namespace, err := mgr.NewInClusterKubeClient() + if err != nil { + if cfg.leaderElection { + _ = grpcClient.Close() + return runtimeClients{}, err + } + } else { + if cfg.namespace == "" { + cfg.namespace = namespace + } + clients.kube = kubeClient + if cfg.leaderElection { + clients.lead = &mgr.KubeLeaderElector{ + Client: kubeClient, + Opts: mgr.LeaderElectionOptions{ + Namespace: cfg.namespace, + Name: cfg.leaderLease, + HolderID: cfg.managerID, + TTL: cfg.leaderTTL, + Labels: map[string]string{"app.kubernetes.io/component": "cefas-manager"}, + }, + } + } + } + } + return clients, nil +} + +func defaultConfig() config { + host, _ := os.Hostname() + if host == "" { + host = "cefas-manager" + } + return config{ + endpoint: envDefault("CEFAS_ENDPOINT", "127.0.0.1:9090"), + httpEndpoint: envDefault("CEFAS_HTTP_ENDPOINT", "http://127.0.0.1:8080"), + token: os.Getenv("CEFAS_TOKEN"), + tokenFile: os.Getenv("CEFAS_TOKEN_FILE"), + insecure: envBoolDefault("CEFAS_INSECURE", true), + timeout: envDurationDefault("CEFAS_MANAGER_TIMEOUT", 30*time.Second), + namespace: os.Getenv("POD_NAMESPACE"), + selector: envDefault("CEFAS_KUBE_SELECTOR", "app.kubernetes.io/name=cefas"), + output: "json", + managerID: envDefault("CEFAS_MANAGER_ID", fmt.Sprintf("%s-%d", host, os.Getpid())), + leaderElection: envBoolDefault("CEFAS_MANAGER_LEADER_ELECTION", false), + leaderLease: envDefault("CEFAS_MANAGER_LEADER_LEASE", "cefas-manager"), + leaderTTL: envDurationDefault("CEFAS_MANAGER_LEADER_TTL", 30*time.Second), + auditLog: os.Getenv("CEFAS_MANAGER_AUDIT_LOG"), + interval: envDurationDefault("CEFAS_MANAGER_INTERVAL", 30*time.Second), + repairMode: envDefault("CEFAS_MANAGER_REPAIR_MODE", "observe"), + } +} + +func writeOutput(cmd *cobra.Command, format string, v any) error { + switch format { + case "", "json": + enc := json.NewEncoder(cmd.OutOrStdout()) + enc.SetIndent("", " ") + return enc.Encode(v) + case "text": + return writeText(cmd, v) + default: + return fmt.Errorf("invalid --output %q", format) + } +} + +func writeText(cmd *cobra.Command, v any) error { + switch t := v.(type) { + case mgr.DoctorReport: + fmt.Fprintf(cmd.OutOrStdout(), "classification: %s\n", t.Classification) + for _, sig := range t.Signals { + fmt.Fprintf(cmd.OutOrStdout(), "- %s %s %s: %s\n", sig.Class, sig.Component, sig.Name, sig.Status) + if sig.Detail != "" { + fmt.Fprintf(cmd.OutOrStdout(), " %s\n", sig.Detail) + } + } + return nil + case mgr.RepairResult: + fmt.Fprintf(cmd.OutOrStdout(), "mode: %s\nclassification: %s\nactions: %d\n", t.Plan.Mode, t.Plan.Classification, len(t.Plan.Actions)) + if t.Error != "" { + fmt.Fprintf(cmd.OutOrStdout(), "error: %s\n", t.Error) + } + for _, action := range t.Plan.Actions { + fmt.Fprintf(cmd.OutOrStdout(), "- %s %s supported=%t sensitive=%t\n", action.ID, action.Type, action.Supported, action.Sensitive) + } + return nil + default: + return writeOutput(cmd, "json", v) + } +} + +func writeJSONLine(w interface{ Write([]byte) (int, error) }, v any) error { + data, err := json.Marshal(v) + if err != nil { + return err + } + _, err = w.Write(append(data, '\n')) + return err +} + +func envDefault(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +func envBoolDefault(key string, fallback bool) bool { + raw := os.Getenv(key) + if raw == "" { + return fallback + } + switch strings.ToLower(raw) { + case "1", "true", "yes", "on": + return true + case "0", "false", "no", "off": + return false + default: + return fallback + } +} + +func envDurationDefault(key string, fallback time.Duration) time.Duration { + raw := os.Getenv(key) + if raw == "" { + return fallback + } + v, err := time.ParseDuration(raw) + if err != nil { + return fallback + } + return v +} diff --git a/deploy/Dockerfile b/deploy/Dockerfile index b0e87d3..5d295c3 100644 --- a/deploy/Dockerfile +++ b/deploy/Dockerfile @@ -26,7 +26,8 @@ ENV CGO_ENABLED=0 RUN --mount=type=cache,target=/go/pkg/mod \ --mount=type=cache,target=/root/.cache/go-build \ GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH:-amd64} \ - go build -trimpath -ldflags="-s -w" -o /out/cefasdb ./cmd/cefasdb + go build -trimpath -ldflags="-s -w" -o /out/cefasdb ./cmd/cefasdb && \ + go build -trimpath -ldflags="-s -w" -o /out/cefas-manager ./cmd/cefas-manager RUN mkdir -p /out/var/lib/cefas && touch /out/var/lib/cefas/.keep @@ -44,6 +45,7 @@ LABEL org.opencontainers.image.source="https://github.com/CefasDb/cefasdb" \ org.opencontainers.image.version="${VERSION}" COPY --from=build /out/cefasdb /usr/local/bin/cefasdb +COPY --from=build /out/cefas-manager /usr/local/bin/cefas-manager COPY --from=build --chown=65532:65532 /out/var/lib/cefas /var/lib/cefas # Default data directory is bind-mounted by docker-compose / k8s; we diff --git a/dist/helm/cefas/templates/manager-deployment.yaml b/dist/helm/cefas/templates/manager-deployment.yaml new file mode 100644 index 0000000..c21d7e3 --- /dev/null +++ b/dist/helm/cefas/templates/manager-deployment.yaml @@ -0,0 +1,71 @@ +{{- if .Values.manager.enabled -}} +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "cefas.fullname" . }}-manager + labels: + {{- include "cefas.labels" . | nindent 4 }} + app.kubernetes.io/component: manager +spec: + replicas: {{ .Values.manager.replicaCount }} + selector: + matchLabels: + {{- include "cefas.selectorLabels" . | nindent 6 }} + app.kubernetes.io/component: manager + template: + metadata: + labels: + {{- include "cefas.selectorLabels" . | nindent 8 }} + app.kubernetes.io/component: manager + spec: + serviceAccountName: {{ include "cefas.serviceAccountName" . }} + securityContext: + {{- toYaml .Values.securityContext | nindent 8 }} + containers: + - name: cefas-manager + image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + command: + - /usr/local/bin/cefas-manager + args: + - controller + - --leader-election + - --leader-lease-name={{ include "cefas.fullname" . }}-manager + - --leader-lease-ttl={{ .Values.manager.leaderLeaseTTL }} + - --endpoint=$(CEFAS_ENDPOINT) + - --http-endpoint=$(CEFAS_HTTP_ENDPOINT) + - --namespace=$(POD_NAMESPACE) + - --selector=$(CEFAS_KUBE_SELECTOR) + - --repair-mode={{ .Values.manager.repairMode }} + - --interval={{ .Values.manager.interval }} + - --timeout={{ .Values.manager.timeout }} + - --audit-log={{ .Values.manager.auditLog }} + - --insecure=true + {{- if .Values.manager.approveFencing }} + - --approve-fencing + {{- end }} + env: + - name: CEFAS_ENDPOINT + value: "{{ include "cefas.fullname" . }}:{{ .Values.service.grpc }}" + - name: CEFAS_HTTP_ENDPOINT + value: "http://{{ include "cefas.fullname" . }}:{{ .Values.service.http }}" + - name: CEFAS_KUBE_SELECTOR + value: "app.kubernetes.io/name={{ include "cefas.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" + - name: CEFAS_MANAGER_ID + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + {{- if .Values.manager.tokenSecretName }} + - name: CEFAS_TOKEN + valueFrom: + secretKeyRef: + name: {{ .Values.manager.tokenSecretName }} + key: {{ .Values.manager.tokenSecretKey }} + {{- end }} + resources: + {{- toYaml .Values.manager.resources | nindent 12 }} +{{- end }} diff --git a/dist/helm/cefas/templates/manager-rbac.yaml b/dist/helm/cefas/templates/manager-rbac.yaml new file mode 100644 index 0000000..4d8b0ea --- /dev/null +++ b/dist/helm/cefas/templates/manager-rbac.yaml @@ -0,0 +1,79 @@ +{{- if .Values.manager.enabled -}} +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: {{ include "cefas.fullname" . }}-manager + labels: + {{- include "cefas.labels" . | nindent 4 }} + app.kubernetes.io/component: manager +rules: + - apiGroups: + - "" + resources: + - pods + - endpoints + - persistentvolumeclaims + verbs: + - get + - list + - watch + - apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - get + - list + - watch + - create + - update + - patch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: {{ include "cefas.fullname" . }}-manager + labels: + {{- include "cefas.labels" . | nindent 4 }} + app.kubernetes.io/component: manager +subjects: + - kind: ServiceAccount + name: {{ include "cefas.serviceAccountName" . }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: {{ include "cefas.fullname" . }}-manager +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: {{ include "cefas.fullname" . }}-manager + labels: + {{- include "cefas.labels" . | nindent 4 }} + app.kubernetes.io/component: manager +rules: + - apiGroups: + - "" + resources: + - nodes + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: {{ include "cefas.fullname" . }}-manager + labels: + {{- include "cefas.labels" . | nindent 4 }} + app.kubernetes.io/component: manager +subjects: + - kind: ServiceAccount + name: {{ include "cefas.serviceAccountName" . }} + namespace: {{ .Release.Namespace }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: {{ include "cefas.fullname" . }}-manager +{{- end }} diff --git a/dist/helm/cefas/templates/statefulset.yaml b/dist/helm/cefas/templates/statefulset.yaml index 6f425aa..66240d4 100644 --- a/dist/helm/cefas/templates/statefulset.yaml +++ b/dist/helm/cefas/templates/statefulset.yaml @@ -84,6 +84,8 @@ spec: volumeClaimTemplates: - metadata: name: data + labels: + {{- include "cefas.selectorLabels" . | nindent 10 }} spec: accessModes: ["ReadWriteOnce"] {{- if .Values.persistence.storageClass }} diff --git a/dist/helm/cefas/values.yaml b/dist/helm/cefas/values.yaml index cdac4e9..0c0fe40 100644 --- a/dist/helm/cefas/values.yaml +++ b/dist/helm/cefas/values.yaml @@ -15,6 +15,25 @@ serviceAccount: create: true name: "" +manager: + enabled: true + replicaCount: 2 + repairMode: observe + interval: 30s + timeout: 30s + leaderLeaseTTL: 30s + approveFencing: false + auditLog: /tmp/cefas-manager-audit.jsonl + tokenSecretName: "" + tokenSecretKey: token + resources: + requests: + cpu: "50m" + memory: "128Mi" + limits: + cpu: "500m" + memory: "512Mi" + resources: requests: cpu: "200m" diff --git a/internal/manager/cefas.go b/internal/manager/cefas.go new file mode 100644 index 0000000..4945d37 --- /dev/null +++ b/internal/manager/cefas.go @@ -0,0 +1,111 @@ +package manager + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "github.com/CefasDb/cefasdb/internal/placement" + "github.com/CefasDb/cefasdb/pkg/client" +) + +type Cefas interface { + Status(ctx context.Context) (client.ClusterStatus, error) + AuditPlacement(ctx context.Context, req placement.PlacementAuditRequest) (placement.PlacementAuditReport, error) + PlanPlacement(ctx context.Context, req client.PlacementPlanRequest) (client.PlacementPlan, error) + ApplyPlacement(ctx context.Context, req client.PlacementApplyRequest) (client.PlacementApplyResult, error) +} + +type SDKCefas struct { + GRPC *client.Client + Audit *HTTPAuditClient +} + +func (s *SDKCefas) Status(ctx context.Context) (client.ClusterStatus, error) { + if s == nil || s.GRPC == nil { + return client.ClusterStatus{}, fmt.Errorf("cefas gRPC client is not configured") + } + return s.GRPC.Status(ctx) +} + +func (s *SDKCefas) AuditPlacement(ctx context.Context, req placement.PlacementAuditRequest) (placement.PlacementAuditReport, error) { + if s == nil || s.Audit == nil { + return placement.PlacementAuditReport{}, fmt.Errorf("cefas audit HTTP client is not configured") + } + return s.Audit.AuditPlacement(ctx, req) +} + +func (s *SDKCefas) PlanPlacement(ctx context.Context, req client.PlacementPlanRequest) (client.PlacementPlan, error) { + if s == nil || s.GRPC == nil { + return client.PlacementPlan{}, fmt.Errorf("cefas gRPC client is not configured") + } + return s.GRPC.PlanPlacement(ctx, req) +} + +func (s *SDKCefas) ApplyPlacement(ctx context.Context, req client.PlacementApplyRequest) (client.PlacementApplyResult, error) { + if s == nil || s.GRPC == nil { + return client.PlacementApplyResult{}, fmt.Errorf("cefas gRPC client is not configured") + } + return s.GRPC.ApplyPlacement(ctx, req) +} + +type HTTPAuditClient struct { + base *url.URL + token string + httpClient *http.Client +} + +func NewHTTPAuditClient(baseURL, bearer string, httpClient *http.Client) (*HTTPAuditClient, error) { + if httpClient == nil { + httpClient = http.DefaultClient + } + if baseURL == "" { + return nil, fmt.Errorf("audit base URL is required") + } + u, err := url.Parse(strings.TrimRight(baseURL, "/")) + if err != nil { + return nil, err + } + return &HTTPAuditClient{base: u, token: bearer, httpClient: httpClient}, nil +} + +func (c *HTTPAuditClient) AuditPlacement(ctx context.Context, req placement.PlacementAuditRequest) (placement.PlacementAuditReport, error) { + if c == nil { + return placement.PlacementAuditReport{}, fmt.Errorf("audit client is nil") + } + req.IncludeRepairPlan = true + body, err := json.Marshal(req) + if err != nil { + return placement.PlacementAuditReport{}, err + } + u := *c.base + u.Path = strings.TrimRight(u.Path, "/") + "/v1/cluster/placement/audit" + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), bytes.NewReader(body)) + if err != nil { + return placement.PlacementAuditReport{}, err + } + httpReq.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("Accept", "application/json") + if c.token != "" { + httpReq.Header.Set("Authorization", "Bearer "+c.token) + } + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return placement.PlacementAuditReport{}, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + return placement.PlacementAuditReport{}, fmt.Errorf("placement audit status=%d body=%s", resp.StatusCode, strings.TrimSpace(string(data))) + } + var report placement.PlacementAuditReport + if err := json.NewDecoder(resp.Body).Decode(&report); err != nil { + return placement.PlacementAuditReport{}, err + } + return report, nil +} diff --git a/internal/manager/doctor.go b/internal/manager/doctor.go new file mode 100644 index 0000000..986cff9 --- /dev/null +++ b/internal/manager/doctor.go @@ -0,0 +1,265 @@ +package manager + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + "github.com/CefasDb/cefasdb/internal/placement" + "github.com/CefasDb/cefasdb/pkg/client" +) + +type DoctorOptions struct { + Cefas Cefas + Kubernetes Kubernetes + Kube KubeSnapshotOptions + Audit placement.PlacementAuditRequest + Now func() time.Time +} + +func Doctor(ctx context.Context, opts DoctorOptions) (DoctorReport, error) { + now := time.Now + if opts.Now != nil { + now = opts.Now + } + report := DoctorReport{GeneratedAt: now().UTC(), Classification: HealthHealthy} + if opts.Cefas == nil { + report.addSignal(Signal{Component: "cefas", Name: "client", Status: "missing", Class: HealthUnsafe, Detail: "cefas client is not configured"}) + return report, nil + } + status, err := opts.Cefas.Status(ctx) + if err != nil { + report.addSignal(Signal{Component: "cefas", Name: "cluster_status", Status: "unreachable", Class: HealthUnsafe, Detail: err.Error()}) + return report, nil + } + report.Cluster = &status + evaluateClusterStatus(&report, status) + + auditReq := opts.Audit + auditReq.IncludeRepairPlan = true + audit, err := opts.Cefas.AuditPlacement(ctx, auditReq) + if err != nil { + report.addSignal(Signal{Component: "cefas", Name: "placement_audit", Status: "unavailable", Class: HealthDegraded, Detail: err.Error()}) + } else { + report.PlacementAudit = &audit + evaluatePlacementAudit(&report, audit) + } + + if opts.Kubernetes == nil { + report.addSignal(Signal{Component: "kubernetes", Name: "snapshot", Status: "skipped", Class: HealthDegraded, Detail: "kubernetes client is not configured"}) + return report, nil + } + snap, err := opts.Kubernetes.Snapshot(ctx, opts.Kube) + if err != nil { + report.addSignal(Signal{Component: "kubernetes", Name: "snapshot", Status: "unavailable", Class: HealthDegraded, Detail: err.Error()}) + return report, nil + } + report.Kubernetes = &snap + evaluateKubernetes(&report, status, snap, report.GeneratedAt) + return report, nil +} + +func (r *DoctorReport) addSignal(sig Signal) { + r.Signals = append(r.Signals, sig) + r.Classification = worstHealth(r.Classification, sig.Class) +} + +func evaluateClusterStatus(report *DoctorReport, st client.ClusterStatus) { + if st.Mode == "" { + report.addSignal(Signal{Component: "cefas", Name: "mode", Status: "unknown", Class: HealthDegraded}) + } + if st.ShardCount == 0 && len(st.Shards) == 0 { + report.addSignal(Signal{Component: "cefas", Name: "placement", Status: "empty", Class: HealthUnsafe, Detail: "cluster reports no shards"}) + return + } + for _, shard := range st.Shards { + if len(shard.Voters) == 0 { + report.addSignal(Signal{Component: "cefas", Name: fmt.Sprintf("shard/%d/quorum", shard.ID), Status: "no_voters", Class: HealthUnsafe, Detail: "shard has no voting replicas"}) + continue + } + if len(shard.Voters) < 3 { + report.addSignal(Signal{ + Component: "cefas", + Name: fmt.Sprintf("shard/%d/quorum", shard.ID), + Status: "low_replication_factor", + Class: HealthDegraded, + Detail: fmt.Sprintf("shard has %d voters; RF=3 is the expected resilient default", len(shard.Voters)), + Metadata: map[string]any{"voters": shard.Voters}, + }) + } + if shard.State != "" && shard.State != "active" { + report.addSignal(Signal{Component: "cefas", Name: fmt.Sprintf("shard/%d/state", shard.ID), Status: shard.State, Class: HealthRepairable, Detail: "non-active shard state requires reconciliation"}) + } + } + for _, node := range st.Nodes { + switch node.State { + case "", "active": + case "draining": + report.addSignal(Signal{Component: "cefas", Name: "node/" + node.ID, Status: "draining", Class: HealthRepairable, Detail: strings.Join(nodeActiveReferences(st, node.ID), "; ")}) + case "decommissioned": + report.addSignal(Signal{Component: "cefas", Name: "node/" + node.ID, Status: "decommissioned", Class: HealthHealthy}) + default: + report.addSignal(Signal{Component: "cefas", Name: "node/" + node.ID, Status: node.State, Class: HealthRepairable, Detail: "unknown placement node state"}) + } + } + if report.Classification == HealthHealthy { + report.addSignal(Signal{Component: "cefas", Name: "cluster_status", Status: "ok", Class: HealthHealthy}) + } +} + +func evaluatePlacementAudit(report *DoctorReport, audit placement.PlacementAuditReport) { + if audit.Truncated { + report.addSignal(Signal{Component: "cefas", Name: "placement_audit", Status: "truncated", Class: HealthUnsafe, Detail: "audit hit max issue limit; repair plan is incomplete"}) + return + } + if audit.ConsistencyVerdict == "pass" && len(audit.Issues) == 0 { + report.addSignal(Signal{Component: "cefas", Name: "placement_audit", Status: "pass", Class: HealthHealthy}) + return + } + for _, issue := range audit.Issues { + class := HealthRepairable + if issue.Severity == placement.PlacementAuditSeverityWarning { + class = HealthDegraded + } + report.addSignal(Signal{ + Component: "cefas", + Name: "placement_audit/" + issue.Kind, + Status: issue.Severity, + Class: class, + Detail: issue.Detail, + Metadata: map[string]any{"table": issue.Table, "keyHex": issue.KeyHex, "shardId": issue.ShardID}, + }) + } +} + +func evaluateKubernetes(report *DoctorReport, st client.ClusterStatus, snap KubernetesSnapshot, now time.Time) { + if len(snap.Pods) == 0 { + report.addSignal(Signal{Component: "kubernetes", Name: "pods", Status: "empty", Class: HealthDegraded, Detail: "no pods matched the manager selector"}) + } + for _, pod := range snap.Pods { + if !podReady(pod) { + report.addSignal(Signal{Component: "kubernetes", Name: "pod/" + pod.Metadata.Name, Status: pod.Status.Phase, Class: HealthRepairable, Detail: podFailureDetail(pod), Metadata: map[string]any{"nodeName": pod.Spec.NodeName}}) + } + for _, cs := range pod.Status.ContainerStatuses { + if cs.RestartCount >= 5 { + report.addSignal(Signal{Component: "kubernetes", Name: "pod/" + pod.Metadata.Name + "/container/" + cs.Name, Status: "high_restarts", Class: HealthRepairable, Detail: fmt.Sprintf("restartCount=%d", cs.RestartCount)}) + } + if cs.State.Waiting != nil && crashLoopReason(cs.State.Waiting.Reason) { + report.addSignal(Signal{Component: "kubernetes", Name: "pod/" + pod.Metadata.Name + "/container/" + cs.Name, Status: cs.State.Waiting.Reason, Class: HealthUnsafe, Detail: cs.State.Waiting.Message}) + } + } + } + for _, node := range snap.Nodes { + if !nodeReady(node) { + report.addSignal(Signal{Component: "kubernetes", Name: "node/" + node.Metadata.Name, Status: "not_ready", Class: HealthDegraded, Detail: nodeFailureDetail(node)}) + } + } + for _, pvc := range snap.PVCs { + if pvc.Status.Phase != "" && pvc.Status.Phase != "Bound" { + report.addSignal(Signal{Component: "kubernetes", Name: "pvc/" + pvc.Metadata.Name, Status: pvc.Status.Phase, Class: HealthUnsafe, Detail: "persistent volume claim is not bound"}) + } + } + for _, ep := range snap.Endpoints { + notReady := 0 + ready := 0 + for _, subset := range ep.Subsets { + ready += len(subset.Addresses) + notReady += len(subset.NotReadyAddresses) + } + if ready == 0 || notReady > 0 { + class := HealthDegraded + if ready == 0 { + class = HealthUnsafe + } + report.addSignal(Signal{Component: "kubernetes", Name: "endpoints/" + ep.Metadata.Name, Status: "not_ready", Class: class, Detail: fmt.Sprintf("ready=%d notReady=%d", ready, notReady)}) + } + } + clusterNodeIDs := clusterNodeSet(st) + for _, lease := range snap.Leases { + if leaseExpired(lease, now) { + class := HealthRepairable + if clusterNodeIDs[lease.Spec.HolderIdentity] { + class = HealthUnsafe + } + report.addSignal(Signal{Component: "kubernetes", Name: "lease/" + lease.Metadata.Name, Status: "expired", Class: class, Detail: fmt.Sprintf("holder=%s", lease.Spec.HolderIdentity)}) + } + } + if report.Classification == HealthHealthy { + report.addSignal(Signal{Component: "kubernetes", Name: "snapshot", Status: "ok", Class: HealthHealthy}) + } +} + +func nodeActiveReferences(st client.ClusterStatus, nodeID string) []string { + var refs []string + for _, sh := range st.Shards { + if containsString(sh.Voters, nodeID) { + refs = append(refs, fmt.Sprintf("shard %d voter", sh.ID)) + } + if containsString(sh.NonVoters, nodeID) { + refs = append(refs, fmt.Sprintf("shard %d non-voter", sh.ID)) + } + if sh.LeaderHint == nodeID { + refs = append(refs, fmt.Sprintf("shard %d leaderHint", sh.ID)) + } + } + sort.Strings(refs) + return refs +} + +func containsString(items []string, target string) bool { + for _, item := range items { + if item == target { + return true + } + } + return false +} + +func podFailureDetail(pod Pod) string { + var parts []string + for _, cond := range pod.Status.Conditions { + if cond.Status != "True" && (cond.Reason != "" || cond.Message != "") { + parts = append(parts, strings.TrimSpace(cond.Type+" "+cond.Reason+" "+cond.Message)) + } + } + for _, cs := range pod.Status.ContainerStatuses { + if cs.State.Waiting != nil { + parts = append(parts, strings.TrimSpace(cs.Name+" waiting "+cs.State.Waiting.Reason+" "+cs.State.Waiting.Message)) + } + if cs.State.Terminated != nil { + parts = append(parts, strings.TrimSpace(fmt.Sprintf("%s terminated %s exit=%d %s", cs.Name, cs.State.Terminated.Reason, cs.State.Terminated.ExitCode, cs.State.Terminated.Message))) + } + } + if len(parts) == 0 { + return "pod is not ready" + } + return strings.Join(parts, "; ") +} + +func nodeFailureDetail(node Node) string { + for _, cond := range node.Status.Conditions { + if cond.Type == "Ready" && cond.Status != "True" { + return strings.TrimSpace(cond.Reason + " " + cond.Message) + } + } + return "node is not ready" +} + +func crashLoopReason(reason string) bool { + switch reason { + case "CrashLoopBackOff", "RunContainerError", "CreateContainerConfigError", "ImagePullBackOff", "ErrImagePull": + return true + default: + return false + } +} + +func clusterNodeSet(st client.ClusterStatus) map[string]bool { + out := map[string]bool{} + for _, node := range st.Nodes { + out[node.ID] = true + } + return out +} diff --git a/internal/manager/kube.go b/internal/manager/kube.go new file mode 100644 index 0000000..df99013 --- /dev/null +++ b/internal/manager/kube.go @@ -0,0 +1,438 @@ +package manager + +import ( + "bytes" + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path" + "strings" + "time" +) + +const serviceAccountDir = "/var/run/secrets/kubernetes.io/serviceaccount" + +type Kubernetes interface { + Snapshot(ctx context.Context, opts KubeSnapshotOptions) (KubernetesSnapshot, error) +} + +type KubeSnapshotOptions struct { + Namespace string + Selector string +} + +type KubernetesSnapshot struct { + Namespace string `json:"namespace"` + Selector string `json:"selector,omitempty"` + ListedAt time.Time `json:"listedAt"` + Pods []Pod `json:"pods,omitempty"` + Nodes []Node `json:"nodes,omitempty"` + Leases []Lease `json:"leases,omitempty"` + Endpoints []Endpoint `json:"endpoints,omitempty"` + PVCs []PersistentVolumeClaim `json:"persistentVolumeClaims,omitempty"` +} + +type ObjectMeta struct { + Name string `json:"name,omitempty"` + Namespace string `json:"namespace,omitempty"` + UID string `json:"uid,omitempty"` + ResourceVersion string `json:"resourceVersion,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` + CreationTimestamp string `json:"creationTimestamp,omitempty"` +} + +type Pod struct { + Metadata ObjectMeta `json:"metadata"` + Spec PodSpec `json:"spec,omitempty"` + Status PodStatus `json:"status,omitempty"` +} + +type PodSpec struct { + NodeName string `json:"nodeName,omitempty"` +} + +type PodStatus struct { + Phase string `json:"phase,omitempty"` + PodIP string `json:"podIP,omitempty"` + HostIP string `json:"hostIP,omitempty"` + Conditions []PodCondition `json:"conditions,omitempty"` + ContainerStatuses []ContainerStatus `json:"containerStatuses,omitempty"` +} + +type PodCondition struct { + Type string `json:"type,omitempty"` + Status string `json:"status,omitempty"` + Reason string `json:"reason,omitempty"` + Message string `json:"message,omitempty"` +} + +type ContainerStatus struct { + Name string `json:"name,omitempty"` + Ready bool `json:"ready,omitempty"` + RestartCount int `json:"restartCount,omitempty"` + State ContainerState `json:"state,omitempty"` +} + +type ContainerState struct { + Waiting *ContainerStateWaiting `json:"waiting,omitempty"` + Running *ContainerStateRunning `json:"running,omitempty"` + Terminated *ContainerStateTerminated `json:"terminated,omitempty"` +} + +type ContainerStateWaiting struct { + Reason string `json:"reason,omitempty"` + Message string `json:"message,omitempty"` +} + +type ContainerStateRunning struct { + StartedAt string `json:"startedAt,omitempty"` +} + +type ContainerStateTerminated struct { + Reason string `json:"reason,omitempty"` + Message string `json:"message,omitempty"` + ExitCode int `json:"exitCode,omitempty"` + FinishedAt string `json:"finishedAt,omitempty"` +} + +type Node struct { + Metadata ObjectMeta `json:"metadata"` + Status NodeStatus `json:"status,omitempty"` +} + +type NodeStatus struct { + Conditions []NodeCondition `json:"conditions,omitempty"` +} + +type NodeCondition struct { + Type string `json:"type,omitempty"` + Status string `json:"status,omitempty"` + Reason string `json:"reason,omitempty"` + Message string `json:"message,omitempty"` +} + +type Lease struct { + Metadata ObjectMeta `json:"metadata"` + Spec LeaseSpec `json:"spec,omitempty"` +} + +type LeaseSpec struct { + HolderIdentity string `json:"holderIdentity,omitempty"` + LeaseDurationSeconds int `json:"leaseDurationSeconds,omitempty"` + AcquireTime string `json:"acquireTime,omitempty"` + RenewTime string `json:"renewTime,omitempty"` + LeaseTransitions int `json:"leaseTransitions,omitempty"` +} + +type Endpoint struct { + Metadata ObjectMeta `json:"metadata"` + Subsets []EndpointSubset `json:"subsets,omitempty"` +} + +type EndpointSubset struct { + Addresses []EndpointAddress `json:"addresses,omitempty"` + NotReadyAddresses []EndpointAddress `json:"notReadyAddresses,omitempty"` +} + +type EndpointAddress struct { + IP string `json:"ip,omitempty"` + Hostname string `json:"hostname,omitempty"` + TargetRef *ObjectReference `json:"targetRef,omitempty"` +} + +type ObjectReference struct { + Kind string `json:"kind,omitempty"` + Namespace string `json:"namespace,omitempty"` + Name string `json:"name,omitempty"` + UID string `json:"uid,omitempty"` +} + +type PersistentVolumeClaim struct { + Metadata ObjectMeta `json:"metadata"` + Status PVCStatus `json:"status,omitempty"` +} + +type PVCStatus struct { + Phase string `json:"phase,omitempty"` +} + +type HTTPKubeClient struct { + base *url.URL + httpClient *http.Client + token string + now func() time.Time +} + +func NewInClusterKubeClient() (*HTTPKubeClient, string, error) { + host := os.Getenv("KUBERNETES_SERVICE_HOST") + port := os.Getenv("KUBERNETES_SERVICE_PORT") + if host == "" || port == "" { + return nil, "", errors.New("KUBERNETES_SERVICE_HOST/PORT not set") + } + tokenBytes, err := os.ReadFile(path.Join(serviceAccountDir, "token")) + if err != nil { + return nil, "", fmt.Errorf("read service account token: %w", err) + } + nsBytes, err := os.ReadFile(path.Join(serviceAccountDir, "namespace")) + if err != nil { + return nil, "", fmt.Errorf("read service account namespace: %w", err) + } + pool := x509.NewCertPool() + if caBytes, err := os.ReadFile(path.Join(serviceAccountDir, "ca.crt")); err == nil { + pool.AppendCertsFromPEM(caBytes) + } + u, err := url.Parse("https://" + netJoinHostPort(host, port)) + if err != nil { + return nil, "", err + } + return &HTTPKubeClient{ + base: u, + httpClient: &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{ + RootCAs: pool, + MinVersion: tls.VersionTLS12, + }}}, + token: strings.TrimSpace(string(tokenBytes)), + now: time.Now, + }, strings.TrimSpace(string(nsBytes)), nil +} + +func NewHTTPKubeClient(baseURL, bearer string, httpClient *http.Client) (*HTTPKubeClient, error) { + if httpClient == nil { + httpClient = http.DefaultClient + } + u, err := url.Parse(strings.TrimRight(baseURL, "/")) + if err != nil { + return nil, err + } + return &HTTPKubeClient{base: u, httpClient: httpClient, token: bearer, now: time.Now}, nil +} + +func (c *HTTPKubeClient) Snapshot(ctx context.Context, opts KubeSnapshotOptions) (KubernetesSnapshot, error) { + if opts.Namespace == "" { + return KubernetesSnapshot{}, errors.New("namespace is required") + } + snap := KubernetesSnapshot{Namespace: opts.Namespace, Selector: opts.Selector, ListedAt: c.clock().UTC()} + var err error + if snap.Pods, err = c.listPods(ctx, opts.Namespace, opts.Selector); err != nil { + return snap, err + } + if snap.Endpoints, err = c.listEndpoints(ctx, opts.Namespace, opts.Selector); err != nil { + return snap, err + } + if snap.PVCs, err = c.listPVCs(ctx, opts.Namespace, opts.Selector); err != nil { + return snap, err + } + if snap.Leases, err = c.listLeases(ctx, opts.Namespace, leaseSelector(opts.Selector)); err != nil { + return snap, err + } + if snap.Nodes, err = c.listNodes(ctx); err != nil { + return snap, err + } + return snap, nil +} + +func (c *HTTPKubeClient) listPods(ctx context.Context, namespace, selector string) ([]Pod, error) { + var out struct { + Items []Pod `json:"items"` + } + err := c.getJSON(ctx, namespacePath(namespace, "pods"), selector, &out) + return out.Items, err +} + +func (c *HTTPKubeClient) listEndpoints(ctx context.Context, namespace, selector string) ([]Endpoint, error) { + var out struct { + Items []Endpoint `json:"items"` + } + err := c.getJSON(ctx, namespacePath(namespace, "endpoints"), selector, &out) + return out.Items, err +} + +func (c *HTTPKubeClient) listPVCs(ctx context.Context, namespace, selector string) ([]PersistentVolumeClaim, error) { + var out struct { + Items []PersistentVolumeClaim `json:"items"` + } + err := c.getJSON(ctx, namespacePath(namespace, "persistentvolumeclaims"), selector, &out) + return out.Items, err +} + +func (c *HTTPKubeClient) listLeases(ctx context.Context, namespace, selector string) ([]Lease, error) { + var out struct { + Items []Lease `json:"items"` + } + err := c.getJSON(ctx, "/apis/coordination.k8s.io/v1/namespaces/"+url.PathEscape(namespace)+"/leases", selector, &out) + return out.Items, err +} + +func (c *HTTPKubeClient) listNodes(ctx context.Context) ([]Node, error) { + var out struct { + Items []Node `json:"items"` + } + err := c.getJSON(ctx, "/api/v1/nodes", "", &out) + return out.Items, err +} + +func (c *HTTPKubeClient) getJSON(ctx context.Context, apiPath, selector string, out any) error { + u := c.apiURL(apiPath) + if selector != "" { + q := u.Query() + q.Set("labelSelector", selector) + u.RawQuery = q.Encode() + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + if err != nil { + return err + } + return c.doJSON(req, http.StatusOK, out) +} + +func (c *HTTPKubeClient) doJSON(req *http.Request, wantStatus int, out any) error { + if c.token != "" { + req.Header.Set("Authorization", "Bearer "+c.token) + } + req.Header.Set("Accept", "application/json") + resp, err := c.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != wantStatus { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + return fmt.Errorf("kubernetes %s %s: status=%d body=%s", req.Method, req.URL.Path, resp.StatusCode, strings.TrimSpace(string(body))) + } + if out == nil { + return nil + } + return json.NewDecoder(resp.Body).Decode(out) +} + +func (c *HTTPKubeClient) apiURL(apiPath string) *url.URL { + u := *c.base + u.Path = apiPath + return &u +} + +func namespacePath(namespace, resource string) string { + return "/api/v1/namespaces/" + url.PathEscape(namespace) + "/" + resource +} + +func leaseSelector(selector string) string { + for _, part := range strings.Split(selector, ",") { + part = strings.TrimSpace(part) + if strings.HasPrefix(part, "app.kubernetes.io/name=") { + return part + } + } + return "" +} + +func (c *HTTPKubeClient) clock() time.Time { + if c.now != nil { + return c.now() + } + return time.Now() +} + +func podReady(p Pod) bool { + if p.Status.Phase != "Running" { + return false + } + for _, cond := range p.Status.Conditions { + if cond.Type == "Ready" { + return cond.Status == "True" + } + } + return false +} + +func nodeReady(n Node) bool { + for _, cond := range n.Status.Conditions { + if cond.Type == "Ready" { + return cond.Status == "True" + } + } + return false +} + +func leaseExpired(l Lease, now time.Time) bool { + if l.Spec.LeaseDurationSeconds <= 0 { + return false + } + raw := l.Spec.RenewTime + if raw == "" { + raw = l.Spec.AcquireTime + } + if raw == "" { + return false + } + t, err := time.Parse(time.RFC3339Nano, raw) + if err != nil { + return false + } + return now.After(t.Add(time.Duration(l.Spec.LeaseDurationSeconds) * time.Second)) +} + +func (c *HTTPKubeClient) getLease(ctx context.Context, namespace, name string) (Lease, int, error) { + var lease Lease + apiPath := "/apis/coordination.k8s.io/v1/namespaces/" + url.PathEscape(namespace) + "/leases/" + url.PathEscape(name) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.apiURL(apiPath).String(), nil) + if err != nil { + return lease, 0, err + } + if c.token != "" { + req.Header.Set("Authorization", "Bearer "+c.token) + } + req.Header.Set("Accept", "application/json") + resp, err := c.httpClient.Do(req) + if err != nil { + return lease, 0, err + } + defer resp.Body.Close() + if resp.StatusCode == http.StatusNotFound { + return lease, resp.StatusCode, nil + } + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + return lease, resp.StatusCode, fmt.Errorf("kubernetes GET lease %s/%s: status=%d body=%s", namespace, name, resp.StatusCode, strings.TrimSpace(string(body))) + } + if err := json.NewDecoder(resp.Body).Decode(&lease); err != nil { + return lease, resp.StatusCode, err + } + return lease, resp.StatusCode, nil +} + +func (c *HTTPKubeClient) putLease(ctx context.Context, namespace string, lease Lease, create bool) (Lease, error) { + var out Lease + apiPath := "/apis/coordination.k8s.io/v1/namespaces/" + url.PathEscape(namespace) + "/leases" + method := http.MethodPost + want := http.StatusCreated + if !create { + apiPath += "/" + url.PathEscape(lease.Metadata.Name) + method = http.MethodPut + want = http.StatusOK + } + body, err := json.Marshal(lease) + if err != nil { + return out, err + } + req, err := http.NewRequestWithContext(ctx, method, c.apiURL(apiPath).String(), bytes.NewReader(body)) + if err != nil { + return out, err + } + req.Header.Set("Content-Type", "application/json") + return out, c.doJSON(req, want, &out) +} + +func netJoinHostPort(host, port string) string { + if strings.Contains(host, ":") && !strings.HasPrefix(host, "[") { + return "[" + host + "]:" + port + } + return host + ":" + port +} diff --git a/internal/manager/leader.go b/internal/manager/leader.go new file mode 100644 index 0000000..532a436 --- /dev/null +++ b/internal/manager/leader.go @@ -0,0 +1,130 @@ +package manager + +import ( + "context" + "errors" + "fmt" + "time" +) + +type LeaderElector interface { + Acquire(ctx context.Context) (LeaderLease, error) +} + +type LeaderElectionOptions struct { + Namespace string + Name string + HolderID string + TTL time.Duration + Labels map[string]string +} + +type LeaderLease struct { + Namespace string `json:"namespace"` + Name string `json:"name"` + Holder string `json:"holder,omitempty"` + Acquired bool `json:"acquired"` + ExpiresAt time.Time `json:"expiresAt,omitempty"` +} + +type KubeLeaderElector struct { + Client *HTTPKubeClient + Opts LeaderElectionOptions +} + +func (e *KubeLeaderElector) Acquire(ctx context.Context) (LeaderLease, error) { + if e == nil || e.Client == nil { + return LeaderLease{}, errors.New("kubernetes leader elector is not configured") + } + opts := e.Opts + if opts.Namespace == "" { + return LeaderLease{}, errors.New("leader namespace is required") + } + if opts.Name == "" { + return LeaderLease{}, errors.New("leader lease name is required") + } + if opts.HolderID == "" { + return LeaderLease{}, errors.New("leader holder id is required") + } + if opts.TTL <= 0 { + opts.TTL = 30 * time.Second + } + now := e.Client.clock().UTC() + current, status, err := e.Client.getLease(ctx, opts.Namespace, opts.Name) + if err != nil { + return LeaderLease{}, err + } + if status == 0 || status == httpStatusNotFound { + created := Lease{ + Metadata: ObjectMeta{Name: opts.Name, Namespace: opts.Namespace, Labels: cloneStringMap(opts.Labels)}, + Spec: LeaseSpec{ + HolderIdentity: opts.HolderID, + LeaseDurationSeconds: int(opts.TTL / time.Second), + AcquireTime: now.Format(time.RFC3339Nano), + RenewTime: now.Format(time.RFC3339Nano), + }, + } + out, err := e.Client.putLease(ctx, opts.Namespace, created, true) + if err != nil { + return LeaderLease{}, err + } + return leaderLeaseFromKube(out, now, true), nil + } + expired := leaseExpired(current, now) + if current.Spec.HolderIdentity != "" && current.Spec.HolderIdentity != opts.HolderID && !expired { + return leaderLeaseFromKube(current, now, false), nil + } + renewed := current + if renewed.Metadata.Labels == nil { + renewed.Metadata.Labels = map[string]string{} + } + for k, v := range opts.Labels { + renewed.Metadata.Labels[k] = v + } + if renewed.Spec.HolderIdentity != opts.HolderID { + renewed.Spec.LeaseTransitions++ + renewed.Spec.AcquireTime = now.Format(time.RFC3339Nano) + } + renewed.Spec.HolderIdentity = opts.HolderID + renewed.Spec.LeaseDurationSeconds = int(opts.TTL / time.Second) + renewed.Spec.RenewTime = now.Format(time.RFC3339Nano) + out, err := e.Client.putLease(ctx, opts.Namespace, renewed, false) + if err != nil { + return LeaderLease{}, fmt.Errorf("renew leader lease: %w", err) + } + return leaderLeaseFromKube(out, now, true), nil +} + +const httpStatusNotFound = 404 + +func leaderLeaseFromKube(l Lease, now time.Time, acquired bool) LeaderLease { + expires := time.Time{} + raw := l.Spec.RenewTime + if raw == "" { + raw = l.Spec.AcquireTime + } + if t, err := time.Parse(time.RFC3339Nano, raw); err == nil && l.Spec.LeaseDurationSeconds > 0 { + expires = t.Add(time.Duration(l.Spec.LeaseDurationSeconds) * time.Second) + } + if !expires.IsZero() && now.After(expires) { + acquired = false + } + return LeaderLease{ + Namespace: l.Metadata.Namespace, + Name: l.Metadata.Name, + Holder: l.Spec.HolderIdentity, + Acquired: acquired, + ExpiresAt: expires, + } +} + +func cloneStringMap(in map[string]string) map[string]string { + if len(in) == 0 { + return nil + } + out := make(map[string]string, len(in)) + for k, v := range in { + out[k] = v + } + return out +} diff --git a/internal/manager/manager_test.go b/internal/manager/manager_test.go new file mode 100644 index 0000000..9910887 --- /dev/null +++ b/internal/manager/manager_test.go @@ -0,0 +1,281 @@ +package manager + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/CefasDb/cefasdb/internal/placement" + "github.com/CefasDb/cefasdb/pkg/client" +) + +type fakeCefas struct { + status client.ClusterStatus + audit placement.PlacementAuditReport + statusErr error + auditErr error + planResult client.PlacementPlan + applyResult client.PlacementApplyResult + planCalls int + applyCalls int +} + +func (f *fakeCefas) Status(context.Context) (client.ClusterStatus, error) { + return f.status, f.statusErr +} + +func (f *fakeCefas) AuditPlacement(context.Context, placement.PlacementAuditRequest) (placement.PlacementAuditReport, error) { + return f.audit, f.auditErr +} + +func (f *fakeCefas) PlanPlacement(context.Context, client.PlacementPlanRequest) (client.PlacementPlan, error) { + f.planCalls++ + return f.planResult, nil +} + +func (f *fakeCefas) ApplyPlacement(context.Context, client.PlacementApplyRequest) (client.PlacementApplyResult, error) { + f.applyCalls++ + return f.applyResult, nil +} + +type fakeKube struct { + snap KubernetesSnapshot + err error +} + +func (f fakeKube) Snapshot(context.Context, KubeSnapshotOptions) (KubernetesSnapshot, error) { + return f.snap, f.err +} + +func TestDoctorClassifiesKubernetesAndPlacementFailures(t *testing.T) { + now := time.Date(2026, 6, 25, 12, 0, 0, 0, time.UTC) + fc := &fakeCefas{ + status: healthyStatus(), + audit: placement.PlacementAuditReport{ + ConsistencyVerdict: "fail", + Issues: []placement.PlacementAuditIssue{{ + Kind: placement.PlacementAuditKindGap, + Severity: placement.PlacementAuditSeverityError, + Detail: "gap", + }}, + RepairPlan: &placement.PlacementRepairPlan{Actions: []placement.PlacementRepairAction{{Action: "review_placement_gap", Detail: "review gap"}}}, + }, + } + report, err := Doctor(context.Background(), DoctorOptions{ + Cefas: fc, + Kubernetes: fakeKube{snap: KubernetesSnapshot{ + Namespace: "default", + Pods: []Pod{{ + Metadata: ObjectMeta{Name: "cefas-0"}, + Status: PodStatus{ + Phase: "Running", + Conditions: []PodCondition{{Type: "Ready", Status: "False", Reason: "ContainersNotReady"}}, + ContainerStatuses: []ContainerStatus{{ + Name: "cefas", + State: ContainerState{Waiting: &ContainerStateWaiting{Reason: "CrashLoopBackOff", Message: "crashed"}}, + }}, + }, + }}, + Nodes: []Node{{Metadata: ObjectMeta{Name: "m1"}, Status: NodeStatus{Conditions: []NodeCondition{{Type: "Ready", Status: "True"}}}}}, + Endpoints: []Endpoint{{Metadata: ObjectMeta{Name: "cefas"}, Subsets: []EndpointSubset{{Addresses: []EndpointAddress{{IP: "10.0.0.1"}}}}}}, + PVCs: []PersistentVolumeClaim{{Metadata: ObjectMeta{Name: "data-cefas-0"}, Status: PVCStatus{Phase: "Bound"}}}, + }}, + Now: func() time.Time { return now }, + }) + if err != nil { + t.Fatal(err) + } + if report.Classification != HealthUnsafe { + t.Fatalf("classification = %s, want %s", report.Classification, HealthUnsafe) + } + if !hasSignal(report, "kubernetes", "CrashLoopBackOff") { + t.Fatalf("report did not include crashloop signal: %+v", report.Signals) + } + if !hasSignal(report, "cefas", placement.PlacementAuditKindGap) { + t.Fatalf("report did not include placement gap signal: %+v", report.Signals) + } +} + +func TestDryRunPlansDrainWithoutMutation(t *testing.T) { + fc := &fakeCefas{status: drainingStatus(), audit: passAudit()} + report, err := Doctor(context.Background(), DoctorOptions{Cefas: fc}) + if err != nil { + t.Fatal(err) + } + result := DryRunRepair(RepairOptions{Report: report}) + if len(result.Plan.Actions) != 1 { + t.Fatalf("actions = %+v, want one drain action", result.Plan.Actions) + } + if got := result.Plan.Actions[0].Type; got != "placement_drain" { + t.Fatalf("action type = %s, want placement_drain", got) + } + if fc.planCalls != 0 || fc.applyCalls != 0 { + t.Fatalf("dry-run mutated: planCalls=%d applyCalls=%d", fc.planCalls, fc.applyCalls) + } +} + +func TestExecuteRequiresLeaderElection(t *testing.T) { + fc := &fakeCefas{status: drainingStatus(), audit: passAudit()} + report, err := Doctor(context.Background(), DoctorOptions{Cefas: fc}) + if err != nil { + t.Fatal(err) + } + _, err = ExecuteRepair(context.Background(), RepairOptions{Cefas: fc, Report: report}) + if err == nil || !strings.Contains(err.Error(), "manager_leader_election") { + t.Fatalf("error = %v, want leader election precondition failure", err) + } + if fc.planCalls != 0 || fc.applyCalls != 0 { + t.Fatalf("execute mutated before preconditions passed: planCalls=%d applyCalls=%d", fc.planCalls, fc.applyCalls) + } +} + +func TestExecuteAppliesSupportedDrain(t *testing.T) { + fc := &fakeCefas{ + status: drainingStatus(), + audit: passAudit(), + planResult: client.PlacementPlan{ + Operation: "drain", + BeforeEpoch: 7, + AfterEpoch: 8, + ApplySupported: true, + }, + applyResult: client.PlacementApplyResult{Operation: "drain", BeforeEpoch: 7, AfterEpoch: 8}, + } + report, err := Doctor(context.Background(), DoctorOptions{Cefas: fc}) + if err != nil { + t.Fatal(err) + } + result, err := ExecuteRepair(context.Background(), RepairOptions{ + Cefas: fc, + Report: report, + Leader: LeaderLease{Acquired: true, Holder: "manager-1"}, + }) + if err != nil { + t.Fatal(err) + } + if fc.planCalls != 1 || fc.applyCalls != 1 { + t.Fatalf("calls = plan:%d apply:%d, want 1/1", fc.planCalls, fc.applyCalls) + } + if len(result.Applied) != 1 || result.Applied[0].Status != "applied" { + t.Fatalf("applied = %+v", result.Applied) + } +} + +func TestKubeLeaderElectorAcquiresAndRespectsHolder(t *testing.T) { + now := time.Date(2026, 6, 25, 12, 0, 0, 0, time.UTC) + var stored Lease + exists := false + mux := http.NewServeMux() + mux.HandleFunc("/apis/coordination.k8s.io/v1/namespaces/default/leases/cefas-manager", func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + if !exists { + http.NotFound(w, r) + return + } + _ = json.NewEncoder(w).Encode(stored) + case http.MethodPut: + if err := json.NewDecoder(r.Body).Decode(&stored); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + exists = true + _ = json.NewEncoder(w).Encode(stored) + default: + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + } + }) + mux.HandleFunc("/apis/coordination.k8s.io/v1/namespaces/default/leases", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + if err := json.NewDecoder(r.Body).Decode(&stored); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + exists = true + w.WriteHeader(http.StatusCreated) + _ = json.NewEncoder(w).Encode(stored) + }) + server := httptest.NewServer(mux) + defer server.Close() + + kube, err := NewHTTPKubeClient(server.URL, "", server.Client()) + if err != nil { + t.Fatal(err) + } + kube.now = func() time.Time { return now } + first := &KubeLeaderElector{Client: kube, Opts: LeaderElectionOptions{Namespace: "default", Name: "cefas-manager", HolderID: "manager-1", TTL: 30 * time.Second}} + lease, err := first.Acquire(context.Background()) + if err != nil { + t.Fatal(err) + } + if !lease.Acquired || lease.Holder != "manager-1" { + t.Fatalf("first lease = %+v", lease) + } + second := &KubeLeaderElector{Client: kube, Opts: LeaderElectionOptions{Namespace: "default", Name: "cefas-manager", HolderID: "manager-2", TTL: 30 * time.Second}} + lease, err = second.Acquire(context.Background()) + if err != nil { + t.Fatal(err) + } + if lease.Acquired || lease.Holder != "manager-1" { + t.Fatalf("second lease before expiry = %+v", lease) + } + kube.now = func() time.Time { return now.Add(31 * time.Second) } + lease, err = second.Acquire(context.Background()) + if err != nil { + t.Fatal(err) + } + if !lease.Acquired || lease.Holder != "manager-2" { + t.Fatalf("second lease after expiry = %+v", lease) + } +} + +func TestDoctorHandlesUnreachableCluster(t *testing.T) { + report, err := Doctor(context.Background(), DoctorOptions{Cefas: &fakeCefas{statusErr: errors.New("down")}}) + if err != nil { + t.Fatal(err) + } + if report.Classification != HealthUnsafe { + t.Fatalf("classification = %s, want unsafe", report.Classification) + } +} + +func healthyStatus() client.ClusterStatus { + return client.ClusterStatus{ + Mode: "raft", + ShardCount: 1, + Shards: []client.ShardPlacement{{ID: 0, State: "active", Voters: []string{"n1", "n2", "n3"}}}, + Nodes: []client.NodeDescriptor{ + {ID: "n1", State: "active"}, + {ID: "n2", State: "active"}, + {ID: "n3", State: "active"}, + }, + } +} + +func drainingStatus() client.ClusterStatus { + st := healthyStatus() + st.Nodes[0].State = "draining" + st.Shards[0].Voters = []string{"n1", "n2", "n3"} + return st +} + +func passAudit() placement.PlacementAuditReport { + return placement.PlacementAuditReport{ConsistencyVerdict: "pass", RepairPlan: &placement.PlacementRepairPlan{}} +} + +func hasSignal(report DoctorReport, component, statusPart string) bool { + for _, sig := range report.Signals { + if sig.Component == component && (strings.Contains(sig.Status, statusPart) || strings.Contains(sig.Name, statusPart) || strings.Contains(sig.Detail, statusPart)) { + return true + } + } + return false +} diff --git a/internal/manager/repair.go b/internal/manager/repair.go new file mode 100644 index 0000000..bd74adf --- /dev/null +++ b/internal/manager/repair.go @@ -0,0 +1,306 @@ +package manager + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "strings" + "time" + + "github.com/CefasDb/cefasdb/pkg/client" +) + +type RepairOptions struct { + Cefas Cefas + Report DoctorReport + Mode string + Leader LeaderLease + ApproveFencing bool + AuditLogPath string + Timeout time.Duration + Now func() time.Time +} + +func BuildRepairPlan(opts RepairOptions) RepairPlan { + now := time.Now + if opts.Now != nil { + now = opts.Now + } + mode := opts.Mode + if mode == "" { + mode = "dry-run" + } + plan := RepairPlan{GeneratedAt: now().UTC(), Mode: mode, Classification: opts.Report.Classification} + clusterReady := opts.Report.Cluster != nil + plan.Preconditions = append(plan.Preconditions, Precondition{Name: "cluster_status_read", Status: boolPrecondition(clusterReady), Detail: detailIf(!clusterReady, "doctor could not read Cefas cluster status")}) + auditComplete := opts.Report.PlacementAudit == nil || !opts.Report.PlacementAudit.Truncated + plan.Preconditions = append(plan.Preconditions, Precondition{Name: "placement_audit_complete", Status: boolPrecondition(auditComplete), Detail: detailIf(!auditComplete, "placement audit is truncated; repair plan is incomplete")}) + if mode == "execute" { + plan.Preconditions = append(plan.Preconditions, Precondition{Name: "manager_leader_election", Status: boolPrecondition(opts.Leader.Acquired), Detail: detailIf(!opts.Leader.Acquired, fmt.Sprintf("leader lease is held by %q", opts.Leader.Holder))}) + } else { + plan.Preconditions = append(plan.Preconditions, Precondition{Name: "manager_leader_election", Status: PreconditionWarning, Detail: "not required for dry-run output"}) + } + plan.Actions = append(plan.Actions, placementAuditActions(opts.Report)...) + if opts.Report.Cluster != nil { + plan.Actions = append(plan.Actions, placementNodeActions(*opts.Report.Cluster)...) + } + if opts.Report.Kubernetes != nil { + plan.Actions = append(plan.Actions, kubernetesRepairActions(*opts.Report.Kubernetes, plan.GeneratedAt)...) + } + if hasSensitiveAction(plan.Actions) { + status := PreconditionWarning + detail := "sensitive actions require --approve-fencing before execution" + if mode == "execute" { + status = boolPrecondition(opts.ApproveFencing) + detail = detailIf(!opts.ApproveFencing, detail) + } + plan.Preconditions = append(plan.Preconditions, Precondition{Name: "fencing_approved", Status: status, Detail: detail}) + } + return plan +} + +func ExecuteRepair(ctx context.Context, opts RepairOptions) (RepairResult, error) { + opts.Mode = "execute" + plan := BuildRepairPlan(opts) + result := RepairResult{Plan: plan, AuditLogPath: opts.AuditLogPath} + if err := appendAudit(opts.AuditLogPath, "plan", "", plan, ""); err != nil { + return result, err + } + if err := preconditionsPassed(plan); err != nil { + result.Error = err.Error() + result.StoppedAt = "preconditions" + _ = appendAudit(opts.AuditLogPath, "stop", "preconditions", nil, err.Error()) + return result, err + } + if opts.Cefas == nil { + err := errors.New("cefas client is required for execute") + result.Error = err.Error() + result.StoppedAt = "cefas_client" + _ = appendAudit(opts.AuditLogPath, "stop", "cefas_client", nil, err.Error()) + return result, err + } + timeoutMS := int((5 * time.Second) / time.Millisecond) + if opts.Timeout > 0 { + timeoutMS = int(opts.Timeout / time.Millisecond) + } + for _, action := range plan.Actions { + if action.Sensitive && !opts.ApproveFencing { + err := fmt.Errorf("action %s is fencing-sensitive and --approve-fencing was not set", action.ID) + result.Error = err.Error() + result.StoppedAt = action.ID + _ = appendAudit(opts.AuditLogPath, "stop", action.ID, action, err.Error()) + return result, err + } + if !action.Supported { + err := fmt.Errorf("action %s requires manual review: %s", action.ID, action.Description) + result.Error = err.Error() + result.StoppedAt = action.ID + _ = appendAudit(opts.AuditLogPath, "stop", action.ID, action, err.Error()) + return result, err + } + if action.Placement == nil { + err := fmt.Errorf("action %s is marked supported but has no placement request", action.ID) + result.Error = err.Error() + result.StoppedAt = action.ID + _ = appendAudit(opts.AuditLogPath, "stop", action.ID, action, err.Error()) + return result, err + } + _ = appendAudit(opts.AuditLogPath, "start", action.ID, action, "") + placementPlan, err := opts.Cefas.PlanPlacement(ctx, *action.Placement) + if err != nil { + result.Error = err.Error() + result.StoppedAt = action.ID + _ = appendAudit(opts.AuditLogPath, "stop", action.ID, action, err.Error()) + return result, err + } + if !placementPlan.ApplySupported { + err := fmt.Errorf("placement plan for action %s is not apply-supported", action.ID) + result.Error = err.Error() + result.StoppedAt = action.ID + _ = appendAudit(opts.AuditLogPath, "stop", action.ID, placementPlan, err.Error()) + return result, err + } + applyResult, err := opts.Cefas.ApplyPlacement(ctx, client.PlacementApplyRequest{Plan: placementPlan, ExpectedEpoch: placementPlan.BeforeEpoch, TimeoutMS: timeoutMS}) + actionResult := ActionResult{ActionID: action.ID, Status: "applied", Plan: &placementPlan, Result: &applyResult} + if err != nil { + actionResult.Status = "failed" + actionResult.Detail = err.Error() + result.Applied = append(result.Applied, actionResult) + result.Error = err.Error() + result.StoppedAt = action.ID + _ = appendAudit(opts.AuditLogPath, "stop", action.ID, actionResult, err.Error()) + return result, err + } + result.Applied = append(result.Applied, actionResult) + _ = appendAudit(opts.AuditLogPath, "applied", action.ID, actionResult, "") + } + return result, nil +} + +func DryRunRepair(opts RepairOptions) RepairResult { + opts.Mode = "dry-run" + return RepairResult{Plan: BuildRepairPlan(opts), AuditLogPath: opts.AuditLogPath} +} + +func placementAuditActions(report DoctorReport) []RepairAction { + if report.PlacementAudit == nil || report.PlacementAudit.RepairPlan == nil { + return nil + } + var actions []RepairAction + for i, action := range report.PlacementAudit.RepairPlan.Actions { + actions = append(actions, RepairAction{ + ID: fmt.Sprintf("placement-audit-%03d", i+1), + Type: "placement_audit_manual_repair", + Description: action.Detail, + Supported: report.PlacementAudit.RepairPlan.ApplySupported, + Sensitive: true, + Preconditions: []string{"cluster_status_read", "placement_audit_complete", "fencing_approved"}, + Payload: map[string]any{"action": action}, + }) + } + return actions +} + +func placementNodeActions(st client.ClusterStatus) []RepairAction { + var actions []RepairAction + for _, node := range st.Nodes { + if node.State != "draining" { + continue + } + refs := nodeActiveReferences(st, node.ID) + if len(refs) > 0 { + actions = append(actions, RepairAction{ + ID: "placement-drain-" + sanitizeID(node.ID), + Type: "placement_drain", + Description: "continue draining placement references away from " + node.ID, + Supported: true, + Preconditions: []string{"cluster_status_read", "manager_leader_election"}, + Placement: &client.PlacementPlanRequest{Operation: "drain", NodeID: node.ID}, + Payload: map[string]any{"activeReferences": refs}, + }) + continue + } + actions = append(actions, RepairAction{ + ID: "placement-decommission-" + sanitizeID(node.ID), + Type: "placement_decommission", + Description: "mark drained node " + node.ID + " as decommissioned", + Supported: true, + Sensitive: true, + Preconditions: []string{"cluster_status_read", "manager_leader_election", "fencing_approved"}, + Placement: &client.PlacementPlanRequest{Operation: "decommission", NodeID: node.ID}, + }) + } + return actions +} + +func kubernetesRepairActions(snap KubernetesSnapshot, now time.Time) []RepairAction { + var actions []RepairAction + for _, pod := range snap.Pods { + if podReady(pod) { + continue + } + actions = append(actions, RepairAction{ + ID: "kubernetes-pod-review-" + sanitizeID(pod.Metadata.Name), + Type: "kubernetes_manual_review", + Description: "pod is not ready; verify replacement/fencing before any data-plane repair", + Supported: false, + Sensitive: true, + Preconditions: []string{"fencing_approved"}, + Payload: map[string]any{"pod": pod.Metadata.Name, "phase": pod.Status.Phase, "nodeName": pod.Spec.NodeName, "namespace": pod.Metadata.Namespace}, + }) + } + for _, lease := range snap.Leases { + if !leaseExpired(lease, now) { + continue + } + actions = append(actions, RepairAction{ + ID: "identity-lease-review-" + sanitizeID(lease.Metadata.Name), + Type: "identity_lease_manual_review", + Description: "identity lease expired; confirm the old holder is fenced before deleting or replacing it", + Supported: false, + Sensitive: true, + Preconditions: []string{"fencing_approved"}, + Payload: map[string]any{"lease": lease.Metadata.Name, "holder": lease.Spec.HolderIdentity}, + }) + } + return actions +} + +func preconditionsPassed(plan RepairPlan) error { + for _, pre := range plan.Preconditions { + if pre.Status == PreconditionFail { + if pre.Detail != "" { + return fmt.Errorf("precondition %s failed: %s", pre.Name, pre.Detail) + } + return fmt.Errorf("precondition %s failed", pre.Name) + } + } + return nil +} + +func boolPrecondition(ok bool) PreconditionStatus { + if ok { + return PreconditionPass + } + return PreconditionFail +} + +func detailIf(ok bool, detail string) string { + if ok { + return detail + } + return "" +} + +func hasSensitiveAction(actions []RepairAction) bool { + for _, action := range actions { + if action.Sensitive { + return true + } + } + return false +} + +func sanitizeID(v string) string { + v = strings.TrimSpace(v) + if v == "" { + return "unknown" + } + var b strings.Builder + for _, r := range v { + if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '-' || r == '_' || r == '.' { + b.WriteRune(r) + continue + } + b.WriteByte('-') + } + return b.String() +} + +type auditEntry struct { + At time.Time `json:"at"` + Stage string `json:"stage"` + ActionID string `json:"actionId,omitempty"` + Object any `json:"object,omitempty"` + Error string `json:"error,omitempty"` +} + +func appendAudit(path, stage, actionID string, object any, errText string) error { + if path == "" { + return nil + } + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o600) + if err != nil { + return err + } + defer f.Close() + entry := auditEntry{At: time.Now().UTC(), Stage: stage, ActionID: actionID, Object: object, Error: errText} + data, err := json.Marshal(entry) + if err != nil { + return err + } + _, err = f.Write(append(data, '\n')) + return err +} diff --git a/internal/manager/types.go b/internal/manager/types.go new file mode 100644 index 0000000..5957cdc --- /dev/null +++ b/internal/manager/types.go @@ -0,0 +1,106 @@ +package manager + +import ( + "time" + + "github.com/CefasDb/cefasdb/internal/placement" + "github.com/CefasDb/cefasdb/pkg/client" +) + +type HealthClass string + +const ( + HealthHealthy HealthClass = "healthy" + HealthDegraded HealthClass = "degraded" + HealthRepairable HealthClass = "repairable" + HealthUnsafe HealthClass = "unsafe" +) + +type Signal struct { + Component string `json:"component"` + Name string `json:"name"` + Status string `json:"status"` + Class HealthClass `json:"class"` + Detail string `json:"detail,omitempty"` + Metadata map[string]any `json:"metadata,omitempty"` +} + +type DoctorReport struct { + GeneratedAt time.Time `json:"generatedAt"` + Classification HealthClass `json:"classification"` + Cluster *client.ClusterStatus `json:"cluster,omitempty"` + Kubernetes *KubernetesSnapshot `json:"kubernetes,omitempty"` + PlacementAudit *placement.PlacementAuditReport `json:"placementAudit,omitempty"` + Signals []Signal `json:"signals"` +} + +type PreconditionStatus string + +const ( + PreconditionPass PreconditionStatus = "pass" + PreconditionFail PreconditionStatus = "fail" + PreconditionWarning PreconditionStatus = "warning" +) + +type Precondition struct { + Name string `json:"name"` + Status PreconditionStatus `json:"status"` + Detail string `json:"detail,omitempty"` +} + +type RepairAction struct { + ID string `json:"id"` + Type string `json:"type"` + Description string `json:"description"` + Supported bool `json:"supported"` + Sensitive bool `json:"sensitive"` + Preconditions []string `json:"preconditions,omitempty"` + Placement *client.PlacementPlanRequest `json:"placement,omitempty"` + Payload map[string]any `json:"payload,omitempty"` +} + +type RepairPlan struct { + GeneratedAt time.Time `json:"generatedAt"` + Mode string `json:"mode"` + Classification HealthClass `json:"classification"` + Preconditions []Precondition `json:"preconditions"` + Actions []RepairAction `json:"actions"` +} + +type ActionResult struct { + ActionID string `json:"actionId"` + Status string `json:"status"` + Detail string `json:"detail,omitempty"` + Plan *client.PlacementPlan `json:"plan,omitempty"` + Result *client.PlacementApplyResult `json:"result,omitempty"` +} + +type RepairResult struct { + Plan RepairPlan `json:"plan"` + Applied []ActionResult `json:"applied,omitempty"` + AuditLogPath string `json:"auditLogPath,omitempty"` + StoppedAt string `json:"stoppedAt,omitempty"` + Error string `json:"error,omitempty"` +} + +func worstHealth(a, b HealthClass) HealthClass { + if healthRank(b) > healthRank(a) { + return b + } + return a +} + +func healthRank(v HealthClass) int { + switch v { + case HealthUnsafe: + return 4 + case HealthRepairable: + return 3 + case HealthDegraded: + return 2 + case HealthHealthy: + return 1 + default: + return 0 + } +}