
Commit e9661cd

feat(batch): add Kafka/MSK batch processing support
1 parent 9dca23c commit e9661cd

File tree

10 files changed, +732 -24 lines


aws_lambda_powertools/utilities/batch/base.py

Lines changed: 23 additions & 2 deletions

@@ -25,6 +25,9 @@
 from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
     DynamoDBRecord,
 )
+from aws_lambda_powertools.utilities.data_classes.kafka_event import (
+    KafkaEventRecord,
+)
 from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
     KinesisStreamRecord,
 )
@@ -46,12 +49,13 @@ class EventType(Enum):
     SQS = "SQS"
     KinesisDataStreams = "KinesisDataStreams"
     DynamoDBStreams = "DynamoDBStreams"
+    Kafka = "Kafka"


 # When using processor with default arguments, records will carry EventSourceDataClassTypes
 # and depending on what EventType it's passed it'll correctly map to the right record
-# When using Pydantic Models, it'll accept any subclass from SQS, DynamoDB and Kinesis
-EventSourceDataClassTypes = Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord]
+# When using Pydantic Models, it'll accept any subclass from SQS, DynamoDB, Kinesis and Kafka
+EventSourceDataClassTypes = Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord, KafkaEventRecord]
 BatchEventTypes = Union[EventSourceDataClassTypes, BatchTypeModels]
 SuccessResponse = Tuple[str, Any, BatchEventTypes]
 FailureResponse = Tuple[str, str, BatchEventTypes]
@@ -272,11 +276,13 @@ def __init__(
             EventType.SQS: self._collect_sqs_failures,
             EventType.KinesisDataStreams: self._collect_kinesis_failures,
             EventType.DynamoDBStreams: self._collect_dynamodb_failures,
+            EventType.Kafka: self._collect_kafka_failures,
         }
         self._DATA_CLASS_MAPPING = {
             EventType.SQS: SQSRecord,
             EventType.KinesisDataStreams: KinesisStreamRecord,
             EventType.DynamoDBStreams: DynamoDBRecord,
+            EventType.Kafka: KafkaEventRecord,
         }

         super().__init__()
@@ -365,6 +371,21 @@ def _collect_dynamodb_failures(self):
             failures.append({"itemIdentifier": msg_id})
         return failures

+    def _collect_kafka_failures(self):
+        failures = []
+        for msg in self.fail_messages:
+            # Kafka uses a composite identifier with topic-partition and offset
+            # Both data class and Pydantic model use the same field names
+            failures.append(
+                {
+                    "itemIdentifier": {
+                        "topic-partition": f"{msg.topic}-{msg.partition}",
+                        "offset": msg.offset,
+                    },
+                },
+            )
+        return failures
+
     @overload
     def _to_batch_type(
         self,
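
For context, the composite identifier built by `_collect_kafka_failures` surfaces in the partial-failure response roughly as sketched below (hypothetical topic/partition/offset values; the surrounding `batchItemFailures` wrapper comes from the processor's existing response handling, which this diff does not touch).

```python
# Sketch: one entry as assembled by _collect_kafka_failures() for a failed record
# from topic "orders", partition 0, offset 15 (hypothetical values).
failure_entry = {
    "itemIdentifier": {
        "topic-partition": "orders-0",  # f"{msg.topic}-{msg.partition}"
        "offset": 15,                   # msg.offset
    },
}
```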

aws_lambda_powertools/utilities/batch/decorators.py

Lines changed: 43 additions & 12 deletions

@@ -22,6 +22,47 @@
 from aws_lambda_powertools.utilities.typing import LambdaContext


+def _get_records_from_event(
+    event: dict[str, Any],
+    processor: BasePartialBatchProcessor,
+) -> list[dict]:
+    """
+    Extract records from the event based on the processor's event type.
+
+    For SQS, Kinesis, and DynamoDB: Records are in event["Records"] as a list
+    For Kafka: Records are in event["records"] as a dict with topic-partition keys
+
+    Parameters
+    ----------
+    event: dict
+        Lambda's original event
+    processor: BasePartialBatchProcessor
+        Batch Processor to determine event type
+
+    Returns
+    -------
+    records: list[dict]
+        Flattened list of records to process
+    """
+    # Kafka events use lowercase "records" and have a nested dict structure
+    if processor.event_type == EventType.Kafka:
+        kafka_records = event.get("records", {})
+        if not kafka_records or not isinstance(kafka_records, dict):
+            raise UnexpectedBatchTypeError(
+                "Invalid Kafka event structure. Expected 'records' to be a non-empty dict with topic-partition keys.",
+            )
+        # Flatten the nested dict: {"topic-0": [r1, r2], "topic-1": [r3]} -> [r1, r2, r3]
+        return [record for topic_records in kafka_records.values() for record in topic_records]
+
+    # SQS, Kinesis, DynamoDB use uppercase "Records" as a list
+    records = event.get("Records", [])
+    if not records or not isinstance(records, list):
+        raise UnexpectedBatchTypeError(
+            "Unexpected batch event type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams, Kafka",
+        )
+    return records
+
+
 @lambda_handler_decorator
 @deprecated(
     "`async_batch_processor` decorator is deprecated; use `async_process_partial_response` function instead.",
@@ -206,12 +247,7 @@ def handler(event, context):
     * Async batch processors. Use `async_process_partial_response` instead.
     """
     try:
-        records: list[dict] = event.get("Records", [])
-        if not records or not isinstance(records, list):
-            raise UnexpectedBatchTypeError(
-                "Unexpected batch event type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams",
-            )
-
+        records = _get_records_from_event(event, processor)
     except AttributeError:
         event_types = ", ".join(list(EventType.__members__))
         docs = "https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#processing-messages-from-sqs"  # noqa: E501 # long-line
@@ -275,12 +311,7 @@ def handler(event, context):
     * Sync batch processors. Use `process_partial_response` instead.
     """
    try:
-        records: list[dict] = event.get("Records", [])
-        if not records or not isinstance(records, list):
-            raise UnexpectedBatchTypeError(
-                "Unexpected batch event type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams",
-            )
-
+        records = _get_records_from_event(event, processor)
     except AttributeError:
         event_types = ", ".join(list(EventType.__members__))
         docs = "https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#processing-messages-from-sqs"  # noqa: E501 # long-line

aws_lambda_powertools/utilities/batch/types.py

Lines changed: 22 additions & 2 deletions

@@ -12,18 +12,38 @@
     from aws_lambda_powertools.utilities.parser.models import (
         KinesisDataStreamRecord as KinesisDataStreamRecordModel,
     )
+    from aws_lambda_powertools.utilities.parser.models.kafka import KafkaRecordModel

     BatchTypeModels = Optional[
-        Union[Type[SqsRecordModel], Type[DynamoDBStreamRecordModel], Type[KinesisDataStreamRecordModel]]
+        Union[
+            Type[SqsRecordModel],
+            Type[DynamoDBStreamRecordModel],
+            Type[KinesisDataStreamRecordModel],
+            Type[KafkaRecordModel],
+        ]
     ]
     BatchSqsTypeModel = Optional[Type[SqsRecordModel]]
 else:  # pragma: no cover
     BatchTypeModels = "BatchTypeModels"  # type: ignore
     BatchSqsTypeModel = "BatchSqsTypeModel"  # type: ignore


+class KafkaItemIdentifier(TypedDict):
+    """Kafka uses a composite identifier with topic-partition and offset."""
+
+    topic_partition: str  # Maps to "topic-partition" in the actual response
+    offset: int
+
+
 class PartialItemFailures(TypedDict):
-    itemIdentifier: str
+    """
+    Represents a partial item failure response.
+
+    For SQS, Kinesis, and DynamoDB: itemIdentifier is a string (message_id or sequence_number)
+    For Kafka: itemIdentifier is a KafkaItemIdentifier dict with topic-partition and offset
+    """
+
+    itemIdentifier: str | KafkaItemIdentifier


 class PartialItemFailureResponse(TypedDict):
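
A short sketch of how the widened `itemIdentifier` type reads at a call site, assuming both TypedDicts are importable from the module above (values are hypothetical; note that the declared `topic_partition` key differs from the hyphenated "topic-partition" key used in the runtime response, as the inline comment points out):

```python
from aws_lambda_powertools.utilities.batch.types import KafkaItemIdentifier, PartialItemFailures

# SQS/Kinesis/DynamoDB failures keep the plain-string identifier (message ID or sequence number).
sqs_failure: PartialItemFailures = {"itemIdentifier": "hypothetical-message-id"}

# Kafka failures carry the composite identifier (hypothetical topic/partition/offset).
kafka_id: KafkaItemIdentifier = {"topic_partition": "orders-0", "offset": 15}
kafka_failure: PartialItemFailures = {"itemIdentifier": kafka_id}
```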

docs/utilities/batch.md

Lines changed: 49 additions & 4 deletions

@@ -3,12 +3,12 @@ title: Batch Processing
 description: Utility
 ---

-The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.
+The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, Amazon DynamoDB Streams, and Amazon MSK/self-managed Apache Kafka.

 ```mermaid
 stateDiagram-v2
     direction LR
-    BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/>
+    BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/> Amazon MSK / Apache Kafka <br/><br/>
     LambdaInit: Lambda invocation
     BatchProcessor: Batch Processor
     RecordHandler: Record Handler function
@@ -38,7 +38,7 @@ stateDiagram-v2

 ## Background

-When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.
+When using SQS, Kinesis Data Streams, DynamoDB Streams, or Amazon MSK/Apache Kafka as a Lambda event source, your Lambda functions are triggered with a batch of messages.

 If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until either condition happens first: **a)** your Lambda function returns a successful response, **b)** record reaches maximum retry attempts, or **c)** records expire.

@@ -55,13 +55,14 @@ This behavior changes when you enable Report Batch Item Failures feature in your
 <!-- markdownlint-disable MD013 -->
 * [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted.
 * [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). Single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as checkpoint.
+* [**Kafka (MSK and self-managed)**](#processing-messages-from-kafka). Failed records are identified by topic-partition and offset. Only failed records will be retried.

 <!-- HTML tags are required in admonition content thus increasing line length beyond our limits -->
 <!-- markdownlint-disable MD013 -->
 ???+ warning "Warning: This utility lowers the chance of processing records more than once; it does not guarantee it"
     We recommend implementing processing logic in an [idempotent manner](idempotency.md){target="_blank"} wherever possible.

-    You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, or [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"} in the AWS Documentation.
+    You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"}, or [MSK/Kafka](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html){target="_blank"} in the AWS Documentation.

 ## Getting started

@@ -93,6 +94,12 @@ The remaining sections of the documentation will rely on these samples. For comp
     --8<-- "examples/batch_processing/sam/dynamodb_batch_processing.yaml"
     ```

+=== "Kafka (MSK)"
+
+    ```yaml title="template.yaml" hl_lines="74-75"
+    --8<-- "examples/batch_processing/sam/kafka_batch_processing.yaml"
+    ```
+
 ### Processing messages from SQS

 Processing batches from SQS works in three stages:
@@ -237,6 +244,44 @@ Processing batches from DynamoDB Streams works in three stages:
     --8<-- "examples/batch_processing/src/getting_started_dynamodb_event.json"
     ```

+### Processing messages from Kafka
+
+Processing batches from Amazon MSK or self-managed Apache Kafka works in three stages:
+
+1. Instantiate **`BatchProcessor`** and choose **`EventType.Kafka`** for the event type
+2. Define your function to handle each batch record, and use [`KafkaEventRecord`](data_classes.md#kafka){target="_blank"} type annotation for autocompletion
+3. Use **`process_partial_response`** to kick off processing
+
+!!! info "This works with both MSK and self-managed Apache Kafka"
+    The batch processor automatically handles the different event structures from MSK and self-managed Kafka clusters.
+
+=== "Recommended"
+
+    ```python hl_lines="2-9 12 18 27"
+    --8<-- "examples/batch_processing/src/getting_started_kafka.py"
+    ```
+
+    1. **Step 1**. Creates a partial failure batch processor for Kafka. See [partial failure mechanics for details](#partial-failure-mechanics)
+
+=== "Sample response"
+
+    The second record failed to be processed, therefore the processor added its topic-partition and offset in the response.
+
+    ```json
+    --8<-- "examples/batch_processing/src/getting_started_kafka_response.json"
+    ```
+
+=== "Sample event"
+
+    ```json
+    --8<-- "examples/batch_processing/src/getting_started_kafka_event.json"
+    ```
+
+!!! tip "Extracting message value"
+    Use `record.json_value` to get the deserialized JSON body from the Kafka record. For raw bytes access, use `record.decoded_value`.
+
+    For advanced deserialization (Avro, Protobuf), see the [Kafka Consumer utility](kafka.md){target="_blank"} which can be used alongside the batch processor.
+
 ### Error handling

 By default, we catch any exception raised by your record handler function. This allows us to **(1)** continue processing the batch, **(2)** collect each batch item that failed processing, and **(3)** return the appropriate response correctly without failing your Lambda function execution.
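
The `getting_started_kafka.py` example referenced by the snippets above is not included in this diff; a minimal sketch of the three stages, using only names introduced in this commit (`EventType.Kafka`, `KafkaEventRecord`, `process_partial_response`), could look like this:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEventRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.Kafka)  # Step 1: Kafka partial-failure processor


def record_handler(record: KafkaEventRecord):  # Step 2: process one record at a time
    payload = record.json_value  # deserialized JSON body; use record.decoded_value for raw bytes
    print(f"processing offset {record.offset} from topic {record.topic}")
    # ... business logic; an exception raised here marks only this record as failed


def lambda_handler(event: dict, context: LambdaContext):
    # Step 3: run the batch and return the collected topic-partition/offset failures
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )
```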

docs/utilities/kafka.md

Lines changed: 19 additions & 4 deletions

@@ -50,7 +50,7 @@ flowchart LR
 Lambda processes Kafka messages as discrete events rather than continuous streams, requiring a different approach to consumer development that Powertools for AWS helps standardize.

 | Aspect | Traditional Kafka Consumers | Lambda Kafka Consumer |
-|--------|----------------------------|----------------------|
+| ------ | --------------------------- | --------------------- |
 | **Model** | Pull-based (you poll for messages) | Push-based (Lambda invoked with messages) |
 | **Scaling** | Manual scaling configuration | Automatic scaling to partition count |
 | **State** | Long-running application with state | Stateless, ephemeral executions |
@@ -241,7 +241,7 @@ Each Kafka record contains important metadata that you can access alongside the
 #### Available metadata properties

 | Property | Description | Example Use Case |
-|----------|-------------|-----------------|
+| -------- | ----------- | ---------------- |
 | `topic` | Topic name the record was published to | Routing logic in multi-topic consumers |
 | `partition` | Kafka partition number | Tracking message distribution |
 | `offset` | Position in the partition | De-duplication, exactly-once processing |
@@ -253,7 +253,7 @@ Each Kafka record contains important metadata that you can access alongside the
 | `original_value` | Base64-encoded original message value | Debugging or custom deserialization |
 | `original_key` | Base64-encoded original message key | Debugging or custom deserialization |
 | `value_schema_metadata` | Metadata about the value schema like `schemaId` and `dataFormat` | Data format and schemaId propagated when integrating with Schema Registry |
-| `key_schema_metadata` | Metadata about the key schema like `schemaId` and `dataFormat` | Data format and schemaId propagated when integrating with Schema Registry |
+| `key_schema_metadata` | Metadata about the key schema like `schemaId` and `dataFormat` | Data format and schemaId propagated when integrating with Schema Registry |

 ### Custom output serializers

@@ -304,7 +304,7 @@ Handle errors gracefully when processing Kafka messages to ensure your applicati
 #### Exception types

 | Exception | Description | Common Causes |
-|-----------|-------------|---------------|
+| --------- | ----------- | ------------- |
 | `KafkaConsumerDeserializationError` | Raised when message deserialization fails | Corrupted message data, schema mismatch, or wrong schema type configuration |
 | `KafkaConsumerAvroSchemaParserError` | Raised when parsing Avro schema definition fails | Syntax errors in schema JSON, invalid field types, or malformed schema |
 | `KafkaConsumerMissingSchemaError` | Raised when a required schema is not provided | Missing schema for AVRO or PROTOBUF formats (required parameter) |
@@ -325,6 +325,21 @@ The [idempotency utility](idempotency.md){target="_blank"} automatically stores

 TIP: By using the Kafka record's unique coordinates (topic, partition, offset) as the idempotency key, you ensure that even if a batch fails and Lambda retries the messages, each message will be processed exactly once.

+### Handling partial batch failures
+
+When processing Kafka messages, individual records may fail while others succeed. By default, Lambda retries the entire batch when any record fails. To retry only the failed records, use the [Batch Processing utility](batch.md#processing-messages-from-kafka){target="_blank"} with `EventType.Kafka`.
+
+This feature allows Lambda to checkpoint successful records and only retry the failed ones, significantly improving processing efficiency and reducing duplicate processing.
+
+=== "Kafka with Batch Processing"
+
+    ```python hl_lines="2-6 12 18-19 27"
+    --8<-- "examples/batch_processing/src/getting_started_kafka.py"
+    ```
+
+!!! note "Using with deserialization"
+    The Batch Processing utility uses the basic `KafkaEventRecord` data class. For advanced deserialization (Avro, Protobuf), you can use the Kafka Consumer's deserialization utilities inside your record handler function.
+
 ### Best practices

 #### Handling large messages
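
As a sketch of the TIP quoted in the hunk above, the record coordinates can be folded into a single deduplication key before handing the payload to the idempotency utility (the key layout is an assumption, not a format prescribed by the library):

```python
from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEventRecord


def dedup_key(record: KafkaEventRecord) -> str:
    # topic + partition + offset uniquely identify a record within a cluster,
    # so a retried delivery of the same record maps to the same key.
    return f"{record.topic}:{record.partition}:{record.offset}"
```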
