diff --git a/apix/config/v1alpha1/endpointpickerconfig_types.go b/apix/config/v1alpha1/endpointpickerconfig_types.go index 705e37231..8e80e9d5e 100644 --- a/apix/config/v1alpha1/endpointpickerconfig_types.go +++ b/apix/config/v1alpha1/endpointpickerconfig_types.go @@ -39,13 +39,25 @@ type EndpointPickerConfig struct { // SchedulingProfiles is the list of named SchedulingProfiles // that will be created. SchedulingProfiles []SchedulingProfile `json:"schedulingProfiles"` + + // +optional + // FeatureGates is a set of flags that enable various experimental features with the EPP. + // If omitted, none of these experimental features will be enabled. + FeatureGates FeatureGates `json:"featureGates,omitempty"` + + // +optional + // SaturationDetector when present specifies the configuration of the + // Saturation detector. If not present, default values are used. + SaturationDetector *SaturationDetector `json:"saturationDetector,omitempty"` } func (cfg EndpointPickerConfig) String() string { return fmt.Sprintf( - "{Plugins: %v, SchedulingProfiles: %v}", + "{Plugins: %v, SchedulingProfiles: %v, FeatureGates: %v, SaturationDetector: %v}", cfg.Plugins, cfg.SchedulingProfiles, + cfg.FeatureGates, + cfg.SaturationDetector, ) } @@ -118,3 +130,64 @@ func (sp SchedulingPlugin) String() string { } return fmt.Sprintf("{PluginRef: %s%s}", sp.PluginRef, weight) } + +// FeatureGates is a set of flags that enable various experimental features with the EPP +type FeatureGates []string + +func (fg FeatureGates) String() string { + if fg == nil { + return "{}" + } + + result := "" + for _, gate := range fg { + result += gate + "," + } + + if len(result) > 0 { + result = result[:len(result)-1] + } + return "{" + result + "}" +} + +// SaturationDetector +type SaturationDetector struct { + // +optional + // QueueDepthThreshold defines the backend waiting queue size above which a + // pod is considered to have insufficient capacity for new requests. 
+ QueueDepthThreshold int `json:"queueDepthThreshold,omitempty"` + + // +optional + // KVCacheUtilThreshold defines the KV cache utilization (0.0 to 1.0) above + // which a pod is considered to have insufficient capacity. + KVCacheUtilThreshold float64 `json:"kvCacheUtilThreshold,omitempty"` + + // +optional + // MetricsStalenessThreshold defines how old a pod's metrics can be. + // If a pod's metrics are older than this, it might be excluded from + // "good capacity" considerations or treated as having no capacity for + // safety. + MetricsStalenessThreshold metav1.Duration `json:"metricsStalenessThreshold,omitempty"` +} + +func (sd *SaturationDetector) String() string { + result := "" + if sd != nil { + if sd.QueueDepthThreshold != 0 { + result += fmt.Sprintf("QueueDepthThreshold: %d", sd.QueueDepthThreshold) + } + if sd.KVCacheUtilThreshold != 0.0 { + if len(result) != 0 { + result += ", " + } + result += fmt.Sprintf("KVCacheUtilThreshold: %g", sd.KVCacheUtilThreshold) + } + if sd.MetricsStalenessThreshold.Duration != 0.0 { + if len(result) != 0 { + result += ", " + } + result += fmt.Sprintf("MetricsStalenessThreshold: %s", sd.MetricsStalenessThreshold) + } + } + return "{" + result + "}" +} diff --git a/apix/config/v1alpha1/zz_generated.deepcopy.go b/apix/config/v1alpha1/zz_generated.deepcopy.go index 1326f357b..701f73793 100644 --- a/apix/config/v1alpha1/zz_generated.deepcopy.go +++ b/apix/config/v1alpha1/zz_generated.deepcopy.go @@ -43,6 +43,16 @@ func (in *EndpointPickerConfig) DeepCopyInto(out *EndpointPickerConfig) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.FeatureGates != nil { + in, out := &in.FeatureGates, &out.FeatureGates + *out = make(FeatureGates, len(*in)) + copy(*out, *in) + } + if in.SaturationDetector != nil { + in, out := &in.SaturationDetector, &out.SaturationDetector + *out = new(SaturationDetector) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EndpointPickerConfig. 
@@ -63,6 +73,25 @@ func (in *EndpointPickerConfig) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in FeatureGates) DeepCopyInto(out *FeatureGates) { + { + in := &in + *out = make(FeatureGates, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FeatureGates. +func (in FeatureGates) DeepCopy() FeatureGates { + if in == nil { + return nil + } + out := new(FeatureGates) + in.DeepCopyInto(out) + return *out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PluginSpec) DeepCopyInto(out *PluginSpec) { *out = *in @@ -83,6 +112,22 @@ func (in *PluginSpec) DeepCopy() *PluginSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SaturationDetector) DeepCopyInto(out *SaturationDetector) { + *out = *in + out.MetricsStalenessThreshold = in.MetricsStalenessThreshold +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SaturationDetector. +func (in *SaturationDetector) DeepCopy() *SaturationDetector { + if in == nil { + return nil + } + out := new(SaturationDetector) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *SchedulingPlugin) DeepCopyInto(out *SchedulingPlugin) { *out = *in diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 429b5c348..f80dacf1a 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -46,6 +46,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" "sigs.k8s.io/gateway-api-inference-extension/pkg/common" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config/loader" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics" @@ -62,17 +63,10 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer" testfilter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/test/filter" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" "sigs.k8s.io/gateway-api-inference-extension/version" ) -const ( - // enableExperimentalDatalayerV2 defines the environment variable - // used as feature flag for the pluggable data layer. 
- enableExperimentalDatalayerV2 = "ENABLE_EXPERIMENTAL_DATALAYER_V2" -) - var ( grpcPort = flag.Int("grpc-port", runserver.DefaultGrpcPort, "The gRPC port used for communicating with Envoy proxy") grpcHealthPort = flag.Int("grpc-health-port", runserver.DefaultGrpcHealthPort, "The port used for gRPC liveness and readiness probes") @@ -156,9 +150,6 @@ func (r *Runner) Run(ctx context.Context) error { }) setupLog.Info("Flags processed", "flags", flags) - // --- Load Configurations from Environment Variables --- - sdConfig := saturationdetector.LoadConfigFromEnv() - // --- Get Kubernetes Config --- cfg, err := ctrl.GetConfig() if err != nil { @@ -166,9 +157,14 @@ func (r *Runner) Run(ctx context.Context) error { return err } + eppConfig, err := r.parseConfiguration(ctx) + if err != nil { + setupLog.Error(err, "Failed to parse plugins configuration") + return err + } + // --- Setup Datastore --- - useDatalayerV2 := env.GetEnvBool(enableExperimentalDatalayerV2, false, setupLog) - epf, err := r.setupMetricsCollection(setupLog, useDatalayerV2) + epf, err := r.setupMetricsCollection(setupLog, eppConfig.FeatureConfig[datalayer.FeatureGate]) if err != nil { return err } @@ -233,12 +229,6 @@ func (r *Runner) Run(ctx context.Context) error { runtime.SetBlockProfileRate(1) } - err = r.parsePluginsConfiguration(ctx) - if err != nil { - setupLog.Error(err, "Failed to parse plugins configuration") - return err - } - // --- Initialize Core EPP Components --- if r.schedulerConfig == nil { err := errors.New("scheduler config must be set either by config api or through code") @@ -250,7 +240,7 @@ func (r *Runner) Run(ctx context.Context) error { scheduler := scheduling.NewSchedulerWithConfig(r.schedulerConfig) - saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog) + saturationDetector := saturationdetector.NewDetector(&eppConfig.SaturationDetectorConfig, setupLog) director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, 
r.requestControlConfig) @@ -267,7 +257,7 @@ func (r *Runner) Run(ctx context.Context) error { MetricsStalenessThreshold: *metricsStalenessThreshold, Director: director, SaturationDetector: saturationDetector, - UseExperimentalDatalayerV2: useDatalayerV2, // pluggable data layer feature flag + UseExperimentalDatalayerV2: eppConfig.FeatureConfig[datalayer.FeatureGate], // pluggable data layer feature flag } if err := serverRunner.SetupWithManager(ctx, mgr); err != nil { setupLog.Error(err, "Failed to setup EPP controllers") @@ -310,9 +300,9 @@ func (r *Runner) registerInTreePlugins() { plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory) } -func (r *Runner) parsePluginsConfiguration(ctx context.Context) error { +func (r *Runner) parseConfiguration(ctx context.Context) (*config.Config, error) { if *configText == "" && *configFile == "" { - return nil // configuring through code, not through file + return nil, nil // configuring through code, not through file } logger := log.FromContext(ctx) @@ -324,24 +314,26 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context) error { var err error configBytes, err = os.ReadFile(*configFile) if err != nil { - return fmt.Errorf("failed to load config from a file '%s' - %w", *configFile, err) + return nil, fmt.Errorf("failed to load config from a file '%s' - %w", *configFile, err) } } + loader.RegisterFeatureGate(datalayer.FeatureGate) + r.registerInTreePlugins() handle := plugins.NewEppHandle(ctx) - config, err := loader.LoadConfig(configBytes, handle, logger) + cfg, err := loader.LoadConfig(configBytes, handle, logger) if err != nil { - return fmt.Errorf("failed to load the configuration - %w", err) + return nil, fmt.Errorf("failed to load the configuration - %w", err) } - r.schedulerConfig = config.SchedulerConfig + r.schedulerConfig = cfg.SchedulerConfig // Add requestControl plugins r.requestControlConfig.AddPlugins(handle.GetAllPlugins()...) 
logger.Info("loaded configuration from file/text successfully") - return nil + return cfg, nil } func (r *Runner) setupMetricsCollection(setupLog logr.Logger, useExperimentalDatalayer bool) (datalayer.EndpointFactory, error) { diff --git a/mkdocs.yml b/mkdocs.yml index 8262f6fb0..ef34c5690 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -71,7 +71,7 @@ nav: - InferencePool Rollout: guides/inferencepool-rollout.md - Metrics and Observability: guides/metrics-and-observability.md - Configuration Guide: - - Configuring the plugins via configuration YAML file: guides/epp-configuration/config-text.md + - Configuring the EndPoint Picker via configuration YAML file: guides/epp-configuration/config-text.md - Prefix Cache Aware Plugin: guides/epp-configuration/prefix-aware.md - Troubleshooting Guide: guides/troubleshooting.md - Implementer Guides: diff --git a/pkg/epp/config/config.go b/pkg/epp/config/config.go index 7b07a0746..829e58137 100644 --- a/pkg/epp/config/config.go +++ b/pkg/epp/config/config.go @@ -16,9 +16,14 @@ limitations under the License. 
package config -import "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling" +import ( + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling" +) // Config is the configuration loaded from the text based configuration type Config struct { - SchedulerConfig *scheduling.SchedulerConfig + SchedulerConfig *scheduling.SchedulerConfig + FeatureConfig map[string]bool + SaturationDetectorConfig saturationdetector.Config } diff --git a/pkg/epp/config/loader/configloader.go b/pkg/epp/config/loader/configloader.go index 8e80b037d..2f1f77a9f 100644 --- a/pkg/epp/config/loader/configloader.go +++ b/pkg/epp/config/loader/configloader.go @@ -29,12 +29,15 @@ import ( configapi "sigs.k8s.io/gateway-api-inference-extension/apix/config/v1alpha1" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" ) var scheme = runtime.NewScheme() +var registeredFeatureGates = map[string]struct{}{} + func init() { utilruntime.Must(configapi.Install(scheme)) } @@ -48,6 +51,10 @@ func LoadConfig(configBytes []byte, handle plugins.Handle, logger logr.Logger) ( logger.Info("Loaded configuration", "config", rawConfig) + if err = validateFeatureGates(rawConfig.FeatureGates); err != nil { + return nil, fmt.Errorf("failed to validate feature gates - %w", err) + } + setDefaultsPhaseOne(rawConfig) // instantiate loaded plugins @@ -69,6 +76,8 @@ func LoadConfig(configBytes []byte, handle plugins.Handle, logger logr.Logger) ( if err != nil { return nil, err } + config.FeatureConfig = loadFeatureConfig(rawConfig.FeatureGates) + config.SaturationDetectorConfig = loadSaturationDetectorConfig(rawConfig.SaturationDetector) return config, nil 
} @@ -116,6 +125,39 @@ func loadSchedulerConfig(configProfiles []configapi.SchedulingProfile, handle pl return scheduling.NewSchedulerConfig(profileHandler, profiles), nil } +func loadFeatureConfig(featureGates configapi.FeatureGates) map[string]bool { + featureConfig := map[string]bool{} + + for gate := range registeredFeatureGates { + featureConfig[gate] = false + } + + for _, gate := range featureGates { + featureConfig[gate] = true + } + + return featureConfig +} + +func loadSaturationDetectorConfig(sd *configapi.SaturationDetector) saturationdetector.Config { + sdConfig := saturationdetector.Config{} + + sdConfig.QueueDepthThreshold = sd.QueueDepthThreshold + if sdConfig.QueueDepthThreshold <= 0 { + sdConfig.QueueDepthThreshold = saturationdetector.DefaultQueueDepthThreshold + } + sdConfig.KVCacheUtilThreshold = sd.KVCacheUtilThreshold + if sdConfig.KVCacheUtilThreshold <= 0.0 || sdConfig.KVCacheUtilThreshold >= 1.0 { + sdConfig.KVCacheUtilThreshold = saturationdetector.DefaultKVCacheUtilThreshold + } + sdConfig.MetricsStalenessThreshold = sd.MetricsStalenessThreshold.Duration + if sdConfig.MetricsStalenessThreshold <= 0.0 { + sdConfig.MetricsStalenessThreshold = saturationdetector.DefaultMetricsStalenessThreshold + } + + return sdConfig +} + func instantiatePlugins(configuredPlugins []configapi.PluginSpec, handle plugins.Handle) error { pluginNames := sets.New[string]() // set of plugin names, a name must be unique @@ -145,34 +187,7 @@ func instantiatePlugins(configuredPlugins []configapi.PluginSpec, handle plugins return nil } -func validateSchedulingProfiles(config *configapi.EndpointPickerConfig) error { - profileNames := sets.New[string]() - for _, profile := range config.SchedulingProfiles { - if profile.Name == "" { - return errors.New("SchedulingProfile must have a name") - } - - if profileNames.Has(profile.Name) { - return fmt.Errorf("the name '%s' has been specified for more than one SchedulingProfile", profile.Name) - } - 
profileNames.Insert(profile.Name) - - for _, plugin := range profile.Plugins { - if len(plugin.PluginRef) == 0 { - return fmt.Errorf("SchedulingProfile '%s' plugins must have a plugin reference", profile.Name) - } - - notFound := true - for _, pluginConfig := range config.Plugins { - if plugin.PluginRef == pluginConfig.Name { - notFound = false - break - } - } - if notFound { - return errors.New(plugin.PluginRef + " is a reference to an undefined Plugin") - } - } - } - return nil +// RegisterFeatureGate registers feature gate keys for validation +func RegisterFeatureGate(gate string) { + registeredFeatureGates[gate] = struct{}{} } diff --git a/pkg/epp/config/loader/configloader_test.go b/pkg/epp/config/loader/configloader_test.go index ff7b65256..c5c205827 100644 --- a/pkg/epp/config/loader/configloader_test.go +++ b/pkg/epp/config/loader/configloader_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "os" "testing" + "time" "github.com/google/go-cmp/cmp" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,7 +29,9 @@ import ( configapi "sigs.k8s.io/gateway-api-inference-extension/apix/config/v1alpha1" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker" @@ -97,6 +100,10 @@ func TestLoadRawConfiguration(t *testing.T) { }, }, }, + FeatureGates: configapi.FeatureGates{datalayer.FeatureGate}, + SaturationDetector: &configapi.SaturationDetector{ + MetricsStalenessThreshold: metav1.Duration{Duration: 150 * time.Millisecond}, + }, } goodConfigNoProfiles := &configapi.EndpointPickerConfig{ @@ -199,6 +206,12 
@@ func TestLoadRawConfigurationWithDefaults(t *testing.T) { }, }, }, + FeatureGates: configapi.FeatureGates{datalayer.FeatureGate}, + SaturationDetector: &configapi.SaturationDetector{ + QueueDepthThreshold: saturationdetector.DefaultQueueDepthThreshold, + KVCacheUtilThreshold: saturationdetector.DefaultKVCacheUtilThreshold, + MetricsStalenessThreshold: metav1.Duration{Duration: 150 * time.Millisecond}, + }, } goodConfigNoProfiles := &configapi.EndpointPickerConfig{ @@ -234,6 +247,12 @@ func TestLoadRawConfigurationWithDefaults(t *testing.T) { }, }, }, + FeatureGates: configapi.FeatureGates{}, + SaturationDetector: &configapi.SaturationDetector{ + QueueDepthThreshold: saturationdetector.DefaultQueueDepthThreshold, + KVCacheUtilThreshold: saturationdetector.DefaultKVCacheUtilThreshold, + MetricsStalenessThreshold: metav1.Duration{Duration: saturationdetector.DefaultMetricsStalenessThreshold}, + }, } tests := []testStruct{ @@ -346,6 +365,7 @@ func checkError(t *testing.T, function string, test testStruct, err error) { } func TestInstantiatePlugins(t *testing.T) { + registerNeededFeatureGates() handle := utils.NewTestHandle(context.Background()) _, err := LoadConfig([]byte(successConfigText), handle, logging.NewTestLogger()) if err != nil { @@ -420,8 +440,14 @@ func TestLoadConfig(t *testing.T) { configText: errorNoProfileHandlersText, wantErr: true, }, + { + name: "errorUnknownFeatureGate", + configText: errorUnknownFeatureGateText, + wantErr: true, + }, } + registerNeededFeatureGates() registerNeededPlgugins() logger := logging.NewTestLogger() @@ -439,6 +465,10 @@ func TestLoadConfig(t *testing.T) { } } +func registerNeededFeatureGates() { + RegisterFeatureGate(datalayer.FeatureGate) +} + func registerNeededPlgugins() { plugins.Register(prefix.PrefixCachePluginType, prefix.PrefixCachePluginFactory) plugins.Register(picker.MaxScorePickerType, picker.MaxScorePickerFactory) @@ -447,6 +477,64 @@ func registerNeededPlgugins() { 
plugins.Register(profile.SingleProfileHandlerType, profile.SingleProfileHandlerFactory) } +func TestNewDetector(t *testing.T) { + tests := []struct { + name string + config *configapi.SaturationDetector + expectedConfig saturationdetector.Config + }{ + { + name: "Valid config", + config: &configapi.SaturationDetector{ + QueueDepthThreshold: 10, + KVCacheUtilThreshold: 0.8, + MetricsStalenessThreshold: metav1.Duration{Duration: 100 * time.Millisecond}, + }, + expectedConfig: saturationdetector.Config{ + QueueDepthThreshold: 10, + KVCacheUtilThreshold: 0.8, + MetricsStalenessThreshold: 100 * time.Millisecond, + }, + }, + { + name: "invalid thresholds, fallback to default", + config: &configapi.SaturationDetector{ + QueueDepthThreshold: -1, + KVCacheUtilThreshold: -5.0, + MetricsStalenessThreshold: metav1.Duration{Duration: 0 * time.Second}, + }, + expectedConfig: saturationdetector.Config{ + QueueDepthThreshold: saturationdetector.DefaultQueueDepthThreshold, + KVCacheUtilThreshold: saturationdetector.DefaultKVCacheUtilThreshold, + MetricsStalenessThreshold: saturationdetector.DefaultMetricsStalenessThreshold, + }, + }, + { + name: "kv cache threshold above range, fallback to default", + config: &configapi.SaturationDetector{ + QueueDepthThreshold: 10, + KVCacheUtilThreshold: 1.5, + MetricsStalenessThreshold: metav1.Duration{Duration: 100 * time.Millisecond}, + }, + expectedConfig: saturationdetector.Config{ + QueueDepthThreshold: 10, + KVCacheUtilThreshold: saturationdetector.DefaultKVCacheUtilThreshold, + MetricsStalenessThreshold: 100 * time.Millisecond, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // validate configuration values are loaded from configuration struct properly, including the use of default values when provided value is invalid. 
+ sdConfig := loadSaturationDetectorConfig(test.config) + if diff := cmp.Diff(test.expectedConfig, sdConfig); diff != "" { + t.Errorf("Unexpected output (-want +got): %v", diff) + } + }) + } +} + // The following multi-line string constants, cause false positive lint errors (dupword) // valid configuration @@ -474,6 +562,10 @@ schedulingProfiles: - pluginRef: test-two weight: 50 - pluginRef: testPicker +featureGates: +- dataLayer +saturationDetector: + metricsStalenessThreshold: 150ms ` // success with missing scheduling profiles @@ -639,6 +731,21 @@ schedulingProfiles: - pluginRef: test2 ` +// error with an unknown feature gate +// +//nolint:dupword +const errorUnknownFeatureGateText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: test1 + type: test-one + parameters: + threshold: 10 +featureGates: +- qwerty +` + // compile-time type validation var _ framework.Filter = &test1{} @@ -778,6 +885,8 @@ schedulingProfiles: - pluginRef: prefixCacheScorer weight: 50 - pluginRef: maxScorePicker +featureGates: +- dataLayer ` // valid configuration, with default weight for scorer diff --git a/pkg/epp/config/loader/defaults.go b/pkg/epp/config/loader/defaults.go index 2a8236ea5..17075dd05 100644 --- a/pkg/epp/config/loader/defaults.go +++ b/pkg/epp/config/loader/defaults.go @@ -17,8 +17,10 @@ limitations under the License. 
package loader import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" configapi "sigs.k8s.io/gateway-api-inference-extension/apix/config/v1alpha1" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile" @@ -41,6 +43,8 @@ var defaultScorerWeight = DefaultScorerWeight // setDefaultsPhaseOne Performs the first phase of setting configuration defaults. // In particuylar it: // 1. Sets the name of plugins, for which one wasn't specified +// 2. Sets defaults for the feature gates +// 3. Sets defaults for the SaturationDetector configuration func setDefaultsPhaseOne(cfg *configapi.EndpointPickerConfig) { // If no name was given for the plugin, use it's type as the name for idx, pluginConfig := range cfg.Plugins { @@ -48,6 +52,26 @@ func setDefaultsPhaseOne(cfg *configapi.EndpointPickerConfig) { cfg.Plugins[idx].Name = pluginConfig.Type } } + + // If no feature gates were specified, provide a default FeatureGates struct + if cfg.FeatureGates == nil { + cfg.FeatureGates = configapi.FeatureGates{} + } + + // If the SaturationDetector configuration wasn't specified setup a default one + if cfg.SaturationDetector == nil { + cfg.SaturationDetector = &configapi.SaturationDetector{} + } + if cfg.SaturationDetector.QueueDepthThreshold == 0 { + cfg.SaturationDetector.QueueDepthThreshold = saturationdetector.DefaultQueueDepthThreshold + } + if cfg.SaturationDetector.KVCacheUtilThreshold == 0.0 { + cfg.SaturationDetector.KVCacheUtilThreshold = saturationdetector.DefaultKVCacheUtilThreshold + } + if cfg.SaturationDetector.MetricsStalenessThreshold.Duration == 0.0 { + cfg.SaturationDetector.MetricsStalenessThreshold = + metav1.Duration{Duration: 
saturationdetector.DefaultMetricsStalenessThreshold} + } } // setDefaultsPhaseTwo Performs the second phase of setting configuration defaults. diff --git a/pkg/epp/config/loader/validation.go b/pkg/epp/config/loader/validation.go new file mode 100644 index 000000000..e43fcb5c2 --- /dev/null +++ b/pkg/epp/config/loader/validation.go @@ -0,0 +1,71 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loader + +import ( + "errors" + "fmt" + + "k8s.io/apimachinery/pkg/util/sets" + configapi "sigs.k8s.io/gateway-api-inference-extension/apix/config/v1alpha1" +) + +func validateSchedulingProfiles(config *configapi.EndpointPickerConfig) error { + profileNames := sets.New[string]() + for _, profile := range config.SchedulingProfiles { + if profile.Name == "" { + return errors.New("SchedulingProfile must have a name") + } + + if profileNames.Has(profile.Name) { + return fmt.Errorf("the name '%s' has been specified for more than one SchedulingProfile", profile.Name) + } + profileNames.Insert(profile.Name) + + for _, plugin := range profile.Plugins { + if len(plugin.PluginRef) == 0 { + return fmt.Errorf("SchedulingProfile '%s' plugins must have a plugin reference", profile.Name) + } + + notFound := true + for _, pluginConfig := range config.Plugins { + if plugin.PluginRef == pluginConfig.Name { + notFound = false + break + } + } + if notFound { + return errors.New(plugin.PluginRef + " is a reference to an undefined Plugin") + } + } + } + return 
nil +} + +func validateFeatureGates(fg configapi.FeatureGates) error { + if fg == nil { + return nil + } + + for _, gate := range fg { + if _, ok := registeredFeatureGates[gate]; !ok { + return errors.New(gate + " is an unregistered Feature Gate") + } + } + + return nil +} diff --git a/pkg/epp/datalayer/factory.go b/pkg/epp/datalayer/factory.go index eca7697e5..f30b9cefd 100644 --- a/pkg/epp/datalayer/factory.go +++ b/pkg/epp/datalayer/factory.go @@ -28,6 +28,10 @@ import ( v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" ) +const ( + FeatureGate = "dataLayer" +) + // PoolInfo represents the DataStore information needed for endpoints. // TODO: // Consider if to remove/simplify in follow-ups. This is mostly for backward diff --git a/pkg/epp/saturationdetector/config.go b/pkg/epp/saturationdetector/config.go index b429d1e66..629c3a6a6 100644 --- a/pkg/epp/saturationdetector/config.go +++ b/pkg/epp/saturationdetector/config.go @@ -16,12 +16,7 @@ limitations under the License. package saturationdetector import ( - "fmt" "time" - - "sigs.k8s.io/controller-runtime/pkg/log" - - envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env" ) // Default configuration values @@ -36,37 +31,3 @@ const ( // that should be fine. DefaultMetricsStalenessThreshold = 200 * time.Millisecond ) - -// Environment variable names for SaturationDetector configuration -const ( - EnvSdQueueDepthThreshold = "SD_QUEUE_DEPTH_THRESHOLD" - EnvSdKVCacheUtilThreshold = "SD_KV_CACHE_UTIL_THRESHOLD" - EnvSdMetricsStalenessThreshold = "SD_METRICS_STALENESS_THRESHOLD" -) - -// LoadConfigFromEnv loads SaturationDetector Config from environment variables. -func LoadConfigFromEnv() *Config { - // Use a default logger for initial configuration loading. 
- logger := log.Log.WithName("saturation-detector-config") - - cfg := &Config{} - - cfg.QueueDepthThreshold = envutil.GetEnvInt(EnvSdQueueDepthThreshold, DefaultQueueDepthThreshold, logger) - if cfg.QueueDepthThreshold <= 0 { - cfg.QueueDepthThreshold = DefaultQueueDepthThreshold - } - - cfg.KVCacheUtilThreshold = envutil.GetEnvFloat(EnvSdKVCacheUtilThreshold, DefaultKVCacheUtilThreshold, logger) - if cfg.KVCacheUtilThreshold <= 0 || cfg.KVCacheUtilThreshold >= 1 { - cfg.KVCacheUtilThreshold = DefaultKVCacheUtilThreshold - } - - cfg.MetricsStalenessThreshold = envutil.GetEnvDuration(EnvSdMetricsStalenessThreshold, DefaultMetricsStalenessThreshold, logger) - if cfg.MetricsStalenessThreshold <= 0 { - cfg.MetricsStalenessThreshold = DefaultMetricsStalenessThreshold - } - - // NewDetector validates the config and assigns defaults. - logger.Info("SaturationDetector configuration loaded from env", "config", fmt.Sprintf("%+v", cfg)) - return cfg -} diff --git a/pkg/epp/saturationdetector/saturationdetector_test.go b/pkg/epp/saturationdetector/saturationdetector_test.go index 0b861d90a..833d0b245 100644 --- a/pkg/epp/saturationdetector/saturationdetector_test.go +++ b/pkg/epp/saturationdetector/saturationdetector_test.go @@ -18,14 +18,10 @@ package saturationdetector import ( "context" - "fmt" - "os" - "strconv" "testing" "time" "github.com/go-logr/logr" - "github.com/google/go-cmp/cmp" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" @@ -43,68 +39,6 @@ func newMockPodMetrics(name string, metrics *backendmetrics.MetricsState) *backe // --- Tests --- -func TestNewDetector(t *testing.T) { - tests := []struct { - name string - config *Config - expectedConfig *Config - }{ - { - name: "Valid config", - config: &Config{ - QueueDepthThreshold: 10, - KVCacheUtilThreshold: 0.8, - MetricsStalenessThreshold: 100 * time.Millisecond, - }, - expectedConfig: &Config{ - QueueDepthThreshold: 10, - KVCacheUtilThreshold: 0.8, - 
MetricsStalenessThreshold: 100 * time.Millisecond, - }, - }, - { - name: "invalid thresholds, fallback to default", - config: &Config{ - QueueDepthThreshold: -1, - KVCacheUtilThreshold: -5, - MetricsStalenessThreshold: 0, - }, - expectedConfig: &Config{ - QueueDepthThreshold: DefaultQueueDepthThreshold, - KVCacheUtilThreshold: DefaultKVCacheUtilThreshold, - MetricsStalenessThreshold: DefaultMetricsStalenessThreshold, - }, - }, - { - name: "kv cache threshold above range, fallback to default", - config: &Config{ - QueueDepthThreshold: 10, - KVCacheUtilThreshold: 1.5, - MetricsStalenessThreshold: 100 * time.Millisecond, - }, - expectedConfig: &Config{ - QueueDepthThreshold: 10, - KVCacheUtilThreshold: DefaultKVCacheUtilThreshold, - MetricsStalenessThreshold: 100 * time.Millisecond, - }, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - // validate configuration values are loaded from env vars properly, including the use of default values when provided value is invalid. 
- os.Setenv(EnvSdQueueDepthThreshold, strconv.Itoa(test.config.QueueDepthThreshold)) - os.Setenv(EnvSdKVCacheUtilThreshold, fmt.Sprintf("%v", test.config.KVCacheUtilThreshold)) - os.Setenv(EnvSdMetricsStalenessThreshold, test.config.MetricsStalenessThreshold.String()) - - detector := NewDetector(LoadConfigFromEnv(), logr.Discard()) - if diff := cmp.Diff(test.expectedConfig, detector.config); diff != "" { - t.Errorf("Unexpected output (-want +got): %v", diff) - } - }) - } -} - func TestDetector_IsSaturated(t *testing.T) { baseTime := time.Now() defaultConfig := &Config{ diff --git a/site-src/guides/epp-configuration/config-text.md b/site-src/guides/epp-configuration/config-text.md index cdb3714cb..a73502258 100644 --- a/site-src/guides/epp-configuration/config-text.md +++ b/site-src/guides/epp-configuration/config-text.md @@ -1,23 +1,14 @@ -# Configuring Plugins via YAML +# Configuring via YAML -The set of lifecycle hooks (plugins) that are used by the Inference Gateway (IGW) is determined by how -it is configured. The IGW is primarily configured via a configuration file. +The Inference Gateway (IGW) can be configured via a YAML file. -The YAML file can either be specified as a path to a file or in-line as a parameter. The configuration defines the set of -plugins to be instantiated along with their parameters. Each plugin can also be given a name, enabling -the same plugin type to be instantiated multiple times, if needed (such as when configuring multiple scheduling profiles). +At this time the YAML file based configuration allows for: -Also defined is a set of SchedulingProfiles, which determine the set of plugins to be used when scheduling a request. -If no scheduling profile is specified, a default profile, named `default` will be added and will reference all of the -instantiated plugins. +1. The set of the lifecycle hooks (plugins) that are used by the IGW. +2. The configuration of the saturation detector +3. 
A set of feature gates that are used to enable experimental features. -The set of plugins instantiated can include a Profile Handler, which determines which SchedulingProfiles -will be used for a particular request. A Profile Handler must be specified, unless the configuration only -contains one profile, in which case the `SingleProfileHandler` will be used. - -In addition, the set of instantiated plugins can also include a picker, which chooses the actual pod to which -the request is scheduled after filtering and scoring. If one is not referenced in a SchedulingProfile, an -instance of `MaxScorePicker` will be added to the SchedulingProfile in question. +The YAML file can either be specified as a path to a file or in-line as a parameter. ***NOTE***: While the configuration text looks like a Kubernetes CRD, it is **NOT** a Kubernetes CRD. Specifically, the config is not reconciled upon, and is only read on startup. @@ -33,10 +24,46 @@ plugins: schedulingProfiles: - .... - .... +saturationDetector: + ... +featureGates: + ... ``` The first two lines of the configuration are constant and must appear as is. +The plugins section defines the set of plugins that will be instantiated and their parameters. This section is described in more detail in the section [Configuring Plugins via YAML](#configuring-plugins-via-yaml) + +The schedulingProfiles section defines the set of scheduling profiles that can be used in scheduling +requests to pods. This section is described in more detail in the section [Configuring Plugins via YAML](#configuring-plugins-via-yaml) + +The saturationDetector section configures the saturation detector, which is used to determine if special +action needs to be taken due to the system being overloaded or saturated. This section is described in more detail in the section [Saturation Detector configuration](#saturation-detector-configuration) + +The featureGates section allows the enablement of experimental features of the IGW. 
This section is +described in more detail in the section [Feature Gates](#feature-gates) + +## Configuring Plugins via YAML + +The set of plugins that are used by the IGW is determined by how it is configured. The IGW is +primarily configured via a configuration file. + +The configuration defines the set of plugins to be instantiated along with their parameters. +Each plugin can also be given a name, enabling the same plugin type to be instantiated multiple +times, if needed (such as when configuring multiple scheduling profiles). + +Also defined is a set of SchedulingProfiles, which determine the set of plugins to be used when scheduling +a request. If no scheduling profile is defined, a default one named `default` will be added and will reference all of +the instantiated plugins. + +The set of plugins instantiated can include a Profile Handler, which determines which SchedulingProfiles +will be used for a particular request. A Profile Handler must be specified, unless the configuration only +contains one profile, in which case the `SingleProfileHandler` will be used. + +In addition, the set of instantiated plugins can also include a picker, which chooses the actual pod to which +the request is scheduled after filtering and scoring. If one is not referenced in a SchedulingProfile, an +instance of `MaxScorePicker` will be added to the SchedulingProfile in question. + The plugins section defines the set of plugins that will be instantiated and their parameters. Each entry in this section has the following form: @@ -184,7 +211,7 @@ schedulingProfiles: -pluginRef: max-score-picker ``` -## Plugin Configuration +### Plugin Configuration This section describes how to setup the various plugins that are available with the IGW. @@ -260,3 +287,57 @@ scored higher (since it's more available to serve new request). - *Type*: lora-affinity-scorer - *Parameters*: none + +## Saturation Detector configuration + +The Saturation Detector is used to determine if the cluster is overloaded, i.e. 
saturated. When +the cluster is saturated, special actions will be taken depending on what has been enabled. At this time, sheddable requests will be dropped. + +The Saturation Detector determines that the cluster is saturated by looking at the following metrics provided by the inference servers: + +- Backend waiting queue size +- KV cache utilization +- Metrics staleness + +The Saturation Detector is configured via the saturationDetector section of the overall configuration. +It has the following form: + +```yaml +saturationDetector: + queueDepthThreshold: 8 + kvCacheUtilThreshold: 0.75 + metricsStalenessThreshold: 150ms +``` + +The various sub-fields of the saturationDetector section are: + +- The `queueDepthThreshold` field which defines the backend waiting queue size above which a +pod is considered to have insufficient capacity for new requests. This field is optional, if +omitted a value of `5` will be used. +- The `kvCacheUtilThreshold` field which defines the KV cache utilization (0.0 to 1.0) above +which a pod is considered to have insufficient capacity. This field is optional, if omitted +a value of `0.8` will be used. +- The `metricsStalenessThreshold` field which defines how old a pod's metrics can be. If a pod's +metrics are older than this, it might be excluded from "good capacity" considerations or treated +as having no capacity for safety. This field is optional, if omitted a value of `200ms` will be used. + +## Feature Gates + +The Feature Gates section allows for the enabling of experimental features of the IGW. These experimental +features are all disabled unless you explicitly enable them one by one. + +The Feature Gates section has the following form: + +```yaml +featureGates: +- dataLayer +- flowControl +``` + +The Feature Gates section is an array of flags, each of which enables one experimental feature. +The available values for these elements are: + +- `dataLayer` which, if present, enables the experimental Datalayer APIs. 
+- `flowControl` which, if present, enables the experimental FlowControl feature. + +In all cases if the appropriate element isn't present, that experimental feature will be disabled. \ No newline at end of file diff --git a/test/testdata/configloader_1_test.yaml b/test/testdata/configloader_1_test.yaml index f1f167efb..e425a78b6 100644 --- a/test/testdata/configloader_1_test.yaml +++ b/test/testdata/configloader_1_test.yaml @@ -12,7 +12,6 @@ plugins: hashBlockSize: 32 - name: testPicker type: test-picker - schedulingProfiles: - name: default plugins: @@ -20,3 +19,7 @@ schedulingProfiles: - pluginRef: test-two weight: 50 - pluginRef: testPicker +featureGates: +- dataLayer +saturationDetector: + metricsStalenessThreshold: 150ms