- What are map operations?
- Terminology
- Key features
- Getting started
- Method signature
- Map function signature
- Configuration
- Advanced patterns
- Best practices
- Performance tips
- FAQ
- Testing
- See also
Map operation - A durable operation that processes a collection of items in parallel, where each item is processed independently and checkpointed. Created using context.map().
Map function - A function that processes a single item from the collection. Receives the context, item, index, and full collection as parameters.
BatchResult - The result type returned by map operations, containing results from all processed items with success/failure status.
Concurrency control - Limiting how many items process simultaneously using max_concurrency in MapConfig.
Completion criteria - Rules that determine when a map operation succeeds or fails based on individual item results.
Map operations let you process collections durably by applying a function to each item in parallel. Each item's processing is checkpointed independently, so if your function is interrupted, completed items don't need to be reprocessed.
Use map operations to:
- Transform collections with automatic checkpointing
- Process lists of items in parallel
- Handle large datasets with resilience
- Control concurrency behavior
- Define custom success/failure criteria
Map operations use context.map() to process collections efficiently. Each item becomes an independent operation that executes in parallel with other items.
- Parallel processing - Items process concurrently by default
- Independent checkpointing - Each item's result is saved separately
- Partial completion - Completed items don't reprocess on replay
- Concurrency control - Limit simultaneous processing with
max_concurrency - Flexible completion - Define custom success/failure criteria
- Result ordering - Results maintain the same order as inputs
Here's a simple example of processing a collection:
from aws_durable_execution_sdk_python import (
DurableContext,
durable_execution,
BatchResult,
)
def square(context: DurableContext, item: int, index: int, items: list[int]) -> int:
"""Square a number."""
return item * item
@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
"""Process a list of items using map operations."""
items = [1, 2, 3, 4, 5]
result = context.map(items, square)
# Convert to dict for JSON serialization (BatchResult is not JSON serializable)
return result.to_dict()
When this function runs:
- Each item is processed in parallel
- The
squarefunction is called for each item - Each result is checkpointed independently
- The function returns a dict with results
[1, 4, 9, 16, 25]
If the function is interrupted after processing items 0-2, it resumes at item 3 without reprocessing the first three items.
def map(
inputs: Sequence[U],
func: Callable[[DurableContext, U, int, Sequence[U]], T],
name: str | None = None,
config: MapConfig | None = None,
) -> BatchResult[T]
Parameters:
inputs- A sequence of items to process (list, tuple, or any sequence type).func- A callable that processes each item. See Map function signature for details.name(optional) - A name for the map operation, useful for debugging and testing.config(optional) - AMapConfigobject to configure concurrency and completion criteria.
Returns: A BatchResult[T] containing the results from processing all items.
Raises: Exceptions based on the completion criteria defined in MapConfig.
The map function receives four parameters:
def process_item(
context: DurableContext,
item: U,
index: int,
items: Sequence[U]
) -> T:
"""Process a single item from the collection."""
# Your processing logic here
return result
Parameters:
context- ADurableContextfor the item's processing. Use this to call steps, waits, or other operations.item- The current item being processed.index- The zero-based index of the item in the original collection.items- The full collection of items being processed.
Returns: The result of processing the item.
def validate_email(
context: DurableContext,
item: str,
index: int,
items: list[str]
) -> dict:
"""Validate an email address."""
is_valid = "@" in item and "." in item
return {
"email": item,
"valid": is_valid,
"position": index,
"total": len(items)
}
@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
emails = ["jane_doe@example.com", "john_doe@example.org", "invalid"]
result = context.map(emails, validate_email)
return result.to_dict()
Configure map behavior using MapConfig:
from aws_durable_execution_sdk_python import (
DurableContext,
durable_execution,
BatchResult,
)
from aws_durable_execution_sdk_python.config import (
MapConfig,
CompletionConfig,
)
def process_item(context: DurableContext, item: int, index: int, items: list[int]) -> dict:
"""Process a single item."""
return {"item": item, "squared": item * item}
@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
items = list(range(100))
# Configure map operation
config = MapConfig(
max_concurrency=10, # Process 10 items at a time
completion_config=CompletionConfig.all_successful(), # Require all to succeed
)
result = context.map(items, process_item, name="process_numbers", config=config)
return result.to_dict()
max_concurrency - Maximum number of items to process concurrently. If None, all items process in parallel. Use this to control resource usage.
completion_config - Defines when the map operation succeeds or fails:
CompletionConfig()- Default, allows any number of failuresCompletionConfig.all_successful()- Requires all items to succeedCompletionConfig(min_successful=N)- Requires at least N items to succeedCompletionConfig(tolerated_failure_count=N)- Fails after N failuresCompletionConfig(tolerated_failure_percentage=X)- Fails if more than X% fail
serdes - Custom serialization for the entire BatchResult. If None, uses JSON serialization.
item_serdes - Custom serialization for individual item results. If None, uses JSON serialization.
summary_generator - Function to generate compact summaries for large results (>256KB).
Limit how many items process simultaneously:
from aws_durable_execution_sdk_python.config import MapConfig
def fetch_data(context: DurableContext, url: str, index: int, urls: list[str]) -> dict:
"""Fetch data from a URL."""
# Network call that might be rate-limited
return {"url": url, "data": "..."}
@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
urls = [f"https://example.com/api/{i}" for i in range(100)]
# Process only 5 URLs at a time
config = MapConfig(max_concurrency=5)
result = context.map(urls, fetch_data, config=config)
return result.to_dict()```
### Custom completion criteria
Define when the map operation should succeed or fail:
```python
from aws_durable_execution_sdk_python.config import MapConfig, CompletionConfig
def process_item(context: DurableContext, item: int, index: int, items: list[int]) -> dict:
"""Process an item that might fail."""
# Processing that might fail
if item % 7 == 0:
raise ValueError(f"Item {item} failed")
return {"item": item, "processed": True}
@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
items = list(range(20))
# Succeed if at least 15 items succeed, fail after 5 failures
config = MapConfig(
completion_config=CompletionConfig(
min_successful=15,
tolerated_failure_count=5,
)
)
result = context.map(items, process_item, config=config)
return result.to_dict()
Call steps, waits, or other operations inside map functions:
from aws_durable_execution_sdk_python import durable_step, StepContext
@durable_step
def fetch_user_data(step_context: StepContext, user_id: str) -> dict:
"""Fetch user data from external service."""
return {"user_id": user_id, "name": "Jane Doe", "email": "jane_doe@example.com"}
@durable_step
def send_notification(step_context: StepContext, user: dict) -> dict:
"""Send notification to user."""
return {"sent": True, "email": user["email"]}
def process_user(
context: DurableContext,
user_id: str,
index: int,
user_ids: list[str]
) -> dict:
"""Process a user by fetching data and sending notification."""
# Use steps within the map function
user = context.step(fetch_user_data(user_id))
notification = context.step(send_notification(user))
return {"user_id": user_id, "notification_sent": notification["sent"]}
@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
"""Process multiple users using context operations within map functions."""
user_ids = ["user_1", "user_2", "user_3"]
result = context.map(user_ids, process_user)
# Convert to dict for JSON serialization (BatchResult is not JSON serializable)
return result.to_dict()
Access individual results from the BatchResult:
def check_inventory(
context: DurableContext,
product_id: str,
index: int,
products: list[str]
) -> dict:
"""Check if a product is in stock."""
# Check if product is in stock
return {"product_id": product_id, "in_stock": True, "quantity": 10}
@durable_execution
def handler(event: dict, context: DurableContext) -> list[str]:
product_ids = ["prod_1", "prod_2", "prod_3", "prod_4"]
# Get all inventory results
batch_result = context.map(product_ids, check_inventory)
# Filter to only in-stock products
in_stock = [
r.result["product_id"]
for r in batch_result.results
if r.result["in_stock"]
]
return in_stock
Use descriptive names - Name your map operations for easier debugging: context.map(items, process_item, name="process_orders").
Control concurrency for external calls - When calling external APIs, use max_concurrency to avoid rate limits.
Define completion criteria - Use CompletionConfig to specify when the operation should succeed or fail.
Keep map functions focused - Each map function should process one item. Don't mix collection iteration with item processing.
Use context operations - Call steps, waits, or other operations inside map functions for complex processing.
Handle errors gracefully - Wrap error-prone code in try-except blocks or use completion criteria to tolerate failures.
Consider collection size - For very large collections (10,000+ items), consider processing in chunks.
Monitor memory usage - Large collections create many checkpoints. Monitor Lambda memory usage.
Return only necessary data - Large result objects increase checkpoint size. Return minimal data from map functions.
Parallel execution is automatic - Items execute concurrently by default. Don't try to manually parallelize.
Use max_concurrency wisely - Too much concurrency can overwhelm external services or exhaust Lambda resources. Start conservative and increase as needed.
Optimize map functions - Keep map functions lightweight. Move heavy computation into steps within the map function.
Use appropriate completion criteria - Fail fast with tolerated_failure_count to avoid processing remaining items when many fail.
Monitor checkpoint size - Large result objects increase checkpoint size and Lambda memory usage. Return only necessary data.
Consider memory limits - Processing thousands of items creates many checkpoints. Monitor Lambda memory and adjust concurrency.
Profile your workload - Test with representative data to find optimal concurrency settings.
Q: What's the difference between map and parallel operations?
A: Map operations process a collection of similar items using the same function. Parallel operations execute different functions concurrently. Use map for collections, parallel for heterogeneous tasks.
Q: How many items can I process?
A: There's no hard limit, but consider Lambda's memory and timeout constraints. For very large collections (10,000+ items), consider processing in chunks.
Q: Do items process in order?
A: Items execute in parallel, so processing order is non-deterministic. However, results maintain the same order as inputs in the BatchResult.
Q: What happens if one item fails?
A: By default, the map operation continues processing other items. Use CompletionConfig to define failure behavior (e.g., fail after N failures).
Q: Can I use async functions in map operations?
A: No, map functions must be synchronous. If you need async processing, use asyncio.run() inside your map function.
Q: How do I access individual results?
A: The BatchResult contains a results list with each item's result:
batch_result = context.map(items, process_item)
for item_result in batch_result.results:
print(item_result.result)
Q: Can I nest map operations?
A: Yes, you can call context.map() inside a map function to process nested collections.
Q: What's the difference between serdes and item_serdes?
A: item_serdes serializes individual item results as they complete. serdes serializes the entire BatchResult at the end. Use both for custom serialization at different levels.
Q: How do I handle partial failures?
A: Check the BatchResult.results list. Each result has a status indicating success or failure:
batch_result = context.map(items, process_item)
successful = [r for r in batch_result.results if r.status == "SUCCEEDED"]
failed = [r for r in batch_result.results if r.status == "FAILED"]
Q: Can I use map operations with steps?
A: Yes, call context.step() inside your map function to execute steps for each item.
You can test map operations using the testing SDK. The test runner executes your function and lets you inspect individual item results.
import pytest
from aws_durable_execution_sdk_python_testing import InvocationStatus
from my_function import handler
@pytest.mark.durable_execution(
handler=handler,
lambda_function_name="map_operations",
)
def test_map_operations(durable_runner):
"""Test map operations."""
with durable_runner:
result = durable_runner.run(input={}, timeout=10)
# Check overall status
assert result.status is InvocationStatus.SUCCEEDED
# Check the BatchResult
batch_result = result.result
assert batch_result.total_count == 5
assert batch_result.success_count == 5
assert batch_result.failure_count == 0
# Check individual results
assert batch_result.results[0].result == 1
assert batch_result.results[1].result == 4
assert batch_result.results[2].result == 9
Use result.get_map() to inspect the map operation:
from aws_durable_execution_sdk_python.lambda_service import OperationType
@pytest.mark.durable_execution(
handler=handler,
lambda_function_name="map_operations",
)
def test_map_individual_items(durable_runner):
"""Test individual item processing."""
with durable_runner:
result = durable_runner.run(input={}, timeout=10)
# Get the map operation
map_op = result.get_map("square")
assert map_op is not None
# Verify all items were processed
assert map_op.result.total_count == 5
# Check specific items
assert map_op.result.results[0].result == 1
assert map_op.result.results[2].result == 9
Test that individual item failures are handled correctly:
@pytest.mark.durable_execution(
handler=handler_with_errors,
lambda_function_name="map_with_errors",
)
def test_map_error_handling(durable_runner):
"""Test error handling in map operations."""
with durable_runner:
result = durable_runner.run(input={}, timeout=10)
# Function should handle errors based on completion config
assert result.status is InvocationStatus.SUCCEEDED
batch_result = result.result
# Check that some items succeeded
successful = [r for r in batch_result.results if r.status == "SUCCEEDED"]
assert len(successful) > 0
# Check that some items failed
failed = [r for r in batch_result.results if r.status == "FAILED"]
assert len(failed) > 0
Test map operations with custom configuration:
from aws_durable_execution_sdk_python.config import MapConfig, CompletionConfig
@pytest.mark.durable_execution(
handler=handler_with_config,
lambda_function_name="map_with_config",
)
def test_map_with_config(durable_runner):
"""Test map operations with custom configuration."""
with durable_runner:
result = durable_runner.run(input={}, timeout=30)
# Verify the map operation completed
assert result.status is InvocationStatus.SUCCEEDED
# Get the map operation
map_op = result.get_map("process_items")
# Verify configuration was applied
assert map_op is not None
assert map_op.result.total_count > 0
For more testing patterns, see:
- Basic tests - Simple test examples
- Complex workflows - Multi-step workflow testing
- Best practices - Testing recommendations
- Parallel operations - Execute different functions concurrently
- Steps - Understanding step operations
- Child contexts - Organizing complex workflows
- Configuration - MapConfig and CompletionConfig details
- BatchResult - Working with batch results
- Examples - More map examples
See the LICENSE file for our project's licensing.