Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ dist/
.kiro/

/examples/build/*
/examples/*.zip
/examples/*.zip

.env
24 changes: 23 additions & 1 deletion examples/examples-catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,28 @@
"ApplicationLogLevel": "DEBUG",
"LogFormat": "JSON"
}
}
},
{
"name": "Map with Item Namer",
"description": "Map operation with custom item_namer for iteration naming",
"handler": "map_with_item_namer.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/map/map_with_item_namer.py"
},
{
"name": "Parallel with Named Branches",
"description": "Parallel operation with named branches using ParallelBranch",
"handler": "parallel_with_named_branches.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/parallel/parallel_with_named_branches.py"
}
]
}
30 changes: 30 additions & 0 deletions examples/src/map/map_with_item_namer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Example demonstrating map operations with custom iteration naming."""

from typing import Any

from aws_durable_execution_sdk_python.config import MapConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> list[str]:
"""Process orders using context.map() with custom iteration names."""
orders = [
{"id": "order-101", "amount": 25},
{"id": "order-102", "amount": 50},
{"id": "order-103", "amount": 75},
]

return context.map(
inputs=orders,
func=lambda ctx, order, index, _: ctx.step(
lambda _: f"processed-{order['id']}-${order['amount']}",
name=f"process_{order['id']}",
),
name="process_orders",
config=MapConfig(
max_concurrency=2,
item_namer=lambda order, index: f"order-{order['id']}",
),
).get_results()
35 changes: 35 additions & 0 deletions examples/src/parallel/parallel_with_named_branches.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Example demonstrating parallel operations with named branches."""

from typing import Any

from aws_durable_execution_sdk_python.config import ParallelBranch, ParallelConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> list[str]:
"""Execute named parallel branches using ParallelBranch."""

return context.parallel(
functions=[
ParallelBranch(
func=lambda ctx: ctx.step(
lambda _: "user-data-loaded", name="load_user"
),
name="fetch-user-data",
),
ParallelBranch(
func=lambda ctx: ctx.step(
lambda _: "orders-loaded", name="load_orders"
),
name="fetch-order-history",
),
ParallelBranch(
func=lambda ctx: ctx.step(lambda _: "prefs-loaded", name="load_prefs"),
name="fetch-preferences",
),
],
name="load_all_data",
config=ParallelConfig(max_concurrency=3),
).get_results()
36 changes: 36 additions & 0 deletions examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,42 @@
"ExecutionTimeout": 300
}
}
},
"MapWithItemNamer": {
"Type": "AWS::Serverless::Function",
"Properties": {
"CodeUri": "build/",
"Handler": "map_with_item_namer.handler",
"Description": "Map operation with custom item_namer for iteration naming",
"Role": {
"Fn::GetAtt": [
"DurableFunctionRole",
"Arn"
]
},
"DurableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
}
}
},
"ParallelWithNamedBranches": {
"Type": "AWS::Serverless::Function",
"Properties": {
"CodeUri": "build/",
"Handler": "parallel_with_named_branches.handler",
"Description": "Parallel operation with named branches using ParallelBranch",
"Role": {
"Fn::GetAtt": [
"DurableFunctionRole",
"Arn"
]
},
"DurableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
}
}
}
}
}
39 changes: 39 additions & 0 deletions examples/test/map/test_map_with_item_namer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Tests for map_with_item_namer example."""

import pytest
from src.map import map_with_item_namer
from test.conftest import deserialize_operation_payload

from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import (
OperationStatus,
)


@pytest.mark.example
@pytest.mark.durable_execution(
handler=map_with_item_namer.handler,
lambda_function_name="map with item namer",
)
def test_map_with_item_namer(durable_runner):
"""Test map example with custom item_namer for iteration naming."""
with durable_runner:
result = durable_runner.run(input="test", timeout=10)

assert result.status is InvocationStatus.SUCCEEDED
assert deserialize_operation_payload(result.result) == [
"processed-order-101-$25",
"processed-order-102-$50",
"processed-order-103-$75",
]

# Get the map operation
map_op = result.get_context("process_orders")
assert map_op is not None
assert map_op.status is OperationStatus.SUCCEEDED

# Verify custom iteration names from item_namer
assert len(map_op.child_operations) == 3
child_names = {op.name for op in map_op.child_operations}
expected_names = {"order-order-101", "order-order-102", "order-order-103"}
assert child_names == expected_names
45 changes: 45 additions & 0 deletions examples/test/parallel/test_parallel_with_named_branches.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Tests for parallel_with_named_branches example."""

import pytest
from src.parallel import parallel_with_named_branches
from test.conftest import deserialize_operation_payload

from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import (
OperationStatus,
OperationType,
)


@pytest.mark.example
@pytest.mark.durable_execution(
handler=parallel_with_named_branches.handler,
lambda_function_name="parallel with named branches",
)
def test_parallel_with_named_branches(durable_runner):
"""Test parallel example with named branches using ParallelBranch."""
with durable_runner:
result = durable_runner.run(input="test", timeout=10)

assert result.status is InvocationStatus.SUCCEEDED
assert deserialize_operation_payload(result.result) == [
"user-data-loaded",
"orders-loaded",
"prefs-loaded",
]

# Get the parallel operation
parallel_op = result.get_context("load_all_data")
assert parallel_op is not None
assert parallel_op.status is OperationStatus.SUCCEEDED

# Verify custom branch names from ParallelBranch
assert len(parallel_op.child_operations) == 3
child_names = {op.name for op in parallel_op.child_operations}
expected_names = {"fetch-user-data", "fetch-order-history", "fetch-preferences"}
assert child_names == expected_names

# Verify all children succeeded
for child in parallel_op.child_operations:
assert child.operation_type == OperationType.CONTEXT
assert child.status is OperationStatus.SUCCEEDED
3 changes: 3 additions & 0 deletions src/aws_durable_execution_sdk_python/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# Helper decorators - commonly used for step functions
# Concurrency
from aws_durable_execution_sdk_python.concurrency.models import BatchResult
from aws_durable_execution_sdk_python.config import ParallelBranch
from aws_durable_execution_sdk_python.context import (
DurableContext,
durable_step,
Expand All @@ -27,11 +28,13 @@
# Essential context types - passed to user functions
from aws_durable_execution_sdk_python.types import StepContext


__all__ = [
"BatchResult",
"DurableContext",
"DurableExecutionsError",
"InvocationError",
"ParallelBranch",
"StepContext",
"ValidationError",
"__version__",
Expand Down
10 changes: 9 additions & 1 deletion src/aws_durable_execution_sdk_python/concurrency/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@ def execute_item(
"""Execute a single executable in a child context and return the result."""
raise NotImplementedError

def get_iteration_name(self, index: int) -> str:
"""Get the display name for an iteration/branch at the given index.

Subclasses can override this to provide custom naming (e.g., from item_namer
or branch names). The default returns "{name_prefix}{index}".
"""
return f"{self.name_prefix}{index}"

def execute(
self, execution_state: ExecutionState, executor_context: DurableContext
) -> BatchResult[ResultType]:
Expand Down Expand Up @@ -410,7 +418,7 @@ def _execute_item_in_child_context(
operation_id: str = executor_context._create_step_id_for_logical_step( # noqa: SLF001
executable.index
)
name: str = f"{self.name_prefix}{executable.index}"
name: str = self.get_iteration_name(executable.index)
is_virtual: bool = self.nesting_type is NestingType.FLAT

child_context: DurableContext = executor_context.create_child_context(
Expand Down
54 changes: 53 additions & 1 deletion src/aws_durable_execution_sdk_python/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from aws_durable_execution_sdk_python.exceptions import ValidationError


P = TypeVar("P") # Payload type
R = TypeVar("R") # Result type
T = TypeVar("T")
Expand Down Expand Up @@ -245,6 +246,41 @@ class ParallelConfig:
nesting_type: NestingType = NestingType.NESTED


@dataclass(frozen=True)
class ParallelBranch(Generic[T]):
"""A named branch for parallel execution.

Use this to provide custom names for parallel branches, improving
observability in execution history.

Type Parameters:
T: The return type of the branch function.

Args:
func: The callable to execute in this branch. Receives a DurableContext.
name: Optional custom name for this branch. When provided, replaces
the default "parallel-branch-{index}" naming in execution history.
This affects observability but not replay determinism.

Example:
context.parallel(
functions=[
ParallelBranch(func=lambda ctx: fetch_user(ctx), name="fetch-user-data"),
ParallelBranch(func=lambda ctx: fetch_orders(ctx), name="fetch-order-history"),
],
name="load-data",
config=ParallelConfig(max_concurrency=2),
)
"""

func: Callable
name: str | None = None

def __call__(self, *args, **kwargs):
"""Delegate to the wrapped function, making ParallelBranch itself callable."""
return self.func(*args, **kwargs)


class StepSemantics(Enum):
AT_MOST_ONCE_PER_RETRY = "AT_MOST_ONCE_PER_RETRY"
AT_LEAST_ONCE_PER_RETRY = "AT_LEAST_ONCE_PER_RETRY"
Expand Down Expand Up @@ -354,12 +390,15 @@ class ItemBatcher(Generic[T]):


@dataclass(frozen=True)
class MapConfig:
class MapConfig(Generic[T]):
"""Configuration options for map operations over collections.

This class configures how map operations process collections of items,
including concurrency, batching, completion criteria, and serialization.

Type Parameters:
T: The type of items being processed in the map operation.

Args:
max_concurrency: Maximum number of items to process concurrently.
If None, no limit is imposed and all items are processed concurrently.
Expand Down Expand Up @@ -402,13 +441,25 @@ class MapConfig:
- NESTED: Each item runs in its own isolated context (default)
- FLAT: All items share the same parent context

item_namer: Optional callable to generate custom names for each map iteration.
When provided, replaces the default "map-item-{index}" naming scheme.
Receives the item and its index, and returns a string name for that iteration.
This affects observability (execution history names) but not replay determinism.
If None, uses the default naming: "map-item-{index}".

Example:
# Process 5 items at a time, batch by count, require all to succeed
config = MapConfig(
max_concurrency=5,
item_batcher=ItemBatcher(max_items_per_batch=10),
completion_config=CompletionConfig.all_successful()
)

# With custom iteration names
config = MapConfig(
max_concurrency=5,
item_namer=lambda item, index: f"process-order-{item.id}"
)
"""

max_concurrency: int | None = None
Expand All @@ -418,6 +469,7 @@ class MapConfig:
item_serdes: SerDes | None = None
summary_generator: SummaryGenerator | None = None
nesting_type: NestingType = NestingType.NESTED
item_namer: Callable[[T, int], str] | None = None


@dataclass(frozen=True)
Expand Down
Loading
Loading