1
1
"""
2
2
Utilities and configuration file parsing.
3
3
"""
4
+
4
5
import functools
5
6
import warnings
6
- from typing import Dict , Optional , Union
7
+ from typing import Any , Callable , cast , Dict , Iterable , Tuple , Optional , Union
7
8
from time import time , perf_counter , get_clock_info
8
-
9
- from can import typechecking
10
-
11
9
import json
12
10
import os
13
11
import os .path
18
16
19
17
import can
20
18
from can .interfaces import VALID_INTERFACES
19
+ from can import typechecking
21
20
22
21
log = logging .getLogger ("can.util" )
23
22
@@ -89,9 +88,8 @@ def load_environment_config(context: Optional[str] = None) -> Dict[str, str]:
89
88
"bitrate" : "CAN_BITRATE" ,
90
89
}
91
90
92
- context_suffix = "_{}" .format (context ) if context else ""
93
-
94
- can_config_key = "CAN_CONFIG" + context_suffix
91
+ context_suffix = f"_{ context } " if context else ""
92
+ can_config_key = f"CAN_CONFIG{ context_suffix } "
95
93
config : Dict [str , str ] = json .loads (os .environ .get (can_config_key , "{}" ))
96
94
97
95
for key , val in mapper .items ():
@@ -104,7 +102,7 @@ def load_environment_config(context: Optional[str] = None) -> Dict[str, str]:
104
102
105
103
def load_config (
106
104
path : Optional [typechecking .AcceptedIOType ] = None ,
107
- config = None ,
105
+ config : Optional [ Dict [ str , Any ]] = None ,
108
106
context : Optional [str ] = None ,
109
107
) -> typechecking .BusConfig :
110
108
"""
@@ -158,16 +156,19 @@ def load_config(
158
156
config = {}
159
157
160
158
# use the given dict for default values
161
- config_sources = [
162
- given_config ,
163
- can .rc ,
164
- lambda _context : load_environment_config ( # pylint: disable=unnecessary-lambda
165
- _context
166
- ),
167
- lambda _context : load_environment_config (),
168
- lambda _context : load_file_config (path , _context ),
169
- lambda _context : load_file_config (path ),
170
- ]
159
+ config_sources = cast (
160
+ Iterable [Union [Dict [str , Any ], Callable [[Any ], Dict [str , Any ]]]],
161
+ [
162
+ given_config ,
163
+ can .rc ,
164
+ lambda _context : load_environment_config ( # pylint: disable=unnecessary-lambda
165
+ _context
166
+ ),
167
+ lambda _context : load_environment_config (),
168
+ lambda _context : load_file_config (path , _context ),
169
+ lambda _context : load_file_config (path ),
170
+ ],
171
+ )
171
172
172
173
# Slightly complex here to only search for the file config if required
173
174
for cfg in config_sources :
@@ -189,9 +190,7 @@ def load_config(
189
190
config [key ] = None
190
191
191
192
if config ["interface" ] not in VALID_INTERFACES :
192
- raise NotImplementedError (
193
- "Invalid CAN Bus Type - {}" .format (config ["interface" ])
194
- )
193
+ raise NotImplementedError (f'Invalid CAN Bus Type "{ config ["interface" ]} "' )
195
194
196
195
if "bitrate" in config :
197
196
config ["bitrate" ] = int (config ["bitrate" ])
@@ -216,30 +215,33 @@ def load_config(
216
215
timing_conf [key ] = int (config [key ], base = 0 )
217
216
del config [key ]
218
217
if timing_conf :
219
- timing_conf ["bitrate" ] = config . get ( "bitrate" )
218
+ timing_conf ["bitrate" ] = config [ "bitrate" ]
220
219
config ["timing" ] = can .BitTiming (** timing_conf )
221
220
222
- can .log .debug ("can config: {}" .format (config ))
223
- return config
221
+ can .log .debug ("can config: %s" , config )
222
+
223
+ return cast (typechecking .BusConfig , config )
224
+
224
225
226
+ def set_logging_level (level_name : str ) -> None :
227
+ """Set the logging level for the `"can"` logger.
225
228
226
- def set_logging_level (level_name : Optional [str ] = None ):
227
- """Set the logging level for the "can" logger.
228
- Expects one of: 'critical', 'error', 'warning', 'info', 'debug', 'subdebug'
229
+ :param level_name: One of: `'critical'`, `'error'`, `'warning'`, `'info'`,
230
+ `'debug'`, `'subdebug'`, or the value `None` (=default). Defaults to `'debug'`.
229
231
"""
230
232
can_logger = logging .getLogger ("can" )
231
233
232
234
try :
233
- can_logger .setLevel (getattr (logging , level_name .upper ())) # type: ignore
235
+ can_logger .setLevel (getattr (logging , level_name .upper ()))
234
236
except AttributeError :
235
237
can_logger .setLevel (logging .DEBUG )
236
- log .debug ("Logging set to {}" . format ( level_name ) )
238
+ log .debug ("Logging set to %s" , level_name )
237
239
238
240
239
241
def len2dlc (length : int ) -> int :
240
242
"""Calculate the DLC from data length.
241
243
242
- :param int length: Length in number of bytes (0-64)
244
+ :param length: Length in number of bytes (0-64)
243
245
244
246
:returns: DLC (0-15)
245
247
"""
@@ -261,20 +263,17 @@ def dlc2len(dlc: int) -> int:
261
263
return CAN_FD_DLC [dlc ] if dlc <= 15 else 64
262
264
263
265
264
- def channel2int (channel : Optional [Union [ typechecking .Channel ] ]) -> Optional [int ]:
266
+ def channel2int (channel : Optional [typechecking .Channel ]) -> Optional [int ]:
265
267
"""Try to convert the channel to an integer.
266
268
267
269
:param channel:
268
- Channel string (e.g. can0, CAN1) or integer
270
+ Channel string (e.g. `" can0"`, `" CAN1"` ) or an integer
269
271
270
- :returns: Channel integer or `None` if unsuccessful
272
+ :returns: Channel integer or `` None` ` if unsuccessful
271
273
"""
272
- if channel is None :
273
- return None
274
274
if isinstance (channel , int ):
275
275
return channel
276
- # String and byte objects have a lower() method
277
- if hasattr (channel , "lower" ):
276
+ if isinstance (channel , str ):
278
277
match = re .match (r".*(\d+)$" , channel )
279
278
if match :
280
279
return int (match .group (1 ))
@@ -299,35 +298,33 @@ def library_function(new_arg):
299
298
def deco (f ):
300
299
@functools .wraps (f )
301
300
def wrapper (* args , ** kwargs ):
302
- rename_kwargs (f .__name__ , kwargs , aliases )
301
+ _rename_kwargs (f .__name__ , kwargs , aliases )
303
302
return f (* args , ** kwargs )
304
303
305
304
return wrapper
306
305
307
306
return deco
308
307
309
308
310
- def rename_kwargs (func_name , kwargs , aliases ):
309
+ def _rename_kwargs (
310
+ func_name : str , kwargs : Dict [str , str ], aliases : Dict [str , str ]
311
+ ) -> None :
311
312
"""Helper function for `deprecated_args_alias`"""
312
313
for alias , new in aliases .items ():
313
314
if alias in kwargs :
314
315
value = kwargs .pop (alias )
315
316
if new is not None :
316
- warnings .warn (
317
- "{} is deprecated; use {}" .format (alias , new ), DeprecationWarning
318
- )
317
+ warnings .warn (f"{ alias } is deprecated; use { new } " , DeprecationWarning )
319
318
if new in kwargs :
320
319
raise TypeError (
321
- "{} received both {} (deprecated) and {}" .format (
322
- func_name , alias , new
323
- )
320
+ f"{ func_name } received both { alias } (deprecated) and { new } "
324
321
)
325
322
kwargs [new ] = value
326
323
else :
327
324
warnings .warn ("{} is deprecated" .format (alias ), DeprecationWarning )
328
325
329
326
330
- def time_perfcounter_correlation ():
327
+ def time_perfcounter_correlation () -> Tuple [ float , float ] :
331
328
"""Get the `perf_counter` value nearest to when time.time() is updated
332
329
333
330
Computed if the default timer used by `time.time` on this platform has a resolution
0 commit comments