Skip to content

Commit 1f9545f

Browse files
Bernd VerstCopilot
andcommitted
Address automated review feedback
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent b6f3b43 commit 1f9545f

6 files changed

Lines changed: 73 additions & 18 deletions

File tree

docs/superpowers/plans/2026-04-23-grpc-resiliency.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
# gRPC Resiliency Implementation Plan
22

3-
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
3+
> [!NOTE]
4+
> For agentic workers: REQUIRED SUB-SKILL:
5+
> Use superpowers:subagent-driven-development (recommended) or
6+
> superpowers:executing-plans to implement this plan task-by-task.
7+
> Steps use checkbox (`- [ ]`) syntax for tracking.
48
59
**Goal:** Implement automatic healing of stale gRPC worker streams and client channels in `durabletask-python`, aligned with the behavior added in `durabletask-dotnet` PR 708.
610

durabletask/client.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ def _invoke_unary(
224224
method_name: str,
225225
request: Any,
226226
*,
227-
timeout: Optional[int] = None):
227+
timeout: Optional[float] = None):
228228
method = getattr(self._stub, method_name)
229229
try:
230230
if timeout is None:
@@ -406,7 +406,7 @@ def get_all_orchestration_states(self,
406406

407407
def wait_for_orchestration_start(self, instance_id: str, *,
408408
fetch_payloads: bool = False,
409-
timeout: int = 60) -> Optional[OrchestrationState]:
409+
timeout: float = 60) -> Optional[OrchestrationState]:
410410
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
411411
try:
412412
self._logger.info(f"Waiting up to {timeout}s for instance '{instance_id}' to start.")
@@ -427,7 +427,7 @@ def wait_for_orchestration_start(self, instance_id: str, *,
427427

428428
def wait_for_orchestration_completion(self, instance_id: str, *,
429429
fetch_payloads: bool = True,
430-
timeout: int = 60) -> Optional[OrchestrationState]:
430+
timeout: float = 60) -> Optional[OrchestrationState]:
431431
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
432432
try:
433433
self._logger.info(f"Waiting {timeout}s for instance '{instance_id}' to complete.")
@@ -685,7 +685,7 @@ async def _invoke_unary(
685685
method_name: str,
686686
request: Any,
687687
*,
688-
timeout: Optional[int] = None):
688+
timeout: Optional[float] = None):
689689
method = getattr(self._stub, method_name)
690690
try:
691691
if timeout is None:
@@ -733,10 +733,9 @@ async def _close_retired_channel(self, channel: grpc.aio.Channel) -> None:
733733
await asyncio.sleep(30.0)
734734
await channel.close()
735735
finally:
736-
try:
737-
self._retired_channels.remove(channel)
738-
except ValueError:
739-
pass
736+
async with self._recreate_lock:
737+
if channel in self._retired_channels:
738+
self._retired_channels.remove(channel)
740739

741740
async def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInput, TOutput], str], *,
742741
input: Optional[TInput] = None,
@@ -843,7 +842,7 @@ async def get_all_orchestration_states(self,
843842

844843
async def wait_for_orchestration_start(self, instance_id: str, *,
845844
fetch_payloads: bool = False,
846-
timeout: int = 60) -> Optional[OrchestrationState]:
845+
timeout: float = 60) -> Optional[OrchestrationState]:
847846
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
848847
try:
849848
self._logger.info(f"Waiting up to {timeout}s for instance '{instance_id}' to start.")
@@ -863,7 +862,7 @@ async def wait_for_orchestration_start(self, instance_id: str, *,
863862

864863
async def wait_for_orchestration_completion(self, instance_id: str, *,
865864
fetch_payloads: bool = True,
866-
timeout: int = 60) -> Optional[OrchestrationState]:
865+
timeout: float = 60) -> Optional[OrchestrationState]:
867866
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
868867
try:
869868
self._logger.info(f"Waiting {timeout}s for instance '{instance_id}' to complete.")

durabletask/internal/grpc_resiliency.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ def get_full_jitter_delay_seconds(
1313
attempt: int,
1414
*,
1515
base_seconds: float,
16-
cap_seconds: float) -> float:
16+
cap_seconds: float,
17+
) -> float:
1718
capped_attempt = min(attempt, 30)
1819
upper_bound = min(cap_seconds, base_seconds * (2 ** capped_attempt))
1920
return random.random() * upper_bound

durabletask/worker.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ def _close_channel(channel: Any) -> None:
194194
try:
195195
channel.close()
196196
except Exception:
197-
pass
197+
logging.debug("Ignoring channel close failure during worker cleanup.", exc_info=True)
198198

199199

200200
class VersioningOptions:
@@ -744,9 +744,10 @@ def create_fresh_connection():
744744

745745
def wrap_execution(handler, release):
746746
def wrapped(*args, **kwargs):
747-
result = handler(*args, **kwargs)
748-
release()
749-
return result
747+
try:
748+
return handler(*args, **kwargs)
749+
finally:
750+
release()
750751

751752
return wrapped
752753

tests/durabletask/test_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -801,8 +801,8 @@ async def test_async_client_close_prevents_channel_recreation_race():
801801
client._recreate_lock.release()
802802

803803
with pytest.raises(grpc.aio.AioRpcError):
804-
await rpc_task
805-
await close_task
804+
_ = await rpc_task
805+
_ = await close_task
806806

807807
assert mock_get_channel.call_count == 1
808808
first_channel.close.assert_awaited_once()

tests/durabletask/test_worker_resiliency.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,56 @@ def create_stub(channel):
449449
created_channels[0].close.assert_called_once()
450450

451451

452+
@pytest.mark.asyncio
453+
async def test_worker_releases_inflight_channel_when_activity_handler_raises(monkeypatch):
454+
worker = TaskHubGrpcWorker()
455+
worker_manager = DummyWorkerManager()
456+
worker._async_worker_manager = worker_manager
457+
monkeypatch.setattr(worker._shutdown, "wait", lambda timeout: False)
458+
459+
def fail_activity(req, stub, completion_token):
460+
raise RuntimeError("boom")
461+
462+
worker._execute_activity = fail_activity
463+
464+
created_channels = []
465+
466+
def get_grpc_channel(*args, **kwargs):
467+
channel = MagicMock(name=f"channel-{len(created_channels) + 1}")
468+
created_channels.append(channel)
469+
return channel
470+
471+
first_stub = MagicMock()
472+
first_stub.GetWorkItems.return_value = FakeResponseStream(items=[_make_activity_work_item()])
473+
474+
second_stub = MagicMock()
475+
second_stub.GetWorkItems.side_effect = FakeRpcError(
476+
grpc.StatusCode.CANCELLED,
477+
"stop",
478+
)
479+
480+
stubs = [first_stub, second_stub]
481+
482+
def create_stub(channel):
483+
return stubs.pop(0)
484+
485+
monkeypatch.setattr("durabletask.worker.shared.get_grpc_channel", get_grpc_channel)
486+
monkeypatch.setattr("durabletask.worker.stubs.TaskHubSidecarServiceStub", create_stub)
487+
488+
await worker._async_run_loop()
489+
490+
assert len(worker_manager.submissions) == 1
491+
created_channels[0].close.assert_not_called()
492+
493+
_, submission = worker_manager.submissions[0]
494+
func, _, req, stub, completion_token = submission
495+
with pytest.raises(RuntimeError, match="boom"):
496+
func(req, stub, completion_token)
497+
498+
created_channels[0].close.assert_called_once()
499+
created_channels[1].close.assert_called_once()
500+
501+
452502
@pytest.mark.asyncio
453503
async def test_worker_shutdown_drains_real_manager_work_before_closing_retired_sdk_channel(monkeypatch):
454504
worker = TaskHubGrpcWorker(

0 commit comments

Comments
 (0)