Skip to content

Commit b6f3b43

Browse files
Bernd VerstCopilot
andcommitted
Fix sync client channel cleanup
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 32f383d commit b6f3b43

3 files changed

Lines changed: 120 additions & 9 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ FIXED
3939
- Improved sync and async gRPC clients so repeated transport failures recreate
4040
SDK-owned channels, while long-poll deadlines, successful replies, and
4141
application-level RPC errors do not trigger unnecessary channel replacement.
42+
- Fixed `TaskHubGrpcClient.close()` so explicit sync client shutdown now closes
43+
any previously retired SDK-owned gRPC channels immediately instead of waiting
44+
for the delayed cleanup timer.
4245

4346
## v1.4.0
4447

durabletask/client.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,10 @@ def __init__(self, *,
211211
self._client_failure_tracker = FailureTracker(
212212
self._resiliency_options.channel_recreate_failure_threshold
213213
)
214+
self._closing = False
214215
self._last_recreate_time = 0.0
215216
self._recreate_lock = threading.Lock()
217+
self._retired_channels: dict[grpc.Channel, threading.Timer] = {}
216218
self._logger = shared.get_logger("client", log_handler, log_formatter)
217219
self.default_version = default_version
218220
self._payload_store = payload_store
@@ -243,9 +245,11 @@ def _invoke_unary(
243245
return response
244246

245247
def _maybe_recreate_channel(self) -> None:
246-
if not self._owns_channel:
248+
if not self._owns_channel or self._closing:
247249
return
248250
with self._recreate_lock:
251+
if self._closing:
252+
return
249253
now = time.monotonic()
250254
if now - self._last_recreate_time < self._resiliency_options.min_recreate_interval_seconds:
251255
return
@@ -259,10 +263,22 @@ def _maybe_recreate_channel(self) -> None:
259263
self._stub = stubs.TaskHubSidecarServiceStub(self._channel)
260264
self._last_recreate_time = now
261265
self._client_failure_tracker.record_success()
262-
close_timer = threading.Timer(30.0, old_channel.close)
266+
close_timer = threading.Timer(
267+
30.0,
268+
self._close_retired_channel,
269+
args=(old_channel,),
270+
)
263271
close_timer.daemon = True
272+
self._retired_channels[old_channel] = close_timer
264273
close_timer.start()
265274

275+
def _close_retired_channel(self, channel: grpc.Channel) -> None:
276+
with self._recreate_lock:
277+
close_timer = self._retired_channels.pop(channel, None)
278+
if close_timer is None:
279+
return
280+
channel.close()
281+
266282
def close(self) -> None:
267283
"""Closes the underlying gRPC channel.
268284
@@ -272,7 +288,15 @@ def close(self) -> None:
272288
it.
273289
"""
274290
if self._owns_channel:
275-
self._channel.close()
291+
with self._recreate_lock:
292+
self._closing = True
293+
retired_channels = list(self._retired_channels.items())
294+
self._retired_channels.clear()
295+
current_channel = self._channel
296+
for retired_channel, close_timer in retired_channels:
297+
close_timer.cancel()
298+
retired_channel.close()
299+
current_channel.close()
276300

277301
def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInput, TOutput], str], *,
278302
input: Optional[TInput] = None,

tests/durabletask/test_client.py

Lines changed: 90 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -382,11 +382,88 @@ def test_sync_client_recreates_sdk_owned_channel_with_original_transport_inputs(
382382
expected_channel_call,
383383
]
384384
assert client._channel is second_channel
385-
mock_timer.assert_called_once_with(30.0, first_channel.close)
385+
mock_timer.assert_called_once()
386+
timer_call = mock_timer.call_args
387+
assert timer_call.args[0] == 30.0
388+
assert timer_call.args[1].__self__ is client
389+
assert timer_call.args[1].__func__ is TaskHubGrpcClient._close_retired_channel
390+
assert timer_call.kwargs == {"args": (first_channel,)}
386391
assert timer.daemon is True
387392
timer.start.assert_called_once_with()
388393

389394

395+
def test_sync_client_close_closes_retired_channels_immediately():
396+
first_channel = MagicMock(name="first-channel")
397+
second_channel = MagicMock(name="second-channel")
398+
first_stub = MagicMock()
399+
first_stub.GetInstance.side_effect = FakeRpcError(grpc.StatusCode.UNAVAILABLE)
400+
second_stub = MagicMock()
401+
second_stub.GetInstance.return_value = MagicMock(exists=False)
402+
close_timer = MagicMock(name="close-timer")
403+
404+
with patch(
405+
"durabletask.client.shared.get_grpc_channel",
406+
side_effect=[first_channel, second_channel],
407+
), patch(
408+
"durabletask.client.stubs.TaskHubSidecarServiceStub", side_effect=[first_stub, second_stub]
409+
), patch("threading.Timer", return_value=close_timer):
410+
client = TaskHubGrpcClient(
411+
resiliency_options=GrpcClientResiliencyOptions(
412+
channel_recreate_failure_threshold=1,
413+
min_recreate_interval_seconds=0.0,
414+
)
415+
)
416+
with pytest.raises(FakeRpcError):
417+
client.get_orchestration_state("abc")
418+
419+
client.close()
420+
421+
close_timer.cancel.assert_called_once_with()
422+
first_channel.close.assert_called_once_with()
423+
second_channel.close.assert_called_once_with()
424+
assert client._retired_channels == {}
425+
426+
427+
def test_sync_client_close_closes_all_retired_sdk_channels_immediately():
428+
first_channel = MagicMock(name="first-channel")
429+
second_channel = MagicMock(name="second-channel")
430+
third_channel = MagicMock(name="third-channel")
431+
first_stub = MagicMock()
432+
first_stub.GetInstance.side_effect = FakeRpcError(grpc.StatusCode.UNAVAILABLE)
433+
second_stub = MagicMock()
434+
second_stub.GetInstance.side_effect = FakeRpcError(grpc.StatusCode.UNAVAILABLE)
435+
third_stub = MagicMock()
436+
timer1 = MagicMock(name="close-timer-1")
437+
timer2 = MagicMock(name="close-timer-2")
438+
439+
with patch(
440+
"durabletask.client.shared.get_grpc_channel",
441+
side_effect=[first_channel, second_channel, third_channel],
442+
), patch(
443+
"durabletask.client.stubs.TaskHubSidecarServiceStub",
444+
side_effect=[first_stub, second_stub, third_stub],
445+
), patch("threading.Timer", side_effect=[timer1, timer2]):
446+
client = TaskHubGrpcClient(
447+
resiliency_options=GrpcClientResiliencyOptions(
448+
channel_recreate_failure_threshold=1,
449+
min_recreate_interval_seconds=0.0,
450+
)
451+
)
452+
with pytest.raises(FakeRpcError):
453+
client.get_orchestration_state("abc")
454+
with pytest.raises(FakeRpcError):
455+
client.get_orchestration_state("abc")
456+
457+
client.close()
458+
459+
timer1.cancel.assert_called_once_with()
460+
timer2.cancel.assert_called_once_with()
461+
first_channel.close.assert_called_once_with()
462+
second_channel.close.assert_called_once_with()
463+
third_channel.close.assert_called_once_with()
464+
assert client._retired_channels == {}
465+
466+
390467
@pytest.mark.parametrize(
391468
("stub_method_name", "client_method_name"),
392469
[
@@ -431,11 +508,13 @@ def test_sync_client_does_not_recreate_caller_owned_channel():
431508
client.get_orchestration_state("abc")
432509
with pytest.raises(FakeRpcError):
433510
client.get_orchestration_state("abc")
511+
client.close()
434512

435513
assert client._channel is provided_channel
436514
mock_get_channel.assert_not_called()
437515
mock_stub.assert_called_once_with(provided_channel)
438516
mock_timer.assert_not_called()
517+
provided_channel.close.assert_not_called()
439518

440519

441520
def test_sync_client_recreate_cooldown_prevents_immediate_repeated_recreation():
@@ -478,7 +557,6 @@ def test_sync_client_recreate_cooldown_prevents_immediate_repeated_recreation():
478557
client.get_orchestration_state("abc")
479558
assert client._channel is second_channel
480559
assert mock_get_channel.call_count == 2
481-
mock_timer.assert_called_once_with(30.0, first_channel.close)
482560

483561
with pytest.raises(FakeRpcError):
484562
client.get_orchestration_state("abc")
@@ -495,10 +573,16 @@ def test_sync_client_recreate_cooldown_prevents_immediate_repeated_recreation():
495573
expected_channel_call,
496574
expected_channel_call,
497575
]
498-
assert mock_timer.call_args_list == [
499-
call(30.0, first_channel.close),
500-
call(30.0, second_channel.close),
501-
]
576+
assert mock_timer.call_count == 2
577+
first_timer_call, second_timer_call = mock_timer.call_args_list
578+
assert first_timer_call.args[0] == 30.0
579+
assert first_timer_call.args[1].__self__ is client
580+
assert first_timer_call.args[1].__func__ is TaskHubGrpcClient._close_retired_channel
581+
assert first_timer_call.kwargs == {"args": (first_channel,)}
582+
assert second_timer_call.args[0] == 30.0
583+
assert second_timer_call.args[1].__self__ is client
584+
assert second_timer_call.args[1].__func__ is TaskHubGrpcClient._close_retired_channel
585+
assert second_timer_call.kwargs == {"args": (second_channel,)}
502586
assert timer1.daemon is True
503587
assert timer2.daemon is True
504588
timer1.start.assert_called_once_with()

0 commit comments

Comments
 (0)