
Update nifi.properties template for NiFi 2.0.0-M1 #344

Closed · wants to merge 17 commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@

- [PR #394](https://github.com/konpyutaika/nifikop/pull/394) - **[Operator/NifiCluster]** Added support to let users configure AdditionalNifiEnvs.
- [PR #430](https://github.com/konpyutaika/nifikop/pull/430) - **[Operator/NifiCluster]** Added Python extensions properties in the `nifi.properties` file template.
- [PR #344](https://github.com/konpyutaika/nifikop/pull/344) - **[Operator/NifiCluster]** Added `clusterManager` field to choose between `ZooKeeper` or `Kubernetes`.

### Changed

@@ -13,6 +14,7 @@
- [PR #422](https://github.com/konpyutaika/nifikop/pull/422) - **[NiGoApi]** Upgrade NiGoApi to v0.1.0.
- [PR #425](https://github.com/konpyutaika/nifikop/pull/425) - **[Operator]** Upgrade golang to 1.22.3.
- [PR #428](https://github.com/konpyutaika/nifikop/pull/428) - **[Documentation]** Upgrade node 22.2.0.
- [PR #344](https://github.com/konpyutaika/nifikop/pull/344) - **[Operator/NifiCluster]** Updated `nifi.properties` template for NiFi `2.0.0-M1`.

### Fixed Bugs

12 changes: 12 additions & 0 deletions api/v1/common_types.go
@@ -58,6 +58,10 @@ type ClientConfigType string
// +kubebuilder:validation:Enum={"external","internal"}
type ClusterType string

// ClusterManagerType defines the type of manager that will handle the cluster.
// +kubebuilder:validation:Enum={"zookeeper","kubernetes"}
type ClusterManagerType string

// AccessPolicyType represents the type of access policy.
type AccessPolicyType string

@@ -270,6 +274,14 @@ const (
InternalCluster ClusterType = "internal"
)

const (
// ZookeeperClusterManager indicates that the cluster leader election and state management will be managed with ZooKeeper.
ZookeeperClusterManager ClusterManagerType = "zookeeper"
// KubernetesClusterManager indicates that the cluster leader election and state management will be handled with Kubernetes
// resources: Leases for leader election and ConfigMaps for state.
KubernetesClusterManager ClusterManagerType = "kubernetes"
)

const (
// DataflowStateCreated describes the status of a NifiDataflow as created.
DataflowStateCreated DataflowState = "Created"
6 changes: 6 additions & 0 deletions api/v1/nificluster_types.go
@@ -46,12 +46,18 @@ type NifiClusterSpec struct {
Service ServicePolicy `json:"service,omitempty"`
// Pod defines the policy for pods owned by NiFiKop operator.
Pod PodPolicy `json:"pod,omitempty"`
// clusterManager specifies which manager handles cluster leader election and state management.
// +kubebuilder:default:=zookeeper
// +optional
ClusterManager ClusterManagerType `json:"clusterManager,omitempty"`
// zKAddress specifies the ZooKeeper connection string
// in the form hostname:port where host and port are those of a Zookeeper server.
// TODO: rework for nice zookeeper connect string =
// +optional
ZKAddress string `json:"zkAddress,omitempty"`
// zKPath specifies the Zookeeper chroot path as part
// of its Zookeeper connection string which puts its data under same path in the global ZooKeeper namespace.
// +optional
ZKPath string `json:"zkPath,omitempty"`
// initContainerImage can override the default image used in the init container to check whether
// the ZooKeeper server is reachable.
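
For illustration, a minimal sketch of opting into the new manager through the v1 API added here (the usage is illustrative, not part of this PR):

```go
package main

import (
	"fmt"

	v1 "github.com/konpyutaika/nifikop/api/v1"
)

func main() {
	// Opt into Kubernetes Leases/ConfigMaps for leader election and state;
	// zkAddress and zkPath can then be left empty. When clusterManager is
	// omitted, the kubebuilder default above resolves it to "zookeeper".
	spec := v1.NifiClusterSpec{
		ClusterManager: v1.KubernetesClusterManager,
	}
	fmt.Println(spec.ClusterManager) // "kubernetes"
}
```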
1 change: 1 addition & 0 deletions api/v1alpha1/nificluster_conversion.go
@@ -61,6 +61,7 @@ func convertNifiClusterSpec(src *NifiClusterSpec, dst *v1.NifiCluster) error {
dst.Spec.NifiURI = src.NifiURI
dst.Spec.RootProcessGroupId = src.RootProcessGroupId
dst.Spec.ProxyUrl = src.ProxyUrl
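// v1alpha1 has no clusterManager field, so converted clusters keep the legacy ZooKeeper manager.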
dst.Spec.ClusterManager = v1.ZookeeperClusterManager
dst.Spec.ZKAddress = src.ZKAddress
dst.Spec.ZKPath = src.ZKPath
dst.Spec.InitContainerImage = src.InitContainerImage
6 changes: 6 additions & 0 deletions config/crd/bases/nifi.konpyutaika.com_nificlusters.yaml
@@ -33,6 +33,12 @@ spec:
type: string
clusterImage:
type: string
clusterManager:
default: zookeeper
enum:
- zookeeper
- kubernetes
type: string
controllerUserIdentity:
type: string
disruptionBudget:
6 changes: 6 additions & 0 deletions helm/nifikop/crds/nifi.konpyutaika.com_nificlusters.yaml
@@ -33,6 +33,12 @@ spec:
type: string
clusterImage:
type: string
clusterManager:
default: zookeeper
enum:
- zookeeper
- kubernetes
type: string
controllerUserIdentity:
type: string
disruptionBudget:
80 changes: 42 additions & 38 deletions pkg/resources/nifi/pod.go
@@ -152,44 +152,7 @@ func (r *Reconciler) pod(node v1.Node, nodeConfig *v1.NodeConfig, pvcs []corev1.
FSGroup: nodeConfig.GetFSGroup(),
SeccompProfile: seccompProfile,
},
InitContainers: r.injectAdditionalFields(nodeConfig, append(initContainers, []corev1.Container{
{
Name: "zookeeper",
Image: r.NifiCluster.Spec.GetInitContainerImage(),
ImagePullPolicy: nodeConfig.GetImagePullPolicy(),
Env: []corev1.EnvVar{
{
Name: "ZK_ADDRESS",
Value: zkAddress,
},
},
// The zookeeper init check here just ensures that at least one configured zookeeper host is alive
Command: []string{"bash", "-c", `
set -e
echo "Trying to contact Zookeeper using connection string: ${ZK_ADDRESS}"

connected=0
IFS=',' read -r -a zk_hosts <<< "${ZK_ADDRESS}"
until [ $connected -eq 1 ]
do
for zk_host in "${zk_hosts[@]}"
do
IFS=':' read -r -a zk_host_port <<< "${zk_host}"

echo "Checking Zookeeper Host: [${zk_host_port[0]}] Port: [${zk_host_port[1]}]"
nc -vzw 1 ${zk_host_port[0]} ${zk_host_port[1]}
if [ $? -eq 0 ]; then
echo "Connected to ${zk_host_port}"
connected=1
fi
done

sleep 1
done
`},
Resources: generateInitContainerResources(),
},
}...)),
InitContainers: r.injectAdditionalEnvVars(initContainers),
Affinity: &corev1.Affinity{
PodAntiAffinity: generatePodAntiAffinity(r.NifiCluster.Name, r.NifiCluster.Spec.OneNifiNodePerNode),
},
@@ -217,6 +180,47 @@ done
if nodeConfig.NodeAffinity != nil {
pod.Spec.Affinity.NodeAffinity = nodeConfig.NodeAffinity
}

if len(zkAddress) > 0 {
pod.Spec.InitContainers = r.injectAdditionalEnvVars(append(initContainers, []corev1.Container{
{
Name: "zookeeper",
Image: r.NifiCluster.Spec.GetInitContainerImage(),
ImagePullPolicy: nodeConfig.GetImagePullPolicy(),
Env: []corev1.EnvVar{
{
Name: "ZK_ADDRESS",
Value: zkAddress,
},
},
// The zookeeper init check here just ensures that at least one configured zookeeper host is alive
Command: []string{"bash", "-c", `
set -e
echo "Trying to contact Zookeeper using connection string: ${ZK_ADDRESS}"

connected=0
IFS=',' read -r -a zk_hosts <<< "${ZK_ADDRESS}"
until [ $connected -eq 1 ]
do
for zk_host in "${zk_hosts[@]}"
do
IFS=':' read -r -a zk_host_port <<< "${zk_host}"

echo "Checking Zookeeper Host: [${zk_host_port[0]}] Port: [${zk_host_port[1]}]"
nc -vzw 1 ${zk_host_port[0]} ${zk_host_port[1]}
if [ $? -eq 0 ]; then
echo "Connected to ${zk_host_port}"
connected=1
fi
done

sleep 1
done
`},
Resources: generateInitContainerResources(),
},
}...))
}
return pod
}
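
Moving the ZooKeeper check under `len(zkAddress) > 0` means the init container is only injected when a connection string is configured; with the Kubernetes manager, `zkAddress` can legitimately be empty. A hypothetical helper (not in this PR) expressing the same gate:

```go
// needsZookeeperInitContainer mirrors the len(zkAddress) > 0 gate above:
// the readiness probe is only meaningful when a ZooKeeper connection
// string is actually configured for the cluster.
func needsZookeeperInitContainer(spec v1.NifiClusterSpec) bool {
	return len(spec.ZKAddress) > 0
}
```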

1 change: 1 addition & 0 deletions pkg/resources/nifi/secretconfig.go
@@ -165,6 +165,7 @@ func (r *Reconciler) getNifiPropertiesConfigString(nConfig *v1.NodeConfig, id in
"IsNode": nConfig.GetIsNode(),
"ZookeeperConnectString": r.NifiCluster.Spec.ZKAddress,
"ZookeeperPath": r.NifiCluster.Spec.GetZkPath(),
"ClusterManager": r.NifiCluster.Spec.ClusterManager,
}); err != nil {
log.Error("error occurred during parsing the config template",
zap.String("clusterName", r.NifiCluster.Name),
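
With `ClusterManager` now exposed to the template data, the `nifi.properties` template can branch on it. An illustrative `text/template` sketch (an assumption on my part: the property and value names follow upstream NiFi 2.0.0-M1 conventions, and the actual nifikop template may differ):

```go
package main

import (
	"os"
	"text/template"
)

// Illustrative only: select the NiFi 2.x leader-election implementation
// based on the new "ClusterManager" template key.
const leaderElectionFragment = `
{{- if eq .ClusterManager "kubernetes" }}
nifi.cluster.leader.election.implementation=KubernetesLeaderElectionManager
{{- else }}
nifi.cluster.leader.election.implementation=CuratorLeaderElectionManager
{{- end }}
`

func main() {
	t := template.Must(template.New("leader").Parse(leaderElectionFragment))
	_ = t.Execute(os.Stdout, map[string]any{"ClusterManager": "kubernetes"})
}
```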