Skip to content
6 changes: 4 additions & 2 deletions j1939/electronic_control_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,10 @@ def send_message(self, can_id, extended_id, data, fd_format=False):
bitrate_switch=fd_format
)
with self._send_lock:
self._bus.send(msg)
# TODO: check error receivement
try:
self._bus.send(msg)
except can.CanError as e:
logger.error(f'not able to send message because {e}')

def notify(self, can_id, data, timestamp):
"""Feed incoming CAN message into this ecu.
Expand Down
127 changes: 85 additions & 42 deletions j1939/j1939_21.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def __init__(self, send_message, job_thread_wakeup, notify_subscribers, max_cmdt
self._rcv_buffer = {}
# Send buffers
self._snd_buffer = {}
# send que
self._snd_que = {}

# List of ControllerApplication
self._cas = []
Expand All @@ -51,7 +53,7 @@ def __init__(self, send_message, job_thread_wakeup, notify_subscribers, max_cmdt
self._minimum_tp_rts_cts_dt_interval = minimum_tp_rts_cts_dt_interval

# set minimum time between two tp-bam messages
if minimum_tp_bam_dt_interval == None:
if minimum_tp_bam_dt_interval is None:
self._minimum_tp_bam_dt_interval = self.Timeout.Tb
else:
self._minimum_tp_bam_dt_interval = minimum_tp_bam_dt_interval
Expand Down Expand Up @@ -89,7 +91,72 @@ def _buffer_hash(self, src_address, dest_address):
"""
return ((src_address & 0xFF) << 8) | (dest_address & 0xFF)

def send_pgn(self, data_page, pdu_format, pdu_specific, priority, src_address, data, time_limit, frame_format):

def _put_multi_msg(self, data, dest_address, src_address,priority, pgn, buffer_hash,pdu_specific):
message_size = len(data)
num_packets = (
int(message_size / 7)
if (message_size % 7 == 0)
else int(message_size / 7) + 1
)

# if the PF is between 240 and 255, the message can only be broadcast
if dest_address == ParameterGroupNumber.Address.GLOBAL:
# send BAM
self.__send_tp_bam(
src_address, priority, pgn.value, message_size, num_packets
)
# init new buffer for this connection
self._snd_buffer[buffer_hash] = {
"pgn": pgn.value,
"priority": priority,
"message_size": message_size,
"num_packages": num_packets,
"data": data,
"state": self.SendBufferState.SENDING_BM,
"deadline": time.time() + self._minimum_tp_bam_dt_interval,
"src_address": src_address,
"dest_address": ParameterGroupNumber.Address.GLOBAL,
"next_packet_to_send": 0,
}
else:
# send RTS/CTS
pgn.pdu_specific = 0 # this is 0 for peer-to-peer transfer
# init new buffer for this connection
self._snd_buffer[buffer_hash] = {
"pgn": pgn.value,
"priority": priority,
"message_size": message_size,
"num_packages": num_packets,
"data": data,
"state": self.SendBufferState.WAITING_CTS,
"deadline": time.time() + self.Timeout.T3,
"src_address": src_address,
"dest_address": pdu_specific,
"next_packet_to_send": 0,
"next_wait_on_cts": 0,
}
self.__send_tp_rts(
src_address,
pdu_specific,
priority,
pgn.value,
message_size,
num_packets,
min(self._max_cmdt_packets, num_packets),
)

def send_pgn(
self,
data_page,
pdu_format,
pdu_specific,
priority,
src_address,
data,
time_limit,
frame_format,
):
pgn = ParameterGroupNumber(data_page, pdu_format, pdu_specific)
if len(data) <= 8:
# send normal message
Expand All @@ -108,47 +175,16 @@ def send_pgn(self, data_page, pdu_format, pdu_specific, priority, src_address, d
buffer_hash = self._buffer_hash(src_address, dest_address)
if buffer_hash in self._snd_buffer:
# There is already a sequence active for this pair
# put in que
self._snd_que[pgn.value] = {'buffer_hash': buffer_hash,
'pdu_specific': pdu_specific,
'priority': priority,
'src_address':src_address,
'pgn': pgn,
'data': data,
'dest_address': dest_address}
return False
message_size = len(data)
num_packets = int(message_size / 7) if (message_size % 7 == 0) else int(message_size / 7) + 1

# if the PF is between 240 and 255, the message can only be broadcast
if dest_address == ParameterGroupNumber.Address.GLOBAL:
# send BAM
self.__send_tp_bam(src_address, priority, pgn.value, message_size, num_packets)

# init new buffer for this connection
self._snd_buffer[buffer_hash] = {
"pgn": pgn.value,
"priority": priority,
"message_size": message_size,
"num_packages": num_packets,
"data": data,
"state": self.SendBufferState.SENDING_BM,
"deadline": time.time() + self._minimum_tp_bam_dt_interval,
'src_address' : src_address,
'dest_address' : ParameterGroupNumber.Address.GLOBAL,
'next_packet_to_send' : 0,
}
else:
# send RTS/CTS
pgn.pdu_specific = 0 # this is 0 for peer-to-peer transfer
# init new buffer for this connection
self._snd_buffer[buffer_hash] = {
"pgn": pgn.value,
"priority": priority,
"message_size": message_size,
"num_packages": num_packets,
"data": data,
"state": self.SendBufferState.WAITING_CTS,
"deadline": time.time() + self.Timeout.T3,
'src_address' : src_address,
'dest_address' : pdu_specific,
'next_packet_to_send' : 0,
'next_wait_on_cts': 0,
}
self.__send_tp_rts(src_address, pdu_specific, priority, pgn.value, message_size, num_packets, min(self._max_cmdt_packets, num_packets))

self._put_multi_msg(data, dest_address, src_address, priority, pgn, buffer_hash, pdu_specific)
self.__job_thread_wakeup()

return True
Expand All @@ -175,6 +211,13 @@ def async_job_thread(self, now):
# TODO: should we notify our CAs about the cancelled transfer?
del self._rcv_buffer[bufid]

# get from que if buffer is empty
if not bool(self._snd_buffer):
for key, value in self._snd_que.items():
self._put_multi_msg(**value)
self._snd_que.pop(key)
break

# check send buffers
# using "list(x)" to prevent "RuntimeError: dictionary changed size during iteration"
for bufid in list(self._snd_buffer):
Expand Down