Skip to content

Commit 1011682

Browse files
Add multiline logging interceptor sample
- Implements surgical solution for formatting multiline exception logs as single-line JSON - Only affects logging within Temporal SDK boundaries, doesn't interfere with global formatters - Captures exceptions in both activities and workflows using interceptor pattern - Formats exceptions as JSON with message, type, and traceback fields - Includes comprehensive sample with activities, workflows, worker, and starter - Follows established interceptor patterns from sentry sample - Solves Datadog multiline log parsing issues for Temporal exceptions Requested by: @deepika-awasthi Link to Devin run: https://app.devin.ai/sessions/37122594b322437b8e42cef51d4104d1 Co-Authored-By: deepika awasthi <deepika.awasthi@temporal.io>
1 parent 8597fdd commit 1011682

10 files changed

Lines changed: 286 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ Some examples require extra dependencies. See each sample's directory for specif
7272
* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
7373
* [message_passing/safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals.
7474
* [message_passing/update_with_start/lazy_initialization](message_passing/update_with_start/lazy_initialization/) - Use update-with-start to update a Shopping Cart, starting it if it does not exist.
75+
* [multiline_logging](multiline_logging) - Format multiline exception logs as single-line JSON within Temporal boundaries.
7576
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
7677
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
7778
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.

multiline_logging/README.md

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Multiline Exception Logging Interceptor
2+
3+
This sample demonstrates how to handle multiline exception logs in Temporal workflows and activities by formatting them as single-line JSON. This solves the issue where multiline tracebacks span multiple log entries in log aggregation systems like Datadog.
4+
5+
## Problem
6+
7+
When exceptions with multiline tracebacks are raised in Temporal activities or workflows, they can create multiple separate log entries in log aggregation systems, making them difficult to parse and analyze.
8+
9+
## Solution
10+
11+
The `MultilineLoggingInterceptor` captures exceptions within Temporal boundaries and:
12+
1. Formats the exception details as a single-line JSON object
13+
2. Logs the formatted exception
14+
3. Re-raises the original exception to maintain normal error handling
15+
16+
The JSON format includes:
17+
- `message`: The exception message
18+
- `type`: The exception class name
19+
- `traceback`: The full traceback with newlines replaced by " | "
20+
21+
## Usage
22+
23+
```python
24+
from multiline_logging.interceptor import MultilineLoggingInterceptor
25+
26+
worker = Worker(
27+
client,
28+
task_queue="my-task-queue",
29+
workflows=[MyWorkflow],
30+
activities=[my_activity],
31+
interceptors=[MultilineLoggingInterceptor()],
32+
)
33+
```
34+
35+
## Running the Sample
36+
37+
1. Start Temporal server: `temporal server start-dev`
38+
2. Run the worker: `python multiline_logging/worker.py`
39+
3. In another terminal, run the starter: `python multiline_logging/starter.py`
40+
41+
You'll see the multiline exceptions formatted as single-line JSON in the worker logs.
42+
43+
## Key Benefits
44+
45+
- **Surgical**: Only affects logging within Temporal SDK boundaries
46+
- **Non-intrusive**: Doesn't interfere with existing log formatters
47+
- **Preserves behavior**: Original exceptions are still raised normally
48+
- **Structured**: JSON format is easy to parse and analyze

multiline_logging/__init__.py

Whitespace-only changes.

multiline_logging/activities.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from temporalio import activity
2+
3+
4+
@activity.defn
5+
async def failing_activity(should_fail: bool) -> str:
6+
if should_fail:
7+
try:
8+
raise ValueError("Inner exception with\nmultiple lines\nof text")
9+
except ValueError as inner:
10+
raise RuntimeError(
11+
"Outer exception that wraps\nanother exception with\nmultiline content"
12+
) from inner
13+
return "Success!"
14+
15+
16+
@activity.defn
17+
async def complex_failing_activity() -> str:
18+
"""Activity that creates a complex multiline traceback"""
19+
def nested_function():
20+
def deeply_nested():
21+
raise Exception("Deep exception with\nvery long\nmultiline\nerror message")
22+
deeply_nested()
23+
24+
nested_function()
25+
return "This won't be reached"

multiline_logging/interceptor.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import json
2+
import logging
3+
import traceback
4+
from typing import Any, Optional, Type
5+
6+
from temporalio import activity, workflow
7+
from temporalio.worker import (
8+
ActivityInboundInterceptor,
9+
ExecuteActivityInput,
10+
ExecuteWorkflowInput,
11+
Interceptor,
12+
WorkflowInboundInterceptor,
13+
WorkflowInterceptorClassInput,
14+
)
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
class _MultilineLoggingActivityInboundInterceptor(ActivityInboundInterceptor):
20+
async def execute_activity(self, input: ExecuteActivityInput) -> Any:
21+
try:
22+
return await super().execute_activity(input)
23+
except Exception as e:
24+
exception_data = {
25+
"message": str(e),
26+
"type": type(e).__name__,
27+
"traceback": traceback.format_exc().replace("\n", " | ")
28+
}
29+
30+
logger.error(f"Activity exception: {json.dumps(exception_data)}")
31+
32+
raise e
33+
34+
35+
class _MultilineLoggingWorkflowInterceptor(WorkflowInboundInterceptor):
36+
async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
37+
try:
38+
return await super().execute_workflow(input)
39+
except Exception as e:
40+
exception_data = {
41+
"message": str(e),
42+
"type": type(e).__name__,
43+
"traceback": traceback.format_exc().replace("\n", " | ")
44+
}
45+
46+
if not workflow.unsafe.is_replaying():
47+
with workflow.unsafe.sandbox_unrestricted():
48+
logger.error(f"Workflow exception: {json.dumps(exception_data)}")
49+
50+
raise e
51+
52+
53+
class MultilineLoggingInterceptor(Interceptor):
54+
"""Temporal Interceptor that formats multiline exception logs as single-line JSON"""
55+
56+
def intercept_activity(
57+
self, next: ActivityInboundInterceptor
58+
) -> ActivityInboundInterceptor:
59+
return _MultilineLoggingActivityInboundInterceptor(super().intercept_activity(next))
60+
61+
def workflow_interceptor_class(
62+
self, input: WorkflowInterceptorClassInput
63+
) -> Optional[Type[WorkflowInboundInterceptor]]:
64+
return _MultilineLoggingWorkflowInterceptor

multiline_logging/starter.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import asyncio
2+
from temporalio.client import Client
3+
from multiline_logging.workflows import MultilineLoggingWorkflow
4+
5+
6+
async def main():
7+
client = await Client.connect("localhost:7233")
8+
9+
test_cases = [
10+
"activity_exception",
11+
"complex_activity_exception",
12+
"workflow_exception"
13+
]
14+
15+
for test_case in test_cases:
16+
print(f"\n--- Testing {test_case} ---")
17+
try:
18+
result = await client.execute_workflow(
19+
MultilineLoggingWorkflow.run,
20+
test_case,
21+
id=f"multiline-logging-{test_case}",
22+
task_queue="multiline-logging-task-queue"
23+
)
24+
print(f"Result: {result}")
25+
except Exception as e:
26+
print(f"Expected exception caught: {e}")
27+
28+
29+
if __name__ == "__main__":
30+
asyncio.run(main())

multiline_logging/test_imports.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Test that all imports work correctly
4+
"""
5+
6+
try:
7+
from interceptor import MultilineLoggingInterceptor
8+
print('✅ Interceptor imports successfully')
9+
10+
from activities import failing_activity, complex_failing_activity
11+
print('✅ Activities import successfully')
12+
13+
from workflows import MultilineLoggingWorkflow
14+
print('✅ Workflows import successfully')
15+
16+
print('✅ All imports work correctly!')
17+
18+
except ImportError as e:
19+
print(f'❌ Import error: {e}')
20+
exit(1)
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Simple test to validate the multiline logging interceptor logic
4+
"""
5+
import json
6+
import traceback
7+
8+
9+
def test_multiline_formatting():
10+
"""Test that multiline exceptions are properly formatted as single-line JSON"""
11+
try:
12+
try:
13+
raise ValueError("Inner exception with\nmultiple lines\nof text")
14+
except ValueError as inner:
15+
raise RuntimeError(
16+
"Outer exception that wraps\nanother exception with\nmultiline content"
17+
) from inner
18+
except Exception as e:
19+
exception_data = {
20+
"message": str(e),
21+
"type": type(e).__name__,
22+
"traceback": traceback.format_exc().replace("\n", " | ")
23+
}
24+
25+
json_output = json.dumps(exception_data)
26+
27+
print("Original exception message:")
28+
print(repr(str(e)))
29+
print("\nFormatted JSON output:")
30+
print(json_output)
31+
print("\nValidation:")
32+
print(f"- Contains newlines in JSON: {'\\n' in json_output}")
33+
print(f"- JSON is valid: {json.loads(json_output) is not None}")
34+
print(f"- Single line: {json_output.count('\\n') == 0}")
35+
36+
return json_output
37+
38+
39+
if __name__ == "__main__":
40+
print("Testing multiline exception formatting...")
41+
result = test_multiline_formatting()
42+
print("\n✅ Test completed successfully!")

multiline_logging/worker.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import asyncio
2+
import logging
3+
from temporalio.client import Client
4+
from temporalio.worker import Worker
5+
6+
from multiline_logging.activities import failing_activity, complex_failing_activity
7+
from multiline_logging.interceptor import MultilineLoggingInterceptor
8+
from multiline_logging.workflows import MultilineLoggingWorkflow
9+
10+
logging.basicConfig(level=logging.INFO)
11+
12+
async def main():
13+
client = await Client.connect("localhost:7233")
14+
15+
worker = Worker(
16+
client,
17+
task_queue="multiline-logging-task-queue",
18+
workflows=[MultilineLoggingWorkflow],
19+
activities=[failing_activity, complex_failing_activity],
20+
interceptors=[MultilineLoggingInterceptor()],
21+
)
22+
23+
print("Worker started. Ctrl+C to exit.")
24+
await worker.run()
25+
26+
27+
if __name__ == "__main__":
28+
asyncio.run(main())

multiline_logging/workflows.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from datetime import timedelta
2+
from temporalio import workflow
3+
4+
with workflow.unsafe.imports_passed_through():
5+
from .activities import failing_activity, complex_failing_activity
6+
7+
8+
@workflow.defn
9+
class MultilineLoggingWorkflow:
10+
@workflow.run
11+
async def run(self, test_type: str) -> str:
12+
if test_type == "activity_exception":
13+
return await workflow.execute_activity(
14+
failing_activity,
15+
True,
16+
schedule_to_close_timeout=timedelta(seconds=5)
17+
)
18+
elif test_type == "complex_activity_exception":
19+
return await workflow.execute_activity(
20+
complex_failing_activity,
21+
schedule_to_close_timeout=timedelta(seconds=5)
22+
)
23+
elif test_type == "workflow_exception":
24+
raise RuntimeError(
25+
"Workflow exception with\nmultiple lines\nof error text"
26+
)
27+
else:
28+
return "No exception test"

0 commit comments

Comments
 (0)