Skip to content

Commit

Permalink
feat: improve readiness probe for yurthub component (#2284)
Browse files Browse the repository at this point in the history
1. yurthub becomes ready after the certificate, the yurt-hub-cfg configmap, and the resources needed by filters are synced.
2. improve multiplexer storage to a lazy cache: the multiplexer manager starts to list/watch pool scope metadata only when corresponding requests are received.
3. pool scope metadata requests will be rejected before the multiplexer storage cache is ready.

Signed-off-by: rambohe-ch <[email protected]>
  • Loading branch information
rambohe-ch authored Jan 31, 2025
1 parent 21473cf commit 175c309
Show file tree
Hide file tree
Showing 34 changed files with 1,116 additions and 693 deletions.
12 changes: 10 additions & 2 deletions charts/yurthub/templates/yurthub-cloud-yurtstaticset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,16 @@ spec:
host: {{ .Values.yurthubBindingAddr }}
path: /v1/healthz
port: 10267
initialDelaySeconds: 300
periodSeconds: 5
initialDelaySeconds: 120
periodSeconds: 10
failureThreshold: 3
readinessProbe:
httpGet:
host: {{ .Values.yurthubBindingAddr }}
path: /v1/readyz
port: 10267
initialDelaySeconds: 120
periodSeconds: 20
failureThreshold: 3
resources:
requests:
Expand Down
12 changes: 10 additions & 2 deletions charts/yurthub/templates/yurthub-yurtstaticset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,16 @@ spec:
host: {{ .Values.yurthubBindingAddr }}
path: /v1/healthz
port: 10267
initialDelaySeconds: 300
periodSeconds: 5
initialDelaySeconds: 120
periodSeconds: 10
failureThreshold: 3
readinessProbe:
httpGet:
host: {{ .Values.yurthubBindingAddr }}
path: /v1/readyz
port: 10267
initialDelaySeconds: 120
periodSeconds: 20
failureThreshold: 3
resources:
requests:
Expand Down
51 changes: 23 additions & 28 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand Down Expand Up @@ -63,19 +62,6 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

var AllowedMultiplexerResources = []schema.GroupVersionResource{
{
Group: "",
Version: "v1",
Resource: "services",
},
{
Group: "discovery.k8s.io",
Version: "v1",
Resource: "endpointslices",
},
}

// YurtHubConfiguration represents configuration of yurthub
type YurtHubConfiguration struct {
LBMode string
Expand Down Expand Up @@ -111,9 +97,7 @@ type YurtHubConfiguration struct {
ProxiedClient kubernetes.Interface
DiskCachePath string
HostControlPlaneAddr string // ip:port
PostStartHooks map[string]func() error
RequestMultiplexerManager multiplexer.MultiplexerManager
MultiplexerResources []schema.GroupVersionResource
RequestMultiplexerManager *multiplexer.MultiplexerManager
ConfigManager *configuration.Manager
}

Expand Down Expand Up @@ -178,8 +162,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
ProxiedClient: proxiedClient,
DiskCachePath: options.DiskCachePath,
HostControlPlaneAddr: options.HostControlPlaneAddr,
MultiplexerResources: AllowedMultiplexerResources,
RequestMultiplexerManager: newMultiplexerCacheManager(options),
RequestMultiplexerManager: newRequestMultiplexerManager(options, restMapperManager),
ConfigManager: configManager,
}

Expand Down Expand Up @@ -302,7 +285,7 @@ func registerInformers(options *options.YurtHubOptions,
workingMode util.WorkingMode,
tenantNs string) {

// configmap informer is used by Yurthub filter approver
// configmap informer is used by Yurthub filter approver/cache manager
newConfigmapInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": util.YurthubConfigMapName}.String()
Expand Down Expand Up @@ -409,16 +392,28 @@ func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.Y
return nil
}

func newMultiplexerCacheManager(options *options.YurtHubOptions) multiplexer.MultiplexerManager {
config := newRestConfig(options.NodeName, options.YurtHubProxyHost, options.YurtHubProxyPort)
rsm := storage.NewStorageManager(config)
func newRequestMultiplexerManager(options *options.YurtHubOptions, restMapperManager *meta.RESTMapperManager) *multiplexer.MultiplexerManager {
config := &rest.Config{
Host: fmt.Sprintf("http://%s:%d", options.YurtHubProxyHost, options.YurtHubProxyPort),
UserAgent: util.MultiplexerProxyClientUserAgentPrefix + options.NodeName,
}
storageProvider := storage.NewStorageProvider(config)

return multiplexer.NewRequestsMultiplexerManager(rsm)
return multiplexer.NewRequestMultiplexerManager(storageProvider, restMapperManager, options.PoolScopeResources)
}

func newRestConfig(nodeName string, host string, port int) *rest.Config {
return &rest.Config{
Host: fmt.Sprintf("http://%s:%d", host, port),
UserAgent: util.MultiplexerProxyClientUserAgentPrefix + nodeName,
// ReadinessCheck reports whether yurthub is ready to serve requests.
// Readiness requires, in order: the client certificates have been
// prepared (CertManager), the yurt-hub-cfg configmap has been synced
// (ConfigManager), and the resources needed by filters have been
// synced (FilterFinder). It returns nil when all conditions hold,
// otherwise an error naming the first unmet condition.
func ReadinessCheck(cfg *YurtHubConfiguration) error {
	if !cfg.CertManager.Ready() {
		return fmt.Errorf("certificates are not ready")
	}

	if !cfg.ConfigManager.HasSynced() {
		return fmt.Errorf("yurt-hub-cfg configmap is not synced")
	}

	if !cfg.FilterFinder.HasSynced() {
		return fmt.Errorf("resources needed by filters are not synced")
	}

	return nil
}
7 changes: 7 additions & 0 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
Expand Down Expand Up @@ -81,6 +82,7 @@ type YurtHubOptions struct {
UnsafeSkipCAVerification bool
ClientForTest kubernetes.Interface
EnablePoolServiceTopology bool
PoolScopeResources PoolScopeMetadatas
}

// NewYurtHubOptions creates a new YurtHubOptions with a default config.
Expand Down Expand Up @@ -116,6 +118,10 @@ func NewYurtHubOptions() *YurtHubOptions {
CACertHashes: make([]string, 0),
UnsafeSkipCAVerification: true,
EnablePoolServiceTopology: false,
PoolScopeResources: []schema.GroupVersionResource{
{Group: "", Version: "v1", Resource: "services"},
{Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"},
},
}
return o
}
Expand Down Expand Up @@ -208,6 +214,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&o.UnsafeSkipCAVerification, "discovery-token-unsafe-skip-ca-verification", o.UnsafeSkipCAVerification, "For token-based discovery, allow joining without --discovery-token-ca-cert-hash pinning.")
fs.BoolVar(&o.EnablePoolServiceTopology, "enable-pool-service-topology", o.EnablePoolServiceTopology, "enable service topology feature in the node pool.")
fs.StringVar(&o.HostControlPlaneAddr, "host-control-plane-address", o.HostControlPlaneAddr, "the address (ip:port) of host kubernetes cluster that used for yurthub local mode.")
fs.Var(&o.PoolScopeResources, "pool-scope-resources", "The list/watch requests for these resources will be multiplexered in yurthub in order to reduce overhead of kube-apiserver. comma-separated list of GroupVersionResource in the format Group/Version/Resource")
}

// verifyDummyIP verify the specified ip is valid or not and set the default ip if empty
Expand Down
5 changes: 5 additions & 0 deletions cmd/yurthub/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/openyurtio/openyurt/pkg/projectinfo"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
Expand Down Expand Up @@ -61,6 +62,10 @@ func TestNewYurtHubOptions(t *testing.T) {
MinRequestTimeout: time.Second * 1800,
CACertHashes: make([]string, 0),
UnsafeSkipCAVerification: true,
PoolScopeResources: []schema.GroupVersionResource{
{Group: "", Version: "v1", Resource: "services"},
{Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"},
},
}

options := NewYurtHubOptions()
Expand Down
68 changes: 68 additions & 0 deletions cmd/yurthub/app/options/pool_scope_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2025 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package options

import (
"fmt"
"strings"

"k8s.io/apimachinery/pkg/runtime/schema"
)

// PoolScopeMetadatas is the list of GroupVersionResources whose
// list/watch requests are multiplexed by yurthub. It implements the
// pflag.Value interface so it can be populated from a command-line flag.
type PoolScopeMetadatas []schema.GroupVersionResource

// String returns the comma-separated Group/Version/Resource
// representation of the slice (the inverse of Set).
func (psm *PoolScopeMetadatas) String() string {
	items := make([]string, 0, len(*psm))
	for _, gvr := range *psm {
		items = append(items, gvr.Group+"/"+gvr.Version+"/"+gvr.Resource)
	}

	return strings.Join(items, ",")
}

// Set parses a comma-separated list of Group/Version/Resource triples
// (e.g. "/v1/services,discovery.k8s.io/v1/endpointslices") and appends
// the parsed GVRs to the slice. Whitespace around each component is
// trimmed; the Group component may be empty (core API group), but
// Version and Resource must be non-empty. Note that values are
// appended, so repeated flag occurrences accumulate entries on top of
// any defaults already present in the slice.
func (psm *PoolScopeMetadatas) Set(value string) error {
	parts := strings.Split(value, ",")
	for _, part := range parts {
		subParts := strings.Split(part, "/")
		if len(subParts) != 3 {
			return fmt.Errorf("invalid GVR format: %s, expected format is Group/Version/Resource", part)
		}

		gvr := schema.GroupVersionResource{
			Group:    strings.TrimSpace(subParts[0]),
			Version:  strings.TrimSpace(subParts[1]),
			Resource: strings.TrimSpace(subParts[2]),
		}
		// Group may legitimately be empty (core group), but a GVR with an
		// empty version or resource can never match a real API resource.
		if gvr.Version == "" || gvr.Resource == "" {
			return fmt.Errorf("invalid GVR format: %s, version and resource must not be empty", part)
		}

		*psm = append(*psm, gvr)
	}

	return nil
}

// Type returns the name of the flag value type, as required by the
// pflag.Value interface; this string is shown in the flag's help output.
func (psm *PoolScopeMetadatas) Type() string {
	return "PoolScopeMetadatas"
}
118 changes: 118 additions & 0 deletions cmd/yurthub/app/options/pool_scope_metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
Copyright 2025 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package options

import (
"testing"

"k8s.io/apimachinery/pkg/runtime/schema"
)

// TestSet verifies that PoolScopeMetadatas.Set parses valid
// comma-separated GVR triples and rejects malformed input.
func TestSet(t *testing.T) {
	cases := map[string]struct {
		input     string
		expected  []schema.GroupVersionResource
		expectErr bool
	}{
		"single pool scope metadata": {
			input: "group1/v1/resource1",
			expected: []schema.GroupVersionResource{
				{Group: "group1", Version: "v1", Resource: "resource1"},
			},
		},
		"multiple pool scope metadatas": {
			input: "group1/v1/resource1, group2/v2/resource2",
			expected: []schema.GroupVersionResource{
				{Group: "group1", Version: "v1", Resource: "resource1"},
				{Group: "group2", Version: "v2", Resource: "resource2"},
			},
		},
		"multiple pool scope metadatas with empty group": {
			input: "/v1/resource1, /v2/resource2",
			expected: []schema.GroupVersionResource{
				{Group: "", Version: "v1", Resource: "resource1"},
				{Group: "", Version: "v2", Resource: "resource2"},
			},
		},
		"invalid format of pool scope metadata": {
			input:     "group1/v1",
			expectErr: true,
		},
		"empty string of pool scope metadata": {
			input:     "",
			expectErr: true,
		},
	}

	for name, tt := range cases {
		t.Run(name, func(t *testing.T) {
			var got PoolScopeMetadatas
			err := got.Set(tt.input)
			gotErr := err != nil
			if gotErr != tt.expectErr {
				t.Errorf("expected error %v, but got %v", tt.expectErr, gotErr)
			}
			if tt.expectErr {
				return
			}

			if !comparePoolScopeMetadatas(got, tt.expected) {
				t.Errorf("expected pool scope metadatas: %+v, but got %+v", tt.expected, got)
			}
		})
	}
}

// comparePoolScopeMetadatas reports whether the two GVR slices have the
// same elements in the same order.
func comparePoolScopeMetadatas(a, b []schema.GroupVersionResource) bool {
	if len(a) != len(b) {
		return false
	}

	for i, gvr := range a {
		if gvr != b[i] {
			return false
		}
	}

	return true
}

// TestString verifies that PoolScopeMetadatas.String renders the slice
// as comma-separated Group/Version/Resource triples.
func TestString(t *testing.T) {
	cases := map[string]struct {
		psm         PoolScopeMetadatas
		expectedStr string
	}{
		"single pool scope metadata": {
			psm: PoolScopeMetadatas{
				{Group: "group1", Version: "v1", Resource: "resource1"},
			},
			expectedStr: "group1/v1/resource1",
		},
		"multiple pool scope metadatas": {
			psm: PoolScopeMetadatas{
				{Group: "group1", Version: "v1", Resource: "resource1"},
				{Group: "group2", Version: "v2", Resource: "resource2"},
			},
			expectedStr: "group1/v1/resource1,group2/v2/resource2",
		},
	}

	for name, tt := range cases {
		t.Run(name, func(t *testing.T) {
			got := tt.psm.String()
			if got != tt.expectedStr {
				t.Errorf("expected string %s, but got %s", tt.expectedStr, got)
			}
		})
	}
}
6 changes: 3 additions & 3 deletions pkg/node-servant/components/yurthub.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,16 +182,16 @@ func hubHealthcheck(timeout time.Duration) error {
if err != nil {
return err
}
serverHealthzURL.Path = constants.ServerHealthzURLPath
serverHealthzURL.Path = constants.ServerReadyzURLPath

start := time.Now()
return wait.PollUntilContextTimeout(context.Background(), hubHealthzCheckFrequency, timeout, true, func(ctx context.Context) (bool, error) {
_, err := pingClusterHealthz(http.DefaultClient, serverHealthzURL.String())
if err != nil {
klog.Infof("yurt-hub is not ready, ping cluster healthz with result: %v", err)
klog.Infof("yurt-hub is not ready, ping cluster readyz with result: %v", err)
return false, nil
}
klog.Infof("yurt-hub healthz is OK after %f seconds", time.Since(start).Seconds())
klog.Infof("yurt-hub readyz is OK after %f seconds", time.Since(start).Seconds())
return true, nil
})
}
Expand Down
Loading

0 comments on commit 175c309

Please sign in to comment.