1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333
|
# frozen_string_literal: true
# Released under the MIT License.
# Copyright, 2017-2025, by Samuel Williams.
# Copyright, 2017, by Kent Gruber.
# Copyright, 2022, by Shannon Skipper.
# Copyright, 2025, by Shopify Inc.
require "fiber/annotation"
require_relative "list"
module Async
# A list of children tasks.
class Children < List
# Create an empty list of children tasks.
def initialize
super
@transient_count = 0
end
# Some children may be marked as transient. Transient children do not prevent the parent from finishing.
# @returns [Boolean] Whether the node has transient children.
def transients?
@transient_count > 0
end
# Whether all children are considered finished. Ignores transient children.
def finished?
@size == @transient_count
end
# Whether the children is empty, preserved for compatibility.
def nil?
empty?
end
# Adjust the number of transient children, assuming it has changed.
#
# Despite being public, this is not intended to be called directly. It is used internally by {Node#transient=}.
#
# @parameter transient [Boolean] Whether to increment or decrement the transient count.
def adjust_transient_count(transient)
if transient
@transient_count += 1
else
@transient_count -= 1
end
end
private
def added(node)
if node.transient?
@transient_count += 1
end
return super
end
def removed(node)
if node.transient?
@transient_count -= 1
end
return super
end
end
# A node in a tree, used for implementing the task hierarchy.
class Node
# Create a new node in the tree.
# @parameter parent [Node | Nil] This node will attach to the given parent.
def initialize(parent = nil, annotation: nil, transient: false)
@parent = nil
@children = nil
@annotation = annotation
@object_name = nil
@transient = transient
@head = nil
@tail = nil
if parent
parent.add_child(self)
end
end
# @returns [Node] The root node in the hierarchy.
def root
@parent&.root || self
end
# @private
attr_accessor :head
# @private
attr_accessor :tail
# @attribute [Node] The parent node.
attr :parent
# @attribute [Children | Nil] Optional list of children.
attr :children
# @attribute [String | Nil] A useful identifier for the current node.
attr :annotation
# Whether this node has any children.
# @returns [Boolean]
def children?
@children && !@children.empty?
end
# Represents whether a node is transient. Transient nodes are not considered
# when determining if a node is finished. This is useful for tasks which are
# internal to an object rather than explicit user concurrency. For example,
# a child task which is pruning a connection pool is transient, because it
# is not directly related to the parent task, and should not prevent the
# parent task from finishing.
def transient?
@transient
end
# Change the transient state of the node.
#
# A transient node is not considered when determining if a node is finished, and propagates up if the parent is consumed.
#
# @parameter value [Boolean] Whether the node is transient.
def transient=(value)
if @transient != value
@transient = value
@parent&.children&.adjust_transient_count(value)
end
end
# Annotate the node with a description.
#
# @parameter annotation [String] The description to annotate the node with.
def annotate(annotation)
if block_given?
begin
current_annotation = @annotation
@annotation = annotation
return yield
ensure
@annotation = current_annotation
end
else
@annotation = annotation
end
end
# A description of the node, including the annotation and object name.
#
# @returns [String] The description of the node.
def description
@object_name ||= "#{self.class}:#{format '%#018x', object_id}#{@transient ? ' transient' : nil}"
if annotation = self.annotation
"#{@object_name} #{annotation}"
elsif line = self.backtrace(0, 1)&.first
"#{@object_name} #{line}"
else
@object_name
end
end
# Provides a backtrace for nodes that have an active execution context.
#
# @returns [Array(Thread::Backtrace::Locations) | Nil] The backtrace of the node, if available.
def backtrace(*arguments)
nil
end
# @returns [String] A description of the node.
def to_s
"\#<#{self.description}>"
end
# @returns [String] A description of the node.
alias inspect to_s
# Change the parent of this node.
#
# @parameter parent [Node | Nil] The parent to attach to, or nil to detach.
# @returns [Node] Itself.
def parent=(parent)
return if @parent.equal?(parent)
if @parent
@parent.remove_child(self)
@parent = nil
end
if parent
parent.add_child(self)
end
return self
end
protected def set_parent(parent)
@parent = parent
end
protected def add_child(child)
@children ||= Children.new
@children.append(child)
child.set_parent(self)
end
protected def remove_child(child)
# In the case of a fork, the children list may be nil:
@children&.remove(child)
child.set_parent(nil)
end
# Whether the node can be consumed (deleted) safely. By default, checks if the children set is empty.
#
# @returns [Boolean]
def finished?
@children.nil? || @children.finished?
end
# If the node has a parent, and is {finished?}, then remove this node from
# the parent.
def consume
if parent = @parent and finished?
parent.remove_child(self)
# If we have children, then we need to move them to our the parent if they are not finished:
if @children
while child = @children.shift
if child.finished?
child.set_parent(nil)
else
parent.add_child(child)
end
end
@children = nil
end
parent.consume
end
end
# Traverse the task tree.
#
# @returns [Enumerator] An enumerator which will traverse the tree if no block is given.
# @yields {|node, level| ...} The node and the level relative to the given root.
def traverse(&block)
return enum_for(:traverse) unless block_given?
self.traverse_recurse(&block)
end
protected def traverse_recurse(level = 0, &block)
yield self, level
@children&.each do |child|
child.traverse_recurse(level + 1, &block)
end
end
# Immediately terminate all children tasks, including transient tasks. Internally invokes `stop(false)` on all children. This should be considered a last ditch effort and is used when closing the scheduler.
def terminate
# Attempt to stop the current task immediately, and all children:
stop(false)
# If that doesn't work, take more serious action:
@children&.each do |child|
child.terminate
end
return @children.nil?
end
# Attempt to stop the current node immediately, including all non-transient children. Invokes {#stop_children} to stop all children.
#
# @parameter later [Boolean] Whether to defer stopping until some point in the future.
def stop(later = false)
# The implementation of this method may defer calling `stop_children`.
stop_children(later)
end
# Attempt to stop all non-transient children.
private def stop_children(later = false)
@children&.each do |child|
child.stop(later) unless child.transient?
end
end
# Wait for this node to complete. By default, nodes cannot be waited on.
# Subclasses like Task override this method to provide waiting functionality.
def wait
nil
end
# Whether the node has been stopped.
def stopped?
@children.nil?
end
# Print the hierarchy of the task tree from the given node.
#
# @parameter out [IO] The output stream to write to.
# @parameter backtrace [Boolean] Whether to print the backtrace of each node.
def print_hierarchy(out = $stdout, backtrace: true)
self.traverse do |node, level|
indent = "\t" * level
out.puts "#{indent}#{node}"
print_backtrace(out, indent, node) if backtrace
end
end
private
def print_backtrace(out, indent, node)
if backtrace = node.backtrace
backtrace.each_with_index do |line, index|
out.puts "#{indent}#{index.zero? ? "→ " : " "}#{line}"
end
end
end
end
end
|