From 83f878514eb97f578d5e33d410952e05cf608aab Mon Sep 17 00:00:00 2001 From: ChrisLiu Date: Mon, 15 Apr 2024 20:40:15 +0800 Subject: [PATCH] feat: add Kubernetes-NodePort network plugin Signed-off-by: ChrisLiu --- cloudprovider/kubernetes/nodeport.go | 259 ++++++++++++++++++++++ cloudprovider/kubernetes/nodeport_test.go | 183 +++++++++++++++ 2 files changed, 442 insertions(+) create mode 100644 cloudprovider/kubernetes/nodeport.go create mode 100644 cloudprovider/kubernetes/nodeport_test.go diff --git a/cloudprovider/kubernetes/nodeport.go b/cloudprovider/kubernetes/nodeport.go new file mode 100644 index 00000000..ebf5a08b --- /dev/null +++ b/cloudprovider/kubernetes/nodeport.go @@ -0,0 +1,259 @@ +/* +Copyright 2024 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "context" + gamekruiseiov1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1" + "github.com/openkruise/kruise-game/cloudprovider" + cperrors "github.com/openkruise/kruise-game/cloudprovider/errors" + "github.com/openkruise/kruise-game/cloudprovider/utils" + "github.com/openkruise/kruise-game/pkg/util" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "sigs.k8s.io/controller-runtime/pkg/client" + "strconv" + "strings" +) + +const ( + NodePortNetwork = "Kubernetes-NodePort" + + PortProtocolsConfigName = "PortProtocols" +) + +type NodePortPlugin struct { +} + +func (n *NodePortPlugin) Name() string { + return NodePortNetwork +} + +func (n *NodePortPlugin) Alias() string { + return "" +} + +func (n *NodePortPlugin) Init(client client.Client, options cloudprovider.CloudProviderOptions, ctx context.Context) error { + return nil +} + +func (n *NodePortPlugin) OnPodAdded(client client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, cperrors.PluginError) { + return pod, nil +} + +func (n *NodePortPlugin) OnPodUpdated(client client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, cperrors.PluginError) { + networkManager := utils.NewNetworkManager(pod, client) + + networkStatus, _ := networkManager.GetNetworkStatus() + networkConfig := networkManager.GetNetworkConfig() + npc, err := parseNodePortConfig(networkConfig) + if err != nil { + return pod, cperrors.NewPluginError(cperrors.ParameterError, err.Error()) + } + + if networkStatus == nil { + pod, err := networkManager.UpdateNetworkStatus(gamekruiseiov1alpha1.NetworkStatus{ + CurrentNetworkState: gamekruiseiov1alpha1.NetworkNotReady, + }, pod) + return pod, cperrors.ToPluginError(err, cperrors.InternalError) + } + + // get svc + svc := &corev1.Service{} + err = client.Get(ctx, types.NamespacedName{ + Name: pod.GetName(), + Namespace: pod.GetNamespace(), + }, svc) + if err != nil { + if errors.IsNotFound(err) { + return pod, cperrors.ToPluginError(client.Create(ctx, consNodePortSvc(npc, pod, client, ctx)), cperrors.ApiCallError) + } + return pod, cperrors.NewPluginError(cperrors.ApiCallError, err.Error()) + } + + // update svc + if util.GetHash(npc) != svc.GetAnnotations()[ServiceHashKey] { + networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady + pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod) + if err != nil { + return pod, cperrors.NewPluginError(cperrors.InternalError, err.Error()) + } + return pod, cperrors.ToPluginError(client.Update(ctx, consNodePortSvc(npc, pod, client, ctx)), cperrors.ApiCallError) + } + + // allow not ready containers + if util.IsAllowNotReadyContainers(networkManager.GetNetworkConfig()) { + toUpDateSvc, err := utils.AllowNotReadyContainers(client, ctx, pod, svc, false) + if err != nil { + return pod, err + } + + if toUpDateSvc { + err := client.Update(ctx, svc) + if err != nil { + return pod, cperrors.ToPluginError(err, cperrors.ApiCallError) + } + } + } + + // network ready + node := &corev1.Node{} + err = client.Get(ctx, types.NamespacedName{ + Name: pod.Spec.NodeName, + }, node) + if err != nil { + return pod, cperrors.NewPluginError(cperrors.ApiCallError, err.Error()) + } + + if pod.Status.PodIP == "" { + // Pod IP not exist, Network NotReady + networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady + pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod) + return pod, cperrors.ToPluginError(err, cperrors.InternalError) + } + + internalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0) + externalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0) + for _, port := range svc.Spec.Ports { + instrIPort := port.TargetPort + if port.NodePort == 0 { + networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady + pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod) + return pod, cperrors.ToPluginError(err, cperrors.InternalError) + } + instrEPort := intstr.FromInt(int(port.NodePort)) + internalAddress := gamekruiseiov1alpha1.NetworkAddress{ + IP: pod.Status.PodIP, + Ports: []gamekruiseiov1alpha1.NetworkPort{ + { + Name: instrIPort.String(), + Port: &instrIPort, + Protocol: port.Protocol, + }, + }, + } + externalAddress := gamekruiseiov1alpha1.NetworkAddress{ + IP: getAddress(node), + Ports: []gamekruiseiov1alpha1.NetworkPort{ + { + Name: instrIPort.String(), + Port: &instrEPort, + Protocol: port.Protocol, + }, + }, + } + internalAddresses = append(internalAddresses, internalAddress) + externalAddresses = append(externalAddresses, externalAddress) + } + networkStatus.InternalAddresses = internalAddresses + networkStatus.ExternalAddresses = externalAddresses + networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkReady + pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod) + return pod, cperrors.ToPluginError(err, cperrors.InternalError) +} + +func (n *NodePortPlugin) OnPodDeleted(client client.Client, pod *corev1.Pod, ctx context.Context) cperrors.PluginError { + return nil +} + +func init() { + kubernetesProvider.registerPlugin(&NodePortPlugin{}) +} + +type nodePortConfig struct { + ports []int + protocols []corev1.Protocol + isFixed bool +} + +func parseNodePortConfig(conf []gamekruiseiov1alpha1.NetworkConfParams) (*nodePortConfig, error) { + var ports []int + var protocols []corev1.Protocol + isFixed := false + + for _, c := range conf { + switch c.Name { + case PortProtocolsConfigName: + ports, protocols = parsePortProtocols(c.Value) + + case FixedKey: + var err error + isFixed, err = strconv.ParseBool(c.Value) + if err != nil { + return nil, err + } + } + } + return &nodePortConfig{ + ports: ports, + protocols: protocols, + isFixed: isFixed, + }, nil +} + +func parsePortProtocols(value string) ([]int, []corev1.Protocol) { + ports := make([]int, 0) + protocols := make([]corev1.Protocol, 0) + for _, pp := range strings.Split(value, ",") { + ppSlice := strings.Split(pp, "/") + port, err := strconv.Atoi(ppSlice[0]) + if err != nil { + continue + } + ports = append(ports, port) + if len(ppSlice) != 2 { + protocols = append(protocols, corev1.ProtocolTCP) + } else { + protocols = append(protocols, corev1.Protocol(ppSlice[1])) + } + } + return ports, protocols +} + +func consNodePortSvc(npc *nodePortConfig, pod *corev1.Pod, c client.Client, ctx context.Context) *corev1.Service { + svcPorts := make([]corev1.ServicePort, 0) + for i := 0; i < len(npc.ports); i++ { + svcPorts = append(svcPorts, corev1.ServicePort{ + Name: strconv.Itoa(npc.ports[i]), + Port: int32(npc.ports[i]), + Protocol: npc.protocols[i], + TargetPort: intstr.FromInt(npc.ports[i]), + }) + } + + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.GetName(), + Namespace: pod.GetNamespace(), + Annotations: map[string]string{ + ServiceHashKey: util.GetHash(npc), + }, + OwnerReferences: consOwnerReference(c, ctx, pod, npc.isFixed), + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + Selector: map[string]string{ + SvcSelectorKey: pod.GetName(), + }, + Ports: svcPorts, + }, + } + return svc +} diff --git a/cloudprovider/kubernetes/nodeport_test.go b/cloudprovider/kubernetes/nodeport_test.go new file mode 100644 index 00000000..4e9ebedb --- /dev/null +++ b/cloudprovider/kubernetes/nodeport_test.go @@ -0,0 +1,183 @@ +package kubernetes + +import ( + gamekruiseiov1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1" + "github.com/openkruise/kruise-game/pkg/util" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/pointer" + "reflect" + "testing" +) + +func TestParseNPConfig(t *testing.T) { + tests := []struct { + conf []gamekruiseiov1alpha1.NetworkConfParams + podNetConfig *nodePortConfig + }{ + { + conf: []gamekruiseiov1alpha1.NetworkConfParams{ + { + Name: PortProtocolsConfigName, + Value: "80", + }, + }, + podNetConfig: &nodePortConfig{ + ports: []int{80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP}, + }, + }, + + { + conf: []gamekruiseiov1alpha1.NetworkConfParams{ + { + Name: PortProtocolsConfigName, + Value: "8021/UDP", + }, + }, + podNetConfig: &nodePortConfig{ + ports: []int{8021}, + protocols: []corev1.Protocol{corev1.ProtocolUDP}, + }, + }, + } + + for _, test := range tests { + podNetConfig, _ := parseNodePortConfig(test.conf) + if !reflect.DeepEqual(podNetConfig, test.podNetConfig) { + t.Errorf("expect podNetConfig: %v, but actual: %v", test.podNetConfig, podNetConfig) + } + } +} + +func TestConsNPSvc(t *testing.T) { + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-3", + Namespace: "ns", + UID: "bff0afd6-bb30-4641-8607-8329547324eb", + }, + } + + // case 0 + npcCase0 := &nodePortConfig{ + ports: []int{ + 80, + 8080, + }, + protocols: []corev1.Protocol{ + corev1.ProtocolTCP, + corev1.ProtocolTCP, + }, + isFixed: false, + } + svcCase0 := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-3", + Namespace: "ns", + Annotations: map[string]string{ + ServiceHashKey: util.GetHash(npcCase0), + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: "pod-3", + UID: "bff0afd6-bb30-4641-8607-8329547324eb", + Controller: pointer.BoolPtr(true), + BlockOwnerDeletion: pointer.BoolPtr(true), + }, + }, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + Selector: map[string]string{ + SvcSelectorKey: "pod-3", + }, + Ports: []corev1.ServicePort{ + { + Name: "80", + Port: int32(80), + TargetPort: intstr.FromInt(80), + Protocol: corev1.ProtocolTCP, + }, + { + Name: "8080", + Port: int32(8080), + TargetPort: intstr.FromInt(8080), + Protocol: corev1.ProtocolTCP, + }, + }, + }, + } + + // case 1 + npcCase1 := &nodePortConfig{ + ports: []int{ + 8021, + }, + protocols: []corev1.Protocol{ + corev1.ProtocolUDP, + }, + isFixed: false, + } + svcCase1 := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-3", + Namespace: "ns", + Annotations: map[string]string{ + ServiceHashKey: util.GetHash(npcCase1), + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: "pod-3", + UID: "bff0afd6-bb30-4641-8607-8329547324eb", + Controller: pointer.BoolPtr(true), + BlockOwnerDeletion: pointer.BoolPtr(true), + }, + }, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + Selector: map[string]string{ + SvcSelectorKey: "pod-3", + }, + Ports: []corev1.ServicePort{ + { + Name: "8021", + Port: int32(8021), + TargetPort: intstr.FromInt(8021), + Protocol: corev1.ProtocolUDP, + }, + }, + }, + } + + tests := []struct { + npc *nodePortConfig + svc *corev1.Service + }{ + { + npc: npcCase0, + svc: svcCase0, + }, + { + npc: npcCase1, + svc: svcCase1, + }, + } + + for i, test := range tests { + actual := consNodePortSvc(test.npc, pod, nil, nil) + if !reflect.DeepEqual(actual, test.svc) { + t.Errorf("case %d: expect service: %v , but actual: %v", i, test.svc, actual) + } + } +}