Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions canopen/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ def unsubscribe(self, can_id, callback=None) -> None:
If given, remove only this callback. Otherwise all callbacks for
the CAN ID.
"""
if callback is None:
del self.subscribers[can_id]
else:
if callback is not None:
self.subscribers[can_id].remove(callback)
if not self.subscribers[can_id] or callback is None:
del self.subscribers[can_id]

def connect(self, *args, **kwargs) -> Network:
"""Connect to CAN bus using python-can.
Expand Down
4 changes: 4 additions & 0 deletions canopen/node/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def __init__(
self.emcy = EmcyProducer(0x80 + self.id)

def associate_network(self, network: canopen.network.Network):
if self.has_network():
raise RuntimeError("Node is already associated with a network")
self.network = network
self.sdo.network = network
self.tpdo.network = network
Expand All @@ -49,6 +51,8 @@ def associate_network(self, network: canopen.network.Network):
network.subscribe(0, self.nmt.on_command)

def remove_network(self) -> None:
if not self.has_network():
return
self.network.unsubscribe(self.sdo.rx_cobid, self.sdo.on_request)
self.network.unsubscribe(0, self.nmt.on_command)
self.network = canopen.network._UNINITIALIZED_NETWORK
Expand Down
4 changes: 4 additions & 0 deletions canopen/node/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def __init__(
self.load_configuration()

def associate_network(self, network: canopen.network.Network):
if self.has_network():
raise RuntimeError("Node is already associated with a network")
self.network = network
self.sdo.network = network
self.pdo.network = network
Expand All @@ -64,6 +66,8 @@ def associate_network(self, network: canopen.network.Network):
network.subscribe(0, self.nmt.on_command)

def remove_network(self) -> None:
if not self.has_network():
return
for sdo in self.sdo_channels:
self.network.unsubscribe(sdo.tx_cobid, sdo.on_response)
self.network.unsubscribe(0x700 + self.id, self.nmt.on_heartbeat)
Expand Down
104 changes: 104 additions & 0 deletions test/test_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import unittest

import canopen


def count_subscribers(network: canopen.Network) -> int:
"""Count the number of subscribers in the network."""
return sum(len(n) for n in network.subscribers.values())


class TestLocalNode(unittest.TestCase):

@classmethod
def setUpClass(cls):
cls.network = canopen.Network()
cls.network.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
cls.network.connect(interface="virtual")

cls.node = canopen.LocalNode(2, canopen.objectdictionary.ObjectDictionary())

@classmethod
def tearDownClass(cls):
cls.network.disconnect()

def test_associate_network(self):
# Need to store the number of subscribers before associating because the
# network implementation automatically adds subscribers to the list
n_subscribers = count_subscribers(self.network)

# Associating the network with the local node
self.node.associate_network(self.network)
self.assertIs(self.node.network, self.network)
self.assertIs(self.node.sdo.network, self.network)
self.assertIs(self.node.tpdo.network, self.network)
self.assertIs(self.node.rpdo.network, self.network)
self.assertIs(self.node.nmt.network, self.network)
self.assertIs(self.node.emcy.network, self.network)

# Test that its not possible to associate the network multiple times
with self.assertRaises(RuntimeError) as cm:
self.node.associate_network(self.network)
self.assertIn("already associated with a network", str(cm.exception))

# Test removal of the network. The count of subscribers should
# be the same as before the association
self.node.remove_network()
uninitalized = canopen.network._UNINITIALIZED_NETWORK
self.assertIs(self.node.network, uninitalized)
self.assertIs(self.node.sdo.network, uninitalized)
self.assertIs(self.node.tpdo.network, uninitalized)
self.assertIs(self.node.rpdo.network, uninitalized)
self.assertIs(self.node.nmt.network, uninitalized)
self.assertIs(self.node.emcy.network, uninitalized)
self.assertEqual(count_subscribers(self.network), n_subscribers)

# Test that its possible to deassociate the network multiple times
self.node.remove_network()


class TestRemoteNode(unittest.TestCase):

@classmethod
def setUpClass(cls):
cls.network = canopen.Network()
cls.network.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
cls.network.connect(interface="virtual")

cls.node = canopen.RemoteNode(2, canopen.objectdictionary.ObjectDictionary())

@classmethod
def tearDownClass(cls):
cls.network.disconnect()

def test_associate_network(self):
# Need to store the number of subscribers before associating because the
# network implementation automatically adds subscribers to the list
n_subscribers = count_subscribers(self.network)

# Associating the network with the local node
self.node.associate_network(self.network)
self.assertIs(self.node.network, self.network)
self.assertIs(self.node.sdo.network, self.network)
self.assertIs(self.node.tpdo.network, self.network)
self.assertIs(self.node.rpdo.network, self.network)
self.assertIs(self.node.nmt.network, self.network)

# Test that its not possible to associate the network multiple times
with self.assertRaises(RuntimeError) as cm:
self.node.associate_network(self.network)
self.assertIn("already associated with a network", str(cm.exception))

# Test removal of the network. The count of subscribers should
# be the same as before the association
self.node.remove_network()
uninitalized = canopen.network._UNINITIALIZED_NETWORK
self.assertIs(self.node.network, uninitalized)
self.assertIs(self.node.sdo.network, uninitalized)
self.assertIs(self.node.tpdo.network, uninitalized)
self.assertIs(self.node.rpdo.network, uninitalized)
self.assertIs(self.node.nmt.network, uninitalized)
self.assertEqual(count_subscribers(self.network), n_subscribers)

# Test that its possible to deassociate the network multiple times
self.node.remove_network()