Skip to content

Commit ccbff41

Browse files
andystaplesCopilotberndverst
authored
Add replay-safe logger support for orchestrations (#129)
* Add replay-safe logger for orchestrations Adds ReplaySafeLogger class and OrchestrationContext.create_replay_safe_logger() so orchestrators can log without generating duplicate messages during replay. The logger wraps a standard logging.Logger and suppresses all log calls when the orchestrator is replaying from history. All standard log levels are supported: debug, info, warning, error, critical, and exception. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Refactor ReplaySafeLogger to use LoggerAdapter, add tests and docs - Rewrite ReplaySafeLogger as a logging.LoggerAdapter subclass with a single isEnabledFor() override instead of manual method delegation - Add log() and isEnabledFor() methods with tests - Add replay-safe logging section to docs/features.md - Add usage example in examples/activity_sequence.py - Add cross-package compatibility section to copilot-instructions.md --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: Bernd Verst <github@bernd.dev>
1 parent 45233b5 commit ccbff41

6 files changed

Lines changed: 306 additions & 0 deletions

File tree

.github/copilot-instructions.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,3 +128,18 @@ python -m pytest
128128
- `examples/` — example orchestrations (see `examples/README.md`)
129129
- `tests/` — test suite
130130
- `dev-requirements.txt` — development dependencies
131+
132+
## Cross-Package Compatibility
133+
134+
The `durabletask-azuremanaged` package extends the core `durabletask`
135+
package (e.g. `DurableTaskSchedulerWorker` subclasses
136+
`TaskHubGrpcWorker`). When adding or changing features in
137+
`durabletask/`, always verify that `durabletask-azuremanaged` still
138+
works correctly:
139+
140+
- Check whether the azuremanaged worker, client, or tests override or
141+
depend on the code you changed.
142+
- Run the azuremanaged unit tests if they exist for the affected area.
143+
- If a new public API is added to the core SDK (e.g. a method on
144+
`OrchestrationContext`), confirm it is accessible through the
145+
azuremanaged package and add a test or example if appropriate.

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
99

1010
ADDED
1111

12+
- Added `ReplaySafeLogger` and `OrchestrationContext.create_replay_safe_logger()`
13+
for suppressing duplicate log messages during orchestrator replay
1214
- Added `GrpcChannelOptions` and `GrpcRetryPolicyOptions` for configuring
1315
gRPC transport behavior, including message-size limits, keepalive settings,
1416
and channel-level retry policy service configuration.

docs/features.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,49 @@ Orchestrations can specify retry policies for activities and sub-orchestrations.
225225
control how many times and how frequently an activity or sub-orchestration will be retried in the
226226
event of a transient error.
227227

228+
### Replay-safe logging
229+
230+
Orchestrator functions replay their history each time they are resumed,
231+
which can cause duplicate log messages. The `create_replay_safe_logger`
232+
method on `OrchestrationContext` returns a `ReplaySafeLogger` that wraps
233+
a standard `logging.Logger` and automatically suppresses output while
234+
the orchestrator is replaying. `ReplaySafeLogger` extends Python's
235+
`logging.LoggerAdapter`, which is the idiomatic way to add context or
236+
modify behavior on an existing logger.
237+
238+
```python
239+
import logging
240+
241+
from durabletask import task
242+
243+
logger = logging.getLogger("my_orchestrator")
244+
245+
def my_orchestrator(ctx: task.OrchestrationContext, payload):
246+
replay_logger = ctx.create_replay_safe_logger(logger)
247+
replay_logger.info("Starting orchestration %s", ctx.instance_id)
248+
result = yield ctx.call_activity(my_activity, input=payload)
249+
replay_logger.info("Activity returned: %s", result)
250+
return result
251+
```
252+
253+
> [!NOTE]
254+
> Unlike the .NET SDK, where `CreateReplaySafeLogger` accepts a
255+
> category name string and internally creates the logger via
256+
> `ILoggerFactory`, the Python SDK requires you to pass an existing
257+
> `logging.Logger` instance. This is because Python's
258+
> `logging.getLogger(name)` already serves as the global factory and
259+
> is the standard way to obtain loggers.
260+
261+
The replay-safe logger supports all standard log levels: `debug`,
262+
`info`, `warning`, `error`, `critical`, and `exception`, as well as
263+
the generic `log(level, msg)` method. It also exposes `isEnabledFor`
264+
which returns `False` during replay so callers can skip expensive
265+
message formatting.
266+
267+
> [!TIP]
268+
> Create the replay-safe logger once at the start of your orchestrator
269+
> and reuse it throughout the function.
270+
228271
### Large payload externalization
229272

230273
Orchestration inputs, outputs, and event data are transmitted through gRPC messages. When these

durabletask/task.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# See https://peps.python.org/pep-0563/
55
from __future__ import annotations
66

7+
import logging
78
import math
89
from abc import ABC, abstractmethod
910
from datetime import datetime, timedelta, timezone
@@ -279,6 +280,51 @@ def new_uuid(self) -> str:
279280
def _exit_critical_section(self) -> None:
280281
pass
281282

283+
def create_replay_safe_logger(self, logger: logging.Logger) -> ReplaySafeLogger:
284+
"""Create a replay-safe logger that suppresses log messages during orchestration replay.
285+
286+
The returned logger wraps the provided logger and only emits log messages when
287+
the orchestrator is not replaying. This prevents duplicate log messages from
288+
appearing as a side effect of orchestration replay.
289+
290+
Parameters
291+
----------
292+
logger : logging.Logger
293+
The underlying logger to wrap.
294+
295+
Returns
296+
-------
297+
ReplaySafeLogger
298+
A logger that only emits log messages when the orchestrator is not replaying.
299+
"""
300+
return ReplaySafeLogger(logger, lambda: self.is_replaying)
301+
302+
303+
class ReplaySafeLogger(logging.LoggerAdapter):
304+
"""A logger adapter that suppresses log messages during orchestration replay.
305+
306+
This class extends :class:`logging.LoggerAdapter` and only emits log
307+
messages when the orchestrator is *not* replaying. Use this to avoid
308+
duplicate log entries that would otherwise appear every time the
309+
orchestrator replays its history.
310+
311+
Obtain an instance by calling :meth:`OrchestrationContext.create_replay_safe_logger`.
312+
"""
313+
314+
def __init__(self, logger: logging.Logger, is_replaying: Callable[[], bool]) -> None:
315+
super().__init__(logger, {})
316+
self._is_replaying = is_replaying
317+
318+
def isEnabledFor(self, level: int) -> bool:
319+
"""Return whether logging is enabled for the given level.
320+
321+
Returns ``False`` while the orchestrator is replaying so that callers
322+
can skip expensive message formatting during replay.
323+
"""
324+
if self._is_replaying():
325+
return False
326+
return self.logger.isEnabledFor(level)
327+
282328

283329
class FailureDetails:
284330
def __init__(self, message: str, error_type: str, stack_trace: Optional[str]):

examples/activity_sequence.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""End-to-end sample that demonstrates how to configure an orchestrator
22
that calls an activity function in a sequence and prints the outputs."""
3+
import logging
34
import os
45

56
from azure.identity import DefaultAzureCredential
@@ -8,6 +9,8 @@
89
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
910
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
1011

12+
logger = logging.getLogger("activity_sequence")
13+
1114

1215
def hello(ctx: task.ActivityContext, name: str) -> str:
1316
"""Activity function that returns a greeting"""
@@ -16,10 +19,15 @@ def hello(ctx: task.ActivityContext, name: str) -> str:
1619

1720
def sequence(ctx: task.OrchestrationContext, _):
1821
"""Orchestrator function that calls the 'hello' activity function in a sequence"""
22+
# Create a replay-safe logger to avoid duplicate log messages during replay
23+
replay_logger = ctx.create_replay_safe_logger(logger)
24+
25+
replay_logger.info("Starting activity sequence for instance %s", ctx.instance_id)
1926
# call "hello" activity function in a sequence
2027
result1 = yield ctx.call_activity(hello, input='Tokyo')
2128
result2 = yield ctx.call_activity(hello, input='Seattle')
2229
result3 = yield ctx.call_activity(hello, input='London')
30+
replay_logger.info("All activities completed")
2331

2432
# return an array of results
2533
return [result1, result2, result3]

tests/durabletask/test_orchestration_executor.py

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1501,6 +1501,198 @@ def orchestrator(ctx: task.OrchestrationContext, _):
15011501
assert complete_action.result.value == encoded_output
15021502

15031503

1504+
def test_replay_safe_logger_suppresses_during_replay():
1505+
"""Validates that the replay-safe logger suppresses log messages during replay."""
1506+
log_calls: list[str] = []
1507+
1508+
class _RecordingHandler(logging.Handler):
1509+
def emit(self, record: logging.LogRecord) -> None:
1510+
log_calls.append(record.getMessage())
1511+
1512+
handler = _RecordingHandler()
1513+
inner_logger = logging.getLogger("test_replay_safe_logger")
1514+
inner_logger.setLevel(logging.DEBUG)
1515+
original_propagate = inner_logger.propagate
1516+
inner_logger.propagate = False
1517+
inner_logger.addHandler(handler)
1518+
1519+
try:
1520+
activity_name = "say_hello"
1521+
1522+
def say_hello(_, name: str) -> str:
1523+
return f"Hello, {name}!"
1524+
1525+
def orchestrator(ctx: task.OrchestrationContext, _):
1526+
replay_logger = ctx.create_replay_safe_logger(inner_logger)
1527+
replay_logger.info("Starting orchestration")
1528+
result = yield ctx.call_activity(say_hello, input="World")
1529+
replay_logger.info("Activity completed: %s", result)
1530+
return result
1531+
1532+
registry = worker._Registry()
1533+
activity_name = registry.add_activity(say_hello)
1534+
orchestrator_name = registry.add_orchestrator(orchestrator)
1535+
1536+
# First execution: starts the orchestration. The orchestrator runs without
1537+
# replay, emits the initial log message, and then schedules the activity.
1538+
new_events = [
1539+
helpers.new_orchestrator_started_event(datetime.now()),
1540+
helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None),
1541+
]
1542+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
1543+
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
1544+
assert result.actions # should have scheduled the activity
1545+
1546+
assert log_calls == ["Starting orchestration"]
1547+
log_calls.clear()
1548+
1549+
# Second execution: the orchestrator replays from history and then processes the
1550+
# activity completion. The "Starting orchestration" message is emitted during
1551+
# replay and should be suppressed; "Activity completed" is emitted after replay
1552+
# ends and should appear exactly once.
1553+
old_events = new_events + [
1554+
helpers.new_task_scheduled_event(1, activity_name),
1555+
]
1556+
encoded_output = json.dumps(say_hello(None, "World"))
1557+
new_events = [helpers.new_task_completed_event(1, encoded_output)]
1558+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
1559+
result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
1560+
complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions)
1561+
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
1562+
1563+
assert log_calls == ["Activity completed: Hello, World!"]
1564+
finally:
1565+
inner_logger.removeHandler(handler)
1566+
inner_logger.propagate = original_propagate
1567+
1568+
1569+
def test_replay_safe_logger_all_levels():
1570+
"""Validates that all log levels are suppressed during replay and emitted otherwise."""
1571+
log_levels: list[str] = []
1572+
1573+
class _LevelRecorder(logging.Handler):
1574+
def emit(self, record: logging.LogRecord) -> None:
1575+
log_levels.append(record.levelname)
1576+
1577+
handler = _LevelRecorder()
1578+
inner_logger = logging.getLogger("test_replay_safe_logger_levels")
1579+
inner_logger.setLevel(logging.DEBUG)
1580+
original_propagate = inner_logger.propagate
1581+
inner_logger.propagate = False
1582+
inner_logger.addHandler(handler)
1583+
1584+
try:
1585+
def orchestrator(ctx: task.OrchestrationContext, _):
1586+
replay_logger = ctx.create_replay_safe_logger(inner_logger)
1587+
replay_logger.debug("debug msg")
1588+
replay_logger.info("info msg")
1589+
replay_logger.warning("warning msg")
1590+
replay_logger.error("error msg")
1591+
replay_logger.critical("critical msg")
1592+
return "done"
1593+
1594+
registry = worker._Registry()
1595+
orchestrator_name = registry.add_orchestrator(orchestrator)
1596+
1597+
new_events = [
1598+
helpers.new_orchestrator_started_event(datetime.now()),
1599+
helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None),
1600+
]
1601+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
1602+
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
1603+
complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions)
1604+
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
1605+
1606+
assert log_levels == ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
1607+
finally:
1608+
inner_logger.removeHandler(handler)
1609+
inner_logger.propagate = original_propagate
1610+
1611+
1612+
def test_replay_safe_logger_direct():
1613+
"""Unit test for ReplaySafeLogger — verifies suppression based on is_replaying flag."""
1614+
log_calls: list[str] = []
1615+
1616+
class _RecordingHandler(logging.Handler):
1617+
def emit(self, record: logging.LogRecord) -> None:
1618+
log_calls.append(record.getMessage())
1619+
1620+
handler = _RecordingHandler()
1621+
inner_logger = logging.getLogger("test_replay_safe_logger_direct")
1622+
inner_logger.setLevel(logging.DEBUG)
1623+
original_propagate = inner_logger.propagate
1624+
inner_logger.propagate = False
1625+
inner_logger.addHandler(handler)
1626+
1627+
try:
1628+
replaying = True
1629+
replay_logger = task.ReplaySafeLogger(inner_logger, lambda: replaying)
1630+
1631+
replay_logger.info("should be suppressed")
1632+
assert log_calls == []
1633+
1634+
replaying = False
1635+
replay_logger.info("should appear")
1636+
assert log_calls == ["should appear"]
1637+
finally:
1638+
inner_logger.removeHandler(handler)
1639+
inner_logger.propagate = original_propagate
1640+
1641+
1642+
def test_replay_safe_logger_log_method():
1643+
"""Validates the generic log() method respects the replay flag."""
1644+
log_calls: list[str] = []
1645+
1646+
class _RecordingHandler(logging.Handler):
1647+
def emit(self, record: logging.LogRecord) -> None:
1648+
log_calls.append(record.getMessage())
1649+
1650+
handler = _RecordingHandler()
1651+
inner_logger = logging.getLogger("test_replay_safe_logger_log_method")
1652+
inner_logger.setLevel(logging.DEBUG)
1653+
original_propagate = inner_logger.propagate
1654+
inner_logger.propagate = False
1655+
inner_logger.addHandler(handler)
1656+
1657+
try:
1658+
replaying = True
1659+
replay_logger = task.ReplaySafeLogger(inner_logger, lambda: replaying)
1660+
1661+
replay_logger.log(logging.WARNING, "suppressed warning")
1662+
assert log_calls == []
1663+
1664+
replaying = False
1665+
replay_logger.log(logging.WARNING, "visible warning")
1666+
assert log_calls == ["visible warning"]
1667+
finally:
1668+
inner_logger.removeHandler(handler)
1669+
inner_logger.propagate = original_propagate
1670+
1671+
1672+
def test_replay_safe_logger_is_enabled_for():
1673+
"""Validates isEnabledFor returns False during replay."""
1674+
inner_logger = logging.getLogger("test_replay_safe_logger_enabled")
1675+
inner_logger.setLevel(logging.DEBUG)
1676+
1677+
replaying = True
1678+
replay_logger = task.ReplaySafeLogger(inner_logger, lambda: replaying)
1679+
1680+
# During replay, isEnabledFor should always return False
1681+
assert replay_logger.isEnabledFor(logging.DEBUG) is False
1682+
assert replay_logger.isEnabledFor(logging.INFO) is False
1683+
assert replay_logger.isEnabledFor(logging.CRITICAL) is False
1684+
1685+
# After replay, delegates to the inner logger
1686+
replaying = False
1687+
assert replay_logger.isEnabledFor(logging.DEBUG) is True
1688+
assert replay_logger.isEnabledFor(logging.INFO) is True
1689+
1690+
# If a level is below the inner logger's level, should return False
1691+
inner_logger.setLevel(logging.WARNING)
1692+
assert replay_logger.isEnabledFor(logging.DEBUG) is False
1693+
assert replay_logger.isEnabledFor(logging.WARNING) is True
1694+
1695+
15041696
def test_when_any_with_retry():
15051697
"""Tests that a when_any pattern works correctly with retries"""
15061698
def dummy_activity(_, inp: str):

0 commit comments

Comments
 (0)