Skip to content

Commit 1a1a3fa

Browse files
authored
feat: batch typed decorator (#1034)
1 parent c2df8fb commit 1a1a3fa

23 files changed

+3770
-1665
lines changed

docs/utilities/batch-processing.md

Lines changed: 1543 additions & 1543 deletions
Large diffs are not rendered by default.
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.Text.Json;
18+
using System.Threading;
19+
using System.Threading.Tasks;
20+
using AWS.Lambda.Powertools.BatchProcessing;
21+
using AWS.Lambda.Powertools.Logging;
22+
using HelloWorld.Data;
23+
24+
namespace HelloWorld.DynamoDb;
25+
26+
/// <summary>
27+
/// Data model representing the DynamoDB stream record structure.
28+
/// </summary>
29+
public class DynamoDbProductStreamRecord
30+
{
31+
public string? EventName { get; set; }
32+
public DynamoDbNewImage? NewImage { get; set; }
33+
public string? SequenceNumber { get; set; }
34+
}
35+
36+
public class DynamoDbNewImage
37+
{
38+
public DynamoDbAttributeValue? Product { get; set; }
39+
}
40+
41+
public class DynamoDbAttributeValue
42+
{
43+
public string? S { get; set; }
44+
}
45+
46+
/// <summary>
47+
/// Typed record handler for DynamoDB stream records.
48+
/// Receives the deserialized DynamoDB stream record and extracts the Product from NewImage.
49+
/// </summary>
50+
public class TypedDynamoDbProductHandler : ITypedRecordHandler<DynamoDbProductStreamRecord>
51+
{
52+
public Task<RecordHandlerResult> HandleAsync(DynamoDbProductStreamRecord record, CancellationToken cancellationToken)
53+
{
54+
/*
55+
* Your business logic with strongly-typed data.
56+
* The batch processor automatically deserializes the DynamoDB stream record.
57+
* We then extract and deserialize the Product from the NewImage attribute.
58+
*/
59+
Logger.LogInformation($"[Typed] Handling DynamoDB record with event: {record.EventName}");
60+
61+
// Extract the Product JSON from the NewImage attribute
62+
var productJson = record.NewImage?.Product?.S;
63+
if (string.IsNullOrEmpty(productJson))
64+
{
65+
throw new ArgumentException("Product attribute is missing or empty");
66+
}
67+
68+
// Check for failure marker
69+
if (productJson == "failure")
70+
{
71+
throw new ArgumentException("Error on failure product");
72+
}
73+
74+
var product = JsonSerializer.Deserialize<Product>(productJson);
75+
76+
Logger.LogInformation($"[Typed] Handling product with id: {product!.Id}, name: {product.Name}");
77+
78+
if (product.Id == 4)
79+
{
80+
throw new ArgumentException("Error on id 4");
81+
}
82+
83+
return Task.FromResult(RecordHandlerResult.None);
84+
}
85+
}

examples/BatchProcessing/src/HelloWorld/Function.cs

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
using AWS.Lambda.Powertools.BatchProcessing.Kinesis;
2525
using AWS.Lambda.Powertools.BatchProcessing.Sqs;
2626
using AWS.Lambda.Powertools.Logging;
27+
using HelloWorld.Data;
2728
using HelloWorld.DynamoDb;
2829
using HelloWorld.Kinesis;
2930
using HelloWorld.Sqs;
@@ -77,8 +78,8 @@ public BatchItemFailuresResponse SqsHandlerUsingAttributeWithErrorPolicy(SQSEven
7778
public BatchItemFailuresResponse HandlerUsingAttributeAndCustomRecordHandlerProvider(SQSEvent _)
7879
{
7980
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
80-
}
81-
81+
}
82+
8283
[Logging(LogEvent = true)]
8384
[BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), BatchProcessor = typeof(CustomSqsBatchProcessor))]
8485
public BatchItemFailuresResponse HandlerUsingAttributeAndCustomBatchProcessor(SQSEvent _)
@@ -114,4 +115,75 @@ public async Task<BatchItemFailuresResponse> HandlerUsingUtilityFromIoc(SQSEvent
114115
}
115116

116117
#endregion
118+
119+
#region Typed Batch Processing Handlers
120+
121+
/// <summary>
122+
/// Example handler using typed batch processing for SQS.
123+
/// The TypedSqsBatchProcessor automatically deserializes the message body to the specified type.
124+
/// </summary>
125+
[Logging(LogEvent = true)]
126+
public async Task<BatchItemFailuresResponse> TypedSqsHandler(SQSEvent sqsEvent)
127+
{
128+
var handler = new TypedSqsProductHandler();
129+
var result = await TypedSqsBatchProcessor.TypedInstance.ProcessAsync<Product>(sqsEvent, handler);
130+
return result.BatchItemFailuresResponse;
131+
}
132+
133+
/// <summary>
134+
/// Example handler using typed batch processing for Kinesis.
135+
/// The TypedKinesisEventBatchProcessor automatically deserializes the record data to the specified type.
136+
/// </summary>
137+
[Logging(LogEvent = true)]
138+
public async Task<BatchItemFailuresResponse> TypedKinesisHandler(KinesisEvent kinesisEvent)
139+
{
140+
var handler = new TypedKinesisProductHandler();
141+
var result = await TypedKinesisEventBatchProcessor.TypedInstance.ProcessAsync<Product>(kinesisEvent, handler);
142+
return result.BatchItemFailuresResponse;
143+
}
144+
145+
/// <summary>
146+
/// Example handler using typed batch processing for DynamoDB Streams.
147+
/// The TypedDynamoDbStreamBatchProcessor automatically deserializes the stream record to the specified type.
148+
/// </summary>
149+
[Logging(LogEvent = true)]
150+
public async Task<BatchItemFailuresResponse> TypedDynamoDbHandler(DynamoDBEvent dynamoDbEvent)
151+
{
152+
var handler = new TypedDynamoDbProductHandler();
153+
var result = await TypedDynamoDbStreamBatchProcessor.TypedInstance.ProcessAsync<DynamoDbProductStreamRecord>(dynamoDbEvent, handler);
154+
return result.BatchItemFailuresResponse;
155+
}
156+
157+
/// <summary>
158+
/// Example handler using typed batch processing with inline handler.
159+
/// Demonstrates using a lambda expression for simple processing logic.
160+
/// </summary>
161+
[Logging(LogEvent = true)]
162+
public async Task<BatchItemFailuresResponse> TypedSqsHandlerInline(SQSEvent sqsEvent)
163+
{
164+
var result = await TypedSqsBatchProcessor.TypedInstance.ProcessAsync<Product>(
165+
sqsEvent,
166+
new InlineTypedProductHandler());
167+
return result.BatchItemFailuresResponse;
168+
}
169+
170+
#endregion
171+
}
172+
173+
/// <summary>
174+
/// Simple inline typed handler for demonstration purposes.
175+
/// </summary>
176+
public class InlineTypedProductHandler : ITypedRecordHandler<Product>
177+
{
178+
public Task<RecordHandlerResult> HandleAsync(Product product, System.Threading.CancellationToken cancellationToken)
179+
{
180+
Logger.LogInformation($"[Inline Typed] Processing product: {product.Id} - {product.Name}");
181+
182+
if (product.Id == 4)
183+
{
184+
throw new System.ArgumentException("Error on id 4");
185+
}
186+
187+
return Task.FromResult(RecordHandlerResult.None);
188+
}
117189
}

examples/BatchProcessing/src/HelloWorld/HelloWorld.csproj

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55
<Nullable>enable</Nullable>
66
</PropertyGroup>
77
<ItemGroup>
8-
<PackageReference Include="Amazon.Lambda.Core" Version="2.7.1" />
8+
<PackageReference Include="Amazon.Lambda.Core" Version="2.8.0" />
99
<PackageReference Include="Amazon.Lambda.Serialization.SystemTextJson" Version="2.4.4" />
10-
<PackageReference Include="AWS.Lambda.Powertools.BatchProcessing" Version="3.0.0" />
10+
<!-- <PackageReference Include="AWS.Lambda.Powertools.BatchProcessing" Version="3.0.0" />-->
1111
<PackageReference Include="AWS.Lambda.Powertools.Logging" Version="3.0.0" />
1212
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
1313
</ItemGroup>
14+
<ItemGroup>
15+
<ProjectReference Include="..\..\..\..\libraries\src\AWS.Lambda.Powertools.BatchProcessing\AWS.Lambda.Powertools.BatchProcessing.csproj" />
16+
</ItemGroup>
1417
</Project>
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.Threading;
18+
using System.Threading.Tasks;
19+
using AWS.Lambda.Powertools.BatchProcessing;
20+
using AWS.Lambda.Powertools.Logging;
21+
using HelloWorld.Data;
22+
23+
namespace HelloWorld.Kinesis;
24+
25+
/// <summary>
26+
/// Typed record handler for Kinesis records that automatically deserializes to Product.
27+
/// No manual JSON deserialization needed - the batch processor handles it.
28+
/// </summary>
29+
public class TypedKinesisProductHandler : ITypedRecordHandler<Product>
30+
{
31+
public Task<RecordHandlerResult> HandleAsync(Product product, CancellationToken cancellationToken)
32+
{
33+
/*
34+
* Your business logic with strongly-typed data.
35+
* The batch processor automatically deserializes the Kinesis record data to Product.
36+
* If an exception is thrown, the item will be marked as a partial batch item failure.
37+
*/
38+
Logger.LogInformation($"[Typed] Handling product with id: {product.Id}, name: {product.Name}");
39+
40+
if (product.Id == 4)
41+
{
42+
throw new ArgumentException("Error on id 4");
43+
}
44+
45+
return Task.FromResult(RecordHandlerResult.None);
46+
}
47+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.Threading;
18+
using System.Threading.Tasks;
19+
using AWS.Lambda.Powertools.BatchProcessing;
20+
using AWS.Lambda.Powertools.Logging;
21+
using HelloWorld.Data;
22+
23+
namespace HelloWorld.Sqs;
24+
25+
/// <summary>
26+
/// Typed record handler for SQS messages that automatically deserializes to Product.
27+
/// No manual JSON deserialization needed - the batch processor handles it.
28+
/// </summary>
29+
public class TypedSqsProductHandler : ITypedRecordHandler<Product>
30+
{
31+
public Task<RecordHandlerResult> HandleAsync(Product product, CancellationToken cancellationToken)
32+
{
33+
/*
34+
* Your business logic with strongly-typed data.
35+
* The batch processor automatically deserializes the SQS message body to Product.
36+
* If an exception is thrown, the item will be marked as a partial batch item failure.
37+
*/
38+
Logger.LogInformation($"[Typed] Handling product with id: {product.Id}, name: {product.Name}");
39+
40+
if (product.Id == 4)
41+
{
42+
throw new ArgumentException("Error on id 4");
43+
}
44+
45+
return Task.FromResult(RecordHandlerResult.None);
46+
}
47+
}

0 commit comments

Comments
 (0)