Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
407 changes: 407 additions & 0 deletions cmd/cefas-manager/main.go

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion deploy/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ ENV CGO_ENABLED=0
RUN --mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH:-amd64} \
go build -trimpath -ldflags="-s -w" -o /out/cefasdb ./cmd/cefasdb
go build -trimpath -ldflags="-s -w" -o /out/cefasdb ./cmd/cefasdb && \
go build -trimpath -ldflags="-s -w" -o /out/cefas-manager ./cmd/cefas-manager

RUN mkdir -p /out/var/lib/cefas && touch /out/var/lib/cefas/.keep

Expand All @@ -44,6 +45,7 @@ LABEL org.opencontainers.image.source="https://github.com/CefasDb/cefasdb" \
org.opencontainers.image.version="${VERSION}"

COPY --from=build /out/cefasdb /usr/local/bin/cefasdb
COPY --from=build /out/cefas-manager /usr/local/bin/cefas-manager
COPY --from=build --chown=65532:65532 /out/var/lib/cefas /var/lib/cefas

# Default data directory is bind-mounted by docker-compose / k8s; we
Expand Down
71 changes: 71 additions & 0 deletions dist/helm/cefas/templates/manager-deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
{{- if .Values.manager.enabled -}}
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "cefas.fullname" . }}-manager
labels:
{{- include "cefas.labels" . | nindent 4 }}
app.kubernetes.io/component: manager
spec:
replicas: {{ .Values.manager.replicaCount }}
selector:
matchLabels:
{{- include "cefas.selectorLabels" . | nindent 6 }}
app.kubernetes.io/component: manager
template:
metadata:
labels:
{{- include "cefas.selectorLabels" . | nindent 8 }}
app.kubernetes.io/component: manager
spec:
serviceAccountName: {{ include "cefas.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.securityContext | nindent 8 }}
containers:
- name: cefas-manager
image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
command:
- /usr/local/bin/cefas-manager
args:
- controller
- --leader-election
- --leader-lease-name={{ include "cefas.fullname" . }}-manager
- --leader-lease-ttl={{ .Values.manager.leaderLeaseTTL }}
- --endpoint=$(CEFAS_ENDPOINT)
- --http-endpoint=$(CEFAS_HTTP_ENDPOINT)
- --namespace=$(POD_NAMESPACE)
- --selector=$(CEFAS_KUBE_SELECTOR)
- --repair-mode={{ .Values.manager.repairMode }}
- --interval={{ .Values.manager.interval }}
- --timeout={{ .Values.manager.timeout }}
- --audit-log={{ .Values.manager.auditLog }}
- --insecure=true
{{- if .Values.manager.approveFencing }}
- --approve-fencing
{{- end }}
env:
- name: CEFAS_ENDPOINT
value: "{{ include "cefas.fullname" . }}:{{ .Values.service.grpc }}"
- name: CEFAS_HTTP_ENDPOINT
value: "http://{{ include "cefas.fullname" . }}:{{ .Values.service.http }}"
- name: CEFAS_KUBE_SELECTOR
value: "app.kubernetes.io/name={{ include "cefas.name" . }},app.kubernetes.io/instance={{ .Release.Name }}"
- name: CEFAS_MANAGER_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
{{- if .Values.manager.tokenSecretName }}
- name: CEFAS_TOKEN
valueFrom:
secretKeyRef:
name: {{ .Values.manager.tokenSecretName }}
key: {{ .Values.manager.tokenSecretKey }}
{{- end }}
resources:
{{- toYaml .Values.manager.resources | nindent 12 }}
{{- end }}
79 changes: 79 additions & 0 deletions dist/helm/cefas/templates/manager-rbac.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
{{- if .Values.manager.enabled -}}
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: {{ include "cefas.fullname" . }}-manager
labels:
{{- include "cefas.labels" . | nindent 4 }}
app.kubernetes.io/component: manager
rules:
- apiGroups:
- ""
resources:
- pods
- endpoints
- persistentvolumeclaims
verbs:
- get
- list
- watch
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- get
- list
- watch
- create
- update
- patch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: {{ include "cefas.fullname" . }}-manager
labels:
{{- include "cefas.labels" . | nindent 4 }}
app.kubernetes.io/component: manager
subjects:
- kind: ServiceAccount
name: {{ include "cefas.serviceAccountName" . }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: {{ include "cefas.fullname" . }}-manager
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: {{ include "cefas.fullname" . }}-manager
labels:
{{- include "cefas.labels" . | nindent 4 }}
app.kubernetes.io/component: manager
rules:
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: {{ include "cefas.fullname" . }}-manager
labels:
{{- include "cefas.labels" . | nindent 4 }}
app.kubernetes.io/component: manager
subjects:
- kind: ServiceAccount
name: {{ include "cefas.serviceAccountName" . }}
namespace: {{ .Release.Namespace }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: {{ include "cefas.fullname" . }}-manager
{{- end }}
2 changes: 2 additions & 0 deletions dist/helm/cefas/templates/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ spec:
volumeClaimTemplates:
- metadata:
name: data
labels:
{{- include "cefas.selectorLabels" . | nindent 10 }}
spec:
accessModes: ["ReadWriteOnce"]
{{- if .Values.persistence.storageClass }}
Expand Down
19 changes: 19 additions & 0 deletions dist/helm/cefas/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,25 @@ serviceAccount:
create: true
name: ""

manager:
enabled: true
replicaCount: 2
repairMode: observe
interval: 30s
timeout: 30s
leaderLeaseTTL: 30s
approveFencing: false
auditLog: /tmp/cefas-manager-audit.jsonl
tokenSecretName: ""
tokenSecretKey: token
resources:
requests:
cpu: "50m"
memory: "128Mi"
limits:
cpu: "500m"
memory: "512Mi"

resources:
requests:
cpu: "200m"
Expand Down
111 changes: 111 additions & 0 deletions internal/manager/cefas.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package manager

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"

"github.com/CefasDb/cefasdb/internal/placement"
"github.com/CefasDb/cefasdb/pkg/client"
)

type Cefas interface {
Status(ctx context.Context) (client.ClusterStatus, error)
AuditPlacement(ctx context.Context, req placement.PlacementAuditRequest) (placement.PlacementAuditReport, error)
PlanPlacement(ctx context.Context, req client.PlacementPlanRequest) (client.PlacementPlan, error)
ApplyPlacement(ctx context.Context, req client.PlacementApplyRequest) (client.PlacementApplyResult, error)
}

type SDKCefas struct {
GRPC *client.Client
Audit *HTTPAuditClient
}

func (s *SDKCefas) Status(ctx context.Context) (client.ClusterStatus, error) {
if s == nil || s.GRPC == nil {
return client.ClusterStatus{}, fmt.Errorf("cefas gRPC client is not configured")
}
return s.GRPC.Status(ctx)
}

func (s *SDKCefas) AuditPlacement(ctx context.Context, req placement.PlacementAuditRequest) (placement.PlacementAuditReport, error) {
if s == nil || s.Audit == nil {
return placement.PlacementAuditReport{}, fmt.Errorf("cefas audit HTTP client is not configured")
}
return s.Audit.AuditPlacement(ctx, req)
}

func (s *SDKCefas) PlanPlacement(ctx context.Context, req client.PlacementPlanRequest) (client.PlacementPlan, error) {
if s == nil || s.GRPC == nil {
return client.PlacementPlan{}, fmt.Errorf("cefas gRPC client is not configured")
}
return s.GRPC.PlanPlacement(ctx, req)
}

func (s *SDKCefas) ApplyPlacement(ctx context.Context, req client.PlacementApplyRequest) (client.PlacementApplyResult, error) {
if s == nil || s.GRPC == nil {
return client.PlacementApplyResult{}, fmt.Errorf("cefas gRPC client is not configured")
}
return s.GRPC.ApplyPlacement(ctx, req)
}

type HTTPAuditClient struct {
base *url.URL
token string
httpClient *http.Client
}

func NewHTTPAuditClient(baseURL, bearer string, httpClient *http.Client) (*HTTPAuditClient, error) {
if httpClient == nil {
httpClient = http.DefaultClient
}
if baseURL == "" {
return nil, fmt.Errorf("audit base URL is required")
}
u, err := url.Parse(strings.TrimRight(baseURL, "/"))
if err != nil {
return nil, err
}
return &HTTPAuditClient{base: u, token: bearer, httpClient: httpClient}, nil
}

func (c *HTTPAuditClient) AuditPlacement(ctx context.Context, req placement.PlacementAuditRequest) (placement.PlacementAuditReport, error) {
if c == nil {
return placement.PlacementAuditReport{}, fmt.Errorf("audit client is nil")
}
req.IncludeRepairPlan = true
body, err := json.Marshal(req)
if err != nil {
return placement.PlacementAuditReport{}, err
}
u := *c.base
u.Path = strings.TrimRight(u.Path, "/") + "/v1/cluster/placement/audit"
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), bytes.NewReader(body))
if err != nil {
return placement.PlacementAuditReport{}, err
}
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("Accept", "application/json")
if c.token != "" {
httpReq.Header.Set("Authorization", "Bearer "+c.token)
}
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return placement.PlacementAuditReport{}, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return placement.PlacementAuditReport{}, fmt.Errorf("placement audit status=%d body=%s", resp.StatusCode, strings.TrimSpace(string(data)))
}
var report placement.PlacementAuditReport
if err := json.NewDecoder(resp.Body).Decode(&report); err != nil {
return placement.PlacementAuditReport{}, err
}
return report, nil
}
Loading
Loading