@@ -37,27 +37,26 @@ import (
37
37
"github.com/mitchellh/hashstructure/v2"
38
38
)
39
39
40
-
41
40
var logBk = logf .Log .WithName ("controller_zookeeperbackup" )
42
41
43
42
// ZookeeperBackupReconciler reconciles a ZookeeperBackup object
44
43
type ZookeeperBackupReconciler struct {
45
- client client.Client
46
- scheme * runtime.Scheme
47
- log logr.Logger
44
+ Client client.Client
45
+ Scheme * runtime.Scheme
46
+ Log logr.Logger
48
47
}
49
48
50
49
//+kubebuilder:rbac:groups=zookeeper.pravega.io.zookeeper.pravega.io,resources=zookeeperbackups,verbs=get;list;watch;create;update;patch;delete
51
50
//+kubebuilder:rbac:groups=zookeeper.pravega.io.zookeeper.pravega.io,resources=zookeeperbackups/status,verbs=get;update;patch
52
51
//+kubebuilder:rbac:groups=zookeeper.pravega.io.zookeeper.pravega.io,resources=zookeeperbackups/finalizers,verbs=update
53
52
54
- func (r * ZookeeperBackupReconciler ) Reconcile (request reconcile.Request ) (reconcile.Result , error ) {
55
- r .log = logBk .WithValues ("Request.Namespace" , request .Namespace , "Request.Name" , request .Name )
56
- r .log .Info ("Reconciling ZookeeperBackup" )
53
+ func (r * ZookeeperBackupReconciler ) Reconcile (_ context. Context , request reconcile.Request ) (reconcile.Result , error ) {
54
+ r .Log = logBk .WithValues ("Request.Namespace" , request .Namespace , "Request.Name" , request .Name )
55
+ r .Log .Info ("Reconciling ZookeeperBackup" )
57
56
58
57
// Fetch the ZookeeperBackup instance
59
58
zookeeperBackup := & zookeeperv1beta1.ZookeeperBackup {}
60
- err := r .client .Get (context .TODO (), request .NamespacedName , zookeeperBackup )
59
+ err := r .Client .Get (context .TODO (), request .NamespacedName , zookeeperBackup )
61
60
if err != nil {
62
61
if errors .IsNotFound (err ) {
63
62
// Request object not found, could have been deleted after reconcile request.
@@ -74,16 +73,16 @@ func (r *ZookeeperBackupReconciler) Reconcile(request reconcile.Request) (reconc
74
73
pvc := newPVCForZookeeperBackup (zookeeperBackup )
75
74
76
75
// Set ZookeeperBackup instance as the owner and controller
77
- if err := controllerutil .SetControllerReference (zookeeperBackup , pvc , r .scheme ); err != nil {
76
+ if err := controllerutil .SetControllerReference (zookeeperBackup , pvc , r .Scheme ); err != nil {
78
77
return reconcile.Result {}, err
79
78
}
80
79
81
80
// Check if PVC already created
82
81
foundPVC := & corev1.PersistentVolumeClaim {}
83
- err = r .client .Get (context .TODO (), types.NamespacedName {Name : pvc .Name , Namespace : pvc .Namespace }, foundPVC )
82
+ err = r .Client .Get (context .TODO (), types.NamespacedName {Name : pvc .Name , Namespace : pvc .Namespace }, foundPVC )
84
83
if err != nil && errors .IsNotFound (err ) {
85
- r .log .Info ("Creating a new PersistenVolumeClaim" )
86
- err = r .client .Create (context .TODO (), pvc )
84
+ r .Log .Info ("Creating a new PersistenVolumeClaim" )
85
+ err = r .Client .Create (context .TODO (), pvc )
87
86
if err != nil {
88
87
return reconcile.Result {}, err
89
88
}
@@ -95,20 +94,20 @@ func (r *ZookeeperBackupReconciler) Reconcile(request reconcile.Request) (reconc
95
94
cronJob := newCronJobForCR (zookeeperBackup )
96
95
97
96
// Set ZookeeperBackup instance as the owner and controller
98
- if err := controllerutil .SetControllerReference (zookeeperBackup , cronJob , r .scheme ); err != nil {
97
+ if err := controllerutil .SetControllerReference (zookeeperBackup , cronJob , r .Scheme ); err != nil {
99
98
return reconcile.Result {}, err
100
99
}
101
100
102
101
// Check if zookeeper cluster exists
103
102
foundZookeeperCluster := & zookeeperv1beta1.ZookeeperCluster {}
104
103
zkCluster := zookeeperBackup .Spec .ZookeeperCluster
105
- err = r .client .Get (context .TODO (), types.NamespacedName {Name : zkCluster , Namespace : zookeeperBackup .Namespace }, foundZookeeperCluster )
104
+ err = r .Client .Get (context .TODO (), types.NamespacedName {Name : zkCluster , Namespace : zookeeperBackup .Namespace }, foundZookeeperCluster )
106
105
if err != nil && errors .IsNotFound (err ) {
107
- r .log .Error (err , fmt .Sprintf ("Zookeeper cluster '%s' not found" , zkCluster ))
106
+ r .Log .Error (err , fmt .Sprintf ("Zookeeper cluster '%s' not found" , zkCluster ))
108
107
return reconcile.Result {}, err
109
108
}
110
109
if foundZookeeperCluster .Status .Replicas != foundZookeeperCluster .Status .ReadyReplicas {
111
- r .log .Info (fmt .Sprintf ("Not all cluster replicas are ready: %d/%d. Suspend CronJob" ,
110
+ r .Log .Info (fmt .Sprintf ("Not all cluster replicas are ready: %d/%d. Suspend CronJob" ,
112
111
foundZookeeperCluster .Status .ReadyReplicas , foundZookeeperCluster .Status .Replicas ))
113
112
* cronJob .Spec .Suspend = true
114
113
} else {
@@ -120,7 +119,7 @@ func (r *ZookeeperBackupReconciler) Reconcile(request reconcile.Request) (reconc
120
119
if err != nil && errors .IsNotFound (err ) {
121
120
return reconcile.Result {}, err
122
121
}
123
- r .log .Info (fmt .Sprintf ("Leader IP (hostname): %s" , leaderIp ))
122
+ r .Log .Info (fmt .Sprintf ("Leader IP (hostname): %s" , leaderIp ))
124
123
leaderHostname := strings .Split (leaderIp , "." )[0 ]
125
124
126
125
// Landing backup pod on the same node with leader
@@ -129,11 +128,11 @@ func (r *ZookeeperBackupReconciler) Reconcile(request reconcile.Request) (reconc
129
128
client .InNamespace (request .NamespacedName .Namespace ),
130
129
client.MatchingLabels {"app" : zkCluster },
131
130
}
132
- err = r .client .List (context .TODO (), podList , opts ... )
131
+ err = r .Client .List (context .TODO (), podList , opts ... )
133
132
if err != nil {
134
133
if errors .IsNotFound (err ) {
135
134
msg := fmt .Sprintf ("Pods cannot be found by label app:%s" , zookeeperBackup .Name )
136
- r .log .Error (err , msg )
135
+ r .Log .Error (err , msg )
137
136
}
138
137
return reconcile.Result {}, err
139
138
}
@@ -142,7 +141,7 @@ func (r *ZookeeperBackupReconciler) Reconcile(request reconcile.Request) (reconc
142
141
for _ , pod := range podList .Items {
143
142
if pod .Spec .Hostname == leaderHostname {
144
143
leaderFound = true
145
- r .log .Info (fmt .Sprintf ("Leader was found. Pod: %s (node: %s)" , pod .Name , pod .Spec .NodeName ))
144
+ r .Log .Info (fmt .Sprintf ("Leader was found. Pod: %s (node: %s)" , pod .Name , pod .Spec .NodeName ))
146
145
// Set appropriate NodeSelector and PVC ClaimName
147
146
cronJob .Spec .JobTemplate .Spec .Template .Spec .NodeSelector =
148
147
map [string ]string {"kubernetes.io/hostname" : pod .Spec .NodeName }
@@ -152,7 +151,7 @@ func (r *ZookeeperBackupReconciler) Reconcile(request reconcile.Request) (reconc
152
151
}
153
152
}
154
153
if ! leaderFound {
155
- r .log .Info ("Pod with leader role wasn't found. Suspend CronJob" )
154
+ r .Log .Info ("Pod with leader role wasn't found. Suspend CronJob" )
156
155
* cronJob .Spec .Suspend = true
157
156
}
158
157
@@ -169,50 +168,50 @@ func (r *ZookeeperBackupReconciler) Reconcile(request reconcile.Request) (reconc
169
168
170
169
// Check if this CronJob already exists
171
170
foundCJ := & batchv1beta1.CronJob {}
172
- err = r .client .Get (context .TODO (), types.NamespacedName {Name : cronJob .Name , Namespace : cronJob .Namespace }, foundCJ )
171
+ err = r .Client .Get (context .TODO (), types.NamespacedName {Name : cronJob .Name , Namespace : cronJob .Namespace }, foundCJ )
173
172
if err != nil && errors .IsNotFound (err ) {
174
- r .log .Info ("Creating a new CronJob" , "CronJob.Namespace" , cronJob .Namespace , "CronJob.Name" , cronJob .Name )
173
+ r .Log .Info ("Creating a new CronJob" , "CronJob.Namespace" , cronJob .Namespace , "CronJob.Name" , cronJob .Name )
175
174
cronJob .Annotations ["last-applied-hash" ] = hashStr
176
- err = r .client .Create (context .TODO (), cronJob )
175
+ err = r .Client .Create (context .TODO (), cronJob )
177
176
if err != nil {
178
177
return reconcile.Result {}, err
179
178
}
180
179
181
180
// CronJob created successfully
182
- r .log .Info ("CronJob created successfully." , "RequeueAfter" , ReconcileTime )
181
+ r .Log .Info ("CronJob created successfully." , "RequeueAfter" , ReconcileTime )
183
182
return reconcile.Result {RequeueAfter : ReconcileTime }, nil
184
183
} else if err != nil {
185
184
return reconcile.Result {}, err
186
185
}
187
186
188
187
if foundCJ .Annotations ["last-applied-hash" ] == hashStr {
189
- r .log .Info ("CronJob already exists and looks updated" , "CronJob.Namespace" , foundCJ .Namespace , "CronJob.Name" , foundCJ .Name )
188
+ r .Log .Info ("CronJob already exists and looks updated" , "CronJob.Namespace" , foundCJ .Namespace , "CronJob.Name" , foundCJ .Name )
190
189
} else {
191
190
cronJob .Annotations ["last-applied-hash" ] = hashStr
192
- r .log .Info ("Update CronJob" , "Namespace" , cronJob .Namespace , "Name" , cronJob .Name )
191
+ r .Log .Info ("Update CronJob" , "Namespace" , cronJob .Namespace , "Name" , cronJob .Name )
193
192
//cronJob.ObjectMeta.ResourceVersion = foundCJ.ObjectMeta.ResourceVersion
194
- err = r .client .Update (context .TODO (), cronJob )
193
+ err = r .Client .Update (context .TODO (), cronJob )
195
194
if err != nil {
196
- r .log .Error (err , "CronJob cannot be updated" )
195
+ r .Log .Error (err , "CronJob cannot be updated" )
197
196
return reconcile.Result {}, err
198
197
}
199
198
}
200
199
201
200
// Requeue
202
- r .log .Info (fmt .Sprintf ("Rerun reconclie after %s sec." , ReconcileTime ))
201
+ r .Log .Info (fmt .Sprintf ("Rerun reconclie after %s sec." , ReconcileTime ))
203
202
return reconcile.Result {RequeueAfter : ReconcileTime }, nil
204
203
}
205
204
206
205
func (r * ZookeeperBackupReconciler ) GetLeaderIP (zkCluster * zookeeperv1beta1.ZookeeperCluster ) (string , error ) {
207
206
// Get zookeeper leader via zookeeper admin server
208
207
svcAdminName := zkCluster .GetAdminServerServiceName ()
209
208
foundSvcAdmin := & corev1.Service {}
210
- err := r .client .Get (context .TODO (), types.NamespacedName {
209
+ err := r .Client .Get (context .TODO (), types.NamespacedName {
211
210
Name : svcAdminName ,
212
211
Namespace : zkCluster .Namespace ,
213
212
}, foundSvcAdmin )
214
213
if err != nil && errors .IsNotFound (err ) {
215
- r .log .Error (err , fmt .Sprintf ("Zookeeper admin service '%s' not found" , svcAdminName ))
214
+ r .Log .Error (err , fmt .Sprintf ("Zookeeper admin service '%s' not found" , svcAdminName ))
216
215
return "" , err
217
216
}
218
217
@@ -221,19 +220,19 @@ func (r *ZookeeperBackupReconciler) GetLeaderIP(zkCluster *zookeeperv1beta1.Zook
221
220
222
221
resp , err := http .Get (fmt .Sprintf ("http://%s:%d/commands/leader" , adminIp , svcPort .Port ))
223
222
if err != nil {
224
- r .log .Error (err , "Admin service error response" )
223
+ r .Log .Error (err , "Admin service error response" )
225
224
return "" , err
226
225
}
227
226
defer resp .Body .Close ()
228
227
body , err := io .ReadAll (resp .Body )
229
228
if err != nil {
230
- r .log .Error (err , "Can't read response body" )
229
+ r .Log .Error (err , "Can't read response body" )
231
230
return "" , err
232
231
}
233
232
var result map [string ]interface {}
234
233
err = json .Unmarshal (body , & result )
235
234
if err != nil {
236
- r .log .Error (err , "Can't unmarshal json" )
235
+ r .Log .Error (err , "Can't unmarshal json" )
237
236
return "" , err
238
237
}
239
238
leaderIp := result ["leader_ip" ].(string )
0 commit comments