diff --git a/charts/yurthub/templates/yurthub-cloud-yurtstaticset.yaml b/charts/yurthub/templates/yurthub-cloud-yurtstaticset.yaml index baa20d678e7..6b637f44591 100644 --- a/charts/yurthub/templates/yurthub-cloud-yurtstaticset.yaml +++ b/charts/yurthub/templates/yurthub-cloud-yurtstaticset.yaml @@ -50,8 +50,16 @@ spec: host: {{ .Values.yurthubBindingAddr }} path: /v1/healthz port: 10267 - initialDelaySeconds: 300 - periodSeconds: 5 + initialDelaySeconds: 120 + periodSeconds: 10 + failureThreshold: 3 + readinessProbe: + httpGet: + host: {{ .Values.yurthubBindingAddr }} + path: /v1/readyz + port: 10267 + initialDelaySeconds: 120 + periodSeconds: 20 failureThreshold: 3 resources: requests: diff --git a/charts/yurthub/templates/yurthub-yurtstaticset.yaml b/charts/yurthub/templates/yurthub-yurtstaticset.yaml index 272e06709e7..83cd78ba9ff 100644 --- a/charts/yurthub/templates/yurthub-yurtstaticset.yaml +++ b/charts/yurthub/templates/yurthub-yurtstaticset.yaml @@ -50,8 +50,16 @@ spec: host: {{ .Values.yurthubBindingAddr }} path: /v1/healthz port: 10267 - initialDelaySeconds: 300 - periodSeconds: 5 + initialDelaySeconds: 120 + periodSeconds: 10 + failureThreshold: 3 + readinessProbe: + httpGet: + host: {{ .Values.yurthubBindingAddr }} + path: /v1/readyz + port: 10267 + initialDelaySeconds: 120 + periodSeconds: 20 failureThreshold: 3 resources: requests: diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index 1342f599c80..05a0a43b036 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -28,7 +28,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" apiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/dynamiccertificates" @@ -63,19 +62,6 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/util" ) -var AllowedMultiplexerResources = []schema.GroupVersionResource{ - { - Group: "", - Version: "v1", - Resource: "services", - }, - { - Group: "discovery.k8s.io", - Version: "v1", - Resource: "endpointslices", - }, -} - // YurtHubConfiguration represents configuration of yurthub type YurtHubConfiguration struct { LBMode string @@ -111,9 +97,7 @@ type YurtHubConfiguration struct { ProxiedClient kubernetes.Interface DiskCachePath string HostControlPlaneAddr string // ip:port - PostStartHooks map[string]func() error - RequestMultiplexerManager multiplexer.MultiplexerManager - MultiplexerResources []schema.GroupVersionResource + RequestMultiplexerManager *multiplexer.MultiplexerManager ConfigManager *configuration.Manager } @@ -178,8 +162,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { ProxiedClient: proxiedClient, DiskCachePath: options.DiskCachePath, HostControlPlaneAddr: options.HostControlPlaneAddr, - MultiplexerResources: AllowedMultiplexerResources, - RequestMultiplexerManager: newMultiplexerCacheManager(options), + RequestMultiplexerManager: newRequestMultiplexerManager(options, restMapperManager), ConfigManager: configManager, } @@ -302,7 +285,7 @@ func registerInformers(options *options.YurtHubOptions, workingMode util.WorkingMode, tenantNs string) { - // configmap informer is used by Yurthub filter approver + // configmap informer is used by Yurthub filter approver/cache manager newConfigmapInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { tweakListOptions := func(options 
*metav1.ListOptions) { options.FieldSelector = fields.Set{"metadata.name": util.YurthubConfigMapName}.String() @@ -409,16 +392,28 @@ func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.Y return nil } -func newMultiplexerCacheManager(options *options.YurtHubOptions) multiplexer.MultiplexerManager { - config := newRestConfig(options.NodeName, options.YurtHubProxyHost, options.YurtHubProxyPort) - rsm := storage.NewStorageManager(config) +func newRequestMultiplexerManager(options *options.YurtHubOptions, restMapperManager *meta.RESTMapperManager) *multiplexer.MultiplexerManager { + config := &rest.Config{ + Host: fmt.Sprintf("http://%s:%d", options.YurtHubProxyHost, options.YurtHubProxyPort), + UserAgent: util.MultiplexerProxyClientUserAgentPrefix + options.NodeName, + } + storageProvider := storage.NewStorageProvider(config) - return multiplexer.NewRequestsMultiplexerManager(rsm) + return multiplexer.NewRequestMultiplexerManager(storageProvider, restMapperManager, options.PoolScopeResources) } -func newRestConfig(nodeName string, host string, port int) *rest.Config { - return &rest.Config{ - Host: fmt.Sprintf("http://%s:%d", host, port), - UserAgent: util.MultiplexerProxyClientUserAgentPrefix + nodeName, +func ReadinessCheck(cfg *YurtHubConfiguration) error { + if ready := cfg.CertManager.Ready(); !ready { + return fmt.Errorf("certificates are not ready") + } + + if ready := cfg.ConfigManager.HasSynced(); !ready { + return fmt.Errorf("yurt-hub-cfg configmap is not synced") } + + if synced := cfg.FilterFinder.HasSynced(); !synced { + return fmt.Errorf("resources needed by filters are not synced") + } + + return nil } diff --git a/cmd/yurthub/app/options/options.go b/cmd/yurthub/app/options/options.go index a4ef50f7010..20514d26684 100644 --- a/cmd/yurthub/app/options/options.go +++ b/cmd/yurthub/app/options/options.go @@ -23,6 +23,7 @@ import ( "time" "github.com/spf13/pflag" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" @@ -81,6 +82,7 @@ type YurtHubOptions struct { UnsafeSkipCAVerification bool ClientForTest kubernetes.Interface EnablePoolServiceTopology bool + PoolScopeResources PoolScopeMetadatas } // NewYurtHubOptions creates a new YurtHubOptions with a default config. @@ -116,6 +118,10 @@ func NewYurtHubOptions() *YurtHubOptions { CACertHashes: make([]string, 0), UnsafeSkipCAVerification: true, EnablePoolServiceTopology: false, + PoolScopeResources: []schema.GroupVersionResource{ + {Group: "", Version: "v1", Resource: "services"}, + {Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}, + }, } return o } @@ -208,6 +214,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&o.UnsafeSkipCAVerification, "discovery-token-unsafe-skip-ca-verification", o.UnsafeSkipCAVerification, "For token-based discovery, allow joining without --discovery-token-ca-cert-hash pinning.") fs.BoolVar(&o.EnablePoolServiceTopology, "enable-pool-service-topology", o.EnablePoolServiceTopology, "enable service topology feature in the node pool.") fs.StringVar(&o.HostControlPlaneAddr, "host-control-plane-address", o.HostControlPlaneAddr, "the address (ip:port) of host kubernetes cluster that used for yurthub local mode.") + fs.Var(&o.PoolScopeResources, "pool-scope-resources", "The list/watch requests for these resources will be multiplexered in yurthub in order to reduce overhead of kube-apiserver. 
comma-separated list of GroupVersionResource in the format Group/Version/Resource") } // verifyDummyIP verify the specified ip is valid or not and set the default ip if empty diff --git a/cmd/yurthub/app/options/options_test.go b/cmd/yurthub/app/options/options_test.go index 35327aa9b63..009187851e3 100644 --- a/cmd/yurthub/app/options/options_test.go +++ b/cmd/yurthub/app/options/options_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/runtime/schema" "github.com/openyurtio/openyurt/pkg/projectinfo" "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" @@ -61,6 +62,10 @@ func TestNewYurtHubOptions(t *testing.T) { MinRequestTimeout: time.Second * 1800, CACertHashes: make([]string, 0), UnsafeSkipCAVerification: true, + PoolScopeResources: []schema.GroupVersionResource{ + {Group: "", Version: "v1", Resource: "services"}, + {Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}, + }, } options := NewYurtHubOptions() diff --git a/cmd/yurthub/app/options/pool_scope_metadata.go b/cmd/yurthub/app/options/pool_scope_metadata.go new file mode 100644 index 00000000000..a219d48078a --- /dev/null +++ b/cmd/yurthub/app/options/pool_scope_metadata.go @@ -0,0 +1,68 @@ +/* +Copyright 2025 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + "fmt" + "strings" + + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type PoolScopeMetadatas []schema.GroupVersionResource + +// String returns the string representation of the GVR slice. +func (psm *PoolScopeMetadatas) String() string { + var result strings.Builder + for i, gvr := range *psm { + if i > 0 { + result.WriteString(",") + } + + result.WriteString(gvr.Group) + result.WriteString("/") + result.WriteString(gvr.Version) + result.WriteString("/") + result.WriteString(gvr.Resource) + } + + return result.String() +} + +// Set parses the input string and updates the PoolScopeMetadata slice. +func (psm *PoolScopeMetadatas) Set(value string) error { + parts := strings.Split(value, ",") + for _, part := range parts { + subParts := strings.Split(part, "/") + if len(subParts) != 3 { + return fmt.Errorf("invalid GVR format: %s, expected format is Group/Version/Resource", part) + } + + *psm = append(*psm, schema.GroupVersionResource{ + Group: strings.TrimSpace(subParts[0]), + Version: strings.TrimSpace(subParts[1]), + Resource: strings.TrimSpace(subParts[2]), + }) + } + + return nil +} + +// Type returns the type of the flag as a string. +func (psm *PoolScopeMetadatas) Type() string { + return "PoolScopeMetadatas" +} diff --git a/cmd/yurthub/app/options/pool_scope_metadata_test.go b/cmd/yurthub/app/options/pool_scope_metadata_test.go new file mode 100644 index 00000000000..eea758927cf --- /dev/null +++ b/cmd/yurthub/app/options/pool_scope_metadata_test.go @@ -0,0 +1,118 @@ +/* +Copyright 2025 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + "testing" + + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func TestSet(t *testing.T) { + testcases := map[string]struct { + input string + expected []schema.GroupVersionResource + expectErr bool + }{ + "single pool scope metadata": { + input: "group1/v1/resource1", + expected: []schema.GroupVersionResource{ + {Group: "group1", Version: "v1", Resource: "resource1"}, + }, + }, + "multiple pool scope metadatas": { + input: "group1/v1/resource1, group2/v2/resource2", + expected: []schema.GroupVersionResource{ + {Group: "group1", Version: "v1", Resource: "resource1"}, + {Group: "group2", Version: "v2", Resource: "resource2"}, + }, + }, + "multiple pool scope metadatas with empty group": { + input: "/v1/resource1, /v2/resource2", + expected: []schema.GroupVersionResource{ + {Group: "", Version: "v1", Resource: "resource1"}, + {Group: "", Version: "v2", Resource: "resource2"}, + }, + }, + "invalid format of pool scope metadata": { + input: "group1/v1", + expectErr: true, + }, + "empty string of pool scope metadata": { + input: "", + expectErr: true, + }, + } + + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + var psm PoolScopeMetadatas + err := psm.Set(tc.input) + if (err != nil) != tc.expectErr { + t.Errorf("expected error %v, but got %v", tc.expectErr, err != nil) + } + + if !tc.expectErr && !comparePoolScopeMetadatas(psm, tc.expected) { + t.Errorf("expected pool scope metadatas: %+v, but got %+v", tc.expected, psm) + } + }) + } +} + +func comparePoolScopeMetadatas(a, b []schema.GroupVersionResource) bool { + if len(a) != len(b) { + return false + } + + for i := range a { + if a[i] != b[i] { + return false + } + } + + return true +} + +func TestString(t *testing.T) { + testcases := map[string]struct { + psm PoolScopeMetadatas + expectedStr string + }{ + "single pool scope metadata": { + psm: PoolScopeMetadatas{ + {Group: "group1", Version: "v1", Resource: "resource1"}, + }, + expectedStr: "group1/v1/resource1", + }, + "multiple pool scope metadatas": { + psm: PoolScopeMetadatas{ + {Group: "group1", Version: "v1", Resource: "resource1"}, + {Group: "group2", Version: "v2", Resource: "resource2"}, + }, + + expectedStr: "group1/v1/resource1,group2/v2/resource2", + }, + } + + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + if tc.psm.String() != tc.expectedStr { + t.Errorf("expected string %s, but got %s", tc.expectedStr, tc.psm.String()) + } + }) + } +} diff --git a/pkg/node-servant/components/yurthub.go b/pkg/node-servant/components/yurthub.go index 5c35504115e..e9a6c1d24c8 100644 --- a/pkg/node-servant/components/yurthub.go +++ b/pkg/node-servant/components/yurthub.go @@ -182,16 +182,16 @@ func hubHealthcheck(timeout time.Duration) error { if err != nil { return err } - serverHealthzURL.Path = constants.ServerHealthzURLPath + serverHealthzURL.Path = constants.ServerReadyzURLPath start := time.Now() return wait.PollUntilContextTimeout(context.Background(), hubHealthzCheckFrequency, timeout, true, func(ctx context.Context) (bool, error) { _, err := pingClusterHealthz(http.DefaultClient, serverHealthzURL.String()) if err != nil { - 
klog.Infof("yurt-hub is not ready, ping cluster healthz with result: %v", err) + klog.Infof("yurt-hub is not ready, ping cluster readyz with result: %v", err) return false, nil } - klog.Infof("yurt-hub healthz is OK after %f seconds", time.Since(start).Seconds()) + klog.Infof("yurt-hub readyz is OK after %f seconds", time.Since(start).Seconds()) return true, nil }) } diff --git a/pkg/yurthub/configuration/manager.go b/pkg/yurthub/configuration/manager.go index 9a7fcfd8882..2db434abb01 100644 --- a/pkg/yurthub/configuration/manager.go +++ b/pkg/yurthub/configuration/manager.go @@ -104,7 +104,6 @@ func (m *Manager) HasSynced() bool { func (m *Manager) ListAllCacheAgents() []string { m.RLock() defer m.RUnlock() - return m.allCacheAgents.UnsortedList() } diff --git a/pkg/yurthub/filter/approver/approver.go b/pkg/yurthub/filter/approver/approver.go index 3a1b5c44517..dda50985569 100644 --- a/pkg/yurthub/filter/approver/approver.go +++ b/pkg/yurthub/filter/approver/approver.go @@ -21,7 +21,6 @@ import ( "strings" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/tools/cache" "github.com/openyurtio/openyurt/pkg/projectinfo" "github.com/openyurtio/openyurt/pkg/yurthub/configuration" @@ -59,10 +58,6 @@ func (a *approver) Approve(req *http.Request) (bool, []string) { } } - if ok := cache.WaitForCacheSync(a.stopCh, a.configManager.HasSynced); !ok { - return false, []string{} - } - filterNames := a.configManager.FindFiltersFor(req) if len(filterNames) == 0 { return false, filterNames diff --git a/pkg/yurthub/filter/interfaces.go b/pkg/yurthub/filter/interfaces.go index e33b0e0a3c2..cc55bd9811c 100644 --- a/pkg/yurthub/filter/interfaces.go +++ b/pkg/yurthub/filter/interfaces.go @@ -58,6 +58,14 @@ type ObjectFilter interface { type FilterFinder interface { FindResponseFilter(req *http.Request) (ResponseFilter, bool) FindObjectFilter(req *http.Request) (ObjectFilter, bool) + ResourceSyncer } type NodeGetter func(name string) (*v1.Node, error) + +// ResourceSyncer is used for verifying the resources which filter depends on has been synced or not. +// For example: servicetopology filter depends on service and nodebucket metadata, filter can be worked +// before all these metadata has been synced completely. +type ResourceSyncer interface { + HasSynced() bool +} diff --git a/pkg/yurthub/filter/manager/manager.go b/pkg/yurthub/filter/manager/manager.go index 7e0fdb4edd1..ed326997ef1 100644 --- a/pkg/yurthub/filter/manager/manager.go +++ b/pkg/yurthub/filter/manager/manager.go @@ -23,6 +23,7 @@ import ( "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" yurtoptions "github.com/openyurtio/openyurt/cmd/yurthub/app/options" "github.com/openyurtio/openyurt/pkg/yurthub/configuration" @@ -40,6 +41,7 @@ type Manager struct { filter.Approver nameToObjectFilter map[string]filter.ObjectFilter serializerManager *serializer.SerializerManager + resourceSyncers []filter.ResourceSyncer } func NewFilterManager(options *yurtoptions.YurtHubOptions, @@ -48,45 +50,68 @@ func NewFilterManager(options *yurtoptions.YurtHubOptions, proxiedClient kubernetes.Interface, serializerManager *serializer.SerializerManager, configManager *configuration.Manager) (filter.FilterFinder, error) { - if !options.EnableResourceFilter { - return nil, nil - } - - // 1. new base filters - if options.WorkingMode == string(util.WorkingModeCloud) { - options.DisabledResourceFilters = append(options.DisabledResourceFilters, yurtoptions.DisabledInCloudMode...) 
- } - filters := base.NewFilters(options.DisabledResourceFilters) + var err error + nameToFilters := make(map[string]filter.ObjectFilter) + if options.EnableResourceFilter { + // 1. new base filters + if options.WorkingMode == string(util.WorkingModeCloud) { + options.DisabledResourceFilters = append(options.DisabledResourceFilters, yurtoptions.DisabledInCloudMode...) + } + filters := base.NewFilters(options.DisabledResourceFilters) - // 2. register all filter factory - yurtoptions.RegisterAllFilters(filters) + // 2. register all filter factory + yurtoptions.RegisterAllFilters(filters) - // 3. prepare filter initializer chain - mutatedMasterServicePort := strconv.Itoa(options.YurtHubProxySecurePort) - mutatedMasterServiceHost := options.YurtHubProxyHost - if options.EnableDummyIf { - mutatedMasterServiceHost = options.HubAgentDummyIfIP + // 3. prepare filter initializer chain + mutatedMasterServicePort := strconv.Itoa(options.YurtHubProxySecurePort) + mutatedMasterServiceHost := options.YurtHubProxyHost + if options.EnableDummyIf { + mutatedMasterServiceHost = options.HubAgentDummyIfIP + } + genericInitializer := initializer.New(sharedFactory, proxiedClient, options.NodeName, options.NodePoolName, mutatedMasterServiceHost, mutatedMasterServicePort) + nodesInitializer := initializer.NewNodesInitializer(options.EnableNodePool, options.EnablePoolServiceTopology, dynamicSharedFactory) + initializerChain := base.Initializers{} + initializerChain = append(initializerChain, genericInitializer, nodesInitializer) + + // 4. initialize all object filters + nameToFilters, err = filters.NewFromFilters(initializerChain) + if err != nil { + return nil, err + } } - genericInitializer := initializer.New(sharedFactory, proxiedClient, options.NodeName, options.NodePoolName, mutatedMasterServiceHost, mutatedMasterServicePort) - nodesInitializer := initializer.NewNodesInitializer(options.EnableNodePool, options.EnablePoolServiceTopology, dynamicSharedFactory) - initializerChain := base.Initializers{} - initializerChain = append(initializerChain, genericInitializer, nodesInitializer) - - // 4. initialize all object filters - nameToFilters, err := filters.NewFromFilters(initializerChain) - if err != nil { - return nil, err + + resourceSyncers := make([]filter.ResourceSyncer, 0) + for name, objFilter := range nameToFilters { + if resourceSyncer, ok := objFilter.(filter.ResourceSyncer); ok { + klog.Infof("filter %s needs to sync resources before starting to work", name) + resourceSyncers = append(resourceSyncers, resourceSyncer) + } } // 5. new filter manager including approver and nameToObjectFilter + // if resource filters are disabled, nameToObjectFilter and resourceSyncers will be empty slices.
return &Manager{ Approver: approver.NewApprover(options.NodeName, configManager), nameToObjectFilter: nameToFilters, serializerManager: serializerManager, + resourceSyncers: resourceSyncers, }, nil } +func (m *Manager) HasSynced() bool { + for i := range m.resourceSyncers { + if !m.resourceSyncers[i].HasSynced() { + return false + } + } + return true +} + func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter, bool) { + if len(m.nameToObjectFilter) == 0 { + return nil, false + } + approved, filterNames := m.Approver.Approve(req) if approved { objectFilters := make([]filter.ObjectFilter, 0) @@ -107,6 +132,10 @@ func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter, } func (m *Manager) FindObjectFilter(req *http.Request) (filter.ObjectFilter, bool) { + if len(m.nameToObjectFilter) == 0 { + return nil, false + } + approved, filterNames := m.Approver.Approve(req) if !approved { return nil, false diff --git a/pkg/yurthub/filter/manager/manager_test.go b/pkg/yurthub/filter/manager/manager_test.go index 6169b3254a9..bab2d75e561 100644 --- a/pkg/yurthub/filter/manager/manager_test.go +++ b/pkg/yurthub/filter/manager/manager_test.go @@ -56,13 +56,17 @@ func TestFindResponseFilter(t *testing.T) { userAgent string verb string path string - mgrIsNil bool isFound bool names sets.Set[string] }{ "disable resource filter": { enableResourceFilter: false, - mgrIsNil: true, + enableDummyIf: true, + userAgent: "kubelet", + verb: "GET", + path: "/api/v1/services", + isFound: false, + names: sets.New[string](), }, "get master service filter": { enableResourceFilter: true, @@ -109,6 +113,15 @@ func TestFindResponseFilter(t *testing.T) { isFound: true, names: sets.New("nodeportisolation"), }, + "reject by approver for unknown component": { + enableResourceFilter: true, + enableDummyIf: true, + userAgent: "unknown-agent", + verb: "GET", + path: "/api/v1/services", + isFound: false, + names: sets.New[string](), + }, } resolver := newTestRequestInfoResolver() @@ -134,9 +147,6 @@ func TestFindResponseFilter(t *testing.T) { defer close(stopper) finder, _ := NewFilterManager(options, sharedFactory, nodePoolFactory, fakeClient, serializerManager, configManager) - if tt.mgrIsNil && finder == nil { - return - } sharedFactory.Start(stopper) nodePoolFactory.Start(stopper) @@ -161,7 +171,10 @@ func TestFindResponseFilter(t *testing.T) { handler = filters.WithRequestInfo(handler, resolver) handler.ServeHTTP(httptest.NewRecorder(), req) - if !tt.isFound && isFound == tt.isFound { + if isFound != tt.isFound { + t.Errorf("expect found result %v, but got %v", tt.isFound, isFound) + } else if !tt.isFound { + // skip checking filter names because no filter is found. 
return } @@ -188,13 +201,17 @@ func TestFindObjectFilter(t *testing.T) { userAgent string verb string path string - mgrIsNil bool isFound bool names sets.Set[string] }{ "disable resource filter": { enableResourceFilter: false, - mgrIsNil: true, + enableDummyIf: true, + userAgent: "kubelet", + verb: "GET", + path: "/api/v1/services", + isFound: false, + names: sets.New[string](), }, "get master service filter": { enableResourceFilter: true, @@ -241,6 +258,15 @@ func TestFindObjectFilter(t *testing.T) { isFound: true, names: sets.New("nodeportisolation"), }, + "reject by approver for unknown component": { + enableResourceFilter: true, + enableDummyIf: true, + userAgent: "unknown-agent", + verb: "GET", + path: "/api/v1/services", + isFound: false, + names: sets.New[string](), + }, } resolver := newTestRequestInfoResolver() @@ -266,9 +292,6 @@ func TestFindObjectFilter(t *testing.T) { defer close(stopper) finder, _ := NewFilterManager(options, sharedFactory, nodePoolFactory, fakeClient, serializerManager, configManager) - if tt.mgrIsNil && finder == nil { - return - } sharedFactory.Start(stopper) nodePoolFactory.Start(stopper) @@ -293,7 +316,10 @@ func TestFindObjectFilter(t *testing.T) { handler = filters.WithRequestInfo(handler, resolver) handler.ServeHTTP(httptest.NewRecorder(), req) - if !tt.isFound && isFound == tt.isFound { + if isFound != tt.isFound { + t.Errorf("expect found result %v, but got %v", tt.isFound, isFound) + } else if !tt.isFound { + // skip checking filter names because no filter is found. return } @@ -311,3 +337,74 @@ func newTestRequestInfoResolver() *request.RequestInfoFactory { GrouplessAPIPrefixes: sets.NewString("api"), } } + +func TestHasSynced(t *testing.T) { + fakeClient := &fake.Clientset{} + scheme := runtime.NewScheme() + apis.AddToScheme(scheme) + fakeDynamicClient := dynamicfake.NewSimpleDynamicClient(scheme) + serializerManager := serializer.NewSerializerManager() + + testcases := map[string]struct { + enableResourceFilter bool + workingMode string + disabledResourceFilters []string + enableDummyIf bool + userAgent string + verb string + path string + hasSynced bool + }{ + "has synced by disabling resource filter": { + enableResourceFilter: false, + enableDummyIf: true, + userAgent: "kubelet", + verb: "GET", + path: "/api/v1/services", + hasSynced: true, + }, + "has synced by disabling service topology filter": { + enableResourceFilter: true, + disabledResourceFilters: []string{"servicetopology"}, + enableDummyIf: true, + userAgent: "kube-proxy", + verb: "GET", + path: "/api/v1/endpoints", + hasSynced: true, + }, + "not synced by setting service topology filter": { + enableResourceFilter: true, + enableDummyIf: false, + userAgent: "kube-proxy", + verb: "GET", + path: "/api/v1/endpoints", + hasSynced: false, + }, + } + + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + options := &options.YurtHubOptions{ + EnableResourceFilter: tc.enableResourceFilter, + WorkingMode: tc.workingMode, + DisabledResourceFilters: make([]string, 0), + EnableDummyIf: tc.enableDummyIf, + NodeName: "test", + YurtHubProxySecurePort: 10268, + HubAgentDummyIfIP: "127.0.0.1", + YurtHubProxyHost: "127.0.0.1", + } + options.DisabledResourceFilters = append(options.DisabledResourceFilters, tc.disabledResourceFilters...) 
+ sharedFactory, nodePoolFactory := informers.NewSharedInformerFactory(fakeClient, 24*time.Hour), + dynamicinformer.NewDynamicSharedInformerFactory(fakeDynamicClient, 24*time.Hour) + configManager := configuration.NewConfigurationManager(options.NodeName, sharedFactory) + + finder, _ := NewFilterManager(options, sharedFactory, nodePoolFactory, fakeClient, serializerManager, configManager) + hasSynced := finder.HasSynced() + if hasSynced != tc.hasSynced { + t.Errorf("expect synced result: %v, but got %v", tc.hasSynced, hasSynced) + } + }) + } + +} diff --git a/pkg/yurthub/filter/servicetopology/filter.go b/pkg/yurthub/filter/servicetopology/filter.go index 7b2eb1eb5ec..95c17ec2d0b 100644 --- a/pkg/yurthub/filter/servicetopology/filter.go +++ b/pkg/yurthub/filter/servicetopology/filter.go @@ -72,6 +72,18 @@ func (stf *serviceTopologyFilter) Name() string { return FilterName } +func (stf *serviceTopologyFilter) HasSynced() bool { + if stf.nodesSynced == nil || stf.serviceSynced == nil { + return false + } + + if !stf.nodesSynced() || !stf.serviceSynced() { + return false + } + + return true +} + func (stf *serviceTopologyFilter) SetSharedInformerFactory(factory informers.SharedInformerFactory) error { stf.serviceLister = factory.Core().V1().Services().Lister() stf.serviceSynced = factory.Core().V1().Services().Informer().HasSynced @@ -117,10 +129,6 @@ func (stf *serviceTopologyFilter) resolveNodePoolName() string { } func (stf *serviceTopologyFilter) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { - if ok := cache.WaitForCacheSync(stopCh, stf.serviceSynced, stf.nodesSynced); !ok { - return obj - } - switch v := obj.(type) { case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discoveryv1.EndpointSlice: return stf.serviceTopologyHandler(v) diff --git a/pkg/yurthub/metrics/metrics.go b/pkg/yurthub/metrics/metrics.go index 00c8cf50679..91577b8b100 100644 --- a/pkg/yurthub/metrics/metrics.go +++ b/pkg/yurthub/metrics/metrics.go @@ -44,15 +44,18 @@ var ( ) type HubMetrics struct { - serversHealthyCollector *prometheus.GaugeVec - inFlightRequestsCollector *prometheus.GaugeVec - inFlightRequestsGauge prometheus.Gauge - rejectedRequestsCounter prometheus.Counter - closableConnsCollector *prometheus.GaugeVec - proxyTrafficCollector *prometheus.CounterVec - proxyLatencyCollector *prometheus.GaugeVec - errorKeysPersistencyStatusCollector prometheus.Gauge - errorKeysCountCollector prometheus.Gauge + serversHealthyCollector *prometheus.GaugeVec + inFlightRequestsCollector *prometheus.GaugeVec + inFlightRequestsGauge prometheus.Gauge + rejectedRequestsCounter prometheus.Counter + inFlightMultiplexerRequestsCollector *prometheus.GaugeVec + inFlightMultiplexerRequestsGauge prometheus.Gauge + rejectedMultiplexerRequestsCounter prometheus.Counter + closableConnsCollector *prometheus.GaugeVec + proxyTrafficCollector *prometheus.CounterVec + proxyLatencyCollector *prometheus.GaugeVec + errorKeysPersistencyStatusCollector prometheus.Gauge + errorKeysCountCollector prometheus.Gauge } func newHubMetrics() *HubMetrics { @@ -86,6 +89,28 @@ func newHubMetrics() *HubMetrics { Name: "rejected_requests_counter", Help: "counter of rejected requests for exceeding in flight limit in hub agent", }) + inFlightMultiplexerRequestsCollector := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "in_flight_multiplexer_requests_collector", + Help: "collector of in flight requests handling by multiplexer manager", + }, + []string{"verb", "resource", 
"subresources", "client"}) + inFlightMultiplexerRequestsGauge := prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "in_flight_multiplexer_requests_total", + Help: "total of in flight requests handling by multiplexer manager", + }) + rejectedMultiplexerRequestsCounter := prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "rejected_multiplexer_requests_counter", + Help: "counter of rejected multiplexer requests for exceeding in flight limit in hub agent", + }) closableConnsCollector := prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: namespace, @@ -128,21 +153,27 @@ func newHubMetrics() *HubMetrics { prometheus.MustRegister(inFlightRequestsCollector) prometheus.MustRegister(inFlightRequestsGauge) prometheus.MustRegister(rejectedRequestsCounter) + prometheus.MustRegister(inFlightMultiplexerRequestsCollector) + prometheus.MustRegister(inFlightMultiplexerRequestsGauge) + prometheus.MustRegister(rejectedMultiplexerRequestsCounter) prometheus.MustRegister(closableConnsCollector) prometheus.MustRegister(proxyTrafficCollector) prometheus.MustRegister(proxyLatencyCollector) prometheus.MustRegister(errorKeysPersistencyStatusCollector) prometheus.MustRegister(errorKeysCountCollector) return &HubMetrics{ - serversHealthyCollector: serversHealthyCollector, - inFlightRequestsCollector: inFlightRequestsCollector, - inFlightRequestsGauge: inFlightRequestsGauge, - rejectedRequestsCounter: rejectedRequestsCounter, - closableConnsCollector: closableConnsCollector, - proxyTrafficCollector: proxyTrafficCollector, - proxyLatencyCollector: proxyLatencyCollector, - errorKeysPersistencyStatusCollector: errorKeysPersistencyStatusCollector, - errorKeysCountCollector: errorKeysCountCollector, + serversHealthyCollector: serversHealthyCollector, + inFlightRequestsCollector: inFlightRequestsCollector, + inFlightRequestsGauge: inFlightRequestsGauge, + rejectedRequestsCounter: rejectedRequestsCounter, + inFlightMultiplexerRequestsCollector: inFlightMultiplexerRequestsCollector, + inFlightMultiplexerRequestsGauge: inFlightMultiplexerRequestsGauge, + rejectedMultiplexerRequestsCounter: rejectedMultiplexerRequestsCounter, + closableConnsCollector: closableConnsCollector, + proxyTrafficCollector: proxyTrafficCollector, + proxyLatencyCollector: proxyLatencyCollector, + errorKeysPersistencyStatusCollector: errorKeysPersistencyStatusCollector, + errorKeysCountCollector: errorKeysCountCollector, } } @@ -150,6 +181,8 @@ func (hm *HubMetrics) Reset() { hm.serversHealthyCollector.Reset() hm.inFlightRequestsCollector.Reset() hm.inFlightRequestsGauge.Set(float64(0)) + hm.inFlightMultiplexerRequestsCollector.Reset() + hm.inFlightMultiplexerRequestsGauge.Set(float64(0)) hm.closableConnsCollector.Reset() hm.proxyTrafficCollector.Reset() hm.proxyLatencyCollector.Reset() @@ -171,10 +204,24 @@ func (hm *HubMetrics) DecInFlightRequests(verb, resource, subresource, client st hm.inFlightRequestsGauge.Dec() } +func (hm *HubMetrics) IncInFlightMultiplexerRequests(verb, resource, subresource, client string) { + hm.inFlightMultiplexerRequestsCollector.WithLabelValues(verb, resource, subresource, client).Inc() + hm.inFlightMultiplexerRequestsGauge.Inc() +} + +func (hm *HubMetrics) DecInFlightMultiplexerRequests(verb, resource, subresource, client string) { + hm.inFlightMultiplexerRequestsCollector.WithLabelValues(verb, resource, subresource, client).Dec() + hm.inFlightMultiplexerRequestsGauge.Dec() +} + func (hm *HubMetrics) 
IncRejectedRequestCounter() { hm.rejectedRequestsCounter.Inc() } +func (hm *HubMetrics) IncRejectedMultiplexerRequestCounter() { + hm.rejectedMultiplexerRequestsCounter.Inc() +} + func (hm *HubMetrics) IncClosableConns(server string) { hm.closableConnsCollector.WithLabelValues(server).Inc() } diff --git a/pkg/yurthub/multiplexer/fake_multiplexer.go b/pkg/yurthub/multiplexer/fake_multiplexer.go deleted file mode 100644 index 3e8240bfddd..00000000000 --- a/pkg/yurthub/multiplexer/fake_multiplexer.go +++ /dev/null @@ -1,39 +0,0 @@ -/* -Copyright 2024 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package multiplexer - -import "k8s.io/apimachinery/pkg/runtime/schema" - -type FakeCacheManager struct { - cacheMap map[string]Interface - resourceConfigMap map[string]*ResourceCacheConfig -} - -func NewFakeCacheManager(cacheMap map[string]Interface, resourceConfigMap map[string]*ResourceCacheConfig) *FakeCacheManager { - return &FakeCacheManager{ - cacheMap: cacheMap, - resourceConfigMap: resourceConfigMap, - } -} - -func (fcm *FakeCacheManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) { - return fcm.resourceConfigMap[gvr.String()], nil -} - -func (fcm *FakeCacheManager) ResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), error) { - return fcm.cacheMap[gvr.String()], nil, nil -} diff --git a/pkg/yurthub/multiplexer/multiplexer.go b/pkg/yurthub/multiplexer/multiplexer.go index c442ee0b3d0..7fbce25cbae 100644 --- a/pkg/yurthub/multiplexer/multiplexer.go +++ b/pkg/yurthub/multiplexer/multiplexer.go @@ -17,6 +17,7 @@ limitations under the License. 
package multiplexer import ( + "fmt" "sync" "github.com/pkg/errors" @@ -26,9 +27,11 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" - kmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" + hubmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" ystorage "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" ) @@ -71,67 +74,97 @@ var AttrsFunc = func(obj runtime.Object) (labels.Set, fields.Set, error) { return labels.Set(metadata.GetLabels()), fieldSet, nil } -type MultiplexerManager interface { - ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) - ResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), error) +type MultiplexerManager struct { + restStoreProvider ystorage.StorageProvider + restMapper *hubmeta.RESTMapperManager + poolScopeMetadatas sets.Set[string] + + cacheLock sync.RWMutex + lazyLoadedGVRCache map[string]Interface + lazyLoadedGVRCacheDestroyFunc map[string]func() } -type multiplexerManager struct { - restStoreManager ystorage.StorageProvider - restMapper meta.RESTMapper +func NewRequestMultiplexerManager( + restStoreProvider ystorage.StorageProvider, + restMapperMgr *hubmeta.RESTMapperManager, + poolScopeResources []schema.GroupVersionResource) *MultiplexerManager { - cacheLock sync.RWMutex - gvrToCache map[string]Interface - gvrToCacheDestroyFunc map[string]func() + poolScopeMetadatas := sets.New[string]() + for i := range poolScopeResources { + poolScopeMetadatas.Insert(poolScopeResources[i].String()) + } + klog.Infof("pool scope resources: %v", poolScopeMetadatas) - cacheConfigLock sync.RWMutex - gvrToCacheConfig map[string]*ResourceCacheConfig + return &MultiplexerManager{ + restStoreProvider: restStoreProvider, + restMapper: restMapperMgr, + poolScopeMetadatas: poolScopeMetadatas, + lazyLoadedGVRCache: make(map[string]Interface), + lazyLoadedGVRCacheDestroyFunc: make(map[string]func()), + cacheLock: sync.RWMutex{}, + } } -func NewRequestsMultiplexerManager( - restStoreManager ystorage.StorageProvider) MultiplexerManager { +func (m *MultiplexerManager) IsPoolScopeMetadata(gvr *schema.GroupVersionResource) bool { + return m.poolScopeMetadatas.Has(gvr.String()) +} - return &multiplexerManager{ - restStoreManager: restStoreManager, - restMapper: kmeta.NewDefaultRESTMapperFromScheme(), - gvrToCache: make(map[string]Interface), - gvrToCacheConfig: make(map[string]*ResourceCacheConfig), - gvrToCacheDestroyFunc: make(map[string]func()), - cacheLock: sync.RWMutex{}, - cacheConfigLock: sync.RWMutex{}, +func (m *MultiplexerManager) Ready(gvr *schema.GroupVersionResource) bool { + rc, _, err := m.ResourceCache(gvr) + if err != nil { + klog.Errorf("failed to get resource cache for gvr %s, %v", gvr.String(), err) + return false } + + return rc.ReadinessCheck() == nil } -func (m *multiplexerManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) { - if config, ok := m.tryGetResourceCacheConfig(gvr); ok { - return config, nil + +// ResourceCache is used for preparing cache for specified gvr. +// The cache is loaded in a lazy mode, this means cache will not be loaded when yurthub initializes, +// and cache will only be loaded when corresponding request is received. 
+func (m *MultiplexerManager) ResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), error) { + m.cacheLock.Lock() + defer m.cacheLock.Unlock() + + if rc, ok := m.lazyLoadedGVRCache[gvr.String()]; ok { + return rc, m.lazyLoadedGVRCacheDestroyFunc[gvr.String()], nil } - gvk, listGVK, err := m.convertToGVK(gvr) + klog.Infof("start initializing multiplexer cache for gvr: %s", gvr.String()) + restStore, err := m.restStoreProvider.ResourceStorage(gvr) if err != nil { - return nil, errors.Wrapf(err, "failed to convert to gvk from gvr %s", gvr.String()) + return nil, nil, errors.Wrapf(err, "failed to get rest store") + } + + resourceCacheConfig, err := m.resourceCacheConfig(gvr) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to generate resource cache config") } - config := m.newResourceCacheConfig(gvk, listGVK) + rc, destroy, err := NewResourceCache(restStore, gvr, resourceCacheConfig) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to new resource cache") + } - m.saveResourceCacheConfig(gvr, config) - return config, nil -} + m.lazyLoadedGVRCache[gvr.String()] = rc + m.lazyLoadedGVRCacheDestroyFunc[gvr.String()] = destroy -func (m *multiplexerManager) tryGetResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, bool) { - m.cacheConfigLock.RLock() - defer m.cacheConfigLock.RUnlock() + return rc, destroy, nil +} - if config, ok := m.gvrToCacheConfig[gvr.String()]; ok { - return config, true +func (m *MultiplexerManager) resourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) { + gvk, listGVK, err := m.convertToGVK(gvr) + if err != nil { + return nil, errors.Wrapf(err, "failed to convert to gvk from gvr %s", gvr.String()) } - return nil, false + return m.newResourceCacheConfig(gvk, listGVK), nil } -func (m *multiplexerManager) convertToGVK(gvr *schema.GroupVersionResource) (schema.GroupVersionKind, schema.GroupVersionKind, error) { - gvk, err := m.restMapper.KindFor(*gvr) - if err != nil { - return schema.GroupVersionKind{}, schema.GroupVersionKind{}, errors.Wrapf(err, "failed to convert gvk from gvr %s", gvr.String()) +func (m *MultiplexerManager) convertToGVK(gvr *schema.GroupVersionResource) (schema.GroupVersionKind, schema.GroupVersionKind, error) { + _, gvk := m.restMapper.KindFor(*gvr) + if gvk.Empty() { + return schema.GroupVersionKind{}, schema.GroupVersionKind{}, fmt.Errorf("failed to convert gvk from gvr %s", gvr.String()) } listGvk := schema.GroupVersionKind{ @@ -143,9 +176,9 @@ func (m *multiplexerManager) convertToGVK(gvr *schema.GroupVersionResource) (sch return gvk, listGvk, nil } -func (m *multiplexerManager) newResourceCacheConfig(gvk schema.GroupVersionKind, +func (m *MultiplexerManager) newResourceCacheConfig(gvk schema.GroupVersionKind, listGVK schema.GroupVersionKind) *ResourceCacheConfig { - resourceCacheConfig := &ResourceCacheConfig{ + return &ResourceCacheConfig{ NewFunc: func() runtime.Object { obj, _ := scheme.Scheme.New(gvk) return obj @@ -157,56 +190,4 @@ func (m *multiplexerManager) newResourceCacheConfig(gvk schema.GroupVersionKind, KeyFunc: KeyFunc, GetAttrsFunc: AttrsFunc, } - - return resourceCacheConfig -} - -func (m *multiplexerManager) saveResourceCacheConfig(gvr *schema.GroupVersionResource, config *ResourceCacheConfig) { - m.cacheConfigLock.Lock() - defer m.cacheConfigLock.Unlock() - - m.gvrToCacheConfig[gvr.String()] = config -} - -func (m *multiplexerManager) ResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), error) { - if sc, destroy, ok := 
m.tryGetResourceCache(gvr); ok { - return sc, destroy, nil - } - - restStore, err := m.restStoreManager.ResourceStorage(gvr) - if err != nil { - return nil, nil, errors.Wrapf(err, "failed to get rest store") - } - - resourceCacheConfig, err := m.ResourceCacheConfig(gvr) - if err != nil { - return nil, nil, errors.Wrapf(err, "failed to generate resource cache config") - } - - sc, destroy, err := NewResourceCache(restStore, gvr, resourceCacheConfig) - if err != nil { - return nil, nil, errors.Wrapf(err, "failed to new resource cache") - } - - m.saveResourceCache(gvr, sc, destroy) - - return sc, destroy, nil -} - -func (m *multiplexerManager) tryGetResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), bool) { - m.cacheLock.RLock() - defer m.cacheLock.RUnlock() - - if sc, ok := m.gvrToCache[gvr.String()]; ok { - return sc, m.gvrToCacheDestroyFunc[gvr.String()], true - } - return nil, nil, false -} - -func (m *multiplexerManager) saveResourceCache(gvr *schema.GroupVersionResource, sc Interface, destroy func()) { - m.cacheLock.Lock() - defer m.cacheLock.Unlock() - - m.gvrToCache[gvr.String()] = sc - m.gvrToCacheDestroyFunc[gvr.String()] = destroy } diff --git a/pkg/yurthub/multiplexer/multiplexer_test.go b/pkg/yurthub/multiplexer/multiplexer_test.go index ea39e9f4a01..c922063696a 100644 --- a/pkg/yurthub/multiplexer/multiplexer_test.go +++ b/pkg/yurthub/multiplexer/multiplexer_test.go @@ -18,112 +18,21 @@ package multiplexer import ( "context" - "reflect" + "os" "testing" "time" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" - discovery "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" kstorage "k8s.io/apiserver/pkg/storage" + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" ) -func TestShareCacheManager_ResourceCacheConfig(t *testing.T) { - svcStorage := storage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}) - storageMap := map[string]kstorage.Interface{ - serviceGVR.String(): svcStorage, - } - - sm := NewRequestsMultiplexerManager( - storage.NewDummyStorageManager(storageMap)) - - for _, tc := range []struct { - tname string - gvr *schema.GroupVersionResource - obj runtime.Object - expectedKey string - expectedObjType string - expectedObjListType string - expectedFieldSet fields.Set - namespaceScoped bool - }{ - { - "generate resource config for services", - &schema.GroupVersionResource{ - Group: "", - Version: "v1", - Resource: "services", - }, - newService(metav1.NamespaceSystem, "coredns"), - "/kube-system/coredns", - "Service", - "ServiceList", - fields.Set{ - "metadata.name": "coredns", - "metadata.namespace": "kube-system", - }, - true, - }, - { - "generate resource config for endpointslices", - &schema.GroupVersionResource{ - Group: "discovery.k8s.io", - Version: "v1", - Resource: "endpointslices", - }, - newEndpointSlice(), - "/kube-system/coredns-12345", - "EndpointSlice", - "EndpointSliceList", - fields.Set{ - "metadata.name": "coredns-12345", - "metadata.namespace": "kube-system", - }, - true, - }, - { - "generate resource config for nodes", - &schema.GroupVersionResource{ - Group: "", - Version: "v1", - Resource: "nodes", - }, - newNode(), - "/test", - "Node", - "NodeList", - fields.Set{ - "metadata.name": "test", - }, - false, - }, - } { - t.Run(tc.tname, func(t *testing.T) 
{ - rc, err := sm.ResourceCacheConfig(tc.gvr) - - assert.Nil(t, err) - - key, _ := rc.KeyFunc(tc.obj) - assert.Equal(t, tc.expectedKey, key) - - obj := rc.NewFunc() - assert.Equal(t, tc.expectedObjType, reflect.TypeOf(obj).Elem().Name()) - - objList := rc.NewListFunc() - assert.Equal(t, tc.expectedObjListType, reflect.TypeOf(objList).Elem().Name()) - - _, fieldSet, _ := rc.GetAttrsFunc(tc.obj) - assert.Equal(t, tc.expectedFieldSet, fieldSet) - }) - } -} func newService(namespace, name string) *v1.Service { return &v1.Service{ TypeMeta: metav1.TypeMeta{ @@ -137,36 +46,6 @@ func newService(namespace, name string) *v1.Service { } } -func newEndpointSlice() *discovery.EndpointSlice { - return &discovery.EndpointSlice{ - TypeMeta: metav1.TypeMeta{ - Kind: "EndpointSlice", - APIVersion: "discovery.k8s.io/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: "kube-system", - Name: "coredns-12345", - }, - Endpoints: []discovery.Endpoint{ - { - Addresses: []string{"192.168.0.10"}, - }, - }, - } -} - -func newNode() *v1.Node { - return &v1.Node{ - TypeMeta: metav1.TypeMeta{ - Kind: "Node", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - }, - } -} - func TestShareCacheManager_ResourceCache(t *testing.T) { svcStorage := storage.NewFakeServiceStorage( []v1.Service{ @@ -177,9 +56,20 @@ func TestShareCacheManager_ResourceCache(t *testing.T) { storageMap := map[string]kstorage.Interface{ serviceGVR.String(): svcStorage, } - dsm := storage.NewDummyStorageManager(storageMap) - scm := NewRequestsMultiplexerManager(dsm) + + tmpDir, err := os.MkdirTemp("", "test") + if err != nil { + t.Fatalf("failed to make temp dir, %v", err) + } + restMapperManager, _ := meta.NewRESTMapperManager(tmpDir) + + poolScopeResources := []schema.GroupVersionResource{ + {Group: "", Version: "v1", Resource: "services"}, + {Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}, + } + + scm := NewRequestMultiplexerManager(dsm, restMapperManager, poolScopeResources) cache, _, _ := scm.ResourceCache(serviceGVR) wait.PollUntilContextCancel(context.Background(), 100*time.Millisecond, true, func(context.Context) (done bool, err error) { if cache.ReadinessCheck() == nil { @@ -189,7 +79,7 @@ func TestShareCacheManager_ResourceCache(t *testing.T) { }) serviceList := &v1.ServiceList{} - err := cache.GetList(context.Background(), "", mockListOptions(), serviceList) + err = cache.GetList(context.Background(), "", mockListOptions(), serviceList) assert.Nil(t, err) assert.Equal(t, []v1.Service{ diff --git a/pkg/yurthub/multiplexer/storage/api_server_storage.go b/pkg/yurthub/multiplexer/storage/api_server_storage.go index a615b92c4a7..150cf5a01ab 100644 --- a/pkg/yurthub/multiplexer/storage/api_server_storage.go +++ b/pkg/yurthub/multiplexer/storage/api_server_storage.go @@ -19,6 +19,7 @@ package storage import ( "context" "math/rand" + "time" "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -29,7 +30,7 @@ import ( "k8s.io/client-go/rest" ) -const minWatchRequestSeconds = 300 +const minWatchTimeout = 5 * time.Minute var ErrNoSupport = errors.New("Don't Support Method ") @@ -57,12 +58,13 @@ func (rs *apiServerStorage) GetList(ctx context.Context, key string, opts storag } func (rs *apiServerStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { - timeoutSeconds := int64(float64(minWatchRequestSeconds) * (rand.Float64() + 1.0)) + timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) listOpts := &metav1.ListOptions{ 
- ResourceVersion: opts.ResourceVersion, - Watch: true, - TimeoutSeconds: &timeoutSeconds, + ResourceVersion: opts.ResourceVersion, + Watch: true, + TimeoutSeconds: &timeoutSeconds, + AllowWatchBookmarks: true, } w, err := rs.restClient.Get().Resource(rs.resource).VersionedParams(listOpts, scheme.ParameterCodec).Watch(ctx) diff --git a/pkg/yurthub/multiplexer/storage/api_server_storage_provider.go b/pkg/yurthub/multiplexer/storage/api_server_storage_provider.go index 81e0b14fdbf..13e205a2ad7 100644 --- a/pkg/yurthub/multiplexer/storage/api_server_storage_provider.go +++ b/pkg/yurthub/multiplexer/storage/api_server_storage_provider.go @@ -33,7 +33,7 @@ type apiServerStorageProvider struct { gvrToStorage map[string]storage.Interface } -func NewStorageManager(config *rest.Config) StorageProvider { +func NewStorageProvider(config *rest.Config) StorageProvider { config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() return &apiServerStorageProvider{ config: config, diff --git a/pkg/yurthub/multiplexer/storage/api_server_storage_provider_test.go b/pkg/yurthub/multiplexer/storage/api_server_storage_provider_test.go index 90aaabdc187..10b91f0c01f 100644 --- a/pkg/yurthub/multiplexer/storage/api_server_storage_provider_test.go +++ b/pkg/yurthub/multiplexer/storage/api_server_storage_provider_test.go @@ -38,28 +38,25 @@ var endpointSlicesGVR = &schema.GroupVersionResource{ } func TestStorageManager_ResourceStorage(t *testing.T) { - sm := NewStorageManager(&rest.Config{ + sm := NewStorageProvider(&rest.Config{ Host: "http://127.0.0.1:10261", UserAgent: "share-hub", }) - for _, tc := range []struct { - tName string - gvr *schema.GroupVersionResource - Err error + for k, tc := range map[string]struct { + gvr *schema.GroupVersionResource + err error }{ - { - "get resource storage for services", - serviceGVR, - nil, + "get resource storage for services": { + gvr: serviceGVR, + err: nil, }, - { - "get resource storage for endpouintslices", - endpointSlicesGVR, - nil, + "get resource storage for endpouintslices": { + gvr: endpointSlicesGVR, + err: nil, }, } { - t.Run(tc.tName, func(t *testing.T) { + t.Run(k, func(t *testing.T) { restore, err := sm.ResourceStorage(tc.gvr) assert.Nil(t, err) diff --git a/pkg/yurthub/multiplexer/storage/fake_storage.go b/pkg/yurthub/multiplexer/storage/fake_storage.go index 0dd47c1c429..830ad39e343 100644 --- a/pkg/yurthub/multiplexer/storage/fake_storage.go +++ b/pkg/yurthub/multiplexer/storage/fake_storage.go @@ -60,6 +60,10 @@ func (fs *CommonFakeStorage) Count(key string) (int64, error) { return 0, nil } +func (fs *CommonFakeStorage) ReadinessCheck() error { + return nil +} + func (fs *CommonFakeStorage) RequestWatchProgress(ctx context.Context) error { return nil } @@ -96,10 +100,6 @@ func (fs *FakeServiceStorage) AddWatchObject(svc *v1.Service) { fs.watcher.Add(svc) } -func (fs *FakeServiceStorage) ReadinessCheck() error { - return nil -} - type FakeEndpointSliceStorage struct { *CommonFakeStorage items []discovery.EndpointSlice @@ -137,7 +137,3 @@ func (fs *FakeEndpointSliceStorage) AddWatchObject(eps *discovery.EndpointSlice) eps.ResourceVersion = "101" fs.watcher.Add(eps) } - -func (fs *FakeEndpointSliceStorage) ReadinessCheck() error { - return nil -} diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerlist.go b/pkg/yurthub/proxy/multiplexer/multiplexerlist.go index cab7a38d16a..a8c56ab063e 100644 --- a/pkg/yurthub/proxy/multiplexer/multiplexerlist.go +++ b/pkg/yurthub/proxy/multiplexer/multiplexerlist.go @@ -18,6 +18,7 @@ limitations under the License. 
package multiplexer import ( + "fmt" "net/http" "github.com/pkg/errors" @@ -27,6 +28,7 @@ import ( "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic/registry" kstorage "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog/v2" yurtutil "github.com/openyurtio/openyurt/pkg/util" @@ -47,7 +49,7 @@ func (sp *multiplexerProxy) multiplexerList(w http.ResponseWriter, r *http.Reque return } - storageOpts, err := sp.storageOpts(listOpts, gvr) + storageOpts, err := sp.storageOpts(listOpts) if err != nil { util.Err(err, w, r) return @@ -68,7 +70,16 @@ func (sp *multiplexerProxy) listObject(r *http.Request, gvr *schema.GroupVersion return nil, errors.Wrap(err, "failed to get resource cache") } - obj, err := sp.newListObject(gvr) + _, gvk := sp.restMapperManager.KindFor(*gvr) + if gvk.Empty() { + return nil, fmt.Errorf("list object: failed to get gvk for gvr %v", gvr) + } + + obj, err := scheme.Scheme.New(schema.GroupVersionKind{ + Group: gvk.Group, + Version: gvk.Version, + Kind: gvk.Kind + "List", + }) if err != nil { return nil, errors.Wrapf(err, "failed to new list object") } @@ -82,26 +93,15 @@ func (sp *multiplexerProxy) listObject(r *http.Request, gvr *schema.GroupVersion return nil, errors.Wrapf(err, "failed to get list from cache") } - if !yurtutil.IsNil(sp.filterFinder) { - if objectFilter, exists := sp.filterFinder.FindObjectFilter(r); exists { - if obj, err = sp.filterListObject(obj, objectFilter); err != nil { - return nil, errors.Wrapf(err, "failed to filter list object") - } + if objectFilter, exists := sp.filterFinder.FindObjectFilter(r); exists { + if obj, err = sp.filterListObject(obj, objectFilter); err != nil { + return nil, errors.Wrapf(err, "failed to filter list object") } } return obj, nil } -func (sp *multiplexerProxy) newListObject(gvr *schema.GroupVersionResource) (runtime.Object, error) { - rcc, err := sp.requestsMultiplexerManager.ResourceCacheConfig(gvr) - if err != nil { - return nil, errors.Wrapf(err, "failed to get resource cache config") - } - - return rcc.NewListFunc(), nil -} - func (sp *multiplexerProxy) getCacheKey(r *http.Request, storageOpts *kstorage.ListOptions) (string, error) { if ns := sp.getNamespace(r); len(ns) > 0 { return sp.getNamespaceScopedCacheKey(r, storageOpts) diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go b/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go index fa20768a24d..5969c90c991 100644 --- a/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go +++ b/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go @@ -18,10 +18,11 @@ limitations under the License. 
package multiplexer import ( + "fmt" "net/http" "github.com/pkg/errors" - kerrors "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme" @@ -37,42 +38,45 @@ import ( "k8s.io/apiserver/pkg/registry/rest" kstorage "k8s.io/apiserver/pkg/storage" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/yurthub/filter" + hubmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" "github.com/openyurtio/openyurt/pkg/yurthub/util" ) type multiplexerProxy struct { - requestsMultiplexerManager multiplexer.MultiplexerManager + requestsMultiplexerManager *multiplexer.MultiplexerManager filterFinder filter.FilterFinder + restMapperManager *hubmeta.RESTMapperManager stop <-chan struct{} } func NewMultiplexerProxy(filterFinder filter.FilterFinder, - cacheManager multiplexer.MultiplexerManager, - multiplexerResources []schema.GroupVersionResource, - stop <-chan struct{}) (*multiplexerProxy, error) { - - sp := &multiplexerProxy{ + multiplexerManager *multiplexer.MultiplexerManager, + restMapperMgr *hubmeta.RESTMapperManager, + stop <-chan struct{}) http.Handler { + return &multiplexerProxy{ stop: stop, - requestsMultiplexerManager: cacheManager, + requestsMultiplexerManager: multiplexerManager, filterFinder: filterFinder, + restMapperManager: restMapperMgr, } - - for _, gvr := range multiplexerResources { - if _, _, err := sp.requestsMultiplexerManager.ResourceCache(&gvr); err != nil { - return sp, errors.Wrapf(err, "failed to init resource cache for %s", gvr.String()) - } - } - - return sp, nil } func (sp *multiplexerProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { reqInfo, _ := request.RequestInfoFrom(r.Context()) - gvr := sp.getRequestGVR(reqInfo) + gvr := &schema.GroupVersionResource{ + Group: reqInfo.APIGroup, + Version: reqInfo.APIVersion, + Resource: reqInfo.Resource, + } + + if !sp.requestsMultiplexerManager.Ready(gvr) { + w.Header().Set("Retry-After", "1") + util.Err(apierrors.NewTooManyRequestsError(fmt.Sprintf("cacher for gvr(%s) is initializing, please try again later.", gvr.String())), w, r) + return + } switch reqInfo.Verb { case "list": @@ -84,18 +88,10 @@ func (sp *multiplexerProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func (sp *multiplexerProxy) getRequestGVR(reqInfo *request.RequestInfo) *schema.GroupVersionResource { - return &schema.GroupVersionResource{ - Group: reqInfo.APIGroup, - Version: reqInfo.APIVersion, - Resource: reqInfo.Resource, - } -} - func (sp *multiplexerProxy) getReqScope(gvr *schema.GroupVersionResource) (*handlers.RequestScope, error) { - fqKindToRegister, err := sp.findKind(gvr) - if err != nil { - return nil, err + _, fqKindToRegister := sp.restMapperManager.KindFor(*gvr) + if fqKindToRegister.Empty() { + return nil, fmt.Errorf("gvk is not found for gvr: %v", *gvr) } return &handlers.RequestScope{ @@ -127,33 +123,13 @@ func (sp *multiplexerProxy) getReqScope(gvr *schema.GroupVersionResource) (*hand }, nil } -func (sp *multiplexerProxy) findKind(gvr *schema.GroupVersionResource) (schema.GroupVersionKind, error) { - object, err := sp.newListObject(gvr) - if err != nil { - return schema.GroupVersionKind{}, errors.Wrapf(err, "failed to new list object") - } - - fqKinds, _, err := scheme.Scheme.ObjectKinds(object) - if err 
!= nil { - return schema.GroupVersionKind{}, err - } - - for _, fqKind := range fqKinds { - if fqKind.Group == gvr.Group { - return fqKind, nil - } - } - - return schema.GroupVersionKind{}, nil -} - func (sp *multiplexerProxy) decodeListOptions(req *http.Request, scope *handlers.RequestScope) (opts metainternalversion.ListOptions, err error) { if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), metav1.SchemeGroupVersion, &opts); err != nil { return opts, err } if errs := validation.ValidateListOptions(&opts, false); len(errs) > 0 { - err := kerrors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "ListOptions"}, "", errs) + err := apierrors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "ListOptions"}, "", errs) return opts, err } @@ -162,7 +138,7 @@ func (sp *multiplexerProxy) decodeListOptions(req *http.Request, scope *handlers return scope.Convertor.ConvertFieldLabel(scope.Kind, label, value) } if opts.FieldSelector, err = opts.FieldSelector.Transform(fn); err != nil { - return opts, kerrors.NewBadRequest(err.Error()) + return opts, apierrors.NewBadRequest(err.Error()) } } @@ -177,7 +153,7 @@ func (sp *multiplexerProxy) decodeListOptions(req *http.Request, scope *handlers if opts.FieldSelector != nil && !opts.FieldSelector.Empty() { selectedName, ok := opts.FieldSelector.RequiresExactMatch("metadata.name") if !ok || name != selectedName { - return opts, kerrors.NewBadRequest("fieldSelector metadata.name doesn't match requested name") + return opts, apierrors.NewBadRequest("fieldSelector metadata.name doesn't match requested name") } } else { opts.FieldSelector = nameSelector @@ -187,8 +163,8 @@ func (sp *multiplexerProxy) decodeListOptions(req *http.Request, scope *handlers return opts, nil } -func (sp *multiplexerProxy) storageOpts(listOpts metainternalversion.ListOptions, gvr *schema.GroupVersionResource) (*kstorage.ListOptions, error) { - p := sp.selectionPredicate(listOpts, gvr) +func (sp *multiplexerProxy) storageOpts(listOpts metainternalversion.ListOptions) (*kstorage.ListOptions, error) { + p := sp.selectionPredicate(listOpts) return &kstorage.ListOptions{ ResourceVersion: getResourceVersion(listOpts), @@ -199,7 +175,7 @@ func (sp *multiplexerProxy) storageOpts(listOpts metainternalversion.ListOptions }, nil } -func (sp *multiplexerProxy) selectionPredicate(listOpts metainternalversion.ListOptions, gvr *schema.GroupVersionResource) kstorage.SelectionPredicate { +func (sp *multiplexerProxy) selectionPredicate(listOpts metainternalversion.ListOptions) kstorage.SelectionPredicate { label := labels.Everything() if listOpts.LabelSelector != nil { label = listOpts.LabelSelector @@ -215,7 +191,7 @@ func (sp *multiplexerProxy) selectionPredicate(listOpts metainternalversion.List Field: field, Limit: listOpts.Limit, Continue: listOpts.Continue, - GetAttrs: sp.getAttrFunc(gvr), + GetAttrs: multiplexer.AttrsFunc, AllowWatchBookmarks: listOpts.AllowWatchBookmarks, } } @@ -233,13 +209,3 @@ func isRecursive(p kstorage.SelectionPredicate) bool { } return true } - -func (sp *multiplexerProxy) getAttrFunc(gvr *schema.GroupVersionResource) kstorage.AttrFunc { - rcc, err := sp.requestsMultiplexerManager.ResourceCacheConfig(gvr) - if err != nil { - klog.Errorf("failed to get cache config for %v, error: %v", gvr, err) - return nil - } - - return rcc.GetAttrsFunc -} diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go b/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go index 10ff9f4b7f1..7c64bfa49bd 100644 --- 
a/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go +++ b/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go @@ -19,8 +19,11 @@ package multiplexer import ( "bytes" + "fmt" "net/http" "net/http/httptest" + "os" + "sort" "testing" "time" @@ -32,11 +35,14 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/storage" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/cache" "github.com/openyurtio/openyurt/pkg/yurthub/filter" + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" - "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" + multiplexerstorage "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" ctesting "github.com/openyurtio/openyurt/pkg/yurthub/proxy/multiplexer/testing" ) @@ -61,9 +67,9 @@ var mockEndpoints = []discovery.Endpoint{ }, } -func mockCacheMap() map[string]multiplexer.Interface { - return map[string]multiplexer.Interface{ - endpointSliceGVR.String(): storage.NewFakeEndpointSliceStorage( +func mockCacheMap() map[string]storage.Interface { + return map[string]storage.Interface{ + endpointSliceGVR.String(): multiplexerstorage.NewFakeEndpointSliceStorage( []discovery.EndpointSlice{ *newEndpointSlice(metav1.NamespaceSystem, "coredns-12345", "", mockEndpoints), *newEndpointSlice(metav1.NamespaceDefault, "nginx", "", mockEndpoints), @@ -72,21 +78,6 @@ func mockCacheMap() map[string]multiplexer.Interface { } } -func mockResourceCacheMap() map[string]*multiplexer.ResourceCacheConfig { - return map[string]*multiplexer.ResourceCacheConfig{ - endpointSliceGVR.String(): { - KeyFunc: multiplexer.KeyFunc, - NewListFunc: func() runtime.Object { - return &discovery.EndpointSliceList{} - }, - NewFunc: func() runtime.Object { - return &discovery.EndpointSlice{} - }, - GetAttrsFunc: multiplexer.AttrsFunc, - }, - } -} - func newEndpointSlice(namespace string, name string, resourceVersion string, endpoints []discovery.Endpoint) *discovery.EndpointSlice { return &discovery.EndpointSlice{ TypeMeta: metav1.TypeMeta{ @@ -114,55 +105,76 @@ func (wr *wrapResponse) Write(buf []byte) (int, error) { } func TestShareProxy_ServeHTTP_LIST(t *testing.T) { - for _, tc := range []struct { - tName string - filterManager filter.FilterFinder + tmpDir, err := os.MkdirTemp("", "test") + if err != nil { + t.Fatalf("failed to make temp dir, %v", err) + } + restMapperManager, _ := meta.NewRESTMapperManager(tmpDir) + + poolScopeResources := []schema.GroupVersionResource{ + {Group: "", Version: "v1", Resource: "services"}, + {Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}, + } + + for k, tc := range map[string]struct { + filterFinder filter.FilterFinder url string expectedEndPointSliceList *discovery.EndpointSliceList err error }{ - { - "test list endpoint slices no filter", - &ctesting.EmptyFilterManager{}, - "/apis/discovery.k8s.io/v1/endpointslices", - expectEndpointSliceListNoFilter(), - - nil, + "test list endpoint slices no filter": { + filterFinder: &ctesting.EmptyFilterManager{}, + url: "/apis/discovery.k8s.io/v1/endpointslices", + expectedEndPointSliceList: expectEndpointSliceListNoFilter(), + err: nil, }, - { - "test list endpoint slice with filter", - &ctesting.FakeEndpointSliceFilter{ + "test list endpoint slice with filter": { + filterFinder: &ctesting.FakeEndpointSliceFilter{ NodeName: "node1", }, - "/apis/discovery.k8s.io/v1/endpointslices", - expectEndpointSliceListWithFilter(), 
- nil, + url: "/apis/discovery.k8s.io/v1/endpointslices", + expectedEndPointSliceList: expectEndpointSliceListWithFilter(), + err: nil, }, - { - "test list endpoint slice with namespace", - &ctesting.FakeEndpointSliceFilter{ + "test list endpoint slice with namespace": { + filterFinder: &ctesting.FakeEndpointSliceFilter{ NodeName: "node1", }, - "/apis/discovery.k8s.io/v1/namespaces/default/endpointslices", - expectEndpointSliceListWithNamespace(), - nil, + url: "/apis/discovery.k8s.io/v1/namespaces/default/endpointslices", + expectedEndPointSliceList: expectEndpointSliceListWithNamespace(), + err: nil, }, } { - t.Run(tc.tName, func(t *testing.T) { + t.Run(k, func(t *testing.T) { w := &httptest.ResponseRecorder{ Body: &bytes.Buffer{}, } - sp, err := NewMultiplexerProxy(tc.filterManager, - multiplexer.NewFakeCacheManager(mockCacheMap(), mockResourceCacheMap()), - []schema.GroupVersionResource{endpointSliceGVR}, - make(<-chan struct{})) + dsm := multiplexerstorage.NewDummyStorageManager(mockCacheMap()) + rmm := multiplexer.NewRequestMultiplexerManager(dsm, restMapperManager, poolScopeResources) + + informerSynced := func() bool { + return rmm.Ready(&schema.GroupVersionResource{ + Group: "discovery.k8s.io", + Version: "v1", + Resource: "endpointslices", + }) + } + stopCh := make(chan struct{}) + if ok := cache.WaitForCacheSync(stopCh, informerSynced); !ok { + t.Errorf("configuration manager is not ready") + return + } - assert.Equal(t, tc.err, err) + sp := NewMultiplexerProxy(tc.filterFinder, + rmm, + restMapperManager, + make(<-chan struct{})) sp.ServeHTTP(w, newEndpointSliceListRequest(tc.url)) - assert.Equal(t, string(encodeEndpointSliceList(tc.expectedEndPointSliceList)), w.Body.String()) + result := equalEndpointSliceLists(tc.expectedEndPointSliceList, decodeEndpointSliceList(w.Body.Bytes())) + assert.True(t, result, w.Body.String()) }) } } @@ -258,43 +270,71 @@ func resolverRequestInfo(req *http.Request) *request.RequestInfo { return info } -func encodeEndpointSliceList(endpointSliceList *discovery.EndpointSliceList) []byte { +func decodeEndpointSliceList(b []byte) *discovery.EndpointSliceList { discoveryv1Codec := scheme.Codecs.CodecForVersions(scheme.Codecs.LegacyCodec(discoveryGV), scheme.Codecs.UniversalDecoder(discoveryGV), discoveryGV, discoveryGV) - str := runtime.EncodeOrDie(discoveryv1Codec, endpointSliceList) - return []byte(str) + epsList := &discovery.EndpointSliceList{} + err := runtime.DecodeInto(discoveryv1Codec, b, epsList) + if err != nil { + return nil + } + return epsList } func TestShareProxy_ServeHTTP_WATCH(t *testing.T) { - for _, tc := range []struct { - tName string - filterManager filter.FilterFinder + tmpDir, err := os.MkdirTemp("", "test") + if err != nil { + t.Fatalf("failed to make temp dir, %v", err) + } + restMapperManager, _ := meta.NewRESTMapperManager(tmpDir) + + poolScopeResources := []schema.GroupVersionResource{ + {Group: "", Version: "v1", Resource: "services"}, + {Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}, + } + + for k, tc := range map[string]struct { + filterFinder filter.FilterFinder url string expectedWatchEvent *metav1.WatchEvent - Err error + err error }{ - {"test watch endpointslice no filter", - &ctesting.EmptyFilterManager{}, - "/apis/discovery.k8s.io/v1/endpointslices?watch=true&&resourceVersion=0&&timeoutSeconds=3", - expectedWatchEventNoFilter(), - nil, + "test watch endpointslice no filter": { + filterFinder: &ctesting.EmptyFilterManager{}, + url: 
"/apis/discovery.k8s.io/v1/endpointslices?watch=true&&resourceVersion=100&&timeoutSeconds=3", + expectedWatchEvent: expectedWatchEventNoFilter(), + err: nil, }, - {"test watch endpointslice with filter", - &ctesting.FakeEndpointSliceFilter{ + "test watch endpointslice with filter": { + filterFinder: &ctesting.FakeEndpointSliceFilter{ NodeName: "node1", }, - "/apis/discovery.k8s.io/v1/endpointslices?watch=true&&resourceVersion=0&&timeoutSeconds=3", - expectedWatchEventWithFilter(), - nil, + url: "/apis/discovery.k8s.io/v1/endpointslices?watch=true&&resourceVersion=100&&timeoutSeconds=3", + expectedWatchEvent: expectedWatchEventWithFilter(), + err: nil, }, } { - t.Run(tc.tName, func(t *testing.T) { - fcm := multiplexer.NewFakeCacheManager(mockCacheMap(), mockResourceCacheMap()) + t.Run(k, func(t *testing.T) { + dsm := multiplexerstorage.NewDummyStorageManager(mockCacheMap()) + rmm := multiplexer.NewRequestMultiplexerManager(dsm, restMapperManager, poolScopeResources) + + informerSynced := func() bool { + return rmm.Ready(&schema.GroupVersionResource{ + Group: "discovery.k8s.io", + Version: "v1", + Resource: "endpointslices", + }) + } + stopCh := make(chan struct{}) + if ok := cache.WaitForCacheSync(stopCh, informerSynced); !ok { + t.Errorf("configuration manager is not ready") + return + } - sp, _ := NewMultiplexerProxy( - tc.filterManager, - fcm, - []schema.GroupVersionResource{endpointSliceGVR}, + sp := NewMultiplexerProxy( + tc.filterFinder, + rmm, + restMapperManager, make(<-chan struct{}), ) @@ -304,7 +344,7 @@ func TestShareProxy_ServeHTTP_WATCH(t *testing.T) { go func() { sp.ServeHTTP(w, req) }() - generateWatchEvent(fcm) + generateWatchEvent(dsm) assertWatchResp(t, tc.expectedWatchEvent, w) }) @@ -357,11 +397,16 @@ func newWatchResponse() *wrapResponse { } } -func generateWatchEvent(fcm *multiplexer.FakeCacheManager) { - fs, _, _ := fcm.ResourceCache(&endpointSliceGVR) +func generateWatchEvent(sp multiplexerstorage.StorageProvider) { + fs, err := sp.ResourceStorage(&endpointSliceGVR) + if err != nil { + return + } - fess, _ := fs.(*storage.FakeEndpointSliceStorage) - fess.AddWatchObject(newEndpointSlice(metav1.NamespaceSystem, "coredns-23456", "102", mockEndpoints)) + fess, ok := fs.(*multiplexerstorage.FakeEndpointSliceStorage) + if ok { + fess.AddWatchObject(newEndpointSlice(metav1.NamespaceSystem, "coredns-23456", "102", mockEndpoints)) + } } func assertWatchResp(t testing.TB, expectedWatchEvent *metav1.WatchEvent, w *wrapResponse) { @@ -381,3 +426,62 @@ func encodeWatchEventList(watchEvent *metav1.WatchEvent) []byte { str := runtime.EncodeOrDie(metav1Codec, watchEvent) return []byte(str) } + +func equalEndpointSlice(a, b discovery.EndpointSlice) bool { + if len(a.Endpoints) != len(b.Endpoints) { + return false + } + + countA := make(map[string]int) + for _, endpoint := range a.Endpoints { + key := endpointKey(endpoint) + countA[key]++ + } + + for _, endpoint := range b.Endpoints { + key := endpointKey(endpoint) + if countA[key] == 0 { + return false + } + + countA[key]-- + if countA[key] == 0 { + delete(countA, key) + } + } + + return len(countA) == 0 +} + +func endpointKey(endpoint discovery.Endpoint) string { + return fmt.Sprintf("%v/%s", endpoint.Addresses, *endpoint.NodeName) +} + +func equalEndpointSliceLists(a, b *discovery.EndpointSliceList) bool { + if len(a.Items) != len(b.Items) { + return false + } + + sort.Slice(a.Items, func(i, j int) bool { + return endpointSliceKey(a.Items[i]) < endpointSliceKey(a.Items[j]) + }) + sort.Slice(b.Items, func(i, j int) bool { + return 
endpointSliceKey(b.Items[i]) < endpointSliceKey(b.Items[j]) + }) + + for i := range a.Items { + if !equalEndpointSlice(a.Items[i], b.Items[i]) { + return false + } + } + return true +} + +func endpointSliceKey(slice discovery.EndpointSlice) string { + keys := make([]string, len(slice.Endpoints)) + for i, endpoint := range slice.Endpoints { + keys[i] = endpointKey(endpoint) + } + sort.Strings(keys) + return fmt.Sprint(keys) +} diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerwatch.go b/pkg/yurthub/proxy/multiplexer/multiplexerwatch.go index 3263cb23fb6..60580e9bee1 100644 --- a/pkg/yurthub/proxy/multiplexer/multiplexerwatch.go +++ b/pkg/yurthub/proxy/multiplexer/multiplexerwatch.go @@ -38,7 +38,6 @@ import ( "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/klog/v2" - yurtutil "github.com/openyurtio/openyurt/pkg/util" "github.com/openyurtio/openyurt/pkg/yurthub/util" ) @@ -76,7 +75,7 @@ func (sp *multiplexerProxy) multiplexerWatch(w http.ResponseWriter, r *http.Requ return } - storageOpts, err := sp.storageOpts(listOpts, gvr) + storageOpts, err := sp.storageOpts(listOpts) if err != nil { util.Err(err, w, r) } @@ -110,11 +109,9 @@ func (sp *multiplexerProxy) multiplexerWatch(w http.ResponseWriter, r *http.Requ } klog.V(3).InfoS("Starting watch", "path", r.URL.Path, "resourceVersion", listOpts.ResourceVersion, "labels", listOpts.LabelSelector, "fields", listOpts.FieldSelector, "timeout", timeout) - if !yurtutil.IsNil(sp.filterFinder) { - if objectFilter, exists := sp.filterFinder.FindObjectFilter(r); exists { - serveWatch(newFilterWatch(watcher, objectFilter), reqScope, outputMediaType, r, w, timeout) - return - } + if objectFilter, exists := sp.filterFinder.FindObjectFilter(r); exists { + serveWatch(newFilterWatch(watcher, objectFilter), reqScope, outputMediaType, r, w, timeout) + return } serveWatch(watcher, reqScope, outputMediaType, r, w, timeout) } @@ -170,7 +167,7 @@ func serveWatchHandler(watcher watch.Interface, scope *handlers.RequestScope, me // locate the appropriate embedded encoder based on the transform var embeddedEncoder runtime.Encoder - contentKind, contentSerializer, transform := targetEncodingForTransform(scope, mediaTypeOptions, req) + contentKind, contentSerializer, transform := targetEncodingForTransform(scope, mediaTypeOptions) if transform { info, ok := runtime.SerializerInfoForMediaType(contentSerializer.SupportedMediaTypes(), serializer.MediaType) if !ok { @@ -242,7 +239,7 @@ func optionsForTransform(mediaType negotiation.MediaTypeOptions, req *http.Reque return nil, nil } -func targetEncodingForTransform(scope *handlers.RequestScope, mediaType negotiation.MediaTypeOptions, req *http.Request) (schema.GroupVersionKind, runtime.NegotiatedSerializer, bool) { +func targetEncodingForTransform(scope *handlers.RequestScope, mediaType negotiation.MediaTypeOptions) (schema.GroupVersionKind, runtime.NegotiatedSerializer, bool) { switch target := mediaType.Convert; { case target == nil: case (target.Kind == "PartialObjectMetadata" || target.Kind == "PartialObjectMetadataList" || target.Kind == "Table") && diff --git a/pkg/yurthub/proxy/multiplexer/testing/fake_endpointslicesfilter.go b/pkg/yurthub/proxy/multiplexer/testing/fake_endpointslicesfilter.go index e6be9b9952b..1fae9f8e444 100644 --- a/pkg/yurthub/proxy/multiplexer/testing/fake_endpointslicesfilter.go +++ b/pkg/yurthub/proxy/multiplexer/testing/fake_endpointslicesfilter.go @@ -19,7 +19,6 @@ package testing import ( discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/runtime" - 
"k8s.io/apimachinery/pkg/util/sets" ) type IgnoreEndpointslicesWithNodeName struct { @@ -30,10 +29,6 @@ func (ie *IgnoreEndpointslicesWithNodeName) Name() string { return "ignoreendpointsliceswithname" } -func (ie *IgnoreEndpointslicesWithNodeName) SupportedResourceAndVerbs() map[string]sets.Set[string] { - return nil -} - // Filter is used for filtering runtime object // all filter logic should be located in it. func (ie *IgnoreEndpointslicesWithNodeName) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { diff --git a/pkg/yurthub/proxy/multiplexer/testing/fake_filtermanager.go b/pkg/yurthub/proxy/multiplexer/testing/fake_filtermanager.go index a7cf6cb0ba4..d37e2992400 100644 --- a/pkg/yurthub/proxy/multiplexer/testing/fake_filtermanager.go +++ b/pkg/yurthub/proxy/multiplexer/testing/fake_filtermanager.go @@ -33,6 +33,10 @@ func (fm *EmptyFilterManager) FindObjectFilter(req *http.Request) (filter.Object return nil, false } +func (fm *EmptyFilterManager) HasSynced() bool { + return true +} + type FakeEndpointSliceFilter struct { NodeName string } @@ -46,3 +50,7 @@ func (fm *FakeEndpointSliceFilter) FindObjectFilter(req *http.Request) (filter.O fm.NodeName, }, true } + +func (fm *FakeEndpointSliceFilter) HasSynced() bool { + return true +} diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index f614b9fe2cb..998cd75c0a0 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -18,10 +18,10 @@ package proxy import ( "errors" - "fmt" "net/http" + "strings" - "k8s.io/apimachinery/pkg/runtime/schema" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/endpoints/filters" apirequest "k8s.io/apiserver/pkg/endpoints/request" @@ -29,9 +29,11 @@ import ( "k8s.io/klog/v2" "github.com/openyurtio/openyurt/cmd/yurthub/app/config" + "github.com/openyurtio/openyurt/pkg/projectinfo" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" hubrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" + basemultiplexer "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/autonomy" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/local" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/multiplexer" @@ -42,20 +44,20 @@ import ( hubutil "github.com/openyurtio/openyurt/pkg/yurthub/util" ) -const multiplexerProxyPostHookName = "multiplexerProxy" - type yurtReverseProxy struct { - resolver apirequest.RequestInfoResolver - loadBalancer remote.LoadBalancer + cfg *config.YurtHubConfiguration cloudHealthChecker healthchecker.MultipleBackendsHealthChecker + resolver apirequest.RequestInfoResolver + loadBalancer http.Handler localProxy http.Handler autonomyProxy http.Handler + multiplexerProxy http.Handler + multiplexerManager *basemultiplexer.MultiplexerManager maxRequestsInFlight int tenantMgr tenant.Interface workingMode hubutil.WorkingMode - multiplexerProxy http.Handler - multiplexerResources []schema.GroupVersionResource nodeName string + multiplexerUserAgent string } // NewYurtReverseProxyHandler creates a http handler for proxying @@ -87,7 +89,6 @@ func NewYurtReverseProxyHandler( } var localProxy, autonomyProxy http.Handler - if yurtHubCfg.WorkingMode == hubutil.WorkingModeEdge { // When yurthub works in Edge mode, we may use local proxy or pool proxy to handle // the request when offline. 
@@ -103,30 +104,25 @@ ) } + multiplexerProxy := multiplexer.NewMultiplexerProxy(yurtHubCfg.FilterFinder, + yurtHubCfg.RequestMultiplexerManager, + yurtHubCfg.RESTMapperManager, + stopCh) + yurtProxy := &yurtReverseProxy{ + cfg: yurtHubCfg, resolver: resolver, loadBalancer: lb, cloudHealthChecker: cloudHealthChecker, localProxy: localProxy, autonomyProxy: autonomyProxy, + multiplexerProxy: multiplexerProxy, + multiplexerManager: yurtHubCfg.RequestMultiplexerManager, maxRequestsInFlight: yurtHubCfg.MaxRequestInFlight, tenantMgr: tenantMgr, workingMode: yurtHubCfg.WorkingMode, - multiplexerResources: yurtHubCfg.MultiplexerResources, nodeName: yurtHubCfg.NodeName, - } - - if yurtHubCfg.PostStartHooks == nil { - yurtHubCfg.PostStartHooks = make(map[string]func() error) - } - yurtHubCfg.PostStartHooks[multiplexerProxyPostHookName] = func() error { - if yurtProxy.multiplexerProxy, err = multiplexer.NewMultiplexerProxy(yurtHubCfg.FilterFinder, - yurtHubCfg.RequestMultiplexerManager, - yurtHubCfg.MultiplexerResources, - stopCh); err != nil { - return fmt.Errorf("failed to new default share proxy, error: %v", err) - } - return nil + multiplexerUserAgent: hubutil.MultiplexerProxyClientUserAgentPrefix + yurtHubCfg.NodeName, } return yurtProxy.buildHandlerChain(yurtProxy), nil @@ -143,9 +139,10 @@ func (p *yurtReverseProxy) buildHandlerChain(handler http.Handler) http.Handler handler = util.WithListRequestSelector(handler) } handler = util.WithRequestTraceFull(handler) - handler = util.WithMaxInFlightLimit(handler, p.maxRequestsInFlight) + handler = util.WithMaxInFlightLimit(handler, p.maxRequestsInFlight, p.nodeName) handler = util.WithRequestClientComponent(handler, p.workingMode) handler = util.WithPartialObjectMetadataRequest(handler) + handler = util.WithIsRequestForPoolScopeMetadata(handler, p.multiplexerManager, p.multiplexerUserAgent) if p.tenantMgr != nil && p.tenantMgr.GetTenantNs() != "" { handler = util.WithSaTokenSubstitute(handler, p.tenantMgr) @@ -159,28 +156,39 @@ func (p *yurtReverseProxy) buildHandlerChain(handler http.Handler) http.Handler } func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + // allow requests from yurthub itself because yurthub needs to get resources from the cloud kube-apiserver during initialization, + // and reject all requests from outside of yurthub when yurthub is not ready. + if !p.IsRequestFromHubSelf(req) { + if err := config.ReadinessCheck(p.cfg); err != nil { + klog.Errorf("could not handle request(%s) because hub is not ready for %s", hubutil.ReqString(req), err.Error()) + hubutil.Err(apierrors.NewServiceUnavailable(err.Error()), rw, req) + return + } + } + + // pool scope metadata requests should be handled by the multiplexer in both cloud and edge modes.
+ isRequestForPoolScopeMetadata, ok := hubutil.IsRequestForPoolScopeMetadataFrom(req.Context()) + if ok && isRequestForPoolScopeMetadata { + p.multiplexerProxy.ServeHTTP(rw, req) + return + } + + // requests should be forwarded to cloud kube-apiserver for cloud mode if p.workingMode == hubutil.WorkingModeCloud { p.loadBalancer.ServeHTTP(rw, req) return } + // handle requests for edge mode switch { case util.IsKubeletLeaseReq(req): p.handleKubeletLease(rw, req) case util.IsKubeletGetNodeReq(req): - if p.autonomyProxy != nil { - p.autonomyProxy.ServeHTTP(rw, req) - } else { - p.loadBalancer.ServeHTTP(rw, req) - } + p.autonomyProxy.ServeHTTP(rw, req) case util.IsEventCreateRequest(req): p.eventHandler(rw, req) case util.IsSubjectAccessReviewCreateGetRequest(req): p.subjectAccessReviewHandler(rw, req) - case util.IsMultiplexerRequest(req, p.multiplexerResources, p.nodeName): - if p.multiplexerProxy != nil { - p.multiplexerProxy.ServeHTTP(rw, req) - } default: // handling the request with cloud apiserver or local cache. if p.cloudHealthChecker.IsHealthy() { @@ -216,3 +224,17 @@ func (p *yurtReverseProxy) subjectAccessReviewHandler(rw http.ResponseWriter, re hubutil.Err(err, rw, req) } } + +func (p *yurtReverseProxy) IsRequestFromHubSelf(req *http.Request) bool { + userAgent := req.UserAgent() + + if userAgent == p.multiplexerUserAgent { + // requests from multiplexer manager in yurthub + return true + } else if strings.HasPrefix(userAgent, projectinfo.GetHubName()) { + // requests from sharedInformer for filter and configuration manager in yurthub + return true + } else { + return false + } +} diff --git a/pkg/yurthub/proxy/remote/loadbalancer.go b/pkg/yurthub/proxy/remote/loadbalancer.go index 1f2ada92be2..bdddeba9162 100644 --- a/pkg/yurthub/proxy/remote/loadbalancer.go +++ b/pkg/yurthub/proxy/remote/loadbalancer.go @@ -244,25 +244,23 @@ func (lb *loadBalancer) modifyResponse(resp *http.Response) error { req = req.WithContext(ctx) // filter response data - if !yurtutil.IsNil(lb.filterFinder) { - if responseFilter, ok := lb.filterFinder.FindResponseFilter(req); ok { - wrapBody, needUncompressed := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "filter") - size, filterRc, err := responseFilter.Filter(req, wrapBody, lb.stopCh) - if err != nil { - klog.Errorf("could not filter response for %s, %v", hubutil.ReqString(req), err) - return err - } - resp.Body = filterRc - if size > 0 { - resp.ContentLength = int64(size) - resp.Header.Set(yurtutil.HttpHeaderContentLength, fmt.Sprint(size)) - } - - // after gunzip in filter, the header content encoding should be removed. - // because there's no need to gunzip response.body again. - if needUncompressed { - resp.Header.Del("Content-Encoding") - } + if responseFilter, ok := lb.filterFinder.FindResponseFilter(req); ok { + wrapBody, needUncompressed := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "filter") + size, filterRc, err := responseFilter.Filter(req, wrapBody, lb.stopCh) + if err != nil { + klog.Errorf("could not filter response for %s, %v", hubutil.ReqString(req), err) + return err + } + resp.Body = filterRc + if size > 0 { + resp.ContentLength = int64(size) + resp.Header.Set(yurtutil.HttpHeaderContentLength, fmt.Sprint(size)) + } + + // after gunzip in filter, the header content encoding should be removed. + // because there's no need to gunzip response.body again. 
+ if needUncompressed { + resp.Header.Del("Content-Encoding") } } diff --git a/pkg/yurthub/proxy/util/util.go b/pkg/yurthub/proxy/util/util.go index 421d0d0ddea..d0a565688b2 100644 --- a/pkg/yurthub/proxy/util/util.go +++ b/pkg/yurthub/proxy/util/util.go @@ -43,6 +43,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" "github.com/openyurtio/openyurt/pkg/yurthub/metrics" + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" "github.com/openyurtio/openyurt/pkg/yurthub/tenant" "github.com/openyurtio/openyurt/pkg/yurthub/util" ) @@ -59,6 +60,44 @@ var needModifyTimeoutVerb = map[string]bool{ "watch": true, } +// WithIsRequestForPoolScopeMetadata adds a mark in the request context to specify whether a request is used for list/watching pool scope metadata or not. +// Requests for pool scope metadata are handled by the multiplexer manager instead of being forwarded to the cloud kube-apiserver. +func WithIsRequestForPoolScopeMetadata(handler http.Handler, multiplexerManager *multiplexer.MultiplexerManager, multiplexerUserAgent string) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + if isMultiplexerRequest(req, multiplexerManager, multiplexerUserAgent) { + ctx = util.WithIsRequestForPoolScopeMetadata(ctx, true) + } else { + ctx = util.WithIsRequestForPoolScopeMetadata(ctx, false) + } + req = req.WithContext(ctx) + handler.ServeHTTP(w, req) + }) +} + +func isMultiplexerRequest(req *http.Request, multiplexerManager *multiplexer.MultiplexerManager, multiplexerUserAgent string) bool { + // requests issued by the multiplexer manager itself should not be multiplexed again + if req.UserAgent() == multiplexerUserAgent { + return false + } + + info, ok := apirequest.RequestInfoFrom(req.Context()) + if !ok { + return false + } + + // only list/watch requests are multiplexed + if info.Verb != "list" && info.Verb != "watch" { + return false + } + + return multiplexerManager.IsPoolScopeMetadata(&schema.GroupVersionResource{ + Group: info.APIGroup, + Version: info.APIVersion, + Resource: info.Resource, + }) +} + // WithPartialObjectMetadataRequest is used for extracting info for partial object metadata request, // then these info is used by cache manager.
func WithPartialObjectMetadataRequest(handler http.Handler) http.Handler { @@ -280,12 +319,19 @@ func (wrw *wrapperResponseWriter) WriteHeader(statusCode int) { // latency for outward requests redirected from proxyserver to apiserver func WithRequestTrace(handler http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - info, ok := apirequest.RequestInfoFrom(req.Context()) - client, _ := util.ClientComponentFrom(req.Context()) + ctx := req.Context() + info, ok := apirequest.RequestInfoFrom(ctx) + client, _ := util.ClientComponentFrom(ctx) + isRequestForPoolScopeMetadata, _ := util.IsRequestForPoolScopeMetadataFrom(ctx) if ok { if info.IsResourceRequest { - metrics.Metrics.IncInFlightRequests(info.Verb, info.Resource, info.Subresource, client) - defer metrics.Metrics.DecInFlightRequests(info.Verb, info.Resource, info.Subresource, client) + if isRequestForPoolScopeMetadata { + metrics.Metrics.IncInFlightMultiplexerRequests(info.Verb, info.Resource, info.Subresource, client) + defer metrics.Metrics.DecInFlightMultiplexerRequests(info.Verb, info.Resource, info.Subresource, client) + } else { + metrics.Metrics.IncInFlightRequests(info.Verb, info.Resource, info.Subresource, client) + defer metrics.Metrics.DecInFlightRequests(info.Verb, info.Resource, info.Subresource, client) + } } } else { info = &apirequest.RequestInfo{} @@ -331,35 +377,57 @@ func WithRequestTraceFull(handler http.Handler) http.Handler { // WithMaxInFlightLimit limits the number of in-flight requests. and when in flight // requests exceeds the threshold, the following incoming requests will be rejected. -func WithMaxInFlightLimit(handler http.Handler, limit int) http.Handler { - var reqChan chan bool +func WithMaxInFlightLimit(handler http.Handler, limit int, nodeName string) http.Handler { + var reqChan, multiplexerReqChan chan bool if limit > 0 { reqChan = make(chan bool, limit) } + multiplexerReqChan = make(chan bool, 4096) return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - info, ok := apirequest.RequestInfoFrom(req.Context()) + ctx := req.Context() + info, ok := apirequest.RequestInfoFrom(ctx) if !ok { info = &apirequest.RequestInfo{} } - select { - case reqChan <- true: - if info.Resource == "leases" { - klog.V(5).Infof("%s, in flight requests: %d", util.ReqString(req), len(reqChan)) - } else { - klog.V(2).Infof("%s, in flight requests: %d", util.ReqString(req), len(reqChan)) + isRequestForPoolScopeMetadata, _ := util.IsRequestForPoolScopeMetadataFrom(ctx) + if isRequestForPoolScopeMetadata { + select { + case multiplexerReqChan <- true: + klog.V(2).Infof("%s, in flight requests for pool scope metadata: %d", util.ReqString(req), len(multiplexerReqChan)) + + defer func() { + <-multiplexerReqChan + klog.V(5).Infof("%s request completed, left %d requests for pool scope metadata in flight", util.ReqString(req), len(multiplexerReqChan)) + }() + handler.ServeHTTP(w, req) + default: + // Return a 429 status indicating "Too Many Requests" + klog.Errorf("Too many requests for pool scope metadata, please try again later, %s", util.ReqString(req)) + metrics.Metrics.IncRejectedMultiplexerRequestCounter() + w.Header().Set("Retry-After", "1") + util.Err(errors.NewTooManyRequestsError(fmt.Sprintf("Too many multiplexer requests for node(%s), please try again later.", nodeName)), w, req) + } + } else { + select { + case reqChan <- true: + if info.Resource == "leases" { + klog.V(5).Infof("%s, in flight requests: %d", util.ReqString(req), len(reqChan)) + } else { + 
klog.V(2).Infof("%s, in flight requests: %d", util.ReqString(req), len(reqChan)) + } + defer func() { + <-reqChan + klog.V(5).Infof("%s request completed, left %d requests in flight", util.ReqString(req), len(reqChan)) + }() + handler.ServeHTTP(w, req) + default: + // Return a 429 status indicating "Too Many Requests" + klog.Errorf("Too many requests, please try again later, %s", util.ReqString(req)) + metrics.Metrics.IncRejectedRequestCounter() + w.Header().Set("Retry-After", "1") + util.Err(errors.NewTooManyRequestsError("Too many requests, please try again later."), w, req) } - defer func() { - <-reqChan - klog.V(5).Infof("%s request completed, left %d requests in flight", util.ReqString(req), len(reqChan)) - }() - handler.ServeHTTP(w, req) - default: - // Return a 429 status indicating "Too Many Requests" - klog.Errorf("Too many requests, please try again later, %s", util.ReqString(req)) - metrics.Metrics.IncRejectedRequestCounter() - w.Header().Set("Retry-After", "1") - util.Err(errors.NewTooManyRequestsError("Too many requests, please try again later."), w, req) } }) } @@ -547,37 +615,3 @@ func ReListWatchReq(rw http.ResponseWriter, req *http.Request) { klog.Infof("this request write error event back finished.") rw.(http.Flusher).Flush() } - -func IsMultiplexerRequest(req *http.Request, multiplexerResources []schema.GroupVersionResource, nodeName string) bool { - ctx := req.Context() - - if req.UserAgent() == util.MultiplexerProxyClientUserAgentPrefix+nodeName { - return false - } - - info, ok := apirequest.RequestInfoFrom(ctx) - if !ok { - return false - } - - if info.Verb != "list" && info.Verb != "watch" { - return false - } - - return isMultiplexerResource(info, multiplexerResources) -} - -func isMultiplexerResource(info *apirequest.RequestInfo, multiplexerResources []schema.GroupVersionResource) bool { - gvr := schema.GroupVersionResource{ - Group: info.APIGroup, - Version: info.APIVersion, - Resource: info.Resource, - } - - for _, resource := range multiplexerResources { - if gvr.String() == resource.String() { - return true - } - } - return false -} diff --git a/pkg/yurthub/proxy/util/util_test.go b/pkg/yurthub/proxy/util/util_test.go index 890fdcac123..cb75d705c9a 100644 --- a/pkg/yurthub/proxy/util/util_test.go +++ b/pkg/yurthub/proxy/util/util_test.go @@ -22,6 +22,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "os" "reflect" "sync" "testing" @@ -32,11 +33,21 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/endpoints/filters" "k8s.io/apiserver/pkg/endpoints/request" + kstorage "k8s.io/apiserver/pkg/storage" + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" "github.com/openyurtio/openyurt/pkg/yurthub/tenant" "github.com/openyurtio/openyurt/pkg/yurthub/util" ) +var serviceGVR = &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "services", +} + func newTestRequestInfoResolver() *request.RequestInfoFactory { return &request.RequestInfoFactory{ APIPrefixes: sets.NewString("api", "apis"), @@ -44,6 +55,73 @@ func newTestRequestInfoResolver() *request.RequestInfoFactory { } } +func TestWithIsRequestForPoolScopeMetadata(t *testing.T) { + testcases := map[string]struct { + userAgent string + verb string + path string + isRequestForPoolScopeMetadata bool + }{ + "list service resource": { + userAgent: "kubelet", + verb: "GET", + path: "/api/v1/services", + isRequestForPoolScopeMetadata: true, + }, + + 
"get node resource": { + userAgent: "flanneld/0.11.0", + verb: "GET", + path: "/api/v1/nodes/mynode", + isRequestForPoolScopeMetadata: false, + }, + } + + resolver := newTestRequestInfoResolver() + + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + req, _ := http.NewRequest(tc.verb, tc.path, nil) + if len(tc.userAgent) != 0 { + req.Header.Set("User-Agent", tc.userAgent) + } + req.RemoteAddr = "127.0.0.1" + + storageMap := map[string]kstorage.Interface{ + serviceGVR.String(): nil, + } + dsm := storage.NewDummyStorageManager(storageMap) + + tmpDir, err := os.MkdirTemp("", "test") + if err != nil { + t.Fatalf("failed to make temp dir, %v", err) + } + restMapperManager, _ := meta.NewRESTMapperManager(tmpDir) + + poolScopeResources := []schema.GroupVersionResource{ + {Group: "", Version: "v1", Resource: "services"}, + {Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}, + } + + rmm := multiplexer.NewRequestMultiplexerManager(dsm, restMapperManager, poolScopeResources) + + var isRequestForPoolScopeMetadata bool + var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + isRequestForPoolScopeMetadata, _ = util.IsRequestForPoolScopeMetadataFrom(ctx) + }) + + handler = WithIsRequestForPoolScopeMetadata(handler, rmm, "test-agent") + handler = filters.WithRequestInfo(handler, resolver) + handler.ServeHTTP(httptest.NewRecorder(), req) + + if isRequestForPoolScopeMetadata != tc.isRequestForPoolScopeMetadata { + t.Errorf("%s: expect isRequestForPoolScopeMetadata %v, but got %v", k, tc.isRequestForPoolScopeMetadata, isRequestForPoolScopeMetadata) + } + }) + } +} + func TestWithPartialObjectMetadataRequest(t *testing.T) { testcases := map[string]struct { Verb string @@ -286,7 +364,7 @@ func TestWithMaxInFlightLimit(t *testing.T) { w.WriteHeader(http.StatusOK) }) - handler = WithMaxInFlightLimit(handler, 10) + handler = WithMaxInFlightLimit(handler, 10, "test-node") handler = filters.WithRequestInfo(handler, resolver) respCodes := make([]int, k) diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index ecd851b632a..58ae9613265 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -21,7 +21,6 @@ import ( "net/http" "github.com/gorilla/mux" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" @@ -30,7 +29,6 @@ import ( "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/util/profile" - "github.com/openyurtio/openyurt/pkg/yurthub/certificate" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" ota "github.com/openyurtio/openyurt/pkg/yurthub/otaupdate" otautil "github.com/openyurtio/openyurt/pkg/yurthub/otaupdate/util" @@ -74,12 +72,6 @@ func RunYurtHubServers(cfg *config.YurtHubConfiguration, return err } } - - for name, hook := range cfg.PostStartHooks { - if err := hook(); err != nil { - return errors.Wrapf(err, "failed to run post start hooks: %s", name) - } - } return nil } @@ -90,7 +82,7 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, rest *res // register handler for health check c.HandleFunc("/v1/healthz", healthz).Methods("GET") - c.Handle("/v1/readyz", readyz(cfg.CertManager)).Methods("GET") + c.Handle("/v1/readyz", readyz(cfg)).Methods("GET") // register handler for profile if cfg.EnableProfiling { @@ -104,7 +96,7 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, rest *res 
if cfg.WorkingMode == util.WorkingModeEdge { c.Handle("/pods", ota.GetPods(cfg.StorageWrapper)).Methods("GET") } else { - c.Handle("/pods", getPodList(cfg.SharedFactory, cfg.NodeName)).Methods("GET") + c.Handle("/pods", getPodList(cfg.SharedFactory)).Methods("GET") } c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", ota.HealthyCheck(rest, cfg.NodeName, ota.UpdatePod)).Methods("POST") @@ -116,21 +108,20 @@ func healthz(w http.ResponseWriter, _ *http.Request) { fmt.Fprintf(w, "OK") } -// readyz is used for checking certificates are ready or not -func readyz(certificateMgr certificate.YurtCertificateManager) http.Handler { +// readyz is used for checking whether yurthub is ready to proxy requests or not +func readyz(cfg *config.YurtHubConfiguration) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ready := certificateMgr.Ready() - if ready { - w.WriteHeader(http.StatusOK) - fmt.Fprintf(w, "OK") - } else { - http.Error(w, "certificates are not ready", http.StatusInternalServerError) + if err := config.ReadinessCheck(cfg); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return } + + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, "OK") }) } -func getPodList(sharedFactory informers.SharedInformerFactory, nodeName string) http.Handler { - +func getPodList(sharedFactory informers.SharedInformerFactory) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { podLister := sharedFactory.Core().V1().Pods().Lister() podList, err := podLister.List(labels.Everything()) diff --git a/pkg/yurthub/util/util.go b/pkg/yurthub/util/util.go index 6fc16c7ab12..4f4824abeec 100644 --- a/pkg/yurthub/util/util.go +++ b/pkg/yurthub/util/util.go @@ -66,10 +66,10 @@ const ( ProxyReqCanCache // ProxyListSelector represents label selector and filed selector string for list request ProxyListSelector - // ProxyPartialObjectMetadataRequest represents if this request is getting partial object metadata - ProxyPartialObjectMetadataRequest // ProxyConvertGVK represents the gvk of response when it is a partial object metadata request ProxyConvertGVK + // ProxyPoolScopeMetadata represents whether a request is going to list/watch pool scope metadata or not. + ProxyPoolScopeMetadata YurtHubNamespace = "kube-system" CacheUserAgentsKey = "cache_agents" @@ -157,6 +157,17 @@ func ConvertGVKFrom(ctx context.Context) (*schema.GroupVersionKind, bool) { return info, ok } +// WithIsRequestForPoolScopeMetadata returns a copy of parent in which the pool scope metadata flag is set +func WithIsRequestForPoolScopeMetadata(parent context.Context, isRequestForPoolScopeMetadata bool) context.Context { + return WithValue(parent, ProxyPoolScopeMetadata, isRequestForPoolScopeMetadata) +} + +// IsRequestForPoolScopeMetadataFrom returns the pool scope metadata flag stored in the ctx +func IsRequestForPoolScopeMetadataFrom(ctx context.Context) (bool, bool) { + info, ok := ctx.Value(ProxyPoolScopeMetadata).(bool) + return info, ok +} + // ReqString formats a string for request func ReqString(req *http.Request) string { ctx := req.Context()
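// Standalone sketch of the request-context flag pattern used by the WithIsRequestForPoolScopeMetadata /
// IsRequestForPoolScopeMetadataFrom helpers above: a middleware decides once per request whether it
// targets pool scope metadata and stores the result in the request context; downstream handlers only
// read the flag. The names below are illustrative stand-ins, not the real yurthub helpers.
package example

import (
	"context"
	"net/http"
)

type ctxKey int

const poolScopeKey ctxKey = iota

// markPoolScope stores the decision in the context, mirroring util.WithIsRequestForPoolScopeMetadata.
func markPoolScope(parent context.Context, isPoolScope bool) context.Context {
	return context.WithValue(parent, poolScopeKey, isPoolScope)
}

// poolScopeFrom reads the decision back, mirroring util.IsRequestForPoolScopeMetadataFrom.
func poolScopeFrom(ctx context.Context) (bool, bool) {
	v, ok := ctx.Value(poolScopeKey).(bool)
	return v, ok
}

// withPoolScopeMark is the marking middleware; isPoolScopeRequest stands in for the
// multiplexer manager's IsPoolScopeMetadata check on the request's GroupVersionResource.
func withPoolScopeMark(next http.Handler, isPoolScopeRequest func(*http.Request) bool) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		req = req.WithContext(markPoolScope(req.Context(), isPoolScopeRequest(req)))
		next.ServeHTTP(w, req)
	})
}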