Skip to content

Commit 607f219

Browse files
GarrettBeatty and claude committed
Add RunInChildContextAsync
Adds child-context support to the .NET Durable Execution SDK. A child context is a logical sub-workflow with its own deterministic operation-ID space, persisted as a CONTEXT operation so subsequent invocations replay the cached value without re-executing the function.

Public surface:
- IDurableContext.RunInChildContextAsync<T> (reflection + AOT-safe ICheckpointSerializer<T> overloads, plus a void overload).
- ChildContextConfig with SubType (observability label) and ErrorMapping (transform exceptions before they surface to the caller).
- ChildContextException for failure surfacing.

Used as a building block for upcoming WaitForCallbackAsync.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 097e5a1 commit 607f219

24 files changed

Lines changed: 1506 additions & 6 deletions
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
namespace Amazon.Lambda.DurableExecution;
2+
3+
/// <summary>
/// Options that control how a child context is labelled on the wire and how
/// its failures are reported to the caller.
/// </summary>
/// <remarks>
/// A child context is a logical sub-workflow with its own deterministic
/// operation-ID space, persisted as a <c>CONTEXT</c> operation. Pass an
/// instance of this class to
/// <see cref="IDurableContext.RunInChildContextAsync{T}(System.Func{IDurableContext, System.Threading.Tasks.Task{T}}, string?, ChildContextConfig?, System.Threading.CancellationToken)"/>
/// (or one of its overloads) to run code inside one.
/// </remarks>
public sealed class ChildContextConfig
{
    /// <summary>
    /// Observability label for the operation (e.g. <c>"WaitForCallback"</c>).
    /// It is sent in the <c>OperationUpdate.SubType</c> field of the child
    /// context's checkpoints and copied onto
    /// <see cref="ChildContextException.SubType"/> when the child fails.
    /// </summary>
    public string? SubType { get; set; }

    /// <summary>
    /// Optional hook that converts a child-context failure into the exception
    /// the caller of <c>RunInChildContextAsync</c> should observe, typically a
    /// domain-specific exception type.
    /// </summary>
    /// <remarks>
    /// The argument passed to the function is always a
    /// <see cref="ChildContextException"/>:
    /// <list type="bullet">
    ///   <item>When the user function throws, the original exception is
    ///   available as <see cref="Exception.InnerException"/>.</item>
    ///   <item>On replay of a <c>FAILED</c> child context, the exception is
    ///   rebuilt from the recorded error and has no inner exception.</item>
    /// </list>
    /// If the function returns <c>null</c>, the unmapped
    /// <see cref="ChildContextException"/> is thrown instead.
    /// </remarks>
    public Func<Exception, Exception>? ErrorMapping { get; set; }
}

Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ private Task<T> RunStep<T>(
7676

7777
var operationId = _idGenerator.NextId();
7878
var op = new StepOperation<T>(
79-
operationId, name, func, config, serializer, Logger,
79+
operationId, name, _idGenerator.ParentId, func, config, serializer, Logger,
8080
_state, _terminationManager, _durableExecutionArn, _batcher);
8181
return op.ExecuteAsync(cancellationToken);
8282
}
@@ -99,7 +99,58 @@ public Task WaitAsync(
9999
var operationId = _idGenerator.NextId();
100100
var waitSeconds = (int)Math.Max(1, Math.Ceiling(duration.TotalSeconds));
101101
var op = new WaitOperation(
102-
operationId, name, waitSeconds,
102+
operationId, name, _idGenerator.ParentId, waitSeconds,
103+
_state, _terminationManager, _durableExecutionArn, _batcher);
104+
return op.ExecuteAsync(cancellationToken);
105+
}
106+
107+
// Value-returning child context: a thin public entry point that forwards
// every argument unchanged to the shared RunChildContext implementation.
public Task<T> RunInChildContextAsync<T>(
    Func<IDurableContext, Task<T>> func,
    string? name = null,
    ChildContextConfig? config = null,
    CancellationToken cancellationToken = default)
{
    return RunChildContext(func, name, config, cancellationToken);
}
113+
114+
// Void child context: adapts a Task-returning function to the generic
// RunChildContext implementation.
public async Task RunInChildContextAsync(
    Func<IDurableContext, Task> func,
    string? name = null,
    ChildContextConfig? config = null,
    CancellationToken cancellationToken = default)
{
    // Reject a null delegate up front. Without this check the null is hidden
    // inside the wrapper lambda below, the resulting NullReferenceException
    // is treated as a user-function failure, and a programming error ends up
    // persisted as a FAILED child context.
    if (func is null)
    {
        throw new ArgumentNullException(nameof(func));
    }

    // Void child contexts don't carry a meaningful payload; the wrapper
    // returns null so the registered ILambdaSerializer is never asked to
    // serialize a real value.
    await RunChildContext<object?>(
        async (ctx) => { await func(ctx); return null; },
        name, config, cancellationToken);
}
127+
128+
// Shared implementation behind the RunInChildContextAsync overloads:
// validates inputs, allocates the operation ID for the child context, and
// hands execution to a ChildContextOperation<T>.
private Task<T> RunChildContext<T>(
    Func<IDurableContext, Task<T>> func,
    string? name,
    ChildContextConfig? config,
    CancellationToken cancellationToken)
{
    // Validate before NextId() so a null delegate neither consumes an
    // operation ID nor gets checkpointed as a FAILED child context (the
    // operation would otherwise catch the NullReferenceException as if the
    // user function had thrown it).
    if (func is null)
    {
        throw new ArgumentNullException(nameof(func));
    }

    var serializer = LambdaContext.Serializer
        ?? throw new InvalidOperationException(
            "No ILambdaSerializer is registered on ILambdaContext.Serializer. " +
            "In the class library programming model, register one with " +
            "[assembly: LambdaSerializer(typeof(...))]. In an executable / custom " +
            "runtime, pass it to LambdaBootstrapBuilder.Create(handler, serializer). " +
            "In tests, set TestLambdaContext.Serializer.");

    var operationId = _idGenerator.NextId();

    // Capture this DurableContext's collaborators; the child shares state,
    // termination, batcher, ARN, and Lambda context — but uses a child
    // OperationIdGenerator so its operation IDs are deterministically
    // namespaced under the parent op ID.
    IDurableContext ChildFactory(string parentOpId) => new DurableContext(
        _state, _terminationManager, _idGenerator.CreateChild(parentOpId),
        _durableExecutionArn, LambdaContext, _batcher);

    var op = new ChildContextOperation<T>(
        operationId, name, _idGenerator.ParentId, func, config, serializer, ChildFactory,
        _state, _terminationManager, _durableExecutionArn, _batcher);
    return op.ExecuteAsync(cancellationToken);
}

Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,31 @@ public StepInterruptedException(string message) : base(message) { }
6767
/// <summary>Creates a <see cref="StepInterruptedException"/> wrapping an inner exception.</summary>
6868
public StepInterruptedException(string message, Exception innerException) : base(message, innerException) { }
6969
}
70+
71+
/// <summary>
/// Reports the failure of a child context. It is thrown from
/// <c>RunInChildContextAsync</c> both when the user function throws and when
/// a previously <c>FAILED</c> child context is replayed. Details of the
/// underlying error are exposed through <see cref="ErrorType"/>,
/// <see cref="ErrorData"/> and <see cref="OriginalStackTrace"/>. Supply
/// <see cref="ChildContextConfig.ErrorMapping"/> to convert it into a
/// domain-specific exception.
/// </summary>
public class ChildContextException : DurableExecutionException
{
    /// <summary>
    /// The <see cref="ChildContextConfig.SubType"/> label of the child context
    /// that failed, or <c>null</c> when none was set.
    /// </summary>
    public string? SubType { get; init; }

    /// <summary>Fully-qualified type name of the exception that caused the failure.</summary>
    public string? ErrorType { get; init; }

    /// <summary>Optional structured error data recorded with the failure.</summary>
    public string? ErrorData { get; init; }

    /// <summary>
    /// Stack trace lines of the original exception, as captured before the
    /// error was serialized, when available.
    /// </summary>
    public IReadOnlyList<string>? OriginalStackTrace { get; init; }

    /// <summary>Initializes a <see cref="ChildContextException"/> with no message.</summary>
    public ChildContextException()
    {
    }

    /// <summary>Initializes a <see cref="ChildContextException"/> with the supplied message.</summary>
    public ChildContextException(string message)
        : base(message)
    {
    }

    /// <summary>
    /// Initializes a <see cref="ChildContextException"/> with the supplied
    /// message and the exception that caused it.
    /// </summary>
    public ChildContextException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}

Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,47 @@ Task WaitAsync(
6060
TimeSpan duration,
6161
string? name = null,
6262
CancellationToken cancellationToken = default);
63+
64+
/// <summary>
/// Runs <paramref name="func"/> inside a logical sub-workflow (a "child
/// context") and returns its result. The child has its own deterministic
/// operation-ID space; its result is checkpointed as a <c>CONTEXT</c>
/// operation so subsequent invocations replay the cached value without
/// re-executing the function.
/// </summary>
/// <remarks>
/// Child contexts group related durable operations (e.g. a step plus a wait
/// plus a step) into a single observability/error-handling boundary.
/// The return value is serialized to the checkpoint with the
/// <see cref="ILambdaSerializer"/> registered on
/// <see cref="ILambdaContext.Serializer"/>.
/// </remarks>
/// <typeparam name="T">Type of the value produced by the child context.</typeparam>
/// <param name="func">User function to run; it receives the child <see cref="IDurableContext"/>.</param>
/// <param name="name">Optional name recorded with the operation.</param>
/// <param name="config">Optional settings such as the sub-type label and error mapping.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <returns>The value returned by <paramref name="func"/>, or the cached value on replay.</returns>
/// <exception cref="ChildContextException">
/// The child context failed. When <see cref="ChildContextConfig.ErrorMapping"/>
/// is supplied, the mapped exception is thrown instead.
/// </exception>
Task<T> RunInChildContextAsync<T>(
    Func<IDurableContext, Task<T>> func,
    string? name = null,
    ChildContextConfig? config = null,
    CancellationToken cancellationToken = default);
85+
86+
/// <summary>
/// Runs <paramref name="func"/> inside a logical sub-workflow (a "child
/// context") that produces no value. The child has its own deterministic
/// operation-ID space and is checkpointed as a <c>CONTEXT</c> operation so
/// subsequent invocations skip re-executing the function.
/// </summary>
/// <remarks>
/// Child contexts group related durable operations (e.g. a step plus a wait
/// plus a step) into a single observability/error-handling boundary.
/// </remarks>
/// <param name="func">User function to run; it receives the child <see cref="IDurableContext"/>.</param>
/// <param name="name">Optional name recorded with the operation.</param>
/// <param name="config">Optional settings such as the sub-type label and error mapping.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <returns>A task that completes when the child context has finished.</returns>
/// <exception cref="ChildContextException">
/// The child context failed. When <see cref="ChildContextConfig.ErrorMapping"/>
/// is supplied, the mapped exception is thrown instead.
/// </exception>
Task RunInChildContextAsync(
    Func<IDurableContext, Task> func,
    string? name = null,
    ChildContextConfig? config = null,
    CancellationToken cancellationToken = default);
63104
}
64105

65106
/// <summary>
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
using System.IO;
2+
using System.Text;
3+
using Amazon.Lambda;
4+
using Amazon.Lambda.Core;
5+
using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
6+
using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;
7+
8+
namespace Amazon.Lambda.DurableExecution.Internal;
9+
10+
/// <summary>
/// Durable child context operation. Runs a user-supplied function inside a
/// nested <see cref="DurableContext"/> with its own deterministic operation-ID
/// space, persisting the function's result so subsequent invocations replay
/// the cached value without re-executing.
/// </summary>
/// <remarks>
/// Replay branches — example: <c>await ctx.RunInChildContextAsync(child =&gt; ..., name: "phase")</c>
/// <list type="bullet">
///   <item><b>Fresh</b>: no prior state → sync-flush CONTEXT START → run user
///     func → on success emit CONTEXT SUCCEED → on failure emit CONTEXT FAIL
///     and throw <see cref="ChildContextException"/>.</item>
///   <item><b>SUCCEEDED</b>: return cached deserialized result; user func is
///     NOT re-executed.</item>
///   <item><b>FAILED</b>: throw <see cref="ChildContextException"/> with the
///     recorded error; if <see cref="ChildContextConfig.ErrorMapping"/> is
///     set, the mapped exception is thrown instead.</item>
///   <item><b>STARTED</b> / <b>PENDING</b>: re-run the user func without
///     re-checkpointing START. The child's own operations recover from their
///     own checkpoints, so this is replay propagation; if a wait/callback
///     inside the child is still pending, the user func re-suspends.</item>
/// </list>
/// Unlike <see cref="StepOperation{T}"/>, child contexts have no retry strategy:
/// failure is terminal and surfaces immediately via
/// <see cref="ChildContextException"/>. The exception carries the error type
/// and stack-trace lines on both the fresh-failure and the replay path; the
/// fresh path additionally keeps the original exception as
/// <see cref="Exception.InnerException"/>.
/// </remarks>
internal sealed class ChildContextOperation<T> : DurableOperation<T>
{
    private readonly Func<IDurableContext, Task<T>> _func;
    private readonly ChildContextConfig? _config;
    private readonly ILambdaSerializer _serializer;
    private readonly Func<string, IDurableContext> _childContextFactory;

    /// <summary>Creates the operation for a single child context.</summary>
    /// <param name="operationId">ID of this CONTEXT operation.</param>
    /// <param name="name">Optional name sent with every checkpoint.</param>
    /// <param name="parentId">ID of the enclosing operation, if any.</param>
    /// <param name="func">User function to run inside the child context.</param>
    /// <param name="config">Optional sub-type label and error mapping.</param>
    /// <param name="serializer">Serializer used for the checkpointed result.</param>
    /// <param name="childContextFactory">
    /// Builds the child <see cref="IDurableContext"/>; it is called with this
    /// operation's ID.
    /// </param>
    /// <param name="state">Execution state forwarded to the base operation.</param>
    /// <param name="termination">Termination manager forwarded to the base operation.</param>
    /// <param name="durableExecutionArn">Execution ARN forwarded to the base operation.</param>
    /// <param name="batcher">Optional checkpoint batcher forwarded to the base operation.</param>
    public ChildContextOperation(
        string operationId,
        string? name,
        string? parentId,
        Func<IDurableContext, Task<T>> func,
        ChildContextConfig? config,
        ILambdaSerializer serializer,
        Func<string, IDurableContext> childContextFactory,
        ExecutionState state,
        TerminationManager termination,
        string durableExecutionArn,
        CheckpointBatcher? batcher = null)
        : base(operationId, name, parentId, state, termination, durableExecutionArn, batcher)
    {
        _func = func;
        _config = config;
        _serializer = serializer;
        _childContextFactory = childContextFactory;
    }

    protected override string OperationType => OperationTypes.Context;

    /// <summary>Fresh execution: checkpoint START, then run the user function.</summary>
    protected override async Task<T> StartAsync(CancellationToken cancellationToken)
    {
        // Sync-flush CONTEXT START before user code so the service has a record
        // of the parent context if the inner func suspends (e.g. a Wait inside
        // the child terminates the workflow before SUCCEED is reached).
        await EnqueueAsync(new SdkOperationUpdate
        {
            Id = OperationId,
            ParentId = ParentId,
            Type = OperationTypes.Context,
            Action = OperationAction.START,
            SubType = _config?.SubType,
            Name = Name
        }, cancellationToken);

        return await ExecuteFunc(cancellationToken);
    }

    /// <summary>Replay: resolve the operation from its previously recorded status.</summary>
    protected override Task<T> ReplayAsync(Operation existing, CancellationToken cancellationToken)
    {
        switch (existing.Status)
        {
            case OperationStatuses.Succeeded:
                // Side-effecting code runs at most once: replay returns the
                // cached result without invoking the user func.
                return Task.FromResult(DeserializeResult(existing.ContextDetails?.Result));

            case OperationStatuses.Failed:
                throw MapFailureException(BuildChildContextException(existing));

            case OperationStatuses.Started:
            case OperationStatuses.Pending:
                // Re-run the user func: the child's own operations replay from
                // their own checkpoints. Do NOT re-checkpoint START — the
                // original is still authoritative. If something inside the
                // child is still pending (Wait, callback, retry) the user func
                // will re-suspend on its own.
                return ExecuteFunc(cancellationToken);

            default:
                throw new NonDeterministicExecutionException(
                    $"Child context operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay.");
        }
    }

    /// <summary>
    /// Runs the user function in a new child context and checkpoints SUCCEED
    /// or FAIL depending on the outcome.
    /// </summary>
    private async Task<T> ExecuteFunc(CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var childContext = _childContextFactory(OperationId);

        T result;
        try
        {
            result = await _func(childContext);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Cancellation requested through our own token is not a child
            // failure: propagate it without writing a FAIL checkpoint.
            throw;
        }
        catch (Exception ex)
        {
            // Build the wire error once so the checkpoint and the exception
            // handed to the caller describe the failure identically.
            var sdkError = ToSdkError(ex);

            await EnqueueAsync(new SdkOperationUpdate
            {
                Id = OperationId,
                ParentId = ParentId,
                Type = OperationTypes.Context,
                Action = OperationAction.FAIL,
                SubType = _config?.SubType,
                Name = Name,
                Error = sdkError
            }, cancellationToken);

            throw MapFailureException(new ChildContextException(ex.Message, ex)
            {
                SubType = _config?.SubType,
                ErrorType = ex.GetType().FullName,
                // Populate the stack trace here too, so a fresh failure exposes
                // the same fields as the exception rebuilt on replay.
                OriginalStackTrace = sdkError.StackTrace
            });
        }

        await EnqueueAsync(new SdkOperationUpdate
        {
            Id = OperationId,
            ParentId = ParentId,
            Type = OperationTypes.Context,
            Action = OperationAction.SUCCEED,
            SubType = _config?.SubType,
            Name = Name,
            Payload = SerializeResult(result)
        }, cancellationToken);

        return result;
    }

    /// <summary>
    /// Applies <see cref="ChildContextConfig.ErrorMapping"/> when configured;
    /// a missing mapper or a <c>null</c> mapping result falls back to
    /// <paramref name="ex"/> itself.
    /// </summary>
    private Exception MapFailureException(ChildContextException ex)
    {
        var mapper = _config?.ErrorMapping;
        if (mapper == null) return ex;

        var mapped = mapper(ex);
        return mapped ?? ex;
    }

    /// <summary>Rebuilds the failure exception from a recorded FAILED operation.</summary>
    private ChildContextException BuildChildContextException(Operation failedOp)
    {
        var err = failedOp.ContextDetails?.Error;
        return new ChildContextException(err?.ErrorMessage ?? "Child context failed")
        {
            // Prefer the sub-type stored with the operation; fall back to the
            // current configuration when none was recorded.
            SubType = failedOp.SubType ?? _config?.SubType,
            ErrorType = err?.ErrorType,
            ErrorData = err?.ErrorData,
            OriginalStackTrace = err?.StackTrace
        };
    }

    /// <summary>
    /// Deserializes a checkpointed result; a missing payload yields
    /// <c>default</c>.
    /// </summary>
    private T DeserializeResult(string? serialized)
    {
        if (serialized == null) return default!;
        var bytes = Encoding.UTF8.GetBytes(serialized);
        using var ms = new MemoryStream(bytes);
        return _serializer.Deserialize<T>(ms);
    }

    /// <summary>Serializes the result to a UTF-8 string for the SUCCEED payload.</summary>
    private string SerializeResult(T value)
    {
        using var ms = new MemoryStream();
        _serializer.Serialize(value, ms);
        return Encoding.UTF8.GetString(ms.ToArray());
    }

    /// <summary>
    /// Converts an exception into the wire error object: type name, message,
    /// and the stack trace split into non-empty lines.
    /// </summary>
    private static SdkErrorObject ToSdkError(Exception ex) => new()
    {
        ErrorType = ex.GetType().FullName,
        ErrorMessage = ex.Message,
        StackTrace = ex.StackTrace?.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries).ToList()
    };
}

0 commit comments

Comments
 (0)