fix some errors
huangchenzhao committed Oct 27, 2024
1 parent 3b5ee51 commit dc085a7
Showing 9 changed files with 375 additions and 125 deletions.
53 changes: 29 additions & 24 deletions cmd/yurthub/app/config/config.go
@@ -98,6 +98,7 @@ type YurtHubConfiguration struct {
CoordinatorStorageAddr string // ip:port
CoordinatorClient kubernetes.Interface
LeaderElection componentbaseconfig.LeaderElectionConfiguration
APIServerIPs []string
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
@@ -172,36 +173,40 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
CoordinatorStoragePrefix: options.CoordinatorStoragePrefix,
CoordinatorStorageAddr: options.CoordinatorStorageAddr,
LeaderElection: options.LeaderElection,
APIServerIPs: options.APIServerIPs,
}

certMgr, err := certificatemgr.NewYurtHubCertManager(options, us)
if err != nil {
return nil, err
}
certMgr.Start()
err = wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 4*time.Minute, true, func(ctx context.Context) (bool, error) {
isReady := certMgr.Ready()
if isReady {
return true, nil
// if yurthub is in local mode, certMgr and networkMgr do not need to be started
if cfg.WorkingMode != util.WorkingModeLocal {
certMgr, err := certificatemgr.NewYurtHubCertManager(options, us)
if err != nil {
return nil, err
}
return false, nil
})
if err != nil {
return nil, fmt.Errorf("hub certificates preparation failed, %v", err)
}
cfg.CertManager = certMgr

if options.EnableDummyIf {
klog.V(2).Infof("create dummy network interface %s(%s) and init iptables manager", options.HubAgentDummyIfName, options.HubAgentDummyIfIP)
networkMgr, err := network.NewNetworkManager(options)
certMgr.Start()
err = wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 4*time.Minute, true, func(ctx context.Context) (bool, error) {
isReady := certMgr.Ready()
if isReady {
return true, nil
}
return false, nil
})
if err != nil {
return nil, fmt.Errorf("could not create network manager, %w", err)
return nil, fmt.Errorf("hub certificates preparation failed, %v", err)
}
cfg.NetworkMgr = networkMgr
}
cfg.CertManager = certMgr

if err = prepareServerServing(options, certMgr, cfg); err != nil {
return nil, err
if options.EnableDummyIf {
klog.V(2).Infof("create dummy network interface %s(%s) and init iptables manager", options.HubAgentDummyIfName, options.HubAgentDummyIfIP)
networkMgr, err := network.NewNetworkManager(options)
if err != nil {
return nil, fmt.Errorf("could not create network manager, %w", err)
}
cfg.NetworkMgr = networkMgr
}

if err = prepareServerServing(options, certMgr, cfg); err != nil {
return nil, err
}
}

return cfg, nil
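
Read apart from the diff's interleaved old/new lines, the reworked Complete roughly takes the shape below. This is a minimal sketch reconstructed from the hunk above, assuming the names it shows (util.WorkingModeLocal, certificatemgr.NewYurtHubCertManager, prepareServerServing) keep their existing signatures:

```go
// Sketch only: in local mode the hub certificate manager, the dummy network
// interface, and the hub server serving setup are all skipped.
if cfg.WorkingMode != util.WorkingModeLocal {
	certMgr, err := certificatemgr.NewYurtHubCertManager(options, us)
	if err != nil {
		return nil, err
	}
	certMgr.Start()

	// Wait up to 4 minutes (polling every 5s) for the hub certificates to be ready.
	err = wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 4*time.Minute, true,
		func(ctx context.Context) (bool, error) {
			return certMgr.Ready(), nil
		})
	if err != nil {
		return nil, fmt.Errorf("hub certificates preparation failed, %v", err)
	}
	cfg.CertManager = certMgr

	if options.EnableDummyIf {
		networkMgr, err := network.NewNetworkManager(options)
		if err != nil {
			return nil, fmt.Errorf("could not create network manager, %w", err)
		}
		cfg.NetworkMgr = networkMgr
	}

	if err = prepareServerServing(options, certMgr, cfg); err != nil {
		return nil, err
	}
}
return cfg, nil
```
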
6 changes: 4 additions & 2 deletions cmd/yurthub/app/options/options.go
@@ -72,6 +72,7 @@ type YurtHubOptions struct {
EnableIptables bool
HubAgentDummyIfIP string
HubAgentDummyIfName string
APIServerIPs []string
DiskCachePath string
EnableResourceFilter bool
DisabledResourceFilters []string
@@ -186,7 +187,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&o.YurtHubProxyPort, "proxy-port", o.YurtHubProxyPort, "the port on which to proxy HTTP requests to kube-apiserver")
fs.IntVar(&o.YurtHubProxySecurePort, "proxy-secure-port", o.YurtHubProxySecurePort, "the port on which to proxy HTTPS requests to kube-apiserver")
fs.StringVar(&o.YurtHubNamespace, "namespace", o.YurtHubNamespace, "the namespace of YurtHub Server")
fs.StringVar(&o.ServerAddr, "server-addr", o.ServerAddr, "the address of Kubernetes kube-apiserver,the format is: \"server1,server2,...\"")
fs.StringVar(&o.ServerAddr, "server-addr", o.ServerAddr, "the address of the Kubernetes kube-apiserver, or the address of the host Kubernetes cluster that is used for yurthub local mode, the format is: \"server1,server2,...\"")
fs.StringSliceVar(&o.YurtHubCertOrganizations, "hub-cert-organizations", o.YurtHubCertOrganizations, "Organizations that will be added into hub's apiserver client certificate, the format is: certOrg1,certOrg2,...")
fs.IntVar(&o.GCFrequency, "gc-frequency", o.GCFrequency, "the frequency to gc cache in storage(unit: minute).")
fs.StringVar(&o.NodeName, "node-name", o.NodeName, "the name of node that runs hub agent")
@@ -212,7 +213,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&o.EnableResourceFilter, "enable-resource-filter", o.EnableResourceFilter, "enable to filter response that comes back from reverse proxy")
fs.StringSliceVar(&o.DisabledResourceFilters, "disabled-resource-filters", o.DisabledResourceFilters, "disable resource filters to handle response")
fs.StringVar(&o.NodePoolName, "nodepool-name", o.NodePoolName, "the name of node pool that runs hub agent")
fs.StringVar(&o.WorkingMode, "working-mode", o.WorkingMode, "the working mode of yurthub(edge, cloud).")
fs.StringVar(&o.WorkingMode, "working-mode", o.WorkingMode, "the working mode of yurthub(edge, cloud, local).")
fs.DurationVar(&o.KubeletHealthGracePeriod, "kubelet-health-grace-period", o.KubeletHealthGracePeriod, "the amount of time which we allow kubelet to be unresponsive before stop renew node lease")
fs.BoolVar(&o.EnableNodePool, "enable-node-pool", o.EnableNodePool, "enable list/watch nodepools resource or not for filters(only used for testing)")
fs.MarkDeprecated("enable-node-pool", "It is planned to be removed from OpenYurt in the future version, please use --enable-pool-service-topology instead")
@@ -225,6 +226,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.CoordinatorStorageAddr, "coordinator-storage-addr", o.CoordinatorStorageAddr, "Address of Yurt-Coordinator etcd, in the format host:port")
bindFlags(&o.LeaderElection, fs)
fs.BoolVar(&o.EnablePoolServiceTopology, "enable-pool-service-topology", o.EnablePoolServiceTopology, "enable service topology feature in the node pool.")
fs.StringSliceVar(&o.APIServerIPs, "cloud-provider-ip", o.APIServerIPs, "the IP addresses of the host-k8s APIServers that are used by daemonsets.")
}

// bindFlags binds the LeaderElectionConfiguration struct fields to a flagset
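
For illustration only, the new local-mode flags could be exercised as follows. The snippet assumes options.NewYurtHubOptions() is the usual no-argument constructor for YurtHubOptions; it is not part of this diff:

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"

	"github.com/openyurtio/openyurt/cmd/yurthub/app/options"
)

func main() {
	// Parse a hypothetical local-mode command line; the flag names come
	// from AddFlags above, the constructor is assumed.
	o := options.NewYurtHubOptions()
	fs := pflag.NewFlagSet("yurthub", pflag.ContinueOnError)
	o.AddFlags(fs)

	if err := fs.Parse([]string{
		"--working-mode=local",
		"--server-addr=https://192.168.0.10:6443,https://192.168.0.11:6443",
		"--cloud-provider-ip=192.168.0.10,192.168.0.11",
	}); err != nil {
		panic(err)
	}

	fmt.Println(o.WorkingMode)  // local
	fmt.Println(o.ServerAddr)   // https://192.168.0.10:6443,https://192.168.0.11:6443
	fmt.Println(o.APIServerIPs) // [192.168.0.10 192.168.0.11]
}
```
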
205 changes: 108 additions & 97 deletions cmd/yurthub/app/start.go
@@ -39,6 +39,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/gc"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
hubrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
"github.com/openyurtio/openyurt/pkg/yurthub/locallb"
"github.com/openyurtio/openyurt/pkg/yurthub/proxy"
"github.com/openyurtio/openyurt/pkg/yurthub/server"
"github.com/openyurtio/openyurt/pkg/yurthub/tenant"
@@ -93,119 +94,129 @@ func NewCmdStartYurtHub(ctx context.Context) *cobra.Command {

// Run runs the YurtHubConfiguration. This should never exit
func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
defer cfg.CertManager.Stop()
trace := 1
klog.Infof("%d. new transport manager", trace)
transportManager, err := transport.NewTransportManager(cfg.CertManager, ctx.Done())
if err != nil {
return fmt.Errorf("could not new transport manager, %w", err)
}
trace++

klog.Infof("%d. prepare cloud kube clients", trace)
cloudClients, err := createClients(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, transportManager)
if err != nil {
return fmt.Errorf("could not create cloud clients, %w", err)
}
trace++

var cloudHealthChecker healthchecker.MultipleBackendsHealthChecker
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. create health checkers for remote servers and yurt coordinator", trace)
cloudHealthChecker, err = healthchecker.NewCloudAPIServerHealthChecker(cfg, cloudClients, ctx.Done())
if cfg.WorkingMode == util.WorkingModeLocal {
klog.Infof("new locallb manager for node %s ", cfg.NodeName)
locallbMgr, err := locallb.NewLocalLBManager(cfg, cfg.SharedFactory)
if err != nil {
return fmt.Errorf("could not new cloud health checker, %w", err)
return fmt.Errorf("could not new locallb manager, %w", err)
}
// Start the informer factory if all informers have been registered
cfg.SharedFactory.Start(ctx.Done())
} else {
klog.Infof("%d. disable health checker for node %s because it is a cloud node", trace, cfg.NodeName)
// In cloud mode, cloud health checker is not needed.
// This fake checker will always report that the cloud is healthy and yurt coordinator is unhealthy.
cloudHealthChecker = healthchecker.NewFakeChecker(true, make(map[string]int))
}
trace++
defer cfg.CertManager.Stop()
trace := 1
klog.Infof("%d. new transport manager", trace)
transportManager, err := transport.NewTransportManager(cfg.CertManager, ctx.Done())
if err != nil {
return fmt.Errorf("could not new transport manager, %w", err)
}
trace++

klog.Infof("%d. new restConfig manager", trace)
restConfigMgr, err := hubrest.NewRestConfigManager(cfg.CertManager, cloudHealthChecker)
if err != nil {
return fmt.Errorf("could not new restConfig manager, %w", err)
}
trace++
klog.Infof("%d. prepare cloud kube clients", trace)
cloudClients, err := createClients(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, transportManager)
if err != nil {
return fmt.Errorf("could not create cloud clients, %w", err)
}
trace++

var cacheMgr cachemanager.CacheManager
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr = cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
} else {
klog.Infof("%d. disable cache manager for node %s because it is a cloud node", trace, cfg.NodeName)
}
trace++
var cloudHealthChecker healthchecker.MultipleBackendsHealthChecker
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. create health checkers for remote servers and yurt coordinator", trace)
cloudHealthChecker, err = healthchecker.NewCloudAPIServerHealthChecker(cfg, cloudClients, ctx.Done())
if err != nil {
return fmt.Errorf("could not new cloud health checker, %w", err)
}
} else {
klog.Infof("%d. disable health checker for node %s because it is a cloud node", trace, cfg.NodeName)
// In cloud mode, cloud health checker is not needed.
// This fake checker will always report that the cloud is healthy and yurt coordinator is unhealthy.
cloudHealthChecker = healthchecker.NewFakeChecker(true, make(map[string]int))
}
trace++

if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new gc manager for node %s, and gc frequency is a random time between %d min and %d min", trace, cfg.NodeName, cfg.GCFrequency, 3*cfg.GCFrequency)
gcMgr, err := gc.NewGCManager(cfg, restConfigMgr, ctx.Done())
klog.Infof("%d. new restConfig manager", trace)
restConfigMgr, err := hubrest.NewRestConfigManager(cfg.CertManager, cloudHealthChecker)
if err != nil {
return fmt.Errorf("could not new gc manager, %w", err)
return fmt.Errorf("could not new restConfig manager, %w", err)
}
gcMgr.Run()
} else {
klog.Infof("%d. disable gc manager for node %s because it is a cloud node", trace, cfg.NodeName)
}
trace++
trace++

klog.Infof("%d. new tenant sa manager", trace)
tenantMgr := tenant.New(cfg.TenantNs, cfg.SharedFactory, ctx.Done())
trace++
var cacheMgr cachemanager.CacheManager
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr = cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
} else {
klog.Infof("%d. disable cache manager for node %s because it is a cloud node", trace, cfg.NodeName)
}
trace++

var coordinatorHealthCheckerGetter func() healthchecker.HealthChecker = getFakeCoordinatorHealthChecker
var coordinatorTransportManagerGetter func() transport.Interface = getFakeCoordinatorTransportManager
var coordinatorGetter func() yurtcoordinator.Coordinator = getFakeCoordinator
var coordinatorServerURLGetter func() *url.URL = getFakeCoordinatorServerURL
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new gc manager for node %s, and gc frequency is a random time between %d min and %d min", trace, cfg.NodeName, cfg.GCFrequency, 3*cfg.GCFrequency)
gcMgr, err := gc.NewGCManager(cfg, restConfigMgr, ctx.Done())
if err != nil {
return fmt.Errorf("could not new gc manager, %w", err)
}
gcMgr.Run()
} else {
klog.Infof("%d. disable gc manager for node %s because it is a cloud node", trace, cfg.NodeName)
}
trace++

if cfg.EnableCoordinator {
klog.Infof("%d. start to run coordinator", trace)
klog.Infof("%d. new tenant sa manager", trace)
tenantMgr := tenant.New(cfg.TenantNs, cfg.SharedFactory, ctx.Done())
trace++

coordinatorInformerRegistryChan := make(chan struct{})
// coordinatorRun will register the secret informer into sharedInformerFactory, and start a new goroutine to periodically check
// whether certs have been obtained from the cloud APIServer. It will close the coordinatorInformerRegistryChan once the secret informer has
// been registered into the informer factory.
coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter, coordinatorServerURLGetter =
coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan)
// wait for coordinator informer registry
klog.Info("waiting for coordinator informer registry")
<-coordinatorInformerRegistryChan
klog.Info("coordinator informer registry finished")
}
var coordinatorHealthCheckerGetter func() healthchecker.HealthChecker = getFakeCoordinatorHealthChecker
var coordinatorTransportManagerGetter func() transport.Interface = getFakeCoordinatorTransportManager
var coordinatorGetter func() yurtcoordinator.Coordinator = getFakeCoordinator
var coordinatorServerURLGetter func() *url.URL = getFakeCoordinatorServerURL

if cfg.EnableCoordinator {
klog.Infof("%d. start to run coordinator", trace)
trace++

coordinatorInformerRegistryChan := make(chan struct{})
// coordinatorRun will register the secret informer into sharedInformerFactory, and start a new goroutine to periodically check
// whether certs have been obtained from the cloud APIServer. It will close the coordinatorInformerRegistryChan once the secret informer has
// been registered into the informer factory.
coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter, coordinatorServerURLGetter =
coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan)
// wait for coordinator informer registry
klog.Info("waiting for coordinator informer registry")
<-coordinatorInformerRegistryChan
klog.Info("coordinator informer registry finished")
}

// Start the informer factory if all informers have been registered
cfg.SharedFactory.Start(ctx.Done())
cfg.NodePoolInformerFactory.Start(ctx.Done())

klog.Infof("%d. new reverse proxy handler for remote servers", trace)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(
cfg,
cacheMgr,
restConfigMgr,
transportManager,
cloudHealthChecker,
tenantMgr,
coordinatorGetter,
coordinatorTransportManagerGetter,
coordinatorHealthCheckerGetter,
coordinatorServerURLGetter,
ctx.Done())
if err != nil {
return fmt.Errorf("could not create reverse proxy handler, %w", err)
}
trace++
// Start the informer factory if all informers have been registered
cfg.SharedFactory.Start(ctx.Done())
cfg.NodePoolInformerFactory.Start(ctx.Done())

klog.Infof("%d. new reverse proxy handler for remote servers", trace)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(
cfg,
cacheMgr,
restConfigMgr,
transportManager,
cloudHealthChecker,
tenantMgr,
coordinatorGetter,
coordinatorTransportManagerGetter,
coordinatorHealthCheckerGetter,
coordinatorServerURLGetter,
ctx.Done())
if err != nil {
return fmt.Errorf("could not create reverse proxy handler, %w", err)
}
trace++

if cfg.NetworkMgr != nil {
cfg.NetworkMgr.Run(ctx.Done())
}
if cfg.NetworkMgr != nil {
cfg.NetworkMgr.Run(ctx.Done())
}

klog.Infof("%d. new %s server and begin to serve", trace, projectinfo.GetHubName())
if err := server.RunYurtHubServers(cfg, yurtProxyHandler, restConfigMgr, ctx.Done()); err != nil {
return fmt.Errorf("could not run hub servers, %w", err)
klog.Infof("%d. new %s server and begin to serve", trace, projectinfo.GetHubName())
if err := server.RunYurtHubServers(cfg, yurtProxyHandler, restConfigMgr, ctx.Done()); err != nil {
return fmt.Errorf("could not run hub servers, %w", err)
}
}
<-ctx.Done()
klog.Info("hub agent exited")
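
Net of the re-indentation, Run now branches on the working mode right at the top: in local mode it only starts a locallb manager and the shared informer factory, while the pre-existing transport/health-check/cache/proxy pipeline moves unchanged into the else branch. A condensed sketch, assuming locallb.NewLocalLBManager has the signature shown in the hunk above:

```go
func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
	if cfg.WorkingMode == util.WorkingModeLocal {
		// Local mode: only a local load balancer in front of the host-k8s
		// APIServers is needed; cert manager, health checkers, cache manager,
		// and the reverse proxy are all skipped.
		klog.Infof("new locallb manager for node %s", cfg.NodeName)
		if _, err := locallb.NewLocalLBManager(cfg, cfg.SharedFactory); err != nil {
			return fmt.Errorf("could not new locallb manager, %w", err)
		}
		cfg.SharedFactory.Start(ctx.Done())
	} else {
		// Edge/cloud mode: the existing pipeline (transport manager, health
		// checkers, cache manager, gc, tenant manager, coordinator, reverse
		// proxy, hub servers) runs inside this branch; see the hunk above
		// for the full sequence.
		defer cfg.CertManager.Stop()
		// ...
	}

	<-ctx.Done()
	klog.Info("hub agent exited")
	return nil
}
```
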
5 changes: 4 additions & 1 deletion go.mod
@@ -1,6 +1,8 @@
module github.com/openyurtio/openyurt

go 1.20
go 1.21

toolchain go1.21.5

require (
github.com/aliyun/alibaba-cloud-sdk-go v1.62.156
@@ -83,6 +85,7 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chai2010/gettext-go v1.0.2 // indirect
github.com/coreos/go-iptables v0.8.0
github.com/coreos/go-semver v0.3.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
