-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add NATS publisher support to reminder #4829
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
// Package nats provides a nants+cloudevents implementation of the eventer interface | ||
// Package nats provides a nats+cloudevents implementation of the eventer interface | ||
package nats | ||
|
||
import ( | ||
|
@@ -17,23 +17,28 @@ import ( | |
cloudevents "github.com/cloudevents/sdk-go/v2" | ||
"github.com/cloudevents/sdk-go/v2/client" | ||
"github.com/nats-io/nats.go" | ||
"github.com/nats-io/nats.go/jetstream" | ||
"github.com/rs/zerolog" | ||
|
||
"github.com/mindersec/minder/internal/events/common" | ||
serverconfig "github.com/mindersec/minder/pkg/config/server" | ||
"github.com/mindersec/minder/pkg/config" | ||
) | ||
|
||
// BuildNatsChannelDriver creates a new event driver using | ||
// CloudEvents with the NATS-JetStream transport | ||
func BuildNatsChannelDriver(cfg *serverconfig.EventConfig) (message.Publisher, message.Subscriber, common.DriverCloser, error) { | ||
adapter := &cloudEventsNatsAdapter{cfg: &cfg.Nats} | ||
func BuildNatsChannelDriver(cfg *config.NatsConfig) (message.Publisher, message.Subscriber, common.DriverCloser, error) { | ||
if cfg == nil { | ||
return nil, nil, nil, fmt.Errorf("NATS config is nil") | ||
} | ||
|
||
adapter := &cloudEventsNatsAdapter{cfg: cfg} | ||
return adapter, adapter, func() {}, nil | ||
} | ||
|
||
// CloudEventsNatsPublisher actually consumes a _set_ of NATS topics, | ||
// because CloudEvents-Jetstream has a separate Consumer for each topic | ||
type cloudEventsNatsAdapter struct { | ||
cfg *serverconfig.NatsConfig | ||
cfg *config.NatsConfig | ||
lock sync.Mutex | ||
// Keep a cache of the topics we subscribe/publish to | ||
topics map[string]topicState | ||
|
@@ -114,25 +119,21 @@ func (c *cloudEventsNatsAdapter) ensureTopic(ctx context.Context, topic string, | |
return &state, nil | ||
} | ||
|
||
func (c *cloudEventsNatsAdapter) ensureStream(_ context.Context) error { | ||
func (c *cloudEventsNatsAdapter) ensureStream(ctx context.Context) error { | ||
conn, err := nats.Connect(c.cfg.URL) | ||
if err != nil { | ||
return err | ||
} | ||
defer conn.Close() | ||
js, err := conn.JetStream() | ||
js, err := jetstream.New(conn) | ||
if err != nil { | ||
return err | ||
} | ||
si, err := js.StreamInfo(c.cfg.Prefix) | ||
if si == nil || err != nil && err.Error() == "stream not found" { | ||
_, err = js.AddStream(&nats.StreamConfig{ | ||
Name: c.cfg.Prefix, | ||
Subjects: []string{c.cfg.Prefix + ".>"}, | ||
}) | ||
return err | ||
} | ||
return nil | ||
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ | ||
Name: c.cfg.Prefix, | ||
Subjects: []string{c.cfg.Prefix + ".>"}, | ||
}) | ||
return err | ||
Comment on lines
+132
to
+136
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Newer jetstream API, cleaner code, context support. https://github.com/nats-io/nats.go/blob/main/jetstream/README.md There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 if this works; I think when I started looking, this wasn't cleaned up / operable with CloudEvents yet. |
||
} | ||
|
||
// Subscribe implements message.Subscriber. | ||
|
@@ -191,13 +192,13 @@ func (c *cloudEventsNatsAdapter) Publish(topic string, messages ...*message.Mess | |
|
||
state, err := c.ensureTopic(ctx, subject, "sender") | ||
if err != nil { | ||
return fmt.Errorf("Error creating topic %q: %w", subject, err) | ||
return fmt.Errorf("error creating topic %q: %w", subject, err) | ||
} | ||
|
||
for _, msg := range messages { | ||
err := sendEvent(ctx, subject, state.ceClient, msg) | ||
if err != nil { | ||
return fmt.Errorf("Error sending event to %q: %w", subject, err) | ||
return fmt.Errorf("error sending event to %q: %w", subject, err) | ||
} | ||
} | ||
return nil | ||
|
@@ -214,7 +215,7 @@ func sendEvent( | |
// All our current payloads are encoded JSON; we need to unmarshal | ||
payload := map[string]any{} | ||
if err := json.Unmarshal(msg.Payload, &payload); err != nil { | ||
return fmt.Errorf("Error unmarshalling payload: %w", err) | ||
return fmt.Errorf("error unmarshalling payload: %w", err) | ||
} | ||
|
||
err := event.SetData("application/json", payload) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,12 +13,33 @@ import ( | |
"github.com/rs/zerolog" | ||
|
||
"github.com/mindersec/minder/internal/events/common" | ||
natsinternal "github.com/mindersec/minder/internal/events/nats" | ||
"github.com/mindersec/minder/pkg/eventer/constants" | ||
) | ||
|
||
func (r *reminder) getMessagePublisher(ctx context.Context) (message.Publisher, common.DriverCloser, error) { | ||
switch r.cfg.EventConfig.Driver { | ||
case constants.NATSDriver: | ||
return r.setupNATSPublisher(ctx) | ||
case constants.SQLDriver: | ||
return r.setupSQLPublisher(ctx) | ||
default: | ||
return nil, nil, fmt.Errorf("unknown publisher type: %s", r.cfg.EventConfig.Driver) | ||
} | ||
} | ||
Comment on lines
+20
to
+29
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm nervous about having this diverge from (I need to do some refactoring in that package to move There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
// Interface is a combination of the Publisher, Registrar, and Service interfaces.
// This is handy when spawning the eventer in a single function for easy setup.
type Interface interface {
Publisher
Registrar
Service
} as reminder only needs publishing capability, I'd be happy to implement interfaces.Publisher for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's fine to use the |
||
|
||
func (r *reminder) setupNATSPublisher(_ context.Context) (message.Publisher, common.DriverCloser, error) { | ||
pub, _, cl, err := natsinternal.BuildNatsChannelDriver(&r.cfg.EventConfig.NatsConfig) | ||
if err != nil { | ||
return nil, nil, fmt.Errorf("failed to create NATS publisher: %w", err) | ||
} | ||
return pub, cl, nil | ||
} | ||
|
||
func (r *reminder) setupSQLPublisher(ctx context.Context) (message.Publisher, common.DriverCloser, error) { | ||
logger := zerolog.Ctx(ctx) | ||
|
||
db, _, err := r.cfg.EventConfig.Connection.GetDBConnection(ctx) | ||
db, _, err := r.cfg.EventConfig.SQLPubConfig.Connection.GetDBConnection(ctx) | ||
if err != nil { | ||
return nil, nil, fmt.Errorf("unable to connect to events database: %w", err) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -149,3 +149,14 @@ func ReadKey(keypath string) ([]byte, error) { | |
|
||
return data, nil | ||
} | ||
|
||
// NatsConfig is the configuration when using NATS as the event driver | ||
type NatsConfig struct { | ||
// URL is the URL for the NATS server | ||
URL string `mapstructure:"url" default:"nats://localhost:4222"` | ||
// Prefix is the prefix for the NATS subjects to subscribe to | ||
Prefix string `mapstructure:"prefix" default:"minder"` | ||
// Queue is the name of the queue group to join when consuming messages | ||
// queue groups allow multiple process to round-robin process messages. | ||
Queue string `mapstructure:"queue" default:"minder"` | ||
} | ||
Comment on lines
+152
to
+162
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we want to move this up to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That still breaks the pattern as all the structs in
To duplicate vs 'have common code'. Do you want me to duplicate the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there are a couple options here:
I'm more concerned about leaking the dependencies on e.g. NATS into the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the other constructors take the entire
cfg
, so that's the model I followed here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has become a tedious problem to refactor. I have to change the signature of
BuildNatsChannelDriver
ascloudEventsNatsAdapter
is not externally exposed.Either I change the signature of every driver function to take in not
serverconfig
, but only the relevant config (in order to follow a pattern), which again isn't pretty as it does not truly achieve separation fromserverconfig
as functions likeBuildPostgreSQLDriver
would still rely on components purely inserverconfig
(likecfg.SQLPubSub.AckDeadline
) which isn't there inreminderconfig
. Either it would result in breaking the pattern or some form of duplication of the instantiation code.I see how this can slowly become just duplication of all instantiation code in order to keep minder and reminder separate, but I'm torn between keeping them separate (as minder is the main engine and reminder is a completely different service) v/s keeping them in the same code for ease of maintainability (and for de duplication - which again strikes the thought that this duplication would not have been there if reminder was in another language).
What do you think about it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's fine to re-use the implementation of the construction code in reminder, even as it runs as a separate service. (For example, most Kubernetes controllers use the same informers library and construction client to create their connection to the K8s apiserver.)
So, my suggestion would be to embed a
serverconfig.EventConfig
into theReminderConfig
, rather than attempting to mirror only selected options into the reminder config. This will mean that it's possible to have something meaningless like the Go channel in reminder; you can avoid that by adding a check in the top-levelReminderConfig.Validate
that the event configuration isn't using the go module.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(This pattern will also give you access to the flag-driven eventer, and future improvements in flagging that I'm planning -- using a central flag server, rather than repeating the flag file for each binary.)