1313See the License for the specific language governing permissions and
1414limitations under the License.
1515"""
16+
1617import logging
1718import os
1819from typing import Dict , List , Optional
2425logger = logging .getLogger (__name__ )
2526
2627
28+ class RadasConfig (object ):
29+ def __init__ (self , data : Dict ):
30+ self .__umb_host : str = data .get ("umb_host" , None )
31+ self .__umb_host_port : str = data .get ("umb_host_port" , "5671" )
32+ self .__result_queue : str = data .get ("result_queue" , None )
33+ self .__request_chan : str = data .get ("request_channel" , None )
34+ self .__client_ca : str = data .get ("client_ca" , None )
35+ self .__client_key : str = data .get ("client_key" , None )
36+ self .__client_key_pass_file : str = data .get ("client_key_pass_file" , None )
37+ self .__root_ca : str = data .get ("root_ca" , "/etc/pki/tls/certs/ca-bundle.crt" )
38+ self .__quay_radas_registry_config : Optional [str ] = data .get (
39+ "quay_radas_registry_config" , None
40+ )
41+ self .__radas_sign_timeout_retry_count : int = data .get ("radas_sign_timeout_retry_count" , 10 )
42+ self .__radas_sign_timeout_retry_interval : int = data .get (
43+ "radas_sign_timeout_retry_interval" , 60
44+ )
45+ self .__radas_receiver_timeout : int = int (data .get ("radas_receiver_timeout" , 1800 ))
46+
47+ def validate (self ) -> bool :
48+ if not self .__umb_host :
49+ logger .error ("Missing host name setting for UMB!" )
50+ return False
51+ if not self .__result_queue :
52+ logger .error ("Missing the queue setting to receive signing result in UMB!" )
53+ return False
54+ if not self .__request_chan :
55+ logger .error ("Missing the queue setting to send signing request in UMB!" )
56+ return False
57+ if self .__client_ca and not os .access (self .__client_ca , os .R_OK ):
58+ logger .error ("The client CA file is not valid!" )
59+ return False
60+ if self .__client_key and not os .access (self .__client_key , os .R_OK ):
61+ logger .error ("The client key file is not valid!" )
62+ return False
63+ if self .__client_key_pass_file and not os .access (self .__client_key_pass_file , os .R_OK ):
64+ logger .error ("The client key password file is not valid!" )
65+ return False
66+ if self .__root_ca and not os .access (self .__root_ca , os .R_OK ):
67+ logger .error ("The root ca file is not valid!" )
68+ return False
69+ if self .__quay_radas_registry_config and not os .access (
70+ self .__quay_radas_registry_config , os .R_OK
71+ ):
72+ self .__quay_radas_registry_config = None
73+ logger .warning (
74+ "The quay registry config for oras is not valid, will ignore the registry config!"
75+ )
76+ return True
77+
78+ def umb_target (self ) -> str :
79+ if self .ssl_enabled ():
80+ return f"amqps://{ self .__umb_host .strip ()} :{ self .__umb_host_port } "
81+ else :
82+ return f"amqp://{ self .__umb_host .strip ()} :{ self .__umb_host_port } "
83+
84+ def result_queue (self ) -> str :
85+ return self .__result_queue .strip ()
86+
87+ def request_channel (self ) -> str :
88+ return self .__request_chan .strip ()
89+
90+ def client_ca (self ) -> str :
91+ return self .__client_ca .strip ()
92+
93+ def client_key (self ) -> str :
94+ return self .__client_key .strip ()
95+
96+ def client_key_password (self ) -> str :
97+ pass_file = self .__client_key_pass_file
98+ if os .access (pass_file , os .R_OK ):
99+ with open (pass_file , "r" ) as f :
100+ return f .read ().strip ()
101+ elif pass_file :
102+ logger .warning ("The key password file is not accessible. Will ignore the password." )
103+ return ""
104+
105+ def root_ca (self ) -> str :
106+ return self .__root_ca .strip ()
107+
108+ def ssl_enabled (self ) -> bool :
109+ return bool (self .__client_ca and self .__client_key and self .__root_ca )
110+
111+ def quay_radas_registry_config (self ) -> Optional [str ]:
112+ if self .__quay_radas_registry_config :
113+ return self .__quay_radas_registry_config .strip ()
114+ return None
115+
116+ def radas_sign_timeout_retry_count (self ) -> int :
117+ return self .__radas_sign_timeout_retry_count
118+
119+ def radas_sign_timeout_retry_interval (self ) -> int :
120+ return self .__radas_sign_timeout_retry_interval
121+
122+ def receiver_timeout (self ) -> int :
123+ return self .__radas_receiver_timeout
124+
125+
27126class CharonConfig (object ):
28127 """CharonConfig is used to store all configurations for charon
29128 tools.
@@ -39,6 +138,13 @@ def __init__(self, data: Dict):
39138 self .__ignore_signature_suffix : Dict = data .get ("ignore_signature_suffix" , None )
40139 self .__signature_command : str = data .get ("detach_signature_command" , None )
41140 self .__aws_cf_enable : bool = data .get ("aws_cf_enable" , False )
141+ radas_config : Dict = data .get ("radas" , None )
142+ self .__radas_config : Optional [RadasConfig ] = None
143+ if radas_config :
144+ self .__radas_config = RadasConfig (radas_config )
145+ self .__radas_enabled = bool (self .__radas_config and self .__radas_config .validate ())
146+ else :
147+ self .__radas_enabled = False
42148
43149 def get_ignore_patterns (self ) -> List [str ]:
44150 return self .__ignore_patterns
@@ -67,19 +173,23 @@ def get_detach_signature_command(self) -> str:
67173 def is_aws_cf_enable (self ) -> bool :
68174 return self .__aws_cf_enable
69175
176+ def is_radas_enabled (self ) -> bool :
177+ return self .__radas_enabled
178+
179+ def get_radas_config (self ) -> Optional [RadasConfig ]:
180+ return self .__radas_config
181+
70182
71183def get_config (cfgPath = None ) -> CharonConfig :
72184 config_file_path = cfgPath
73185 if not config_file_path or not os .path .isfile (config_file_path ):
74186 config_file_path = os .path .join (os .getenv ("HOME" , "" ), ".charon" , CONFIG_FILE )
75- data = read_yaml_from_file_path (config_file_path , ' schemas/charon.json' )
187+ data = read_yaml_from_file_path (config_file_path , " schemas/charon.json" )
76188 return CharonConfig (data )
77189
78190
79191def get_template (template_file : str ) -> str :
80- template = os .path .join (
81- os .getenv ("HOME" , '' ), ".charon/template" , template_file
82- )
192+ template = os .path .join (os .getenv ("HOME" , "" ), ".charon/template" , template_file )
83193 if os .path .isfile (template ):
84194 with open (template , encoding = "utf-8" ) as file_ :
85195 return file_ .read ()
0 commit comments