Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions test/back2back_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,37 +164,33 @@ def test_message_is_rx(self):
)
def test_message_is_rx_receive_own_messages(self):
"""The same as `test_message_direction` but testing with `receive_own_messages=True`."""
bus3 = can.Bus(
with can.Bus(
channel=self.CHANNEL_2,
interface=self.INTERFACE_2,
bitrate=self.BITRATE,
fd=TEST_CAN_FD,
single_handle=True,
receive_own_messages=True,
)
try:
) as bus3:
msg = can.Message(
is_extended_id=False, arbitration_id=0x300, data=[2, 1, 3], is_rx=False
)
bus3.send(msg)
self_recv_msg_bus3 = bus3.recv(self.TIMEOUT)
self.assertTrue(self_recv_msg_bus3.is_rx)
finally:
bus3.shutdown()

def test_unique_message_instances(self):
"""Verify that we have a different instances of message for each bus even with
`receive_own_messages=True`.
"""
bus3 = can.Bus(
with can.Bus(
channel=self.CHANNEL_2,
interface=self.INTERFACE_2,
bitrate=self.BITRATE,
fd=TEST_CAN_FD,
single_handle=True,
receive_own_messages=True,
)
try:
) as bus3:
msg = can.Message(
is_extended_id=False, arbitration_id=0x300, data=[2, 1, 3]
)
Expand All @@ -209,8 +205,6 @@ def test_unique_message_instances(self):
recv_msg_bus1.data[0] = 4
self.assertNotEqual(recv_msg_bus1.data, recv_msg_bus2.data)
self.assertEqual(recv_msg_bus2.data, self_recv_msg_bus3.data)
finally:
bus3.shutdown()

def test_fd_message(self):
msg = can.Message(
Expand Down
24 changes: 24 additions & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import pytest

from can.interfaces import virtual


@pytest.fixture(autouse=True)
def check_unclosed_virtual_channel():
"""
Pytest fixture for detecting leaked virtual CAN channels.

- The fixture yields control to the test.
- After the test completes, it acquires `virtual.channels_lock` and asserts
that `virtual.channels` is empty.
- If a test leaves behind any unclosed virtual CAN channels, the assertion
will fail, surfacing resource leaks early.

This helps maintain test isolation and prevents subtle bugs caused by
leftover state between tests.
"""

yield

with virtual.channels_lock:
assert len(virtual.channels) == 0
36 changes: 19 additions & 17 deletions test/notifier_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,25 @@ def test_single_bus(self):
self.assertTrue(notifier.stopped)

def test_multiple_bus(self):
with can.Bus(0, interface="virtual", receive_own_messages=True) as bus1:
with can.Bus(1, interface="virtual", receive_own_messages=True) as bus2:
reader = can.BufferedReader()
notifier = can.Notifier([bus1, bus2], [reader], 0.1)
self.assertFalse(notifier.stopped)
msg = can.Message()
bus1.send(msg)
time.sleep(0.1)
bus2.send(msg)
recv_msg = reader.get_message(1)
self.assertIsNotNone(recv_msg)
self.assertEqual(recv_msg.channel, 0)
recv_msg = reader.get_message(1)
self.assertIsNotNone(recv_msg)
self.assertEqual(recv_msg.channel, 1)
notifier.stop()
self.assertTrue(notifier.stopped)
with (
can.Bus(0, interface="virtual", receive_own_messages=True) as bus1,
can.Bus(1, interface="virtual", receive_own_messages=True) as bus2,
):
reader = can.BufferedReader()
notifier = can.Notifier([bus1, bus2], [reader], 0.1)
self.assertFalse(notifier.stopped)
msg = can.Message()
bus1.send(msg)
time.sleep(0.1)
bus2.send(msg)
recv_msg = reader.get_message(1)
self.assertIsNotNone(recv_msg)
self.assertEqual(recv_msg.channel, 0)
recv_msg = reader.get_message(1)
self.assertIsNotNone(recv_msg)
self.assertEqual(recv_msg.channel, 1)
notifier.stop()
self.assertTrue(notifier.stopped)

def test_context_manager(self):
with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
Expand Down
224 changes: 111 additions & 113 deletions test/simplecyclic_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,123 +36,120 @@ def test_cycle_time(self):
is_extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7]
)

with can.interface.Bus(interface="virtual") as bus1:
with can.interface.Bus(interface="virtual") as bus2:
# disabling the garbage collector makes the time readings more reliable
gc.disable()

task = bus1.send_periodic(msg, 0.01, 1)
self.assertIsInstance(task, can.broadcastmanager.CyclicSendTaskABC)
with (
can.interface.Bus(interface="virtual") as bus1,
can.interface.Bus(interface="virtual") as bus2,
):
# disabling the garbage collector makes the time readings more reliable
gc.disable()

task = bus1.send_periodic(msg, 0.01, 1)
self.assertIsInstance(task, can.broadcastmanager.CyclicSendTaskABC)

sleep(2)
size = bus2.queue.qsize()
# About 100 messages should have been transmitted
self.assertTrue(
80 <= size <= 120,
"100 +/- 20 messages should have been transmitted. But queue contained {}".format(
size
),
)
last_msg = bus2.recv()
next_last_msg = bus2.recv()
sleep(2)
size = bus2.queue.qsize()
# About 100 messages should have been transmitted
self.assertTrue(
80 <= size <= 120,
"100 +/- 20 messages should have been transmitted. But queue contained {}".format(
size
),
)
last_msg = bus2.recv()
next_last_msg = bus2.recv()

# we need to reenable the garbage collector again
gc.enable()
# we need to reenable the garbage collector again
gc.enable()

# Check consecutive messages are spaced properly in time and have
# the same id/data
self.assertMessageEqual(last_msg, next_last_msg)
# Check consecutive messages are spaced properly in time and have
# the same id/data
self.assertMessageEqual(last_msg, next_last_msg)

# Check the message id/data sent is the same as message received
# Set timestamp and channel to match recv'd because we don't care
# and they are not initialized by the can.Message constructor.
msg.timestamp = last_msg.timestamp
msg.channel = last_msg.channel
self.assertMessageEqual(msg, last_msg)
# Check the message id/data sent is the same as message received
# Set timestamp and channel to match recv'd because we don't care
# and they are not initialized by the can.Message constructor.
msg.timestamp = last_msg.timestamp
msg.channel = last_msg.channel
self.assertMessageEqual(msg, last_msg)

def test_removing_bus_tasks(self):
bus = can.interface.Bus(interface="virtual")
tasks = []
for task_i in range(10):
msg = can.Message(
is_extended_id=False,
arbitration_id=0x123,
data=[0, 1, 2, 3, 4, 5, 6, 7],
)
msg.arbitration_id = task_i
task = bus.send_periodic(msg, 0.1, 1)
tasks.append(task)
self.assertIsInstance(task, can.broadcastmanager.CyclicSendTaskABC)
with can.interface.Bus(interface="virtual") as bus:
tasks = []
for task_i in range(10):
msg = can.Message(
is_extended_id=False,
arbitration_id=0x123,
data=[0, 1, 2, 3, 4, 5, 6, 7],
)
msg.arbitration_id = task_i
task = bus.send_periodic(msg, 0.1, 1)
tasks.append(task)
self.assertIsInstance(task, can.broadcastmanager.CyclicSendTaskABC)

assert len(bus._periodic_tasks) == 10
assert len(bus._periodic_tasks) == 10

for task in tasks:
# Note calling task.stop will remove the task from the Bus's internal task management list
task.stop()
for task in tasks:
# Note calling task.stop will remove the task from the Bus's internal task management list
task.stop()

self.join_threads([task.thread for task in tasks], 5.0)
self.join_threads([task.thread for task in tasks], 5.0)

assert len(bus._periodic_tasks) == 0
bus.shutdown()
assert len(bus._periodic_tasks) == 0

def test_managed_tasks(self):
bus = can.interface.Bus(interface="virtual", receive_own_messages=True)
tasks = []
for task_i in range(3):
msg = can.Message(
is_extended_id=False,
arbitration_id=0x123,
data=[0, 1, 2, 3, 4, 5, 6, 7],
)
msg.arbitration_id = task_i
task = bus.send_periodic(msg, 0.1, 10, store_task=False)
tasks.append(task)
self.assertIsInstance(task, can.broadcastmanager.CyclicSendTaskABC)

assert len(bus._periodic_tasks) == 0
with can.interface.Bus(interface="virtual", receive_own_messages=True) as bus:
tasks = []
for task_i in range(3):
msg = can.Message(
is_extended_id=False,
arbitration_id=0x123,
data=[0, 1, 2, 3, 4, 5, 6, 7],
)
msg.arbitration_id = task_i
task = bus.send_periodic(msg, 0.1, 10, store_task=False)
tasks.append(task)
self.assertIsInstance(task, can.broadcastmanager.CyclicSendTaskABC)

# Self managed tasks should still be sending messages
for _ in range(50):
received_msg = bus.recv(timeout=5.0)
assert received_msg is not None
assert received_msg.arbitration_id in {0, 1, 2}
assert len(bus._periodic_tasks) == 0

for task in tasks:
task.stop()
# Self managed tasks should still be sending messages
for _ in range(50):
received_msg = bus.recv(timeout=5.0)
assert received_msg is not None
assert received_msg.arbitration_id in {0, 1, 2}

self.join_threads([task.thread for task in tasks], 5.0)
for task in tasks:
task.stop()

bus.shutdown()
self.join_threads([task.thread for task in tasks], 5.0)

def test_stopping_perodic_tasks(self):
bus = can.interface.Bus(interface="virtual")
tasks = []
for task_i in range(10):
msg = can.Message(
is_extended_id=False,
arbitration_id=0x123,
data=[0, 1, 2, 3, 4, 5, 6, 7],
)
msg.arbitration_id = task_i
task = bus.send_periodic(msg, 0.1, 1)
tasks.append(task)

assert len(bus._periodic_tasks) == 10
# stop half the tasks using the task object
for task in tasks[::2]:
task.stop()
with can.interface.Bus(interface="virtual") as bus:
tasks = []
for task_i in range(10):
msg = can.Message(
is_extended_id=False,
arbitration_id=0x123,
data=[0, 1, 2, 3, 4, 5, 6, 7],
)
msg.arbitration_id = task_i
task = bus.send_periodic(msg, 0.1, 1)
tasks.append(task)

assert len(bus._periodic_tasks) == 5
assert len(bus._periodic_tasks) == 10
# stop half the tasks using the task object
for task in tasks[::2]:
task.stop()

# stop the other half using the bus api
bus.stop_all_periodic_tasks(remove_tasks=False)
self.join_threads([task.thread for task in tasks], 5.0)
assert len(bus._periodic_tasks) == 5

# Tasks stopped via `stop_all_periodic_tasks` with remove_tasks=False should
# still be associated with the bus (e.g. for restarting)
assert len(bus._periodic_tasks) == 5
# stop the other half using the bus api
bus.stop_all_periodic_tasks(remove_tasks=False)
self.join_threads([task.thread for task in tasks], 5.0)

bus.shutdown()
# Tasks stopped via `stop_all_periodic_tasks` with remove_tasks=False should
# still be associated with the bus (e.g. for restarting)
assert len(bus._periodic_tasks) == 5

def test_restart_perodic_tasks(self):
period = 0.01
Expand Down Expand Up @@ -214,25 +211,26 @@ def _read_all_messages(_bus: "can.interfaces.virtual.VirtualBus") -> None:

@unittest.skipIf(IS_CI, "fails randomly when run on CI server")
def test_thread_based_cyclic_send_task(self):
bus = can.ThreadSafeBus(interface="virtual")
msg = can.Message(
is_extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7]
)
with can.ThreadSafeBus(interface="virtual") as bus:
msg = can.Message(
is_extended_id=False,
arbitration_id=0x123,
data=[0, 1, 2, 3, 4, 5, 6, 7],
)

# good case, bus is up
on_error_mock = MagicMock(return_value=False)
task = can.broadcastmanager.ThreadBasedCyclicSendTask(
bus=bus,
lock=bus._lock_send_periodic,
messages=msg,
period=0.1,
duration=3,
on_error=on_error_mock,
)
sleep(1)
on_error_mock.assert_not_called()
task.stop()
bus.shutdown()
# good case, bus is up
on_error_mock = MagicMock(return_value=False)
task = can.broadcastmanager.ThreadBasedCyclicSendTask(
bus=bus,
lock=bus._lock_send_periodic,
messages=msg,
period=0.1,
duration=3,
on_error=on_error_mock,
)
sleep(1)
on_error_mock.assert_not_called()
task.stop()

# bus has been shut down
on_error_mock = MagicMock(return_value=False)
Expand Down
8 changes: 4 additions & 4 deletions test/test_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ def test_bus_ignore_config():
with patch.object(
target=can.util, attribute="load_config", side_effect=can.util.load_config
):
_ = can.Bus(interface="virtual", ignore_config=True)
assert not can.util.load_config.called
with can.Bus(interface="virtual", ignore_config=True):
assert not can.util.load_config.called

_ = can.Bus(interface="virtual")
assert can.util.load_config.called
with can.Bus(interface="virtual"):
assert can.util.load_config.called


@patch.object(can.bus.BusABC, "shutdown")
Expand Down
Loading
Loading