Skip to content

Commit

Permalink
Migrate to reconciliation loop using controller-runtime (#208)
Browse files Browse the repository at this point in the history
* Setup optional controller-runtime manager in main

Removes the kubeconfig flag and instead uses ctrl.RegisterFlags(fs)
 and ctrl.GetConfig(). The controller-runtime currently registers the kubeconfig flag, which lead to a redefined flag error when registering it again.

* Add update permissions for resource finalizers

* Add envtest to Makefile

This is based on the Makefile of an operator-sdk based project.

* Update test to include envtest and run the internal/controller test suite

* Add account, consumer and stream controller stubs to be implemented

Controllers and tests are based on files generated by operator-sdk.
Adds a minimal test suite for the controllers with a etcd test env and a test nats jetStream server to test against.

* Add logs to Reconcile functions

* Add internal/controller to jetstreamSrc

* Register account, consumer and stream reconcilers

* Add jsClient to test suit variables

* Remove format from log string
  • Loading branch information
adriandieter authored and samuelattwood committed Dec 9, 2024
1 parent b751988 commit 895dc1c
Show file tree
Hide file tree
Showing 14 changed files with 870 additions and 159 deletions.
38 changes: 34 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ export GO111MODULE := on

SHELL=/usr/bin/env bash

ENVTEST_K8S_VERSION = 1.29.0

now := $(shell date -u +%Y-%m-%dT%H:%M:%S%z)
gitBranch := $(shell git rev-parse --abbrev-ref HEAD)
gitCommit := $(shell git rev-parse --short HEAD)
Expand All @@ -11,7 +13,7 @@ VERSION ?= version-not-set
linkerVars := -X main.BuildTime=$(now) -X main.GitInfo=$(gitBranch)-$(gitCommit)$(repoDirty) -X main.Version=$(VERSION)
drepo ?= natsio

jetstreamSrc := $(shell find cmd/jetstream-controller pkg/jetstream controllers/jetstream -name "*.go") pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go
jetstreamSrc := $(shell find cmd/jetstream-controller pkg/jetstream internal/controller controllers/jetstream -name "*.go") pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go

configReloaderSrc := $(shell find cmd/nats-server-config-reloader/ pkg/natsreloader/ -name "*.go")

Expand Down Expand Up @@ -169,10 +171,38 @@ fetch-modules:
.PHONY: build
build: jetstream-controller nats-server-config-reloader nats-boot-config

# Setup envtest tools based on a operator-sdk project makefile
LOCALBIN ?= $(shell pwd)/bin
$(LOCALBIN):
mkdir -p $(LOCALBIN)

# go-install-tool will 'go install' any package with custom target and name of binary, if it doesn't exist
# $1 - target path with name of binary (ideally with version)
# $2 - package url which can be installed
# $3 - specific version of package
define go-install-tool
@[ -f $(1) ] || { \
set -e; \
package=$(2)@$(3) ;\
echo "Downloading $${package}" ;\
GOBIN=$(LOCALBIN) go install $${package} ;\
mv "$$(echo "$(1)" | sed "s/-$(3)$$//")" $(1) ;\
}
endef

ENVTEST ?= $(LOCALBIN)/setup-envtest-$(ENVTEST_VERSION)
ENVTEST_VERSION ?= release-0.17

.PHONY: envtest
envtest: $(ENVTEST) ## Download setup-envtest locally if necessary.
$(ENVTEST): $(LOCALBIN)
$(call go-install-tool,$(ENVTEST),sigs.k8s.io/controller-runtime/tools/setup-envtest,$(ENVTEST_VERSION))


.PHONY: test
test:
go vet ./controllers/... ./pkg/natsreloader/...
go test -race -cover -count=1 -timeout 10s ./controllers/... ./pkg/natsreloader/...
test: envtest
go vet ./controllers/... ./pkg/natsreloader/... ./internal/controller/...
go test -race -cover -count=1 -timeout 10s ./controllers/... ./pkg/natsreloader/... ./internal/controller/...

.PHONY: clean
clean:
Expand Down
92 changes: 72 additions & 20 deletions cmd/jetstream-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,23 @@ import (
"errors"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/nats-io/nack/controllers/jetstream"
"github.com/nats-io/nack/internal/controller"
jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
clientset "github.com/nats-io/nack/pkg/jetstream/generated/clientset/versioned"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
klog "k8s.io/klog/v2"
"os"
"os/signal"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"syscall"
"time"
)

var (
Expand All @@ -49,7 +52,10 @@ func main() {

func run() error {
klog.InitFlags(nil)
kubeConfig := flag.String("kubeconfig", "", "Path to kubeconfig")

// Explicitly register controller-runtime flags
ctrl.RegisterFlags(nil)

namespace := flag.String("namespace", v1.NamespaceAll, "Restrict to a namespace")
version := flag.Bool("version", false, "Print the version and exit")
creds := flag.String("creds", "", "NATS Credentials")
Expand All @@ -62,6 +68,8 @@ func run() error {
crdConnect := flag.Bool("crd-connect", false, "If true, then NATS connections will be made from CRD config, not global config")
cleanupPeriod := flag.Duration("cleanup-period", 30*time.Second, "Period to run object cleanup")
readOnly := flag.Bool("read-only", false, "Starts the controller without causing changes to the NATS resources")
controlLoop := flag.Bool("control-loop", false, "Experimental: Run controller with a full reconciliation control loop.")

flag.Parse()

if *version {
Expand All @@ -73,18 +81,30 @@ func run() error {
return errors.New("NATS Server URL is required")
}

var config *rest.Config
var err error
if *kubeConfig == "" {
config, err = rest.InClusterConfig()
if err != nil {
return err
config, err := ctrl.GetConfig()
if err != nil {
return fmt.Errorf("get kubernetes rest config: %w", err)
}

if *controlLoop {
klog.Warning("Starting jetStream controller in experimental control loop mode")
natsCfg := &controller.NatsConfig{
CRDConnect: *crdConnect,
ClientName: "jetstream-controller",
Credentials: *creds,
NKey: *nkey,
ServerURL: *server,
CA: *ca,
Certificate: *cert,
Key: *key,
TLSFirst: *tlsfirst,
}
} else {
config, err = clientcmd.BuildConfigFromFlags("", *kubeConfig)
if err != nil {
return err

controllerCfg := &controller.Config{
ReadOnly: *readOnly,
Namespace: *namespace,
}
return runControlLoop(config, natsCfg, controllerCfg)
}

// K8S API Client.
Expand Down Expand Up @@ -129,6 +149,38 @@ func run() error {
return ctrl.Run()
}

func runControlLoop(config *rest.Config, natsCfg *controller.NatsConfig, controllerCfg *controller.Config) error {

// Setup scheme
scheme := runtime.NewScheme()
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(jetstreamnatsiov1beta2.AddToScheme(scheme))

mgr, err := ctrl.NewManager(config, ctrl.Options{
Scheme: scheme,
Logger: klog.NewKlogr().WithName("controller-runtime"),
// TODO Add full configuration
})
if err != nil {
return fmt.Errorf("unable to start manager: %w", err)
}

err = controller.RegisterAll(mgr, natsCfg, controllerCfg)
if err != nil {
return fmt.Errorf("register jetstream controllers: %w", err)
}

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
return fmt.Errorf("unable to set up health check: %w", err)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
return fmt.Errorf("unable to set up ready check: %w", err)
}

klog.Info("starting manager")
return mgr.Start(ctrl.SetupSignalHandler())
}

func handleSignals(cancel context.CancelFunc) {
sigc := make(chan os.Signal, 2)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
Expand Down
8 changes: 8 additions & 0 deletions deploy/rbac.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ rules:
- patch
- update
- delete
- apiGroups:
- jetstream.nats.io
resources:
- streams/finalizers
- consumers/finalizers
- accounts/finalizers
verbs:
- update
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand Down
20 changes: 18 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ require (
github.com/nats-io/jsm.go v0.1.2
github.com/nats-io/nats-server/v2 v2.10.22
github.com/nats-io/nats.go v1.37.0
github.com/onsi/ginkgo/v2 v2.19.0
github.com/onsi/gomega v1.33.1
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
golang.org/x/sync v0.9.0
Expand All @@ -17,26 +19,33 @@ require (
k8s.io/client-go v0.31.2
k8s.io/code-generator v0.31.2
k8s.io/klog/v2 v2.130.1
sigs.k8s.io/controller-runtime v0.19.2
sigs.k8s.io/structured-merge-diff/v4 v4.4.3
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
github.com/evanphx/json-patch v5.9.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/expr-lang/expr v1.16.9 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand All @@ -52,9 +61,16 @@ require (
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.29.0 // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/net v0.31.0 // indirect
golang.org/x/oauth2 v0.24.0 // indirect
Expand All @@ -63,13 +79,13 @@ require (
golang.org/x/text v0.20.0 // indirect
golang.org/x/time v0.8.0 // indirect
golang.org/x/tools v0.27.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/protobuf v1.35.2 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/gengo v0.0.0-20240911193312-2b36238f13e9 // indirect
k8s.io/apiextensions-apiserver v0.31.0 // indirect
k8s.io/gengo/v2 v2.0.0-20240911193312-2b36238f13e9 // indirect
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
k8s.io/utils v0.0.0-20241104163129-6fe5fd82f078 // indirect
Expand Down
Loading

0 comments on commit 895dc1c

Please sign in to comment.