Skip to content

Commit

Permalink
feat: add Kubernetes-NodePort network plugin
Browse files Browse the repository at this point in the history
Signed-off-by: ChrisLiu <[email protected]>
  • Loading branch information
chrisliu1995 committed Apr 23, 2024
1 parent 69babe6 commit 8a2a66c
Show file tree
Hide file tree
Showing 3 changed files with 443 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ $(CONTROLLER_GEN): $(LOCALBIN)
.PHONY: envtest
envtest: $(ENVTEST) ## Download envtest-setup locally if necessary.
$(ENVTEST): $(LOCALBIN)
GOBIN=$(LOCALBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest
GOBIN=$(LOCALBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@c7e1dc9b5302d649d5531e19168dd7ea0013736d

HELM = $(shell pwd)/bin/helm
helm: ## Download helm locally if necessary.
Expand Down
259 changes: 259 additions & 0 deletions cloudprovider/kubernetes/nodeport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/*
Copyright 2024 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubernetes

import (
"context"
gamekruiseiov1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1"
"github.com/openkruise/kruise-game/cloudprovider"
cperrors "github.com/openkruise/kruise-game/cloudprovider/errors"
"github.com/openkruise/kruise-game/cloudprovider/utils"
"github.com/openkruise/kruise-game/pkg/util"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
"strconv"
"strings"
)

const (
NodePortNetwork = "Kubernetes-NodePort"

PortProtocolsConfigName = "PortProtocols"
)

type NodePortPlugin struct {
}

func (n *NodePortPlugin) Name() string {
return NodePortNetwork
}

func (n *NodePortPlugin) Alias() string {
return ""
}

func (n *NodePortPlugin) Init(client client.Client, options cloudprovider.CloudProviderOptions, ctx context.Context) error {
return nil
}

func (n *NodePortPlugin) OnPodAdded(client client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, cperrors.PluginError) {
return pod, nil
}

func (n *NodePortPlugin) OnPodUpdated(client client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, cperrors.PluginError) {
networkManager := utils.NewNetworkManager(pod, client)

networkStatus, _ := networkManager.GetNetworkStatus()
networkConfig := networkManager.GetNetworkConfig()
npc, err := parseNodePortConfig(networkConfig)
if err != nil {
return pod, cperrors.NewPluginError(cperrors.ParameterError, err.Error())
}

if networkStatus == nil {
pod, err := networkManager.UpdateNetworkStatus(gamekruiseiov1alpha1.NetworkStatus{
CurrentNetworkState: gamekruiseiov1alpha1.NetworkNotReady,
}, pod)
return pod, cperrors.ToPluginError(err, cperrors.InternalError)
}

// get svc
svc := &corev1.Service{}
err = client.Get(ctx, types.NamespacedName{
Name: pod.GetName(),
Namespace: pod.GetNamespace(),
}, svc)
if err != nil {
if errors.IsNotFound(err) {
return pod, cperrors.ToPluginError(client.Create(ctx, consNodePortSvc(npc, pod, client, ctx)), cperrors.ApiCallError)
}
return pod, cperrors.NewPluginError(cperrors.ApiCallError, err.Error())
}

// update svc
if util.GetHash(npc) != svc.GetAnnotations()[ServiceHashKey] {
networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady
pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod)
if err != nil {
return pod, cperrors.NewPluginError(cperrors.InternalError, err.Error())
}
return pod, cperrors.ToPluginError(client.Update(ctx, consNodePortSvc(npc, pod, client, ctx)), cperrors.ApiCallError)
}

// allow not ready containers
if util.IsAllowNotReadyContainers(networkManager.GetNetworkConfig()) {
toUpDateSvc, err := utils.AllowNotReadyContainers(client, ctx, pod, svc, false)
if err != nil {
return pod, err
}

if toUpDateSvc {
err := client.Update(ctx, svc)
if err != nil {
return pod, cperrors.ToPluginError(err, cperrors.ApiCallError)
}
}
}

// network ready
node := &corev1.Node{}
err = client.Get(ctx, types.NamespacedName{
Name: pod.Spec.NodeName,
}, node)
if err != nil {
return pod, cperrors.NewPluginError(cperrors.ApiCallError, err.Error())
}

if pod.Status.PodIP == "" {
// Pod IP not exist, Network NotReady
networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady
pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod)
return pod, cperrors.ToPluginError(err, cperrors.InternalError)
}

internalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0)
externalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0)
for _, port := range svc.Spec.Ports {
instrIPort := port.TargetPort
if port.NodePort == 0 {
networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady
pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod)
return pod, cperrors.ToPluginError(err, cperrors.InternalError)
}
instrEPort := intstr.FromInt(int(port.NodePort))
internalAddress := gamekruiseiov1alpha1.NetworkAddress{
IP: pod.Status.PodIP,
Ports: []gamekruiseiov1alpha1.NetworkPort{
{
Name: instrIPort.String(),
Port: &instrIPort,
Protocol: port.Protocol,
},
},
}
externalAddress := gamekruiseiov1alpha1.NetworkAddress{
IP: getAddress(node),
Ports: []gamekruiseiov1alpha1.NetworkPort{
{
Name: instrIPort.String(),
Port: &instrEPort,
Protocol: port.Protocol,
},
},
}
internalAddresses = append(internalAddresses, internalAddress)
externalAddresses = append(externalAddresses, externalAddress)
}
networkStatus.InternalAddresses = internalAddresses
networkStatus.ExternalAddresses = externalAddresses
networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkReady
pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod)
return pod, cperrors.ToPluginError(err, cperrors.InternalError)
}

func (n *NodePortPlugin) OnPodDeleted(client client.Client, pod *corev1.Pod, ctx context.Context) cperrors.PluginError {
return nil
}

func init() {
kubernetesProvider.registerPlugin(&NodePortPlugin{})
}

type nodePortConfig struct {
ports []int
protocols []corev1.Protocol
isFixed bool
}

func parseNodePortConfig(conf []gamekruiseiov1alpha1.NetworkConfParams) (*nodePortConfig, error) {
var ports []int
var protocols []corev1.Protocol
isFixed := false

for _, c := range conf {
switch c.Name {
case PortProtocolsConfigName:
ports, protocols = parsePortProtocols(c.Value)

case FixedKey:
var err error
isFixed, err = strconv.ParseBool(c.Value)
if err != nil {
return nil, err
}
}
}
return &nodePortConfig{
ports: ports,
protocols: protocols,
isFixed: isFixed,
}, nil
}

func parsePortProtocols(value string) ([]int, []corev1.Protocol) {
ports := make([]int, 0)
protocols := make([]corev1.Protocol, 0)
for _, pp := range strings.Split(value, ",") {
ppSlice := strings.Split(pp, "/")
port, err := strconv.Atoi(ppSlice[0])
if err != nil {
continue
}
ports = append(ports, port)
if len(ppSlice) != 2 {
protocols = append(protocols, corev1.ProtocolTCP)
} else {
protocols = append(protocols, corev1.Protocol(ppSlice[1]))
}
}
return ports, protocols
}

func consNodePortSvc(npc *nodePortConfig, pod *corev1.Pod, c client.Client, ctx context.Context) *corev1.Service {
svcPorts := make([]corev1.ServicePort, 0)
for i := 0; i < len(npc.ports); i++ {
svcPorts = append(svcPorts, corev1.ServicePort{
Name: strconv.Itoa(npc.ports[i]),
Port: int32(npc.ports[i]),
Protocol: npc.protocols[i],
TargetPort: intstr.FromInt(npc.ports[i]),
})
}

svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: pod.GetName(),
Namespace: pod.GetNamespace(),
Annotations: map[string]string{
ServiceHashKey: util.GetHash(npc),
},
OwnerReferences: consOwnerReference(c, ctx, pod, npc.isFixed),
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
Selector: map[string]string{
SvcSelectorKey: pod.GetName(),
},
Ports: svcPorts,
},
}
return svc
}
Loading

0 comments on commit 8a2a66c

Please sign in to comment.