Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions j1939/electronic_control_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ def __init__(self, data_link_layer='j1939-21', max_cmdt_packets=1, minimum_tp_rt
self._bus = None
# Locking object for send
self._send_lock = threading.Lock()
# Locking object for _timer_events list
self._events_list_lock = threading.Lock()

if max_cmdt_packets > 0xFF:
raise ValueError("max number of segments that can be sent is 0xFF")
Expand Down Expand Up @@ -85,7 +87,13 @@ def add_timer(self, delta_time, callback, cookie=None):
'cookie': cookie,
}

self._timer_events.append( d )
res = self._events_list_lock.acquire(timeout = 1.0)
if res:
self._timer_events.append( d )
self._events_list_lock.release()
else:
logger.error("_events_list_lock acquire failed")

self._job_thread_wakeup()

def remove_timer(self, callback):
Expand All @@ -94,9 +102,19 @@ def remove_timer(self, callback):
:param callback:
The callback to be removed from the timer event list
"""
for event in self._timer_events:
if event['callback'] == callback:
self._timer_events.remove( event )
res = self._events_list_lock.acquire(timeout = 1.0)
if res:
for event in self._timer_events[:]:
if event['callback'] == callback:
try:
self._timer_events.remove( event )
except Exception as e:
logger.error(str(e))

self._events_list_lock.release()
else:
logger.error("_events_list_lock acquire failed")

self._job_thread_wakeup()

def connect(self, *args, **kwargs):
Expand Down Expand Up @@ -282,7 +300,7 @@ def _async_job_thread(self):
next_wakeup = self.j1939_dll.async_job_thread(now)

# check timer events
for event in self._timer_events:
for event in self._timer_events[:]:
if event['deadline'] > now:
if next_wakeup > event['deadline']:
next_wakeup = event['deadline']
Expand All @@ -299,7 +317,16 @@ def _async_job_thread(self):
next_wakeup = event['deadline']
else:
# remove from list
self._timer_events.remove( event )
res = self._events_list_lock.acquire(timeout = 1.0)
if res:
try:
self._timer_events.remove( event )
except Exception as e:
logger.error(str(e))

self._events_list_lock.release()
else:
logger.error("_events_list_lock acquire failed")

time_to_sleep = next_wakeup - time.time()
if time_to_sleep > 0:
Expand Down