1
1
import logging
2
+ import sys
2
3
from typing import Any , Dict , List , Optional
3
4
4
5
import kopf
14
15
15
16
from os_utils import in_cluster
16
17
17
- csecs : Dict [str , Any ] = {}
18
-
19
- # Loading kubeconfig
20
- if in_cluster ():
18
+ if "unittest" not in sys .modules :
21
19
# Loading kubeconfig
22
- config .load_incluster_config ()
23
- else :
24
- # Loading using the local kubevonfig.
25
- config .load_kube_config ()
20
+ if in_cluster ():
21
+ # Loading kubeconfig
22
+ config .load_incluster_config ()
23
+ else :
24
+ # Loading using the local kubevonfig.
25
+ config .load_kube_config ()
26
26
27
27
v1 = client .CoreV1Api ()
28
28
custom_objects_api = client .CustomObjectsApi ()
@@ -92,7 +92,7 @@ def on_field_match_namespace(
92
92
uid = uid ,
93
93
name = name ,
94
94
namespace = namespace ,
95
- data = body . get ( 'data' ) ,
95
+ body = body ,
96
96
synced_namespace = updated_matched ,
97
97
))
98
98
@@ -113,6 +113,8 @@ def on_field_data(
113
113
body : Dict [str , Any ],
114
114
meta : kopf .Meta ,
115
115
name : str ,
116
+ namespace : Optional [str ],
117
+ uid : str ,
116
118
logger : logging .Logger ,
117
119
** _ ,
118
120
):
@@ -126,9 +128,14 @@ def on_field_data(
126
128
127
129
secret_type = body .get ('type' , 'Opaque' )
128
130
131
+ cached_cluster_secret = csecs_cache .get_cluster_secret (uid )
132
+ if cached_cluster_secret is None :
133
+ logger .error ('Received an event for an unknown ClusterSecret.' )
134
+
135
+ updated_syncedns = syncedns .copy ()
129
136
for ns in syncedns :
130
137
logger .info (f'Re Syncing secret { name } in ns { ns } ' )
131
- body = client .V1Secret (
138
+ ns_sec_body = client .V1Secret (
132
139
api_version = 'v1' ,
133
140
data = {str (key ): str (value ) for key , value in new .items ()},
134
141
kind = 'Secret' ,
@@ -140,14 +147,42 @@ def on_field_data(
140
147
),
141
148
type = secret_type ,
142
149
)
143
- logger .debug (f'body: { body } ' )
150
+ logger .debug (f'body: { ns_sec_body } ' )
144
151
# Ensuring the secret still exist.
145
152
if secret_exists (logger = logger , name = name , namespace = ns , v1 = v1 ):
146
- response = v1 .replace_namespaced_secret (name = name , namespace = ns , body = body )
153
+ response = v1 .replace_namespaced_secret (name = name , namespace = ns , body = ns_sec_body )
147
154
else :
148
- response = v1 .create_namespaced_secret (namespace = ns , body = body )
155
+ try :
156
+ v1 .read_namespace (name = ns )
157
+ except client .exceptions .ApiException as e :
158
+ if e .status != 404 :
159
+ raise
160
+ response = f'Namespace { ns } not found'
161
+ updated_syncedns .remove (ns )
162
+ logger .info (f'Namespace { ns } not found while Syncing secret { name } ' )
163
+ else :
164
+ response = v1 .create_namespaced_secret (namespace = ns , body = ns_sec_body )
149
165
logger .debug (response )
150
166
167
+ if updated_syncedns != syncedns :
168
+ # Patch synced_ns field
169
+ logger .debug (f'Patching clustersecret { name } in namespace { namespace } ' )
170
+ body = patch_clustersecret_status (
171
+ logger = logger ,
172
+ name = name ,
173
+ new_status = {'create_fn' : {'syncedns' : updated_syncedns }},
174
+ custom_objects_api = custom_objects_api ,
175
+ )
176
+
177
+ # Updating the cache
178
+ csecs_cache .set_cluster_secret (BaseClusterSecret (
179
+ uid = uid ,
180
+ name = name ,
181
+ namespace = namespace or "" ,
182
+ body = body ,
183
+ synced_namespace = updated_syncedns ,
184
+ ))
185
+
151
186
152
187
@kopf .on .resume ('clustersecret.io' , 'v1' , 'clustersecrets' )
153
188
@kopf .on .create ('clustersecret.io' , 'v1' , 'clustersecrets' )
@@ -164,8 +199,8 @@ async def create_fn(
164
199
165
200
# sync in all matched NS
166
201
logger .info (f'Syncing on Namespaces: { matchedns } ' )
167
- for namespace in matchedns :
168
- sync_secret (logger , namespace , body , v1 )
202
+ for ns in matchedns :
203
+ sync_secret (logger , ns , body , v1 )
169
204
170
205
# store status in memory
171
206
cached_cluster_secret = csecs_cache .get_cluster_secret (uid )
@@ -176,8 +211,8 @@ async def create_fn(
176
211
csecs_cache .set_cluster_secret (BaseClusterSecret (
177
212
uid = uid ,
178
213
name = name ,
179
- namespace = namespace ,
180
- data = body . get ( 'data' ) ,
214
+ namespace = namespace or "" ,
215
+ body = body ,
181
216
synced_namespace = matchedns ,
182
217
))
183
218
@@ -193,10 +228,10 @@ async def namespace_watcher(logger: logging.Logger, meta: kopf.Meta, **_):
193
228
logger .debug (f'New namespace created: { new_ns } re-syncing' )
194
229
ns_new_list = []
195
230
for cluster_secret in csecs_cache .all_cluster_secret ():
196
- obj_body = cluster_secret [ ' body' ]
197
- name = obj_body [ 'metadata' ][ ' name' ]
231
+ obj_body = cluster_secret . body
232
+ name = cluster_secret . name
198
233
199
- matcheddns = cluster_secret [ 'syncedns' ]
234
+ matcheddns = cluster_secret . synced_namespace
200
235
201
236
logger .debug (f'Old matched namespace: { matcheddns } - name: { name } ' )
202
237
ns_new_list = get_ns_list (logger , obj_body , v1 )
@@ -211,11 +246,16 @@ async def namespace_watcher(logger: logging.Logger, meta: kopf.Meta, **_):
211
246
)
212
247
213
248
# if there is a new matching ns, refresh cache
214
- cluster_secret .namespace = ns_new_list
249
+ cluster_secret .synced_namespace = ns_new_list
215
250
csecs_cache .set_cluster_secret (cluster_secret )
216
251
217
- # update ns_new_list on the object so then we also delete from there
218
- return {'syncedns' : ns_new_list }
252
+ # update ns_new_list on the object so then we also delete from there
253
+ patch_clustersecret_status (
254
+ logger = logger ,
255
+ name = cluster_secret .name ,
256
+ new_status = {'create_fn' : {'syncedns' : ns_new_list }},
257
+ custom_objects_api = custom_objects_api ,
258
+ )
219
259
220
260
221
261
@kopf .on .startup ()
@@ -243,8 +283,8 @@ async def startup_fn(logger: logging.Logger, **_):
243
283
BaseClusterSecret (
244
284
uid = metadata .get ('uid' ),
245
285
name = metadata .get ('name' ),
246
- namespace = metadata .get ('namespace' ),
247
- data = item . get ( 'data' ) ,
286
+ namespace = metadata .get ('namespace' , '' ),
287
+ body = item ,
248
288
synced_namespace = item .get ('status' , {}).get ('create_fn' , {}).get ('syncedns' , []),
249
289
)
250
290
)
0 commit comments