Skip to content

Fix issue providing a custom timestamp #576

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion canopen/timestamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def transmit(self, timestamp: Optional[float] = None):
:param float timestamp:
Optional Unix timestamp to use, otherwise the current time is used.
"""
delta = timestamp or time.time() - OFFSET
delta = (timestamp or time.time()) - OFFSET
days, seconds = divmod(delta, ONE_DAY)
data = TIME_OF_DAY_STRUCT.pack(int(seconds * 1000), int(days))
self.network.send_message(self.cob_id, data)
39 changes: 33 additions & 6 deletions test/test_time.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import unittest
from unittest.mock import patch
import time
from datetime import datetime

import canopen
import canopen.timestamp


class TestTime(unittest.TestCase):
Expand All @@ -10,13 +14,36 @@ def test_time_producer(self):
network.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
network.connect(interface="virtual", receive_own_messages=True)
producer = canopen.timestamp.TimeProducer(network)
producer.transmit(1486236238)
msg = network.bus.recv(1)
network.disconnect()
self.assertEqual(msg.arbitration_id, 0x100)
self.assertEqual(msg.dlc, 6)
self.assertEqual(msg.data, b"\xb0\xa4\x29\x04\x31\x43")

# Test that the epoch is correct
epoch = datetime.strptime("1984-01-01 00:00:00 +0000", "%Y-%m-%d %H:%M:%S %z").timestamp()
self.assertEqual(int(epoch), canopen.timestamp.OFFSET)

current = time.time()
with patch("canopen.timestamp.time.time", return_value=current):
current_in_epoch = current - epoch

# Test it looking up the current time
producer.transmit()
msg = network.bus.recv(1)
self.assertEqual(msg.arbitration_id, 0x100)
self.assertEqual(msg.dlc, 6)
ms, days = canopen.timestamp.TIME_OF_DAY_STRUCT.unpack(msg.data)
self.assertEqual(days, int(current_in_epoch) // canopen.timestamp.ONE_DAY)
self.assertEqual(ms, int((current_in_epoch % canopen.timestamp.ONE_DAY) * 1000))

# Test providing a timestamp
faketime = 1_927_999_438 # 2031-02-04 20:23:58
producer.transmit(faketime)
msg = network.bus.recv(1)
self.assertEqual(msg.arbitration_id, 0x100)
self.assertEqual(msg.dlc, 6)
ms, days = canopen.timestamp.TIME_OF_DAY_STRUCT.unpack(msg.data)
current_in_epoch = faketime - epoch
self.assertEqual(days, int(current_in_epoch) // canopen.timestamp.ONE_DAY)
self.assertEqual(ms, int((current_in_epoch % canopen.timestamp.ONE_DAY) * 1000))

network.disconnect()

if __name__ == "__main__":
unittest.main()