Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,086 changes: 1,543 additions & 1,543 deletions docs/utilities/batch-processing.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using AWS.Lambda.Powertools.BatchProcessing;
using AWS.Lambda.Powertools.Logging;
using HelloWorld.Data;

namespace HelloWorld.DynamoDb;

/// <summary>
/// Data model representing the DynamoDB stream record structure.
/// </summary>
public class DynamoDbProductStreamRecord
{
public string? EventName { get; set; }
public DynamoDbNewImage? NewImage { get; set; }
public string? SequenceNumber { get; set; }
}

public class DynamoDbNewImage
{
public DynamoDbAttributeValue? Product { get; set; }
}

public class DynamoDbAttributeValue
{
public string? S { get; set; }
}

/// <summary>
/// Typed record handler for DynamoDB stream records.
/// Receives the deserialized DynamoDB stream record and extracts the Product from NewImage.
/// </summary>
public class TypedDynamoDbProductHandler : ITypedRecordHandler<DynamoDbProductStreamRecord>
{
public Task<RecordHandlerResult> HandleAsync(DynamoDbProductStreamRecord record, CancellationToken cancellationToken)
{
/*
* Your business logic with strongly-typed data.
* The batch processor automatically deserializes the DynamoDB stream record.
* We then extract and deserialize the Product from the NewImage attribute.
*/
Logger.LogInformation($"[Typed] Handling DynamoDB record with event: {record.EventName}");

// Extract the Product JSON from the NewImage attribute
var productJson = record.NewImage?.Product?.S;
if (string.IsNullOrEmpty(productJson))
{
throw new ArgumentException("Product attribute is missing or empty");
}

// Check for failure marker
if (productJson == "failure")
{
throw new ArgumentException("Error on failure product");
}

var product = JsonSerializer.Deserialize<Product>(productJson);

Logger.LogInformation($"[Typed] Handling product with id: {product!.Id}, name: {product.Name}");

if (product.Id == 4)
{
throw new ArgumentException("Error on id 4");
}

return Task.FromResult(RecordHandlerResult.None);
}
}
76 changes: 74 additions & 2 deletions examples/BatchProcessing/src/HelloWorld/Function.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
using AWS.Lambda.Powertools.BatchProcessing.Kinesis;
using AWS.Lambda.Powertools.BatchProcessing.Sqs;
using AWS.Lambda.Powertools.Logging;
using HelloWorld.Data;
using HelloWorld.DynamoDb;
using HelloWorld.Kinesis;
using HelloWorld.Sqs;
Expand Down Expand Up @@ -77,8 +78,8 @@ public BatchItemFailuresResponse SqsHandlerUsingAttributeWithErrorPolicy(SQSEven
public BatchItemFailuresResponse HandlerUsingAttributeAndCustomRecordHandlerProvider(SQSEvent _)
{
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
}

[Logging(LogEvent = true)]
[BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), BatchProcessor = typeof(CustomSqsBatchProcessor))]
public BatchItemFailuresResponse HandlerUsingAttributeAndCustomBatchProcessor(SQSEvent _)
Expand Down Expand Up @@ -114,4 +115,75 @@ public async Task<BatchItemFailuresResponse> HandlerUsingUtilityFromIoc(SQSEvent
}

#endregion

#region Typed Batch Processing Handlers

/// <summary>
/// Example handler using typed batch processing for SQS.
/// The TypedSqsBatchProcessor automatically deserializes the message body to the specified type.
/// </summary>
[Logging(LogEvent = true)]
public async Task<BatchItemFailuresResponse> TypedSqsHandler(SQSEvent sqsEvent)
{
var handler = new TypedSqsProductHandler();
var result = await TypedSqsBatchProcessor.TypedInstance.ProcessAsync<Product>(sqsEvent, handler);
return result.BatchItemFailuresResponse;
}

/// <summary>
/// Example handler using typed batch processing for Kinesis.
/// The TypedKinesisEventBatchProcessor automatically deserializes the record data to the specified type.
/// </summary>
[Logging(LogEvent = true)]
public async Task<BatchItemFailuresResponse> TypedKinesisHandler(KinesisEvent kinesisEvent)
{
var handler = new TypedKinesisProductHandler();
var result = await TypedKinesisEventBatchProcessor.TypedInstance.ProcessAsync<Product>(kinesisEvent, handler);
return result.BatchItemFailuresResponse;
}

/// <summary>
/// Example handler using typed batch processing for DynamoDB Streams.
/// The TypedDynamoDbStreamBatchProcessor automatically deserializes the stream record to the specified type.
/// </summary>
[Logging(LogEvent = true)]
public async Task<BatchItemFailuresResponse> TypedDynamoDbHandler(DynamoDBEvent dynamoDbEvent)
{
var handler = new TypedDynamoDbProductHandler();
var result = await TypedDynamoDbStreamBatchProcessor.TypedInstance.ProcessAsync<DynamoDbProductStreamRecord>(dynamoDbEvent, handler);
return result.BatchItemFailuresResponse;
}

/// <summary>
/// Example handler using typed batch processing with inline handler.
/// Demonstrates using a lambda expression for simple processing logic.
/// </summary>
[Logging(LogEvent = true)]
public async Task<BatchItemFailuresResponse> TypedSqsHandlerInline(SQSEvent sqsEvent)
{
var result = await TypedSqsBatchProcessor.TypedInstance.ProcessAsync<Product>(
sqsEvent,
new InlineTypedProductHandler());
return result.BatchItemFailuresResponse;
}

#endregion
}

/// <summary>
/// Simple inline typed handler for demonstration purposes.
/// </summary>
public class InlineTypedProductHandler : ITypedRecordHandler<Product>
{
public Task<RecordHandlerResult> HandleAsync(Product product, System.Threading.CancellationToken cancellationToken)
{
Logger.LogInformation($"[Inline Typed] Processing product: {product.Id} - {product.Name}");

if (product.Id == 4)
{
throw new System.ArgumentException("Error on id 4");
}

return Task.FromResult(RecordHandlerResult.None);
}
}
7 changes: 5 additions & 2 deletions examples/BatchProcessing/src/HelloWorld/HelloWorld.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Amazon.Lambda.Core" Version="2.7.1" />
<PackageReference Include="Amazon.Lambda.Core" Version="2.8.0" />
<PackageReference Include="Amazon.Lambda.Serialization.SystemTextJson" Version="2.4.4" />
<PackageReference Include="AWS.Lambda.Powertools.BatchProcessing" Version="3.0.0" />
<!-- <PackageReference Include="AWS.Lambda.Powertools.BatchProcessing" Version="3.0.0" />-->
<PackageReference Include="AWS.Lambda.Powertools.Logging" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\..\libraries\src\AWS.Lambda.Powertools.BatchProcessing\AWS.Lambda.Powertools.BatchProcessing.csproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

using System;
using System.Threading;
using System.Threading.Tasks;
using AWS.Lambda.Powertools.BatchProcessing;
using AWS.Lambda.Powertools.Logging;
using HelloWorld.Data;

namespace HelloWorld.Kinesis;

/// <summary>
/// Typed record handler for Kinesis records that automatically deserializes to Product.
/// No manual JSON deserialization needed - the batch processor handles it.
/// </summary>
public class TypedKinesisProductHandler : ITypedRecordHandler<Product>
{
public Task<RecordHandlerResult> HandleAsync(Product product, CancellationToken cancellationToken)
{
/*
* Your business logic with strongly-typed data.
* The batch processor automatically deserializes the Kinesis record data to Product.
* If an exception is thrown, the item will be marked as a partial batch item failure.
*/
Logger.LogInformation($"[Typed] Handling product with id: {product.Id}, name: {product.Name}");

if (product.Id == 4)
{
throw new ArgumentException("Error on id 4");
}

return Task.FromResult(RecordHandlerResult.None);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

using System;
using System.Threading;
using System.Threading.Tasks;
using AWS.Lambda.Powertools.BatchProcessing;
using AWS.Lambda.Powertools.Logging;
using HelloWorld.Data;

namespace HelloWorld.Sqs;

/// <summary>
/// Typed record handler for SQS messages that automatically deserializes to Product.
/// No manual JSON deserialization needed - the batch processor handles it.
/// </summary>
public class TypedSqsProductHandler : ITypedRecordHandler<Product>
{
public Task<RecordHandlerResult> HandleAsync(Product product, CancellationToken cancellationToken)
{
/*
* Your business logic with strongly-typed data.
* The batch processor automatically deserializes the SQS message body to Product.
* If an exception is thrown, the item will be marked as a partial batch item failure.
*/
Logger.LogInformation($"[Typed] Handling product with id: {product.Id}, name: {product.Name}");

if (product.Id == 4)
{
throw new ArgumentException("Error on id 4");
}

return Task.FromResult(RecordHandlerResult.None);
}
}
Loading
Loading