Complex Workflow Testing

Overview

When your workflows involve multiple steps, nested contexts, or parallel operations, you need to verify more than just the final result. You'll want to check intermediate states, operation ordering, error handling, and timeout behavior.

This guide shows you how to test workflows that chain operations together, handle errors gracefully, and implement polling patterns.

↑ Back to top

Prerequisites

Install both SDKs and pytest:

pip install aws-durable-execution-sdk-python
pip install aws-durable-execution-sdk-python-testing
pip install pytest

If you're new to testing durable functions, start with Basic test patterns first.

↑ Back to top

Multi-step workflows

Sequential operations

Here's a workflow that processes an order through validation, payment, and fulfillment:

from aws_durable_execution_sdk_python import (
    DurableContext,
    durable_execution,
    durable_step,
    StepContext,
)

@durable_step
def validate_order(step_context: StepContext, order_id: str) -> dict:
    return {"order_id": order_id, "status": "validated"}

@durable_step
def process_payment(step_context: StepContext, order: dict) -> dict:
    return {**order, "payment_status": "completed"}

@durable_step
def fulfill_order(step_context: StepContext, order: dict) -> dict:
    return {**order, "fulfillment_status": "shipped"}

@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
    order_id = event["order_id"]
    
    validated = context.step(validate_order(order_id), name="validate")
    paid = context.step(process_payment(validated), name="payment")
    fulfilled = context.step(fulfill_order(paid), name="fulfillment")
    
    return fulfilled

Verify all steps execute in order:

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationType
from test.conftest import deserialize_operation_payload

@pytest.mark.durable_execution(handler=handler, lambda_function_name="order_workflow")
def test_order_workflow(durable_runner):
    """Test order processing executes all steps."""
    with durable_runner:
        result = durable_runner.run(input={"order_id": "order-123"}, timeout=30)
    
    assert result.status is InvocationStatus.SUCCEEDED
    
    # Check final result
    final_result = deserialize_operation_payload(result.result)
    assert final_result["order_id"] == "order-123"
    assert final_result["payment_status"] == "completed"
    assert final_result["fulfillment_status"] == "shipped"
    
    # Verify all three steps ran
    step_ops = [op for op in result.operations if op.operation_type == OperationType.STEP]
    assert len(step_ops) == 3
    
    # Check step order
    step_names = [op.name for op in step_ops]
    assert step_names == ["validate", "payment", "fulfillment"]
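If a workflow later gains extra operations (a wait between two steps, for instance), an exact list comparison becomes brittle. A minimal sketch of a more tolerant check follows. Here assert_in_order is a hypothetical helper, not part of either SDK, and the SimpleNamespace objects only stand in for the entries of result.operations, assuming each exposes a name attribute as in the test above.

```python
from types import SimpleNamespace


def assert_in_order(operations, expected_names):
    """Assert expected_names appear in operations in that relative order.

    Other operations may be interleaved between the expected ones.
    """
    names = iter(op.name for op in operations)
    for expected in expected_names:
        # `in` on an iterator consumes it up to and including the match,
        # so each later name must be found after the previous one.
        assert expected in names, f"{expected!r} missing or out of order"


# Stand-ins for result.operations (hypothetical data, for illustration only).
ops = [
    SimpleNamespace(name="validate"),
    SimpleNamespace(name="cooldown"),
    SimpleNamespace(name="payment"),
    SimpleNamespace(name="fulfillment"),
]
assert_in_order(ops, ["validate", "payment", "fulfillment"])
```

The exact comparison used in the test remains the stricter choice when the workflow should contain those three steps and nothing else.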

↑ Back to top

Conditional branching

Test different execution paths based on input:

from aws_durable_execution_sdk_python.config import Duration

@durable_execution
def handler(event: dict, context: DurableContext) -> str:
    amount = event.get("amount", 0)
    
    context.step(lambda _: amount, name="validate_amount")
    
    if amount > 1000:
        context.step(lambda _: "Manager approval required", name="approval")
        context.wait(Duration.from_seconds(10), name="approval_wait")
        result = context.step(lambda _: "High-value order processed", name="process_high")
    else:
        result = context.step(lambda _: "Standard order processed", name="process_standard")
    
    return result

Test both paths separately:

@pytest.mark.durable_execution(handler=handler, lambda_function_name="conditional_workflow")
def test_high_value_path(durable_runner):
    """Test high-value orders require approval."""
    with durable_runner:
        result = durable_runner.run(input={"amount": 1500}, timeout=30)
    
    assert result.status is InvocationStatus.SUCCEEDED
    assert deserialize_operation_payload(result.result) == "High-value order processed"
    
    # Verify approval step exists
    approval_step = result.get_step("approval")
    assert approval_step is not None

@pytest.mark.durable_execution(handler=handler, lambda_function_name="conditional_workflow")
def test_standard_path(durable_runner):
    """Test standard orders skip approval."""
    with durable_runner:
        result = durable_runner.run(input={"amount": 500}, timeout=30)
    
    assert result.status is InvocationStatus.SUCCEEDED
    
    # Verify no approval step
    step_names = [op.name for op in result.operations if op.operation_type == OperationType.STEP]
    assert "approval" not in step_names
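The handler branches on amount > 1000, so an order of exactly 1000 takes the standard path. One way to keep such boundary cases explicit is to tabulate the step names each path should record. The sketch below is plain Python; expected_steps is a hypothetical helper that mirrors the handler's condition and is not part of the SDK.

```python
def expected_steps(amount: int) -> list[str]:
    """Return the step names the handler above should record for an amount."""
    steps = ["validate_amount"]
    if amount > 1000:  # same strict comparison as the handler
        steps += ["approval", "process_high"]
    else:
        steps.append("process_standard")
    return steps


# Boundary cases worth covering with their own tests.
for amount in (0, 1000, 1001):
    print(amount, expected_steps(amount))
```

In a real test, the returned list could be compared against the step names collected from result.operations, with one test (or one pytest.mark.parametrize case) per amount.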

↑ Back to top

Nested child contexts

Single child context

A child context groups related operations under a single named context operation:

from aws_durable_execution_sdk_python import (
    DurableContext,
    durable_execution,
    durable_with_child_context,
)

@durable_with_child_context
def process_item(ctx: DurableContext, item_id: str) -> dict:
    ctx.step(lambda _: f"Validating {item_id}", name="validate")
    result = ctx.step(
        lambda _: {"item_id": item_id, "status": "processed"},
        name="process"
    )
    return result

@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
    item_id = event["item_id"]
    result = context.run_in_child_context(
        process_item(item_id),
        name="item_processing"
    )
    return result

Verify the child context executes:

@pytest.mark.durable_execution(handler=handler, lambda_function_name="child_context_workflow")
def test_child_context(durable_runner):
    """Test child context execution."""
    with durable_runner:
        result = durable_runner.run(input={"item_id": "item-123"}, timeout=30)
    
    assert result.status is InvocationStatus.SUCCEEDED
    
    # Check child context ran
    context_ops = [op for op in result.operations if op.operation_type.value == "CONTEXT"]
    assert len(context_ops) == 1
    assert context_ops[0].name == "item_processing"
    
    # Check child context result
    child_result = result.get_context("item_processing")
    child_data = deserialize_operation_payload(child_result.result)
    assert child_data["item_id"] == "item-123"

↑ Back to top

Multiple child contexts

Use multiple child contexts to organize operations:

@durable_with_child_context
def validate_data(ctx: DurableContext, data: dict) -> dict:
    return ctx.step(lambda _: {**data, "validated": True}, name="validate")

@durable_with_child_context
def transform_data(ctx: DurableContext, data: dict) -> dict:
    return ctx.step(lambda _: {**data, "transformed": True}, name="transform")

@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
    data = event["data"]
    
    validated = context.run_in_child_context(validate_data(data), name="validation")
    transformed = context.run_in_child_context(transform_data(validated), name="transformation")
    
    return transformed

Verify both contexts execute:

@pytest.mark.durable_execution(handler=handler, lambda_function_name="multiple_contexts")
def test_multiple_child_contexts(durable_runner):
    """Test multiple child contexts."""
    with durable_runner:
        result = durable_runner.run(input={"data": {"value": 42}}, timeout=30)
    
    assert result.status is InvocationStatus.SUCCEEDED
    
    final_result = deserialize_operation_payload(result.result)
    assert final_result["validated"] is True
    assert final_result["transformed"] is True
    
    # Verify both contexts ran
    context_ops = [op for op in result.operations if op.operation_type.value == "CONTEXT"]
    assert len(context_ops) == 2

↑ Back to top

Parallel operations

Basic parallel execution

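This handler runs three independent steps. As written, the context.step calls execute one after another; the test below checks that all three ran, regardless of order: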

@durable_execution
def handler(event: dict, context: DurableContext) -> list[str]:
    task1 = context.step(lambda _: "Task 1 complete", name="task1")
    task2 = context.step(lambda _: "Task 2 complete", name="task2")
    task3 = context.step(lambda _: "Task 3 complete", name="task3")
    
    return [task1, task2, task3]

Verify all operations execute:

@pytest.mark.durable_execution(handler=handler, lambda_function_name="parallel_ops")
def test_parallel_operations(durable_runner):
    """Test parallel execution."""
    with durable_runner:
        result = durable_runner.run(input={}, timeout=30)
    
    assert result.status is InvocationStatus.SUCCEEDED
    
    results = deserialize_operation_payload(result.result)
    assert len(results) == 3
    
    # Verify all steps ran
    step_ops = [op for op in result.operations if op.operation_type == OperationType.STEP]
    assert len(step_ops) == 3
    
    step_names = {op.name for op in step_ops}
    assert step_names == {"task1", "task2", "task3"}

↑ Back to top

Processing collections

Process each item of a collection in its own step (the loop issues the steps one at a time):

@durable_execution
def handler(event: dict, context: DurableContext) -> list[int]:
    numbers = event.get("numbers", [1, 2, 3, 4, 5])
    
    results = []
    for i, num in enumerate(numbers):
        result = context.step(lambda _, n=num: n * 2, name=f"double_{i}")
        results.append(result)
    
    return results

Verify collection processing:

@pytest.mark.durable_execution(handler=handler, lambda_function_name="parallel_collection")
def test_collection_processing(durable_runner):
    """Test collection processing."""
    with durable_runner:
        result = durable_runner.run(input={"numbers": [1, 2, 3, 4, 5]}, timeout=30)
    
    assert result.status is InvocationStatus.SUCCEEDED
    assert deserialize_operation_payload(result.result) == [2, 4, 6, 8, 10]
    
    # Verify all steps ran
    step_ops = [op for op in result.operations if op.operation_type == OperationType.STEP]
    assert len(step_ops) == 5
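The n=num default argument in the handler's lambda is a deliberate habit. A Python lambda looks up free variables when it is called, not when it is defined, so binding the loop value explicitly avoids surprises if a callable is ever invoked after the loop variable has moved on. The plain-Python sketch below (no SDK involved) shows the difference when the callables run after the loop has finished.

```python
numbers = [1, 2, 3]

# Late binding: every lambda reads `num` at call time, after the loop ended.
late = [lambda _: num * 2 for num in numbers]

# Early binding: the default argument captures the value of `num` per iteration.
bound = [lambda _, n=num: n * 2 for num in numbers]

late_results = [f(None) for f in late]
bound_results = [f(None) for f in bound]

print(late_results)   # [6, 6, 6]
print(bound_results)  # [2, 4, 6]
```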

↑ Back to top

Error scenarios

Expected failures

Test that your workflow fails correctly:

@durable_step
def validate_input(step_context: StepContext, value: int) -> int:
    if value < 0:
        raise ValueError("Value must be non-negative")
    return value

@durable_execution
def handler(event: dict, context: DurableContext) -> int:
    value = event.get("value", 0)
    validated = context.step(validate_input(value), name="validate")
    return validated

Verify validation failures:

@pytest.mark.durable_execution(handler=handler, lambda_function_name="validation_workflow")
def test_validation_failure(durable_runner):
    """Test validation fails with invalid input."""
    with durable_runner:
        result = durable_runner.run(input={"value": -5}, timeout=30)
    
    assert result.status is InvocationStatus.FAILED
    assert "Value must be non-negative" in str(result.error)

↑ Back to top

Retry behavior

Test operations that retry on failure:

from aws_durable_execution_sdk_python.config import StepConfig
from aws_durable_execution_sdk_python.retries import (
    RetryStrategyConfig,
    create_retry_strategy,
)

attempt_count = 0

@durable_step
def unreliable_operation(step_context: StepContext) -> str:
    global attempt_count
    attempt_count += 1
    
    if attempt_count < 3:
        raise RuntimeError("Transient error")
    
    return "Operation succeeded"

@durable_execution
def handler(event: dict, context: DurableContext) -> str:
    retry_config = RetryStrategyConfig(
        max_attempts=5,
        retryable_error_types=[RuntimeError],
    )
    
    result = context.step(
        unreliable_operation(),
        config=StepConfig(create_retry_strategy(retry_config)),
        name="unreliable"
    )
    
    return result

Verify retry succeeds:

@pytest.mark.durable_execution(handler=handler, lambda_function_name="retry_workflow")
def test_retry_behavior(durable_runner):
    """Test operation retries on failure."""
    global attempt_count
    attempt_count = 0
    
    with durable_runner:
        result = durable_runner.run(input={}, timeout=60)
    
    assert result.status is InvocationStatus.SUCCEEDED
    assert deserialize_operation_payload(result.result) == "Operation succeeded"
    assert attempt_count >= 3
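The module-level attempt_count works, but it has to be reset by hand and is shared by every test in the file. One alternative, sketched here under that assumption, is a small factory that keeps the counter in a closure. make_flaky is a hypothetical helper shown in plain Python; wiring the returned callable into a durable step is left out.

```python
def make_flaky(failures_before_success: int):
    """Build a callable that raises RuntimeError a fixed number of times."""
    calls = {"count": 0}

    def flaky(*_args) -> str:
        calls["count"] += 1
        if calls["count"] <= failures_before_success:
            raise RuntimeError("Transient error")
        return "Operation succeeded"

    return flaky, calls


flaky, calls = make_flaky(failures_before_success=2)

outcomes = []
for _ in range(3):
    try:
        outcomes.append(flaky())
    except RuntimeError as exc:
        outcomes.append(str(exc))

print(outcomes)        # ['Transient error', 'Transient error', 'Operation succeeded']
print(calls["count"])  # 3
```

Each test can build its own callable this way, so no state leaks between tests.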

↑ Back to top

Partial failures

Test workflows where some operations succeed before failure:

def fail_step3(_step_context):
    # A named function can use a raise statement, which a lambda cannot.
    raise RuntimeError("Step 3 failed")

@durable_execution
def handler(event: dict, context: DurableContext) -> str:
    context.step(lambda _: "Step 1 complete", name="step1")
    context.step(lambda _: "Step 2 complete", name="step2")
    context.step(fail_step3, name="step3")
    return "Should not reach here"

Verify partial execution:

@pytest.mark.durable_execution(handler=handler, lambda_function_name="partial_failure")
def test_partial_failure(durable_runner):
    """Test workflow fails after some steps succeed."""
    with durable_runner:
        result = durable_runner.run(input={}, timeout=30)
    
    assert result.status is InvocationStatus.FAILED
    
    # First two steps succeeded
    step1 = result.get_step("step1")
    assert deserialize_operation_payload(step1.result) == "Step 1 complete"
    
    step2 = result.get_step("step2")
    assert deserialize_operation_payload(step2.result) == "Step 2 complete"
    
    assert "Step 3 failed" in str(result.error)

↑ Back to top

Timeout handling

Callback timeouts

Create a callback with timeout and heartbeat settings:

from aws_durable_execution_sdk_python.config import CallbackConfig

@durable_execution
def handler(event: dict, context: DurableContext) -> str:
    config = CallbackConfig(timeout_seconds=60, heartbeat_timeout_seconds=30)
    callback = context.create_callback(name="approval_callback", config=config)
    return f"Callback created: {callback.callback_id}"

Verify that the callback operation is recorded under its name:

@pytest.mark.durable_execution(handler=handler, lambda_function_name="callback_timeout")
def test_callback_timeout(durable_runner):
    """Test callback timeout configuration."""
    with durable_runner:
        result = durable_runner.run(input={}, timeout=10)
    
    assert result.status is InvocationStatus.SUCCEEDED
    
    callback_ops = [op for op in result.operations if op.operation_type.value == "CALLBACK"]
    assert len(callback_ops) == 1
    assert callback_ops[0].name == "approval_callback"

↑ Back to top

Long waits

For workflows with long waits, verify configuration without actually waiting:

@durable_execution
def handler(event: dict, context: DurableContext) -> str:
    context.step(lambda _: "Starting", name="start")
    context.wait(Duration.from_seconds(3600), name="long_wait")  # 1 hour
    context.step(lambda _: "Continuing", name="continue")
    return "Complete"

Test completes quickly:

@pytest.mark.durable_execution(handler=handler, lambda_function_name="long_wait")
def test_long_wait(durable_runner):
    """Test long wait configuration."""
    with durable_runner:
        result = durable_runner.run(input={}, timeout=10)
    
    assert result.status is InvocationStatus.SUCCEEDED
    
    # Verify wait exists
    wait_ops = [op for op in result.operations if op.operation_type.value == "WAIT"]
    assert len(wait_ops) == 1
    assert wait_ops[0].name == "long_wait"

↑ Back to top

Polling patterns

Wait-for-condition

Poll until a condition is met:

@durable_execution
def handler(event: dict, context: DurableContext) -> int:
    state = 0
    attempt = 0
    max_attempts = 5
    
    while attempt < max_attempts:
        attempt += 1
        
        state = context.step(lambda _, s=state: s + 1, name=f"increment_{attempt}")
        
        if state >= 3:
            break
        
        context.wait(Duration.from_seconds(1), name=f"wait_{attempt}")
    
    return state

Verify polling behavior:

@pytest.mark.durable_execution(handler=handler, lambda_function_name="polling")
def test_polling(durable_runner):
    """Test wait-for-condition pattern."""
    with durable_runner:
        result = durable_runner.run(input={}, timeout=30)
    
    assert result.status is InvocationStatus.SUCCEEDED
    assert deserialize_operation_payload(result.result) == 3
    
    # Should have 3 increment steps
    step_ops = [op for op in result.operations if op.operation_type == OperationType.STEP]
    assert len(step_ops) == 3
    
    # Should have 2 waits (before reaching state 3)
    wait_ops = [op for op in result.operations if op.operation_type.value == "WAIT"]
    assert len(wait_ops) == 2
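Several tests in this guide filter result.operations by type and then count the matches. A small helper can do that in one pass. The sketch below is hedged: count_by_type and fake_op are hypothetical helpers, and the SimpleNamespace objects only imitate operations, assuming each has an operation_type with a string value as used in the tests above.

```python
from collections import Counter
from types import SimpleNamespace


def count_by_type(operations) -> Counter:
    """Count operations per operation_type.value (e.g. "STEP", "WAIT")."""
    return Counter(op.operation_type.value for op in operations)


def fake_op(type_name: str, name: str) -> SimpleNamespace:
    """Build a stand-in operation for illustration only."""
    return SimpleNamespace(operation_type=SimpleNamespace(value=type_name), name=name)


# Mirrors the polling run above: three increments with two waits in between.
ops = [
    fake_op("STEP", "increment_1"),
    fake_op("WAIT", "wait_1"),
    fake_op("STEP", "increment_2"),
    fake_op("WAIT", "wait_2"),
    fake_op("STEP", "increment_3"),
]

counts = count_by_type(ops)
assert counts["STEP"] == 3
assert counts["WAIT"] == 2
```

A Counter returns 0 for types that never occurred, so asserting that a path produced no waits needs no special case.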

↑ Back to top

Maximum attempts

Test polling respects attempt limits:

@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
    target = event.get("target", 10)
    state = 0
    attempt = 0
    max_attempts = 5
    
    while attempt < max_attempts and state < target:
        attempt += 1
        state = context.step(lambda _, s=state: s + 1, name=f"attempt_{attempt}")
        
        if state < target:
            context.wait(Duration.from_seconds(1), name=f"wait_{attempt}")
    
    return {"state": state, "attempts": attempt, "reached_target": state >= target}

Test with unreachable target:

@pytest.mark.durable_execution(handler=handler, lambda_function_name="max_attempts")
def test_max_attempts(durable_runner):
    """Test polling stops at max attempts."""
    with durable_runner:
        result = durable_runner.run(input={"target": 10}, timeout=30)
    
    assert result.status is InvocationStatus.SUCCEEDED
    
    final_result = deserialize_operation_payload(result.result)
    assert final_result["attempts"] == 5
    assert final_result["state"] == 5
    assert final_result["reached_target"] is False
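The expected values in this test follow from the loop arithmetic. A plain-Python model of the loop (no SDK, no real waits) makes them easy to derive for other targets. simulate_polling is a hypothetical helper that mirrors the handler's control flow.

```python
def simulate_polling(target: int, max_attempts: int = 5) -> dict:
    """Replay the handler's loop and count the steps and waits it would issue."""
    state = attempt = waits = 0
    while attempt < max_attempts and state < target:
        attempt += 1
        state += 1          # stands in for the increment step
        if state < target:
            waits += 1      # stands in for context.wait(...)
    return {
        "state": state,
        "attempts": attempt,
        "waits": waits,
        "reached_target": state >= target,
    }


print(simulate_polling(target=10))
# {'state': 5, 'attempts': 5, 'waits': 5, 'reached_target': False}
print(simulate_polling(target=3))
# {'state': 3, 'attempts': 3, 'waits': 2, 'reached_target': True}
```

Note that with an unreachable target the handler also waits after the fifth attempt, because the wait is issued whenever the state is still below the target.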

↑ Back to top

FAQ

Q: How do I test workflows with long waits?

A: The test runner doesn't actually wait. You can verify wait operations are configured correctly without waiting for them to complete.

Q: Can I test workflows with external API calls?

A: Yes, but mock external dependencies in your tests. The test runner executes your code locally, so standard Python mocking works.

Q: What's the best way to test conditional logic?

A: Write separate tests for each execution path. Use descriptive test names and verify the specific operations that should execute in each path.

Q: How do I verify operation ordering?

A: Iterate through result.operations and check the order. You can also use operation names to verify specific sequences.

Q: What timeout should I use?

A: Use a timeout slightly longer than expected execution time. For most tests, 30-60 seconds is sufficient.

Q: How do I test error recovery?

A: Test both the failure case (verify the error is raised) and the recovery case (verify retry succeeds). Use separate tests for each scenario.
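For the question above on external API calls, one common arrangement is to pass the client into the function that does the work, so a test can substitute a unittest.mock.Mock. The sketch below is plain Python under that assumption; lookup_inventory and get_stock are hypothetical names, not SDK or real service APIs.

```python
from unittest.mock import Mock


def lookup_inventory(client, sku: str) -> dict:
    """Business logic that a durable step could call; `client` is injected."""
    stock = client.get_stock(sku)  # hypothetical external call
    return {"sku": sku, "in_stock": stock > 0}


# In a test, replace the real client with a mock that returns canned data.
fake_client = Mock()
fake_client.get_stock.return_value = 7

result = lookup_inventory(fake_client, "sku-123")

assert result == {"sku": "sku-123", "in_stock": True}
fake_client.get_stock.assert_called_once_with("sku-123")
```

Keeping the external call behind an injected object means the same function can be exercised directly, without the test runner, and again inside a workflow test.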

↑ Back to top

License

See the LICENSE file for our project's licensing.

↑ Back to top