Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Sources/Combiners/Merge/AsyncMergeSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,19 @@ public struct AsyncMergeSequence<Base: AsyncSequence>: AsyncSequence {
}

public struct Iterator: AsyncIteratorProtocol {
private let isEmpty: Bool
let mergeStateMachine: MergeStateMachine<Element>

init(bases: [Base]) {
isEmpty = bases.isEmpty
self.mergeStateMachine = MergeStateMachine(
bases
)
}

public mutating func next() async rethrows -> Element? {
guard !self.isEmpty else { return nil }

let mergedElement = await self.mergeStateMachine.next()
switch mergedElement {
case .element(let result):
Expand Down
3 changes: 2 additions & 1 deletion Sources/Combiners/Merge/MergeStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ struct MergeStateMachine<Element>: Sendable {
var regulators = [Regulator<Base>]()

for base in bases {
let regulator = Regulator<Base>(base, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) })
let regulator = Regulator<Base>(base, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) }
)
regulators.append(regulator)
}

Expand Down
21 changes: 19 additions & 2 deletions Tests/Combiners/Merge/AsyncMergeSequenceTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ final class AsyncMergeSequenceTests: XCTestCase {

let expectedElements = asyncSequence1 + asyncSequence2 + asyncSequence3 + asyncSequence4


let sut = merge(asyncSequence1.async, asyncSequence2.async, asyncSequence3.async, asyncSequence4.async)

var receivedElements = [Int]()
Expand Down Expand Up @@ -247,7 +246,7 @@ final class AsyncMergeSequenceTests: XCTestCase {
for try await element in sut {
firstElement = element
canCancelExpectation.fulfill()
wait(for: [hasCancelExceptation], timeout: 5)
await fulfillment(of: [hasCancelExceptation], timeout: 5)
}
XCTAssertEqual(firstElement, 10)
taskHasFinishedExpectation.fulfill()
Expand Down Expand Up @@ -289,4 +288,22 @@ final class AsyncMergeSequenceTests: XCTestCase {

wait(for: [hasCancelExceptation], timeout: 1)
}

func testMerge_finishes_when_empty_array_of_base() {
let sut = AsyncMergeSequence<AsyncStream<Int>>([])
let hasFinishedExpectation = expectation(description: "Merge has finished")

let task = Task {
var received = [Int]()
for try await element in sut {
received.append(element)
}
XCTAssertTrue(received.isEmpty)
hasFinishedExpectation.fulfill()
}

wait(for: [hasFinishedExpectation], timeout: 1)

task.cancel()
}
}
Loading