Skip to content

Commit

Permalink
feat: improve direct clientsets for yurthub
Browse files Browse the repository at this point in the history
Signed-off-by: rambohe-ch <[email protected]>
  • Loading branch information
rambohe-ch committed Feb 2, 2025
1 parent 175c309 commit 90ba12e
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 178 deletions.
12 changes: 6 additions & 6 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/gc"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
hubrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/directclient"
"github.com/openyurtio/openyurt/pkg/yurthub/locallb"
"github.com/openyurtio/openyurt/pkg/yurthub/proxy"
"github.com/openyurtio/openyurt/pkg/yurthub/server"
Expand Down Expand Up @@ -121,8 +121,8 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
}
trace++

klog.Infof("%d. new restConfig manager", trace)
restConfigMgr, err := hubrest.NewRestConfigManager(cfg.CertManager, cloudHealthChecker)
klog.Infof("%d. new direct client manager", trace)
directClientManager, err := directclient.NewRestClientManager(cfg.RemoteServers, transportManager, cloudHealthChecker)
if err != nil {
return fmt.Errorf("could not new restConfig manager, %w", err)
}
Expand All @@ -139,7 +139,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {

if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new gc manager for node %s, and gc frequency is a random time between %d min and %d min", trace, cfg.NodeName, cfg.GCFrequency, 3*cfg.GCFrequency)
gcMgr, err := gc.NewGCManager(cfg, restConfigMgr, ctx.Done())
gcMgr, err := gc.NewGCManager(cfg, directClientManager, ctx.Done())
if err != nil {
return fmt.Errorf("could not new gc manager, %w", err)
}
Expand All @@ -161,7 +161,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(
cfg,
cacheMgr,
restConfigMgr,
directClientManager,
transportManager,
cloudHealthChecker,
tenantMgr,
Expand All @@ -176,7 +176,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
}

klog.Infof("%d. new %s server and begin to serve", trace, projectinfo.GetHubName())
if err := server.RunYurtHubServers(cfg, yurtProxyHandler, restConfigMgr, ctx.Done()); err != nil {
if err := server.RunYurtHubServers(cfg, yurtProxyHandler, directClientManager, ctx.Done()); err != nil {
return fmt.Errorf("could not run hub servers, %w", err)
}
} else {
Expand Down
31 changes: 11 additions & 20 deletions pkg/yurthub/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/directclient"
"github.com/openyurtio/openyurt/pkg/yurthub/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)
Expand All @@ -43,15 +43,15 @@ var (
// GCManager is responsible for cleanup garbage of yurthub
type GCManager struct {
store cachemanager.StorageWrapper
restConfigManager *rest.RestConfigManager
manager *directclient.DirectClientManager
nodeName string
eventsGCFrequency time.Duration
lastTime time.Time
stopCh <-chan struct{}
}

// NewGCManager creates a *GCManager object
func NewGCManager(cfg *config.YurtHubConfiguration, restConfigManager *rest.RestConfigManager, stopCh <-chan struct{}) (*GCManager, error) {
func NewGCManager(cfg *config.YurtHubConfiguration, directClientManager *directclient.DirectClientManager, stopCh <-chan struct{}) (*GCManager, error) {
gcFrequency := cfg.GCFrequency
if gcFrequency == 0 {
gcFrequency = defaultEventGcInterval
Expand All @@ -60,7 +60,7 @@ func NewGCManager(cfg *config.YurtHubConfiguration, restConfigManager *rest.Rest
// TODO: use disk storage directly
store: cfg.StorageWrapper,
nodeName: cfg.NodeName,
restConfigManager: restConfigManager,
manager: directClientManager,
eventsGCFrequency: time.Duration(gcFrequency) * time.Minute,
stopCh: stopCh,
}
Expand All @@ -75,14 +75,9 @@ func (m *GCManager) Run() {
go wait.JitterUntil(func() {
klog.V(2).Infof("start gc events after waiting %v from previous gc", time.Since(m.lastTime))
m.lastTime = time.Now()
cfg := m.restConfigManager.GetRestConfig(true)
if cfg == nil {
klog.Errorf("could not get rest config, so skip gc")
return
}
kubeClient, err := clientset.NewForConfig(cfg)
if err != nil {
klog.Errorf("could not new kube client, %v", err)
kubeClient := m.manager.GetDirectClientset(true)
if kubeClient == nil {
klog.Warningf("all remote servers are unhealthy, skip gc events")
return
}

Expand All @@ -109,14 +104,10 @@ func (m *GCManager) gcPodsWhenRestart() {
if len(localPodKeys) == 0 {
return
}
cfg := m.restConfigManager.GetRestConfig(true)
if cfg == nil {
klog.Errorf("could not get rest config, so skip gc pods when restart")
return
}
kubeClient, err := clientset.NewForConfig(cfg)
if err != nil {
klog.Errorf("could not new kube client, %v", err)

kubeClient := m.manager.GetDirectClientset(true)
if kubeClient == nil {
klog.Warningf("all remote servers are unhealthy, skip gc pods")
return
}

Expand Down
85 changes: 85 additions & 0 deletions pkg/yurthub/kubernetes/directclient/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package directclient

import (
"fmt"
"net/url"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/transport"
)

// DirectClientManager is a holder for clientsets which are used to connecting cloud kube-apiserver directly.
// All clientsets are prepared when yurthub startup, so it is efficient to get a clientset by this manager
// for accessing cloud kube-apiserver.
type DirectClientManager struct {
checker healthchecker.MultipleBackendsHealthChecker
serverToClientset map[string]*kubernetes.Clientset
}

func NewRestClientManager(servers []*url.URL, tansportManager transport.Interface, healthChecker healthchecker.MultipleBackendsHealthChecker) (*DirectClientManager, error) {
mgr := &DirectClientManager{
checker: healthChecker,
serverToClientset: make(map[string]*kubernetes.Clientset),
}

for i := range servers {
config := &rest.Config{
Host: servers[i].Host,
Transport: tansportManager.CurrentTransport(),
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

if len(servers[i].Host) != 0 {
mgr.serverToClientset[servers[i].Host] = clientset
}
}

if len(mgr.serverToClientset) == 0 {
return nil, fmt.Errorf("clientset should not be empty")
}

return mgr, nil
}

// GetDirectClientset gets kube clientset according to the healthy status of server
func (rcm *DirectClientManager) GetDirectClientset(needHealthyServer bool) *kubernetes.Clientset {
var serverHost string
if needHealthyServer {
healthyServer, _ := rcm.checker.PickHealthyServer()
if healthyServer == nil {
klog.Infof("all of remote servers are unhealthy, so return nil for clientset")
return nil
}
serverHost = healthyServer.Host
} else {
for host := range rcm.serverToClientset {
serverHost = host
}
}

return rcm.serverToClientset[serverHost]
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package rest
package directclient

import (
"context"
Expand All @@ -29,18 +29,18 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/manager"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/testdata"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/transport"
)

var (
testDir = "/tmp/rest/"
)

func TestGetRestConfig(t *testing.T) {
func TestGetDirectClientset(t *testing.T) {
testDir, err := os.MkdirTemp("", "test-client")
if err != nil {
t.Fatalf("failed to make temp dir, %v", err)
}
nodeName := "foo"
servers := map[string]int{"https://10.10.10.113:6443": 2}
u, _ := url.Parse("https://10.10.10.113:6443")
remoteServers := []*url.URL{u}
fakeHealthyChecker := healthchecker.NewFakeChecker(false, servers)

client, err := testdata.CreateCertFakeClient("../../certificate/testdata")
if err != nil {
Expand Down Expand Up @@ -73,32 +73,46 @@ func TestGetRestConfig(t *testing.T) {
t.Errorf("certificates are not ready, %v", err)
}

rcm, _ := NewRestConfigManager(certManager, fakeHealthyChecker)
transportManager, err := transport.NewTransportManager(certManager, context.Background().Done())
if err != nil {
t.Fatalf("could not new transport manager, %v", err)
}

testcases := map[string]struct {
healthy bool
needHealthyServer bool
cfgIsNil bool
clientIsNil bool
}{
"do not need healthy server": {
needHealthyServer: false,
cfgIsNil: false,
"get client from healthy servers": {
healthy: true,
needHealthyServer: true,
clientIsNil: false,
},
"need healthy server": {
"get client from unhealthy servers": {
healthy: false,
needHealthyServer: true,
cfgIsNil: true,
clientIsNil: true,
},
"get client at random": {
healthy: true,
needHealthyServer: false,
clientIsNil: false,
},
}

for k, tc := range testcases {
t.Run(k, func(t *testing.T) {
cfg := rcm.GetRestConfig(tc.needHealthyServer)
if tc.cfgIsNil {
if cfg != nil {
t.Errorf("expect rest config is nil, but got %v", cfg)
fakeHealthyChecker := healthchecker.NewFakeChecker(tc.healthy, servers)
rcm, _ := NewRestClientManager(remoteServers, transportManager, fakeHealthyChecker)

client := rcm.GetDirectClientset(tc.needHealthyServer)
if tc.clientIsNil {
if client != nil {
t.Errorf("expect rest client is nil, but got %v", client)
}
} else {
if cfg == nil {
t.Errorf("expect non nil rest config, but got nil")
if client == nil {
t.Errorf("expect non nil rest client, but got nil")
}
}
})
Expand Down
67 changes: 0 additions & 67 deletions pkg/yurthub/kubernetes/rest/config.go

This file was deleted.

Loading

0 comments on commit 90ba12e

Please sign in to comment.