Skip to content

Commit c097fb4

Browse files
authored
Catch and log transform errors in the GrpcStreamBroadcaster (#203)
2 parents abc5bc9 + c1029fa commit c097fb4

File tree

4 files changed

+96
-2
lines changed

4 files changed

+96
-2
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@
1414

1515
## Bug Fixes
1616

17-
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
17+
- This fixes a bug in the `GrpcStreamBroadcaster` that was causing exceptions in the transform method to be silenced.

src/frequenz/client/base/retry.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
from copy import deepcopy
1010
from typing import Self
1111

12+
from typing_extensions import override
13+
1214
DEFAULT_RETRY_INTERVAL = 3.0
1315
"""Default retry interval, in seconds."""
1416

@@ -98,6 +100,7 @@ def __init__(
98100

99101
self._count = 0
100102

103+
@override
101104
def next_interval(self) -> float | None:
102105
"""Return the time to wait before the next retry.
103106
@@ -153,6 +156,7 @@ def __init__( # pylint: disable=too-many-arguments
153156

154157
self._count = 0
155158

159+
@override
156160
def next_interval(self) -> float | None:
157161
"""Return the time to wait before the next retry.
158162

src/frequenz/client/base/streaming.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,17 @@ async def _run(self) -> None:
291291

292292
async for msg in call:
293293
first_message_received = True
294-
await data_sender.send(self._transform(msg))
294+
try:
295+
transformed = self._transform(msg)
296+
except Exception: # pylint: disable=broad-exception-caught
297+
_logger.exception(
298+
"%s: error transforming message: %s",
299+
self._stream_name,
300+
msg,
301+
)
302+
continue
303+
304+
await data_sender.send(transformed)
295305

296306
except grpc.aio.AioRpcError as err:
297307
error = err

tests/streaming/test_grpc_stream_broadcaster.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,86 @@ async def test_streaming_error( # pylint: disable=too-many-arguments
293293
]
294294

295295

296+
async def test_streaming_transform_error( # pylint: disable=too-many-arguments
297+
no_retry: mock.MagicMock, # pylint: disable=redefined-outer-name
298+
receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name
299+
caplog: pytest.LogCaptureFixture,
300+
) -> None:
301+
"""Test streaming transform errors."""
302+
caplog.set_level(logging.INFO)
303+
304+
def transform_err(x: int) -> str:
305+
"""Mock transform with err for odd values."""
306+
if x % 2 == 1:
307+
raise ValueError("No, you transform.")
308+
return f"transformed_{x}"
309+
310+
async def asynciter() -> AsyncIterator[int]:
311+
"""Mock async iterator."""
312+
await receiver_ready_event.wait()
313+
for i in range(5):
314+
yield i
315+
await asyncio.sleep(0) # Yield control to the event loop
316+
317+
rpc_mock = mock.MagicMock(
318+
name="ok_helper_method",
319+
side_effect=lambda: unary_stream_call_mock(
320+
"ok_helper_unary_stream_call", asynciter
321+
),
322+
)
323+
324+
helper = streaming.GrpcStreamBroadcaster(
325+
stream_name="test_helper",
326+
stream_method=rpc_mock,
327+
transform=transform_err,
328+
retry_strategy=no_retry,
329+
)
330+
331+
items: list[str] = []
332+
async with AsyncExitStack() as stack:
333+
stack.push_async_callback(helper.stop)
334+
335+
receiver = helper.new_receiver()
336+
receiver_ready_event.set()
337+
items, _ = await _split_message(receiver)
338+
339+
assert items == [
340+
"transformed_0",
341+
"transformed_2",
342+
"transformed_4",
343+
]
344+
345+
assert caplog.record_tuples == [
346+
(
347+
"frequenz.client.base.streaming",
348+
logging.INFO,
349+
"test_helper: starting to stream",
350+
),
351+
# LogCaptureFixture can't capture tracebacks, so only the error message
352+
# is checked.
353+
(
354+
"frequenz.client.base.streaming",
355+
logging.ERROR,
356+
"test_helper: error transforming message: 1",
357+
),
358+
(
359+
"frequenz.client.base.streaming",
360+
logging.ERROR,
361+
"test_helper: error transforming message: 3",
362+
),
363+
(
364+
"frequenz.client.base.streaming",
365+
logging.INFO,
366+
"test_helper: connection closed, stream exhausted",
367+
),
368+
(
369+
"frequenz.client.base.streaming",
370+
logging.INFO,
371+
"test_helper: stopping the stream",
372+
),
373+
]
374+
375+
296376
@pytest.mark.parametrize("include_events", [True, False])
297377
async def test_retry_next_interval_zero( # pylint: disable=too-many-arguments
298378
receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name

0 commit comments

Comments
 (0)