Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 31 additions & 19 deletions Sources/Vexil/Utilities/AsyncCurrentValue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,7 @@ struct AsyncCurrentValue<Wrapped: Sendable> {
// iterators start with generation = 0, so our initial value
// has generation 1, so even that will be delivered.
var generation = 1
var wrappedValue: Wrapped {
didSet {
generation += 1
for (_, continuation) in pendingContinuations {
continuation.resume(returning: (generation, wrappedValue))
}
pendingContinuations = []
}
}

var wrappedValue: Wrapped
var pendingContinuations = [(UUID, CheckedContinuation<(Int, Wrapped)?, Never>)]()
}

Expand Down Expand Up @@ -68,18 +59,39 @@ struct AsyncCurrentValue<Wrapped: Sendable> {
/// - body: A closure that passes the current value as an in-out parameter that you can mutate.
/// When the closure returns the mutated value is saved as the current value and is sent to all subscribers.
///
func update<R: Sendable>(_ body: (inout sending Wrapped) throws -> R) rethrows -> R {
try allocation.mutex.withLock { state in
func update<R: Sendable, E: Error>(_ body: (inout sending Wrapped) throws(E) -> R) throws(E) -> R {
let result: Result<R, E>
let generation: Int
let pendingContinuations: [CheckedContinuation<(Int, Wrapped)?, Never>]
let updatedValue: Wrapped

// If we resume continuations within the context of this lock we risk a deadlock
// as they attempt to access the next value. So we do the update and return
// pending continuations to be resumed outside the lock. It should be impossible
// for new continuations to miss this generation as they're accessed and added
// within the same lock closure.

(result, updatedValue, generation, pendingContinuations) = allocation.mutex.withLock { state in

// The closure mutates a copy, then we save that back to our state
var wrappedValue = state.wrappedValue
do {
let result = try body(&wrappedValue)
state.wrappedValue = wrappedValue
return result
} catch {
state.wrappedValue = wrappedValue
throw error
let result = Result { () throws(E) -> R in
try body(&wrappedValue)
}
state.wrappedValue = wrappedValue

// Bump generation and grab pending continuations
state.generation += 1
let toResume = state.pendingContinuations.map(\.1)
state.pendingContinuations = []
return (result, wrappedValue, state.generation, toResume)
}

// Resume our pending continuations
for continuation in pendingContinuations {
continuation.resume(returning: (generation, updatedValue))
}
return try result.get()
}

}
Expand Down
71 changes: 71 additions & 0 deletions Tests/VexilTests/Utilities/AsyncCurrentValueTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Vexil open source project
//
// Copyright (c) 2026 Unsigned Apps and the open source contributors.
// Licensed under the MIT license
//
// See LICENSE for license information
//
// SPDX-License-Identifier: MIT
//
//===----------------------------------------------------------------------===//

import Foundation
import Testing
@testable import Vexil

#if os(macOS)

struct AsyncCurrentValueTests {
/// Regression test for a lock-order inversion deadlock between two threads:
///
/// Thread A (update): holds allocation.mutex β†’ calls continuation.resume() inside didSet
/// β†’ resume() internally acquires the Swift task status lock
///
/// Thread B (cancel): acquires the Swift task status lock β†’ fires onCancel: handler
/// β†’ onCancel: calls allocation.mutex.withLock β†’ waits for mutex
///
/// Thread A holds mutex, wants task-lock.
/// Thread B holds task-lock, wants mutex.
/// β†’ deadlock.
///
/// If the bug is present, the test will time out after 1 minute.
@Test(.timeLimit(.minutes(1)))
func `AsyncCurrentValue does not deadlock when cancellation races a concurrent update`() async {
for _ in 0 ..< 100_000 {
let currentValue = AsyncCurrentValue<FlagChange>(.all)

// This task will:
// 1. Call next() once β€” returns the initial value immediately because
// iterator.generation (0) < state.generation (1).
// 2. Call next() again β€” suspends, storing its continuation in
// pendingContinuations. This is the continuation that gets raced.
let consumingTask = Task {
var iterator = currentValue.stream.makeAsyncIterator()
_ = await iterator.next(isolation: nil)
_ = await iterator.next(isolation: nil)
}

// Yield to give the consuming task time to advance past the first next()
// and park its continuation inside pendingContinuations on the second call.
await Task.yield()
await Task.yield()
await Task.yield()

// Fire the two racing operations:
// updateTask β€” acquires allocation.mutex, sets wrappedValue, didSet calls
// continuation.resume() while still inside withLock.
// cancel β€” acquires the task status lock, fires onCancel:, which calls
// allocation.mutex.withLock.
let updateTask = Task.detached { currentValue.update { _ in } }
consumingTask.cancel()

await updateTask.value
await consumingTask.value
}
}

}

#endif
Loading