1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
|
# frozen_string_literal: true
# Released under the MIT License.
# Copyright, 2019-2025, by Samuel Williams.
require_relative "list"
require_relative "task"
require_relative "queue"
module Async
# A general purpose synchronisation primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with {Semaphore}.
#
# @public Since *Async v1*.
class Barrier
# Initialize the barrier.
# @parameter parent [Task | Semaphore | Nil] The parent for holding any children tasks.
# @public Since *Async v1*.
def initialize(parent: nil)
@tasks = List.new
@finished = Queue.new
@parent = parent
end
class TaskNode < List::Node
def initialize(task)
@task = task
end
attr :task
end
private_constant :TaskNode
# Number of tasks being held by the barrier.
def size
@tasks.size
end
# All tasks which have been invoked into the barrier.
attr :tasks
# Execute a child task and add it to the barrier.
# @asynchronous Executes the given block concurrently.
def async(*arguments, parent: (@parent or Task.current), **options, &block)
raise "Barrier is stopped!" if @finished.closed?
waiting = nil
parent.async(*arguments, **options) do |task, *arguments|
waiting = TaskNode.new(task)
@tasks.append(waiting)
block.call(task, *arguments)
ensure
@finished.signal(waiting) unless @finished.closed?
end
end
# Whether there are any tasks being held by the barrier.
# @returns [Boolean]
def empty?
@tasks.empty?
end
# Wait for all tasks to complete by invoking {Task#wait} on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier.
#
# @yields {|task| ...} If a block is given, the unwaited task is yielded. You must invoke {Task#wait} yourself. In addition, you may `break` if you have captured enough results.
#
# @asynchronous Will wait for tasks to finish executing.
def wait
while !@tasks.empty?
# Wait for a task to finish (we get the task node):
return unless waiting = @finished.wait
# Remove the task as it is now finishing:
@tasks.remove?(waiting)
# Get the task:
task = waiting.task
# If a block is given, the user can implement their own behaviour:
if block_given?
yield task
else
# Wait for it to either complete or raise an error:
task.wait
end
end
end
# Stop all tasks held by the barrier.
# @asynchronous May wait for tasks to finish executing.
def stop
@tasks.each do |waiting|
waiting.task.stop
end
@finished.close
end
end
end
|