@@ -36,123 +36,120 @@ def test_cycle_time(self):
36
36
is_extended_id = False , arbitration_id = 0x123 , data = [0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ]
37
37
)
38
38
39
- with can .interface .Bus (interface = "virtual" ) as bus1 :
40
- with can .interface .Bus (interface = "virtual" ) as bus2 :
41
- # disabling the garbage collector makes the time readings more reliable
42
- gc .disable ()
43
-
44
- task = bus1 .send_periodic (msg , 0.01 , 1 )
45
- self .assertIsInstance (task , can .broadcastmanager .CyclicSendTaskABC )
39
+ with (
40
+ can .interface .Bus (interface = "virtual" ) as bus1 ,
41
+ can .interface .Bus (interface = "virtual" ) as bus2 ,
42
+ ):
43
+ # disabling the garbage collector makes the time readings more reliable
44
+ gc .disable ()
45
+
46
+ task = bus1 .send_periodic (msg , 0.01 , 1 )
47
+ self .assertIsInstance (task , can .broadcastmanager .CyclicSendTaskABC )
46
48
47
- sleep (2 )
48
- size = bus2 .queue .qsize ()
49
- # About 100 messages should have been transmitted
50
- self .assertTrue (
51
- 80 <= size <= 120 ,
52
- "100 +/- 20 messages should have been transmitted. But queue contained {}" .format (
53
- size
54
- ),
55
- )
56
- last_msg = bus2 .recv ()
57
- next_last_msg = bus2 .recv ()
49
+ sleep (2 )
50
+ size = bus2 .queue .qsize ()
51
+ # About 100 messages should have been transmitted
52
+ self .assertTrue (
53
+ 80 <= size <= 120 ,
54
+ "100 +/- 20 messages should have been transmitted. But queue contained {}" .format (
55
+ size
56
+ ),
57
+ )
58
+ last_msg = bus2 .recv ()
59
+ next_last_msg = bus2 .recv ()
58
60
59
- # we need to reenable the garbage collector again
60
- gc .enable ()
61
+ # we need to reenable the garbage collector again
62
+ gc .enable ()
61
63
62
- # Check consecutive messages are spaced properly in time and have
63
- # the same id/data
64
- self .assertMessageEqual (last_msg , next_last_msg )
64
+ # Check consecutive messages are spaced properly in time and have
65
+ # the same id/data
66
+ self .assertMessageEqual (last_msg , next_last_msg )
65
67
66
- # Check the message id/data sent is the same as message received
67
- # Set timestamp and channel to match recv'd because we don't care
68
- # and they are not initialized by the can.Message constructor.
69
- msg .timestamp = last_msg .timestamp
70
- msg .channel = last_msg .channel
71
- self .assertMessageEqual (msg , last_msg )
68
+ # Check the message id/data sent is the same as message received
69
+ # Set timestamp and channel to match recv'd because we don't care
70
+ # and they are not initialized by the can.Message constructor.
71
+ msg .timestamp = last_msg .timestamp
72
+ msg .channel = last_msg .channel
73
+ self .assertMessageEqual (msg , last_msg )
72
74
73
75
def test_removing_bus_tasks (self ):
74
- bus = can .interface .Bus (interface = "virtual" )
75
- tasks = []
76
- for task_i in range (10 ):
77
- msg = can .Message (
78
- is_extended_id = False ,
79
- arbitration_id = 0x123 ,
80
- data = [0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ],
81
- )
82
- msg .arbitration_id = task_i
83
- task = bus .send_periodic (msg , 0.1 , 1 )
84
- tasks .append (task )
85
- self .assertIsInstance (task , can .broadcastmanager .CyclicSendTaskABC )
76
+ with can .interface .Bus (interface = "virtual" ) as bus :
77
+ tasks = []
78
+ for task_i in range (10 ):
79
+ msg = can .Message (
80
+ is_extended_id = False ,
81
+ arbitration_id = 0x123 ,
82
+ data = [0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ],
83
+ )
84
+ msg .arbitration_id = task_i
85
+ task = bus .send_periodic (msg , 0.1 , 1 )
86
+ tasks .append (task )
87
+ self .assertIsInstance (task , can .broadcastmanager .CyclicSendTaskABC )
86
88
87
- assert len (bus ._periodic_tasks ) == 10
89
+ assert len (bus ._periodic_tasks ) == 10
88
90
89
- for task in tasks :
90
- # Note calling task.stop will remove the task from the Bus's internal task management list
91
- task .stop ()
91
+ for task in tasks :
92
+ # Note calling task.stop will remove the task from the Bus's internal task management list
93
+ task .stop ()
92
94
93
- self .join_threads ([task .thread for task in tasks ], 5.0 )
95
+ self .join_threads ([task .thread for task in tasks ], 5.0 )
94
96
95
- assert len (bus ._periodic_tasks ) == 0
96
- bus .shutdown ()
97
+ assert len (bus ._periodic_tasks ) == 0
97
98
98
99
def test_managed_tasks (self ):
99
- bus = can .interface .Bus (interface = "virtual" , receive_own_messages = True )
100
- tasks = []
101
- for task_i in range (3 ):
102
- msg = can .Message (
103
- is_extended_id = False ,
104
- arbitration_id = 0x123 ,
105
- data = [0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ],
106
- )
107
- msg .arbitration_id = task_i
108
- task = bus .send_periodic (msg , 0.1 , 10 , store_task = False )
109
- tasks .append (task )
110
- self .assertIsInstance (task , can .broadcastmanager .CyclicSendTaskABC )
111
-
112
- assert len (bus ._periodic_tasks ) == 0
100
+ with can .interface .Bus (interface = "virtual" , receive_own_messages = True ) as bus :
101
+ tasks = []
102
+ for task_i in range (3 ):
103
+ msg = can .Message (
104
+ is_extended_id = False ,
105
+ arbitration_id = 0x123 ,
106
+ data = [0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ],
107
+ )
108
+ msg .arbitration_id = task_i
109
+ task = bus .send_periodic (msg , 0.1 , 10 , store_task = False )
110
+ tasks .append (task )
111
+ self .assertIsInstance (task , can .broadcastmanager .CyclicSendTaskABC )
113
112
114
- # Self managed tasks should still be sending messages
115
- for _ in range (50 ):
116
- received_msg = bus .recv (timeout = 5.0 )
117
- assert received_msg is not None
118
- assert received_msg .arbitration_id in {0 , 1 , 2 }
113
+ assert len (bus ._periodic_tasks ) == 0
119
114
120
- for task in tasks :
121
- task .stop ()
115
+ # Self managed tasks should still be sending messages
116
+ for _ in range (50 ):
117
+ received_msg = bus .recv (timeout = 5.0 )
118
+ assert received_msg is not None
119
+ assert received_msg .arbitration_id in {0 , 1 , 2 }
122
120
123
- self .join_threads ([task .thread for task in tasks ], 5.0 )
121
+ for task in tasks :
122
+ task .stop ()
124
123
125
- bus . shutdown ( )
124
+ self . join_threads ([ task . thread for task in tasks ], 5.0 )
126
125
127
126
def test_stopping_perodic_tasks (self ):
128
- bus = can .interface .Bus (interface = "virtual" )
129
- tasks = []
130
- for task_i in range (10 ):
131
- msg = can .Message (
132
- is_extended_id = False ,
133
- arbitration_id = 0x123 ,
134
- data = [0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ],
135
- )
136
- msg .arbitration_id = task_i
137
- task = bus .send_periodic (msg , 0.1 , 1 )
138
- tasks .append (task )
139
-
140
- assert len (bus ._periodic_tasks ) == 10
141
- # stop half the tasks using the task object
142
- for task in tasks [::2 ]:
143
- task .stop ()
127
+ with can .interface .Bus (interface = "virtual" ) as bus :
128
+ tasks = []
129
+ for task_i in range (10 ):
130
+ msg = can .Message (
131
+ is_extended_id = False ,
132
+ arbitration_id = 0x123 ,
133
+ data = [0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ],
134
+ )
135
+ msg .arbitration_id = task_i
136
+ task = bus .send_periodic (msg , 0.1 , 1 )
137
+ tasks .append (task )
144
138
145
- assert len (bus ._periodic_tasks ) == 5
139
+ assert len (bus ._periodic_tasks ) == 10
140
+ # stop half the tasks using the task object
141
+ for task in tasks [::2 ]:
142
+ task .stop ()
146
143
147
- # stop the other half using the bus api
148
- bus .stop_all_periodic_tasks (remove_tasks = False )
149
- self .join_threads ([task .thread for task in tasks ], 5.0 )
144
+ assert len (bus ._periodic_tasks ) == 5
150
145
151
- # Tasks stopped via `stop_all_periodic_tasks` with remove_tasks=False should
152
- # still be associated with the bus (e.g. for restarting )
153
- assert len ( bus . _periodic_tasks ) == 5
146
+ # stop the other half using the bus api
147
+ bus . stop_all_periodic_tasks ( remove_tasks = False )
148
+ self . join_threads ([ task . thread for task in tasks ], 5.0 )
154
149
155
- bus .shutdown ()
150
+ # Tasks stopped via `stop_all_periodic_tasks` with remove_tasks=False should
151
+ # still be associated with the bus (e.g. for restarting)
152
+ assert len (bus ._periodic_tasks ) == 5
156
153
157
154
def test_restart_perodic_tasks (self ):
158
155
period = 0.01
@@ -214,25 +211,26 @@ def _read_all_messages(_bus: "can.interfaces.virtual.VirtualBus") -> None:
214
211
215
212
@unittest .skipIf (IS_CI , "fails randomly when run on CI server" )
216
213
def test_thread_based_cyclic_send_task (self ):
217
- bus = can .ThreadSafeBus (interface = "virtual" )
218
- msg = can .Message (
219
- is_extended_id = False , arbitration_id = 0x123 , data = [0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ]
220
- )
214
+ with can .ThreadSafeBus (interface = "virtual" ) as bus :
215
+ msg = can .Message (
216
+ is_extended_id = False ,
217
+ arbitration_id = 0x123 ,
218
+ data = [0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ],
219
+ )
221
220
222
- # good case, bus is up
223
- on_error_mock = MagicMock (return_value = False )
224
- task = can .broadcastmanager .ThreadBasedCyclicSendTask (
225
- bus = bus ,
226
- lock = bus ._lock_send_periodic ,
227
- messages = msg ,
228
- period = 0.1 ,
229
- duration = 3 ,
230
- on_error = on_error_mock ,
231
- )
232
- sleep (1 )
233
- on_error_mock .assert_not_called ()
234
- task .stop ()
235
- bus .shutdown ()
221
+ # good case, bus is up
222
+ on_error_mock = MagicMock (return_value = False )
223
+ task = can .broadcastmanager .ThreadBasedCyclicSendTask (
224
+ bus = bus ,
225
+ lock = bus ._lock_send_periodic ,
226
+ messages = msg ,
227
+ period = 0.1 ,
228
+ duration = 3 ,
229
+ on_error = on_error_mock ,
230
+ )
231
+ sleep (1 )
232
+ on_error_mock .assert_not_called ()
233
+ task .stop ()
236
234
237
235
# bus has been shut down
238
236
on_error_mock = MagicMock (return_value = False )
0 commit comments