diff --git a/k8s/helm-charts/seldon-core-v2-crds/templates/seldon-v2-crds.yaml b/k8s/helm-charts/seldon-core-v2-crds/templates/seldon-v2-crds.yaml index 05066cb168..9cfa5e147d 100644 --- a/k8s/helm-charts/seldon-core-v2-crds/templates/seldon-v2-crds.yaml +++ b/k8s/helm-charts/seldon-core-v2-crds/templates/seldon-v2-crds.yaml @@ -9061,6 +9061,44 @@ spec: x-kubernetes-int-or-string: true type: object type: object + scalingConfig: + description: Control scaling parameters for various components + properties: + models: + properties: + enabled: + type: boolean + type: object + pipelines: + description: Scaling config impacting pipeline-gateway, dataflow-engine + and model-gateway + properties: + maxShardCountMultiplier: + description: |- + MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + replicas of pipeline components. + + - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + `maxShardCountMultiplier * number of pipelines` + - For model-gateway, the max number of replicas is + `maxShardCountMultiplier * number of consumers` + + It doesn't make sense to set this to a value larger than the number of partitions for kafka + topics used in the Core 2 install. + format: int32 + type: integer + type: object + servers: + properties: + enabled: + type: boolean + scaleDownPackingEnabled: + type: boolean + scaleDownPackingPercentage: + format: int32 + type: integer + type: object + type: object serviceConfig: properties: grpcServicePrefix: @@ -9187,6 +9225,44 @@ spec: x-kubernetes-int-or-string: true type: object type: object + scalingConfig: + description: Control scaling parameters for various components + properties: + models: + properties: + enabled: + type: boolean + type: object + pipelines: + description: Scaling config impacting pipeline-gateway, dataflow-engine + and model-gateway + properties: + maxShardCountMultiplier: + description: |- + MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + replicas of pipeline components. + + - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + `maxShardCountMultiplier * number of pipelines` + - For model-gateway, the max number of replicas is + `maxShardCountMultiplier * number of consumers` + + It doesn't make sense to set this to a value larger than the number of partitions for kafka + topics used in the Core 2 install. 
+ format: int32 + type: integer + type: object + servers: + properties: + enabled: + type: boolean + scaleDownPackingEnabled: + type: boolean + scaleDownPackingPercentage: + format: int32 + type: integer + type: object + type: object serviceConfig: properties: grpcServicePrefix: diff --git a/k8s/helm-charts/seldon-core-v2-runtime/values.yaml b/k8s/helm-charts/seldon-core-v2-runtime/values.yaml index fa9276983d..5ba4be4183 100644 --- a/k8s/helm-charts/seldon-core-v2-runtime/values.yaml +++ b/k8s/helm-charts/seldon-core-v2-runtime/values.yaml @@ -4,32 +4,33 @@ seldonConfig: default hodometer: disable: false replicas: 1 - + scheduler: disable: false replicas: 1 # controlplane exposure - serviceType: LoadBalancer - + serviceType: LoadBalancer + envoy: disable: false replicas: 1 # dataplane exposure - serviceType: LoadBalancer - + serviceType: LoadBalancer + dataflow: disable: false replicas: 1 - + modelgateway: disable: false replicas: 1 - + pipelinegateway: disable: false replicas: 1 config: + scalingConfig: agentConfig: rclone: configSecrets: @@ -48,4 +49,4 @@ config: serviceConfig: serviceGRPCPrefix: serviceType: - + diff --git a/k8s/helm-charts/seldon-core-v2-setup/templates/_components-deployments.tpl b/k8s/helm-charts/seldon-core-v2-setup/templates/_components-deployments.tpl index b07a0f67c0..bb1db7df6c 100644 --- a/k8s/helm-charts/seldon-core-v2-setup/templates/_components-deployments.tpl +++ b/k8s/helm-charts/seldon-core-v2-setup/templates/_components-deployments.tpl @@ -511,6 +511,7 @@ spec: - --db-path=/mnt/scheduler/db - --allow-plaintxt=$(ALLOW_PLAINTXT) - --kafka-config-path=/mnt/kafka/kafka.json + - --scaling-config-path=/mnt/scaling/scaling.yaml - --scheduler-ready-timeout-seconds=$(SCHEDULER_READY_TIMEOUT_SECONDS) - --server-packing-enabled=$(SERVER_PACKING_ENABLED) - --server-packing-percentage=$(SERVER_PACKING_PERCENTAGE) @@ -639,6 +640,8 @@ spec: volumeMounts: - mountPath: /mnt/kafka name: kafka-config-volume + - mountPath: /mnt/scaling + name: scaling-config-volume - mountPath: /mnt/tracing name: tracing-config-volume - mountPath: /mnt/scheduler @@ -652,6 +655,9 @@ spec: serviceAccountName: seldon-scheduler terminationGracePeriodSeconds: 5 volumes: + - configMap: + name: seldon-scaling + name: scaling-config-volume - configMap: name: seldon-kafka name: kafka-config-volume @@ -1306,6 +1312,18 @@ spec: topics: numPartitions: '{{ .Values.kafka.topics.numPartitions }}' replicationFactor: '{{ .Values.kafka.topics.replicationFactor }}' + scalingConfig: + models: + enabled: {{ .Values.autoscaling.autoscalingModelEnabled }} + pipelines: + maxShardCountMultiplier: {{ .Values.kafka.topics.numPartitions + }} + servers: + enabled: {{ .Values.autoscaling.autoscalingServerEnabled }} + scaleDownPackingEnabled: {{ .Values.autoscaling.serverPackingEnabled + }} + scaleDownPackingPercentage: {{ .Values.autoscaling.serverPackingPercentage + }} serviceConfig: grpcServicePrefix: '{{ .Values.services.serviceGRPCPrefix }}' serviceType: '{{ .Values.services.defaultServiceType }}' diff --git a/k8s/helm-charts/seldon-core-v2-setup/templates/_components-statefulsets.tpl b/k8s/helm-charts/seldon-core-v2-setup/templates/_components-statefulsets.tpl index 4a55d3766f..e465ee3adc 100644 --- a/k8s/helm-charts/seldon-core-v2-setup/templates/_components-statefulsets.tpl +++ b/k8s/helm-charts/seldon-core-v2-setup/templates/_components-statefulsets.tpl @@ -511,6 +511,7 @@ spec: - --db-path=/mnt/scheduler/db - --allow-plaintxt=$(ALLOW_PLAINTXT) - --kafka-config-path=/mnt/kafka/kafka.json + - 
--scaling-config-path=/mnt/scaling/scaling.yaml - --scheduler-ready-timeout-seconds=$(SCHEDULER_READY_TIMEOUT_SECONDS) - --server-packing-enabled=$(SERVER_PACKING_ENABLED) - --server-packing-percentage=$(SERVER_PACKING_PERCENTAGE) @@ -639,6 +640,8 @@ spec: volumeMounts: - mountPath: /mnt/kafka name: kafka-config-volume + - mountPath: /mnt/scaling + name: scaling-config-volume - mountPath: /mnt/tracing name: tracing-config-volume - mountPath: /mnt/scheduler @@ -652,6 +655,9 @@ spec: serviceAccountName: seldon-scheduler terminationGracePeriodSeconds: 5 volumes: + - configMap: + name: seldon-scaling + name: scaling-config-volume - configMap: name: seldon-kafka name: kafka-config-volume @@ -1306,6 +1312,18 @@ spec: topics: numPartitions: '{{ .Values.kafka.topics.numPartitions }}' replicationFactor: '{{ .Values.kafka.topics.replicationFactor }}' + scalingConfig: + models: + enabled: {{ .Values.autoscaling.autoscalingModelEnabled }} + pipelines: + maxShardCountMultiplier: {{ .Values.kafka.topics.numPartitions + }} + servers: + enabled: {{ .Values.autoscaling.autoscalingServerEnabled }} + scaleDownPackingEnabled: {{ .Values.autoscaling.serverPackingEnabled + }} + scaleDownPackingPercentage: {{ .Values.autoscaling.serverPackingPercentage + }} serviceConfig: grpcServicePrefix: '{{ .Values.services.serviceGRPCPrefix }}' serviceType: '{{ .Values.services.defaultServiceType }}' diff --git a/k8s/helm-charts/seldon-core-v2-setup/values.yaml b/k8s/helm-charts/seldon-core-v2-setup/values.yaml index 750a5d5888..a342155cf1 100644 --- a/k8s/helm-charts/seldon-core-v2-setup/values.yaml +++ b/k8s/helm-charts/seldon-core-v2-setup/values.yaml @@ -85,7 +85,7 @@ opentelemetry: # logging # this is a global setting, in the case individual components logLevel is not set -# Users should set a value from: +# Users should set a value from: # fatal, error, warn, info, debug, trace # if used also for .rclone.logLevel, the allowed set reduces to: # debug, info, error @@ -245,7 +245,7 @@ scheduler: runAsGroup: 1000 runAsNonRoot: true schedulerReadyTimeoutSeconds: 600 - + autoscaling: autoscalingModelEnabled: false autoscalingServerEnabled: true diff --git a/k8s/helm-charts/seldon-core-v2-setup/values.yaml.template b/k8s/helm-charts/seldon-core-v2-setup/values.yaml.template index a67e39cbf5..fa7a18122f 100644 --- a/k8s/helm-charts/seldon-core-v2-setup/values.yaml.template +++ b/k8s/helm-charts/seldon-core-v2-setup/values.yaml.template @@ -85,7 +85,7 @@ opentelemetry: # logging # this is a global setting, in the case individual components logLevel is not set -# Users should set a value from: +# Users should set a value from: # fatal, error, warn, info, debug, trace # if used also for .rclone.logLevel, the allowed set reduces to: # debug, info, error @@ -245,7 +245,7 @@ scheduler: runAsGroup: 1000 runAsNonRoot: true schedulerReadyTimeoutSeconds: 600 - + autoscaling: autoscalingModelEnabled: false autoscalingServerEnabled: true diff --git a/k8s/kustomize-experimental/helm-components-sc/kustomization.yaml b/k8s/kustomize-experimental/helm-components-sc/kustomization.yaml index e1110bf8d7..bef32904f8 100644 --- a/k8s/kustomize-experimental/helm-components-sc/kustomization.yaml +++ b/k8s/kustomize-experimental/helm-components-sc/kustomization.yaml @@ -18,6 +18,7 @@ patchesStrategicMerge: - ../../kustomize/helm-components-sc/patch_scheduler.yaml - ../../kustomize/helm-components-sc/patch_kafkaconfig.yaml - ../../kustomize/helm-components-sc/patch_tracingconfig.yaml +- ../../kustomize/helm-components-sc/patch_scalingconfig.yaml - 
../../kustomize/helm-components-sc/patch_agentconfig.yaml - ../../kustomize/helm-components-sc/patch_serviceconfig.yaml - patch_mlserver.yaml @@ -59,6 +60,11 @@ patches: version: v1alpha1 kind: SeldonConfig name: default +- path: ../../kustomize/helm-components-sc/patch_scalingconfig_json6902.yaml + target: + version: v1alpha1 + kind: SeldonConfig + name: default - path: ../../kustomize/helm-components-sc/patch_pipelinegateway_json6902.yaml target: version: v1alpha1 diff --git a/k8s/kustomize/helm-components-sc/kustomization.yaml b/k8s/kustomize/helm-components-sc/kustomization.yaml index 351d8b1bd0..30898153ed 100644 --- a/k8s/kustomize/helm-components-sc/kustomization.yaml +++ b/k8s/kustomize/helm-components-sc/kustomization.yaml @@ -21,6 +21,7 @@ patchesStrategicMerge: - patch_kafkaconfig.yaml - patch_tracingconfig.yaml - patch_agentconfig.yaml +- patch_scalingconfig.yaml - patch_serviceconfig.yaml patches: @@ -59,6 +60,11 @@ patches: version: v1alpha1 kind: SeldonConfig name: default +- path: patch_scalingconfig_json6902.yaml + target: + version: v1alpha1 + kind: SeldonConfig + name: default - path: patch_pipelinegateway_json6902.yaml target: version: v1alpha1 diff --git a/k8s/kustomize/helm-components-sc/patch_scalingconfig.yaml b/k8s/kustomize/helm-components-sc/patch_scalingconfig.yaml new file mode 100644 index 0000000000..3076e27b32 --- /dev/null +++ b/k8s/kustomize/helm-components-sc/patch_scalingconfig.yaml @@ -0,0 +1,15 @@ +apiVersion: mlops.seldon.io/v1alpha1 +kind: SeldonConfig +metadata: + name: default +spec: + config: + scalingConfig: + models: + enabled: + servers: + enabled: + scaleDownPackingEnabled: + scaleDownPackingPercentage: + pipelines: + maxShardCountMultiplier: diff --git a/k8s/kustomize/helm-components-sc/patch_scalingconfig_json6902.yaml b/k8s/kustomize/helm-components-sc/patch_scalingconfig_json6902.yaml new file mode 100644 index 0000000000..995fa4c3bd --- /dev/null +++ b/k8s/kustomize/helm-components-sc/patch_scalingconfig_json6902.yaml @@ -0,0 +1,15 @@ +- op: add + path: /spec/config/scalingConfig/models/enabled + value: HACK_REMOVE_ME{{ .Values.autoscaling.autoscalingModelEnabled }} +- op: add + path: /spec/config/scalingConfig/servers/enabled + value: HACK_REMOVE_ME{{ .Values.autoscaling.autoscalingServerEnabled }} +- op: add + path: /spec/config/scalingConfig/servers/scaleDownPackingEnabled + value: HACK_REMOVE_ME{{ .Values.autoscaling.serverPackingEnabled }} +- op: add + path: /spec/config/scalingConfig/servers/scaleDownPackingPercentage + value: HACK_REMOVE_ME{{ .Values.autoscaling.serverPackingPercentage }} +- op: add + path: /spec/config/scalingConfig/pipelines/maxShardCountMultiplier + value: HACK_REMOVE_ME{{ .Values.kafka.topics.numPartitions }} diff --git a/k8s/yaml/components.yaml b/k8s/yaml/components.yaml index ac608469de..158f95f912 100644 --- a/k8s/yaml/components.yaml +++ b/k8s/yaml/components.yaml @@ -358,6 +358,7 @@ spec: - --db-path=/mnt/scheduler/db - --allow-plaintxt=$(ALLOW_PLAINTXT) - --kafka-config-path=/mnt/kafka/kafka.json + - --scaling-config-path=/mnt/scaling/scaling.yaml - --scheduler-ready-timeout-seconds=$(SCHEDULER_READY_TIMEOUT_SECONDS) - --server-packing-enabled=$(SERVER_PACKING_ENABLED) - --server-packing-percentage=$(SERVER_PACKING_PERCENTAGE) @@ -481,6 +482,8 @@ spec: volumeMounts: - mountPath: /mnt/kafka name: kafka-config-volume + - mountPath: /mnt/scaling + name: scaling-config-volume - mountPath: /mnt/tracing name: tracing-config-volume - mountPath: /mnt/scheduler @@ -493,6 +496,9 @@ spec: serviceAccountName: 
seldon-scheduler terminationGracePeriodSeconds: 5 volumes: + - configMap: + name: seldon-scaling + name: scaling-config-volume - configMap: name: seldon-kafka name: kafka-config-volume @@ -1127,6 +1133,15 @@ spec: topics: numPartitions: '1' replicationFactor: '1' + scalingConfig: + models: + enabled: false + pipelines: + maxShardCountMultiplier: 1 + servers: + enabled: true + scaleDownPackingEnabled: false + scaleDownPackingPercentage: 0 serviceConfig: grpcServicePrefix: '' serviceType: 'LoadBalancer' diff --git a/k8s/yaml/crds.yaml b/k8s/yaml/crds.yaml index be8bbb2840..1eae7d0d29 100644 --- a/k8s/yaml/crds.yaml +++ b/k8s/yaml/crds.yaml @@ -9066,6 +9066,44 @@ spec: x-kubernetes-int-or-string: true type: object type: object + scalingConfig: + description: Control scaling parameters for various components + properties: + models: + properties: + enabled: + type: boolean + type: object + pipelines: + description: Scaling config impacting pipeline-gateway, dataflow-engine + and model-gateway + properties: + maxShardCountMultiplier: + description: |- + MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + replicas of pipeline components. + + - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + `maxShardCountMultiplier * number of pipelines` + - For model-gateway, the max number of replicas is + `maxShardCountMultiplier * number of consumers` + + It doesn't make sense to set this to a value larger than the number of partitions for kafka + topics used in the Core 2 install. + format: int32 + type: integer + type: object + servers: + properties: + enabled: + type: boolean + scaleDownPackingEnabled: + type: boolean + scaleDownPackingPercentage: + format: int32 + type: integer + type: object + type: object serviceConfig: properties: grpcServicePrefix: @@ -9193,6 +9231,44 @@ spec: x-kubernetes-int-or-string: true type: object type: object + scalingConfig: + description: Control scaling parameters for various components + properties: + models: + properties: + enabled: + type: boolean + type: object + pipelines: + description: Scaling config impacting pipeline-gateway, dataflow-engine + and model-gateway + properties: + maxShardCountMultiplier: + description: |- + MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + replicas of pipeline components. + + - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + `maxShardCountMultiplier * number of pipelines` + - For model-gateway, the max number of replicas is + `maxShardCountMultiplier * number of consumers` + + It doesn't make sense to set this to a value larger than the number of partitions for kafka + topics used in the Core 2 install. 
+ format: int32 + type: integer + type: object + servers: + properties: + enabled: + type: boolean + scaleDownPackingEnabled: + type: boolean + scaleDownPackingPercentage: + format: int32 + type: integer + type: object + type: object serviceConfig: properties: grpcServicePrefix: diff --git a/operator/apis/mlops/v1alpha1/seldonconfig_types.go b/operator/apis/mlops/v1alpha1/seldonconfig_types.go index c78b46f470..198eed5166 100644 --- a/operator/apis/mlops/v1alpha1/seldonconfig_types.go +++ b/operator/apis/mlops/v1alpha1/seldonconfig_types.go @@ -41,6 +41,8 @@ type SeldonConfigSpec struct { } type SeldonConfiguration struct { + // Control scaling parameters for various components + ScalingConfig ScalingConfig `json:"scalingConfig,omitempty"` TracingConfig TracingConfig `json:"tracingConfig,omitempty"` KafkaConfig KafkaConfig `json:"kafkaConfig,omitempty"` AgentConfig AgentConfiguration `json:"agentConfig,omitempty"` @@ -79,6 +81,37 @@ type TracingConfig struct { Ratio string `json:"ratio,omitempty"` } +type ScalingConfig struct { + Models *ModelScalingConfig `json:"models,omitempty"` + Servers *ServerScalingConfig `json:"servers,omitempty"` + // Scaling config impacting pipeline-gateway, dataflow-engine and model-gateway + Pipelines *PipelineScalingConfig `json:"pipelines,omitempty"` +} + +type ModelScalingConfig struct { + Enable bool `json:"enabled,omitempty"` +} + +type ServerScalingConfig struct { + Enable bool `json:"enabled,omitempty"` + ScaleDownPackingEnabled bool `json:"scaleDownPackingEnabled,omitempty"` + ScaleDownPackingPercentage int32 `json:"scaleDownPackingPercentage,omitempty"` +} + +type PipelineScalingConfig struct { + // MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + // replicas of pipeline components. + // + // - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + // `maxShardCountMultiplier * number of pipelines` + // - For model-gateway, the max number of replicas is + // `maxShardCountMultiplier * number of consumers` + // + // It doesn't make sense to set this to a value larger than the number of partitions for kafka + // topics used in the Core 2 install. 
+ MaxShardCountMultiplier int32 `json:"maxShardCountMultiplier,omitempty"` +} + type ComponentDefn struct { // +kubebuilder:validation:Required @@ -135,6 +168,7 @@ func (s *SeldonConfiguration) AddDefaults(defaults SeldonConfiguration) { s.KafkaConfig.addDefaults(defaults.KafkaConfig) s.AgentConfig.addDefaults(defaults.AgentConfig) s.ServiceConfig.addDefaults(defaults.ServiceConfig) + s.ScalingConfig.addDefaults(defaults.ScalingConfig) } func (k *KafkaConfig) addDefaults(defaults KafkaConfig) { @@ -184,6 +218,18 @@ func (k *KafkaConfig) addDefaults(defaults KafkaConfig) { } } +func (sc *ScalingConfig) addDefaults(defaults ScalingConfig) { + if sc.Models == nil && defaults.Models != nil { + sc.Models = defaults.Models + } + if sc.Servers == nil && defaults.Servers != nil { + sc.Servers = defaults.Servers + } + if sc.Pipelines == nil && defaults.Pipelines != nil { + sc.Pipelines = defaults.Pipelines + } +} + func (a *AgentConfiguration) addDefaults(defaults AgentConfiguration) { a.Rclone.addDefaults(defaults.Rclone) } diff --git a/operator/apis/mlops/v1alpha1/zz_generated.deepcopy.go b/operator/apis/mlops/v1alpha1/zz_generated.deepcopy.go index 68e5a08cce..d0a033c2e2 100644 --- a/operator/apis/mlops/v1alpha1/zz_generated.deepcopy.go +++ b/operator/apis/mlops/v1alpha1/zz_generated.deepcopy.go @@ -464,6 +464,21 @@ func (in *ModelList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ModelScalingConfig) DeepCopyInto(out *ModelScalingConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ModelScalingConfig. +func (in *ModelScalingConfig) DeepCopy() *ModelScalingConfig { + if in == nil { + return nil + } + out := new(ModelScalingConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ModelSpec) DeepCopyInto(out *ModelSpec) { *out = *in @@ -756,6 +771,21 @@ func (in *PipelineOutput) DeepCopy() *PipelineOutput { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PipelineScalingConfig) DeepCopyInto(out *PipelineScalingConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PipelineScalingConfig. +func (in *PipelineScalingConfig) DeepCopy() *PipelineScalingConfig { + if in == nil { + return nil + } + out := new(PipelineScalingConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PipelineSpec) DeepCopyInto(out *PipelineSpec) { *out = *in @@ -1027,6 +1057,36 @@ func (in *RcloneConfiguration) DeepCopy() *RcloneConfiguration { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ScalingConfig) DeepCopyInto(out *ScalingConfig) { + *out = *in + if in.Models != nil { + in, out := &in.Models, &out.Models + *out = new(ModelScalingConfig) + **out = **in + } + if in.Servers != nil { + in, out := &in.Servers, &out.Servers + *out = new(ServerScalingConfig) + **out = **in + } + if in.Pipelines != nil { + in, out := &in.Pipelines, &out.Pipelines + *out = new(PipelineScalingConfig) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScalingConfig. +func (in *ScalingConfig) DeepCopy() *ScalingConfig { + if in == nil { + return nil + } + out := new(ScalingConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ScalingSpec) DeepCopyInto(out *ScalingSpec) { *out = *in @@ -1161,6 +1221,7 @@ func (in *SeldonConfigStatus) DeepCopy() *SeldonConfigStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SeldonConfiguration) DeepCopyInto(out *SeldonConfiguration) { *out = *in + in.ScalingConfig.DeepCopyInto(&out.ScalingConfig) out.TracingConfig = in.TracingConfig in.KafkaConfig.DeepCopyInto(&out.KafkaConfig) in.AgentConfig.DeepCopyInto(&out.AgentConfig) @@ -1450,6 +1511,21 @@ func (in *ServerList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ServerScalingConfig) DeepCopyInto(out *ServerScalingConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ServerScalingConfig. +func (in *ServerScalingConfig) DeepCopy() *ServerScalingConfig { + if in == nil { + return nil + } + out := new(ServerScalingConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ServerSpec) DeepCopyInto(out *ServerSpec) { *out = *in diff --git a/operator/config/crd/bases/mlops.seldon.io_seldonconfigs.yaml b/operator/config/crd/bases/mlops.seldon.io_seldonconfigs.yaml index be89e6a1ae..70d8aaf72c 100644 --- a/operator/config/crd/bases/mlops.seldon.io_seldonconfigs.yaml +++ b/operator/config/crd/bases/mlops.seldon.io_seldonconfigs.yaml @@ -8372,6 +8372,44 @@ spec: x-kubernetes-int-or-string: true type: object type: object + scalingConfig: + description: Control scaling parameters for various components + properties: + models: + properties: + enabled: + type: boolean + type: object + pipelines: + description: Scaling config impacting pipeline-gateway, dataflow-engine + and model-gateway + properties: + maxShardCountMultiplier: + description: |- + MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + replicas of pipeline components. + + - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + `maxShardCountMultiplier * number of pipelines` + - For model-gateway, the max number of replicas is + `maxShardCountMultiplier * number of consumers` + + It doesn't make sense to set this to a value larger than the number of partitions for kafka + topics used in the Core 2 install. 
+ format: int32 + type: integer + type: object + servers: + properties: + enabled: + type: boolean + scaleDownPackingEnabled: + type: boolean + scaleDownPackingPercentage: + format: int32 + type: integer + type: object + type: object serviceConfig: properties: grpcServicePrefix: diff --git a/operator/config/crd/bases/mlops.seldon.io_seldonruntimes.yaml b/operator/config/crd/bases/mlops.seldon.io_seldonruntimes.yaml index 5e32c1a975..813af60c73 100644 --- a/operator/config/crd/bases/mlops.seldon.io_seldonruntimes.yaml +++ b/operator/config/crd/bases/mlops.seldon.io_seldonruntimes.yaml @@ -94,6 +94,44 @@ spec: x-kubernetes-int-or-string: true type: object type: object + scalingConfig: + description: Control scaling parameters for various components + properties: + models: + properties: + enabled: + type: boolean + type: object + pipelines: + description: Scaling config impacting pipeline-gateway, dataflow-engine + and model-gateway + properties: + maxShardCountMultiplier: + description: |- + MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + replicas of pipeline components. + + - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + `maxShardCountMultiplier * number of pipelines` + - For model-gateway, the max number of replicas is + `maxShardCountMultiplier * number of consumers` + + It doesn't make sense to set this to a value larger than the number of partitions for kafka + topics used in the Core 2 install. + format: int32 + type: integer + type: object + servers: + properties: + enabled: + type: boolean + scaleDownPackingEnabled: + type: boolean + scaleDownPackingPercentage: + format: int32 + type: integer + type: object + type: object serviceConfig: properties: grpcServicePrefix: diff --git a/operator/config/seldonconfigs/default.yaml b/operator/config/seldonconfigs/default.yaml index aa7c9ec49b..36a60ae531 100644 --- a/operator/config/seldonconfigs/default.yaml +++ b/operator/config/seldonconfigs/default.yaml @@ -332,6 +332,7 @@ spec: - --db-path=/mnt/scheduler/db - --allow-plaintxt=$(ALLOW_PLAINTXT) - --kafka-config-path=/mnt/kafka/kafka.json + - --scaling-config-path=/mnt/scaling/scaling.yaml - --scheduler-ready-timeout-seconds=$(SCHEDULER_READY_TIMEOUT_SECONDS) - --server-packing-enabled=$(SERVER_PACKING_ENABLED) - --server-packing-percentage=$(SERVER_PACKING_PERCENTAGE) @@ -413,6 +414,8 @@ spec: volumeMounts: - mountPath: /mnt/kafka name: kafka-config-volume + - mountPath: /mnt/scaling + name: scaling-config-volume - mountPath: /mnt/tracing name: tracing-config-volume - mountPath: /mnt/scheduler @@ -420,6 +423,9 @@ spec: serviceAccountName: seldon-scheduler terminationGracePeriodSeconds: 5 volumes: + - configMap: + name: seldon-scaling + name: scaling-config-volume - configMap: name: seldon-kafka name: kafka-config-volume diff --git a/operator/controllers/reconcilers/seldon/configmap_reconciler.go b/operator/controllers/reconcilers/seldon/configmap_reconciler.go index 0c79a09c94..d0ce62ea62 100644 --- a/operator/controllers/reconcilers/seldon/configmap_reconciler.go +++ b/operator/controllers/reconcilers/seldon/configmap_reconciler.go @@ -30,9 +30,10 @@ import ( ) const ( - agentConfigMapName = "seldon-agent" - kafkaConfigMapName = "seldon-kafka" - traceConfigMapName = "seldon-tracing" + agentConfigMapName = "seldon-agent" + scalingConfigMapName = "seldon-scaling" + kafkaConfigMapName = "seldon-kafka" + traceConfigMapName = "seldon-tracing" ) type ConfigMapReconciler struct { @@ -76,10 +77,15 @@ func 
toConfigMaps(config *mlopsv1alpha1.SeldonConfiguration, meta metav1.ObjectM if err != nil { return nil, err } + scalingConfigMap, err := getScalingConfigMap(config.ScalingConfig, meta.Namespace) + if err != nil { + return nil, err + } return []*v1.ConfigMap{ agentConfigMap, kafkaConfigMap, tracingConfigMap, + scalingConfigMap, }, nil } @@ -99,6 +105,22 @@ func getAgentConfigMap(agentConfig mlopsv1alpha1.AgentConfiguration, namespace s }, nil } +func getScalingConfigMap(scalingConfig mlopsv1alpha1.ScalingConfig, namespace string) (*v1.ConfigMap, error) { + scalingJson, err := yaml.Marshal(scalingConfig) + if err != nil { + return nil, err + } + return &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: scalingConfigMapName, + Namespace: namespace, + }, + Data: map[string]string{ + "scaling.yaml": string(scalingJson), + }, + }, nil +} + func getKafkaConfigMap(kafkaConfig mlopsv1alpha1.KafkaConfig, namespace string) (*v1.ConfigMap, error) { kafkaJson, err := json.Marshal(kafkaConfig) if err != nil { diff --git a/operator/controllers/reconcilers/seldon/runtime_reconciler.go b/operator/controllers/reconcilers/seldon/runtime_reconciler.go index 9d71172f4f..cc145cb145 100644 --- a/operator/controllers/reconcilers/seldon/runtime_reconciler.go +++ b/operator/controllers/reconcilers/seldon/runtime_reconciler.go @@ -24,7 +24,7 @@ import ( ) const ( - DEFAULT_NUM_PARTITIONS = 1 + DEFAULT_MAX_SHARD_COUNT_MULTIPLIER = 1 DEFAULT_MODELGATEWAY_MAX_NUM_CONSUMERS = 100 DEFAULT_PIPELINEGATEWAY_MAX_NUM_CONSUMERS = 100 @@ -81,12 +81,10 @@ func validateScaleSpec( ) error { ctx, clt, recorder := commonConfig.Ctx, commonConfig.Client, commonConfig.Recorder - numPartitions, err := ParseInt32( - runtime.Spec.Config.KafkaConfig.Topics["numPartitions"].StrVal, - DEFAULT_NUM_PARTITIONS, - ) - if err != nil { - return fmt.Errorf("failed to parse numPartitions from KafkaConfig: %w", err) + var maxShardCountMultiplier int32 = DEFAULT_MAX_SHARD_COUNT_MULTIPLIER + pipelineScaleConfig := runtime.Spec.Config.ScalingConfig.Pipelines + if pipelineScaleConfig != nil { + maxShardCountMultiplier = runtime.Spec.Config.ScalingConfig.Pipelines.MaxShardCountMultiplier } var resourceCount int32 = 0 @@ -97,9 +95,10 @@ func validateScaleSpec( resourceCount = int32(countResources(resourceListObj)) } - var maxConsumers int32 = defaultMaxConsumers + var maxConsumers = defaultMaxConsumers if maxConsumersEnvName != "" { maxConsumersEnv := getEnvVarValue(component.PodSpec, maxConsumersEnvName, "") + var err error maxConsumers, err = ParseInt32(maxConsumersEnv, defaultMaxConsumers) if err != nil { return fmt.Errorf("failed to parse %s: %w", maxConsumersEnvName, err) @@ -109,7 +108,7 @@ func validateScaleSpec( } } - maxReplicas := replicaCalc(resourceCount, maxConsumers, numPartitions) + maxReplicas := replicaCalc(resourceCount, maxConsumers, maxShardCountMultiplier) if component.Replicas != nil && *component.Replicas > maxReplicas { component.Replicas = &maxReplicas recorder.Eventf( diff --git a/scheduler/cmd/agent/main.go b/scheduler/cmd/agent/main.go index 28d7e65a7e..8fa58799de 100644 --- a/scheduler/cmd/agent/main.go +++ b/scheduler/cmd/agent/main.go @@ -183,7 +183,7 @@ func main() { } defer func() { _ = agentConfigHandler.Close() - logger.Info("Closed agent handler") + logger.Info("Closed agent config watcher") }() // Create Rclone client diff --git a/scheduler/cmd/scheduler/main.go b/scheduler/cmd/scheduler/main.go index 85d4e9ba44..9676b8c7a5 100644 --- a/scheduler/cmd/scheduler/main.go +++ b/scheduler/cmd/scheduler/main.go @@ -16,7 
+16,6 @@ import ( "math/rand" "os" "os/signal" - "strconv" "syscall" "time" @@ -35,6 +34,7 @@ import ( "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/xdscache" health_probe "github.com/seldonio/seldon-core/scheduler/v2/pkg/health-probe" "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka/dataflow" + scaling_config "github.com/seldonio/seldon-core/scheduler/v2/pkg/scaling/config" "github.com/seldonio/seldon-core/scheduler/v2/pkg/scheduler" "github.com/seldonio/seldon-core/scheduler/v2/pkg/scheduler/cleaner" schedulerServer "github.com/seldonio/seldon-core/scheduler/v2/pkg/server" @@ -66,6 +66,7 @@ var ( autoscalingModelEnabled bool autoscalingServerEnabled bool kafkaConfigPath string + scalingConfigPath string schedulerReadyTimeoutSeconds uint deletedResourceTTLSeconds uint serverPackingEnabled bool @@ -127,6 +128,13 @@ func init() { flag.BoolVar(&allowPlaintxt, "allow-plaintxt", true, "Allow plain text scheduler server") // Autoscaling + // Scaling config path + flag.StringVar( + &scalingConfigPath, + "scaling-config-path", + "/mnt/config/scaling.json", + "Path to scaling configuration file", + ) flag.BoolVar(&autoscalingModelEnabled, "enable-model-autoscaling", false, "Enable native model autoscaling feature") flag.BoolVar(&autoscalingServerEnabled, "enable-server-autoscaling", true, "Enable native server autoscaling feature") @@ -278,20 +286,25 @@ func main() { if err != nil { logger.WithError(err).Fatal("Failed to load Kafka config") } - - numPartitions, err := strconv.Atoi(kafkaConfigMap.Topics["numPartitions"].(string)) + scalingConfigHdl, err := scaling_config.NewScalingConfigHandler(scalingConfigPath, namespace, logger) if err != nil { - logger.WithError(err).Fatal("Failed to parse numPartitions from Kafka config. Defaulting to 1") - numPartitions = 1 + logger.WithError(err).Fatalf("Failed to load Scaling config from %s", scalingConfigPath) } + defer func() { + _ = scalingConfigHdl.Close() + logger.Info("Closed scheduler scaling config watcher") + }() + + maxShardCountMultiplier := scalingConfigHdl.GetConfiguration().Pipelines.MaxShardCountMultiplier - dataFlowLoadBalancer := util.NewRingLoadBalancer(numPartitions) - log.Info("Using ring load balancer for data flow with numPartitions: ", numPartitions) + dataFlowLoadBalancer := util.NewRingLoadBalancer(maxShardCountMultiplier) + log.Info("Using ring load balancer for data flow with numPartitions: ", maxShardCountMultiplier) - cs, err := dataflow.NewChainerServer(logger, eventHub, ps, namespace, dataFlowLoadBalancer, kafkaConfigMap) + cs, err := dataflow.NewChainerServer(logger, eventHub, ps, namespace, dataFlowLoadBalancer, kafkaConfigMap, scalingConfigHdl) if err != nil { logger.WithError(err).Fatal("Failed to start data engine chainer server") } + defer cs.Stop() go func() { err := cs.StartGrpcServer(chainerPort) if err != nil { @@ -334,9 +347,9 @@ func main() { eventHub, ) - // scheduler <-> controller grpc - modelGwLoadBalancer := util.NewRingLoadBalancer(numPartitions) - pipelineGWLoadBalancer := util.NewRingLoadBalancer(numPartitions) + // scheduler <-> controller and {pipeline,model-gw} grpc + modelGwLoadBalancer := util.NewRingLoadBalancer(maxShardCountMultiplier) + pipelineGWLoadBalancer := util.NewRingLoadBalancer(maxShardCountMultiplier) s := schedulerServer.NewSchedulerServer( logger, ss, es, ps, sched, eventHub, sync, schedulerServer.SchedulerServerConfig{ @@ -347,8 +360,10 @@ func main() { kafkaConfigMap.ConsumerGroupIdPrefix, modelGwLoadBalancer, pipelineGWLoadBalancer, + scalingConfigHdl, *tlsOptions, ) 
+ defer s.Stop() err = s.StartGrpcServers(allowPlaintxt, schedulerPort, schedulerMtlsPort) if err != nil { @@ -390,10 +405,9 @@ func main() { s.StopSendExperimentEvents() s.StopSendPipelineEvents() s.StopSendControlPlaneEvents() - cs.StopSendPipelineEvents() as.StopAgentStreams() - log.Info("Shutdown services") + log.Info("All services have shut down cleanly") } type probe struct { diff --git a/scheduler/data-flow/build.gradle.kts b/scheduler/data-flow/build.gradle.kts index 08c24cf99e..c1ba740ecf 100644 --- a/scheduler/data-flow/build.gradle.kts +++ b/scheduler/data-flow/build.gradle.kts @@ -58,10 +58,10 @@ dependencies { implementation("io.kubernetes:client-java:24.0.0") // HTTP server for health probes - implementation("io.ktor:ktor-server-core:3.0.1") - implementation("io.ktor:ktor-server-netty:3.0.1") - implementation("io.ktor:ktor-server-content-negotiation:3.0.1") - implementation("io.ktor:ktor-serialization-kotlinx-json:3.0.1") + implementation("io.ktor:ktor-server-core:3.3.0") + implementation("io.ktor:ktor-server-netty:3.3.0") + implementation("io.ktor:ktor-server-content-negotiation:3.3.0") + implementation("io.ktor:ktor-serialization-kotlinx-json:3.3.0") implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.3") implementation("com.charleskorn.kaml:kaml:0.61.0") diff --git a/scheduler/pkg/agent/config/config.go b/scheduler/pkg/agent/config/config.go index c4a4c97369..a3fb0000fe 100644 --- a/scheduler/pkg/agent/config/config.go +++ b/scheduler/pkg/agent/config/config.go @@ -10,19 +10,12 @@ the Change License after the Change Date as each is defined in accordance with t package config import ( - "fmt" - "os" - "path" - "sync" "time" - "github.com/fsnotify/fsnotify" log "github.com/sirupsen/logrus" - yaml "gopkg.in/yaml.v2" - corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" - "knative.dev/pkg/configmap" - "knative.dev/pkg/configmap/informer" + + "github.com/seldonio/seldon-core/scheduler/v2/pkg/config" ) const ( @@ -31,6 +24,18 @@ const ( ServiceReadyRetryMaxInterval = 30 * time.Second ) +var ( + DefaultAgentConfiguration = AgentConfiguration{ + Rclone: &RcloneConfiguration{ + ConfigSecrets: []string{}, + Config: []string{}, + }, + Kafka: &KafkaConfiguration{ + Active: false, + }, + } +) + type AgentConfiguration struct { Rclone *RcloneConfiguration `json:"rclone,omitempty" yaml:"rclone,omitempty"` Kafka *KafkaConfiguration `json:"kafka,omitempty" yaml:"kafka,omitempty"` @@ -46,203 +51,67 @@ type KafkaConfiguration struct { Broker string `json:"broker,omitempty" yaml:"broker,omitempty"` } -type AgentConfigHandler struct { - logger log.FieldLogger - mu sync.RWMutex - config *AgentConfiguration - listeners []chan<- AgentConfiguration - watcher *fsnotify.Watcher - fileWatcherDone chan struct{} - namespace string - configFilePath string - configMapWatcherDone chan struct{} -} - -func NewAgentConfigHandler(configPath string, namespace string, logger log.FieldLogger, clientset kubernetes.Interface) (*AgentConfigHandler, error) { - configHandler := &AgentConfigHandler{ - logger: logger.WithField("source", "AgentConfigHandler"), - namespace: namespace, - } - if configPath != "" { - logger.Infof("Init config from path %s", configPath) - err := configHandler.initConfigFromPath(configPath) - if err != nil { - return nil, err - } - } +type AgentConfigHandler = config.ConfigWatcher[AgentConfiguration, *AgentConfiguration] - err := configHandler.initWatcher(configPath, namespace, clientset) - if err != nil { - return nil, err - } +func (ac *AgentConfiguration) DeepCopy() 
AgentConfiguration { + var rcloneCopy *RcloneConfiguration + var kafkaCopy *KafkaConfiguration - return configHandler, nil -} - -func (a *AgentConfigHandler) initConfigFromPath(configPath string) error { - m, err := configmap.Load(configPath) - if err != nil { - return err - } - - if v, ok := m[AgentConfigYamlFilename]; ok { - err = a.updateConfig([]byte(v)) - if err != nil { - return err + if ac.Rclone != nil { + // Maintain nil slices if settings are not present. + // This is important because json.Marshal treats nil and empty slices differently. + var cs []string + if len(ac.Rclone.ConfigSecrets) > 0 { + cs = make([]string, len(ac.Rclone.ConfigSecrets)) + copy(cs, ac.Rclone.ConfigSecrets) } - a.configFilePath = path.Join(configPath, AgentConfigYamlFilename) - return nil - } - return fmt.Errorf("Failed to find config file %s", AgentConfigYamlFilename) -} - -func (a *AgentConfigHandler) initWatcher(configPath string, namespace string, clientset kubernetes.Interface) error { - logger := a.logger.WithField("func", "initWatcher") - if namespace != "" { // Running in k8s - err := a.watchConfigMap(clientset) - if err != nil { - return err + var cfg []string + if len(ac.Rclone.Config) > 0 { + cfg = make([]string, len(ac.Rclone.Config)) + copy(cfg, ac.Rclone.Config) } - } else if configPath != "" { // Watch local file - err := a.watchFile(a.configFilePath) - if err != nil { - return err + rcloneDeepCopy := RcloneConfiguration{ + ConfigSecrets: cs, + Config: cfg, } + rcloneCopy = &rcloneDeepCopy } else { - logger.Warnf("No config available on initialization") + rcloneCopy = nil } - return nil -} -func (a *AgentConfigHandler) Close() error { - if a == nil { - return nil - } - a.logger.Info("Starting graceful shutdown") - if a.fileWatcherDone != nil { - close(a.fileWatcherDone) - } - if a.configMapWatcherDone != nil { - close(a.configMapWatcherDone) - } - if a.watcher != nil { - return a.watcher.Close() + if ac.Kafka != nil { + kafkaDeepCopy := *ac.Kafka + kafkaCopy = &kafkaDeepCopy + } else { + kafkaCopy = nil } - for _, c := range a.listeners { - close(c) + + return AgentConfiguration{ + Rclone: rcloneCopy, + Kafka: kafkaCopy, } - a.logger.Infof("Finished graceful shutdown") - return nil } -func (a *AgentConfigHandler) AddListener(c chan<- AgentConfiguration) *AgentConfiguration { - a.mu.Lock() - defer a.mu.Unlock() - a.listeners = append(a.listeners, c) - return a.config +func (ac *AgentConfiguration) Default() AgentConfiguration { + return DefaultAgentConfiguration.DeepCopy() } -func (a *AgentConfigHandler) GetConfiguration() *AgentConfiguration { - a.mu.RLock() - defer a.mu.RUnlock() - return a.config +func NewAgentConfigHandler(configPath string, namespace string, logger log.FieldLogger, clientset kubernetes.Interface) (*AgentConfigHandler, error) { + return config.NewConfigWatcher( + configPath, + AgentConfigYamlFilename, + namespace, + false, // watch mounted config file rather than using k8s informer on the config map + ConfigMapName, + clientset, + onConfigUpdate, + logger.WithField("source", "AgentConfigHandler"), + ) } -func (a *AgentConfigHandler) updateConfig(configData []byte) error { - logger := a.logger.WithField("func", "updateConfig") - logger.Infof("Updating config %s", configData) - - a.mu.Lock() - defer a.mu.Unlock() - - config := AgentConfiguration{} - err := yaml.Unmarshal(configData, &config) - if err != nil { - return err - } - +func onConfigUpdate(config *AgentConfiguration, logger log.FieldLogger) error { if config.Rclone != nil { logger.Infof("Rclone Config loaded %v", 
config.Rclone) } - - a.config = &config - return nil -} - -// Watch the config file passed for changes and reload and signal listeners when it does -func (a *AgentConfigHandler) watchFile(filePath string) error { - logger := a.logger.WithField("func", "watchFile") - watcher, err := fsnotify.NewWatcher() - if err != nil { - logger.Error(err, "Failed to create watcher") - return err - } - a.watcher = watcher - a.fileWatcherDone = make(chan struct{}) - - go func() { - for { - select { - case event := <-watcher.Events: - logger.Infof("Processing event %v", event) - isCreate := event.Op&fsnotify.Create != 0 - isWrite := event.Op&fsnotify.Write != 0 - if isCreate || isWrite { - b, err := os.ReadFile(filePath) - if err != nil { - logger.WithError(err).Errorf("Failed to read %s", filePath) - } else { - err := a.updateConfig(b) - if err != nil { - logger.WithError(err).Errorf("Failed to update config %s", filePath) - } else { - a.mu.RLock() - for _, ch := range a.listeners { - ch <- *a.config - } - a.mu.RUnlock() - } - } - } - case err := <-watcher.Errors: - logger.Error(err, "watcher error") - case <-a.fileWatcherDone: - return - } - } - }() - - if err = watcher.Add(filePath); err != nil { - a.logger.Errorf("Failed add filePath %s to watcher", filePath) - return err - } - a.logger.Infof("Start to watch config file %s", filePath) - - return nil -} - -func (a *AgentConfigHandler) watchConfigMap(clientset kubernetes.Interface) error { - logger := a.logger.WithField("func", "watchConfigMap") - - watcher := informer.NewInformedWatcher(clientset, a.namespace) - watcher.Watch(ConfigMapName, func(updated *corev1.ConfigMap) { - if data, ok := updated.Data[AgentConfigYamlFilename]; ok { - err := a.updateConfig([]byte(data)) - if err != nil { - logger.Errorf("Failed to update configmap from data in %s", AgentConfigYamlFilename) - } else { - a.mu.RLock() - for _, ch := range a.listeners { - ch <- *a.config - } - a.mu.RUnlock() - } - } - }) - a.configMapWatcherDone = make(chan struct{}) - err := watcher.Start(a.configMapWatcherDone) - if err != nil { - return err - } return nil } diff --git a/scheduler/pkg/agent/config/config_test.go b/scheduler/pkg/agent/config/config_test.go index 2ee367d7c6..e94d8e58fe 100644 --- a/scheduler/pkg/agent/config/config_test.go +++ b/scheduler/pkg/agent/config/config_test.go @@ -10,7 +10,6 @@ the Change License after the Change Date as each is defined in accordance with t package config import ( - "context" "encoding/json" "os" "path" @@ -18,9 +17,6 @@ import ( . 
"github.com/onsi/gomega" log "github.com/sirupsen/logrus" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" ) func TestLoadConfigRcloneSecrets(t *testing.T) { @@ -36,12 +32,12 @@ func TestLoadConfigRcloneSecrets(t *testing.T) { } tests := []test{ { - name: "yaml", + name: "test.json", config: `{"rclone":{"config_secrets":["a","b"]}}`, expected: []string{"a", "b"}, }, { - name: "json", + name: "test.yaml", config: `rclone: config_secrets: - a @@ -49,7 +45,7 @@ func TestLoadConfigRcloneSecrets(t *testing.T) { expected: []string{"a", "b"}, }, { - name: "badJson", + name: "badJson.json", config: `{"rclone":{"config_secrets":["a","b"]}`, expected: []string{"a", "b"}, err: true, @@ -59,12 +55,12 @@ func TestLoadConfigRcloneSecrets(t *testing.T) { t.Run(test.name, func(t *testing.T) { configHandler, err := NewAgentConfigHandler("", "", logger, nil) g.Expect(err).To(BeNil()) - err = configHandler.updateConfig([]byte(test.config)) + err = configHandler.UpdateConfig([]byte(test.config), test.name) if test.err { g.Expect(err).ToNot(BeNil()) } else { g.Expect(err).To(BeNil()) - g.Expect(configHandler.config.Rclone.ConfigSecrets).To(Equal(test.expected)) + g.Expect(configHandler.GetConfiguration().Rclone.ConfigSecrets).To(Equal(test.expected)) } }) } @@ -107,75 +103,13 @@ func TestWatchFile(t *testing.T) { defer func() { _ = configHandler.Close() }() g.Expect(err).To(BeNil()) - configHandler.mu.RLock() - g.Expect(configHandler.config).To(Equal(test.contents1)) - configHandler.mu.RUnlock() + g.Expect(configHandler.GetConfiguration()).To(Equal(*test.contents1)) b, err = json.Marshal(test.contents2) g.Expect(err).To(BeNil()) err = os.WriteFile(configFile, b, 0644) g.Expect(err).To(BeNil()) - g.Eventually(configHandler.GetConfiguration).Should(Equal(test.contents2)) - }) - } -} - -func TestWatchConfigMap(t *testing.T) { - t.Logf("Started") - logger := log.New() - log.SetLevel(log.DebugLevel) - g := NewGomegaWithT(t) - type test struct { - name string - configMapV1 *v1.ConfigMap - expectedSecretsV1 []string - configMapV2 *v1.ConfigMap - expectedSecretsV2 []string - errorOnInit bool - } - namespace := "seldon-mesh" - tests := []test{ - { - name: "simple", - configMapV1: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Name: ConfigMapName, Namespace: namespace}, - Data: map[string]string{ - AgentConfigYamlFilename: "{\"rclone\":{\"config_secrets\":[\"rclone-gs-public\"]}}", - }, - }, - expectedSecretsV1: []string{"rclone-gs-public"}, - configMapV2: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Name: ConfigMapName, Namespace: namespace}, - Data: map[string]string{ - AgentConfigYamlFilename: "{\"rclone\":{\"config_secrets\":[\"rclone-gs-public2\"]}}", - }, - }, - expectedSecretsV2: []string{"rclone-gs-public2"}, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - fakeClientset := fake.NewSimpleClientset(test.configMapV1) - configHandler, err := NewAgentConfigHandler("", namespace, logger, fakeClientset) - defer func() { _ = configHandler.Close() }() - getSecrets := func() []string { - c := configHandler.GetConfiguration() - if c != nil { - return c.Rclone.ConfigSecrets - } - return []string{} - } - g.Expect(err).To(BeNil()) - if !test.errorOnInit { - g.Expect(configHandler.config).ToNot(BeNil()) - g.Eventually(getSecrets).Should(Equal(test.expectedSecretsV1)) - // update - _, err = fakeClientset.CoreV1().ConfigMaps(namespace).Update(context.Background(), test.configMapV2, metav1.UpdateOptions{}) - g.Expect(err).To(BeNil()) 
- g.Eventually(getSecrets).Should(Equal(test.expectedSecretsV2)) - } else { - g.Expect(configHandler.config).To(BeNil()) - } + g.Eventually(configHandler.GetConfiguration).Should(Equal(*test.contents2)) }) } } diff --git a/scheduler/pkg/agent/rclone/rclone.go b/scheduler/pkg/agent/rclone/rclone.go index fc7e3710e0..43e32343e7 100644 --- a/scheduler/pkg/agent/rclone/rclone.go +++ b/scheduler/pkg/agent/rclone/rclone.go @@ -135,7 +135,8 @@ func (r *RCloneClient) StartConfigListener() error { go r.listenForConfigUpdates() // Add ourself as listener on channel and handle initial config logger.Info("Loading initial rclone configuration") - err := r.loadRcloneConfiguration(r.configHandler.AddListener(r.configChan)) + r.configHandler.AddListener(r.configChan) + err := r.loadRcloneConfiguration(r.configHandler.GetConfiguration()) if err != nil { logger.WithError(err).Errorf("Failed to load rclone defaults") return err @@ -148,7 +149,7 @@ func (r *RCloneClient) listenForConfigUpdates() { for config := range r.configChan { logger.Info("Received config update") go func() { - err := r.loadRcloneConfiguration(&config) + err := r.loadRcloneConfiguration(config) if err != nil { logger.WithError(err).Error("Failed to load rclone defaults") } diff --git a/scheduler/pkg/agent/rclone/rclone_config.go b/scheduler/pkg/agent/rclone/rclone_config.go index 6ad1072752..7b24f89eb8 100644 --- a/scheduler/pkg/agent/rclone/rclone_config.go +++ b/scheduler/pkg/agent/rclone/rclone_config.go @@ -17,14 +17,9 @@ import ( "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" ) -func (r *RCloneClient) loadRcloneConfiguration(config *config.AgentConfiguration) error { +func (r *RCloneClient) loadRcloneConfiguration(config config.AgentConfiguration) error { logger := r.logger.WithField("func", "loadRcloneConfiguration") - if config == nil { - logger.Warn("nil config passed") - return nil - } - // Load any secrets that have Rclone config addedFromSecrets, err := r.loadRcloneSecretsConfiguration(config) if err != nil { @@ -55,11 +50,11 @@ func (r *RCloneClient) loadRcloneConfiguration(config *config.AgentConfiguration return nil } -func (r *RCloneClient) loadRcloneRawConfiguration(config *config.AgentConfiguration) ([]string, error) { +func (r *RCloneClient) loadRcloneRawConfiguration(config config.AgentConfiguration) ([]string, error) { logger := r.logger.WithField("func", "loadRcloneRawConfiguration") var rcloneNamesAdded []string - if len(config.Rclone.Config) > 0 { + if config.Rclone != nil && len(config.Rclone.Config) > 0 { logger.Infof("found %d Rclone configs", len(config.Rclone.Config)) for _, config := range config.Rclone.Config { @@ -77,7 +72,7 @@ func (r *RCloneClient) loadRcloneRawConfiguration(config *config.AgentConfigurat return rcloneNamesAdded, nil } -func (r *RCloneClient) deleteUnusedRcloneConfiguration(config *config.AgentConfiguration, rcloneNamesAdded []string) error { +func (r *RCloneClient) deleteUnusedRcloneConfiguration(config config.AgentConfiguration, rcloneNamesAdded []string) error { logger := r.logger.WithField("func", "deleteUnusedRcloneConfiguration") existingRemotes, err := r.ListRemotes() @@ -107,12 +102,12 @@ func (r *RCloneClient) deleteUnusedRcloneConfiguration(config *config.AgentConfi return nil } -func (r *RCloneClient) loadRcloneSecretsConfiguration(config *config.AgentConfiguration) ([]string, error) { +func (r *RCloneClient) loadRcloneSecretsConfiguration(config config.AgentConfiguration) ([]string, error) { logger := r.logger.WithField("func", "loadRcloneSecretsConfiguration") var 
rcloneNamesAdded []string // Load any secrets that have Rclone config - if len(config.Rclone.ConfigSecrets) > 0 { + if config.Rclone != nil && len(config.Rclone.ConfigSecrets) > 0 { secretClientSet, err := k8s.CreateClientset() if err != nil { return nil, err diff --git a/scheduler/pkg/agent/rclone/rclone_config_test.go b/scheduler/pkg/agent/rclone/rclone_config_test.go index b2d1a14981..881a68132c 100644 --- a/scheduler/pkg/agent/rclone/rclone_config_test.go +++ b/scheduler/pkg/agent/rclone/rclone_config_test.go @@ -147,7 +147,7 @@ func TestLoadRcloneConfig(t *testing.T) { httpmock.NewStringResponder(200, "{}")) g.Expect(err).To(gomega.BeNil()) - err = rcloneClient.loadRcloneConfiguration(test.agentConfiguration) + err = rcloneClient.loadRcloneConfiguration(*test.agentConfiguration) if test.error { g.Expect(err).ToNot(gomega.BeNil()) } else { diff --git a/scheduler/pkg/agent/rclone/rclone_test.go b/scheduler/pkg/agent/rclone/rclone_test.go index 2ba9b8eb92..56035a7081 100644 --- a/scheduler/pkg/agent/rclone/rclone_test.go +++ b/scheduler/pkg/agent/rclone/rclone_test.go @@ -116,7 +116,7 @@ func TestRcloneCopy(t *testing.T) { body: "{}", createLocalFolder: true, expectError: true, - expectedCallCount: 2, // for config resync + expectedCallCount: 3, // for config resync }, { name: "noFiles", diff --git a/scheduler/pkg/config/watcher.go b/scheduler/pkg/config/watcher.go new file mode 100644 index 0000000000..26dc8a8bf6 --- /dev/null +++ b/scheduler/pkg/config/watcher.go @@ -0,0 +1,288 @@ +/* +Copyright (c) 2025 Seldon Technologies Ltd. + +Use of this software is governed by +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. 
+*/ +package config + +import ( + "encoding/json" + "fmt" + "os" + "path" + "path/filepath" + "strings" + "sync" + + "github.com/fsnotify/fsnotify" + log "github.com/sirupsen/logrus" + yaml "gopkg.in/yaml.v2" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "knative.dev/pkg/configmap" + "knative.dev/pkg/configmap/informer" + + "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" +) + +type ConfigUpdateProcessor[T any, PT util.ConfigHandle[T]] func(config PT, logger log.FieldLogger) error + +type ConfigWatcher[T any, PT util.ConfigHandle[T]] struct { + logger log.FieldLogger + mu sync.RWMutex + config PT + configFilePath string + listeners []chan<- T + namespace string + useConfigMapInformer bool + configMapName string + configMapFileName string + watcher *fsnotify.Watcher + fileWatcherDone chan struct{} + configMapWatcherDone chan struct{} + configUpdateProcessor ConfigUpdateProcessor[T, PT] +} + +func NewConfigWatcher[T any, PT util.ConfigHandle[T]](configPath string, configMapFileName string, namespace string, watchK8sConfigMap bool, configMapName string, clientset kubernetes.Interface, configUpdateProcessor ConfigUpdateProcessor[T, PT], logger log.FieldLogger) (*ConfigWatcher[T, PT], error) { + configHandler := &ConfigWatcher[T, PT]{ + logger: logger.WithField("source", "ConfigWatcher"), + namespace: namespace, + useConfigMapInformer: watchK8sConfigMap, + configMapName: configMapName, + configMapFileName: configMapFileName, + configUpdateProcessor: configUpdateProcessor, + } + + if configPath != "" { + isDir, err := configPathisDir(configPath) + if err != nil { + return nil, fmt.Errorf("config path %s: %w", configPath, err) + } + if isDir { + configPath = path.Join(configPath, configMapFileName) + } + logger.Infof("Init config from path %s", configPath) + err = configHandler.initConfigFromPath(configPath) + if err != nil { + return nil, err + } + _, filename := path.Split(configPath) + if filename != "" && filename != configMapFileName { + logger.Warnf("Watched local config file name %s does not match config map file name %s. 
This means the config may get updates from two different sources", filename, configMapFileName) + } + } + + err := configHandler.initWatcher(configPath, clientset) + if err != nil { + return nil, err + } + + return configHandler, nil +} + +func configPathisDir(configPath string) (bool, error) { + fileInfo, err := os.Stat(configPath) + if err != nil { + return false, err + } + return fileInfo.IsDir(), nil +} + +func (cw *ConfigWatcher[T, PT]) initConfigFromPath(configPath string) error { + m, err := configmap.Load(path.Dir(configPath)) + if err != nil { + return err + } + + _, configFileName := path.Split(configPath) + if v, ok := m[configFileName]; ok { + err = cw.UpdateConfig([]byte(v), configFileName) + if err != nil { + return err + } + cw.configFilePath = path.Clean(configPath) + return nil + } + return fmt.Errorf("configuration watcher failed to find file %s", configPath) +} + +func (cw *ConfigWatcher[T, PT]) initWatcher(configPath string, clientset kubernetes.Interface) error { + logger := cw.logger.WithField("func", "initWatcher") + if cw.useConfigMapInformer && clientset != nil { // Watch k8s config map + err := cw.watchConfigMap(clientset) + if err != nil { + return err + } + } else if configPath != "" { // Watch local file + err := cw.watchFile(cw.configFilePath) + if err != nil { + return err + } + } else { + logger.Warnf("No config available on initialization") + } + return nil +} + +func (cw *ConfigWatcher[T, PT]) Close() error { + if cw.fileWatcherDone != nil { + close(cw.fileWatcherDone) + } + if cw.configMapWatcherDone != nil { + close(cw.configMapWatcherDone) + } + var err error + if cw.watcher != nil { + err = cw.watcher.Close() + } + for _, c := range cw.listeners { + close(c) + } + return err +} + +func (cw *ConfigWatcher[T, PT]) AddListener(c chan<- T) { + cw.mu.Lock() + defer cw.mu.Unlock() + cw.listeners = append(cw.listeners, c) +} + +func (cw *ConfigWatcher[T, PT]) GetConfiguration() T { + cw.mu.RLock() + defer cw.mu.RUnlock() + if cw.config != nil { + return cw.config.DeepCopy() + } else { + return cw.config.Default() + } +} + +func (cw *ConfigWatcher[T, PT]) UpdateConfig(configData []byte, filename string) error { + logger := cw.logger.WithField("func", "updateConfig") + + cw.mu.Lock() + defer cw.mu.Unlock() + + config := new(T) + canonicalExt := strings.Trim(strings.ToLower(path.Ext(filename)), " ") + if canonicalExt == ".yaml" { + err := yaml.Unmarshal(configData, &config) + if err != nil { + return err + } + } else { + // assume json if not yaml, irrespective of file extension + err := json.Unmarshal(configData, &config) + if err != nil { + return err + } + } + + // The config update processor is passed a pointer to the config so that it can validate and + // modify it as needed based on application logic. Any changes to the config are made while + // holding the config watcher (write) lock. 
+ if cw.configUpdateProcessor != nil { + err := cw.configUpdateProcessor(config, logger) + if err != nil { + return err + } + } + + cw.config = config + return nil +} + +// Watch the config file passed for changes, reload and signal listeners when it does +func (cw *ConfigWatcher[T, PT]) watchFile(filePath string) error { + logger := cw.logger.WithField("func", "watchFile") + watcher, err := fsnotify.NewWatcher() + if err != nil { + logger.Error(err, "Failed to create watcher") + return err + } + cw.watcher = watcher + cw.fileWatcherDone = make(chan struct{}) + + configDir, _ := filepath.Split(filePath) + knownConfigFile, _ := filepath.EvalSymlinks(filePath) + + go func() { + for { + select { + case event := <-watcher.Events: + isCreate := event.Op&fsnotify.Create != 0 + isWrite := event.Op&fsnotify.Write != 0 + isRemove := event.Op&fsnotify.Remove != 0 + + // when running in k8s, the file is a symlink that gets replaced on update + currentConfigFile, _ := filepath.EvalSymlinks(filePath) + + existingFileChanged := filepath.Clean(event.Name) == filePath && (isWrite || isCreate) + configSymlinkChanged := currentConfigFile != "" && currentConfigFile != knownConfigFile + + if existingFileChanged || configSymlinkChanged { + knownConfigFile = currentConfigFile + b, err := os.ReadFile(filePath) + if err != nil { + logger.WithError(err).Errorf("Failed to read %s", filePath) + } else { + err := cw.UpdateConfig(b, filePath) + if err != nil { + logger.WithError(err).Errorf("Failed to update config %s", filePath) + } else { + cw.mu.RLock() + for _, ch := range cw.listeners { + ch <- cw.config.DeepCopy() + } + cw.mu.RUnlock() + } + } + } else if filepath.Clean(event.Name) == filePath && isRemove { + return + } + case err := <-watcher.Errors: + logger.Error(err, "watcher error") + case <-cw.fileWatcherDone: + return + } + } + }() + + if err = watcher.Add(configDir); err != nil { + cw.logger.Errorf("Failed to add file path %s to config watcher", filePath) + return err + } + cw.logger.Infof("Starting to watch config file %s", filePath) + + return nil +} + +func (cw *ConfigWatcher[T, PT]) watchConfigMap(clientset kubernetes.Interface) error { + logger := cw.logger.WithField("func", "watchConfigMap") + + watcher := informer.NewInformedWatcher(clientset, cw.namespace) + watcher.Watch(cw.configMapName, func(updated *corev1.ConfigMap) { + filename := cw.configMapFileName + if data, ok := updated.Data[filename]; ok { + err := cw.UpdateConfig([]byte(data), cw.configMapName) + if err != nil { + logger.Errorf("Failed to update config with data in configmap %s.%s/%s", cw.configMapName, cw.namespace, filename) + } else { + cw.mu.RLock() + for _, ch := range cw.listeners { + ch <- cw.config.DeepCopy() + } + cw.mu.RUnlock() + } + } + }) + cw.configMapWatcherDone = make(chan struct{}) + err := watcher.Start(cw.configMapWatcherDone) + if err != nil { + return err + } + return nil +} diff --git a/scheduler/pkg/kafka/dataflow/server.go b/scheduler/pkg/kafka/dataflow/server.go index 9944f55589..7884831141 100644 --- a/scheduler/pkg/kafka/dataflow/server.go +++ b/scheduler/pkg/kafka/dataflow/server.go @@ -28,6 +28,7 @@ import ( "github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator" "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka" cr "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka/conflict-resolution" + scaling_config "github.com/seldonio/seldon-core/scheduler/v2/pkg/scaling/config" "github.com/seldonio/seldon-core/scheduler/v2/pkg/store/pipeline" 
"github.com/seldonio/seldon-core/scheduler/v2/pkg/util" ) @@ -48,6 +49,11 @@ type ChainerServer struct { loadBalancer util.LoadBalancer conflictResolutioner *cr.ConflictResolutioner[pipeline.PipelineStatus] chainerMutex sync.Map + configUpdatesMutex sync.Mutex + scalingConfigUpdates chan scaling_config.ScalingConfig + currentScalingConfig scaling_config.ScalingConfig + done chan struct{} + grpcServer *grpc.Server chainer.UnimplementedChainerServer health.UnimplementedHealthCheckServiceServer } @@ -58,8 +64,15 @@ type ChainerSubscription struct { fin chan bool } -func NewChainerServer(logger log.FieldLogger, eventHub *coordinator.EventHub, pipelineHandler pipeline.PipelineHandler, - namespace string, loadBalancer util.LoadBalancer, kafkaConfig *kafka_config.KafkaConfig) (*ChainerServer, error) { +func NewChainerServer( + logger log.FieldLogger, + eventHub *coordinator.EventHub, + pipelineHandler pipeline.PipelineHandler, + namespace string, + loadBalancer util.LoadBalancer, + kafkaConfig *kafka_config.KafkaConfig, + scalingConfigHdl *scaling_config.ScalingConfigHandler, +) (*ChainerServer, error) { conflictResolutioner := cr.NewConflictResolution[pipeline.PipelineStatus](logger) topicNamer, err := kafka.NewTopicNamer(namespace, kafkaConfig.TopicPrefix) if err != nil { @@ -74,6 +87,8 @@ func NewChainerServer(logger log.FieldLogger, eventHub *coordinator.EventHub, pi loadBalancer: loadBalancer, conflictResolutioner: conflictResolutioner, chainerMutex: sync.Map{}, + scalingConfigUpdates: make(chan scaling_config.ScalingConfig), + done: make(chan struct{}), } eventHub.RegisterPipelineEventHandler( @@ -82,9 +97,32 @@ func NewChainerServer(logger log.FieldLogger, eventHub *coordinator.EventHub, pi c.logger, c.handlePipelineEvent, ) + + if scalingConfigHdl != nil { + c.currentScalingConfig = scalingConfigHdl.GetConfiguration() + scalingConfigHdl.AddListener(c.scalingConfigUpdates) + go c.handleScalingConfigChanges() + } else { + c.currentScalingConfig = scaling_config.DefaultScalingConfig + } + + c.configUpdatesMutex.Lock() + scaling_config.LogWhenUsingDefaultScalingConfig(&c.currentScalingConfig, c.logger) + c.configUpdatesMutex.Unlock() + return c, nil } +func (c *ChainerServer) Stop() { + if c.grpcServer != nil { + c.grpcServer.GracefulStop() + c.logger.Info("Scheduler closing gRPC server managing connections from dataflow-engine replicas") + } + c.logger.Info("Stop watching for scaling config changes") + close(c.done) + c.StopSendPipelineEvents() +} + func (c *ChainerServer) StartGrpcServer(agentPort uint) error { lis, err := net.Listen("tcp", fmt.Sprintf(":%d", agentPort)) if err != nil { @@ -99,6 +137,7 @@ func (c *ChainerServer) StartGrpcServer(agentPort uint) error { grpcServer := grpc.NewServer(grpcOptions...) 
chainer.RegisterChainerServer(grpcServer, c) health.RegisterHealthCheckServiceServer(grpcServer, c) + c.grpcServer = grpcServer c.logger.Printf("Chainer server running on %d", agentPort) return grpcServer.Serve(lis) @@ -489,6 +528,33 @@ func (c *ChainerServer) rebalance() { } } +func (c *ChainerServer) handleScalingConfigChanges() { + logger := c.logger.WithField("func", "handleScalingConfigChanges") + for { + select { + case newConfig := <-c.scalingConfigUpdates: + if newConfig.Pipelines == nil { + continue + } + c.configUpdatesMutex.Lock() + if newConfig.Pipelines.MaxShardCountMultiplier != c.currentScalingConfig.Pipelines.MaxShardCountMultiplier { + logger.Info("Updating mapping of Pipelines onto dataflow-engine replicas following scaling config change") + // lock Mutex to avoid updating load balancer if a concurrent rebalance is in progress + c.mu.Lock() + c.currentScalingConfig = newConfig + scaling_config.LogWhenUsingDefaultScalingConfig(&c.currentScalingConfig, logger) + c.loadBalancer.UpdatePartitions(newConfig.Pipelines.MaxShardCountMultiplier) + c.mu.Unlock() + // rebalance all pipelines onto available dataflow-engine replicas according to new config + c.rebalance() + } + c.configUpdatesMutex.Unlock() + case <-c.done: + return + } + } +} + func (c *ChainerServer) handlePipelineEvent(event coordinator.PipelineEventMsg) { logger := c.logger.WithField("func", "handlePipelineEvent") if event.ExperimentUpdate { diff --git a/scheduler/pkg/kafka/dataflow/server_test.go b/scheduler/pkg/kafka/dataflow/server_test.go index 4aae731f0f..58e9d72076 100644 --- a/scheduler/pkg/kafka/dataflow/server_test.go +++ b/scheduler/pkg/kafka/dataflow/server_test.go @@ -870,7 +870,7 @@ func createTestScheduler(t *testing.T, serverName string) (*ChainerServer, *coor kc, _ := kafka_config.NewKafkaConfig(configFilePath, "debug") b := util.NewRingLoadBalancer(1) b.AddServer(serverName) - s, _ := NewChainerServer(logger, eventHub, pipelineServer, "test-ns", b, kc) + s, _ := NewChainerServer(logger, eventHub, pipelineServer, "test-ns", b, kc, nil) return s, eventHub } diff --git a/scheduler/pkg/kafka/gateway/infer.go b/scheduler/pkg/kafka/gateway/infer.go index f9d331ccd7..98ab655e80 100644 --- a/scheduler/pkg/kafka/gateway/infer.go +++ b/scheduler/pkg/kafka/gateway/infer.go @@ -110,19 +110,11 @@ func NewInferKafkaHandler( consumerName string, schemaRegistryClient schemaregistry.Client, ) (*InferKafkaHandler, error) { - defaultReplicationFactor, err := util.GetIntEnvar(envDefaultReplicationFactor, defaultReplicationFactor) + replicationFactor, err := util.GetIntEnvar(envDefaultReplicationFactor, defaultReplicationFactor) if err != nil { return nil, fmt.Errorf("error getting default replication factor: %v", err) } - replicationFactor, err := GetIntConfigMapValue(topicsConfigMap, replicationFactorKey, defaultReplicationFactor) - if err != nil { - return nil, fmt.Errorf("invalid Kafka topic configuration: %v", err) - } - defaultNumPartitions, err := util.GetIntEnvar(envDefaultNumPartitions, defaultNumPartitions) - if err != nil { - return nil, fmt.Errorf("error getting default number of partitions: %v", err) - } - numPartitions, err := GetIntConfigMapValue(topicsConfigMap, numPartitionsKey, defaultNumPartitions) + numPartitions, err := util.GetIntEnvar(envDefaultNumPartitions, defaultNumPartitions) if err != nil { return nil, fmt.Errorf("invalid Kafka topic configuration: %w", err) } diff --git a/scheduler/pkg/scaling/config/config.go b/scheduler/pkg/scaling/config/config.go new file mode 100644 index 
0000000000..87638c2c5c --- /dev/null +++ b/scheduler/pkg/scaling/config/config.go @@ -0,0 +1,192 @@ +/* +Copyright (c) 2024 Seldon Technologies Ltd. + +Use of this software is governed by +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. +*/ + +package config + +import ( + log "github.com/sirupsen/logrus" + + "github.com/seldonio/seldon-core/scheduler/v2/pkg/config" +) + +const ( + ScalingConfigYamlFilename = "scaling.yaml" + ConfigMapName = "seldon-scaling" +) + +var ( + DefaultPipelineScalingConfig = PipelineScalingConfig{ + MaxShardCountMultiplier: 1, + isDefault: true, + } + DefaultModelScalingConfig = ModelScalingConfig{ + Enable: false, + isDefault: true, + } + DefaultServerScalingConfig = ServerScalingConfig{ + Enable: true, + ScaleDownPackingEnabled: false, + ScaleDownPackingPercentage: 0, + isDefault: true, + } + DefaultScalingConfig = ScalingConfig{ + Models: &DefaultModelScalingConfig, + Servers: &DefaultServerScalingConfig, + Pipelines: &DefaultPipelineScalingConfig, + isDefault: true, + } +) + +type ScalingConfig struct { + Models *ModelScalingConfig `json:"models,omitempty" yaml:"models,omitempty"` + Servers *ServerScalingConfig `json:"servers,omitempty" yaml:"servers,omitempty"` + // Scaling config impacting pipeline-gateway, dataflow-engine and model-gateway + Pipelines *PipelineScalingConfig `json:"pipelines,omitempty" yaml:"pipelines,omitempty"` + isDefault bool +} + +type ModelScalingConfig struct { + Enable bool `json:"enabled,omitempty" yaml:"enabled,omitempty"` + isDefault bool +} + +type ServerScalingConfig struct { + Enable bool `json:"enabled,omitempty" yaml:"enabled,omitempty"` + ScaleDownPackingEnabled bool `json:"scaleDownPackingEnabled,omitempty" yaml:"scaleDownPackingEnabled,omitempty"` + ScaleDownPackingPercentage int `json:"scaleDownPackingPercentage,omitempty" yaml:"scaleDownPackingPercentage,omitempty"` + isDefault bool +} + +type PipelineScalingConfig struct { + // MaxShardCountMultiplier influences the way the inferencing workload is sharded over the + // replicas of pipeline components. + // + // - For each of pipeline-gateway and dataflow-engine, the max number of replicas is + // `maxShardCountMultiplier * number of pipelines` + // - For model-gateway, the max number of replicas is + // `maxShardCountMultiplier * number of consumers` + // + // It doesn't make sense to set this to a value larger than the number of partitions for kafka + // topics used in the Core 2 install. 
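+ //
+ // Worked example: with 3 pipelines and maxShardCountMultiplier set to 2, pipeline-gateway
+ // and dataflow-engine can each be sharded over at most 6 replicas; with 4 model-gateway
+ // consumers, model-gateway can be sharded over at most 8 replicas.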
+ MaxShardCountMultiplier int `json:"maxShardCountMultiplier,omitempty" yaml:"maxShardCountMultiplier,omitempty"` + isDefault bool +} + +func (sc *ScalingConfig) DeepCopy() ScalingConfig { + var modelsCopy *ModelScalingConfig + var serversCopy *ServerScalingConfig + var pipelinesCopy *PipelineScalingConfig + + var modelsDeepCopy ModelScalingConfig + if sc.Models != nil { + modelsDeepCopy = *sc.Models + } else { + modelsDeepCopy = ModelScalingConfig(DefaultModelScalingConfig) + } + modelsCopy = &modelsDeepCopy + + var serversDeepCopy ServerScalingConfig + if sc.Servers != nil { + serversDeepCopy = *sc.Servers + } else { + serversDeepCopy = ServerScalingConfig(DefaultServerScalingConfig) + } + serversCopy = &serversDeepCopy + + var pipelinesDeepCopy PipelineScalingConfig + if sc.Pipelines != nil { + pipelinesDeepCopy = *sc.Pipelines + } else { + pipelinesDeepCopy = PipelineScalingConfig(DefaultPipelineScalingConfig) + } + pipelinesCopy = &pipelinesDeepCopy + + res := ScalingConfig{ + Models: modelsCopy, + Servers: serversCopy, + Pipelines: pipelinesCopy, + isDefault: sc.isDefault, + } + return res +} + +func (sc *ScalingConfig) Default() ScalingConfig { + return DefaultScalingConfig.DeepCopy() +} + +func (sc *ScalingConfig) IsDefault() bool { + return sc.isDefault || + (sc.Models.IsDefault() && sc.Servers.IsDefault() && sc.Pipelines.IsDefault()) +} + +func (msc *ModelScalingConfig) IsDefault() bool { + return msc.isDefault +} + +func (ssc *ServerScalingConfig) IsDefault() bool { + return ssc.isDefault +} + +func (psc *PipelineScalingConfig) IsDefault() bool { + return psc.isDefault +} + +func LogWhenUsingDefaultScalingConfig(scalingConfig *ScalingConfig, logger log.FieldLogger) { + if scalingConfig.IsDefault() { + logger.Infof("Using default scaling config") + } else { + if scalingConfig.Models != nil && scalingConfig.Models.IsDefault() { + logger.Infof("Using default model scaling config") + } + if scalingConfig.Servers != nil && scalingConfig.Servers.IsDefault() { + logger.Infof("Using default server scaling config") + } + if scalingConfig.Pipelines != nil && scalingConfig.Pipelines.IsDefault() { + logger.Infof("Using default pipeline scaling config") + } + } +} + +type ScalingConfigHandler = config.ConfigWatcher[ScalingConfig, *ScalingConfig] + +func NewScalingConfigHandler(configPath string, namespace string, logger log.FieldLogger) (*ScalingConfigHandler, error) { + return config.NewConfigWatcher( + configPath, + ScalingConfigYamlFilename, + namespace, + false, // watch mounted config file rather than using k8s informer on the config map + ConfigMapName, + nil, + onConfigUpdate, + logger.WithField("source", "ScalingConfigHandler"), + ) +} + +func onConfigUpdate(config *ScalingConfig, logger log.FieldLogger) error { + // Any missing top-level config sections (Models, Server, Pipelines) are set to their defaults. + // However, setting an empty section is treated differently, with all the fields being + // considered explicitly set to the zero value of their datatype. + // + // We also ensure minimal validation of values, so that (for example) when the zero value of + // the type is not valid, we set it to the default value. 
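+ //
+ // Illustrative example (not part of any shipped config): a scaling.yaml containing only
+ //
+ //   pipelines:
+ //     maxShardCountMultiplier: 4
+ //
+ // leaves Models and Servers at their defaults, whereas an explicit empty section such as
+ //   servers: {}
+ // is kept as-is, with every field taking the zero value of its type.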
+ if config.Pipelines == nil { + config.Pipelines = &DefaultPipelineScalingConfig + } else { + if config.Pipelines.MaxShardCountMultiplier <= 0 { + config.Pipelines.MaxShardCountMultiplier = DefaultPipelineScalingConfig.MaxShardCountMultiplier + } + } + if config.Models == nil { + config.Models = &DefaultModelScalingConfig + } + if config.Servers == nil { + config.Servers = &DefaultServerScalingConfig + } + return nil +} diff --git a/scheduler/pkg/server/control_plane_test.go b/scheduler/pkg/server/control_plane_test.go index de6d6141be..b51132a4b3 100644 --- a/scheduler/pkg/server/control_plane_test.go +++ b/scheduler/pkg/server/control_plane_test.go @@ -135,7 +135,7 @@ func TestSubscribeControlPlane(t *testing.T) { modelGwLoadBalancer := util.NewRingLoadBalancer(1) pipelineGwLoadBalancer := util.NewRingLoadBalancer(1) s := NewSchedulerServer( - logger, nil, nil, nil, nil, eventHub, sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, tls.TLSOptions{}, + logger, nil, nil, nil, nil, eventHub, sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, nil, tls.TLSOptions{}, ) sync.Signals(1) diff --git a/scheduler/pkg/server/server.go b/scheduler/pkg/server/server.go index 13ade6d43e..08e9577d86 100644 --- a/scheduler/pkg/server/server.go +++ b/scheduler/pkg/server/server.go @@ -32,6 +32,7 @@ import ( "github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator" cr "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka/conflict-resolution" + scaling_config "github.com/seldonio/seldon-core/scheduler/v2/pkg/scaling/config" scheduler2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/scheduler" "github.com/seldonio/seldon-core/scheduler/v2/pkg/store" "github.com/seldonio/seldon-core/scheduler/v2/pkg/store/experiment" @@ -77,6 +78,11 @@ type SchedulerServer struct { config SchedulerServerConfig modelGwLoadBalancer *util.RingLoadBalancer pipelineGWLoadBalancer *util.RingLoadBalancer + scalingConfigUpdates chan scaling_config.ScalingConfig + currentScalingConfig *scaling_config.ScalingConfig + mu sync.Mutex + done chan struct{} + grpcServer *grpc.Server consumerGroupConfig *ConsumerGroupConfig eventHub *coordinator.EventHub tlsOptions seldontls.TLSOptions @@ -194,6 +200,7 @@ func (s *SchedulerServer) startServer(port uint, secure bool) error { opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler())) opts = append(opts, grpc.KeepaliveEnforcementPolicy(kaep)) grpcServer := grpc.NewServer(opts...) 
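+ // Keep a handle to the gRPC server so Stop() can drain existing connections via GracefulStop().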
+ s.grpcServer = grpcServer pb.RegisterSchedulerServer(grpcServer, s) health.RegisterHealthCheckServiceServer(grpcServer, s) @@ -257,6 +264,7 @@ func NewSchedulerServer( consumerGroupIdPrefix string, modelGwLoadBalancer *util.RingLoadBalancer, pipelineGWLoadBalancer *util.RingLoadBalancer, + scalingConfigHdl *scaling_config.ScalingConfigHandler, tlsOptions seldontls.TLSOptions, ) *SchedulerServer { loggerWithField := logger.WithField("source", "SchedulerServer") @@ -301,6 +309,8 @@ func NewSchedulerServer( config: config, modelGwLoadBalancer: modelGwLoadBalancer, pipelineGWLoadBalancer: pipelineGWLoadBalancer, + scalingConfigUpdates: make(chan scaling_config.ScalingConfig), + done: make(chan struct{}), consumerGroupConfig: consumerGroupConfig, eventHub: eventHub, tlsOptions: tlsOptions, @@ -337,9 +347,77 @@ func NewSchedulerServer( s.handleServerEvents, ) + if scalingConfigHdl != nil { + initScalingConfig := scalingConfigHdl.GetConfiguration() + s.currentScalingConfig = &initScalingConfig + scalingConfigHdl.AddListener(s.scalingConfigUpdates) + go s.handleScalingConfigChanges() + } else { + s.currentScalingConfig = &scaling_config.DefaultScalingConfig + } + + s.mu.Lock() + scaling_config.LogWhenUsingDefaultScalingConfig(s.currentScalingConfig, loggerWithField) + s.mu.Unlock() + return s } +func (s *SchedulerServer) Stop() { + if s.grpcServer != nil { + s.grpcServer.GracefulStop() + s.logger.Info("Scheduler closing gRPC server managing connections from controller and gateways") + } + close(s.done) +} + +func (s *SchedulerServer) handleScalingConfigChanges() { + logger := s.logger.WithField("func", "handleScalingConfigChanges") + for { + select { + case newScalingConfig := <-s.scalingConfigUpdates: + if newScalingConfig.Pipelines == nil { + continue + } + s.mu.Lock() + if newScalingConfig.Pipelines.MaxShardCountMultiplier != s.currentScalingConfig.Pipelines.MaxShardCountMultiplier { + logger.Info("Updating mapping of Pipelines and Models onto pipeline-gateway and model-gateway replicas following scaling config change") + wg := sync.WaitGroup{} + wg.Add(2) + s.currentScalingConfig = &newScalingConfig + scaling_config.LogWhenUsingDefaultScalingConfig(s.currentScalingConfig, logger) + go func() { + // lock Mutex to avoid updating load balancer if a concurrent rebalance is in progress + s.pipelineEventStream.mu.Lock() + s.pipelineGWLoadBalancer.UpdatePartitions(newScalingConfig.Pipelines.MaxShardCountMultiplier) + s.pipelineEventStream.mu.Unlock() + + // There is a chance that another concurrent rebalance will start here (applying the + // updated partitions), but it means we'll just do one extra rebalance that will + // distribute the pipelines in the exact same way (given no other infra changes) + // Given that config changes should be infrequent, this should be ok. 
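+ // The model-gateway goroutine below follows the same pattern under modelEventStream.mu;
+ // s.mu stays held until both rebalances complete (wg.Wait), so a later config update
+ // cannot interleave with this one.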
+ + // rebalance all pipelines onto available pipeline-gw replicas according to new config + s.pipelineGwRebalance() + wg.Done() + }() + + go func() { + s.modelEventStream.mu.Lock() + s.modelGwLoadBalancer.UpdatePartitions(newScalingConfig.Pipelines.MaxShardCountMultiplier) + s.modelEventStream.mu.Unlock() + s.modelGwRebalance() + wg.Done() + }() + wg.Wait() + } + s.mu.Unlock() + case <-s.done: + return + } + } +} + func (s *SchedulerServer) ServerNotify(ctx context.Context, req *pb.ServerNotifyRequest) (*pb.ServerNotifyResponse, error) { logger := s.logger.WithField("func", "ServerNotify") // numExpectedReplicas is only used when we are doing the first sync diff --git a/scheduler/pkg/server/server_status_test.go b/scheduler/pkg/server/server_status_test.go index 75915bd37f..19f0d240c2 100644 --- a/scheduler/pkg/server/server_status_test.go +++ b/scheduler/pkg/server/server_status_test.go @@ -1328,7 +1328,7 @@ func createTestSchedulerImpl(t *testing.T, config SchedulerServerConfig) (*Sched s := NewSchedulerServer( logger, schedulerStore, experimentServer, pipelineServer, scheduler, eventHub, synchroniser.NewSimpleSynchroniser(time.Duration(10*time.Millisecond)), config, - "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, tls.TLSOptions{}, + "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, nil, tls.TLSOptions{}, ) return s, eventHub diff --git a/scheduler/pkg/server/server_test.go b/scheduler/pkg/server/server_test.go index a90fac7c69..97506d011c 100644 --- a/scheduler/pkg/server/server_test.go +++ b/scheduler/pkg/server/server_test.go @@ -73,7 +73,7 @@ func TestLoadModel(t *testing.T) { pipelineGwLoadBalancer := util.NewRingLoadBalancer(1) s := NewSchedulerServer( logger, schedulerStore, experimentServer, pipelineServer, - scheduler, eventHub, sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, tls.TLSOptions{}) + scheduler, eventHub, sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, nil, tls.TLSOptions{}) sync.Signals(1) mockAgent := &mockAgentHandler{} @@ -379,7 +379,7 @@ func TestUnloadModel(t *testing.T) { pipelineGwLoadBalancer := util.NewRingLoadBalancer(1) s := NewSchedulerServer( logger, schedulerStore, experimentServer, pipelineServer, scheduler, eventHub, - sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, tls.TLSOptions{}) + sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, nil, tls.TLSOptions{}) sync.Signals(1) return s, mockAgent, eventHub } @@ -719,7 +719,7 @@ func TestServerNotify(t *testing.T) { pipelineGwLoadBalancer := util.NewRingLoadBalancer(1) s := NewSchedulerServer( logger, schedulerStore, nil, nil, scheduler, eventHub, - sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, tls.TLSOptions{}) + sync, SchedulerServerConfig{}, "", "", modelGwLoadBalancer, pipelineGwLoadBalancer, nil, tls.TLSOptions{}) return s, sync } diff --git a/scheduler/pkg/util/config.go b/scheduler/pkg/util/config.go new file mode 100644 index 0000000000..11a0707dc1 --- /dev/null +++ b/scheduler/pkg/util/config.go @@ -0,0 +1,14 @@ +/* +Copyright (c) 2025 Seldon Technologies Ltd. + +Use of this software is governed by +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. 
+*/ +package util + +type ConfigHandle[T any] interface { + DeepCopier[T] + Defaulter[T] +} diff --git a/scheduler/pkg/util/deepcopy.go b/scheduler/pkg/util/deepcopy.go new file mode 100644 index 0000000000..08a3fe8c6d --- /dev/null +++ b/scheduler/pkg/util/deepcopy.go @@ -0,0 +1,14 @@ +/* +Copyright (c) 2025 Seldon Technologies Ltd. + +Use of this software is governed by +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. +*/ +package util + +type DeepCopier[T any] interface { + DeepCopy() T + *T +} diff --git a/scheduler/pkg/util/defaults.go b/scheduler/pkg/util/defaults.go new file mode 100644 index 0000000000..c6ef435d07 --- /dev/null +++ b/scheduler/pkg/util/defaults.go @@ -0,0 +1,13 @@ +/* +Copyright (c) 2025 Seldon Technologies Ltd. + +Use of this software is governed by +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. +*/ +package util + +type Defaulter[T any] interface { + Default() T +} diff --git a/scheduler/pkg/util/loadbalancer.go b/scheduler/pkg/util/loadbalancer.go index c5bd07095f..6a7a39bcef 100644 --- a/scheduler/pkg/util/loadbalancer.go +++ b/scheduler/pkg/util/loadbalancer.go @@ -10,17 +10,21 @@ the Change License after the Change Date as each is defined in accordance with t package util import ( + "sync" + "github.com/serialx/hashring" ) type LoadBalancer interface { AddServer(serverName string) RemoveServer(serverName string) + UpdatePartitions(numPartitions int) GetServersForKey(key string) []string } type RingLoadBalancer struct { ring *hashring.HashRing + mu sync.RWMutex nodes map[string]bool replicationFactor int numPartitions int @@ -34,19 +38,32 @@ func NewRingLoadBalancer(numPartitions int) *RingLoadBalancer { } } +func (lb *RingLoadBalancer) UpdatePartitions(numPartitions int) { + lb.mu.Lock() + defer lb.mu.Unlock() + lb.numPartitions = numPartitions + lb.replicationFactor = min(len(lb.nodes), lb.numPartitions) +} + func (lb *RingLoadBalancer) AddServer(serverName string) { + lb.mu.Lock() + defer lb.mu.Unlock() lb.ring = lb.ring.AddNode(serverName) lb.nodes[serverName] = true lb.replicationFactor = min(len(lb.nodes), lb.numPartitions) } func (lb *RingLoadBalancer) RemoveServer(serverName string) { + lb.mu.Lock() + defer lb.mu.Unlock() lb.ring = lb.ring.RemoveNode(serverName) delete(lb.nodes, serverName) lb.replicationFactor = min(len(lb.nodes), lb.numPartitions) } func (lb *RingLoadBalancer) GetServersForKey(key string) []string { + lb.mu.RLock() + defer lb.mu.RUnlock() nodes, _ := lb.ring.GetNodes(key, lb.replicationFactor) return nodes }
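For reference, a minimal sketch of how the new scaling config handler might be consumed, assuming the scheduler-style mount path /mnt/scaling/scaling.yaml, a "seldon-mesh" namespace and a plain logrus logger (all assumptions for illustration, not taken from this change). It constructs a ScalingConfigHandler, reads the initial configuration, and subscribes to updates pushed by the watcher; the commented YAML shows the shape of the seldon-scaling ConfigMap data.

/*
Example scaling.yaml (contents of the seldon-scaling ConfigMap data key):

    pipelines:
      maxShardCountMultiplier: 4
    servers:
      enabled: true
      scaleDownPackingEnabled: true
      scaleDownPackingPercentage: 50
    models:
      enabled: true
*/
package main

import (
	log "github.com/sirupsen/logrus"

	scaling_config "github.com/seldonio/seldon-core/scheduler/v2/pkg/scaling/config"
)

func main() {
	logger := log.New()

	// Path and namespace are illustrative; the handler watches the mounted file rather than
	// the ConfigMap itself (watchK8sConfigMap is false in NewScalingConfigHandler).
	handler, err := scaling_config.NewScalingConfigHandler("/mnt/scaling/scaling.yaml", "seldon-mesh", logger)
	if err != nil {
		logger.WithError(err).Fatal("failed to create scaling config handler")
	}

	// Initial config; missing sections have already been replaced by their defaults in onConfigUpdate.
	cfg := handler.GetConfiguration()
	logger.Infof("initial maxShardCountMultiplier=%d", cfg.Pipelines.MaxShardCountMultiplier)

	// Subscribe to updates; the watcher sends a deep copy on every successful reload and
	// closes the channel when the handler is closed.
	updates := make(chan scaling_config.ScalingConfig)
	handler.AddListener(updates)
	for newCfg := range updates {
		scaling_config.LogWhenUsingDefaultScalingConfig(&newCfg, logger)
	}
}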