Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 0.9.33 (2026-05-11)

- Removed a deadlock that occurred when unsubscribing from an active subscription while a background thread attempted to acquire a subscription lock already held by the main subscribing thread.

## 0.9.32 (2026-04-02)

- Re-generate graph api.
Expand Down
61 changes: 35 additions & 26 deletions python/mujinwebstackclient/controllerwebclientraw.py
Comment thread
gcccai marked this conversation as resolved.
Comment thread
woswos marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ async def _StopAllSubscriptions(self, error: Optional[ControllerGraphClientExcep
subscription.GetSubscriptionCallbackFunction()(error=error, response=None)
self._subscriptions.clear()

def SubscribeGraphAPI(self, query: str, callbackFunction: Callable[[Optional[ControllerGraphClientException], Optional[dict]], None], variables: Optional[dict] = None) -> Subscription:
def SubscribeGraphAPI(self, query: str, callbackFunction: Callable[[Optional[ControllerGraphClientException], Optional[dict]], None], variables: Optional[dict] = None, timeout: float = 5.0) -> Subscription:
"""Subscribes to changes on Mujin controller.

Args:
Expand Down Expand Up @@ -634,12 +634,18 @@ async def _Subscribe():
self._EnsureWebSocketConnection()

# wait until the subscription is created
self._backgroundThread.RunCoroutine(_Subscribe()).result()
future = self._backgroundThread.RunCoroutine(_Subscribe())
try:
# wait for the subscribe outside _subscriptionLock to avoid deadlocking
# with websocket callbacks that may acquire the same lock while resolving
future.result(timeout=timeout)
except Exception as e:
raise ControllerGraphClientException(f'Failed to subscribe within timeout: {e}')
with self._subscriptionLock:
self._subscriptions[subscriptionId] = subscription
return subscription

return subscription

def UnsubscribeGraphAPI(self, subscription: Subscription):
def UnsubscribeGraphAPI(self, subscription: Subscription, timeout: float = 5.0):
"""Unsubscribes to Mujin controller.

Args:
Expand All @@ -649,22 +655,14 @@ def UnsubscribeGraphAPI(self, subscription: Subscription):

async def _Unsubscribe():
try:
# check if self._subscriptionIds has subscriptionId
if subscriptionId in self._subscriptions:
await self._webSocket.send(
json.dumps(
{
'id': subscriptionId,
'type': 'stop',
},
),
)
# remove subscription
self._subscriptions.pop(subscriptionId, None)

# close the websocket connection if no more subscribers are left
if len(self._subscriptions) == 0:
await self._CloseWebSocket()
await self._webSocket.send(
json.dumps(
{
'id': subscriptionId,
'type': 'stop',
},
),
)
except Exception as e:
log.exception('caught WebSocket exception: %s', e)
await self._StopAllSubscriptions(ControllerGraphClientException(_('Failed to unsubscribe: %s') % (e)))
Expand All @@ -673,10 +671,21 @@ async def _Unsubscribe():
# nothing to do if websocket is not established
Comment thread
monaresh marked this conversation as resolved.
if not self._IsWebSocketConnectionOpen():
return

# check if the subscription exists at all
if subscription.GetSubscriptionID() not in self._subscriptions:
# check if self._subscriptionIds has subscriptionId
if subscriptionId not in self._subscriptions:
return
# request unsubscribe under lock
future = self._backgroundThread.RunCoroutine(_Unsubscribe())
try:
# wait for the async result outside the lock
future.result(timeout=timeout)
except Exception as e:
log.exception('timeout or error while unsubscribing: %s', e)

# re-acquire lock to safely modify the dictionary and check for shutdown
with self._subscriptionLock:
self._subscriptions.pop(subscriptionId, None)

# actually unsubscribe and wait until there is a result
self._backgroundThread.RunCoroutine(_Unsubscribe()).result()
# close the websocket connection if no more subscribers are left
if len(self._subscriptions) == 0 and self._IsWebSocketConnectionOpen():
self._backgroundThread.RunCoroutine(self._CloseWebSocket())
2 changes: 1 addition & 1 deletion python/mujinwebstackclient/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = '0.9.32'
__version__ = '0.9.33'

# Do not forget to update CHANGELOG.md
Loading