Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion fixtures/async/chainable_async.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ module Async
let(:chainable) {subject.new(parent: parent)}

it "should chain async to parent" do
expect(parent).to receive(:async).and_return(nil)
expect(parent).to receive(:async).and_return{|*arguments, **options, &block|
Async(*arguments, **options, &block)
}

chainable.async do
# Nothing.
Expand Down
23 changes: 19 additions & 4 deletions lib/async/barrier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class Barrier
def initialize(parent: nil)
@tasks = List.new
@finished = Queue.new
@condition = Condition.new

@parent = parent
end
Expand All @@ -42,18 +43,32 @@ def size

# Execute a child task and add it to the barrier.
# @asynchronous Executes the given block concurrently.
# @returns [Task] The task which was created to execute the block.
def async(*arguments, parent: (@parent or Task.current), **options, &block)
raise "Barrier is stopped!" if @finished.closed?

waiting = nil

parent.async(*arguments, **options) do |task, *arguments|
waiting = TaskNode.new(task)
@tasks.append(waiting)
task = parent.async(*arguments, **options) do |task, *arguments|
# Create a new list node for the task and add it to the list of waiting tasks:
node = TaskNode.new(task)
@tasks.append(node)

# Signal the outer async block that we have added the task to the list of waiting tasks, and that it can now wait for it to finish:
waiting = node
@condition.signal

# Invoke the block, which may raise an error. If it does, we will still signal that the task has finished:
block.call(task, *arguments)
ensure
@finished.signal(waiting) unless @finished.closed?
# Signal that the task has finished, which will unblock the waiting task:
@finished.signal(node) unless @finished.closed?
end

# `parent.async` may yield before the child block executes, so we wait here until the child has appended itself to `@tasks`, ensuring `wait` cannot return early and miss tracking it:
@condition.wait while waiting.nil?

return task
end

# Whether there are any tasks being held by the barrier.
Expand Down
4 changes: 4 additions & 0 deletions releases.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Releases

## Unreleased

- Fix `Barrier#async` when `parent.async` yields before the child block executes. Previously, `Barrier#wait` could return early and miss tracking the task entirely, because the task had not yet appended itself to the barrier's task list.

## v2.38.0

- Rename `Task#stop` to `Task#cancel` for better clarity and consistency with common concurrency terminology. The old `stop` method is still available as an alias for backward compatibility, but it is recommended to use `cancel` going forward.
Expand Down
25 changes: 25 additions & 0 deletions test/async/barrier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,31 @@
barrier.stop
end
end

it "waits even if the child task yields immediately" do
class Yielder
def async(*arguments, **options, &block)
Async(*arguments, **options) do |task, *arguments|
task.yield
block.call(task, *arguments)
end
end
end

parent = Yielder.new

3.times do |i|
barrier.async(parent:){i}
end

expect(barrier.size).to be == 3

results = []
barrier.wait do |task|
results << task.wait
end
expect(results.sort).to be == [0, 1, 2]
end
end

with "#stop" do
Expand Down
Loading