File: node.rb

package info (click to toggle)
ruby-async 2.36.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 400 kB
  • sloc: ruby: 1,938; makefile: 4
file content (333 lines) | stat: -rw-r--r-- 8,397 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2017-2025, by Samuel Williams.
# Copyright, 2017, by Kent Gruber.
# Copyright, 2022, by Shannon Skipper.
# Copyright, 2025, by Shopify Inc.

require "fiber/annotation"

require_relative "list"

module Async
	# A list of children tasks.
	class Children < List
		# Create an empty list of children tasks.
		def initialize
			super
			@transient_count = 0
		end
		
		# Some children may be marked as transient. Transient children do not prevent the parent from finishing.
		# @returns [Boolean] Whether the node has transient children.
		def transients?
			@transient_count > 0
		end
		
		# Whether all children are considered finished. Ignores transient children.
		def finished?
			@size == @transient_count
		end
		
		# Whether the children is empty, preserved for compatibility.
		def nil?
			empty?
		end
		
		# Adjust the number of transient children, assuming it has changed.
		#
		# Despite being public, this is not intended to be called directly. It is used internally by {Node#transient=}.
		#
		# @parameter transient [Boolean] Whether to increment or decrement the transient count.
		def adjust_transient_count(transient)
			if transient
				@transient_count += 1
			else
				@transient_count -= 1
			end
		end
		
		private
		
		def added(node)
			if node.transient?
				@transient_count += 1
			end
			
			return super
		end
		
		def removed(node)
			if node.transient?
				@transient_count -= 1
			end
			
			return super
		end
	end
	
	# A node in a tree, used for implementing the task hierarchy.
	class Node
		# Create a new node in the tree.
		# @parameter parent [Node | Nil] This node will attach to the given parent.
		def initialize(parent = nil, annotation: nil, transient: false)
			@parent = nil
			@children = nil
			
			@annotation = annotation
			@object_name = nil
			
			@transient = transient
			
			@head = nil
			@tail = nil
			
			if parent
				parent.add_child(self)
			end
		end
		
		# @returns [Node] The root node in the hierarchy.
		def root
			@parent&.root || self
		end
		
		# @private
		attr_accessor :head
		
		# @private
		attr_accessor :tail
		
		# @attribute [Node] The parent node.
		attr :parent
		
		# @attribute [Children | Nil] Optional list of children.
		attr :children
		
		# @attribute [String | Nil] A useful identifier for the current node.
		attr :annotation
		
		# Whether this node has any children.
		# @returns [Boolean]
		def children?
			@children && !@children.empty?
		end
		
		# Represents whether a node is transient. Transient nodes are not considered
		# when determining if a node is finished. This is useful for tasks which are
		# internal to an object rather than explicit user concurrency. For example,
		# a child task which is pruning a connection pool is transient, because it
		# is not directly related to the parent task, and should not prevent the
		# parent task from finishing.
		def transient?
			@transient
		end
		
		# Change the transient state of the node.
		#
		# A transient node is not considered when determining if a node is finished, and propagates up if the parent is consumed.
		#
		# @parameter value [Boolean] Whether the node is transient.
		def transient=(value)
			if @transient != value
				@transient = value
				
				@parent&.children&.adjust_transient_count(value)
			end
		end
		
		# Annotate the node with a description.
		#
		# @parameter annotation [String] The description to annotate the node with.
		def annotate(annotation)
			if block_given?
				begin
					current_annotation = @annotation
					@annotation = annotation
					return yield
				ensure
					@annotation = current_annotation
				end
			else
				@annotation = annotation
			end
		end
		
		# A description of the node, including the annotation and object name.
		#
		# @returns [String] The description of the node.
		def description
			@object_name ||= "#{self.class}:#{format '%#018x', object_id}#{@transient ? ' transient' : nil}"
			
			if annotation = self.annotation
				"#{@object_name} #{annotation}"
			elsif line = self.backtrace(0, 1)&.first
				"#{@object_name} #{line}"
			else
				@object_name
			end
		end
		
		# Provides a backtrace for nodes that have an active execution context.
		#
		# @returns [Array(Thread::Backtrace::Locations) | Nil] The backtrace of the node, if available.
		def backtrace(*arguments)
			nil
		end
		
		# @returns [String] A description of the node.
		def to_s
			"\#<#{self.description}>"
		end
		
		# @returns [String] A description of the node.
		alias inspect to_s
		
		# Change the parent of this node.
		#
		# @parameter parent [Node | Nil] The parent to attach to, or nil to detach.
		# @returns [Node] Itself.
		def parent=(parent)
			return if @parent.equal?(parent)
			
			if @parent
				@parent.remove_child(self)
				@parent = nil
			end
			
			if parent
				parent.add_child(self)
			end
			
			return self
		end
		
		protected def set_parent(parent)
			@parent = parent
		end
		
		protected def add_child(child)
			@children ||= Children.new
			@children.append(child)
			child.set_parent(self)
		end
		
		protected def remove_child(child)
			# In the case of a fork, the children list may be nil:
			@children&.remove(child)
			child.set_parent(nil)
		end
		
		# Whether the node can be consumed (deleted) safely. By default, checks if the children set is empty.
		#
		# @returns [Boolean]
		def finished?
			@children.nil? || @children.finished?
		end
		
		# If the node has a parent, and is {finished?}, then remove this node from
		# the parent.
		def consume
			if parent = @parent and finished?
				parent.remove_child(self)
				
				# If we have children, then we need to move them to our the parent if they are not finished:
				if @children
					while child = @children.shift
						if child.finished?
							child.set_parent(nil)
						else
							parent.add_child(child)
						end
					end
					
					@children = nil
				end
				
				parent.consume
			end
		end
		
		# Traverse the task tree.
		#
		# @returns [Enumerator] An enumerator which will traverse the tree if no block is given.
		# @yields {|node, level| ...} The node and the level relative to the given root.
		def traverse(&block)
			return enum_for(:traverse) unless block_given?
			
			self.traverse_recurse(&block)
		end
		
		protected def traverse_recurse(level = 0, &block)
			yield self, level
			
			@children&.each do |child|
				child.traverse_recurse(level + 1, &block)
			end
		end
		
		# Immediately terminate all children tasks, including transient tasks. Internally invokes `stop(false)` on all children. This should be considered a last ditch effort and is used when closing the scheduler.
		def terminate
			# Attempt to stop the current task immediately, and all children:
			stop(false)
			
			# If that doesn't work, take more serious action:
			@children&.each do |child|
				child.terminate
			end
			
			return @children.nil?
		end
		
		# Attempt to stop the current node immediately, including all non-transient children. Invokes {#stop_children} to stop all children.
		#
		# @parameter later [Boolean] Whether to defer stopping until some point in the future.
		def stop(later = false)
			# The implementation of this method may defer calling `stop_children`.
			stop_children(later)
		end
		
		# Attempt to stop all non-transient children.
		private def stop_children(later = false)
			@children&.each do |child|
				child.stop(later) unless child.transient?
			end
		end
		
		# Wait for this node to complete. By default, nodes cannot be waited on.
		# Subclasses like Task override this method to provide waiting functionality.
		def wait
			nil
		end
		
		# Whether the node has been stopped.
		def stopped?
			@children.nil?
		end
		
		# Print the hierarchy of the task tree from the given node.
		#
		# @parameter out [IO] The output stream to write to.
		# @parameter backtrace [Boolean] Whether to print the backtrace of each node.
		def print_hierarchy(out = $stdout, backtrace: true)
			self.traverse do |node, level|
				indent = "\t" * level
				
				out.puts "#{indent}#{node}"
				
				print_backtrace(out, indent, node) if backtrace
			end
		end
		
		private
		
		def print_backtrace(out, indent, node)
			if backtrace = node.backtrace
				backtrace.each_with_index do |line, index|
					out.puts "#{indent}#{index.zero? ? "→ " : "  "}#{line}"
				end
			end
		end
	end
end