Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .claude/skills/conductor/references/execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ conductor update
The command:
1. Fetches the latest release from the GitHub Releases API
2. Compares the remote version with the locally installed version
3. If a newer version is available, runs `uv tool install --force git+https://github.com/microsoft/conductor.git@v{version}` to upgrade
3. If a newer version is available, runs `uv tool install --force --locked git+https://github.com/microsoft/conductor.git@v{version}` to upgrade
4. Clears the update-check cache on success so the next invocation re-checks cleanly

If already up to date, prints a confirmation message and exits.
Expand Down
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ make validate-examples # validate all examples
- `run.py` - Workflow execution command with verbose logging helpers
- `bg_runner.py` - Background process forking for `--web-bg` mode
- `pid.py` - PID file utilities for tracking/stopping background processes
- `update.py` - Update check, version comparison, and self-upgrade via `uv tool install`
- `update.py` - Update check, version comparison, and self-upgrade via `uv tool install --locked`

- **config/**: YAML loading and Pydantic schema validation
- `schema.py` - Pydantic models for all workflow YAML structures (WorkflowConfig, AgentDef, ParallelGroup, ForEachDef, etc.)
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ Conductor provides the patterns that work: evaluator-optimizer loops for iterati
### Using uv (Recommended)

```bash
# Install from GitHub
uv tool install git+https://github.com/microsoft/conductor.git
# Install from GitHub (--locked ensures reproducible dependency versions)
uv tool install --locked git+https://github.com/microsoft/conductor.git

# Run the CLI
conductor run workflow.yaml
Expand All @@ -38,9 +38,9 @@ conductor run workflow.yaml
uvx --from git+https://github.com/microsoft/conductor.git conductor run workflow.yaml

# Install a specific branch, tag, or commit
uv tool install git+https://github.com/microsoft/conductor.git@branch-name
uv tool install git+https://github.com/microsoft/conductor.git@v1.0.0
uv tool install git+https://github.com/microsoft/conductor.git@abc1234
uv tool install --locked git+https://github.com/microsoft/conductor.git@branch-name
uv tool install --locked git+https://github.com/microsoft/conductor.git@v1.0.0
uv tool install --locked git+https://github.com/microsoft/conductor.git@abc1234
```

### Using pipx
Expand Down
2 changes: 1 addition & 1 deletion examples/design.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ workflow:
MODE: stdio
DEFAULT_SEARCH_ENGINE: duckduckgo
ALLOWED_SEARCH_ENGINES: duckduckgo,brave,exa
tools: ["*"]
tools: ["search"]
context7:
command: npx
args: ["-y", "@upstash/context7-mcp@latest"]
Expand Down
2 changes: 1 addition & 1 deletion examples/implement.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ workflow:
MODE: stdio
DEFAULT_SEARCH_ENGINE: duckduckgo
ALLOWED_SEARCH_ENGINES: duckduckgo,brave,exa
tools: ["*"]
tools: ["search"]
context7:
command: npx
args: ["-y", "@upstash/context7-mcp@latest"]
Expand Down
2 changes: 1 addition & 1 deletion examples/parallel-research.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ workflow:
MODE: stdio
DEFAULT_SEARCH_ENGINE: duckduckgo
ALLOWED_SEARCH_ENGINES: duckduckgo,brave,exa
tools: ["*"]
tools: ["search"]

input:
topic:
Expand Down
2 changes: 1 addition & 1 deletion examples/plan.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ workflow:
MODE: stdio
DEFAULT_SEARCH_ENGINE: duckduckgo
ALLOWED_SEARCH_ENGINES: duckduckgo,brave,exa
tools: ["*"]
tools: ["search"]
context7:
command: npx
args: ["-y", "@upstash/context7-mcp@latest"]
Expand Down
2 changes: 1 addition & 1 deletion examples/research-assistant.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ workflow:
MODE: stdio
DEFAULT_SEARCH_ENGINE: duckduckgo
ALLOWED_SEARCH_ENGINES: duckduckgo,brave,exa
tools: ["*"]
tools: ["search"]

context:
mode: explicit # Only declared inputs are available
Expand Down
2 changes: 1 addition & 1 deletion examples/simple-qa.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ workflow:
web-search:
command: sh
args: ["-c", "MODE=stdio DEFAULT_SEARCH_ENGINE=bing exec npx -y open-websearch@latest"]
tools: ["*"]
tools: ["search"]

input:
question:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ dependencies = [
"ruamel.yaml>=0.18.0",
"jinja2>=3.1.0",
"simpleeval>=1.0.0",
"github-copilot-sdk>=0.1.28,<0.1.31", # >=0.1.28 for on_permission_request; <0.1.31 regression, see #27
"github-copilot-sdk>=0.2.0",
"anthropic>=0.77.0,<1.0.0",
"mcp>=1.0.0",
"fastapi>=0.115.0",
Expand Down
14 changes: 11 additions & 3 deletions src/conductor/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,9 +930,9 @@ async def _run_with_stop_signal(
inputs: dict[str, Any],
dashboard: Any | None,
) -> dict[str, Any]:
"""Run the workflow engine, racing against a dashboard stop signal.
"""Run the workflow engine, racing against a dashboard kill signal.

When the web dashboard has a stop button clicked (``/api/stop``), the
When the web dashboard's Kill button is clicked (``/api/kill``), the
engine task is cancelled and an ``ExecutionError`` is raised.

If no dashboard is present, this simply awaits ``engine.run()`` directly.
Expand All @@ -946,7 +946,7 @@ async def _run_with_stop_signal(
The workflow result dict.

Raises:
ExecutionError: If the workflow was stopped via the dashboard.
ExecutionError: If the workflow was killed via the dashboard.
"""
if dashboard is None:
return await engine.run(inputs)
Expand Down Expand Up @@ -1096,6 +1096,10 @@ async def run_workflow_async(

interrupt_event = asyncio.Event()
listener = KeyboardListener(interrupt_event=interrupt_event)
elif web:
# In --web mode: no keyboard listener, but still need interrupt_event
# so POST /api/stop can interrupt the running agent mid-execution
interrupt_event = asyncio.Event()

engine = WorkflowEngine(
config,
Expand All @@ -1108,6 +1112,10 @@ async def run_workflow_async(
web_dashboard=dashboard,
)

# Share interrupt_event with dashboard so POST /api/stop can abort agents
if dashboard is not None and interrupt_event is not None:
dashboard.set_interrupt_event(interrupt_event)

try:
if listener is not None:
await listener.start()
Expand Down
2 changes: 1 addition & 1 deletion src/conductor/cli/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def run_update(console: Console) -> None:
console.print(f"Upgrading Conductor: v{current} → v{version}")

install_url = f"git+{_REPO_GIT_URL}@{tag_name}"
cmd = ["uv", "tool", "install", "--force", install_url]
cmd = ["uv", "tool", "install", "--force", "--locked", install_url]

# On Windows, rename our exe out of the way so uv can write the new one.
# Windows locks running executables but allows renaming them.
Expand Down
135 changes: 124 additions & 11 deletions src/conductor/engine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import asyncio
import contextlib
import copy
import json
import logging
import time as _time
from dataclasses import dataclass, field
Expand Down Expand Up @@ -687,13 +688,21 @@ async def _wait_for_web_gate(self, agent: AgentDef) -> GateResult:
suggestion="Check the option values in the workflow YAML",
)

# ------------------------------------------------------------------
# Interrupt support
# ------------------------------------------------------------------

async def _check_interrupt(self, current_agent_name: str) -> InterruptResult | None:
"""Check for a pending interrupt and handle it if present.

If the interrupt event is set, clears it, builds an output preview
from the last stored output, and delegates to the InterruptHandler
for user interaction.

In web mode (dashboard connected), the interrupt is consumed
silently — the provider-level racing handles the actual pause/resume
flow, so the between-agent check just needs to clear the stale flag.

Args:
current_agent_name: Name of the agent that just completed
(or the next agent about to run).
Expand All @@ -706,9 +715,15 @@ async def _check_interrupt(self, current_agent_name: str) -> InterruptResult | N

self._interrupt_event.clear()

# Build output preview from last stored output
import json
# In web mode, the interrupt was already handled at the provider level
# (partial output → _handle_web_pause). Consume the stale flag silently.
# We check for dashboard presence only (not has_connections) because in
# --web/--web-bg mode the CLI interactive handler is never appropriate,
# even if clients are transiently disconnected.
if self._web_dashboard is not None:
return None

# Build output preview from last stored output
last_output = self.context.get_latest_output()
last_output_preview: str | None = None
if last_output is not None:
Expand Down Expand Up @@ -760,6 +775,92 @@ async def _handle_interrupt_result(
case InterruptAction.CANCEL:
return current_agent_name

async def _handle_web_pause(self, agent_name: str, partial_output: AgentOutput) -> bool:
"""Handle a mid-agent interrupt when the web dashboard is connected.

Emits an ``agent_paused`` event and waits for the user to click
Resume or Kill in the dashboard. If all browser clients disconnect
while waiting, auto-resumes to avoid hanging the workflow.

Args:
agent_name: The name of the interrupted agent.
partial_output: The partial output from the interrupted agent.

Returns:
True if the agent should be re-executed (Resume chosen or
all clients disconnected), False if no web dashboard is
connected (caller should invoke ``_handle_partial_output``).

Raises:
InterruptError: If the user chose Kill (``POST /api/kill``).
"""
if self._web_dashboard is None or not self._web_dashboard.has_connections():
return False

try:
preview = json.dumps(partial_output.content, indent=2, default=str)[:500]
except (TypeError, ValueError):
preview = str(partial_output.content)[:500]

self._emit(
"agent_paused",
{"agent_name": agent_name, "partial_content": preview},
)
logger.info("Agent '%s' paused — waiting for dashboard resume", agent_name)

resume_event = self._web_dashboard.resume_event
kill_event = self._web_dashboard.kill_event
disconnect_event = self._web_dashboard.disconnect_event

# Clear stale signals from prior pause cycles, then create wait tasks.
# We must check is_set() after creating tasks to close the race window
# where an HTTP handler sets the event between clear() and wait().
resume_event.clear()
kill_event.clear()
disconnect_event.clear()

resume_task = asyncio.create_task(resume_event.wait())
kill_task = asyncio.create_task(kill_event.wait())
disconnect_task = asyncio.create_task(disconnect_event.wait())
tasks = {resume_task, kill_task, disconnect_task}

# If any event was set between clear() and task creation, the task
# will already be done — no need to wait, but we still fall through
# to the normal done/pending handling below.
try:
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED,
)
for t in pending:
t.cancel()
with contextlib.suppress(asyncio.CancelledError):
await t
except Exception:
for t in tasks:
if not t.done():
t.cancel()
with contextlib.suppress(asyncio.CancelledError):
await t
raise

if kill_task in done:
raise InterruptError(agent_name=agent_name)

if disconnect_task in done:
logger.info(
"All dashboard clients disconnected while '%s' was paused — auto-resuming",
agent_name,
)

# Clear resume_event after consumption so a stale signal from a
# double-click or prior API call doesn't skip the next legitimate pause.
resume_event.clear()

self._emit("agent_resumed", {"agent_name": agent_name})
logger.info("Agent '%s' resumed — re-executing", agent_name)
return True

async def _handle_partial_output(
self,
agent: AgentDef,
Expand All @@ -786,17 +887,15 @@ async def _handle_partial_output(
Returns:
The final (non-partial) AgentOutput after handling the interrupt.
"""
import json as _json

from conductor.providers.copilot import CopilotProvider

# Build preview from partial output
try:
preview = _json.dumps(partial_output.content, indent=2, default=str)[:500]
preview = json.dumps(partial_output.content, indent=2, default=str)[:500]
except (TypeError, ValueError):
preview = str(partial_output.content)[:500]

# Invoke the interrupt handler
# CLI mode: invoke interactive interrupt handler
interrupt_result = await self._interrupt_handler.handle_interrupt(
current_agent=agent.name,
iteration=self.context.current_iteration,
Expand Down Expand Up @@ -1271,6 +1370,24 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]:

# Handle mid-agent interrupt (partial output)
if output.partial:
if await self._handle_web_pause(agent.name, output):
# Web mode: agent paused then resumed → re-execute.
# Clear interrupt_event to prevent the re-executed agent
# from seeing the stale signal and returning partial again.
if self._interrupt_event is not None:
self._interrupt_event.clear()
continue
# In web mode with no connections, auto-resume rather than
# falling through to the CLI interactive handler (which would
# block on stdin with no tty in --web-bg mode).
if self._web_dashboard is not None:
logger.info(
"No dashboard connections for '%s' — auto-resuming",
agent.name,
)
if self._interrupt_event is not None:
self._interrupt_event.clear()
continue
output = await self._handle_partial_output(
agent,
output,
Expand Down Expand Up @@ -1767,10 +1884,8 @@ def _resolve_workflow_input_array(self, source: str, field_parts: list[str]) ->

# Handle JSON string inputs (CLI passes arrays as strings)
if isinstance(current, str):
import json as _json

try:
parsed = _json.loads(current)
parsed = json.loads(current)
except (ValueError, TypeError):
raise ExecutionError(
f"Source '{source}' resolved to a string that is not valid JSON: {current!r}",
Expand Down Expand Up @@ -2597,8 +2712,6 @@ def _maybe_parse_json(value: str) -> Any:
Returns:
Parsed JSON value if successful, original string otherwise.
"""
import json

stripped = value.strip()
if stripped.startswith(("{", "[", '"')) or stripped in ("true", "false", "null"):
try:
Expand Down
2 changes: 1 addition & 1 deletion src/conductor/interrupt/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class KeyboardListener:
"""

interrupt_event: asyncio.Event
"""Event that is set when an interrupt key is detected."""
"""Event that is set when an interrupt key (Esc/Ctrl+G) is detected."""

_original_settings: Any = field(default=None, repr=False)
"""Saved terminal settings for restoration."""
Expand Down
Loading
Loading