Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 90 additions & 60 deletions j1939/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
fileHandler.setLevel(lLevel)
logger.addHandler(fileHandler)

can_set_logging_level('debug')
# can_set_logging_level('debug')

class j1939Listner(canListener):

Expand All @@ -82,6 +82,15 @@ class Bus(BusABC):
"""
A CAN Bus that implements the J1939 Protocol.

:param bool strict:
indicates whether to operate on strictly J1939 message (True)
or also handle 11-bit CAN messages (False). In strict operation,
specification of `j1939_filters` will override any specification of
`can_filters`; inbound and outbound 11-bit CAN messages will be
dropped. In non-strict operation, `j1939_filters` will add to
`can_filters`; inbound and outbound 11-bit CAN messages will be
accepted and sent, respectively.

:param list j1939_filters:
a list of dictionaries that specify filters that messages must
match to be received by this Bus. Messages can match any of the
Expand All @@ -94,7 +103,7 @@ class Bus(BusABC):

channel_info = "j1939 bus"

def __init__(self, pdu_type=PDU, broadcast=True, *args, **kwargs):
def __init__(self, pdu_type=PDU, broadcast=True, strict=True, *args, **kwargs):
logger.debug("Creating a new j1939 bus")

#self.rx_can_message_queue = Queue()
Expand All @@ -104,6 +113,7 @@ def __init__(self, pdu_type=PDU, broadcast=True, *args, **kwargs):

super(Bus, self).__init__(kwargs.get('channel'), kwargs.get('can_filters'))
self._pdu_type = pdu_type
self._strict = strict
self.timeout = 1
self._long_message_throttler = threading.Thread(target=self._throttler_function)
self._long_message_throttler.daemon = True
Expand All @@ -129,7 +139,7 @@ def __init__(self, pdu_type=PDU, broadcast=True, *args, **kwargs):
if 'j1939_filters' in kwargs and kwargs['j1939_filters'] is not None:
filters = kwargs.pop('j1939_filters')
logger.debug("Got filters: {}".format(filters))
can_filters = []
can_filters = [] if self._strict else kwargs.get('can_filters', [])
for filt in filters:
can_id, can_mask = 0, 0
if 'pgn' in filt:
Expand Down Expand Up @@ -159,13 +169,15 @@ def __init__(self, pdu_type=PDU, broadcast=True, *args, **kwargs):
raise ValueError("Bad timeout type")

logger.debug("Creating a new can bus")
self.can_bus = RawCanBus(*args, **kwargs)
self.can_bus = self._make_can_bus(*args, **kwargs)

canListener = j1939Listner(self.notification)
self.can_notifier = canNotifier(self.can_bus, [canListener], timeout=self.timeout)

self._long_message_throttler.start()

def _make_can_bus(self, *args, **kwargs):
return RawCanBus(*args, **kwargs)

def notification(self, inboundMessage):
#self.rx_can_message_queue.put(inboundMessage)
Expand All @@ -176,55 +188,64 @@ def notification(self, inboundMessage):
if isinstance(inboundMessage, Message):
logger.info('\n\n{}: Got a Message from CAN: {}'.format(inspect.stack()[0][3],inboundMessage))
if inboundMessage.id_type:
# Extended ID
# Only J1939 messages (i.e. 29-bit IDs) should go further than this point.
# Non-J1939 systems can co-exist with J1939 systems, but J1939 doesn't care
# about the content of their messages.
logger.info('{}: Message is j1939 msg'.format(inspect.stack()[0][3]))

#
# Need to determine if it's a broadcast message or
# limit to listening nodes only
#
arbitration_id = ArbitrationID()
arbitration_id.can_id = inboundMessage.arbitration_id
logger.info("{}: ArbitrationID = {}, inboundMessage.arbitration_id: 0x{:08x}".format(inspect.stack()[0][3],arbitration_id, inboundMessage.arbitration_id))

for (node, l_notifier) in self.node_queue_list:
logger.debug("notification: node=%s" % (node))
logger.debug(" notifier=%s" % (l_notifier))
logger.debug(" arbitration_id.pgn=%s" % (arbitration_id.pgn))
logger.debug(" destination_address=%s" % (arbitration_id.destination_address))

# redirect the AC stuff to the node processors. the rest can go
# to the main queue.
if node and (arbitration_id.pgn in [PGN_AC_ADDRESS_CLAIMED, PGN_AC_COMMANDED_ADDRESS, PGN_REQUEST_FOR_PGN]):
logger.info("{}: sending to notifier queue".format(inspect.stack()[0][3]))
# send the PDU to the node processor.
l_notifier.queue.put(inboundMessage)

# if node has the destination address, do something with the PDU
elif node and (arbitration_id.destination_address in node.address_list):
logger.info("{}: sending to process_incoming_message".format(inspect.stack()[0][3]))
rx_pdu = self._process_incoming_message(inboundMessage)
if rx_pdu:
logger.info("WP02: notification: sent to general queue: %s QQ=%s" % (rx_pdu, self.queue))
self.queue.put(rx_pdu)
elif node and (arbitration_id.destination_address is None):
logger.info("{}: sending broadcast to general queue".format(inspect.stack()[0][3]))
rx_pdu = self._process_incoming_message(inboundMessage)
logger.info("WP01: notification: sent broadcast to general queue: %s QQ=%s" % (rx_pdu, self.queue))
self.queue.put(rx_pdu)
elif node is None:
# always send the message to the logging queue
logger.info("{}: sending to general queue".format(inspect.stack()[0][3]))
rx_pdu = self._process_incoming_message(inboundMessage)
logger.info("WP03: notification: sent pdu [%s] to general queue" % rx_pdu)
self.queue.put(rx_pdu)
else:
logger.info("WP04: notification: pdu dropped: %s\n\n" % inboundMessage)
self._handle_pdu_msg(inboundMessage)
else:
logger.info("Received non J1939 message (ignoring)")
self._handle_non_pdu_msg(inboundMessage)

def _handle_pdu_msg(self, inboundMessage):
# Extended ID
# Only J1939 messages (i.e. 29-bit IDs) should go further than this point.
# Non-J1939 systems can co-exist with J1939 systems, but J1939 doesn't care
# about the content of their messages.
logger.info('{}: Message is j1939 msg'.format(inspect.stack()[0][3]))

#
# Need to determine if it's a broadcast message or
# limit to listening nodes only
#
arbitration_id = ArbitrationID()
arbitration_id.can_id = inboundMessage.arbitration_id
logger.info("{}: ArbitrationID = {}, inboundMessage.arbitration_id: 0x{:08x}".format(inspect.stack()[0][3],arbitration_id, inboundMessage.arbitration_id))

for (node, l_notifier) in self.node_queue_list:
logger.debug("notification: node=%s" % (node))
logger.debug(" notifier=%s" % (l_notifier))
logger.debug(" arbitration_id.pgn=%s" % (arbitration_id.pgn))
logger.debug(" destination_address=%s" % (arbitration_id.destination_address))

# redirect the AC stuff to the node processors. the rest can go
# to the main queue.
if node and (arbitration_id.pgn in [PGN_AC_ADDRESS_CLAIMED, PGN_AC_COMMANDED_ADDRESS, PGN_REQUEST_FOR_PGN]):
logger.info("{}: sending to notifier queue".format(inspect.stack()[0][3]))
# send the PDU to the node processor.
l_notifier.queue.put(inboundMessage)

# if node has the destination address, do something with the PDU
elif node and (arbitration_id.destination_address in node.address_list):
logger.info("{}: sending to process_incoming_message".format(inspect.stack()[0][3]))
rx_pdu = self._process_incoming_message(inboundMessage)
if rx_pdu:
logger.info("WP02: notification: sent to general queue: %s QQ=%s" % (rx_pdu, self.queue))
self.queue.put(rx_pdu)
elif node and (arbitration_id.destination_address is None):
logger.info("{}: sending broadcast to general queue".format(inspect.stack()[0][3]))
rx_pdu = self._process_incoming_message(inboundMessage)
logger.info("WP01: notification: sent broadcast to general queue: %s QQ=%s" % (rx_pdu, self.queue))
self.queue.put(rx_pdu)
elif node is None:
# always send the message to the logging queue
logger.info("{}: sending to general queue".format(inspect.stack()[0][3]))
rx_pdu = self._process_incoming_message(inboundMessage)
logger.info("WP03: notification: sent pdu [%s] to general queue" % rx_pdu)
self.queue.put(rx_pdu)
else:
logger.info("WP04: notification: pdu dropped: %s\n\n" % inboundMessage)

def _handle_non_pdu_msg(self, inboundMessage):
if self._strict:
logger.info("Received non J1939 message (ignoring)")
else:
self.queue.put(inboundMessage)

def connect(self, node):
"""
Expand Down Expand Up @@ -263,7 +284,13 @@ def recv(self, timeout=None):
# listener.on_error_received(rx_error)

def send(self, msg, timeout=None):
logger.info("j1939.send: msg={}".format(msg))
logger.info("j1939.send: msg=%s", msg)
if isinstance(msg, self._pdu_type):
self._send_pdu(msg)
elif not self._strict:
self._send(msg)

def _send_pdu(self, msg):
messages = []
if len(msg.data) > 8:
logger.info("j1939.send: message is > than 8 bytes")
Expand Down Expand Up @@ -403,13 +430,16 @@ def send(self, msg, timeout=None):
dlc=len(msg.data),
data=msg.data)

logger.debug("j1939.send: calling can_bus_send: can-msg: {}".format(can_message))
try:
self.can_bus.send(can_message)
except CanError:
if self._ignore_can_send_error:
pass
raise
self._send(can_message)

def _send(self, can_message):
logger.debug("j1939.send: calling can_bus_send: can-msg: %s", can_message)
try:
self.can_bus.send(can_message)
except CanError:
if self._ignore_can_send_error:
pass
raise

def shutdown(self):
self.can_notifier._running = False
Expand Down