Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ec3bcfb
feat: add --concurrency and --parallel flags
branchseer Mar 29, 2026
7666108
docs: update changelog with PR links for --concurrency and --parallel
branchseer Mar 29, 2026
699176d
docs: simplify changelog wording
branchseer Mar 29, 2026
59cb4be
refactor: rename --concurrency to --concurrency-limit, default to 4
branchseer Mar 29, 2026
259588f
fix: remove broken CONCURRENCY_LIMIT doc link
branchseer Mar 29, 2026
f5ae5a9
docs: add concurrency documentation
branchseer Mar 29, 2026
a471fa6
docs: fix flag ordering in concurrency examples
branchseer Mar 29, 2026
7b35c53
docs: add design rationale to concurrency documentation
branchseer Mar 29, 2026
f22f8f0
refactor: replace concurrency inheritance with VP_RUN_CONCURRENCY_LIM…
branchseer Mar 29, 2026
4f68003
fix: guard interactive selector against execution flags, cap semaphore
branchseer Mar 29, 2026
7c255fb
update docs
branchseer Mar 29, 2026
e9f8d0f
fix: correct concurrency resolution order in docs and comments
branchseer Mar 29, 2026
e105318
test(e2e): add --parallel flag execution test
branchseer Mar 29, 2026
48cbdea
chore: add playground workspace for manual testing
branchseer Mar 31, 2026
b2bf3b9
refactor: remove Session::envs_mut, create per-plan session in plan s…
branchseer Mar 31, 2026
c446d0a
feat: cancel future tasks and prevent caching on Ctrl-C
branchseer Mar 31, 2026
b80acb9
fix: propagate ctrlc::set_handler error instead of ignoring it
branchseer Mar 31, 2026
7327fcf
fix(windows): clear inherited CTRL_C ignore flag before setting handler
branchseer Mar 31, 2026
9ed6a49
docs: add cancellation docs and changelog entry for Ctrl-C handling
branchseer Mar 31, 2026
9fda35c
docs: fix playground README command reference
branchseer Mar 31, 2026
bf2ea55
docs: group test-related sections under Testing in CONTRIBUTING.md
branchseer Mar 31, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

- **Fixed** Ctrl-C now prevents future tasks from being scheduled and prevents caching of in-flight task results ([#309](https://github.com/voidzero-dev/vite-task/pull/309))
- **Added** `--concurrency-limit` flag to limit the number of tasks running at the same time (defaults to 4) ([#288](https://github.com/voidzero-dev/vite-task/pull/288), [#309](https://github.com/voidzero-dev/vite-task/pull/309))
- **Added** `--parallel` flag to ignore task dependencies and run all tasks at once with unlimited concurrency (unless `--concurrency-limit` is also specified) ([#309](https://github.com/voidzero-dev/vite-task/pull/309))
- **Added** object form for `input` entries: `{ "pattern": "...", "base": "workspace" | "package" }` to resolve glob patterns relative to the workspace root instead of the package directory ([#295](https://github.com/voidzero-dev/vite-task/pull/295))
- **Fixed** arguments after the task name being consumed by `vp` instead of passed through to the task ([#286](https://github.com/voidzero-dev/vite-task/pull/286), [#290](https://github.com/voidzero-dev/vite-task/pull/290))
- **Changed** default untracked env patterns to align with Turborepo, covering more CI and platform-specific variables ([#262](https://github.com/voidzero-dev/vite-task/pull/262))
Expand Down
15 changes: 14 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ just lint # Clippy linting
just doc # Generate documentation
```

### Running Specific Tests
## Testing

### Running Tests

```bash
cargo test # All tests
Expand All @@ -55,6 +57,17 @@ Integration tests (e2e, plan, fspy) require `pnpm install` in `packages/tools` f

See individual crate READMEs for crate-specific testing details.

### Playground

The `playground/` directory is a small workspace for manually testing the task runner. It has three packages (`app → lib → utils`) with cached tasks (`build`, `test`, `lint`, `typecheck`) and an uncached `dev` script.

```bash
cargo run --bin vt -- run -r build # run build across all packages
cargo run --bin vt -- run -r --parallel dev # start all dev scripts in parallel
```

See `playground/README.md` for the full task list and dependency structure.

## Cross-Platform Development

This project must work on macOS, Linux, and Windows. Skipping tests on any platform is not acceptable.
Expand Down
13 changes: 13 additions & 0 deletions crates/vite_task/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ pub struct RunFlags {
/// How task output is displayed.
#[clap(long, default_value = "interleaved")]
pub log: LogMode,

/// Maximum number of tasks to run concurrently. Defaults to 4.
#[clap(long)]
pub concurrency_limit: Option<usize>,

/// Run tasks without dependency ordering. Sets concurrency to unlimited
/// unless `--concurrency-limit` is also specified.
#[clap(long, default_value = "false")]
pub parallel: bool,
}

impl RunFlags {
Expand Down Expand Up @@ -206,6 +215,8 @@ impl ResolvedRunCommand {

let cache_override = self.flags.cache_override();
let include_explicit_deps = !self.flags.ignore_depends_on;
let concurrency_limit = self.flags.concurrency_limit.map(|n| n.max(1));
let parallel = self.flags.parallel;

let (package_query, is_cwd_only) =
self.flags.package_query.into_package_query(task_specifier.package_name, cwd)?;
Expand All @@ -220,6 +231,8 @@ impl ResolvedRunCommand {
plan_options: PlanOptions {
extra_args: self.additional_args.into(),
cache_override,
concurrency_limit,
parallel,
},
},
is_cwd_only,
Expand Down
7 changes: 7 additions & 0 deletions crates/vite_task/src/session/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ pub enum CacheNotUpdatedReason {
CacheDisabled,
/// Execution exited with non-zero status
NonZeroExitStatus,
/// Execution was cancelled before the result could be trusted.
/// Two possible causes:
/// - Ctrl-C: the user interrupted execution; the task may have
/// exited successfully but without completing its intended work.
/// - Fast-fail: a sibling task failed, triggering cancellation
/// while this task was still running.
Cancelled,
/// Task modified files it read during execution (read-write overlap detected by fspy).
/// Caching such tasks is unsound because the prerun input hashes become stale.
InputModified {
Expand Down
80 changes: 49 additions & 31 deletions crates/vite_task/src/session/execute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ pub enum SpawnOutcome {
Failed,
}

/// Maximum number of tasks that can execute concurrently within a single
/// execution graph level.
const CONCURRENCY_LIMIT: usize = 10;

/// Holds shared references needed during graph execution.
///
/// The `reporter` field is wrapped in `RefCell` because concurrent futures
Expand All @@ -71,34 +67,48 @@ struct ExecutionContext<'a> {
/// Base path for resolving relative paths in cache entries.
/// Typically the workspace root.
cache_base_path: &'a Arc<AbsolutePath>,
/// Token for cancelling in-flight child processes.
cancellation_token: CancellationToken,
/// Token cancelled when a task fails. Kills in-flight child processes
/// (via `start_kill` in spawn.rs), prevents scheduling new tasks, and
/// prevents caching results of concurrently-running tasks.
fast_fail_token: CancellationToken,
/// Token cancelled by Ctrl-C. Unlike `fast_fail_token` (which kills
/// children), this only prevents scheduling new tasks and caching
/// results — running processes are left to handle SIGINT naturally.
interrupt_token: CancellationToken,
}

impl ExecutionContext<'_> {
/// Whether execution should stop scheduling and caching work.
///
/// True once either cancellation source has fired: a sibling task
/// failure (`fast_fail_token`) or a user Ctrl-C (`interrupt_token`).
fn cancelled(&self) -> bool {
    [&self.fast_fail_token, &self.interrupt_token]
        .iter()
        .any(|token| token.is_cancelled())
}

/// Execute all tasks in an execution graph concurrently, respecting dependencies.
///
/// Uses a DAG scheduler: tasks whose dependencies have all completed are scheduled
/// onto a `FuturesUnordered`, bounded by a per-graph `Semaphore` with
/// [`CONCURRENCY_LIMIT`] permits. Each recursive `Expanded` graph creates its own
/// `concurrency_limit` permits. Each recursive `Expanded` graph creates its own
/// semaphore, so nested graphs have independent concurrency limits.
///
/// Fast-fail: if any task fails, `execute_leaf` cancels the `CancellationToken`
/// (killing in-flight child processes). This method detects the cancellation,
/// closes the semaphore, drains remaining futures, and returns.
/// Fast-fail: if any task fails, `execute_leaf` cancels the `fast_fail_token`
/// (killing in-flight child processes). Ctrl-C cancels the `interrupt_token`.
/// Either cancellation causes this method to close the semaphore, drain
/// remaining futures, and return.
#[tracing::instrument(level = "debug", skip_all)]
async fn execute_expanded_graph(&self, graph: &ExecutionGraph) {
if graph.node_count() == 0 {
if graph.graph.node_count() == 0 {
return;
}

let semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT));
let semaphore =
Arc::new(Semaphore::new(graph.concurrency_limit.min(Semaphore::MAX_PERMITS)));

// Compute dependency count for each node.
// Edge A→B means "A depends on B", so A's dependency count = outgoing edge count.
let mut dep_count: FxHashMap<ExecutionNodeIndex, usize> = FxHashMap::default();
for node_ix in graph.node_indices() {
dep_count.insert(node_ix, graph.neighbors(node_ix).count());
for node_ix in graph.graph.node_indices() {
dep_count.insert(node_ix, graph.graph.neighbors(node_ix).count());
}

let mut futures = FuturesUnordered::new();
Expand All @@ -114,7 +124,7 @@ impl ExecutionContext<'_> {
// On failure, `execute_leaf` cancels the token — we detect it here, close
// the semaphore (so pending acquires fail immediately), and drain.
while let Some(completed_ix) = futures.next().await {
if self.cancellation_token.is_cancelled() {
if self.cancelled() {
semaphore.close();
while futures.next().await.is_some() {}
return;
Expand All @@ -123,7 +133,7 @@ impl ExecutionContext<'_> {
// Find dependents of the completed node (nodes that depend on it).
// Edge X→completed means "X depends on completed", so X is a predecessor
// in graph direction = neighbor in Incoming direction.
for dependent in graph.neighbors_directed(completed_ix, Direction::Incoming) {
for dependent in graph.graph.neighbors_directed(completed_ix, Direction::Incoming) {
let count = dep_count.get_mut(&dependent).expect("all nodes are in dep_count");
*count -= 1;
if *count == 0 {
Expand All @@ -135,7 +145,7 @@ impl ExecutionContext<'_> {

/// Create a future that acquires a semaphore permit, then executes a graph node.
///
/// On failure, `execute_node` cancels the `CancellationToken` — the caller
/// On failure, `execute_node` cancels the `fast_fail_token` — the caller
/// detects this after the future completes. On semaphore closure or prior
/// cancellation, the node is skipped.
fn spawn_node<'a>(
Expand All @@ -147,7 +157,7 @@ impl ExecutionContext<'_> {
let sem = semaphore.clone();
async move {
if let Ok(_permit) = sem.acquire_owned().await
&& !self.cancellation_token.is_cancelled()
&& !self.cancelled()
{
self.execute_node(graph, node_ix).await;
}
Expand All @@ -159,13 +169,13 @@ impl ExecutionContext<'_> {
/// Execute a single node's items sequentially.
///
/// A node may have multiple items (from `&&`-split commands). Items are executed
/// in order; if any item fails, `execute_leaf` cancels the `CancellationToken`
/// in order; if any item fails, `execute_leaf` cancels the `fast_fail_token`
/// and remaining items are skipped (preserving `&&` semantics).
async fn execute_node(&self, graph: &ExecutionGraph, node_ix: ExecutionNodeIndex) {
let task_execution = &graph[node_ix];
let task_execution = &graph.graph[node_ix];

for item in &task_execution.items {
if self.cancellation_token.is_cancelled() {
if self.cancelled() {
return;
}
match &item.kind {
Expand All @@ -183,7 +193,7 @@ impl ExecutionContext<'_> {
///
/// Creates a [`LeafExecutionReporter`] from the graph reporter and delegates
/// to the appropriate execution method. On failure (non-zero exit or
/// infrastructure error), cancels the `CancellationToken`.
/// infrastructure error), cancels the `fast_fail_token`.
#[tracing::instrument(level = "debug", skip_all)]
async fn execute_leaf(&self, display: &ExecutionItemDisplay, leaf_kind: &LeafExecutionKind) {
// Borrow the reporter briefly to create the leaf reporter, then drop
Expand Down Expand Up @@ -218,7 +228,8 @@ impl ExecutionContext<'_> {
spawn_execution,
self.cache,
self.cache_base_path,
self.cancellation_token.clone(),
self.fast_fail_token.clone(),
self.interrupt_token.clone(),
)
.await;
match outcome {
Expand All @@ -229,7 +240,7 @@ impl ExecutionContext<'_> {
}
};
if failed {
self.cancellation_token.cancel();
self.fast_fail_token.cancel();
}
}
}
Expand Down Expand Up @@ -258,7 +269,8 @@ pub async fn execute_spawn(
spawn_execution: &SpawnExecution,
cache: &ExecutionCache,
cache_base_path: &Arc<AbsolutePath>,
cancellation_token: CancellationToken,
fast_fail_token: CancellationToken,
interrupt_token: CancellationToken,
) -> SpawnOutcome {
let cache_metadata = spawn_execution.cache_metadata.as_ref();

Expand Down Expand Up @@ -351,7 +363,7 @@ pub async fn execute_spawn(
// while the child also writes to the same FD.
drop(stdio_config);

match spawn_inherited(&spawn_execution.spawn_command, cancellation_token).await {
match spawn_inherited(&spawn_execution.spawn_command, fast_fail_token).await {
Ok(result) => {
leaf_reporter.finish(
Some(result.exit_status),
Expand Down Expand Up @@ -422,7 +434,7 @@ pub async fn execute_spawn(
std_outputs.as_mut(),
path_accesses.as_mut(),
&resolved_negatives,
cancellation_token,
fast_fail_token.clone(),
)
.await
{
Expand All @@ -442,7 +454,11 @@ pub async fn execute_spawn(
let (cache_update_status, cache_error) = if let Some((cache_metadata, globbed_inputs)) =
cache_metadata_and_inputs
{
if result.exit_status.success() {
let cancelled = fast_fail_token.is_cancelled() || interrupt_token.is_cancelled();
if cancelled {
// Cancelled (Ctrl-C or sibling failure) — result is untrustworthy
(CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::Cancelled), None)
} else if result.exit_status.success() {
// Check for read-write overlap: if the task wrote to any file it also
// read, the inputs were modified during execution — don't cache.
// Note: this only checks fspy-inferred reads, not globbed_inputs keys.
Expand Down Expand Up @@ -522,7 +538,7 @@ pub async fn execute_spawn(
#[tracing::instrument(level = "debug", skip_all)]
async fn spawn_inherited(
spawn_command: &SpawnCommand,
cancellation_token: CancellationToken,
fast_fail_token: CancellationToken,
) -> anyhow::Result<SpawnResult> {
let mut cmd = fspy::Command::new(spawn_command.program_path.as_path());
cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str));
Expand Down Expand Up @@ -582,7 +598,7 @@ async fn spawn_inherited(

let exit_status = tokio::select! {
status = child.wait() => status?,
() = cancellation_token.cancelled() => {
() = fast_fail_token.cancelled() => {
child.start_kill()?;
child.wait().await?
}
Expand Down Expand Up @@ -697,6 +713,7 @@ impl Session<'_> {
&self,
execution_graph: ExecutionGraph,
builder: Box<dyn GraphExecutionReporterBuilder>,
interrupt_token: CancellationToken,
) -> Result<(), ExitStatus> {
// Initialize cache before building the reporter. Cache errors are reported
// directly to stderr and cause an early exit, keeping the reporter flow clean
Expand All @@ -716,7 +733,8 @@ impl Session<'_> {
reporter: &reporter,
cache,
cache_base_path: &self.workspace_path,
cancellation_token: CancellationToken::new(),
fast_fail_token: CancellationToken::new(),
interrupt_token,
};

// Execute the graph with fast-fail: if any task fails, remaining tasks
Expand Down
8 changes: 4 additions & 4 deletions crates/vite_task/src/session/execute/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub async fn spawn_with_tracking(
std_outputs: Option<&mut Vec<StdOutput>>,
path_accesses: Option<&mut TrackedPathAccesses>,
resolved_negatives: &[wax::Glob<'static>],
cancellation_token: CancellationToken,
fast_fail_token: CancellationToken,
) -> anyhow::Result<SpawnResult> {
let mut cmd = fspy::Command::new(spawn_command.program_path.as_path());
cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str));
Expand All @@ -108,7 +108,7 @@ pub async fn spawn_with_tracking(
let (mut child_stdout, mut child_stderr, mut child_wait) = if path_accesses.is_some() {
// fspy tracking enabled — fspy manages cancellation internally via a clone
// of the token. We keep the original for the pipe read loop.
let mut tracked_child = cmd.spawn(cancellation_token.clone()).await?;
let mut tracked_child = cmd.spawn(fast_fail_token.clone()).await?;
let stdout = tracked_child.stdout.take().unwrap();
let stderr = tracked_child.stderr.take().unwrap();
#[cfg(windows)]
Expand Down Expand Up @@ -193,7 +193,7 @@ pub async fn spawn_with_tracking(
}
}
}
() = cancellation_token.cancelled() => {
() = fast_fail_token.cancelled() => {
// Kill the direct child (no-op for fspy which handles it internally).
if let ChildWait::Tokio(ref mut child) = child_wait {
let _ = child.start_kill();
Expand Down Expand Up @@ -291,7 +291,7 @@ pub async fn spawn_with_tracking(
ChildWait::Tokio(mut child) => {
let exit_status = tokio::select! {
status = child.wait() => status?,
() = cancellation_token.cancelled() => {
() = fast_fail_token.cancelled() => {
child.start_kill()?;
child.wait().await?
}
Expand Down
Loading
Loading