1
+ from __future__ import annotations
1
2
import base64
2
3
import importlib
3
4
import json
4
5
import os
5
6
import shlex
7
+ from typing import Any , Dict , List , Optional
6
8
7
9
import kubernetes
8
10
import urllib3
11
+ from kubernetes .dynamic import DynamicClient
9
12
from ocp_resources .image_content_source_policy import ImageContentSourcePolicy
10
13
from ocp_resources .node import Node
14
+ from ocp_resources .pod import Pod
11
15
from ocp_resources .resource import ResourceEditor
12
16
from ocp_resources .secret import Secret
13
17
from ocp_wrapper_data_collector .data_collector import (
14
18
get_data_collector_base_dir ,
15
19
get_data_collector_dict ,
16
20
)
21
+ from pyhelper_utils .shell import run_command
17
22
from simple_logger .logger import get_logger
18
23
from urllib3 .exceptions import MaxRetryError
19
24
23
28
NodeUnschedulableError ,
24
29
PodsFailedOrPendingError ,
25
30
)
26
- from ocp_utilities .utils import run_command
27
31
28
32
29
33
urllib3 .disable_warnings (urllib3 .exceptions .InsecureRequestWarning )
32
36
LOGGER = get_logger (name = __name__ )
33
37
34
38
35
- def get_client (config_file = None , config_dict = None , context = None , ** kwargs ) :
39
+ def get_client (** kwargs : Any ) -> DynamicClient :
36
40
"""
37
41
Get a kubernetes client.
38
42
39
43
Pass either config_file or config_dict.
40
44
If none of them are passed, client will be created from default OS kubeconfig
41
45
(environment variable or .kube folder).
46
+ All kwargs, except config_file, config_dict and context will be passed to kubernetes.config.new_client_from_config_dict.
42
47
43
- Args:
48
+
49
+ kwargs:
44
50
config_file (str): path to a kubeconfig file.
45
51
config_dict (dict): dict with kubeconfig configuration.
46
52
context (str): name of the context to use.
@@ -49,6 +55,10 @@ def get_client(config_file=None, config_dict=None, context=None, **kwargs):
49
55
DynamicClient: a kubernetes client.
50
56
"""
51
57
# Ref: https://github.com/kubernetes-client/python/blob/v26.1.0/kubernetes/base/config/kube_config.py
58
+ config_file = kwargs .pop ("config_file" , None )
59
+ config_dict = kwargs .pop ("config_dict" , None )
60
+ context = kwargs .pop ("context" , None )
61
+
52
62
if config_dict :
53
63
return kubernetes .dynamic .DynamicClient (
54
64
client = kubernetes .config .new_client_from_config_dict (config_dict = config_dict , context = context , ** kwargs )
@@ -75,7 +85,7 @@ def get_client(config_file=None, config_dict=None, context=None, **kwargs):
75
85
)
76
86
77
87
78
- def assert_nodes_ready (nodes ) :
88
+ def assert_nodes_ready (nodes : List [ Node ]) -> None :
79
89
"""
80
90
Validates all nodes are in ready
81
91
@@ -91,7 +101,7 @@ def assert_nodes_ready(nodes):
91
101
raise NodeNotReadyError (f"Following nodes are not in ready state: { not_ready_nodes } " )
92
102
93
103
94
- def assert_nodes_schedulable (nodes ) :
104
+ def assert_nodes_schedulable (nodes : List [ Node ]) -> None :
95
105
"""
96
106
Validates all nodes are in schedulable state
97
107
@@ -107,7 +117,7 @@ def assert_nodes_schedulable(nodes):
107
117
raise NodeUnschedulableError (f"Following nodes are in unscheduled state: { unschedulable_nodes } " )
108
118
109
119
110
- def assert_pods_failed_or_pending (pods ) :
120
+ def assert_pods_failed_or_pending (pods : List [ Pod ]) -> None :
111
121
"""
112
122
Validates all pods are not in failed nor pending phase
113
123
@@ -134,9 +144,9 @@ def assert_pods_failed_or_pending(pods):
134
144
135
145
136
146
def assert_nodes_in_healthy_condition (
137
- nodes ,
138
- healthy_node_condition_type = None ,
139
- ):
147
+ nodes : List [ Node ] ,
148
+ healthy_node_condition_type : Optional [ Dict [ str , str ]] ,
149
+ ) -> None :
140
150
"""
141
151
Validates nodes are in a healthy condition.
142
152
Nodes Ready condition is True and the following node conditions are False:
@@ -155,7 +165,7 @@ def assert_nodes_in_healthy_condition(
155
165
156
166
Raises:
157
167
NodesNotHealthyConditionError: if any nodes DiskPressure MemoryPressure,
158
- PIDPressure, NetworkUnavailable, etc condition is True
168
+ PIDPressure, NetworkUnavailable, etc. condition is True
159
169
"""
160
170
LOGGER .info ("Verify all nodes are in a healthy condition." )
161
171
@@ -199,29 +209,29 @@ class DynamicClassCreator:
199
209
Taken from https://stackoverflow.com/a/66815839
200
210
"""
201
211
202
- def __init__ (self ):
203
- self .created_classes = {}
212
+ def __init__ (self ) -> None :
213
+ self .created_classes : Dict [ Any , Any ] = {}
204
214
205
- def __call__ (self , base_class ):
215
+ def __call__ (self , base_class : Any ) -> Any : # TODO: return `BaseResource` class
206
216
if base_class in self .created_classes :
207
217
return self .created_classes [base_class ]
208
218
209
219
class BaseResource (base_class ):
210
- def __init__ (self , * args , ** kwargs ) :
220
+ def __init__ (self , * args : Any , ** kwargs : Any ) -> None :
211
221
super ().__init__ (* args , ** kwargs )
212
222
213
- def _set_dynamic_class_creator_label (self ):
223
+ def _set_dynamic_class_creator_label (self ) -> None :
214
224
self .res .setdefault ("metadata" , {}).setdefault ("labels" , {}).update ({
215
225
"created-by-dynamic-class-creator" : "Yes"
216
226
})
217
227
218
- def to_dict (self ):
228
+ def to_dict (self ) -> None :
219
229
if not self .res :
220
230
super ().to_dict ()
221
231
222
232
self ._set_dynamic_class_creator_label ()
223
233
224
- def clean_up (self ):
234
+ def clean_up (self ) -> None :
225
235
try :
226
236
data_collector_dict = get_data_collector_dict ()
227
237
if data_collector_dict :
@@ -248,7 +258,7 @@ def clean_up(self):
248
258
return BaseResource
249
259
250
260
251
- def cluster_resource (base_class ) :
261
+ def cluster_resource (base_class : Any ) -> Any :
252
262
"""
253
263
Base class for all resources in order to override clean_up() method to collect resource data.
254
264
data_collect_yaml dict can be set via py_config pytest plugin or via
@@ -282,7 +292,13 @@ def cluster_resource(base_class):
282
292
return creator (base_class = base_class )
283
293
284
294
285
- def create_icsp_command (image , source_url , folder_name , pull_secret = None , filter_options = "" ):
295
+ def create_icsp_command (
296
+ image : str ,
297
+ source_url : str ,
298
+ folder_name : str ,
299
+ pull_secret : str = "" ,
300
+ filter_options : str = "" ,
301
+ ) -> str :
286
302
"""
287
303
Create ImageContentSourcePolicy command.
288
304
@@ -304,7 +320,13 @@ def create_icsp_command(image, source_url, folder_name, pull_secret=None, filter
304
320
return base_command
305
321
306
322
307
- def generate_icsp_file (folder_name , image , source_url , pull_secret = None , filter_options = "" ):
323
+ def generate_icsp_file (
324
+ folder_name : str ,
325
+ image : str ,
326
+ source_url : str ,
327
+ pull_secret : str = "" ,
328
+ filter_options : str = "" ,
329
+ ) -> str :
308
330
base_command = create_icsp_command (
309
331
image = image ,
310
332
source_url = source_url ,
@@ -323,19 +345,19 @@ def generate_icsp_file(folder_name, image, source_url, pull_secret=None, filter_
323
345
return icsp_file_path
324
346
325
347
326
- def create_icsp_from_file (icsp_file_path ) :
348
+ def create_icsp_from_file (icsp_file_path : str ) -> ImageContentSourcePolicy :
327
349
icsp = ImageContentSourcePolicy (yaml_file = icsp_file_path )
328
350
icsp .deploy ()
329
351
return icsp
330
352
331
353
332
- def create_icsp (icsp_name , repository_digest_mirrors ) :
354
+ def create_icsp (icsp_name : str , repository_digest_mirrors : List [ Dict [ str , Any ]]) -> ImageContentSourcePolicy :
333
355
icsp = ImageContentSourcePolicy (name = icsp_name , repository_digest_mirrors = repository_digest_mirrors )
334
356
icsp .deploy ()
335
357
return icsp
336
358
337
359
338
- def dict_base64_encode (_dict ) :
360
+ def dict_base64_encode (_dict : Dict [ Any , Any ]) -> str :
339
361
"""
340
362
Encoding dict in base64
341
363
@@ -345,10 +367,15 @@ def dict_base64_encode(_dict):
345
367
Returns:
346
368
str: given _dict encoded in base64
347
369
"""
348
- return base64 .b64encode (json .dumps (_dict ).encode ("ascii" )).decode ("utf-8" )
370
+ return base64 .b64encode (json .dumps (_dict ).encode ("ascii" )).decode ()
349
371
350
372
351
- def create_update_secret (secret_data_dict , name , namespace , admin_client = None ):
373
+ def create_update_secret (
374
+ secret_data_dict : Dict [str , Dict [str , Dict [str , str ]]],
375
+ name : str ,
376
+ namespace : str ,
377
+ admin_client : DynamicClient = None ,
378
+ ) -> Secret :
352
379
"""
353
380
Update existing secret or create a new secret; secret type - dockerconfigjson
354
381
0 commit comments