Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NATS publisher support to reminder #4829

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions config/reminder-config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@ logging:
level: "debug"

events:
sql_connection:
dbhost: "watermill-postgres"
dbport: 5432
dbuser: postgres
dbpass: postgres
dbname: watermill
sslmode: disable
driver: "sql"
# driver: "cloudevents-nats"
sql:
connection:
dbhost: "watermill-postgres"
dbport: 5432
dbuser: postgres
dbpass: postgres
dbname: watermill
sslmode: disable
# nats:
# url: "nats://localhost:4222"
# prefix: "minder"
# queue: "minder"
2 changes: 1 addition & 1 deletion internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func instantiateDriver(
return eventersql.BuildPostgreSQLDriver(ctx, cfg)
case constants.NATSDriver:
zerolog.Ctx(ctx).Info().Msg("Using NATS driver")
return nats.BuildNatsChannelDriver(cfg)
return nats.BuildNatsChannelDriver(&cfg.Nats)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the other constructors take the entire cfg, so that's the model I followed here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has become a tedious problem to refactor. I have to change the signature of BuildNatsChannelDriver as cloudEventsNatsAdapter is not externally exposed.

Either I change the signature of every driver function to take in not serverconfig, but only the relevant config (in order to follow a pattern), which again isn't pretty as it does not truly achieve separation from serverconfig as functions like BuildPostgreSQLDriver would still rely on components purely in serverconfig (like cfg.SQLPubSub.AckDeadline) which isn't there in reminderconfig. Either it would result in breaking the pattern or some form of duplication of the instantiation code.

I see how this can slowly become just duplication of all instantiation code in order to keep minder and reminder separate, but I'm torn between keeping them separate (as minder is the main engine and reminder is a completely different service) v/s keeping them in the same code for ease of maintainability (and for de duplication - which again strikes the thought that this duplication would not have been there if reminder was in another language).

What do you think about it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to re-use the implementation of the construction code in reminder, even as it runs as a separate service. (For example, most Kubernetes controllers use the same informers library and construction client to create their connection to the K8s apiserver.)

So, my suggestion would be to embed a serverconfig.EventConfig into the ReminderConfig, rather than attempting to mirror only selected options into the reminder config. This will mean that it's possible to have something meaningless like the Go channel in reminder; you can avoid that by adding a check in the top-level ReminderConfig.Validate that the event configuration isn't using the go module.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(This pattern will also give you access to the flag-driven eventer, and future improvements in flagging that I'm planning -- using a central flag server, rather than repeating the flag file for each binary.)

case constants.FlaggedDriver:
zerolog.Ctx(ctx).Info().Msg("Using Flagged driver")
return makeFlaggedDriver(ctx, cfg, flagClient)
Expand Down
39 changes: 20 additions & 19 deletions internal/events/nats/natschannel.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
// SPDX-License-Identifier: Apache-2.0

// Package nats provides a nants+cloudevents implementation of the eventer interface
// Package nats provides a nats+cloudevents implementation of the eventer interface
package nats

import (
Expand All @@ -17,23 +17,28 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/client"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/rs/zerolog"

"github.com/mindersec/minder/internal/events/common"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/config"
)

// BuildNatsChannelDriver creates a new event driver using
// CloudEvents with the NATS-JetStream transport
func BuildNatsChannelDriver(cfg *serverconfig.EventConfig) (message.Publisher, message.Subscriber, common.DriverCloser, error) {
adapter := &cloudEventsNatsAdapter{cfg: &cfg.Nats}
func BuildNatsChannelDriver(cfg *config.NatsConfig) (message.Publisher, message.Subscriber, common.DriverCloser, error) {
if cfg == nil {
return nil, nil, nil, fmt.Errorf("NATS config is nil")
}

adapter := &cloudEventsNatsAdapter{cfg: cfg}
return adapter, adapter, func() {}, nil
}

// CloudEventsNatsPublisher actually consumes a _set_ of NATS topics,
// because CloudEvents-Jetstream has a separate Consumer for each topic
type cloudEventsNatsAdapter struct {
cfg *serverconfig.NatsConfig
cfg *config.NatsConfig
lock sync.Mutex
// Keep a cache of the topics we subscribe/publish to
topics map[string]topicState
Expand Down Expand Up @@ -114,25 +119,21 @@ func (c *cloudEventsNatsAdapter) ensureTopic(ctx context.Context, topic string,
return &state, nil
}

func (c *cloudEventsNatsAdapter) ensureStream(_ context.Context) error {
func (c *cloudEventsNatsAdapter) ensureStream(ctx context.Context) error {
conn, err := nats.Connect(c.cfg.URL)
if err != nil {
return err
}
defer conn.Close()
js, err := conn.JetStream()
js, err := jetstream.New(conn)
if err != nil {
return err
}
si, err := js.StreamInfo(c.cfg.Prefix)
if si == nil || err != nil && err.Error() == "stream not found" {
_, err = js.AddStream(&nats.StreamConfig{
Name: c.cfg.Prefix,
Subjects: []string{c.cfg.Prefix + ".>"},
})
return err
}
return nil
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: c.cfg.Prefix,
Subjects: []string{c.cfg.Prefix + ".>"},
})
return err
Comment on lines +132 to +136
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Newer jetstream API, cleaner code, context support. https://github.com/nats-io/nats.go/blob/main/jetstream/README.md

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 if this works; I think when I started looking, this wasn't cleaned up / operable with CloudEvents yet.

}

// Subscribe implements message.Subscriber.
Expand Down Expand Up @@ -191,13 +192,13 @@ func (c *cloudEventsNatsAdapter) Publish(topic string, messages ...*message.Mess

state, err := c.ensureTopic(ctx, subject, "sender")
if err != nil {
return fmt.Errorf("Error creating topic %q: %w", subject, err)
return fmt.Errorf("error creating topic %q: %w", subject, err)
}

for _, msg := range messages {
err := sendEvent(ctx, subject, state.ceClient, msg)
if err != nil {
return fmt.Errorf("Error sending event to %q: %w", subject, err)
return fmt.Errorf("error sending event to %q: %w", subject, err)
}
}
return nil
Expand All @@ -214,7 +215,7 @@ func sendEvent(
// All our current payloads are encoded JSON; we need to unmarshal
payload := map[string]any{}
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
return fmt.Errorf("Error unmarshalling payload: %w", err)
return fmt.Errorf("error unmarshalling payload: %w", err)
}

err := event.SetData("application/json", payload)
Expand Down
5 changes: 3 additions & 2 deletions internal/events/nats/natschannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
natsserver "github.com/nats-io/nats-server/v2/test"

"github.com/mindersec/minder/internal/events/common"
"github.com/mindersec/minder/pkg/config"
serverconfig "github.com/mindersec/minder/pkg/config/server"
)

Expand All @@ -27,7 +28,7 @@ func TestNatsChannel(t *testing.T) {
}
defer server.Shutdown()
cfg := serverconfig.EventConfig{
Nats: serverconfig.NatsConfig{
Nats: config.NatsConfig{
URL: server.ClientURL(),
Prefix: "test",
Queue: "minder",
Expand Down Expand Up @@ -121,7 +122,7 @@ loop:
}

func buildDriverPair(ctx context.Context, cfg serverconfig.EventConfig) (message.Publisher, message.Subscriber, common.DriverCloser, <-chan *message.Message, error) {
pub, sub, closer, err := BuildNatsChannelDriver(&cfg)
pub, sub, closer, err := BuildNatsChannelDriver(&cfg.Nats)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to build nats channel driver: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,33 @@ import (
"github.com/rs/zerolog"

"github.com/mindersec/minder/internal/events/common"
natsinternal "github.com/mindersec/minder/internal/events/nats"
"github.com/mindersec/minder/pkg/eventer/constants"
)

func (r *reminder) getMessagePublisher(ctx context.Context) (message.Publisher, common.DriverCloser, error) {
switch r.cfg.EventConfig.Driver {
case constants.NATSDriver:
return r.setupNATSPublisher(ctx)
case constants.SQLDriver:
return r.setupSQLPublisher(ctx)
default:
return nil, nil, fmt.Errorf("unknown publisher type: %s", r.cfg.EventConfig.Driver)
}
}
Comment on lines +20 to +29
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm nervous about having this diverge from internal/events/events.go. Can we use an eventer.Interface from eventer.New() here?

(I need to do some refactoring in that package to move eventer/interface into eventer, but I'll wait until this code is landed to reduce the amount of conflicts.)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm nervous about having this diverge from internal/events/events.go

internal/events/events.go is all about serverconfig, it would diverge from it as this is reminderconfig. I don't think we should use the interfaces.Interface:

// Interface is a combination of the Publisher, Registrar, and Service interfaces.
// This is handy when spawning the eventer in a single function for easy setup.
type Interface interface {
	Publisher
	Registrar
	Service
}

as reminder only needs publishing capability, I'd be happy to implement interfaces.Publisher for reminder.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to use the interfaces.Publisher in the reminder code, but it feels like it might be easiest if the reminder code expected an interfaces.Publisher (which is satisfied by the current eventing implementations), but the construction code used NewEventer to construct the interfaces.Publisher.


func (r *reminder) setupNATSPublisher(_ context.Context) (message.Publisher, common.DriverCloser, error) {
pub, _, cl, err := natsinternal.BuildNatsChannelDriver(&r.cfg.EventConfig.NatsConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to create NATS publisher: %w", err)
}
return pub, cl, nil
}

func (r *reminder) setupSQLPublisher(ctx context.Context) (message.Publisher, common.DriverCloser, error) {
logger := zerolog.Ctx(ctx)

db, _, err := r.cfg.EventConfig.Connection.GetDBConnection(ctx)
db, _, err := r.cfg.EventConfig.SQLPubConfig.Connection.GetDBConnection(ctx)
if err != nil {
return nil, nil, fmt.Errorf("unable to connect to events database: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/reminder/reminder.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewReminder(ctx context.Context, store db.Store, config *reminderconfig.Con
logger := zerolog.Ctx(ctx)
logger.Info().Msgf("initial repository cursor: %s", r.repositoryCursor)

pub, cl, err := r.setupSQLPublisher(ctx)
pub, cl, err := r.getMessagePublisher(ctx)
if err != nil {
return nil, err
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/config/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,14 @@ func ReadKey(keypath string) ([]byte, error) {

return data, nil
}

// NatsConfig is the configuration when using NATS as the event driver
type NatsConfig struct {
// URL is the URL for the NATS server
URL string `mapstructure:"url" default:"nats://localhost:4222"`
// Prefix is the prefix for the NATS subjects to subscribe to
Prefix string `mapstructure:"prefix" default:"minder"`
// Queue is the name of the queue group to join when consuming messages
// queue groups allow multiple process to round-robin process messages.
Queue string `mapstructure:"queue" default:"minder"`
}
Comment on lines +152 to +162
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to move this up to common (rather than just having a split between client and server configuration), let's put it in its own file. It feels kind of peculiar to have this in common while having the rest of the event configuration still in server, so I'd argue for moving the code back, particularly in the context of having smaller, more focused PRs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to move this up to common (rather than just having a split between client and server configuration), let's put it in its own file

That still breaks the pattern as all the structs in common don't have their own file.

so I'd argue for moving the code back,

To duplicate vs 'have common code'. Do you want me to duplicate the NatsConfig code in that case for reminderconfig

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are a couple options here:

  1. The way that structs are all piled into common.go today is probably a historical mistake. I'm not asking you fix that mistake in this PR (because we should be making small, targeted changes), but we shouldn't make that mistake worse. By putting this in nats or eventing files, it will make the future expansion more clear.
  2. I think it's fine to have pkg/config/reminder depend on pkg/config/server, as long as we keep the graph acyclic. In particular, if we have common server-side configuration for e.g. flags and messaging, it feels like it would make sense to have those be shared between the two. Given that you can reasonably run a minder server without reminder, but the opposite is not very useful, I'd argue for having pkg/config/reminder import parts of the config from pkg/config/server. In the future, if we split the Minder server into e.g. rpc_server and event_processor, I'd suggest that both would want to use parts of the current ServerConfig, but would probably each have their own top-level struct for configuration (possibly in a separate module, or possibly in the pkg/config/server module).

I'm more concerned about leaking the dependencies on e.g. NATS into the minder client, which is statically linked and downloaded by many more users than the server. Right now, this take that dependency, but I could see add a helper to create a NATS connection to this library in the future.

14 changes: 8 additions & 6 deletions pkg/config/reminder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ func TestValidateConfig(t *testing.T) {
MinElapsed: parseTimeDuration(t, "1h"),
},
EventConfig: reminder.EventConfig{
Connection: config.DatabaseConfig{
Port: 8080,
SQLPubConfig: reminder.SQLPubConfig{
Connection: config.DatabaseConfig{
Port: 8080,
},
},
},
},
Expand Down Expand Up @@ -153,10 +155,10 @@ func TestSetViperDefaults(t *testing.T) {
require.Equal(t, parseTimeDuration(t, "1h"), parseTimeDuration(t, v.GetString("recurrence.interval")))
require.Equal(t, 100, v.GetInt("recurrence.batch_size"))
require.Equal(t, parseTimeDuration(t, "1h"), parseTimeDuration(t, v.GetString("recurrence.min_elapsed")))
require.Equal(t, "reminder", v.GetString("events.sql_connection.dbname"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql_connection.dbhost"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql_connection.dbhost"))
require.Equal(t, "postgres", v.GetString("events.sql_connection.dbuser"))
require.Equal(t, "reminder", v.GetString("events.sql.connection.dbname"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql.connection.dbhost"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql.connection.dbhost"))
require.Equal(t, "postgres", v.GetString("events.sql.connection.dbuser"))
}

// TestOverrideConfigByEnvVar tests that the configuration can be overridden by environment variables
Expand Down
9 changes: 8 additions & 1 deletion pkg/config/reminder/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@ import (

// EventConfig is the configuration for reminder's eventing system.
type EventConfig struct {
Driver string `mapstructure:"driver" default:"sql"`
SQLPubConfig SQLPubConfig `mapstructure:"sql"`
NatsConfig config.NatsConfig `mapstructure:"nats"`
}

// SQLPubConfig is the configuration for the SQL publisher
type SQLPubConfig struct {
// Connection is the configuration for the SQL event driver
//
// nolint: lll
Connection config.DatabaseConfig `mapstructure:"sql_connection" default:"{\"dbname\":\"reminder\",\"dbhost\":\"reminder-event-postgres\"}"`
Connection config.DatabaseConfig `mapstructure:"connection" default:"{\"dbname\":\"reminder\",\"dbhost\":\"reminder-event-postgres\"}"`
}
13 changes: 1 addition & 12 deletions pkg/config/server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type EventConfig struct {
// Aggregator is the configuration for the event aggregator middleware
Aggregator AggregatorConfig `mapstructure:"aggregator"`
// Nats is the configuration when using NATS as the event driver
Nats NatsConfig `mapstructure:"nats"`
Nats config.NatsConfig `mapstructure:"nats"`
}

// GoChannelEventConfig is the configuration for the go channel event driver
Expand Down Expand Up @@ -58,17 +58,6 @@ type AggregatorConfig struct {
LockInterval int64 `mapstructure:"lock_interval" default:"30"`
}

// NatsConfig is the configuration when using NATS as the event driver
type NatsConfig struct {
// URL is the URL for the NATS server
URL string `mapstructure:"url" default:"nats://localhost:4222"`
// Prefix is the prefix for the NATS subjects to subscribe to
Prefix string `mapstructure:"prefix" default:"minder"`
// Queue is the name of the queue group to join when consuming messages
// queue groups allow multiple process to round-robin process messages.
Queue string `mapstructure:"queue" default:"minder"`
}

// FlagDriverConfig holds the configuration for selecting multiple publishing drivers
// when using feature flags to migrate from one publishing mechanism to another.
// When using the "flagged" driver, events will be read from _both_ drivers, but
Expand Down
Loading