From 3bbd02bce43c2290054df0b98ad2cc7963c51290 Mon Sep 17 00:00:00 2001 From: Miguel Angel Date: Thu, 2 Oct 2025 14:19:44 +0100 Subject: [PATCH 1/6] fix number of partitions retrieval --- components/kafka/pkg/config/config.go | 35 +++++++++ components/kafka/pkg/config/config_test.go | 88 ++++++++++++++++++++++ scheduler/cmd/scheduler/main.go | 6 +- 3 files changed, 125 insertions(+), 4 deletions(-) diff --git a/components/kafka/pkg/config/config.go b/components/kafka/pkg/config/config.go index 61ead8890d..2526910489 100644 --- a/components/kafka/pkg/config/config.go +++ b/components/kafka/pkg/config/config.go @@ -14,6 +14,7 @@ import ( "encoding/json" "fmt" "os" + "strconv" "strings" "github.com/confluentinc/confluent-kafka-go/v2/kafka" @@ -184,6 +185,40 @@ func (kc KafkaConfig) HasKafkaBootstrapServer() bool { return bs != nil && bs != "" } +// GetNumPartitions retrieves the number of partitions from a kafka config. +// The number of partitions must be bigger than 1. +func (kc KafkaConfig) GetNumPartitions() (int, error) { + numPartition, ok := kc.Topics["numPartitions"] + if !ok { + return -1, fmt.Errorf("no numPartitions topic found") + } + + // Try int first (most common case) + if partitionInt, ok := numPartition.(int); ok { + if partitionInt < 1 { + return -1, fmt.Errorf("numPartitions %d must be bigger than 0", partitionInt) + } + return partitionInt, nil + } + + // Try string + partitionStr, ok := numPartition.(string) + if !ok { + return -1, fmt.Errorf("numPartitions topic has wrong type: %T", numPartition) + } + + numPartitions, err := strconv.Atoi(partitionStr) + if err != nil { + return -1, fmt.Errorf("invalid numPartitions: %v", numPartition) + } + + if numPartitions < 1 { + return -1, fmt.Errorf("numPartitions %d must be bigger than 0", numPartitions) + } + + return numPartitions, nil +} + func WithoutSecrets(c kafka.ConfigMap) kafka.ConfigMap { safe := make(kafka.ConfigMap) diff --git a/components/kafka/pkg/config/config_test.go b/components/kafka/pkg/config/config_test.go index 660282623a..8e2504f4a7 100644 --- a/components/kafka/pkg/config/config_test.go +++ b/components/kafka/pkg/config/config_test.go @@ -235,3 +235,91 @@ func TestParseSysLogLevel(t *testing.T) { }) } } + +func TestKafkaConfig_GetNumPartitions(t *testing.T) { + g := NewGomegaWithT(t) + + type test struct { + name string + data kafka.ConfigMap + expected int + err bool + } + + tests := []test{ + { + name: "success: integer in numPartitions", + data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": 1}, + expected: 1, + err: false, + }, + { + name: "fail: negative numPartitions", + data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": -10}, + expected: -1, + err: true, + }, + { + name: "fail: invalid int input", + data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": 0}, + expected: -1, + err: true, + }, + { + name: "fail: float int numPartitions", + data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": 0.0}, + expected: -1, + err: true, + }, + { + name: "success: string in numPartitions", + data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": "1"}, + expected: 1, + err: false, + }, + + { + name: "fail: string and negative numPartitions", + data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": "-10"}, + expected: -1, + err: true, + }, + { + name: "fail: invalid string input", + data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": "-10-"}, + expected: -1, + err: true, + }, + { + name: "fail: invalid string input", + data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": "0"}, + expected: 0, + err: true, + }, + { + name: "fail: no topic", + data: nil, + expected: 0, + err: true, + }, + { + name: "fail: float string numPartitions", + data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": "0.0"}, + expected: 0, + err: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + kc := KafkaConfig{} + kc.Topics = test.data + numPartitions, err := kc.GetNumPartitions() + if test.err { + g.Expect(err).ToNot(BeNil()) + } else { + g.Expect(numPartitions).To(Equal(test.expected)) + } + }) + } +} diff --git a/scheduler/cmd/scheduler/main.go b/scheduler/cmd/scheduler/main.go index 0d1a939a41..1ce7067373 100644 --- a/scheduler/cmd/scheduler/main.go +++ b/scheduler/cmd/scheduler/main.go @@ -16,7 +16,6 @@ import ( "math/rand" "os" "os/signal" - "strconv" "syscall" "time" @@ -279,10 +278,9 @@ func main() { logger.WithError(err).Fatal("Failed to load Kafka config") } - numPartitions, err := strconv.Atoi(kafkaConfigMap.Topics["numPartitions"].(string)) + numPartitions, err := kafkaConfigMap.GetNumPartitions() if err != nil { - logger.WithError(err).Fatal("Failed to parse numPartitions from Kafka config. Defaulting to 1") - numPartitions = 1 + logger.WithError(err).Fatal("Failed to parse numPartitions from Kafka config") } dataFlowLoadBalancer := util.NewRingLoadBalancer(numPartitions) From a263fa000e06cac84df28dd306ea7c533bd6bf26 Mon Sep 17 00:00:00 2001 From: Miguel Angel Date: Thu, 2 Oct 2025 15:28:29 +0100 Subject: [PATCH 2/6] fix modelgateway replication factor and partition retrieval of kafka config --- components/kafka/pkg/config/config.go | 2 +- scheduler/pkg/kafka/gateway/infer.go | 28 +++++++++++++++++++++------ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/components/kafka/pkg/config/config.go b/components/kafka/pkg/config/config.go index 2526910489..d20262ef14 100644 --- a/components/kafka/pkg/config/config.go +++ b/components/kafka/pkg/config/config.go @@ -209,7 +209,7 @@ func (kc KafkaConfig) GetNumPartitions() (int, error) { numPartitions, err := strconv.Atoi(partitionStr) if err != nil { - return -1, fmt.Errorf("invalid numPartitions: %v", numPartition) + return -1, err } if numPartitions < 1 { diff --git a/scheduler/pkg/kafka/gateway/infer.go b/scheduler/pkg/kafka/gateway/infer.go index 0134629715..283a7d06fd 100644 --- a/scheduler/pkg/kafka/gateway/infer.go +++ b/scheduler/pkg/kafka/gateway/infer.go @@ -77,9 +77,25 @@ func GetIntConfigMapValue(configMap kafka.ConfigMap, key string, defaultValue in return defaultValue, nil } - value, err := strconv.Atoi(configMapValue.(string)) + if configMapValueInt, ok := configMapValue.(int); ok { + if configMapValueInt < 0 { + return -1, fmt.Errorf("%s: %d must not be negative", key, configMapValueInt) + } + return configMapValueInt, nil + } + + configMapValueStr, ok := configMapValue.(string) + if !ok { + return defaultValue, fmt.Errorf("%s key has wrong type: %T", key, configMapValue) + } + + value, err := strconv.Atoi(configMapValueStr) if err != nil { - return 0, err + return 0, fmt.Errorf("invalid value %s in %s with error: %v", configMapValueStr, key, err) + } + + if value < 0 { + return -1, fmt.Errorf("%s: %d must be bigger than 0", key, value) } return value, nil @@ -96,19 +112,19 @@ func NewInferKafkaHandler( ) (*InferKafkaHandler, error) { defaultReplicationFactor, err := util.GetIntEnvar(envDefaultReplicationFactor, defaultReplicationFactor) if err != nil { - return nil, err + return nil, fmt.Errorf("error getting default replication factor: %v", err) } replicationFactor, err := GetIntConfigMapValue(topicsConfigMap, replicationFactorKey, defaultReplicationFactor) if err != nil { - return nil, err + return nil, fmt.Errorf("error getting replication factor: %v", err) } defaultNumPartitions, err := util.GetIntEnvar(envDefaultNumPartitions, defaultNumPartitions) if err != nil { - return nil, err + return nil, fmt.Errorf("error getting default number of partitions: %v", err) } numPartitions, err := GetIntConfigMapValue(topicsConfigMap, numPartitionsKey, defaultNumPartitions) if err != nil { - return nil, err + return nil, fmt.Errorf("error getting num partitions: %v", err) } tlsClientOptions, err := util.CreateTLSClientOptions() if err != nil { From 08bf3d0469020ffb495ed5f3a000809dae55d069 Mon Sep 17 00:00:00 2001 From: Miguel Angel Date: Thu, 2 Oct 2025 15:57:34 +0100 Subject: [PATCH 3/6] Revert "fix number of partitions retrieval" This reverts commit 3bbd02bc --- components/kafka/pkg/config/config.go | 35 --------- components/kafka/pkg/config/config_test.go | 88 ---------------------- 2 files changed, 123 deletions(-) diff --git a/components/kafka/pkg/config/config.go b/components/kafka/pkg/config/config.go index d20262ef14..61ead8890d 100644 --- a/components/kafka/pkg/config/config.go +++ b/components/kafka/pkg/config/config.go @@ -14,7 +14,6 @@ import ( "encoding/json" "fmt" "os" - "strconv" "strings" "github.com/confluentinc/confluent-kafka-go/v2/kafka" @@ -185,40 +184,6 @@ func (kc KafkaConfig) HasKafkaBootstrapServer() bool { return bs != nil && bs != "" } -// GetNumPartitions retrieves the number of partitions from a kafka config. -// The number of partitions must be bigger than 1. -func (kc KafkaConfig) GetNumPartitions() (int, error) { - numPartition, ok := kc.Topics["numPartitions"] - if !ok { - return -1, fmt.Errorf("no numPartitions topic found") - } - - // Try int first (most common case) - if partitionInt, ok := numPartition.(int); ok { - if partitionInt < 1 { - return -1, fmt.Errorf("numPartitions %d must be bigger than 0", partitionInt) - } - return partitionInt, nil - } - - // Try string - partitionStr, ok := numPartition.(string) - if !ok { - return -1, fmt.Errorf("numPartitions topic has wrong type: %T", numPartition) - } - - numPartitions, err := strconv.Atoi(partitionStr) - if err != nil { - return -1, err - } - - if numPartitions < 1 { - return -1, fmt.Errorf("numPartitions %d must be bigger than 0", numPartitions) - } - - return numPartitions, nil -} - func WithoutSecrets(c kafka.ConfigMap) kafka.ConfigMap { safe := make(kafka.ConfigMap) diff --git a/components/kafka/pkg/config/config_test.go b/components/kafka/pkg/config/config_test.go index 8e2504f4a7..660282623a 100644 --- a/components/kafka/pkg/config/config_test.go +++ b/components/kafka/pkg/config/config_test.go @@ -235,91 +235,3 @@ func TestParseSysLogLevel(t *testing.T) { }) } } - -func TestKafkaConfig_GetNumPartitions(t *testing.T) { - g := NewGomegaWithT(t) - - type test struct { - name string - data kafka.ConfigMap - expected int - err bool - } - - tests := []test{ - { - name: "success: integer in numPartitions", - data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": 1}, - expected: 1, - err: false, - }, - { - name: "fail: negative numPartitions", - data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": -10}, - expected: -1, - err: true, - }, - { - name: "fail: invalid int input", - data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": 0}, - expected: -1, - err: true, - }, - { - name: "fail: float int numPartitions", - data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": 0.0}, - expected: -1, - err: true, - }, - { - name: "success: string in numPartitions", - data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": "1"}, - expected: 1, - err: false, - }, - - { - name: "fail: string and negative numPartitions", - data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": "-10"}, - expected: -1, - err: true, - }, - { - name: "fail: invalid string input", - data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": "-10-"}, - expected: -1, - err: true, - }, - { - name: "fail: invalid string input", - data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": "0"}, - expected: 0, - err: true, - }, - { - name: "fail: no topic", - data: nil, - expected: 0, - err: true, - }, - { - name: "fail: float string numPartitions", - data: kafka.ConfigMap{"replication.factor": 1, "numPartitions": "0.0"}, - expected: 0, - err: true, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - kc := KafkaConfig{} - kc.Topics = test.data - numPartitions, err := kc.GetNumPartitions() - if test.err { - g.Expect(err).ToNot(BeNil()) - } else { - g.Expect(numPartitions).To(Equal(test.expected)) - } - }) - } -} From d4b3d808ab3d211e41351a8c7b3c0894e7940cac Mon Sep 17 00:00:00 2001 From: Miguel Angel Date: Thu, 2 Oct 2025 16:02:58 +0100 Subject: [PATCH 4/6] fix the commit rollback --- scheduler/cmd/scheduler/main.go | 6 +- scheduler/pkg/kafka/gateway/infer_test.go | 94 +++++++++++++++++++++++ 2 files changed, 98 insertions(+), 2 deletions(-) create mode 100644 scheduler/pkg/kafka/gateway/infer_test.go diff --git a/scheduler/cmd/scheduler/main.go b/scheduler/cmd/scheduler/main.go index 1ce7067373..0d1a939a41 100644 --- a/scheduler/cmd/scheduler/main.go +++ b/scheduler/cmd/scheduler/main.go @@ -16,6 +16,7 @@ import ( "math/rand" "os" "os/signal" + "strconv" "syscall" "time" @@ -278,9 +279,10 @@ func main() { logger.WithError(err).Fatal("Failed to load Kafka config") } - numPartitions, err := kafkaConfigMap.GetNumPartitions() + numPartitions, err := strconv.Atoi(kafkaConfigMap.Topics["numPartitions"].(string)) if err != nil { - logger.WithError(err).Fatal("Failed to parse numPartitions from Kafka config") + logger.WithError(err).Fatal("Failed to parse numPartitions from Kafka config. Defaulting to 1") + numPartitions = 1 } dataFlowLoadBalancer := util.NewRingLoadBalancer(numPartitions) diff --git a/scheduler/pkg/kafka/gateway/infer_test.go b/scheduler/pkg/kafka/gateway/infer_test.go new file mode 100644 index 0000000000..838115466d --- /dev/null +++ b/scheduler/pkg/kafka/gateway/infer_test.go @@ -0,0 +1,94 @@ +/* +Copyright (c) 2024 Seldon Technologies Ltd. + +Use of this software is governed by +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. +*/ + +package gateway + +import ( + "testing" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + . "github.com/onsi/gomega" +) + +func TestGetIntConfigMapValue(t *testing.T) { + g := NewGomegaWithT(t) + + type test struct { + name string + configMap kafka.ConfigMap + key string + defaultValue int + wantValue int + wantError bool + } + + tests := []test{ + { + name: "success: string", + configMap: kafka.ConfigMap{"replicationFactor": "5"}, + key: replicationFactorKey, + defaultValue: 0, + wantValue: 5, + wantError: false, + }, + { + name: "fail: negative string", + configMap: kafka.ConfigMap{"replicationFactor": "-5"}, + key: replicationFactorKey, + defaultValue: 0, + wantValue: 0, + wantError: true, + }, + { + name: "fail: float string value", + configMap: kafka.ConfigMap{"replicationFactor": "5.0"}, + key: replicationFactorKey, + defaultValue: 0, + wantValue: 0, + wantError: true, + }, + { + name: "fail: string value", + configMap: kafka.ConfigMap{"replicationFactor": "---"}, + key: replicationFactorKey, + defaultValue: 0, + wantValue: 0, + wantError: true, + }, + { + name: "success: integer", + configMap: kafka.ConfigMap{"replicationFactor": 5}, + key: replicationFactorKey, + defaultValue: 0, + wantValue: 5, + wantError: false, + }, + { + name: "fail: negative integer", + configMap: kafka.ConfigMap{"replicationFactor": -5}, + key: replicationFactorKey, + defaultValue: 0, + wantValue: 0, + wantError: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + gotInt, err := GetIntConfigMapValue(test.configMap, test.key, test.defaultValue) + if test.wantError { + g.Expect(err).ToNot(BeNil()) + } else { + g.Expect(gotInt).To(Equal(test.wantValue)) + } + }) + + } + +} From 76b83fc676c22e94c50d4b0cb52245d30cd30910 Mon Sep 17 00:00:00 2001 From: Miguel Angel Date: Thu, 2 Oct 2025 16:05:36 +0100 Subject: [PATCH 5/6] fix int --- scheduler/pkg/kafka/gateway/infer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler/pkg/kafka/gateway/infer.go b/scheduler/pkg/kafka/gateway/infer.go index 283a7d06fd..4f8dd238cc 100644 --- a/scheduler/pkg/kafka/gateway/infer.go +++ b/scheduler/pkg/kafka/gateway/infer.go @@ -91,7 +91,7 @@ func GetIntConfigMapValue(configMap kafka.ConfigMap, key string, defaultValue in value, err := strconv.Atoi(configMapValueStr) if err != nil { - return 0, fmt.Errorf("invalid value %s in %s with error: %v", configMapValueStr, key, err) + return -1, fmt.Errorf("invalid value %s in %s with error: %v", configMapValueStr, key, err) } if value < 0 { From af8442539df68265d57e121181e9664894f137cf Mon Sep 17 00:00:00 2001 From: Miguel Angel Date: Thu, 2 Oct 2025 17:01:47 +0100 Subject: [PATCH 6/6] better error logging --- scheduler/pkg/kafka/gateway/infer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scheduler/pkg/kafka/gateway/infer.go b/scheduler/pkg/kafka/gateway/infer.go index 4f8dd238cc..f9d331ccd7 100644 --- a/scheduler/pkg/kafka/gateway/infer.go +++ b/scheduler/pkg/kafka/gateway/infer.go @@ -116,7 +116,7 @@ func NewInferKafkaHandler( } replicationFactor, err := GetIntConfigMapValue(topicsConfigMap, replicationFactorKey, defaultReplicationFactor) if err != nil { - return nil, fmt.Errorf("error getting replication factor: %v", err) + return nil, fmt.Errorf("invalid Kafka topic configuration: %v", err) } defaultNumPartitions, err := util.GetIntEnvar(envDefaultNumPartitions, defaultNumPartitions) if err != nil { @@ -124,7 +124,7 @@ func NewInferKafkaHandler( } numPartitions, err := GetIntConfigMapValue(topicsConfigMap, numPartitionsKey, defaultNumPartitions) if err != nil { - return nil, fmt.Errorf("error getting num partitions: %v", err) + return nil, fmt.Errorf("invalid Kafka topic configuration: %w", err) } tlsClientOptions, err := util.CreateTLSClientOptions() if err != nil {