Conductor supports dynamic parallel execution via for-each groups, which allow workflows to process arrays of items with parallel agent instances spawned at runtime. This is perfect for tasks like analyzing multiple KPIs, processing datasets, or performing batch operations.
Static parallel groups run a fixed set of named agents:
parallel_group: [agent_a, agent_b, agent_c] # Known at workflow definition
Dynamic parallel (for-each) groups run N copies of an agent template, where N is determined at runtime:
for_each(items): # N = len(items), determined when workflow runs
agent_template(item)
✅ Use for-each when:
- Processing variable-length arrays (e.g., list of KPIs, files, user requests)
- Each item requires the same processing logic
- The number of items is unknown at workflow definition time
- You want to parallelize batch operations
❌ Use static parallel when:
- Running distinct agents with different purposes
- Agent count is fixed and known at definition time
- Agents have different prompts or configurations
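As a quick illustration, here is a minimal sketch of both shapes (the agent and group names are hypothetical; the full field reference appears below):

# Static parallel: a fixed, named set of distinct agents
parallel:
  - name: reviewers
    agents: [security_reviewer, style_reviewer, perf_reviewer]

# Dynamic parallel (for-each): one inline agent template fanned out over a runtime array
for_each:
  - name: file_processors
    type: for_each
    source: scanner.output.files # N determined when the workflow runs
    as: file
    agent:
      prompt: "Process {{ file }}"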
Define for-each groups at the top level of your workflow YAML:
workflow:
name: kpi-analyzer
entry_point: kpi_finder
for_each:
- name: kpi_processors
type: for_each
source: kpi_finder.output.kpis # Array reference
as: kpi # Loop variable name
max_concurrent: 5 # Parallel execution limit
failure_mode: continue_on_error # Error handling
agent:
model: claude-sonnet-4.5
prompt: |
Analyze this KPI:
Name: {{ kpi.name }}
Value: {{ kpi.value }}
Provide insights and recommendations.
output:
analysis:
type: string
recommendations:
type: array
routes:
- to: aggregator
agents:
- name: kpi_finder
# ... finds KPIs ...
routes:
- to: kpi_processors
- name: aggregator
# ... aggregates results ...

For-each group fields:

- name: Unique identifier for the for-each group
- type: Must be "for_each" to mark the group as dynamic parallel
- source: Reference to an array in the workflow context (dotted path notation)
- as: Loop variable name (accessible in templates)
- agent: Inline agent definition used as the template
- description: Human-readable purpose description
- max_concurrent: Maximum concurrent executions per batch (default: 10)
- failure_mode: Error handling strategy (default: fail_fast)
- key_by: Path to extract keys for dict-based outputs
- routes: Routing rules evaluated after for-each execution
The source field uses dotted path notation to reference arrays in the workflow context:
# Reference agent output fields
source: finder.output.items
source: analyzer.output.results.data_points
# Reference workflow input
source: workflow.input.tasks
# Reference parallel group outputs
source: parallel_fetchers.outputs.data_collector.items

At runtime, the source path is resolved to an array. If the path:
- Doesn't exist: an error is raised with a clear message
- Is not an array: an error is raised (the source must resolve to a list)
- Is an empty array: the for-each completes immediately with empty outputs
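For example, a group can fan out directly over a workflow input array; a minimal fragment assuming the workflow is invoked with a tasks list (the group name and input field are hypothetical):

for_each:
  - name: task_workers
    type: for_each
    source: workflow.input.tasks # array supplied as workflow input
    as: task
    agent:
      prompt: "Complete this task: {{ task }}"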
Three special variables are available in agent templates:
The loop variable (specified in as:) contains the current array item:
for_each:
- name: processors
source: finder.output.items
as: item
agent:
prompt: |
Current item: {{ item }}
# If items are objects:
Item ID: {{ item.id }}
Item name: {{ item.name }}

{{ _index }} is the index of the current item in the source array:
agent:
prompt: |
Processing item {{ _index + 1 }} of {{ total_items }}
Item data: {{ item }}

When key_by is specified, {{ _key }} contains the extracted key:
for_each:
- name: analyzers
source: finder.output.kpis
as: kpi
key_by: kpi.kpi_id # Extract kpi.kpi_id as key
agent:
prompt: |
Analyzing KPI: {{ _key }} # The kpi_id value
Full KPI data: {{ kpi | json }}

Reserved names: The following cannot be used as loop variable names: workflow, context, output, _index, _key
For-each groups process items in sequential batches controlled by max_concurrent:
for_each:
- name: processors
source: finder.output.items # Suppose this resolves to 25 items
max_concurrent: 10
agent:
# ... agent definition ...

Execution flow:
- Batch 1: Items 0-9 execute in parallel (10 items)
- Wait for batch 1 to complete
- Batch 2: Items 10-19 execute in parallel (10 items)
- Wait for batch 2 to complete
- Batch 3: Items 20-24 execute in parallel (5 items)
- Complete
Why batching?
- Prevents unbounded parallelism (e.g., 1000 items → 1000 concurrent agents)
- Controls memory usage and API rate limits
- Provides progress feedback between batches
Setting max_concurrent:
- Default: 10 - Good balance for most use cases
- Higher (20-50): Fast APIs, small items, high rate limits
- Lower (3-5): Rate-limited APIs, large contexts, memory constraints
- 1: Sequential processing (rarely needed)
For-each groups support three failure modes:
fail_fast (default): Stop immediately when the first item fails. Remaining items are cancelled.
for_each:
- name: validators
source: inputs.output.data
as: item
failure_mode: fail_fast
agent:
prompt: "Validate {{ item }}"Use case: Critical validation where any failure should halt the workflow.
Behavior:
- First item failure → entire for-each fails immediately
- Items in the current batch may complete before cancellation
- No outputs are stored
- Exception is raised with error details
continue_on_error: Continue processing all items. The workflow proceeds if at least one item succeeds.
for_each:
- name: fetchers
source: sources.output.urls
as: url
failure_mode: continue_on_error
agent:
prompt: "Fetch data from {{ url }}"Use case: Data gathering where partial success is acceptable (e.g., fetching from multiple sources).
Behavior:
- All items run to completion
- Successful outputs stored in outputs
- Failed items stored in errors with exception details
- Workflow continues if any item succeeded
- Raises error if all items fail
Error structure:
{
"fetchers": {
"outputs": {
"0": {"data": "..."},
"2": {"data": "..."}
},
"errors": {
"1": {
"error": "TimeoutError",
"message": "Request timed out",
"index": 1,
"item": "https://slow-api.com"
}
}
}
}

all_or_nothing: Process all items to completion. Fail if any item fails.
for_each:
- name: checks
source: tasks.output.items
as: task
failure_mode: all_or_nothing
agent:
prompt: "Check {{ task }}"Use case: Pre-deployment checks where all must pass but you want to see all failures.
Behavior:
- All items run to completion
- All outputs collected
- Raises error if any item failed (after all complete)
- Useful for seeing all failures, not just the first one
After for-each execution completes, outputs are aggregated based on whether key_by is specified:
When key_by is not specified, outputs are a list indexed by position:
for_each:
- name: processors
source: finder.output.items
as: item
agent:
output:
result: { type: string }

Output structure:
{
"processors": {
"outputs": [
{"result": "processed item 0"},
{"result": "processed item 1"},
{"result": "processed item 2"}
],
"errors": {}
}
}

Accessing in downstream agents:
agents:
- name: aggregator
prompt: |
First result: {{ processors.outputs[0].result }}
Second result: {{ processors.outputs[1].result }}
All results:
{% for result in processors.outputs %}
- {{ result.result }}
{% endfor %}

When key_by is specified, outputs are a dictionary keyed by extracted values:
for_each:
- name: analyzers
source: finder.output.kpis
as: kpi
key_by: kpi.kpi_id # Extract kpi.kpi_id as key
agent:
output:
analysis: { type: string }

Output structure:
{
"analyzers": {
"outputs": {
"KPI-123": {"analysis": "..."},
"KPI-456": {"analysis": "..."},
"KPI-789": {"analysis": "..."}
},
"errors": {}
}
}

Accessing in downstream agents:
agents:
- name: aggregator
prompt: |
KPI-123 analysis: {{ analyzers.outputs["KPI-123"].analysis }}
KPI-456 analysis: {{ analyzers.outputs["KPI-456"].analysis }}
All analyses:
{% for kpi_id, output in analyzers.outputs.items() %}
{{ kpi_id }}: {{ output.analysis }}
{% endfor %}

Error structure with key_by:
When using key_by with continue_on_error or all_or_nothing failure modes, errors are keyed using the same extracted keys (or indices if key extraction fails):
{
"analyzers": {
"outputs": {
"KPI-123": {"analysis": "..."},
"KPI-789": {"analysis": "..."}
},
"errors": {
"KPI-456": {
"error": "ValidationError",
"message": "Missing required field: metric",
"index": 1,
"key": "KPI-456"
}
}
}
}

Accessing errors in templates:
agents:
- name: reporter
prompt: |
{% if analyzers.errors is defined and analyzers.errors %}
Failed KPIs:
{% for kpi_id, error in analyzers.errors.items() %}
- {{ kpi_id }}: {{ error.message }}
{% endfor %}
{% endif %}

If key extraction fails for any item, it falls back to index:
key_by: item.id # Some items might not have 'id' field

Behavior:
- Items with valid keys: Use extracted key
- Items with missing/invalid keys: Use index (0, 1, 2, ...)
- Mixed dict: {"key1": {...}, "0": {...}, "key2": {...}}
When keys conflict, later items overwrite earlier ones.
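A sketch of that overwrite behavior, reusing the hypothetical item.id field from above:

key_by: item.id
# Item 0: item.id="abc" → outputs["abc"]
# Item 1: item.id="abc" → outputs["abc"] (overwrites item 0's result)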
For-each groups handle empty arrays gracefully:
source: finder.output.items # Resolves to []

Behavior:
- For-each completes immediately
- No agent executions
- Without key_by: outputs = [] (empty list)
- With key_by: outputs = {} (empty dict)
- No errors
- Routes are evaluated normally
Downstream agents can check for empty outputs:
agents:
- name: aggregator
prompt: |
{% if processors.outputs | length == 0 %}
No items to process.
{% else %}
Processing {{ processors.outputs | length }} results...
{% endif %}

Each for-each agent instance receives an immutable context snapshot plus injected loop variables:
for_each:
- name: processors
source: finder.output.items
as: item
agent:
input:
- workflow.input.config
- finder.output.metadata
prompt: |
Config: {{ workflow.input.config }}
Metadata: {{ finder.output.metadata }}
Current item: {{ item }}
Index: {{ _index }}

Context includes:
- Workflow inputs (all inputs in the default context mode, or only declared inputs with context: mode: explicit)
- Outputs from agents executed before the for-each group
- Injected loop variables: {{ item }}, {{ _index }}, {{ _key }}
Context excludes:
- Outputs from other items in the same for-each group
- Outputs from agents after the for-each group
Note on workflow context modes:
- By default, workflows use accumulate mode, where all previous agent outputs are available
- You can set context: mode: explicit at the workflow level to require agents to declare their inputs
- With explicit mode, each for-each agent must list its inputs (as shown in the example above)
- Explicit mode can improve clarity and performance for workflows with many agents
This ensures:
- No race conditions: Items cannot interfere with each other
- Deterministic behavior: Results are consistent across runs
- Clean isolation: Each item processes independently
For-each groups support routing just like regular agents:
for_each:
- name: processors
source: finder.output.items
as: item
failure_mode: continue_on_error
agent:
# ... agent definition ...
routes:
- to: success_handler
when: "{{ processors.outputs | length >= 5 }}"
- to: partial_handler
when: "{{ processors.outputs | length > 0 }}"
- to: failure_handler

Available in route conditions:
- {{ group_name.outputs }} - Aggregated outputs
- {{ group_name.errors }} - Errors (if continue_on_error or all_or_nothing)
- All previous agent outputs
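For instance, a sketch of routing on the error count as well as the outputs (the handler names are hypothetical):

routes:
  - to: alert_handler
    when: "{{ processors.errors | length > 0 }}"
  - to: success_handler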
workflow:
name: kpi-analysis
entry_point: kpi_finder
runtime:
provider: copilot
default_model: claude-sonnet-4.5
for_each:
- name: kpi_analyzers
type: for_each
description: Analyze each KPI in parallel
source: kpi_finder.output.kpis
as: kpi
max_concurrent: 5
failure_mode: continue_on_error
key_by: kpi.kpi_id
agent:
model: claude-opus-4.5
prompt: |
You are a KPI analyst. Analyze this KPI:
KPI ID: {{ kpi.kpi_id }}
KPI Name: {{ kpi.name }}
Current Value: {{ kpi.value }}
Target: {{ kpi.target }}
Provide:
1. Status assessment (on track, at risk, off track)
2. Trend analysis
3. Recommendations for improvement
output:
status:
type: string
description: "on track | at risk | off track"
trend:
type: string
recommendations:
type: array
confidence:
type: number
routes:
- to: aggregator
agents:
- name: kpi_finder
prompt: |
Find all KPIs for Q4 2024.
Return as a structured list with kpi_id, name, value, target.
output:
kpis:
type: array
routes:
- to: kpi_analyzers
- name: aggregator
input:
- kpi_finder.output
- kpi_analyzers.outputs
- kpi_analyzers.errors
prompt: |
Create an executive summary of KPI analysis results:
Total KPIs: {{ kpi_finder.output.kpis | length }}
Analyzed: {{ kpi_analyzers.outputs | length }}
Failed: {{ kpi_analyzers.errors | length }}
{% if kpi_analyzers.errors %}
Failed KPIs:
{% for kpi_id, error in kpi_analyzers.errors.items() %}
- {{ kpi_id }}: {{ error.message }}
{% endfor %}
{% endif %}
Successful Analyses:
{% for kpi_id, analysis in kpi_analyzers.outputs.items() %}
{{ kpi_id }}: {{ analysis.status }} - {{ analysis.trend }}
{% endfor %}
Provide:
1. Overall health score
2. Critical issues requiring immediate attention
3. Positive trends to highlight
output:
summary:
type: string
health_score:
type: number
routes:
- to: $end
output:
summary: "{{ aggregator.output.summary }}"
health_score: "{{ aggregator.output.health_score }}"
total_kpis: "{{ kpi_finder.output.kpis | length }}"
analyzed: "{{ kpi_analyzers.outputs | length }}"workflow:
name: batch-processor
entry_point: data_loader
runtime:
provider: copilot
for_each:
- name: item_processors
type: for_each
source: data_loader.output.items
as: item
max_concurrent: 10
failure_mode: all_or_nothing
agent:
prompt: |
Process this item:
{{ item | json }}
Extract and transform the data.
output:
processed_data:
type: object
routes:
- to: $end
agents:
- name: data_loader
prompt: "Load the dataset from {{ workflow.input.source }}"
output:
items:
type: array
routes:
- to: item_processors
output:
results: "{{ item_processors.outputs | json }}"
total_processed: "{{ item_processors.outputs | length }}"for_each:
- name: validators
type: for_each
source: checker.output.items
as: item
failure_mode: continue_on_error
agent:
prompt: "Validate {{ item }}"
output:
valid: { type: boolean }
routes:
- to: success_path
when: "{{ validators.outputs | length == (checker.output.items | length) }}"
- to: partial_path
when: "{{ validators.outputs | length > 0 }}"
- to: failure_path

When choosing max_concurrent, consider your constraints:
# High throughput, fast API
max_concurrent: 20
# Rate-limited API (e.g., 10 requests/second)
max_concurrent: 5
# Large context or memory constrained
max_concurrent: 3
# Need sequential processing (rare)
max_concurrent: 1

When items have unique IDs, use key_by for clearer output access:
# ✅ Good: Access by meaningful ID
key_by: kpi.kpi_id
# Access: {{ analyzers.outputs["KPI-123"] }}
# ❌ Avoid: Access by index requires knowing order
# Access: {{ analyzers.outputs[0] }} # Which KPI is this?

Use continue_on_error for resilient data gathering:
for_each:
- name: fetchers
failure_mode: continue_on_error
# ...
agents:
- name: aggregator
prompt: |
{% if fetchers.errors %}
Warning: {{ fetchers.errors | length }} sources failed
{% endif %}
Processing {{ fetchers.outputs | length }} successful results...

Ensure source arrays are well-formed:
agents:
- name: finder
# Add validation to output schema
output:
items:
type: array
description: "Must be non-null array"
# Check in prompt
prompt: |
Find items. Return as an array, even if empty: []

Each for-each agent should do one specific task:
# ✅ Good: Focused task
agent:
prompt: "Analyze KPI {{ kpi.kpi_id }}"
# ❌ Avoid: Multiple unrelated tasks
agent:
prompt: |
Analyze {{ kpi.kpi_id }}
Also check database consistency
And send notifications
And update the dashboard

For large arrays (100+ items), consider raising max_concurrent or splitting the work across multiple for-each groups:
# Instead of processing 500 items in one for-each:
# agents: [splitter, batch_1, batch_2, batch_3, merger]
# Option 1: Use higher max_concurrent
for_each:
- name: processors
max_concurrent: 50 # Process 10 batches of 50
# Option 2: Split in workflow logic
agents:
- name: splitter
# Create batches
- name: batch_processor_1
# Process first batch
- name: batch_processor_2
# Process second batch

Error: "Array reference 'finder.output.items' not found in context"
Solution: Ensure the source agent has completed and produced the expected output:
# Check agent output schema
agents:
- name: finder
output:
items: # Must match source reference
type: arrayError: "Source 'finder.output.result' resolved to <class 'str'>, expected list"
Solution: Ensure the source path points to an array field:
# ❌ Wrong: points to string field
source: finder.output.summary
# ✅ Correct: points to array field
source: finder.output.items

Error: "Loop variable 'workflow' conflicts with reserved name"
Solution: Use a different variable name. Reserved: workflow, context, output, _index, _key
# ❌ Invalid
as: workflow
# ✅ Valid
as: item
as: kpi
as: task

Issue: Some items don't have the key field specified in key_by
Behavior: System automatically falls back to index
key_by: item.id
# Item 0: has item.id="abc" → outputs["abc"]
# Item 1: missing item.id → outputs["1"] (fallback)
# Item 2: has item.id="def" → outputs["def"]Solution: If you want strict key extraction, validate in the finder agent:
agents:
- name: finder
prompt: "Ensure every item has an 'id' field"
output:
items:
type: array
description: "Each item must have 'id' field"Error: "All items failed in for-each group 'processors'"
Solution:
- Check agent prompt is valid for the item structure
- Verify items match expected schema
- Test with a small sample first
- Check verbose logs: conductor run --log-file debug.log
Problem: Workflow uses too much memory with 1000+ items
Solutions:
- Reduce max_concurrent to limit parallel executions
- Break into multiple for-each groups
- Use explicit context mode to reduce snapshot size
- Consider pagination in the finder agent
context:
mode: explicit # Reduce context size
for_each:
- name: processors
max_concurrent: 5 # Lower concurrency

Limitations:

- No nested for-each: For-each groups cannot contain other for-each or parallel groups
- Source must be array: Only list types are supported; dict iteration not supported
- Inline agent only: Agents must be defined inline, cannot reference existing agents
- No dynamic max_concurrent: Batch size is fixed at workflow definition time
- Key conflicts: When using key_by, duplicate keys cause overwrites
For N items with max_concurrent=M:
- Best case: ~M× faster than sequential
- Typical: Depends on item processing time variance
- Worst case: Limited by slowest item in each batch
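As a rough illustration (assuming roughly uniform item durations): 100 items at max_concurrent: 10 run as 10 batches, so if each item takes about 30 seconds the group finishes in roughly 5 minutes instead of the ~50 minutes sequential execution would take, with each batch gated by its slowest item.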
Overhead comes from:

- Context snapshot per item (deep copy)
- Asyncio task scheduling
- Output aggregation
For large arrays (1000+ items), total overhead is typically <5% of execution time.
Converting static parallel to for-each:
Before (static):
parallel:
- name: researchers
agents: [researcher_1, researcher_2, researcher_3]
agents:
- name: researcher_1
prompt: "Research {{ topics[0] }}"
- name: researcher_2
prompt: "Research {{ topics[1] }}"
- name: researcher_3
prompt: "Research {{ topics[2] }}"After (for-each):
for_each:
- name: researchers
type: for_each
source: topic_finder.output.topics
as: topic
agent:
prompt: "Research {{ topic }}"
agents:
- name: topic_finder
# Finds topics dynamically

See also:

- Workflow YAML Syntax - Complete syntax reference
- Parallel Execution Guide - Static parallel groups
- Examples - Complete workflow examples
- Context Management - Understanding context modes