From 28e296a80e900f1187dcb0bbcb99979d89ffd738 Mon Sep 17 00:00:00 2001
From: Nigel Foucha <nigel@defenseunicorns.com>
Date: Thu, 7 Nov 2024 15:20:31 -0500
Subject: [PATCH] feat(operator): update pod definition and nifi reconciliation
 for istio compatibility

this pushes the zk connectivity check into the pod command instead of the init container and removes istio overrides from reconciliation checks

relates to #172
---
 pkg/resources/nifi/nifi.go | 57 ++++++++++++++++++++++++++++--
 pkg/resources/nifi/pod.go  | 72 ++++++++++++++++----------------------
 2 files changed, 85 insertions(+), 44 deletions(-)

diff --git a/pkg/resources/nifi/nifi.go b/pkg/resources/nifi/nifi.go
index 80c821d775..2111eeaebb 100644
--- a/pkg/resources/nifi/nifi.go
+++ b/pkg/resources/nifi/nifi.go
@@ -661,6 +661,12 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e
 	}
 
 	if err == nil {
+		// k8s-objectmatcher options
+		opts := []patch.CalculateOption{
+			patch.IgnoreStatusFields(),
+			patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(),
+			patch.IgnorePDBSelector(),
+		}
 		// Since toleration does not support patchStrategy:"merge,retainKeys", we need to add all toleration from the current pod if the toleration is set in the CR
 		if len(desiredPod.Spec.Tolerations) > 0 {
 			desiredPod.Spec.Tolerations = append(desiredPod.Spec.Tolerations, currentPod.Spec.Tolerations...)
@@ -674,8 +680,55 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e
 			}
 			desiredPod.Spec.Tolerations = uniqueTolerations
 		}
+		// If there are extra initContainers from webhook injections we need to add them
+		if len(currentPod.Spec.InitContainers) > len(desiredPod.Spec.InitContainers) {
+			desiredPod.Spec.InitContainers = append(currentPod.Spec.InitContainers, desiredPod.Spec.InitContainers...)
+			uniqueContainers := []corev1.Container{}
+			keys := make(map[string]bool)
+			for _, c := range desiredPod.Spec.InitContainers {
+				if _, value := keys[c.Name]; !value {
+					keys[c.Name] = true
+					uniqueContainers = append(uniqueContainers, c)
+				}
+			}
+			desiredPod.Spec.InitContainers = uniqueContainers
+		}
+		// If there are extra containers from webhook injections we need to add them
+		if len(currentPod.Spec.Containers) > len(desiredPod.Spec.Containers) {
+			desiredPod.Spec.Containers = append(currentPod.Spec.Containers, desiredPod.Spec.Containers...)
+			uniqueContainers := []corev1.Container{}
+			keys := make(map[string]bool)
+			for _, c := range desiredPod.Spec.Containers {
+				if _, value := keys[c.Name]; !value {
+					keys[c.Name] = true
+					uniqueContainers = append(uniqueContainers, c)
+				}
+			}
+			desiredPod.Spec.Containers = uniqueContainers
+		}
+		// Remove problematic fields if istio
+		if _, ok := currentPod.Annotations["istio.io/rev"]; ok {
+			// Prometheus scrape port is overridden by istio injection
+			delete(currentPod.Annotations, "prometheus.io/port")
+			delete(desiredPod.Annotations, "prometheus.io/port")
+			// Liveness probe port is overridden by istio injection
+			desiredContainer := corev1.Container{}
+			for _, c := range desiredPod.Spec.Containers {
+				if c.Name == "nifi" {
+					desiredContainer = c
+				}
+			}
+			currentContainers := []corev1.Container{}
+			for _, c := range currentPod.Spec.Containers {
+				if c.Name == "nifi" {
+					c.LivenessProbe = desiredContainer.LivenessProbe
+				}
+				currentContainers = append(currentContainers, c)
+			}
+			currentPod.Spec.Containers = currentContainers
+		}
 		// Check if the resource actually updated
-		patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod)
+		patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod, opts...)
 		if err != nil {
 			log.Error("could not match pod objects",
 				zap.String("clusterName", r.NifiCluster.Name),
@@ -697,7 +750,7 @@ func (r *Reconciler) reconcileNifiPod(log zap.Logger, desiredPod *corev1.Pod) (e
 
 				log.Debug("pod resource is in sync",
 					zap.String("clusterName", r.NifiCluster.Name),
-					zap.String("podName", desiredPod.Name))
+					zap.String("podName", currentPod.Name))
 
 				return nil, k8sutil.PodReady(currentPod)
 			}
diff --git a/pkg/resources/nifi/pod.go b/pkg/resources/nifi/pod.go
index 013fcfad02..f341698dfe 100644
--- a/pkg/resources/nifi/pod.go
+++ b/pkg/resources/nifi/pod.go
@@ -95,46 +95,6 @@ func (r *Reconciler) pod(node v1.Node, nodeConfig *v1.NodeConfig, pvcs []corev1.
 	}...)
 
 	podInitContainers := r.injectAdditionalFields(nodeConfig, initContainers)
-	if len(zkAddress) > 0 {
-		podInitContainers = r.injectAdditionalFields(nodeConfig, append(initContainers, []corev1.Container{
-			{
-				Name:            "zookeeper",
-				Image:           r.NifiCluster.Spec.GetInitContainerImage(),
-				ImagePullPolicy: nodeConfig.GetImagePullPolicy(),
-				Env: []corev1.EnvVar{
-					{
-						Name:  "ZK_ADDRESS",
-						Value: zkAddress,
-					},
-				},
-				// The zookeeper init check here just ensures that at least one configured zookeeper host is alive
-				Command: []string{"bash", "-c", `
-set -e
-echo "Trying to contact Zookeeper using connection string: ${ZK_ADDRESS}"
-
-connected=0
-IFS=',' read -r -a zk_hosts <<< "${ZK_ADDRESS}"
-until [ $connected -eq 1 ]
-do
-for zk_host in "${zk_hosts[@]}"
-do
-	IFS=':' read -r -a zk_host_port <<< "${zk_host}"
-	
-	echo "Checking Zookeeper Host: [${zk_host_port[0]}] Port: [${zk_host_port[1]}]"
-	nc -vzw 1 ${zk_host_port[0]} ${zk_host_port[1]}
-	if [ $? -eq 0 ]; then
-		echo "Connected to ${zk_host_port}"
-		connected=1
-	fi
-done
-
-sleep 1
-done
-`},
-				Resources: generateInitContainerResources(),
-			},
-		}...))
-	}
 
 	sort.Slice(podVolumes, func(i, j int) bool {
 		return podVolumes[i].Name < podVolumes[j].Name
@@ -534,12 +494,40 @@ do
 		notMatchedIp=false
 	fi
 done
-echo "Hostname is successfully binded withy IP address"`, nodeAddress, nodeAddress)
+echo "Hostname is successfully binded with IP address"`, nodeAddress, nodeAddress)
+	}
+
+	zkResolve := ""
+	if len(zkAddress) > 0 {
+		zkResolve = `echo "Trying to contact Zookeeper using connection string: ${NIFI_ZOOKEEPER_CONNECT_STRING}"
+
+connected=0
+IFS=',' read -r -a zk_hosts <<< "${NIFI_ZOOKEEPER_CONNECT_STRING}"
+until [ $connected -eq 1 ]
+do
+	for zk_host in "${zk_hosts[@]}"
+	do
+		IFS=':' read -r -a zk_host_port <<< "${zk_host}"
+
+		echo "Checking Zookeeper Host: [${zk_host_port[0]}] Port: [${zk_host_port[1]}]"
+		set +e
+		curl --telnet-option 'BOGUS=1' --connect-timeout 2 -s telnet://${zk_host_port[0]}:${zk_host_port[1]} < /dev/null
+		if [ $? -eq 48 ]; then
+			echo "Connected to ${zk_host_port}"
+			connected=1
+		fi
+		set -e
+	done
+
+	sleep 1
+done`
 	}
+
 	command := []string{"bash", "-ce", fmt.Sprintf(`cp ${NIFI_HOME}/tmp/* ${NIFI_HOME}/conf/
 %s
 %s
-exec bin/nifi.sh run`, resolveIp, singleUser)}
+%s
+exec bin/nifi.sh run`, zkResolve, resolveIp, singleUser)}
 
 	return corev1.Container{
 		Name:            ContainerName,