From a08c51cbc1cd21392af3d35ed0f5b68afda1e029 Mon Sep 17 00:00:00 2001 From: Lucian Carata Date: Tue, 7 Oct 2025 17:41:18 +0100 Subject: [PATCH 1/9] feat(config): add config watcher The new generic config watcher is able to watch configs residing in plain files, files mounted via k8s ConfigMap volumes, and k8s ConfigMaps directly (via informers) when configured. --- scheduler/pkg/config/watcher.go | 288 ++++++++++++++++++++++++++++++++ scheduler/pkg/util/config.go | 14 ++ scheduler/pkg/util/deepcopy.go | 14 ++ scheduler/pkg/util/defaults.go | 13 ++ 4 files changed, 329 insertions(+) create mode 100644 scheduler/pkg/config/watcher.go create mode 100644 scheduler/pkg/util/config.go create mode 100644 scheduler/pkg/util/deepcopy.go create mode 100644 scheduler/pkg/util/defaults.go diff --git a/scheduler/pkg/config/watcher.go b/scheduler/pkg/config/watcher.go new file mode 100644 index 0000000000..26dc8a8bf6 --- /dev/null +++ b/scheduler/pkg/config/watcher.go @@ -0,0 +1,288 @@ +/* +Copyright (c) 2025 Seldon Technologies Ltd. + +Use of this software is governed by +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. +*/ +package config + +import ( + "encoding/json" + "fmt" + "os" + "path" + "path/filepath" + "strings" + "sync" + + "github.com/fsnotify/fsnotify" + log "github.com/sirupsen/logrus" + yaml "gopkg.in/yaml.v2" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "knative.dev/pkg/configmap" + "knative.dev/pkg/configmap/informer" + + "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" +) + +type ConfigUpdateProcessor[T any, PT util.ConfigHandle[T]] func(config PT, logger log.FieldLogger) error + +type ConfigWatcher[T any, PT util.ConfigHandle[T]] struct { + logger log.FieldLogger + mu sync.RWMutex + config PT + configFilePath string + listeners []chan<- T + namespace string + useConfigMapInformer bool + configMapName string + configMapFileName string + watcher *fsnotify.Watcher + fileWatcherDone chan struct{} + configMapWatcherDone chan struct{} + configUpdateProcessor ConfigUpdateProcessor[T, PT] +} + +func NewConfigWatcher[T any, PT util.ConfigHandle[T]](configPath string, configMapFileName string, namespace string, watchK8sConfigMap bool, configMapName string, clientset kubernetes.Interface, configUpdateProcessor ConfigUpdateProcessor[T, PT], logger log.FieldLogger) (*ConfigWatcher[T, PT], error) { + configHandler := &ConfigWatcher[T, PT]{ + logger: logger.WithField("source", "ConfigWatcher"), + namespace: namespace, + useConfigMapInformer: watchK8sConfigMap, + configMapName: configMapName, + configMapFileName: configMapFileName, + configUpdateProcessor: configUpdateProcessor, + } + + if configPath != "" { + isDir, err := configPathisDir(configPath) + if err != nil { + return nil, fmt.Errorf("config path %s: %w", configPath, err) + } + if isDir { + configPath = path.Join(configPath, configMapFileName) + } + logger.Infof("Init config from path %s", configPath) + err = configHandler.initConfigFromPath(configPath) + if err != nil { + return nil, err + } + _, filename := path.Split(configPath) + if filename != "" && filename != configMapFileName { + logger.Warnf("Watched local config file name %s does not match config map file name %s. 
This means the config may get updates from two different sources", filename, configMapFileName) + } + } + + err := configHandler.initWatcher(configPath, clientset) + if err != nil { + return nil, err + } + + return configHandler, nil +} + +func configPathisDir(configPath string) (bool, error) { + fileInfo, err := os.Stat(configPath) + if err != nil { + return false, err + } + return fileInfo.IsDir(), nil +} + +func (cw *ConfigWatcher[T, PT]) initConfigFromPath(configPath string) error { + m, err := configmap.Load(path.Dir(configPath)) + if err != nil { + return err + } + + _, configFileName := path.Split(configPath) + if v, ok := m[configFileName]; ok { + err = cw.UpdateConfig([]byte(v), configFileName) + if err != nil { + return err + } + cw.configFilePath = path.Clean(configPath) + return nil + } + return fmt.Errorf("configuration watcher failed to find file %s", configPath) +} + +func (cw *ConfigWatcher[T, PT]) initWatcher(configPath string, clientset kubernetes.Interface) error { + logger := cw.logger.WithField("func", "initWatcher") + if cw.useConfigMapInformer && clientset != nil { // Watch k8s config map + err := cw.watchConfigMap(clientset) + if err != nil { + return err + } + } else if configPath != "" { // Watch local file + err := cw.watchFile(cw.configFilePath) + if err != nil { + return err + } + } else { + logger.Warnf("No config available on initialization") + } + return nil +} + +func (cw *ConfigWatcher[T, PT]) Close() error { + if cw.fileWatcherDone != nil { + close(cw.fileWatcherDone) + } + if cw.configMapWatcherDone != nil { + close(cw.configMapWatcherDone) + } + var err error + if cw.watcher != nil { + err = cw.watcher.Close() + } + for _, c := range cw.listeners { + close(c) + } + return err +} + +func (cw *ConfigWatcher[T, PT]) AddListener(c chan<- T) { + cw.mu.Lock() + defer cw.mu.Unlock() + cw.listeners = append(cw.listeners, c) +} + +func (cw *ConfigWatcher[T, PT]) GetConfiguration() T { + cw.mu.RLock() + defer cw.mu.RUnlock() + if cw.config != nil { + return cw.config.DeepCopy() + } else { + return cw.config.Default() + } +} + +func (cw *ConfigWatcher[T, PT]) UpdateConfig(configData []byte, filename string) error { + logger := cw.logger.WithField("func", "updateConfig") + + cw.mu.Lock() + defer cw.mu.Unlock() + + config := new(T) + canonicalExt := strings.Trim(strings.ToLower(path.Ext(filename)), " ") + if canonicalExt == ".yaml" { + err := yaml.Unmarshal(configData, &config) + if err != nil { + return err + } + } else { + // assume json if not yaml, irrespective of file extension + err := json.Unmarshal(configData, &config) + if err != nil { + return err + } + } + + // The config update processor is passed a pointer to the config so that it can validate and + // modify it as needed based on application logic. Any changes to the config are made while + // holding the config watcher (write) lock. 
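+	//
+	// As an illustration (behaviour supplied by the caller, not the watcher
+	// itself), a processor could validate that numeric fields fall within an
+	// allowed range, or fill in defaults for sections missing from the parsed
+	// config, before the result is published to listeners.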
+ if cw.configUpdateProcessor != nil { + err := cw.configUpdateProcessor(config, logger) + if err != nil { + return err + } + } + + cw.config = config + return nil +} + +// Watch the config file passed for changes, reload and signal listeners when it does +func (cw *ConfigWatcher[T, PT]) watchFile(filePath string) error { + logger := cw.logger.WithField("func", "watchFile") + watcher, err := fsnotify.NewWatcher() + if err != nil { + logger.Error(err, "Failed to create watcher") + return err + } + cw.watcher = watcher + cw.fileWatcherDone = make(chan struct{}) + + configDir, _ := filepath.Split(filePath) + knownConfigFile, _ := filepath.EvalSymlinks(filePath) + + go func() { + for { + select { + case event := <-watcher.Events: + isCreate := event.Op&fsnotify.Create != 0 + isWrite := event.Op&fsnotify.Write != 0 + isRemove := event.Op&fsnotify.Remove != 0 + + // when running in k8s, the file is a symlink that gets replaced on update + currentConfigFile, _ := filepath.EvalSymlinks(filePath) + + existingFileChanged := filepath.Clean(event.Name) == filePath && (isWrite || isCreate) + configSymlinkChanged := currentConfigFile != "" && currentConfigFile != knownConfigFile + + if existingFileChanged || configSymlinkChanged { + knownConfigFile = currentConfigFile + b, err := os.ReadFile(filePath) + if err != nil { + logger.WithError(err).Errorf("Failed to read %s", filePath) + } else { + err := cw.UpdateConfig(b, filePath) + if err != nil { + logger.WithError(err).Errorf("Failed to update config %s", filePath) + } else { + cw.mu.RLock() + for _, ch := range cw.listeners { + ch <- cw.config.DeepCopy() + } + cw.mu.RUnlock() + } + } + } else if filepath.Clean(event.Name) == filePath && isRemove { + return + } + case err := <-watcher.Errors: + logger.Error(err, "watcher error") + case <-cw.fileWatcherDone: + return + } + } + }() + + if err = watcher.Add(configDir); err != nil { + cw.logger.Errorf("Failed to add file path %s to config watcher", filePath) + return err + } + cw.logger.Infof("Starting to watch config file %s", filePath) + + return nil +} + +func (cw *ConfigWatcher[T, PT]) watchConfigMap(clientset kubernetes.Interface) error { + logger := cw.logger.WithField("func", "watchConfigMap") + + watcher := informer.NewInformedWatcher(clientset, cw.namespace) + watcher.Watch(cw.configMapName, func(updated *corev1.ConfigMap) { + filename := cw.configMapFileName + if data, ok := updated.Data[filename]; ok { + err := cw.UpdateConfig([]byte(data), cw.configMapName) + if err != nil { + logger.Errorf("Failed to update config with data in configmap %s.%s/%s", cw.configMapName, cw.namespace, filename) + } else { + cw.mu.RLock() + for _, ch := range cw.listeners { + ch <- cw.config.DeepCopy() + } + cw.mu.RUnlock() + } + } + }) + cw.configMapWatcherDone = make(chan struct{}) + err := watcher.Start(cw.configMapWatcherDone) + if err != nil { + return err + } + return nil +} diff --git a/scheduler/pkg/util/config.go b/scheduler/pkg/util/config.go new file mode 100644 index 0000000000..11a0707dc1 --- /dev/null +++ b/scheduler/pkg/util/config.go @@ -0,0 +1,14 @@ +/* +Copyright (c) 2025 Seldon Technologies Ltd. + +Use of this software is governed by +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. 
+*/ +package util + +type ConfigHandle[T any] interface { + DeepCopier[T] + Defaulter[T] +} diff --git a/scheduler/pkg/util/deepcopy.go b/scheduler/pkg/util/deepcopy.go new file mode 100644 index 0000000000..08a3fe8c6d --- /dev/null +++ b/scheduler/pkg/util/deepcopy.go @@ -0,0 +1,14 @@ +/* +Copyright (c) 2025 Seldon Technologies Ltd. + +Use of this software is governed by +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. +*/ +package util + +type DeepCopier[T any] interface { + DeepCopy() T + *T +} diff --git a/scheduler/pkg/util/defaults.go b/scheduler/pkg/util/defaults.go new file mode 100644 index 0000000000..c6ef435d07 --- /dev/null +++ b/scheduler/pkg/util/defaults.go @@ -0,0 +1,13 @@ +/* +Copyright (c) 2025 Seldon Technologies Ltd. + +Use of this software is governed by +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. +*/ +package util + +type Defaulter[T any] interface { + Default() T +} From b5437454fbc41dc86e382b405a54562dfa053abf Mon Sep 17 00:00:00 2001 From: Lucian Carata Date: Tue, 7 Oct 2025 17:47:15 +0100 Subject: [PATCH 2/9] feat(agent): update agent config to use new config watcher --- scheduler/cmd/agent/main.go | 2 +- scheduler/pkg/agent/config/config.go | 245 ++++-------------- scheduler/pkg/agent/config/config_test.go | 80 +----- scheduler/pkg/agent/rclone/rclone.go | 5 +- scheduler/pkg/agent/rclone/rclone_config.go | 17 +- .../pkg/agent/rclone/rclone_config_test.go | 2 +- scheduler/pkg/agent/rclone/rclone_test.go | 2 +- 7 files changed, 76 insertions(+), 277 deletions(-) diff --git a/scheduler/cmd/agent/main.go b/scheduler/cmd/agent/main.go index 28d7e65a7e..8fa58799de 100644 --- a/scheduler/cmd/agent/main.go +++ b/scheduler/cmd/agent/main.go @@ -183,7 +183,7 @@ func main() { } defer func() { _ = agentConfigHandler.Close() - logger.Info("Closed agent handler") + logger.Info("Closed agent config watcher") }() // Create Rclone client diff --git a/scheduler/pkg/agent/config/config.go b/scheduler/pkg/agent/config/config.go index c4a4c97369..a3fb0000fe 100644 --- a/scheduler/pkg/agent/config/config.go +++ b/scheduler/pkg/agent/config/config.go @@ -10,19 +10,12 @@ the Change License after the Change Date as each is defined in accordance with t package config import ( - "fmt" - "os" - "path" - "sync" "time" - "github.com/fsnotify/fsnotify" log "github.com/sirupsen/logrus" - yaml "gopkg.in/yaml.v2" - corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" - "knative.dev/pkg/configmap" - "knative.dev/pkg/configmap/informer" + + "github.com/seldonio/seldon-core/scheduler/v2/pkg/config" ) const ( @@ -31,6 +24,18 @@ const ( ServiceReadyRetryMaxInterval = 30 * time.Second ) +var ( + DefaultAgentConfiguration = AgentConfiguration{ + Rclone: &RcloneConfiguration{ + ConfigSecrets: []string{}, + Config: []string{}, + }, + Kafka: &KafkaConfiguration{ + Active: false, + }, + } +) + type AgentConfiguration struct { Rclone *RcloneConfiguration `json:"rclone,omitempty" yaml:"rclone,omitempty"` Kafka *KafkaConfiguration `json:"kafka,omitempty" yaml:"kafka,omitempty"` @@ -46,203 +51,67 @@ type KafkaConfiguration struct { Broker string `json:"broker,omitempty" yaml:"broker,omitempty"` } -type AgentConfigHandler struct { - logger 
log.FieldLogger - mu sync.RWMutex - config *AgentConfiguration - listeners []chan<- AgentConfiguration - watcher *fsnotify.Watcher - fileWatcherDone chan struct{} - namespace string - configFilePath string - configMapWatcherDone chan struct{} -} - -func NewAgentConfigHandler(configPath string, namespace string, logger log.FieldLogger, clientset kubernetes.Interface) (*AgentConfigHandler, error) { - configHandler := &AgentConfigHandler{ - logger: logger.WithField("source", "AgentConfigHandler"), - namespace: namespace, - } - if configPath != "" { - logger.Infof("Init config from path %s", configPath) - err := configHandler.initConfigFromPath(configPath) - if err != nil { - return nil, err - } - } +type AgentConfigHandler = config.ConfigWatcher[AgentConfiguration, *AgentConfiguration] - err := configHandler.initWatcher(configPath, namespace, clientset) - if err != nil { - return nil, err - } +func (ac *AgentConfiguration) DeepCopy() AgentConfiguration { + var rcloneCopy *RcloneConfiguration + var kafkaCopy *KafkaConfiguration - return configHandler, nil -} - -func (a *AgentConfigHandler) initConfigFromPath(configPath string) error { - m, err := configmap.Load(configPath) - if err != nil { - return err - } - - if v, ok := m[AgentConfigYamlFilename]; ok { - err = a.updateConfig([]byte(v)) - if err != nil { - return err + if ac.Rclone != nil { + // Maintain nil slices if settings are not present. + // This is important because json.Marshal treats nil and empty slices differently. + var cs []string + if len(ac.Rclone.ConfigSecrets) > 0 { + cs = make([]string, len(ac.Rclone.ConfigSecrets)) + copy(cs, ac.Rclone.ConfigSecrets) } - a.configFilePath = path.Join(configPath, AgentConfigYamlFilename) - return nil - } - return fmt.Errorf("Failed to find config file %s", AgentConfigYamlFilename) -} - -func (a *AgentConfigHandler) initWatcher(configPath string, namespace string, clientset kubernetes.Interface) error { - logger := a.logger.WithField("func", "initWatcher") - if namespace != "" { // Running in k8s - err := a.watchConfigMap(clientset) - if err != nil { - return err + var cfg []string + if len(ac.Rclone.Config) > 0 { + cfg = make([]string, len(ac.Rclone.Config)) + copy(cfg, ac.Rclone.Config) } - } else if configPath != "" { // Watch local file - err := a.watchFile(a.configFilePath) - if err != nil { - return err + rcloneDeepCopy := RcloneConfiguration{ + ConfigSecrets: cs, + Config: cfg, } + rcloneCopy = &rcloneDeepCopy } else { - logger.Warnf("No config available on initialization") + rcloneCopy = nil } - return nil -} -func (a *AgentConfigHandler) Close() error { - if a == nil { - return nil - } - a.logger.Info("Starting graceful shutdown") - if a.fileWatcherDone != nil { - close(a.fileWatcherDone) - } - if a.configMapWatcherDone != nil { - close(a.configMapWatcherDone) - } - if a.watcher != nil { - return a.watcher.Close() + if ac.Kafka != nil { + kafkaDeepCopy := *ac.Kafka + kafkaCopy = &kafkaDeepCopy + } else { + kafkaCopy = nil } - for _, c := range a.listeners { - close(c) + + return AgentConfiguration{ + Rclone: rcloneCopy, + Kafka: kafkaCopy, } - a.logger.Infof("Finished graceful shutdown") - return nil } -func (a *AgentConfigHandler) AddListener(c chan<- AgentConfiguration) *AgentConfiguration { - a.mu.Lock() - defer a.mu.Unlock() - a.listeners = append(a.listeners, c) - return a.config +func (ac *AgentConfiguration) Default() AgentConfiguration { + return DefaultAgentConfiguration.DeepCopy() } -func (a *AgentConfigHandler) GetConfiguration() *AgentConfiguration { - a.mu.RLock() - 
defer a.mu.RUnlock() - return a.config +func NewAgentConfigHandler(configPath string, namespace string, logger log.FieldLogger, clientset kubernetes.Interface) (*AgentConfigHandler, error) { + return config.NewConfigWatcher( + configPath, + AgentConfigYamlFilename, + namespace, + false, // watch mounted config file rather than using k8s informer on the config map + ConfigMapName, + clientset, + onConfigUpdate, + logger.WithField("source", "AgentConfigHandler"), + ) } -func (a *AgentConfigHandler) updateConfig(configData []byte) error { - logger := a.logger.WithField("func", "updateConfig") - logger.Infof("Updating config %s", configData) - - a.mu.Lock() - defer a.mu.Unlock() - - config := AgentConfiguration{} - err := yaml.Unmarshal(configData, &config) - if err != nil { - return err - } - +func onConfigUpdate(config *AgentConfiguration, logger log.FieldLogger) error { if config.Rclone != nil { logger.Infof("Rclone Config loaded %v", config.Rclone) } - - a.config = &config - return nil -} - -// Watch the config file passed for changes and reload and signal listeners when it does -func (a *AgentConfigHandler) watchFile(filePath string) error { - logger := a.logger.WithField("func", "watchFile") - watcher, err := fsnotify.NewWatcher() - if err != nil { - logger.Error(err, "Failed to create watcher") - return err - } - a.watcher = watcher - a.fileWatcherDone = make(chan struct{}) - - go func() { - for { - select { - case event := <-watcher.Events: - logger.Infof("Processing event %v", event) - isCreate := event.Op&fsnotify.Create != 0 - isWrite := event.Op&fsnotify.Write != 0 - if isCreate || isWrite { - b, err := os.ReadFile(filePath) - if err != nil { - logger.WithError(err).Errorf("Failed to read %s", filePath) - } else { - err := a.updateConfig(b) - if err != nil { - logger.WithError(err).Errorf("Failed to update config %s", filePath) - } else { - a.mu.RLock() - for _, ch := range a.listeners { - ch <- *a.config - } - a.mu.RUnlock() - } - } - } - case err := <-watcher.Errors: - logger.Error(err, "watcher error") - case <-a.fileWatcherDone: - return - } - } - }() - - if err = watcher.Add(filePath); err != nil { - a.logger.Errorf("Failed add filePath %s to watcher", filePath) - return err - } - a.logger.Infof("Start to watch config file %s", filePath) - - return nil -} - -func (a *AgentConfigHandler) watchConfigMap(clientset kubernetes.Interface) error { - logger := a.logger.WithField("func", "watchConfigMap") - - watcher := informer.NewInformedWatcher(clientset, a.namespace) - watcher.Watch(ConfigMapName, func(updated *corev1.ConfigMap) { - if data, ok := updated.Data[AgentConfigYamlFilename]; ok { - err := a.updateConfig([]byte(data)) - if err != nil { - logger.Errorf("Failed to update configmap from data in %s", AgentConfigYamlFilename) - } else { - a.mu.RLock() - for _, ch := range a.listeners { - ch <- *a.config - } - a.mu.RUnlock() - } - } - }) - a.configMapWatcherDone = make(chan struct{}) - err := watcher.Start(a.configMapWatcherDone) - if err != nil { - return err - } return nil } diff --git a/scheduler/pkg/agent/config/config_test.go b/scheduler/pkg/agent/config/config_test.go index 2ee367d7c6..e94d8e58fe 100644 --- a/scheduler/pkg/agent/config/config_test.go +++ b/scheduler/pkg/agent/config/config_test.go @@ -10,7 +10,6 @@ the Change License after the Change Date as each is defined in accordance with t package config import ( - "context" "encoding/json" "os" "path" @@ -18,9 +17,6 @@ import ( . 
"github.com/onsi/gomega" log "github.com/sirupsen/logrus" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" ) func TestLoadConfigRcloneSecrets(t *testing.T) { @@ -36,12 +32,12 @@ func TestLoadConfigRcloneSecrets(t *testing.T) { } tests := []test{ { - name: "yaml", + name: "test.json", config: `{"rclone":{"config_secrets":["a","b"]}}`, expected: []string{"a", "b"}, }, { - name: "json", + name: "test.yaml", config: `rclone: config_secrets: - a @@ -49,7 +45,7 @@ func TestLoadConfigRcloneSecrets(t *testing.T) { expected: []string{"a", "b"}, }, { - name: "badJson", + name: "badJson.json", config: `{"rclone":{"config_secrets":["a","b"]}`, expected: []string{"a", "b"}, err: true, @@ -59,12 +55,12 @@ func TestLoadConfigRcloneSecrets(t *testing.T) { t.Run(test.name, func(t *testing.T) { configHandler, err := NewAgentConfigHandler("", "", logger, nil) g.Expect(err).To(BeNil()) - err = configHandler.updateConfig([]byte(test.config)) + err = configHandler.UpdateConfig([]byte(test.config), test.name) if test.err { g.Expect(err).ToNot(BeNil()) } else { g.Expect(err).To(BeNil()) - g.Expect(configHandler.config.Rclone.ConfigSecrets).To(Equal(test.expected)) + g.Expect(configHandler.GetConfiguration().Rclone.ConfigSecrets).To(Equal(test.expected)) } }) } @@ -107,75 +103,13 @@ func TestWatchFile(t *testing.T) { defer func() { _ = configHandler.Close() }() g.Expect(err).To(BeNil()) - configHandler.mu.RLock() - g.Expect(configHandler.config).To(Equal(test.contents1)) - configHandler.mu.RUnlock() + g.Expect(configHandler.GetConfiguration()).To(Equal(*test.contents1)) b, err = json.Marshal(test.contents2) g.Expect(err).To(BeNil()) err = os.WriteFile(configFile, b, 0644) g.Expect(err).To(BeNil()) - g.Eventually(configHandler.GetConfiguration).Should(Equal(test.contents2)) - }) - } -} - -func TestWatchConfigMap(t *testing.T) { - t.Logf("Started") - logger := log.New() - log.SetLevel(log.DebugLevel) - g := NewGomegaWithT(t) - type test struct { - name string - configMapV1 *v1.ConfigMap - expectedSecretsV1 []string - configMapV2 *v1.ConfigMap - expectedSecretsV2 []string - errorOnInit bool - } - namespace := "seldon-mesh" - tests := []test{ - { - name: "simple", - configMapV1: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Name: ConfigMapName, Namespace: namespace}, - Data: map[string]string{ - AgentConfigYamlFilename: "{\"rclone\":{\"config_secrets\":[\"rclone-gs-public\"]}}", - }, - }, - expectedSecretsV1: []string{"rclone-gs-public"}, - configMapV2: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Name: ConfigMapName, Namespace: namespace}, - Data: map[string]string{ - AgentConfigYamlFilename: "{\"rclone\":{\"config_secrets\":[\"rclone-gs-public2\"]}}", - }, - }, - expectedSecretsV2: []string{"rclone-gs-public2"}, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - fakeClientset := fake.NewSimpleClientset(test.configMapV1) - configHandler, err := NewAgentConfigHandler("", namespace, logger, fakeClientset) - defer func() { _ = configHandler.Close() }() - getSecrets := func() []string { - c := configHandler.GetConfiguration() - if c != nil { - return c.Rclone.ConfigSecrets - } - return []string{} - } - g.Expect(err).To(BeNil()) - if !test.errorOnInit { - g.Expect(configHandler.config).ToNot(BeNil()) - g.Eventually(getSecrets).Should(Equal(test.expectedSecretsV1)) - // update - _, err = fakeClientset.CoreV1().ConfigMaps(namespace).Update(context.Background(), test.configMapV2, metav1.UpdateOptions{}) - g.Expect(err).To(BeNil()) 
- g.Eventually(getSecrets).Should(Equal(test.expectedSecretsV2)) - } else { - g.Expect(configHandler.config).To(BeNil()) - } + g.Eventually(configHandler.GetConfiguration).Should(Equal(*test.contents2)) }) } } diff --git a/scheduler/pkg/agent/rclone/rclone.go b/scheduler/pkg/agent/rclone/rclone.go index fc7e3710e0..43e32343e7 100644 --- a/scheduler/pkg/agent/rclone/rclone.go +++ b/scheduler/pkg/agent/rclone/rclone.go @@ -135,7 +135,8 @@ func (r *RCloneClient) StartConfigListener() error { go r.listenForConfigUpdates() // Add ourself as listener on channel and handle initial config logger.Info("Loading initial rclone configuration") - err := r.loadRcloneConfiguration(r.configHandler.AddListener(r.configChan)) + r.configHandler.AddListener(r.configChan) + err := r.loadRcloneConfiguration(r.configHandler.GetConfiguration()) if err != nil { logger.WithError(err).Errorf("Failed to load rclone defaults") return err @@ -148,7 +149,7 @@ func (r *RCloneClient) listenForConfigUpdates() { for config := range r.configChan { logger.Info("Received config update") go func() { - err := r.loadRcloneConfiguration(&config) + err := r.loadRcloneConfiguration(config) if err != nil { logger.WithError(err).Error("Failed to load rclone defaults") } diff --git a/scheduler/pkg/agent/rclone/rclone_config.go b/scheduler/pkg/agent/rclone/rclone_config.go index 6ad1072752..7b24f89eb8 100644 --- a/scheduler/pkg/agent/rclone/rclone_config.go +++ b/scheduler/pkg/agent/rclone/rclone_config.go @@ -17,14 +17,9 @@ import ( "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" ) -func (r *RCloneClient) loadRcloneConfiguration(config *config.AgentConfiguration) error { +func (r *RCloneClient) loadRcloneConfiguration(config config.AgentConfiguration) error { logger := r.logger.WithField("func", "loadRcloneConfiguration") - if config == nil { - logger.Warn("nil config passed") - return nil - } - // Load any secrets that have Rclone config addedFromSecrets, err := r.loadRcloneSecretsConfiguration(config) if err != nil { @@ -55,11 +50,11 @@ func (r *RCloneClient) loadRcloneConfiguration(config *config.AgentConfiguration return nil } -func (r *RCloneClient) loadRcloneRawConfiguration(config *config.AgentConfiguration) ([]string, error) { +func (r *RCloneClient) loadRcloneRawConfiguration(config config.AgentConfiguration) ([]string, error) { logger := r.logger.WithField("func", "loadRcloneRawConfiguration") var rcloneNamesAdded []string - if len(config.Rclone.Config) > 0 { + if config.Rclone != nil && len(config.Rclone.Config) > 0 { logger.Infof("found %d Rclone configs", len(config.Rclone.Config)) for _, config := range config.Rclone.Config { @@ -77,7 +72,7 @@ func (r *RCloneClient) loadRcloneRawConfiguration(config *config.AgentConfigurat return rcloneNamesAdded, nil } -func (r *RCloneClient) deleteUnusedRcloneConfiguration(config *config.AgentConfiguration, rcloneNamesAdded []string) error { +func (r *RCloneClient) deleteUnusedRcloneConfiguration(config config.AgentConfiguration, rcloneNamesAdded []string) error { logger := r.logger.WithField("func", "deleteUnusedRcloneConfiguration") existingRemotes, err := r.ListRemotes() @@ -107,12 +102,12 @@ func (r *RCloneClient) deleteUnusedRcloneConfiguration(config *config.AgentConfi return nil } -func (r *RCloneClient) loadRcloneSecretsConfiguration(config *config.AgentConfiguration) ([]string, error) { +func (r *RCloneClient) loadRcloneSecretsConfiguration(config config.AgentConfiguration) ([]string, error) { logger := r.logger.WithField("func", "loadRcloneSecretsConfiguration") var 
rcloneNamesAdded []string // Load any secrets that have Rclone config - if len(config.Rclone.ConfigSecrets) > 0 { + if config.Rclone != nil && len(config.Rclone.ConfigSecrets) > 0 { secretClientSet, err := k8s.CreateClientset() if err != nil { return nil, err diff --git a/scheduler/pkg/agent/rclone/rclone_config_test.go b/scheduler/pkg/agent/rclone/rclone_config_test.go index b2d1a14981..881a68132c 100644 --- a/scheduler/pkg/agent/rclone/rclone_config_test.go +++ b/scheduler/pkg/agent/rclone/rclone_config_test.go @@ -147,7 +147,7 @@ func TestLoadRcloneConfig(t *testing.T) { httpmock.NewStringResponder(200, "{}")) g.Expect(err).To(gomega.BeNil()) - err = rcloneClient.loadRcloneConfiguration(test.agentConfiguration) + err = rcloneClient.loadRcloneConfiguration(*test.agentConfiguration) if test.error { g.Expect(err).ToNot(gomega.BeNil()) } else { diff --git a/scheduler/pkg/agent/rclone/rclone_test.go b/scheduler/pkg/agent/rclone/rclone_test.go index 2ba9b8eb92..56035a7081 100644 --- a/scheduler/pkg/agent/rclone/rclone_test.go +++ b/scheduler/pkg/agent/rclone/rclone_test.go @@ -116,7 +116,7 @@ func TestRcloneCopy(t *testing.T) { body: "{}", createLocalFolder: true, expectError: true, - expectedCallCount: 2, // for config resync + expectedCallCount: 3, // for config resync }, { name: "noFiles", From 20fca02f086833f3cd94e434615c3d47b202f658 Mon Sep 17 00:00:00 2001 From: Lucian Carata Date: Tue, 7 Oct 2025 19:54:30 +0100 Subject: [PATCH 3/9] feat(scheduler): introduce scaling configuration and watcher --- scheduler/cmd/scheduler/main.go | 40 ++-- scheduler/pkg/kafka/dataflow/server.go | 70 ++++++- scheduler/pkg/kafka/dataflow/server_test.go | 2 +- scheduler/pkg/scaling/config/config.go | 192 ++++++++++++++++++++ scheduler/pkg/server/control_plane_test.go | 2 +- scheduler/pkg/server/server.go | 78 ++++++++ scheduler/pkg/server/server_status_test.go | 2 +- scheduler/pkg/server/server_test.go | 6 +- scheduler/pkg/util/loadbalancer.go | 17 ++ 9 files changed, 388 insertions(+), 21 deletions(-) create mode 100644 scheduler/pkg/scaling/config/config.go diff --git a/scheduler/cmd/scheduler/main.go b/scheduler/cmd/scheduler/main.go index 85d4e9ba44..9676b8c7a5 100644 --- a/scheduler/cmd/scheduler/main.go +++ b/scheduler/cmd/scheduler/main.go @@ -16,7 +16,6 @@ import ( "math/rand" "os" "os/signal" - "strconv" "syscall" "time" @@ -35,6 +34,7 @@ import ( "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/xdscache" health_probe "github.com/seldonio/seldon-core/scheduler/v2/pkg/health-probe" "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka/dataflow" + scaling_config "github.com/seldonio/seldon-core/scheduler/v2/pkg/scaling/config" "github.com/seldonio/seldon-core/scheduler/v2/pkg/scheduler" "github.com/seldonio/seldon-core/scheduler/v2/pkg/scheduler/cleaner" schedulerServer "github.com/seldonio/seldon-core/scheduler/v2/pkg/server" @@ -66,6 +66,7 @@ var ( autoscalingModelEnabled bool autoscalingServerEnabled bool kafkaConfigPath string + scalingConfigPath string schedulerReadyTimeoutSeconds uint deletedResourceTTLSeconds uint serverPackingEnabled bool @@ -127,6 +128,13 @@ func init() { flag.BoolVar(&allowPlaintxt, "allow-plaintxt", true, "Allow plain text scheduler server") // Autoscaling + // Scaling config path + flag.StringVar( + &scalingConfigPath, + "scaling-config-path", + "/mnt/config/scaling.json", + "Path to scaling configuration file", + ) flag.BoolVar(&autoscalingModelEnabled, "enable-model-autoscaling", false, "Enable native model autoscaling feature") 
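+	// Note: the model/server autoscaling flags below predate the scaling config
+	// file registered above; their defaults mirror the models/servers sections
+	// of the default scaling config (models disabled, servers enabled).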
flag.BoolVar(&autoscalingServerEnabled, "enable-server-autoscaling", true, "Enable native server autoscaling feature") @@ -278,20 +286,25 @@ func main() { if err != nil { logger.WithError(err).Fatal("Failed to load Kafka config") } - - numPartitions, err := strconv.Atoi(kafkaConfigMap.Topics["numPartitions"].(string)) + scalingConfigHdl, err := scaling_config.NewScalingConfigHandler(scalingConfigPath, namespace, logger) if err != nil { - logger.WithError(err).Fatal("Failed to parse numPartitions from Kafka config. Defaulting to 1") - numPartitions = 1 + logger.WithError(err).Fatalf("Failed to load Scaling config from %s", scalingConfigPath) } + defer func() { + _ = scalingConfigHdl.Close() + logger.Info("Closed scheduler scaling config watcher") + }() + + maxShardCountMultiplier := scalingConfigHdl.GetConfiguration().Pipelines.MaxShardCountMultiplier - dataFlowLoadBalancer := util.NewRingLoadBalancer(numPartitions) - log.Info("Using ring load balancer for data flow with numPartitions: ", numPartitions) + dataFlowLoadBalancer := util.NewRingLoadBalancer(maxShardCountMultiplier) + log.Info("Using ring load balancer for data flow with numPartitions: ", maxShardCountMultiplier) - cs, err := dataflow.NewChainerServer(logger, eventHub, ps, namespace, dataFlowLoadBalancer, kafkaConfigMap) + cs, err := dataflow.NewChainerServer(logger, eventHub, ps, namespace, dataFlowLoadBalancer, kafkaConfigMap, scalingConfigHdl) if err != nil { logger.WithError(err).Fatal("Failed to start data engine chainer server") } + defer cs.Stop() go func() { err := cs.StartGrpcServer(chainerPort) if err != nil { @@ -334,9 +347,9 @@ func main() { eventHub, ) - // scheduler <-> controller grpc - modelGwLoadBalancer := util.NewRingLoadBalancer(numPartitions) - pipelineGWLoadBalancer := util.NewRingLoadBalancer(numPartitions) + // scheduler <-> controller and {pipeline,model-gw} grpc + modelGwLoadBalancer := util.NewRingLoadBalancer(maxShardCountMultiplier) + pipelineGWLoadBalancer := util.NewRingLoadBalancer(maxShardCountMultiplier) s := schedulerServer.NewSchedulerServer( logger, ss, es, ps, sched, eventHub, sync, schedulerServer.SchedulerServerConfig{ @@ -347,8 +360,10 @@ func main() { kafkaConfigMap.ConsumerGroupIdPrefix, modelGwLoadBalancer, pipelineGWLoadBalancer, + scalingConfigHdl, *tlsOptions, ) + defer s.Stop() err = s.StartGrpcServers(allowPlaintxt, schedulerPort, schedulerMtlsPort) if err != nil { @@ -390,10 +405,9 @@ func main() { s.StopSendExperimentEvents() s.StopSendPipelineEvents() s.StopSendControlPlaneEvents() - cs.StopSendPipelineEvents() as.StopAgentStreams() - log.Info("Shutdown services") + log.Info("All services have shut down cleanly") } type probe struct { diff --git a/scheduler/pkg/kafka/dataflow/server.go b/scheduler/pkg/kafka/dataflow/server.go index 9944f55589..7884831141 100644 --- a/scheduler/pkg/kafka/dataflow/server.go +++ b/scheduler/pkg/kafka/dataflow/server.go @@ -28,6 +28,7 @@ import ( "github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator" "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka" cr "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka/conflict-resolution" + scaling_config "github.com/seldonio/seldon-core/scheduler/v2/pkg/scaling/config" "github.com/seldonio/seldon-core/scheduler/v2/pkg/store/pipeline" "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" ) @@ -48,6 +49,11 @@ type ChainerServer struct { loadBalancer util.LoadBalancer conflictResolutioner *cr.ConflictResolutioner[pipeline.PipelineStatus] chainerMutex sync.Map + configUpdatesMutex sync.Mutex + 
scalingConfigUpdates chan scaling_config.ScalingConfig + currentScalingConfig scaling_config.ScalingConfig + done chan struct{} + grpcServer *grpc.Server chainer.UnimplementedChainerServer health.UnimplementedHealthCheckServiceServer } @@ -58,8 +64,15 @@ type ChainerSubscription struct { fin chan bool } -func NewChainerServer(logger log.FieldLogger, eventHub *coordinator.EventHub, pipelineHandler pipeline.PipelineHandler, - namespace string, loadBalancer util.LoadBalancer, kafkaConfig *kafka_config.KafkaConfig) (*ChainerServer, error) { +func NewChainerServer( + logger log.FieldLogger, + eventHub *coordinator.EventHub, + pipelineHandler pipeline.PipelineHandler, + namespace string, + loadBalancer util.LoadBalancer, + kafkaConfig *kafka_config.KafkaConfig, + scalingConfigHdl *scaling_config.ScalingConfigHandler, +) (*ChainerServer, error) { conflictResolutioner := cr.NewConflictResolution[pipeline.PipelineStatus](logger) topicNamer, err := kafka.NewTopicNamer(namespace, kafkaConfig.TopicPrefix) if err != nil { @@ -74,6 +87,8 @@ func NewChainerServer(logger log.FieldLogger, eventHub *coordinator.EventHub, pi loadBalancer: loadBalancer, conflictResolutioner: conflictResolutioner, chainerMutex: sync.Map{}, + scalingConfigUpdates: make(chan scaling_config.ScalingConfig), + done: make(chan struct{}), } eventHub.RegisterPipelineEventHandler( @@ -82,9 +97,32 @@ func NewChainerServer(logger log.FieldLogger, eventHub *coordinator.EventHub, pi c.logger, c.handlePipelineEvent, ) + + if scalingConfigHdl != nil { + c.currentScalingConfig = scalingConfigHdl.GetConfiguration() + scalingConfigHdl.AddListener(c.scalingConfigUpdates) + go c.handleScalingConfigChanges() + } else { + c.currentScalingConfig = scaling_config.DefaultScalingConfig + } + + c.configUpdatesMutex.Lock() + scaling_config.LogWhenUsingDefaultScalingConfig(&c.currentScalingConfig, c.logger) + c.configUpdatesMutex.Unlock() + return c, nil } +func (c *ChainerServer) Stop() { + if c.grpcServer != nil { + c.grpcServer.GracefulStop() + c.logger.Info("Scheduler closing gRPC server managing connections from dataflow-engine replicas") + } + c.logger.Info("Stop watching for scaling config changes") + close(c.done) + c.StopSendPipelineEvents() +} + func (c *ChainerServer) StartGrpcServer(agentPort uint) error { lis, err := net.Listen("tcp", fmt.Sprintf(":%d", agentPort)) if err != nil { @@ -99,6 +137,7 @@ func (c *ChainerServer) StartGrpcServer(agentPort uint) error { grpcServer := grpc.NewServer(grpcOptions...) 
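+	// Register the chainer and health services on this server; the handle is
+	// stored on the struct below so Stop() can drain it via GracefulStop().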
chainer.RegisterChainerServer(grpcServer, c) health.RegisterHealthCheckServiceServer(grpcServer, c) + c.grpcServer = grpcServer c.logger.Printf("Chainer server running on %d", agentPort) return grpcServer.Serve(lis) @@ -489,6 +528,33 @@ func (c *ChainerServer) rebalance() { } } +func (c *ChainerServer) handleScalingConfigChanges() { + logger := c.logger.WithField("func", "handleScalingConfigChanges") + for { + select { + case newConfig := <-c.scalingConfigUpdates: + if newConfig.Pipelines == nil { + continue + } + c.configUpdatesMutex.Lock() + if newConfig.Pipelines.MaxShardCountMultiplier != c.currentScalingConfig.Pipelines.MaxShardCountMultiplier { + logger.Info("Updating mapping of Pipelines onto dataflow-engine replicas following scaling config change") + // lock Mutex to avoid updating load balancer if a concurrent rebalance is in progress + c.mu.Lock() + c.currentScalingConfig = newConfig + scaling_config.LogWhenUsingDefaultScalingConfig(&c.currentScalingConfig, logger) + c.loadBalancer.UpdatePartitions(newConfig.Pipelines.MaxShardCountMultiplier) + c.mu.Unlock() + // rebalance all pipelines onto available dataflow-engine replicas according to new config + c.rebalance() + } + c.configUpdatesMutex.Unlock() + case <-c.done: + return + } + } +} + func (c *ChainerServer) handlePipelineEvent(event coordinator.PipelineEventMsg) { logger := c.logger.WithField("func", "handlePipelineEvent") if event.ExperimentUpdate { diff --git a/scheduler/pkg/kafka/dataflow/server_test.go b/scheduler/pkg/kafka/dataflow/server_test.go index 4aae731f0f..58e9d72076 100644 --- a/scheduler/pkg/kafka/dataflow/server_test.go +++ b/scheduler/pkg/kafka/dataflow/server_test.go @@ -870,7 +870,7 @@ func createTestScheduler(t *testing.T, serverName string) (*ChainerServer, *coor kc, _ := kafka_config.NewKafkaConfig(configFilePath, "debug") b := util.NewRingLoadBalancer(1) b.AddServer(serverName) - s, _ := NewChainerServer(logger, eventHub, pipelineServer, "test-ns", b, kc) + s, _ := NewChainerServer(logger, eventHub, pipelineServer, "test-ns", b, kc, nil) return s, eventHub } diff --git a/scheduler/pkg/scaling/config/config.go b/scheduler/pkg/scaling/config/config.go new file mode 100644 index 0000000000..87638c2c5c --- /dev/null +++ b/scheduler/pkg/scaling/config/config.go @@ -0,0 +1,192 @@ +/* +Copyright (c) 2024 Seldon Technologies Ltd. + +Use of this software is governed by +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. 
+*/ + +package config + +import ( + log "github.com/sirupsen/logrus" + + "github.com/seldonio/seldon-core/scheduler/v2/pkg/config" +) + +const ( + ScalingConfigYamlFilename = "scaling.yaml" + ConfigMapName = "seldon-scaling" +) + +var ( + DefaultPipelineScalingConfig = PipelineScalingConfig{ + MaxShardCountMultiplier: 1, + isDefault: true, + } + DefaultModelScalingConfig = ModelScalingConfig{ + Enable: false, + isDefault: true, + } + DefaultServerScalingConfig = ServerScalingConfig{ + Enable: true, + ScaleDownPackingEnabled: false, + ScaleDownPackingPercentage: 0, + isDefault: true, + } + DefaultScalingConfig = ScalingConfig{ + Models: &DefaultModelScalingConfig, + Servers: &DefaultServerScalingConfig, + Pipelines: &DefaultPipelineScalingConfig, + isDefault: true, + } +) + +type ScalingConfig struct { + Models *ModelScalingConfig `json:"models,omitempty" yaml:"models,omitempty"` + Servers *ServerScalingConfig `json:"servers,omitempty" yaml:"servers,omitempty"` + // Scaling config impacting pipeline-gateway, dataflow-engine and model-gateway + Pipelines *PipelineScalingConfig `json:"pipelines,omitempty" yaml:"pipelines,omitempty"` + isDefault bool +} + +type ModelScalingConfig struct { + Enable bool `json:"enabled,omitempty" yaml:"enabled,omitempty"` + isDefault bool +} + +type ServerScalingConfig struct { + Enable bool `json:"enabled,omitempty" yaml:"enabled,omitempty"` + ScaleDownPackingEnabled bool `json:"scaleDownPackingEnabled,omitempty" yaml:"scaleDownPackingEnabled,omitempty"` + ScaleDownPackingPercentage int `json:"scaleDownPackingPercentage,omitempty" yaml:"scaleDownPackingPercentage,omitempty"` + isDefault bool +} + +type PipelineScalingConfig struct { + // MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + // replicas of pipeline components. + // + // - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + // `maxShardCountMultiplier * number of pipelines` + // - For model-gateway, the max number of replicas is + // `maxShardCountMultiplier * number of consumers` + // + // It doesn't make sense to set this to a value larger than the number of partitions for kafka + // topics used in the Core 2 install. 
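+	//
+	// For example, with 3 pipelines and maxShardCountMultiplier set to 2, the
+	// pipeline workload can be sharded across at most 6 pipeline-gateway (or
+	// dataflow-engine) replicas.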
+ MaxShardCountMultiplier int `json:"maxShardCountMultiplier,omitempty" yaml:"maxShardCountMultiplier,omitempty"` + isDefault bool +} + +func (sc *ScalingConfig) DeepCopy() ScalingConfig { + var modelsCopy *ModelScalingConfig + var serversCopy *ServerScalingConfig + var pipelinesCopy *PipelineScalingConfig + + var modelsDeepCopy ModelScalingConfig + if sc.Models != nil { + modelsDeepCopy = *sc.Models + } else { + modelsDeepCopy = ModelScalingConfig(DefaultModelScalingConfig) + } + modelsCopy = &modelsDeepCopy + + var serversDeepCopy ServerScalingConfig + if sc.Servers != nil { + serversDeepCopy = *sc.Servers + } else { + serversDeepCopy = ServerScalingConfig(DefaultServerScalingConfig) + } + serversCopy = &serversDeepCopy + + var pipelinesDeepCopy PipelineScalingConfig + if sc.Pipelines != nil { + pipelinesDeepCopy = *sc.Pipelines + } else { + pipelinesDeepCopy = PipelineScalingConfig(DefaultPipelineScalingConfig) + } + pipelinesCopy = &pipelinesDeepCopy + + res := ScalingConfig{ + Models: modelsCopy, + Servers: serversCopy, + Pipelines: pipelinesCopy, + isDefault: sc.isDefault, + } + return res +} + +func (sc *ScalingConfig) Default() ScalingConfig { + return DefaultScalingConfig.DeepCopy() +} + +func (sc *ScalingConfig) IsDefault() bool { + return sc.isDefault || + (sc.Models.IsDefault() && sc.Servers.IsDefault() && sc.Pipelines.IsDefault()) +} + +func (msc *ModelScalingConfig) IsDefault() bool { + return msc.isDefault +} + +func (ssc *ServerScalingConfig) IsDefault() bool { + return ssc.isDefault +} + +func (psc *PipelineScalingConfig) IsDefault() bool { + return psc.isDefault +} + +func LogWhenUsingDefaultScalingConfig(scalingConfig *ScalingConfig, logger log.FieldLogger) { + if scalingConfig.IsDefault() { + logger.Infof("Using default scaling config") + } else { + if scalingConfig.Models != nil && scalingConfig.Models.IsDefault() { + logger.Infof("Using default model scaling config") + } + if scalingConfig.Servers != nil && scalingConfig.Servers.IsDefault() { + logger.Infof("Using default server scaling config") + } + if scalingConfig.Pipelines != nil && scalingConfig.Pipelines.IsDefault() { + logger.Infof("Using default pipeline scaling config") + } + } +} + +type ScalingConfigHandler = config.ConfigWatcher[ScalingConfig, *ScalingConfig] + +func NewScalingConfigHandler(configPath string, namespace string, logger log.FieldLogger) (*ScalingConfigHandler, error) { + return config.NewConfigWatcher( + configPath, + ScalingConfigYamlFilename, + namespace, + false, // watch mounted config file rather than using k8s informer on the config map + ConfigMapName, + nil, + onConfigUpdate, + logger.WithField("source", "ScalingConfigHandler"), + ) +} + +func onConfigUpdate(config *ScalingConfig, logger log.FieldLogger) error { + // Any missing top-level config sections (Models, Server, Pipelines) are set to their defaults. + // However, setting an empty section is treated differently, with all the fields being + // considered explicitly set to the zero value of their datatype. + // + // We also ensure minimal validation of values, so that (for example) when the zero value of + // the type is not valid, we set it to the default value. 
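+	//
+	// For example, an explicit but empty `pipelines: {}` section leaves
+	// MaxShardCountMultiplier at its zero value, which is not a usable
+	// multiplier, so it is reset to the default below.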
+ if config.Pipelines == nil { + config.Pipelines = &DefaultPipelineScalingConfig + } else { + if config.Pipelines.MaxShardCountMultiplier <= 0 { + config.Pipelines.MaxShardCountMultiplier = DefaultPipelineScalingConfig.MaxShardCountMultiplier + } + } + if config.Models == nil { + config.Models = &DefaultModelScalingConfig + } + if config.Servers == nil { + config.Servers = &DefaultServerScalingConfig + } + return nil +} diff --git a/scheduler/pkg/server/control_plane_test.go b/scheduler/pkg/server/control_plane_test.go index de6d6141be..b51132a4b3 100644 --- a/scheduler/pkg/server/control_plane_test.go +++ b/scheduler/pkg/server/control_plane_test.go @@ -135,7 +135,7 @@ func TestSubscribeControlPlane(t *testing.T) { modelGwLoadBalancer := util.NewRingLoadBalancer(1) pipelineGwLoadBalancer := util.NewRingLoadBalancer(1) s := NewSchedulerServer( - logger, nil, nil, nil, nil, eventHub, sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, tls.TLSOptions{}, + logger, nil, nil, nil, nil, eventHub, sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, nil, tls.TLSOptions{}, ) sync.Signals(1) diff --git a/scheduler/pkg/server/server.go b/scheduler/pkg/server/server.go index 13ade6d43e..08e9577d86 100644 --- a/scheduler/pkg/server/server.go +++ b/scheduler/pkg/server/server.go @@ -32,6 +32,7 @@ import ( "github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator" cr "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka/conflict-resolution" + scaling_config "github.com/seldonio/seldon-core/scheduler/v2/pkg/scaling/config" scheduler2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/scheduler" "github.com/seldonio/seldon-core/scheduler/v2/pkg/store" "github.com/seldonio/seldon-core/scheduler/v2/pkg/store/experiment" @@ -77,6 +78,11 @@ type SchedulerServer struct { config SchedulerServerConfig modelGwLoadBalancer *util.RingLoadBalancer pipelineGWLoadBalancer *util.RingLoadBalancer + scalingConfigUpdates chan scaling_config.ScalingConfig + currentScalingConfig *scaling_config.ScalingConfig + mu sync.Mutex + done chan struct{} + grpcServer *grpc.Server consumerGroupConfig *ConsumerGroupConfig eventHub *coordinator.EventHub tlsOptions seldontls.TLSOptions @@ -194,6 +200,7 @@ func (s *SchedulerServer) startServer(port uint, secure bool) error { opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler())) opts = append(opts, grpc.KeepaliveEnforcementPolicy(kaep)) grpcServer := grpc.NewServer(opts...) 
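+	// Keep a handle to the server so Stop() can drain in-flight RPCs via
+	// GracefulStop().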
+ s.grpcServer = grpcServer pb.RegisterSchedulerServer(grpcServer, s) health.RegisterHealthCheckServiceServer(grpcServer, s) @@ -257,6 +264,7 @@ func NewSchedulerServer( consumerGroupIdPrefix string, modelGwLoadBalancer *util.RingLoadBalancer, pipelineGWLoadBalancer *util.RingLoadBalancer, + scalingConfigHdl *scaling_config.ScalingConfigHandler, tlsOptions seldontls.TLSOptions, ) *SchedulerServer { loggerWithField := logger.WithField("source", "SchedulerServer") @@ -301,6 +309,8 @@ func NewSchedulerServer( config: config, modelGwLoadBalancer: modelGwLoadBalancer, pipelineGWLoadBalancer: pipelineGWLoadBalancer, + scalingConfigUpdates: make(chan scaling_config.ScalingConfig), + done: make(chan struct{}), consumerGroupConfig: consumerGroupConfig, eventHub: eventHub, tlsOptions: tlsOptions, @@ -337,9 +347,77 @@ func NewSchedulerServer( s.handleServerEvents, ) + if scalingConfigHdl != nil { + initScalingConfig := scalingConfigHdl.GetConfiguration() + s.currentScalingConfig = &initScalingConfig + scalingConfigHdl.AddListener(s.scalingConfigUpdates) + go s.handleScalingConfigChanges() + } else { + s.currentScalingConfig = &scaling_config.DefaultScalingConfig + } + + s.mu.Lock() + scaling_config.LogWhenUsingDefaultScalingConfig(s.currentScalingConfig, loggerWithField) + s.mu.Unlock() + return s } +func (s *SchedulerServer) Stop() { + if s.grpcServer != nil { + s.grpcServer.GracefulStop() + s.logger.Info("Scheduler closing gRPC server managing connections from controller and gateways") + } + close(s.done) +} + +func (s *SchedulerServer) handleScalingConfigChanges() { + logger := s.logger.WithField("func", "handleScalingConfigChanges") + for { + select { + case newScalingConfig := <-s.scalingConfigUpdates: + if newScalingConfig.Pipelines == nil { + continue + } + s.mu.Lock() + if newScalingConfig.Pipelines.MaxShardCountMultiplier != s.currentScalingConfig.Pipelines.MaxShardCountMultiplier { + logger.Info("Updating mapping of Pipelines and Models onto pipeline-gateway and model-gateway replicas following scaling config change") + wg := sync.WaitGroup{} + wg.Add(2) + s.currentScalingConfig = &newScalingConfig + scaling_config.LogWhenUsingDefaultScalingConfig(s.currentScalingConfig, logger) + go func() { + // lock Mutex to avoid updating load balancer if a concurrent rebalance is in progress + s.pipelineEventStream.mu.Lock() + s.pipelineGWLoadBalancer.UpdatePartitions(newScalingConfig.Pipelines.MaxShardCountMultiplier) + s.pipelineEventStream.mu.Unlock() + + // There is a chance that another concurrent rebalance will start here (applying the + // updated partitions), but it means we'll just do one extra rebalance that will + // distribute the pipelines in the exact same way (given no other infra changes) + // Given that config changes should be infrequent, this should be ok. 
+ + // rebalance all pipelines onto available pipeline-gw replicas according to new config + s.pipelineGwRebalance() + wg.Done() + }() + + go func() { + s.modelEventStream.mu.Lock() + s.modelGwLoadBalancer.UpdatePartitions(newScalingConfig.Pipelines.MaxShardCountMultiplier) + s.modelEventStream.mu.Unlock() + s.modelGwRebalance() + wg.Done() + }() + wg.Wait() + } + s.mu.Unlock() + case <-s.done: + return + } + } +} + func (s *SchedulerServer) ServerNotify(ctx context.Context, req *pb.ServerNotifyRequest) (*pb.ServerNotifyResponse, error) { logger := s.logger.WithField("func", "ServerNotify") // numExpectedReplicas is only used when we are doing the first sync diff --git a/scheduler/pkg/server/server_status_test.go b/scheduler/pkg/server/server_status_test.go index 75915bd37f..19f0d240c2 100644 --- a/scheduler/pkg/server/server_status_test.go +++ b/scheduler/pkg/server/server_status_test.go @@ -1328,7 +1328,7 @@ func createTestSchedulerImpl(t *testing.T, config SchedulerServerConfig) (*Sched s := NewSchedulerServer( logger, schedulerStore, experimentServer, pipelineServer, scheduler, eventHub, synchroniser.NewSimpleSynchroniser(time.Duration(10*time.Millisecond)), config, - "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, tls.TLSOptions{}, + "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, nil, tls.TLSOptions{}, ) return s, eventHub diff --git a/scheduler/pkg/server/server_test.go b/scheduler/pkg/server/server_test.go index a90fac7c69..97506d011c 100644 --- a/scheduler/pkg/server/server_test.go +++ b/scheduler/pkg/server/server_test.go @@ -73,7 +73,7 @@ func TestLoadModel(t *testing.T) { pipelineGwLoadBalancer := util.NewRingLoadBalancer(1) s := NewSchedulerServer( logger, schedulerStore, experimentServer, pipelineServer, - scheduler, eventHub, sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, tls.TLSOptions{}) + scheduler, eventHub, sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, nil, tls.TLSOptions{}) sync.Signals(1) mockAgent := &mockAgentHandler{} @@ -379,7 +379,7 @@ func TestUnloadModel(t *testing.T) { pipelineGwLoadBalancer := util.NewRingLoadBalancer(1) s := NewSchedulerServer( logger, schedulerStore, experimentServer, pipelineServer, scheduler, eventHub, - sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, tls.TLSOptions{}) + sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, nil, tls.TLSOptions{}) sync.Signals(1) return s, mockAgent, eventHub } @@ -719,7 +719,7 @@ func TestServerNotify(t *testing.T) { pipelineGwLoadBalancer := util.NewRingLoadBalancer(1) s := NewSchedulerServer( logger, schedulerStore, nil, nil, scheduler, eventHub, - sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, tls.TLSOptions{}) + sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, nil, tls.TLSOptions{}) return s, sync } diff --git a/scheduler/pkg/util/loadbalancer.go b/scheduler/pkg/util/loadbalancer.go index c5bd07095f..6a7a39bcef 100644 --- a/scheduler/pkg/util/loadbalancer.go +++ b/scheduler/pkg/util/loadbalancer.go @@ -10,17 +10,21 @@ the Change License after the Change Date as each is defined in accordance with t package util import ( + "sync" + "github.com/serialx/hashring" ) type LoadBalancer interface { AddServer(serverName string) RemoveServer(serverName string) + UpdatePartitions(numPartitions int) GetServersForKey(key string) []string } type RingLoadBalancer struct { ring *hashring.HashRing + 
mu sync.RWMutex nodes map[string]bool replicationFactor int numPartitions int @@ -34,19 +38,32 @@ func NewRingLoadBalancer(numPartitions int) *RingLoadBalancer { } } +func (lb *RingLoadBalancer) UpdatePartitions(numPartitions int) { + lb.mu.Lock() + defer lb.mu.Unlock() + lb.numPartitions = numPartitions + lb.replicationFactor = min(len(lb.nodes), lb.numPartitions) +} + func (lb *RingLoadBalancer) AddServer(serverName string) { + lb.mu.Lock() + defer lb.mu.Unlock() lb.ring = lb.ring.AddNode(serverName) lb.nodes[serverName] = true lb.replicationFactor = min(len(lb.nodes), lb.numPartitions) } func (lb *RingLoadBalancer) RemoveServer(serverName string) { + lb.mu.Lock() + defer lb.mu.Unlock() lb.ring = lb.ring.RemoveNode(serverName) delete(lb.nodes, serverName) lb.replicationFactor = min(len(lb.nodes), lb.numPartitions) } func (lb *RingLoadBalancer) GetServersForKey(key string) []string { + lb.mu.RLock() + defer lb.mu.RUnlock() nodes, _ := lb.ring.GetNodes(key, lb.replicationFactor) return nodes } From 45feeb0bd8e0701d75d0d0c0ec8ae51e97c2049c Mon Sep 17 00:00:00 2001 From: Lucian Carata Date: Tue, 7 Oct 2025 21:58:01 +0100 Subject: [PATCH 4/9] feat(operator): add scaling ConfigMap from SeldonConfig settings The operator reads the SeldonConfig and updates a ConfigMap called `seldon-scaling`, which is then mounted by components and watched for changes. The scaling settings from SeldonConfig are also used to validate the replica count of components in SeldonRuntime. --- .../apis/mlops/v1alpha1/seldonconfig_types.go | 46 +++++++++++++++++++ operator/config/seldonconfigs/default.yaml | 6 +++ .../seldon/configmap_reconciler.go | 28 +++++++++-- .../reconcilers/seldon/runtime_reconciler.go | 17 ++++--- 4 files changed, 85 insertions(+), 12 deletions(-) diff --git a/operator/apis/mlops/v1alpha1/seldonconfig_types.go b/operator/apis/mlops/v1alpha1/seldonconfig_types.go index c78b46f470..198eed5166 100644 --- a/operator/apis/mlops/v1alpha1/seldonconfig_types.go +++ b/operator/apis/mlops/v1alpha1/seldonconfig_types.go @@ -41,6 +41,8 @@ type SeldonConfigSpec struct { } type SeldonConfiguration struct { + // Control scaling parameters for various components + ScalingConfig ScalingConfig `json:"scalingConfig,omitempty"` TracingConfig TracingConfig `json:"tracingConfig,omitempty"` KafkaConfig KafkaConfig `json:"kafkaConfig,omitempty"` AgentConfig AgentConfiguration `json:"agentConfig,omitempty"` @@ -79,6 +81,37 @@ type TracingConfig struct { Ratio string `json:"ratio,omitempty"` } +type ScalingConfig struct { + Models *ModelScalingConfig `json:"models,omitempty"` + Servers *ServerScalingConfig `json:"servers,omitempty"` + // Scaling config impacting pipeline-gateway, dataflow-engine and model-gateway + Pipelines *PipelineScalingConfig `json:"pipelines,omitempty"` +} + +type ModelScalingConfig struct { + Enable bool `json:"enabled,omitempty"` +} + +type ServerScalingConfig struct { + Enable bool `json:"enabled,omitempty"` + ScaleDownPackingEnabled bool `json:"scaleDownPackingEnabled,omitempty"` + ScaleDownPackingPercentage int32 `json:"scaleDownPackingPercentage,omitempty"` +} + +type PipelineScalingConfig struct { + // MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + // replicas of pipeline components. 
+ // + // - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + // `maxShardCountMultiplier * number of pipelines` + // - For model-gateway, the max number of replicas is + // `maxShardCountMultiplier * number of consumers` + // + // It doesn't make sense to set this to a value larger than the number of partitions for kafka + // topics used in the Core 2 install. + MaxShardCountMultiplier int32 `json:"maxShardCountMultiplier,omitempty"` +} + type ComponentDefn struct { // +kubebuilder:validation:Required @@ -135,6 +168,7 @@ func (s *SeldonConfiguration) AddDefaults(defaults SeldonConfiguration) { s.KafkaConfig.addDefaults(defaults.KafkaConfig) s.AgentConfig.addDefaults(defaults.AgentConfig) s.ServiceConfig.addDefaults(defaults.ServiceConfig) + s.ScalingConfig.addDefaults(defaults.ScalingConfig) } func (k *KafkaConfig) addDefaults(defaults KafkaConfig) { @@ -184,6 +218,18 @@ func (k *KafkaConfig) addDefaults(defaults KafkaConfig) { } } +func (sc *ScalingConfig) addDefaults(defaults ScalingConfig) { + if sc.Models == nil && defaults.Models != nil { + sc.Models = defaults.Models + } + if sc.Servers == nil && defaults.Servers != nil { + sc.Servers = defaults.Servers + } + if sc.Pipelines == nil && defaults.Pipelines != nil { + sc.Pipelines = defaults.Pipelines + } +} + func (a *AgentConfiguration) addDefaults(defaults AgentConfiguration) { a.Rclone.addDefaults(defaults.Rclone) } diff --git a/operator/config/seldonconfigs/default.yaml b/operator/config/seldonconfigs/default.yaml index aa7c9ec49b..36a60ae531 100644 --- a/operator/config/seldonconfigs/default.yaml +++ b/operator/config/seldonconfigs/default.yaml @@ -332,6 +332,7 @@ spec: - --db-path=/mnt/scheduler/db - --allow-plaintxt=$(ALLOW_PLAINTXT) - --kafka-config-path=/mnt/kafka/kafka.json + - --scaling-config-path=/mnt/scaling/scaling.yaml - --scheduler-ready-timeout-seconds=$(SCHEDULER_READY_TIMEOUT_SECONDS) - --server-packing-enabled=$(SERVER_PACKING_ENABLED) - --server-packing-percentage=$(SERVER_PACKING_PERCENTAGE) @@ -413,6 +414,8 @@ spec: volumeMounts: - mountPath: /mnt/kafka name: kafka-config-volume + - mountPath: /mnt/scaling + name: scaling-config-volume - mountPath: /mnt/tracing name: tracing-config-volume - mountPath: /mnt/scheduler @@ -420,6 +423,9 @@ spec: serviceAccountName: seldon-scheduler terminationGracePeriodSeconds: 5 volumes: + - configMap: + name: seldon-scaling + name: scaling-config-volume - configMap: name: seldon-kafka name: kafka-config-volume diff --git a/operator/controllers/reconcilers/seldon/configmap_reconciler.go b/operator/controllers/reconcilers/seldon/configmap_reconciler.go index 0c79a09c94..d0ce62ea62 100644 --- a/operator/controllers/reconcilers/seldon/configmap_reconciler.go +++ b/operator/controllers/reconcilers/seldon/configmap_reconciler.go @@ -30,9 +30,10 @@ import ( ) const ( - agentConfigMapName = "seldon-agent" - kafkaConfigMapName = "seldon-kafka" - traceConfigMapName = "seldon-tracing" + agentConfigMapName = "seldon-agent" + scalingConfigMapName = "seldon-scaling" + kafkaConfigMapName = "seldon-kafka" + traceConfigMapName = "seldon-tracing" ) type ConfigMapReconciler struct { @@ -76,10 +77,15 @@ func toConfigMaps(config *mlopsv1alpha1.SeldonConfiguration, meta metav1.ObjectM if err != nil { return nil, err } + scalingConfigMap, err := getScalingConfigMap(config.ScalingConfig, meta.Namespace) + if err != nil { + return nil, err + } return []*v1.ConfigMap{ agentConfigMap, kafkaConfigMap, tracingConfigMap, + scalingConfigMap, }, nil } @@ -99,6 +105,22 @@ func 
getAgentConfigMap(agentConfig mlopsv1alpha1.AgentConfiguration, namespace s }, nil } +func getScalingConfigMap(scalingConfig mlopsv1alpha1.ScalingConfig, namespace string) (*v1.ConfigMap, error) { + scalingYaml, err := yaml.Marshal(scalingConfig) + if err != nil { + return nil, err + } + return &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: scalingConfigMapName, + Namespace: namespace, + }, + Data: map[string]string{ + "scaling.yaml": string(scalingYaml), + }, + }, nil +} + func getKafkaConfigMap(kafkaConfig mlopsv1alpha1.KafkaConfig, namespace string) (*v1.ConfigMap, error) { kafkaJson, err := json.Marshal(kafkaConfig) if err != nil { diff --git a/operator/controllers/reconcilers/seldon/runtime_reconciler.go b/operator/controllers/reconcilers/seldon/runtime_reconciler.go index 9d71172f4f..cc145cb145 100644 --- a/operator/controllers/reconcilers/seldon/runtime_reconciler.go +++ b/operator/controllers/reconcilers/seldon/runtime_reconciler.go @@ -24,7 +24,7 @@ import ( ) const ( - DEFAULT_NUM_PARTITIONS = 1 + DEFAULT_MAX_SHARD_COUNT_MULTIPLIER = 1 DEFAULT_MODELGATEWAY_MAX_NUM_CONSUMERS = 100 DEFAULT_PIPELINEGATEWAY_MAX_NUM_CONSUMERS = 100 @@ -81,12 +81,10 @@ func validateScaleSpec( ) error { ctx, clt, recorder := commonConfig.Ctx, commonConfig.Client, commonConfig.Recorder - numPartitions, err := ParseInt32( - runtime.Spec.Config.KafkaConfig.Topics["numPartitions"].StrVal, - DEFAULT_NUM_PARTITIONS, - ) - if err != nil { - return fmt.Errorf("failed to parse numPartitions from KafkaConfig: %w", err) + var maxShardCountMultiplier int32 = DEFAULT_MAX_SHARD_COUNT_MULTIPLIER + pipelineScaleConfig := runtime.Spec.Config.ScalingConfig.Pipelines + if pipelineScaleConfig != nil { + maxShardCountMultiplier = pipelineScaleConfig.MaxShardCountMultiplier } var resourceCount int32 = 0 @@ -97,9 +95,10 @@ func validateScaleSpec( resourceCount = int32(countResources(resourceListObj)) } - var maxConsumers int32 = defaultMaxConsumers + var maxConsumers = defaultMaxConsumers if maxConsumersEnvName != "" { maxConsumersEnv := getEnvVarValue(component.PodSpec, maxConsumersEnvName, "") + var err error maxConsumers, err = ParseInt32(maxConsumersEnv, defaultMaxConsumers) if err != nil { return fmt.Errorf("failed to parse %s: %w", maxConsumersEnvName, err) @@ -109,7 +108,7 @@ } } - maxReplicas := replicaCalc(resourceCount, maxConsumers, numPartitions) + maxReplicas := replicaCalc(resourceCount, maxConsumers, maxShardCountMultiplier) if component.Replicas != nil && *component.Replicas > maxReplicas { component.Replicas = &maxReplicas recorder.Eventf( From da313d452bf6da814ffed03447cfb0b70b02c751 Mon Sep 17 00:00:00 2001 From: Lucian Carata Date: Tue, 7 Oct 2025 22:04:53 +0100 Subject: [PATCH 5/9] gen(operator): files generated from operator definition of ScalingConfig automatically-generated files only, safe to ignore during review --- .../templates/seldon-v2-crds.yaml | 76 +++++++++++++++++++ k8s/yaml/crds.yaml | 76 +++++++++++++++++++ .../mlops/v1alpha1/zz_generated.deepcopy.go | 76 +++++++++++++++++++ .../bases/mlops.seldon.io_seldonconfigs.yaml | 38 ++++++++++ .../bases/mlops.seldon.io_seldonruntimes.yaml | 38 ++++++++++ 5 files changed, 304 insertions(+) diff --git a/k8s/helm-charts/seldon-core-v2-crds/templates/seldon-v2-crds.yaml b/k8s/helm-charts/seldon-core-v2-crds/templates/seldon-v2-crds.yaml index 05066cb168..9cfa5e147d 100644 --- a/k8s/helm-charts/seldon-core-v2-crds/templates/seldon-v2-crds.yaml +++
b/k8s/helm-charts/seldon-core-v2-crds/templates/seldon-v2-crds.yaml @@ -9061,6 +9061,44 @@ spec: x-kubernetes-int-or-string: true type: object type: object + scalingConfig: + description: Control scaling parameters for various components + properties: + models: + properties: + enabled: + type: boolean + type: object + pipelines: + description: Scaling config impacting pipeline-gateway, dataflow-engine + and model-gateway + properties: + maxShardCountMultiplier: + description: |- + MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + replicas of pipeline components. + + - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + `maxShardCountMultiplier * number of pipelines` + - For model-gateway, the max number of replicas is + `maxShardCountMultiplier * number of consumers` + + It doesn't make sense to set this to a value larger than the number of partitions for kafka + topics used in the Core 2 install. + format: int32 + type: integer + type: object + servers: + properties: + enabled: + type: boolean + scaleDownPackingEnabled: + type: boolean + scaleDownPackingPercentage: + format: int32 + type: integer + type: object + type: object serviceConfig: properties: grpcServicePrefix: @@ -9187,6 +9225,44 @@ spec: x-kubernetes-int-or-string: true type: object type: object + scalingConfig: + description: Control scaling parameters for various components + properties: + models: + properties: + enabled: + type: boolean + type: object + pipelines: + description: Scaling config impacting pipeline-gateway, dataflow-engine + and model-gateway + properties: + maxShardCountMultiplier: + description: |- + MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + replicas of pipeline components. + + - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + `maxShardCountMultiplier * number of pipelines` + - For model-gateway, the max number of replicas is + `maxShardCountMultiplier * number of consumers` + + It doesn't make sense to set this to a value larger than the number of partitions for kafka + topics used in the Core 2 install. + format: int32 + type: integer + type: object + servers: + properties: + enabled: + type: boolean + scaleDownPackingEnabled: + type: boolean + scaleDownPackingPercentage: + format: int32 + type: integer + type: object + type: object serviceConfig: properties: grpcServicePrefix: diff --git a/k8s/yaml/crds.yaml b/k8s/yaml/crds.yaml index be8bbb2840..1eae7d0d29 100644 --- a/k8s/yaml/crds.yaml +++ b/k8s/yaml/crds.yaml @@ -9066,6 +9066,44 @@ spec: x-kubernetes-int-or-string: true type: object type: object + scalingConfig: + description: Control scaling parameters for various components + properties: + models: + properties: + enabled: + type: boolean + type: object + pipelines: + description: Scaling config impacting pipeline-gateway, dataflow-engine + and model-gateway + properties: + maxShardCountMultiplier: + description: |- + MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + replicas of pipeline components. + + - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + `maxShardCountMultiplier * number of pipelines` + - For model-gateway, the max number of replicas is + `maxShardCountMultiplier * number of consumers` + + It doesn't make sense to set this to a value larger than the number of partitions for kafka + topics used in the Core 2 install. 
+ format: int32 + type: integer + type: object + servers: + properties: + enabled: + type: boolean + scaleDownPackingEnabled: + type: boolean + scaleDownPackingPercentage: + format: int32 + type: integer + type: object + type: object serviceConfig: properties: grpcServicePrefix: @@ -9193,6 +9231,44 @@ spec: x-kubernetes-int-or-string: true type: object type: object + scalingConfig: + description: Control scaling parameters for various components + properties: + models: + properties: + enabled: + type: boolean + type: object + pipelines: + description: Scaling config impacting pipeline-gateway, dataflow-engine + and model-gateway + properties: + maxShardCountMultiplier: + description: |- + MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + replicas of pipeline components. + + - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + `maxShardCountMultiplier * number of pipelines` + - For model-gateway, the max number of replicas is + `maxShardCountMultiplier * number of consumers` + + It doesn't make sense to set this to a value larger than the number of partitions for kafka + topics used in the Core 2 install. + format: int32 + type: integer + type: object + servers: + properties: + enabled: + type: boolean + scaleDownPackingEnabled: + type: boolean + scaleDownPackingPercentage: + format: int32 + type: integer + type: object + type: object serviceConfig: properties: grpcServicePrefix: diff --git a/operator/apis/mlops/v1alpha1/zz_generated.deepcopy.go b/operator/apis/mlops/v1alpha1/zz_generated.deepcopy.go index 68e5a08cce..d0a033c2e2 100644 --- a/operator/apis/mlops/v1alpha1/zz_generated.deepcopy.go +++ b/operator/apis/mlops/v1alpha1/zz_generated.deepcopy.go @@ -464,6 +464,21 @@ func (in *ModelList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ModelScalingConfig) DeepCopyInto(out *ModelScalingConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ModelScalingConfig. +func (in *ModelScalingConfig) DeepCopy() *ModelScalingConfig { + if in == nil { + return nil + } + out := new(ModelScalingConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ModelSpec) DeepCopyInto(out *ModelSpec) { *out = *in @@ -756,6 +771,21 @@ func (in *PipelineOutput) DeepCopy() *PipelineOutput { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PipelineScalingConfig) DeepCopyInto(out *PipelineScalingConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PipelineScalingConfig. +func (in *PipelineScalingConfig) DeepCopy() *PipelineScalingConfig { + if in == nil { + return nil + } + out := new(PipelineScalingConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PipelineSpec) DeepCopyInto(out *PipelineSpec) { *out = *in @@ -1027,6 +1057,36 @@ func (in *RcloneConfiguration) DeepCopy() *RcloneConfiguration { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ScalingConfig) DeepCopyInto(out *ScalingConfig) { + *out = *in + if in.Models != nil { + in, out := &in.Models, &out.Models + *out = new(ModelScalingConfig) + **out = **in + } + if in.Servers != nil { + in, out := &in.Servers, &out.Servers + *out = new(ServerScalingConfig) + **out = **in + } + if in.Pipelines != nil { + in, out := &in.Pipelines, &out.Pipelines + *out = new(PipelineScalingConfig) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScalingConfig. +func (in *ScalingConfig) DeepCopy() *ScalingConfig { + if in == nil { + return nil + } + out := new(ScalingConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ScalingSpec) DeepCopyInto(out *ScalingSpec) { *out = *in @@ -1161,6 +1221,7 @@ func (in *SeldonConfigStatus) DeepCopy() *SeldonConfigStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SeldonConfiguration) DeepCopyInto(out *SeldonConfiguration) { *out = *in + in.ScalingConfig.DeepCopyInto(&out.ScalingConfig) out.TracingConfig = in.TracingConfig in.KafkaConfig.DeepCopyInto(&out.KafkaConfig) in.AgentConfig.DeepCopyInto(&out.AgentConfig) @@ -1450,6 +1511,21 @@ func (in *ServerList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ServerScalingConfig) DeepCopyInto(out *ServerScalingConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ServerScalingConfig. +func (in *ServerScalingConfig) DeepCopy() *ServerScalingConfig { + if in == nil { + return nil + } + out := new(ServerScalingConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ServerSpec) DeepCopyInto(out *ServerSpec) { *out = *in diff --git a/operator/config/crd/bases/mlops.seldon.io_seldonconfigs.yaml b/operator/config/crd/bases/mlops.seldon.io_seldonconfigs.yaml index be89e6a1ae..70d8aaf72c 100644 --- a/operator/config/crd/bases/mlops.seldon.io_seldonconfigs.yaml +++ b/operator/config/crd/bases/mlops.seldon.io_seldonconfigs.yaml @@ -8372,6 +8372,44 @@ spec: x-kubernetes-int-or-string: true type: object type: object + scalingConfig: + description: Control scaling parameters for various components + properties: + models: + properties: + enabled: + type: boolean + type: object + pipelines: + description: Scaling config impacting pipeline-gateway, dataflow-engine + and model-gateway + properties: + maxShardCountMultiplier: + description: |- + MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + replicas of pipeline components. + + - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + `maxShardCountMultiplier * number of pipelines` + - For model-gateway, the max number of replicas is + `maxShardCountMultiplier * number of consumers` + + It doesn't make sense to set this to a value larger than the number of partitions for kafka + topics used in the Core 2 install. 
+ format: int32 + type: integer + type: object + servers: + properties: + enabled: + type: boolean + scaleDownPackingEnabled: + type: boolean + scaleDownPackingPercentage: + format: int32 + type: integer + type: object + type: object serviceConfig: properties: grpcServicePrefix: diff --git a/operator/config/crd/bases/mlops.seldon.io_seldonruntimes.yaml b/operator/config/crd/bases/mlops.seldon.io_seldonruntimes.yaml index 5e32c1a975..813af60c73 100644 --- a/operator/config/crd/bases/mlops.seldon.io_seldonruntimes.yaml +++ b/operator/config/crd/bases/mlops.seldon.io_seldonruntimes.yaml @@ -94,6 +94,44 @@ spec: x-kubernetes-int-or-string: true type: object type: object + scalingConfig: + description: Control scaling parameters for various components + properties: + models: + properties: + enabled: + type: boolean + type: object + pipelines: + description: Scaling config impacting pipeline-gateway, dataflow-engine + and model-gateway + properties: + maxShardCountMultiplier: + description: |- + MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + replicas of pipeline components. + + - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + `maxShardCountMultiplier * number of pipelines` + - For model-gateway, the max number of replicas is + `maxShardCountMultiplier * number of consumers` + + It doesn't make sense to set this to a value larger than the number of partitions for kafka + topics used in the Core 2 install. + format: int32 + type: integer + type: object + servers: + properties: + enabled: + type: boolean + scaleDownPackingEnabled: + type: boolean + scaleDownPackingPercentage: + format: int32 + type: integer + type: object + type: object serviceConfig: properties: grpcServicePrefix: From d55b7bc900905d82121ac3f13929f6f76722eee1 Mon Sep 17 00:00:00 2001 From: Lucian Carata Date: Tue, 7 Oct 2025 22:07:16 +0100 Subject: [PATCH 6/9] fix(gw-*): do not rely on settings in SeldonConfig.config.kafkaConfig.topics Before Core 2.11, keep the number of partitions and replication factor as microservice environment variables. Those settings are also present in the newly introduced ScalingConfig, but they currently have no effect. This preserves a smooth upgrade path and makes it possible, in a future release, to react to changes in those settings by watching the ConfigMap.
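As background for reviewers, the sketch below illustrates the env-var-with-default pattern this change falls back on. It is a minimal sketch only: the real util.GetIntEnvar helper is not part of this series and may be implemented differently, and the variable name SELDON_DEFAULT_NUM_PARTITIONS and the fallback of 1 are illustrative assumptions, not the constants used in infer.go.

    // Minimal sketch (not part of the patch) of a GetIntEnvar-style helper:
    // read an integer setting from an environment variable, falling back to a
    // default when the variable is unset.
    package main

    import (
        "fmt"
        "os"
        "strconv"
    )

    func getIntEnvar(name string, defaultValue int) (int, error) {
        raw, ok := os.LookupEnv(name)
        if !ok {
            // Unset variable: use the compiled-in default, as the gateways do
            // for numPartitions and replicationFactor after this patch.
            return defaultValue, nil
        }
        v, err := strconv.Atoi(raw)
        if err != nil {
            // A set-but-invalid value fails loudly instead of being ignored.
            return 0, fmt.Errorf("env var %s=%q is not an integer: %w", name, raw, err)
        }
        return v, nil
    }

    func main() {
        // Hypothetical variable name, for illustration only.
        numPartitions, err := getIntEnvar("SELDON_DEFAULT_NUM_PARTITIONS", 1)
        if err != nil {
            panic(err)
        }
        fmt.Println("numPartitions:", numPartitions)
    }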
--- scheduler/pkg/kafka/gateway/infer.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/scheduler/pkg/kafka/gateway/infer.go b/scheduler/pkg/kafka/gateway/infer.go index f9d331ccd7..98ab655e80 100644 --- a/scheduler/pkg/kafka/gateway/infer.go +++ b/scheduler/pkg/kafka/gateway/infer.go @@ -110,19 +110,11 @@ func NewInferKafkaHandler( consumerName string, schemaRegistryClient schemaregistry.Client, ) (*InferKafkaHandler, error) { - defaultReplicationFactor, err := util.GetIntEnvar(envDefaultReplicationFactor, defaultReplicationFactor) + replicationFactor, err := util.GetIntEnvar(envDefaultReplicationFactor, defaultReplicationFactor) if err != nil { return nil, fmt.Errorf("error getting default replication factor: %v", err) } - replicationFactor, err := GetIntConfigMapValue(topicsConfigMap, replicationFactorKey, defaultReplicationFactor) - if err != nil { - return nil, fmt.Errorf("invalid Kafka topic configuration: %v", err) - } - defaultNumPartitions, err := util.GetIntEnvar(envDefaultNumPartitions, defaultNumPartitions) - if err != nil { - return nil, fmt.Errorf("error getting default number of partitions: %v", err) - } - numPartitions, err := GetIntConfigMapValue(topicsConfigMap, numPartitionsKey, defaultNumPartitions) + numPartitions, err := util.GetIntEnvar(envDefaultNumPartitions, defaultNumPartitions) if err != nil { return nil, fmt.Errorf("invalid Kafka topic configuration: %w", err) } From 9a319236d6e8ca3cf514fcbdd56a33e62c9bf8bb Mon Sep 17 00:00:00 2001 From: Lucian Carata Date: Tue, 7 Oct 2025 22:17:37 +0100 Subject: [PATCH 7/9] feat(helm-chart): build default ScalingConfig based on existing helm chart values This gives helm-chart users the smoothest possible upgrade path: the values will be set correctly in 2.10 and can then be updated at runtime in 2.11.
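To make the mapping concrete, the snippet below shows how the pre-existing chart values feed the new scalingConfig block; the numbers simply restate the chart defaults as rendered in k8s/yaml/components.yaml later in this series, and are not new settings:

    # Existing helm values (chart defaults)...
    autoscaling:
      autoscalingModelEnabled: false
      autoscalingServerEnabled: true
      serverPackingEnabled: false
      serverPackingPercentage: 0
    kafka:
      topics:
        numPartitions: 1

    # ...are rendered by the templates into the SeldonConfig scalingConfig block:
    scalingConfig:
      models:
        enabled: false
      pipelines:
        maxShardCountMultiplier: 1
      servers:
        enabled: true
        scaleDownPackingEnabled: false
        scaleDownPackingPercentage: 0

In particular, maxShardCountMultiplier takes its default from kafka.topics.numPartitions, so existing installs keep the same effective sharding bound across the upgrade.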
--- .../seldon-core-v2-runtime/values.yaml | 17 +++++++++-------- .../seldon-core-v2-setup/values.yaml | 4 ++-- .../seldon-core-v2-setup/values.yaml.template | 4 ++-- .../helm-components-sc/kustomization.yaml | 6 ++++++ .../helm-components-sc/kustomization.yaml | 6 ++++++ .../helm-components-sc/patch_scalingconfig.yaml | 15 +++++++++++++++ .../patch_scalingconfig_json6902.yaml | 15 +++++++++++++++ 7 files changed, 55 insertions(+), 12 deletions(-) create mode 100644 k8s/kustomize/helm-components-sc/patch_scalingconfig.yaml create mode 100644 k8s/kustomize/helm-components-sc/patch_scalingconfig_json6902.yaml diff --git a/k8s/helm-charts/seldon-core-v2-runtime/values.yaml b/k8s/helm-charts/seldon-core-v2-runtime/values.yaml index fa9276983d..5ba4be4183 100644 --- a/k8s/helm-charts/seldon-core-v2-runtime/values.yaml +++ b/k8s/helm-charts/seldon-core-v2-runtime/values.yaml @@ -4,32 +4,33 @@ seldonConfig: default hodometer: disable: false replicas: 1 - + scheduler: disable: false replicas: 1 # controlplane exposure - serviceType: LoadBalancer - + serviceType: LoadBalancer + envoy: disable: false replicas: 1 # dataplane exposure - serviceType: LoadBalancer - + serviceType: LoadBalancer + dataflow: disable: false replicas: 1 - + modelgateway: disable: false replicas: 1 - + pipelinegateway: disable: false replicas: 1 config: + scalingConfig: agentConfig: rclone: configSecrets: @@ -48,4 +49,4 @@ config: serviceConfig: serviceGRPCPrefix: serviceType: - + diff --git a/k8s/helm-charts/seldon-core-v2-setup/values.yaml b/k8s/helm-charts/seldon-core-v2-setup/values.yaml index 750a5d5888..a342155cf1 100644 --- a/k8s/helm-charts/seldon-core-v2-setup/values.yaml +++ b/k8s/helm-charts/seldon-core-v2-setup/values.yaml @@ -85,7 +85,7 @@ opentelemetry: # logging # this is a global setting, in the case individual components logLevel is not set -# Users should set a value from: +# Users should set a value from: # fatal, error, warn, info, debug, trace # if used also for .rclone.logLevel, the allowed set reduces to: # debug, info, error @@ -245,7 +245,7 @@ scheduler: runAsGroup: 1000 runAsNonRoot: true schedulerReadyTimeoutSeconds: 600 - + autoscaling: autoscalingModelEnabled: false autoscalingServerEnabled: true diff --git a/k8s/helm-charts/seldon-core-v2-setup/values.yaml.template b/k8s/helm-charts/seldon-core-v2-setup/values.yaml.template index a67e39cbf5..fa7a18122f 100644 --- a/k8s/helm-charts/seldon-core-v2-setup/values.yaml.template +++ b/k8s/helm-charts/seldon-core-v2-setup/values.yaml.template @@ -85,7 +85,7 @@ opentelemetry: # logging # this is a global setting, in the case individual components logLevel is not set -# Users should set a value from: +# Users should set a value from: # fatal, error, warn, info, debug, trace # if used also for .rclone.logLevel, the allowed set reduces to: # debug, info, error @@ -245,7 +245,7 @@ scheduler: runAsGroup: 1000 runAsNonRoot: true schedulerReadyTimeoutSeconds: 600 - + autoscaling: autoscalingModelEnabled: false autoscalingServerEnabled: true diff --git a/k8s/kustomize-experimental/helm-components-sc/kustomization.yaml b/k8s/kustomize-experimental/helm-components-sc/kustomization.yaml index e1110bf8d7..bef32904f8 100644 --- a/k8s/kustomize-experimental/helm-components-sc/kustomization.yaml +++ b/k8s/kustomize-experimental/helm-components-sc/kustomization.yaml @@ -18,6 +18,7 @@ patchesStrategicMerge: - ../../kustomize/helm-components-sc/patch_scheduler.yaml - ../../kustomize/helm-components-sc/patch_kafkaconfig.yaml - 
../../kustomize/helm-components-sc/patch_tracingconfig.yaml +- ../../kustomize/helm-components-sc/patch_scalingconfig.yaml - ../../kustomize/helm-components-sc/patch_agentconfig.yaml - ../../kustomize/helm-components-sc/patch_serviceconfig.yaml - patch_mlserver.yaml @@ -59,6 +60,11 @@ patches: version: v1alpha1 kind: SeldonConfig name: default +- path: ../../kustomize/helm-components-sc/patch_scalingconfig_json6902.yaml + target: + version: v1alpha1 + kind: SeldonConfig + name: default - path: ../../kustomize/helm-components-sc/patch_pipelinegateway_json6902.yaml target: version: v1alpha1 diff --git a/k8s/kustomize/helm-components-sc/kustomization.yaml b/k8s/kustomize/helm-components-sc/kustomization.yaml index 351d8b1bd0..30898153ed 100644 --- a/k8s/kustomize/helm-components-sc/kustomization.yaml +++ b/k8s/kustomize/helm-components-sc/kustomization.yaml @@ -21,6 +21,7 @@ patchesStrategicMerge: - patch_kafkaconfig.yaml - patch_tracingconfig.yaml - patch_agentconfig.yaml +- patch_scalingconfig.yaml - patch_serviceconfig.yaml patches: @@ -59,6 +60,11 @@ patches: version: v1alpha1 kind: SeldonConfig name: default +- path: patch_scalingconfig_json6902.yaml + target: + version: v1alpha1 + kind: SeldonConfig + name: default - path: patch_pipelinegateway_json6902.yaml target: version: v1alpha1 diff --git a/k8s/kustomize/helm-components-sc/patch_scalingconfig.yaml b/k8s/kustomize/helm-components-sc/patch_scalingconfig.yaml new file mode 100644 index 0000000000..3076e27b32 --- /dev/null +++ b/k8s/kustomize/helm-components-sc/patch_scalingconfig.yaml @@ -0,0 +1,15 @@ +apiVersion: mlops.seldon.io/v1alpha1 +kind: SeldonConfig +metadata: + name: default +spec: + config: + scalingConfig: + models: + enabled: + servers: + enabled: + scaleDownPackingEnabled: + scaleDownPackingPercentage: + pipelines: + maxShardCountMultiplier: diff --git a/k8s/kustomize/helm-components-sc/patch_scalingconfig_json6902.yaml b/k8s/kustomize/helm-components-sc/patch_scalingconfig_json6902.yaml new file mode 100644 index 0000000000..995fa4c3bd --- /dev/null +++ b/k8s/kustomize/helm-components-sc/patch_scalingconfig_json6902.yaml @@ -0,0 +1,15 @@ +- op: add + path: /spec/config/scalingConfig/models/enabled + value: HACK_REMOVE_ME{{ .Values.autoscaling.autoscalingModelEnabled }} +- op: add + path: /spec/config/scalingConfig/servers/enabled + value: HACK_REMOVE_ME{{ .Values.autoscaling.autoscalingServerEnabled }} +- op: add + path: /spec/config/scalingConfig/servers/scaleDownPackingEnabled + value: HACK_REMOVE_ME{{ .Values.autoscaling.serverPackingEnabled }} +- op: add + path: /spec/config/scalingConfig/servers/scaleDownPackingPercentage + value: HACK_REMOVE_ME{{ .Values.autoscaling.serverPackingPercentage }} +- op: add + path: /spec/config/scalingConfig/pipelines/maxShardCountMultiplier + value: HACK_REMOVE_ME{{ .Values.kafka.topics.numPartitions }} From b4221e160cb754d6c3740357fee30231d2014985 Mon Sep 17 00:00:00 2001 From: Lucian Carata Date: Tue, 7 Oct 2025 22:20:15 +0100 Subject: [PATCH 8/9] gen(helm-charts): files generated following SeldonConfig helm-chart changes --- .../templates/_components-deployments.tpl | 18 ++++++++++++++++++ .../templates/_components-statefulsets.tpl | 18 ++++++++++++++++++ k8s/yaml/components.yaml | 15 +++++++++++++++ 3 files changed, 51 insertions(+) diff --git a/k8s/helm-charts/seldon-core-v2-setup/templates/_components-deployments.tpl b/k8s/helm-charts/seldon-core-v2-setup/templates/_components-deployments.tpl index b07a0f67c0..bb1db7df6c 100644 --- 
a/k8s/helm-charts/seldon-core-v2-setup/templates/_components-deployments.tpl +++ b/k8s/helm-charts/seldon-core-v2-setup/templates/_components-deployments.tpl @@ -511,6 +511,7 @@ spec: - --db-path=/mnt/scheduler/db - --allow-plaintxt=$(ALLOW_PLAINTXT) - --kafka-config-path=/mnt/kafka/kafka.json + - --scaling-config-path=/mnt/scaling/scaling.yaml - --scheduler-ready-timeout-seconds=$(SCHEDULER_READY_TIMEOUT_SECONDS) - --server-packing-enabled=$(SERVER_PACKING_ENABLED) - --server-packing-percentage=$(SERVER_PACKING_PERCENTAGE) @@ -639,6 +640,8 @@ spec: volumeMounts: - mountPath: /mnt/kafka name: kafka-config-volume + - mountPath: /mnt/scaling + name: scaling-config-volume - mountPath: /mnt/tracing name: tracing-config-volume - mountPath: /mnt/scheduler @@ -652,6 +655,9 @@ spec: serviceAccountName: seldon-scheduler terminationGracePeriodSeconds: 5 volumes: + - configMap: + name: seldon-scaling + name: scaling-config-volume - configMap: name: seldon-kafka name: kafka-config-volume @@ -1306,6 +1312,18 @@ spec: topics: numPartitions: '{{ .Values.kafka.topics.numPartitions }}' replicationFactor: '{{ .Values.kafka.topics.replicationFactor }}' + scalingConfig: + models: + enabled: {{ .Values.autoscaling.autoscalingModelEnabled }} + pipelines: + maxShardCountMultiplier: {{ .Values.kafka.topics.numPartitions + }} + servers: + enabled: {{ .Values.autoscaling.autoscalingServerEnabled }} + scaleDownPackingEnabled: {{ .Values.autoscaling.serverPackingEnabled + }} + scaleDownPackingPercentage: {{ .Values.autoscaling.serverPackingPercentage + }} serviceConfig: grpcServicePrefix: '{{ .Values.services.serviceGRPCPrefix }}' serviceType: '{{ .Values.services.defaultServiceType }}' diff --git a/k8s/helm-charts/seldon-core-v2-setup/templates/_components-statefulsets.tpl b/k8s/helm-charts/seldon-core-v2-setup/templates/_components-statefulsets.tpl index 4a55d3766f..e465ee3adc 100644 --- a/k8s/helm-charts/seldon-core-v2-setup/templates/_components-statefulsets.tpl +++ b/k8s/helm-charts/seldon-core-v2-setup/templates/_components-statefulsets.tpl @@ -511,6 +511,7 @@ spec: - --db-path=/mnt/scheduler/db - --allow-plaintxt=$(ALLOW_PLAINTXT) - --kafka-config-path=/mnt/kafka/kafka.json + - --scaling-config-path=/mnt/scaling/scaling.yaml - --scheduler-ready-timeout-seconds=$(SCHEDULER_READY_TIMEOUT_SECONDS) - --server-packing-enabled=$(SERVER_PACKING_ENABLED) - --server-packing-percentage=$(SERVER_PACKING_PERCENTAGE) @@ -639,6 +640,8 @@ spec: volumeMounts: - mountPath: /mnt/kafka name: kafka-config-volume + - mountPath: /mnt/scaling + name: scaling-config-volume - mountPath: /mnt/tracing name: tracing-config-volume - mountPath: /mnt/scheduler @@ -652,6 +655,9 @@ spec: serviceAccountName: seldon-scheduler terminationGracePeriodSeconds: 5 volumes: + - configMap: + name: seldon-scaling + name: scaling-config-volume - configMap: name: seldon-kafka name: kafka-config-volume @@ -1306,6 +1312,18 @@ spec: topics: numPartitions: '{{ .Values.kafka.topics.numPartitions }}' replicationFactor: '{{ .Values.kafka.topics.replicationFactor }}' + scalingConfig: + models: + enabled: {{ .Values.autoscaling.autoscalingModelEnabled }} + pipelines: + maxShardCountMultiplier: {{ .Values.kafka.topics.numPartitions + }} + servers: + enabled: {{ .Values.autoscaling.autoscalingServerEnabled }} + scaleDownPackingEnabled: {{ .Values.autoscaling.serverPackingEnabled + }} + scaleDownPackingPercentage: {{ .Values.autoscaling.serverPackingPercentage + }} serviceConfig: grpcServicePrefix: '{{ .Values.services.serviceGRPCPrefix }}' serviceType: '{{ 
.Values.services.defaultServiceType }}' diff --git a/k8s/yaml/components.yaml b/k8s/yaml/components.yaml index ac608469de..158f95f912 100644 --- a/k8s/yaml/components.yaml +++ b/k8s/yaml/components.yaml @@ -358,6 +358,7 @@ spec: - --db-path=/mnt/scheduler/db - --allow-plaintxt=$(ALLOW_PLAINTXT) - --kafka-config-path=/mnt/kafka/kafka.json + - --scaling-config-path=/mnt/scaling/scaling.yaml - --scheduler-ready-timeout-seconds=$(SCHEDULER_READY_TIMEOUT_SECONDS) - --server-packing-enabled=$(SERVER_PACKING_ENABLED) - --server-packing-percentage=$(SERVER_PACKING_PERCENTAGE) @@ -481,6 +482,8 @@ spec: volumeMounts: - mountPath: /mnt/kafka name: kafka-config-volume + - mountPath: /mnt/scaling + name: scaling-config-volume - mountPath: /mnt/tracing name: tracing-config-volume - mountPath: /mnt/scheduler @@ -493,6 +496,9 @@ spec: serviceAccountName: seldon-scheduler terminationGracePeriodSeconds: 5 volumes: + - configMap: + name: seldon-scaling + name: scaling-config-volume - configMap: name: seldon-kafka name: kafka-config-volume @@ -1127,6 +1133,15 @@ spec: topics: numPartitions: '1' replicationFactor: '1' + scalingConfig: + models: + enabled: false + pipelines: + maxShardCountMultiplier: 1 + servers: + enabled: true + scaleDownPackingEnabled: false + scaleDownPackingPercentage: 0 serviceConfig: grpcServicePrefix: '' serviceType: 'LoadBalancer' From 92e3f4b4f1bd6bd1e27781c4f1471ec3a31a7c27 Mon Sep 17 00:00:00 2001 From: Lucian Carata Date: Tue, 7 Oct 2025 22:21:22 +0100 Subject: [PATCH 9/9] fix(dataflow): update ktor package to avoid CVEs --- scheduler/data-flow/build.gradle.kts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scheduler/data-flow/build.gradle.kts b/scheduler/data-flow/build.gradle.kts index 08c24cf99e..c1ba740ecf 100644 --- a/scheduler/data-flow/build.gradle.kts +++ b/scheduler/data-flow/build.gradle.kts @@ -58,10 +58,10 @@ dependencies { implementation("io.kubernetes:client-java:24.0.0") // HTTP server for health probes - implementation("io.ktor:ktor-server-core:3.0.1") - implementation("io.ktor:ktor-server-netty:3.0.1") - implementation("io.ktor:ktor-server-content-negotiation:3.0.1") - implementation("io.ktor:ktor-serialization-kotlinx-json:3.0.1") + implementation("io.ktor:ktor-server-core:3.3.0") + implementation("io.ktor:ktor-server-netty:3.3.0") + implementation("io.ktor:ktor-server-content-negotiation:3.3.0") + implementation("io.ktor:ktor-serialization-kotlinx-json:3.3.0") implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.3") implementation("com.charleskorn.kaml:kaml:0.61.0")