Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion canopen/timestamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def transmit(self, timestamp: Optional[float] = None):
:param float timestamp:
Optional Unix timestamp to use, otherwise the current time is used.
"""
delta = timestamp or time.time() - OFFSET
delta = (timestamp or time.time()) - OFFSET
days, seconds = divmod(delta, ONE_DAY)
data = TIME_OF_DAY_STRUCT.pack(int(seconds * 1000), int(days))
self.network.send_message(self.cob_id, data)
31 changes: 29 additions & 2 deletions test/test_time.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,49 @@
import struct
import time
import unittest
from datetime import datetime
from unittest.mock import patch

import canopen
import canopen.timestamp


class TestTime(unittest.TestCase):

def test_epoch(self):
"""Verify that the epoch matches the standard definition."""
epoch = datetime.strptime(
"1984-01-01 00:00:00 +0000", "%Y-%m-%d %H:%M:%S %z"
).timestamp()
self.assertEqual(int(epoch), canopen.timestamp.OFFSET)

def test_time_producer(self):
network = canopen.Network()
network.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
network.connect(interface="virtual", receive_own_messages=True)
producer = canopen.timestamp.TimeProducer(network)
producer.transmit(1486236238)

# Provide a specific time to verify the proper encoding
producer.transmit(1_927_999_438) # 2031-02-04T19:23:58+00:00
msg = network.bus.recv(1)
network.disconnect()
self.assertEqual(msg.arbitration_id, 0x100)
self.assertEqual(msg.dlc, 6)
self.assertEqual(msg.data, b"\xb0\xa4\x29\x04\x31\x43")

# Test again with the current time as implicit timestamp
current = time.time()
with patch("canopen.timestamp.time.time", return_value=current):
current_from_epoch = current - canopen.timestamp.OFFSET
producer.transmit()
msg = network.bus.recv(1)
self.assertEqual(msg.arbitration_id, 0x100)
self.assertEqual(msg.dlc, 6)
ms, days = struct.unpack("<LH", msg.data)
self.assertEqual(days, int(current_from_epoch) // 86400)
self.assertEqual(ms, int(current_from_epoch % 86400 * 1000))

network.disconnect()


if __name__ == "__main__":
unittest.main()
Loading