-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathmagic_tunable.py
379 lines (287 loc) · 11.7 KB
/
magic_tunable.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
import collections.abc
import functools
import inspect
import typing
import warnings
from typing import Callable, Generic, Optional, TypeVar, Union, overload
from collections.abc import Sequence
import ntcore
from ntcore import NetworkTableInstance
from ntcore.types import ValueT
class StructSerializable(typing.Protocol):
    """Any type that is a wpiutil.wpistruct.

    This is a structural protocol: a type matches when it exposes a
    class-level ``WPIStruct`` attribute. Within this module such types are
    recognised with ``hasattr(..., "WPIStruct")`` and are published through
    the ntcore struct topic classes.
    """

    # Marker attribute that identifies a struct-serializable type. Its
    # contents are never inspected in this module, only its presence.
    WPIStruct: typing.ClassVar
# Unconstrained type variable; used in the ``feedback`` overload signatures
# for the type of the single argument accepted by the decorated getter.
T = TypeVar("T")
# Type of a value that can be published: a plain NetworkTables value, a
# struct-serializable object, or a sequence of struct-serializable objects.
V = TypeVar("V", bound=Union[ValueT, StructSerializable, Sequence[StructSerializable]])
class tunable(Generic[V]):
    """
    This allows you to define simple properties that allow you to easily
    communicate with other programs via NetworkTables.

    The following example will define a NetworkTable variable at
    ``/components/my_component/foo``::

        class MyRobot(magicbot.MagicRobot):

            my_component: MyComponent

        ...

        from magicbot import tunable

        class MyComponent:

            # define the tunable property
            foo = tunable(True)

            def execute(self):

                # set the variable
                self.foo = True

                # get the variable
                foo = self.foo

    The key of the NetworkTables variable will vary based on what kind of
    object the tunable belongs to:

    * A component: ``/components/COMPONENTNAME/VARNAME``
    * An autonomous mode: ``/autonomous/MODENAME/VARNAME``
    * Your main robot class: ``/robot/VARNAME``

    The topic type is taken from an explicit type hint when one is given
    (``tunable[Sequence[int]]([])``, ``foo: tunable[Sequence[int]]`` or
    ``foo: ClassVar[tunable[Sequence[int]]]``) and otherwise inferred from
    the default value. An un-parameterized ``tunable`` / ``ClassVar`` hint
    carries no value type and is treated like a missing hint.

    .. note:: When executing unit tests on objects that create tunables,
              you will want to use setup_tunables to set the object up.
              In normal usage, MagicRobot does this for you, so you don't
              have to do anything special.

    .. versionchanged:: 2024.1.0
       Added support for WPILib Struct serializable types.
       Integer defaults now create integer topics instead of double topics.
    """

    # How this works: ``tunable`` is a descriptor stored on the owning class.
    # The NetworkTables key is not known when the tunable object is created,
    # because it depends on the attribute name and on the name the owning
    # object is registered under, so binding is done late: ``setup_tunables``
    # creates one NetworkTables entry per tunable and stores them on the
    # instance in a ``_tunables`` dict keyed by the tunable object itself.
    # ``__get__`` and ``__set__`` simply look the entry up in that dict.

    __slots__ = (
        "_ntdefault",
        "_ntsubtable",
        "_ntwritedefault",
        # ``__doc__`` is deliberately not a slot: the ``doc`` argument is no
        # longer stored.
        # ``__orig_class__`` needs a slot so it can be recorded on instances
        # created as ``tunable[X](...)``; ``__set_name__`` reads it back.
        "__orig_class__",
        "_topic_type",
        "_nt",
    )

    def __init__(
        self,
        default: V,
        *,
        writeDefault: bool = True,
        subtable: Optional[str] = None,
        doc=None,
    ) -> None:
        """
        :param default: Default value; also used to infer the topic type
                        when no type hint is available
        :param writeDefault: If True the default is written to the entry
                             when the tunable is set up, otherwise it is
                             only registered as the entry's default
        :param subtable: Optional extra path component inserted before the
                         variable name
        :param doc: Ignored; passing a value only emits a warning

        :raises TypeError: if the type of ``default`` cannot be published
        """
        if doc is not None:
            warnings.warn("tunable no longer uses the doc argument", stacklevel=2)

        self._ntdefault = default
        self._ntsubtable = subtable
        self._ntwritedefault = writeDefault

        # Defer checks for empty sequences to check type hints.
        # Report errors here when we can so the error points to the tunable line.
        if default or not isinstance(default, collections.abc.Sequence):
            topic_type = _get_topic_type_for_value(default)
            if topic_type is None:
                checked_type: type = type(default)
                raise TypeError(
                    f"tunable is not publishable to NetworkTables, type: {checked_type.__name__}"
                )
            self._topic_type = topic_type

    def __set_name__(self, owner: type, name: str) -> None:
        """
        Resolve the topic type once the attribute name and owner are known.

        :raises TypeError: if neither the type hint nor the default value
                           maps to a publishable topic type
        """
        type_hint: Optional[type] = None
        # __orig_class__ is set after __init__, check it here.
        orig_class = getattr(self, "__orig_class__", None)
        if orig_class is not None:
            # Accept field = tunable[Sequence[int]]([])
            type_hint = typing.get_args(orig_class)[0]
        else:
            type_hint = typing.get_type_hints(owner).get(name)
            origin = typing.get_origin(type_hint)
            if origin is typing.ClassVar:
                # Accept field: ClassVar[tunable[Sequence[int]]] = tunable([])
                type_hint = typing.get_args(type_hint)[0]
                origin = typing.get_origin(type_hint)
            if origin is tunable:
                # Accept field: tunable[Sequence[int]] = tunable([])
                type_hint = typing.get_args(type_hint)[0]
            elif type_hint is tunable or type_hint is typing.ClassVar:
                # A bare ``tunable`` / ``ClassVar`` annotation says nothing
                # about the value type; infer it from the default instead of
                # rejecting the wrapper itself as an unpublishable type.
                type_hint = None

        if type_hint is not None:
            topic_type = _get_topic_type(type_hint)
        else:
            topic_type = _get_topic_type_for_value(self._ntdefault)

        if topic_type is None:
            checked_type = type_hint or type(self._ntdefault)
            # Not every type hint has a __name__ (e.g. ``int | None``); fall
            # back to its repr so the intended TypeError is what gets raised.
            type_name = getattr(checked_type, "__name__", None) or f"{checked_type!r}"
            raise TypeError(
                f"tunable is not publishable to NetworkTables, type: {type_name}"
            )

        self._topic_type = topic_type

    @overload
    def __get__(self, instance: None, owner=None) -> "tunable[V]": ...

    @overload
    def __get__(self, instance, owner=None) -> V: ...

    def __get__(self, instance, owner=None):
        # Class-level access returns the descriptor itself so that
        # setup_tunables can discover it; instance access reads the entry.
        if instance is not None:
            return instance._tunables[self].get()
        return self

    def __set__(self, instance, value: V) -> None:
        instance._tunables[self].set(value)
def _get_topic_type_for_value(value) -> Optional[Callable[[ntcore.Topic], typing.Any]]:
    """
    Infer the topic factory for a concrete value.

    The value's own type is tried first. Only when that yields nothing and
    the value is a sequence is the type of its first element used to look
    up an array topic.

    :param value: Value to inspect
    :returns: A callable turning a generic topic into a typed topic, or
              None if no topic type matches
    :raises ValueError: if the value is an empty sequence, since there is
                        no element to infer the type from
    """
    # bytes and str are Sequences, so the exact-type lookup has to come
    # before the generic Sequence handling below.
    direct = _get_topic_type(type(value))
    if direct is not None or not isinstance(value, collections.abc.Sequence):
        return direct

    if not value:
        raise ValueError(
            f"tunable default cannot be an empty sequence, got {value}"
        )

    element_type = type(value[0])
    return _get_topic_type(Sequence[element_type])  # type: ignore [misc]
def setup_tunables(component, cname: str, prefix: Optional[str] = "components") -> None:
    """
    Binds each tunable declared on an object's class to a NetworkTables
    entry and stores the resulting entries on the object as ``_tunables``.

    :param component: Component object
    :param cname: Name of component
    :param prefix: Prefix to use, or no prefix if None

    .. note:: This is not needed in normal use, only useful
              for testing
    """
    owner_cls = component.__class__

    # Root path for every key created below.
    base_path = f"/{cname}" if prefix is None else f"/{prefix}/{cname}"

    nt_instance = NetworkTableInstance.getDefault()

    # Maps each tunable descriptor to the entry created for it.
    bound: dict[tunable, typing.Any] = {}

    for attr_name in dir(owner_cls):
        # Underscore-prefixed attributes are never treated as tunables.
        if attr_name.startswith("_"):
            continue

        descriptor = getattr(owner_cls, attr_name)
        if not isinstance(descriptor, tunable):
            continue

        parent_path = (
            f"{base_path}/{descriptor._ntsubtable}"
            if descriptor._ntsubtable
            else base_path
        )
        typed_topic = descriptor._topic_type(
            nt_instance.getTopic(f"{parent_path}/{attr_name}")
        )
        entry = typed_topic.getEntry(descriptor._ntdefault)

        # Either overwrite the current value or only register the default.
        apply_default = entry.set if descriptor._ntwritedefault else entry.setDefault
        apply_default(descriptor._ntdefault)

        bound[descriptor] = entry

    component._tunables = bound
# Typing overload: bare usage, ``@feedback`` applied directly to a getter.
@overload
def feedback(f: Callable[[T], V]) -> Callable[[T], V]: ...

# Typing overload: configured usage, ``@feedback(key=...)``, which returns
# the decorator that is then applied to the getter.
@overload
def feedback(*, key: str) -> Callable[[Callable[[T], V]], Callable[[T], V]]: ...
def feedback(f=None, *, key: Optional[str] = None) -> Callable:
    """
    Marks a getter method so that its return value is automatically
    published to NetworkTables.

    ``key`` is optional. When it is omitted, the key is derived from the
    method name: a leading ``get_`` is stripped if present, otherwise the
    full method name is used.

    The key of the NetworkTables value will vary based on what kind of
    object the decorated method belongs to:

    * A component: ``/components/COMPONENTNAME/VARNAME``
    * Your main robot class: ``/robot/VARNAME``

    The NetworkTables value will be auto-updated in all modes.

    .. warning:: The function should only act as a getter, and must not
                 take any arguments (other than self).

    Example::

        from magicbot import feedback

        class MyComponent:
            navx: ...

            @feedback
            def get_angle(self) -> float:
                return self.navx.getYaw()

        class MyRobot(magicbot.MagicRobot):
            my_component: MyComponent

            ...

    In this example, the NetworkTable key is stored at
    ``/components/my_component/angle``.

    .. seealso:: :class:`~wpilib.LiveWindow` may suit your needs,
                 especially if you wish to monitor WPILib objects.

    .. versionadded:: 2018.1.0

    .. versionchanged:: 2024.1.0
       WPILib Struct serializable types are supported when the return type is type hinted.
       An ``int`` return type hint now creates an integer topic.
    """
    # ``@feedback(key=...)`` form: nothing to decorate yet, so return a
    # decorator that remembers the requested key.
    if f is None:
        return functools.partial(feedback, key=key)

    if not callable(f):
        raise TypeError(f"Illegal use of feedback decorator on non-callable {f!r}")

    parameters = inspect.signature(f).parameters
    name = f.__name__

    # Exactly one parameter (``self``) is allowed.
    if len(parameters) != 1:
        raise ValueError(
            f"{name} may not take arguments other than 'self' (must be a simple getter method)"
        )

    # Markers looked up later when feedback methods are collected.
    f._magic_feedback = True
    f._magic_feedback_key = key

    return f
# Scalar Python types mapped to the ntcore topic class used to publish a
# single value of that type. Looked up by exact type in _get_topic_type.
_topic_types = {
    bool: ntcore.BooleanTopic,
    int: ntcore.IntegerTopic,
    float: ntcore.DoubleTopic,
    str: ntcore.StringTopic,
    bytes: ntcore.RawTopic,
}

# Element types mapped to the ntcore topic class used to publish a
# homogeneous sequence of that type. There is no entry for bytes.
_array_topic_types = {
    bool: ntcore.BooleanArrayTopic,
    int: ntcore.IntegerArrayTopic,
    float: ntcore.DoubleArrayTopic,
    str: ntcore.StringArrayTopic,
}
def _get_topic_type(
    return_annotation,
) -> Optional[Callable[[ntcore.Topic], typing.Any]]:
    """
    Map a type (or type hint) to a factory that wraps a generic topic in
    the matching typed ntcore topic.

    Handled, in order: the scalar types in ``_topic_types``, types exposing
    a ``WPIStruct`` attribute, and parameterized ``list`` / ``tuple`` /
    ``Sequence`` hints whose element type is in ``_array_topic_types`` or
    exposes ``WPIStruct``.

    :param return_annotation: Type or type hint to look up
    :returns: A callable taking a topic and returning a typed topic, or
              None if the type is not supported
    """
    if return_annotation in _topic_types:
        return _topic_types[return_annotation]

    if hasattr(return_annotation, "WPIStruct"):
        return lambda topic: ntcore.StructTopic(topic, return_annotation)

    # Check for PEP 484 generic types
    origin = getattr(return_annotation, "__origin__", None)
    args = typing.get_args(return_annotation)
    if origin not in (list, tuple, collections.abc.Sequence) or not args:
        return None

    if origin is tuple:
        # Only tuple[T, ...] or tuples whose members are all the same type
        # can be published as an array.
        is_variadic = len(args) == 2 and args[1] is Ellipsis
        if not is_variadic and len(set(args)) != 1:
            return None

    inner_type = args[0]
    if inner_type in _array_topic_types:
        return _array_topic_types[inner_type]
    if hasattr(inner_type, "WPIStruct"):
        return lambda topic: ntcore.StructArrayTopic(topic, inner_type)

    return None
def collect_feedbacks(component, cname: str, prefix: Optional[str] = "components"):
    """
    Gathers every method on an object that was decorated with
    :func:`feedback` and pairs it with a callable that publishes a value
    to the corresponding NetworkTables key.

    :param component: Object whose bound methods are inspected
    :param cname: Name of the object, used in the table path
    :param prefix: Prefix to use, or no prefix if None
    :returns: A list of 2-tuples (method, NetworkTables entry setter)

    .. note:: This isn't useful for normal use.
    """
    table_path = f"/{cname}" if prefix is None else f"/{prefix}/{cname}"
    table = NetworkTableInstance.getDefault().getTable(table_path)

    collected = []
    for name, method in inspect.getmembers(component, inspect.ismethod):
        if not getattr(method, "_magic_feedback", False):
            continue

        # An explicit key wins; otherwise derive it from the method name.
        key = method._magic_feedback_key
        if key is None:
            key = name[4:] if name.startswith("get_") else name

        return_annotation = typing.get_type_hints(method).get("return", None)
        topic_type = (
            None if return_annotation is None else _get_topic_type(return_annotation)
        )

        if topic_type is not None:
            # A recognised return type hint gets a typed publisher.
            setter = topic_type(table.getTopic(key)).publish().set
        else:
            # No usable hint: fall back to a generic entry.
            setter = table.getEntry(key).setValue

        collected.append((method, setter))

    return collected