Skip to content

Commit

Permalink
feat(controller-runtime): Implement stream controller (#211)
Browse files Browse the repository at this point in the history
* Add account, consumer and stream controller stubs to be implemented

Controllers and tests are based on files generated by operator-sdk.
Adds a minimal test suite for the controllers with a etcd test env and a test nats jetStream server to test against.

* Add logs to Reconcile functions

* Add jsClient to test suit variables

* Remove format from log string

* Make upsertCondition public to be used in new controllers

* Implement basic cases for stream reconciliation

See TODOs on what still needs to be implemented.

* refactor to use shared base controller

* Support jetstream connection options in stream spec

* implement stream deletion

* update observedGeneration of status

* check Spec.PreventDelete before stream deletion

* remove base js client

Use a single use client on every connection.
This should be replaced by a client pool in the future.

* move asJsonString to jetstream_controller

* check namespace read only and prevent update mode

* Update comments and log

* Fix test docs and check precondition

* Add preventUpdate test cases

* Add tests for read-only or namespace restricted mode

* fix empty ca when no ca set

Setting  CAs: []string{*ca} resulted in  []string{""} when no CA was set, leading to an error when creating clients.

* simplify error message

* fix error loop when the underlying stream was deleted

* refactor each phase into separate method

* Fix errors during parallel reconciliation & Refactor tests

- Trigger only on generation changes
- Split initialization and create into separate calls to Reconcile

* make test description strings more uniform

* Update docs and log messages

* extract configuration to buildNatsConfig method

* fix checking for preventDelete in the update step

Instead check for preventUpdate. Introduced during refactor.

* fix k8s binaries not downloaded for tests

* add /bin to gitignore

* rename stream helper functions

Prefix with stream to prevent conflict with other resources.

* update naming as suggested

* fix assumed reason in log message

* Update todo comments marked with review

 - Add note on opts.Account
 - Add comment on possible feature to expose TLSFirst in the spec.

* separate CA config from client cert and key

* set streamName and consumerName fields once on logger

Reword log messages.
  • Loading branch information
adriandieter authored Dec 19, 2024
1 parent 895dc1c commit 0f218f5
Show file tree
Hide file tree
Showing 20 changed files with 1,440 additions and 125 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
/nats-boot-config
/nats-boot-config.docker
/tools
/bin
/.idea
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ $(ENVTEST): $(LOCALBIN)
.PHONY: test
test: envtest
go vet ./controllers/... ./pkg/natsreloader/... ./internal/controller/...
$(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path ## Get k8s binaries
go test -race -cover -count=1 -timeout 10s ./controllers/... ./pkg/natsreloader/... ./internal/controller/...

.PHONY: clean
Expand Down
7 changes: 6 additions & 1 deletion cmd/jetstream-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,23 @@ func run() error {

if *controlLoop {
klog.Warning("Starting jetStream controller in experimental control loop mode")

natsCfg := &controller.NatsConfig{
CRDConnect: *crdConnect,
ClientName: "jetstream-controller",
Credentials: *creds,
NKey: *nkey,
ServerURL: *server,
CA: *ca,
CAs: []string{},
Certificate: *cert,
Key: *key,
TLSFirst: *tlsfirst,
}

if *ca != "" {
natsCfg.CAs = []string{*ca}
}

controllerCfg := &controller.Config{
ReadOnly: *readOnly,
Namespace: *namespace,
Expand Down
4 changes: 2 additions & 2 deletions controllers/jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func setConsumerOK(ctx context.Context, s *apis.Consumer, i typed.ConsumerInterf
sc := s.DeepCopy()

sc.Status.ObservedGeneration = s.Generation
sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{
sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{
Type: readyCondType,
Status: k8sapi.ConditionTrue,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand Down Expand Up @@ -490,7 +490,7 @@ func setConsumerErrored(ctx context.Context, s *apis.Consumer, sif typed.Consume
}

sc := s.DeepCopy()
sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{
sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{
Type: readyCondType,
Status: k8sapi.ConditionFalse,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand Down
2 changes: 1 addition & 1 deletion controllers/jetstream/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func processQueueNext(q workqueue.RateLimitingInterface, jmsClient jsmClientFunc
q.Forget(item)
}

func upsertCondition(cs []apis.Condition, next apis.Condition) []apis.Condition {
func UpsertCondition(cs []apis.Condition, next apis.Condition) []apis.Condition {
for i := 0; i < len(cs); i++ {
if cs[i].Type != next.Type {
continue
Expand Down
6 changes: 3 additions & 3 deletions controllers/jetstream/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestUpsertCondition(t *testing.T) {

var cs []apis.Condition

cs = upsertCondition(cs, apis.Condition{
cs = UpsertCondition(cs, apis.Condition{
Type: readyCondType,
Status: k8sapis.ConditionTrue,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand All @@ -202,7 +202,7 @@ func TestUpsertCondition(t *testing.T) {
t.Fatalf("got=%s; want=%s", got, want)
}

cs = upsertCondition(cs, apis.Condition{
cs = UpsertCondition(cs, apis.Condition{
Type: readyCondType,
Status: k8sapis.ConditionFalse,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand All @@ -218,7 +218,7 @@ func TestUpsertCondition(t *testing.T) {
t.Fatalf("got=%s; want=%s", got, want)
}

cs = upsertCondition(cs, apis.Condition{
cs = UpsertCondition(cs, apis.Condition{
Type: "Foo",
Status: k8sapis.ConditionTrue,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand Down
4 changes: 2 additions & 2 deletions controllers/jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func setStreamErrored(ctx context.Context, s *apis.Stream, sif typed.StreamInter
}

sc := s.DeepCopy()
sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{
sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{
Type: readyCondType,
Status: k8sapi.ConditionFalse,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand Down Expand Up @@ -600,7 +600,7 @@ func setStreamOK(ctx context.Context, s *apis.Stream, i typed.StreamInterface) (
sc := s.DeepCopy()

sc.Status.ObservedGeneration = s.Generation
sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{
sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{
Type: readyCondType,
Status: k8sapi.ConditionTrue,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand Down
8 changes: 1 addition & 7 deletions internal/controller/account_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,15 @@ package controller

import (
"context"
"github.com/nats-io/nats.go/jetstream"
"k8s.io/klog/v2"

jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// AccountReconciler reconciles a Account object
type AccountReconciler struct {
client.Client
Scheme *runtime.Scheme
Config *Config
JetStream jetstream.JetStream
JetStreamController
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
Expand Down
3 changes: 1 addition & 2 deletions internal/controller/account_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ var _ = Describe("Account Controller", func() {
It("should successfully reconcile the resource", func() {
By("Reconciling the created resource")
controllerReconciler := &AccountReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
baseController,
}

_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
Expand Down
23 changes: 15 additions & 8 deletions internal/controller/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type NatsConfig struct {
Credentials string
NKey string
ServerURL string
CA string
CAs []string
Certificate string
Key string
TLSFirst bool
Expand Down Expand Up @@ -46,19 +46,26 @@ func (o *NatsConfig) buildOptions() ([]nats.Option, error) {
opts = append(opts, nats.ClientCert(o.Certificate, o.Key))
}

if o.CA != "" {
opts = append(opts, nats.RootCAs(o.CA))
if o.CAs != nil && len(o.CAs) > 0 {
opts = append(opts, nats.RootCAs(o.CAs...))
}
}

return opts, nil
}

func CreateJetStreamClient(cfg *NatsConfig, pedantic bool) (jetstream.JetStream, error) {
type Closable interface {
Close()
}

// CreateJetStreamClient creates new Jetstream client with a connection based on the given NatsConfig.
// Returns a jetstream.Jetstream client and the Closable of the underlying connection.
// Close should be called when the client is no longer used.
func CreateJetStreamClient(cfg *NatsConfig, pedantic bool) (jetstream.JetStream, Closable, error) {

opts, err := cfg.buildOptions()
if err != nil {
return nil, fmt.Errorf("nats options: %w", err)
return nil, nil, fmt.Errorf("nats options: %w", err)
}

// Set pedantic option
Expand All @@ -74,12 +81,12 @@ func CreateJetStreamClient(cfg *NatsConfig, pedantic bool) (jetstream.JetStream,

nc, err := nats.Connect(cfg.ServerURL, opts...)
if err != nil {
return nil, fmt.Errorf("nats connect: %w", err)
return nil, nil, fmt.Errorf("nats connect: %w", err)
}

js, err := jetstream.New(nc)
if err != nil {
return nil, fmt.Errorf("new jetstream: %w", err)
return nil, nil, fmt.Errorf("new jetstream: %w", err)
}
return js, nil
return js, nc, nil
}
8 changes: 1 addition & 7 deletions internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,15 @@ package controller

import (
"context"
"github.com/nats-io/nats.go/jetstream"
"k8s.io/klog/v2"

jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// ConsumerReconciler reconciles a Consumer object
type ConsumerReconciler struct {
client.Client
Scheme *runtime.Scheme
Config *Config
JetStream jetstream.JetStream
JetStreamController
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
Expand Down
3 changes: 1 addition & 2 deletions internal/controller/consumer_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ var _ = Describe("Consumer Controller", func() {
It("should successfully reconcile the resource", func() {
By("Reconciling the created resource")
controllerReconciler := &ConsumerReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
baseController,
}

_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
Expand Down
43 changes: 43 additions & 0 deletions internal/controller/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package controller

import (
api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
"github.com/nats-io/nats-server/v2/server"
natsserver "github.com/nats-io/nats-server/v2/test"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"

"os"
"time"
)

func assertReadyStateMatches(condition api.Condition, status v1.ConditionStatus, reason string, message string, transitionTime time.Time) {
GinkgoHelper()

Expect(condition.Type).To(Equal(readyCondType))
Expect(condition.Status).To(Equal(status))
Expect(condition.Reason).To(Equal(reason))
Expect(condition.Message).To(ContainSubstring(message))

// Assert valid transition time
t, err := time.Parse(time.RFC3339Nano, condition.LastTransitionTime)
Expect(err).NotTo(HaveOccurred())
Expect(t).To(BeTemporally("~", transitionTime, time.Second))
}

func CreateTestServer() *server.Server {
opts := &natsserver.DefaultTestOptions
opts.JetStream = true
opts.Port = -1
opts.Debug = true

dir, err := os.MkdirTemp("", "nats-*")
Expect(err).NotTo(HaveOccurred())
opts.StoreDir = dir

ns := natsserver.RunServer(opts)
Expect(err).NotTo(HaveOccurred())

return ns
}
Loading

0 comments on commit 0f218f5

Please sign in to comment.