File: scheduler.rb

package info (click to toggle)
ruby-async 2.36.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 400 kB
  • sloc: ruby: 1,938; makefile: 4
file content (664 lines) | stat: -rw-r--r-- 20,526 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2020-2025, by Samuel Williams.
# Copyright, 2020, by Jun Jiang.
# Copyright, 2021, by Julien Portalier.
# Copyright, 2025, by Shopify Inc.

require_relative "clock"
require_relative "task"
require_relative "timeout"
require_relative "fork_handler"

require "io/event"

require "console"
require "resolv"

module Async
	# Optional profiler support. When `fiber/profiler` can be loaded, `Async::Profiler` aliases `Fiber::Profiler`. Otherwise it is `nil`, and `Scheduler#initialize` (which defaults to `Profiler&.default`) runs without a profiler.
	begin
		require "fiber/profiler"
		Profiler = Fiber::Profiler
	rescue LoadError
		# Fiber::Profiler is not available.
		Profiler = nil
	end
	
	# Handles scheduling of fibers. Implements the fiber scheduler interface.
	class Scheduler < Node
		# Default for the `worker_pool:` option of {#initialize}. It is `true` only when the `ASYNC_SCHEDULER_WORKER_POOL` environment variable is exactly `"true"`. Otherwise it is `nil`, meaning no worker pool.
		WORKER_POOL = ENV.fetch("ASYNC_SCHEDULER_WORKER_POOL", nil).then do |value|
			value == "true" ? true : nil
		end
		
		# Raised when an operation is attempted on a closed scheduler (for example by {#run} or {#async}).
		class ClosedError < RuntimeError
			# Create a new error.
			#
			# @parameter message [String] The error message.
			def initialize(message = "Scheduler is closed!")
				super
			end
		end
		
		# Whether the fiber scheduler is supported.
		# @public Since *Async v1*.
		def self.supported?
			true
		end
		
		# Used to augment the scheduler to add support for blocking operations.
		#
		# {#initialize} prepends this module to an individual scheduler's singleton class only when a worker pool is configured. As a result, `blocking_operation_wait` is only defined on schedulers that have a pool to delegate to.
		module BlockingOperationWait
			# Wait for the given work to be executed.
			#
			# @public Since *Async v2.21* and *Ruby v3.4*.
			# @asynchronous May be non-blocking.
			#
			# @parameter work [Proc] The work to execute on a background thread.
			# @returns [Object] The result of the work.
			def blocking_operation_wait(work)
				@worker_pool.call(work)
			end
		end
		
		private_constant :BlockingOperationWait
		
		# The worker pool implementation provided by `io-event`, or `nil` if the installed version does not define `IO::Event::WorkerPool`.
		if ::IO::Event.const_defined?(:WorkerPool)
			WorkerPool = ::IO::Event::WorkerPool
		else
			WorkerPool = nil
		end
		
		# Create a new scheduler.
		#
		# @public Since *Async v1*.
		# @parameter parent [Node | Nil] The parent node to use for task hierarchy.
		# @parameter selector [IO::Event::Selector] The selector to use for event handling. Defaults to a new `IO::Event::Selector` bound to the current fiber.
		# @parameter profiler [Object | Nil] An optional profiler; {#run} calls its `start` and `stop`. Defaults to `Profiler&.default`, i.e. `nil` when no profiler is available.
		# @parameter worker_pool [Boolean | Object | Nil] If `true`, a new {WorkerPool} is created (or `nil` is used when `io-event` provides none). Any other value is used as the pool as-is. A truthy pool enables `blocking_operation_wait`. Defaults to {WORKER_POOL}.
		def initialize(parent = nil, selector: nil, profiler: Profiler&.default, worker_pool: WORKER_POOL)
			super(parent)
			
			@selector = selector || ::IO::Event::Selector.new(Fiber.current)
			@profiler = profiler
			
			# Set by #interrupt; checked and cleared by #interrupted?:
			@interrupted = false
			
			# Number of fibers currently suspended in #block; #close raises if this is non-zero:
			@blocked = 0
			
			# Accumulated by #run_once!, then read and rescaled by #load:
			@busy_time = 0.0
			@idle_time = 0.0
			
			@timers = ::IO::Event::Timers.new
			
			if worker_pool == true
				@worker_pool = WorkerPool&.new
			else
				@worker_pool = worker_pool
			end
			
			# Only expose `blocking_operation_wait` when there is a pool to service it:
			if @worker_pool
				self.singleton_class.prepend(BlockingOperationWait)
			end
		end
		
		# Compute the scheduler load according to the busy and idle times that are updated by the run loop.
		#
		# This is not a pure query. Once more than one second of busy plus idle time has accumulated, the stored times are rescaled in place to a one second window, so earlier samples are progressively scaled down.
		#
		# @returns [Float] The load of the scheduler. 0.0 means no load, 1.0 means fully loaded or over-loaded.
		def load
			total_time = @busy_time + @idle_time
			
			# If the total time is zero, then the load is zero:
			return 0.0 if total_time.zero?
			
			# We normalize to a 1 second window:
			if total_time > 1.0
				ratio = 1.0 / total_time
				@busy_time *= ratio
				@idle_time *= ratio
				
				# We don't need to divide here as we've already normalised it to a 1s window:
				return @busy_time
			else
				return @busy_time / total_time
			end
		end
		
		# Invoked when the fiber scheduler is being closed.
		#
		# Executes the run loop until all tasks are finished, then closes the scheduler. If an exception is currently being handled, the run loop is skipped and the scheduler is closed immediately.
		#
		# @parameter error [Exception | Nil] The exception being handled, defaulting to `$!`.
		def scheduler_close(error = $!)
			# If the execution context (thread) was handling an exception, we want to exit as quickly as possible:
			unless error
				self.run
			end
		ensure
			self.close
		end
		
		# Terminate all child tasks.
		#
		# {#close} calls this repeatedly, running one event loop iteration between attempts, until it returns true.
		#
		# @returns [Boolean] Whether the scheduler has no remaining children.
		def terminate
			# Ask every child to terminate:
			@children&.each do |child|
				child.terminate
			end
			
			return @children.nil?
		end
		
		# Terminate all child tasks and close the scheduler.
		#
		# While children remain, the event loop (via {#run_loop}) alternates between {#terminate} and {#run_once!}. The selector and worker pool are always released, even if an exception is raised.
		#
		# @public Since *Async v1*.
		# @raises [RuntimeError] If fibers are still suspended in {#block} once the children have terminated.
		def close
			unless @children.nil?
				self.run_loop do
					until self.terminate
						self.run_once!
					end
				end
			end
			
			Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0
		ensure
			# We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it.
			if selector = @selector
				@selector = nil
				selector.close
			end
			
			if worker_pool = @worker_pool
				@worker_pool = nil
				worker_pool.close
			end
			
			consume
		end
		
		# @returns [Boolean] Whether the scheduler has been closed (its selector has been released).
		# @public Since *Async v1*.
		def closed?
			@selector.nil?
		end
		
		# @returns [String] A description of the scheduler, including its child count and whether it is stopped or running.
		def to_s
			"\#<#{self.description} #{@children&.size || 0} children (#{stopped? ? 'stopped' : 'running'})>"
		end
		
		# Interrupt the event loop and cause it to exit.
		#
		# Sets a flag that {#interrupted?} checks (and clears) on the next loop iteration. It also wakes the selector, if it is still open, so that the check happens promptly.
		# @asynchronous May be called from any thread.
		def interrupt
			@interrupted = true
			@selector&.wakeup
		end
		
		# Transfer from the calling fiber to the event loop.
		def transfer
			@selector.transfer
		end
		
		# Yield the current fiber and resume it on the next iteration of the event loop.
		def yield
			@selector.yield
		end
		
		# Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
		# @parameter fiber [Fiber | Object] The object to be resumed on the next iteration of the run-loop.
		def push(fiber)
			@selector.push(fiber)
		end
		
		# Raise an exception on a specified fiber with the given arguments.
		#
		# This internally schedules the current fiber to be ready, before raising the exception, so that it will later resume execution.
		#
		# @parameter fiber [Fiber] The fiber to raise the exception on.
		# @parameter *arguments [Array] The arguments to pass to the fiber.
		def raise(...)
			@selector.raise(...)
		end
		
		# Resume execution of the specified fiber.
		#
		# @parameter fiber [Fiber] The fiber to resume.
		# @parameter arguments [Array] The arguments to pass to the fiber.
		def resume(fiber, *arguments)
			@selector.resume(fiber, *arguments)
		end
		
		# Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call to {unblock} must be made to allow this fiber to continue.
		#
		# @public Since *Async v2*.
		# @asynchronous May only be called on same thread as fiber scheduler.
		#
		# @parameter blocker [Object] The object that is blocking the fiber.
		# @parameter timeout [Float | Nil] The maximum time to block, or if nil, indefinitely.
		# @returns [Object] `false` if the timeout elapsed first (the timer transfers `false` back into the fiber), otherwise the value the fiber was resumed with.
		def block(blocker, timeout)
			# $stderr.puts "block(#{blocker}, #{Fiber.current}, #{timeout})"
			fiber = Fiber.current
			
			if timeout
				# On expiry, resume the blocked fiber with `false`, unless it has already finished:
				timer = @timers.after(timeout) do
					if fiber.alive?
						fiber.transfer(false)
					end
				end
			end
			
			begin
				# Track suspended fibers so that #close can detect operations that were never unblocked:
				@blocked += 1
				@selector.transfer
			ensure
				@blocked -= 1
			end
		ensure
			# Always discard the timer, whether the fiber was unblocked, timed out, or raised:
			timer&.cancel!
		end
		
		# Unblock a fiber that was previously blocked.
		#
		# This is a no-op once the scheduler has been closed (its selector is nil).
		#
		# @public Since *Async v2* and *Ruby v3.1*.
		# @asynchronous May be called from any thread.
		#
		# @parameter blocker [Object] The object that was blocking the fiber.
		# @parameter fiber [Fiber] The fiber to unblock.
		def unblock(blocker, fiber)
			# Fiber.blocking{$stderr.puts "unblock(#{blocker}, #{fiber})"}
			
			# This operation is protected by the GVL:
			if selector = @selector
				selector.push(fiber)
				selector.wakeup
			end
		end
		
		# Sleep for the specified duration.
		#
		# @public Since *Async v2* and *Ruby v3.1*.
		# @asynchronous May be non-blocking.
		#
		# @parameter duration [Numeric | Nil] The time in seconds to sleep. If nil, the fiber transfers to the event loop and stays suspended until something resumes it.
		def kernel_sleep(duration = nil)
			# Fiber.blocking{$stderr.puts "kernel_sleep(#{duration}, #{Fiber.current})"}
			
			if duration
				self.block(nil, duration)
			else
				self.transfer
			end
		end
		
		# Resolve the address of the given hostname.
		#
		# @public Since *Async v2*.
		# @asynchronous May be non-blocking.
		#
		# @parameter hostname [String] The hostname to resolve.
		# @returns [Array<String>] The addresses returned by `Resolv.getaddresses`.
		def address_resolve(hostname)
			# On some platforms, hostnames may contain a device-specific suffix (e.g. %en0). We need to strip this before resolving.
			# See <https://github.com/socketry/async/issues/180> for more details.
			# NOTE(review): `"".split("%", 2)` is `[]`, so an empty hostname reaches Resolv as `nil` — confirm callers never pass "".
			hostname = hostname.split("%", 2).first
			::Resolv.getaddresses(hostname)
		end
		
		# Wait for the specified IO to become ready for the specified events.
		#
		# @public Since *Async v2*.
		# @asynchronous May be non-blocking.
		#
		# @parameter io [IO] The IO object to wait on.
		# @parameter events [Integer] The events to wait for, e.g. `IO::READABLE`, `IO::WRITABLE`, etc.
		# @parameter timeout [Float | Nil] The maximum time to wait, or if nil, indefinitely.
		# @returns [Object] The result of the selector's `io_wait`. Presumably these are the ready events, or a falsy value if an explicit timeout elapsed — TODO confirm against `IO::Event::Selector`.
		# @raises [IO::TimeoutError] If no explicit timeout is given and the IO's own `io.timeout` elapses.
		def io_wait(io, events, timeout = nil)
			fiber = Fiber.current
			
			if timeout
				# If an explicit timeout is specified, we expect that the user will handle it themselves:
				timer = @timers.after(timeout) do
					fiber.transfer
				end
			elsif timeout = io.timeout
				# Otherwise, if we default to the io's timeout, we raise an exception:
				timer = @timers.after(timeout) do
					fiber.raise(::IO::TimeoutError, "Timeout (#{timeout}s) while waiting for IO to become ready!")
				end
			end
			
			return @selector.io_wait(fiber, io, events)
		ensure
			timer&.cancel!
		end
		
		if ::IO::Event::Support.buffer?
			# Read from the specified IO into the buffer.
			#
			# @public Since *Async v2* and Ruby with `IO::Buffer` support.
			# @asynchronous May be non-blocking.
			#
			# @parameter io [IO] The IO object to read from.
			# @parameter buffer [IO::Buffer] The buffer to read into.
			# @parameter length [Integer] The minimum number of bytes to read.
			# @parameter offset [Integer] The offset within the buffer to read into.
			# @returns [Integer] The selector's result; presumably the number of bytes read — TODO confirm against `IO::Event::Selector`.
			# @raises [IO::TimeoutError] If `io.timeout` is set and elapses first.
			def io_read(io, buffer, length, offset = 0)
				fiber = Fiber.current
				
				if timeout = io.timeout
					timer = @timers.after(timeout) do
						fiber.raise(::IO::TimeoutError, "Timeout (#{timeout}s) while waiting for IO to become readable!")
					end
				end
				
				@selector.io_read(fiber, io, buffer, length, offset)
			ensure
				timer&.cancel!
			end
			
			# Only define `io_write` on non-CRuby engines, or on CRuby 3.3.1 and later.
			# NOTE(review): this is a lexicographic String comparison. It orders the released versions correctly but would misorder a hypothetical "3.10".
			if RUBY_ENGINE != "ruby" || RUBY_VERSION >= "3.3.1"
				# Write the specified buffer to the IO.
				#
				# @public Since *Async v2* and *Ruby v3.3.1* with `IO::Buffer` support.
				# @asynchronous May be non-blocking.
				#
				# @parameter io [IO] The IO object to write to.
				# @parameter buffer [IO::Buffer] The buffer to write from.
				# @parameter length [Integer] The minimum number of bytes to write.
				# @parameter offset [Integer] The offset within the buffer to write from.
				# @returns [Integer] The selector's result; presumably the number of bytes written — TODO confirm against `IO::Event::Selector`.
				# @raises [IO::TimeoutError] If `io.timeout` is set and elapses first.
				def io_write(io, buffer, length, offset = 0)
					fiber = Fiber.current
					
					if timeout = io.timeout
						timer = @timers.after(timeout) do
							fiber.raise(::IO::TimeoutError, "Timeout (#{timeout}s) while waiting for IO to become writable!")
						end
					end
					
					@selector.io_write(fiber, io, buffer, length, offset)
				ensure
					timer&.cancel!
				end
			end
		end
		
		# Defers raising an exception in a fiber until the event loop runs it.
		#
		# {#fiber_interrupt} pushes an instance onto the selector in place of a fiber. It provides `alive?` and `transfer` so it can stand in for the target fiber (presumably the selector calls `transfer` when it is scheduled — confirm against `IO::Event::Selector`).
		class FiberInterrupt
			# Create a new deferred interrupt.
			#
			# @parameter fiber [Fiber] The fiber to interrupt.
			# @parameter exception [Exception | Class] The exception to raise in the fiber, in any form accepted by `Fiber#raise`.
			def initialize(fiber, exception)
				@fiber = fiber
				@exception = exception
			end
			
			# @returns [Boolean] Whether the target fiber is still alive.
			def alive?
				@fiber.alive?
			end
			
			# Transfer control to the operation by raising the exception in the target fiber.
			def transfer
				# Fiber.blocking{$stderr.puts "FiberInterrupt#transfer(#{@fiber}, #{@exception})"}
				@fiber.raise(@exception)
			end
		end
		
		# Raise an exception on the specified fiber, waking up the event loop if necessary.
		#
		# The exception is wrapped in a {FiberInterrupt} and scheduled through {#unblock}, so it is raised when the event loop next runs it. If the scheduler is already closed, nothing happens.
		#
		# @parameter fiber [Fiber] The fiber to interrupt.
		# @parameter exception [Exception | Class] The exception to raise in the fiber.
		def fiber_interrupt(fiber, exception)
			# Fiber.blocking{$stderr.puts "fiber_interrupt(#{fiber}, #{exception})"}
			unblock(nil, FiberInterrupt.new(fiber, exception))
		end
		
		# Wait for the specified process ID to exit.
		#
		# @public Since *Async v2*.
		# @asynchronous May be non-blocking.
		#
		# @parameter pid [Integer] The process ID to wait for.
		# @parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`.
		# @returns [Process::Status] A process status instance.
		def process_wait(pid, flags)
			return @selector.process_wait(Fiber.current, pid, flags)
		end
		
		# Wait for the specified IOs to become ready for the specified events.
		#
		# Delegates to `::IO.select` on a dedicated thread and returns that thread's value. Any exception raised by `IO.select` is re-raised here by `Thread#value`.
		#
		# @public Since *Async v2.25*.
		# @asynchronous May be non-blocking.
		# @returns [Array | Nil] Whatever `::IO.select` returns.
		def io_select(...)
			Thread.new do
				# Don't make unnecessary output, since we will propagate the exception:
				Thread.current.report_on_exception = false
				
				::IO.select(...)
			end.value
		end
		
		# Run one iteration of the event loop.
		#
		# When terminating the event loop, we already know we are finished. So we don't need to check the task tree. This is a logical requirement because `run_once` ignores transient tasks. For example, a single top level transient task is not enough to keep the reactor running, but during termination we must still process it in order to terminate child tasks.
		#
		# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
		# @returns [Boolean] Always `true`; callers decide when to stop looping.
		private def run_once!(timeout = nil)
			start_time = Async::Clock.now
			
			interval = @timers.wait_interval
			
			# If no timers are scheduled, fall back to the caller's timeout (nil means wait indefinitely):
			if interval.nil?
				# Allow the user to specify a maximum interval if we would otherwise be sleeping indefinitely:
				interval = timeout
			elsif interval < 0
				# We have timers ready to fire, don't sleep in the selector:
				interval = 0
			elsif timeout and interval > timeout
				# Never wait longer than the caller's timeout:
				interval = timeout
			end
			
			begin
				@selector.select(interval)
			rescue Errno::EINTR
				# Ignore: the wait was interrupted by a signal; timers are still processed below.
			end
			
			@timers.fire
			
			# Compute load: busy time is this iteration's elapsed time minus the time the selector reports as idle.
			end_time = Async::Clock.now
			total_duration = end_time - start_time
			idle_duration = @selector.idle_duration
			busy_duration = total_duration - idle_duration
			
			@busy_time += busy_duration
			@idle_time += idle_duration
			
			# The reactor still has work to do:
			return true
		end
		
		# Run one iteration of the event loop.
		#
		# @public Since *Async v1*.
		# @asynchronous Must be invoked from blocking (root) fiber.
		#
		# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
		# @returns [Boolean] Whether there is more work to do.
		# @raises [RuntimeError] If invoked from a non-blocking fiber.
		def run_once(timeout = nil)
			Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?
			
			# If the task tree has finished, stop it:
			if self.finished?
				self.stop
			end
			
			# If there are no children left, there is nothing more to do:
			if @children.nil?
				return false
			end
			
			return run_once!(timeout)
		end
		
		# Checks and clears the interrupted state of the scheduler.
		#
		# A pending thread interrupt (e.g. a deferred signal) is also reported. It is not cleared here.
		#
		# @returns [Boolean] Whether the reactor has been interrupted.
		private def interrupted?
			if @interrupted
				@interrupted = false
				return true
			end
			
			if Thread.pending_interrupt?
				return true
			end
			
			return false
		end
		
		# Stop all children, including transient children.
		#
		# @public Since *Async v1*.
		def stop
			@children&.each do |child|
				child.stop
			end
		end
		
		# Repeatedly invoke the given block until it returns a falsy value or the scheduler is interrupted.
		#
		# `SignalException` delivery is deferred while the block runs, and a pending interrupt makes {#interrupted?} return true, ending the loop. If the deferred exception is an `Interrupt`, it is rescued, the task tree is stopped, and the loop is retried. Once the loop exits normally, that `Interrupt` is re-raised to the caller. Other signal exceptions are not rescued here.
		#
		# @yields {...} One iteration of work; return a falsy value to stop looping.
		private def run_loop(&block)
			interrupt = nil
			
			begin
				# In theory, we could use Exception here to be a little bit safer, but we've only shown the case for SignalException to be a problem, so let's not over-engineer this.
				Thread.handle_interrupt(::SignalException => :never) do
					until self.interrupted?
						# If we are finished, we need to exit:
						break unless yield
					end
				end
			rescue Interrupt => interrupt
				# If an interrupt did occur during an iteration of the event loop, we need to handle it. More specifically, `self.stop` is not safe to interrupt without potentially corrupting the task tree.
				Thread.handle_interrupt(::SignalException => :never) do
					Console.debug(self) do |buffer|
						buffer.puts "Scheduler interrupted: #{interrupt.inspect}"
						self.print_hierarchy(buffer)
					end
					
					self.stop
				end
				
				retry
			end
			
			# If the event loop was interrupted, and we finished exiting normally (due to the interrupt), we need to re-raise the interrupt so that the caller can handle it too.
			if interrupt
				Kernel.raise(interrupt)
			end
		end
		
		# Run the reactor until all tasks are finished.
		#
		# If a block is given, all arguments are forwarded to {#async} to create the initial task immediately before entering the loop. The profiler, if any, is started before and stopped after the run.
		#
		# @public Since *Async v1*.
		# @raises [ClosedError] If the scheduler has already been closed.
		#
		# @yields {|task| ...} The top level task, if a block is given.
		# @returns [Task | Nil] The initial task that was scheduled into the reactor, or `nil` if no block was given.
		def run(...)
			Kernel.raise ClosedError if @selector.nil?
			
			begin
				@profiler&.start
				
				initial_task = self.async(...) if block_given?
				
				self.run_loop do
					run_once
				end
				
				return initial_task
			ensure
				@profiler&.stop
			end
		end
		
		# Start an asynchronous task within the specified reactor. The task will be executed until the first blocking call, at which point it will yield and this method will return.
		#
		# The new task's parent is the current task if there is one, otherwise the scheduler itself.
		#
		# @public Since *Async v1*.
		# @asynchronous May context switch immediately to new task.
		# @deprecated Use {#run} or {Task#async} instead.
		# @raises [ClosedError] If the scheduler has already been closed.
		#
		# @yields {|task| ...} Executed within the task.
		# @returns [Task] The task that was scheduled into the reactor.
		def async(*arguments, **options, &block)
			# Since this method is called by `run`, this warning is too excessive:
			# warn("Async::Scheduler#async is deprecated. Use `run` or `Task#async` instead.", uplevel: 1, category: :deprecated) if $VERBOSE
			
			Kernel.raise ClosedError if @selector.nil?
			
			task = Task.new(Task.current? || self, **options, &block)
			
			task.run(*arguments)
			
			return task
		end
		
		# Create a new fiber: the fiber scheduler `fiber` hook, used by `Fiber.schedule`.
		#
		# The fiber belongs to a task created by {#async}. That task starts running immediately, until its first blocking operation, before this method returns.
		#
		# @returns [Fiber] The fiber of the newly created task.
		def fiber(...)
			return async(...).fiber
		end
		
		# Invoke the block, but after the specified timeout, raise {TimeoutError} (or the given `exception`) in the calling fiber at whichever blocking operation it is currently waiting on. If the block completes before the timer fires, the timer is cancelled and no exception is raised. Timers only fire from the event loop, so a block that never yields to the event loop completes without an exception even if it overruns.
		#
		# @public Since *Async v1*.
		# @asynchronous May raise an exception at any interruption point (e.g. blocking operations).
		#
		# @parameter duration [Numeric] The time in seconds, in which the task should complete.
		# @parameter exception [Class] The exception class to raise.
		# @parameter message [String] The message to pass to the exception.
		# @yields {|timeout| ...} The block to execute with a timeout. It is called with no arguments if it declares no parameters. Otherwise it receives a {Timeout} wrapping the timer.
		def with_timeout(duration, exception = TimeoutError, message = "execution expired", &block)
			fiber = Fiber.current
			
			timer = @timers.after(duration) do
				if fiber.alive?
					fiber.raise(exception, message)
				end
			end
			
			if block.arity.zero?
				yield
			else
				yield Timeout.new(@timers, timer)
			end
		ensure
			timer&.cancel!
		end
		
		# Invoke the block, but after the specified timeout, raise the specified exception with the given message. If the block runs to completion before the timeout occurs, or never yields to the event loop after the timeout expires, it will complete without any exception.
		#
		# @public Since *Async v1* and *Ruby v3.1*. May be invoked from `Timeout.timeout`.
		# @asynchronous May raise an exception at any interruption point (e.g. blocking operations).
		#
		# @parameter duration [Numeric] The time in seconds, in which the task should complete.
		# @parameter exception [Class] The exception class to raise.
		# @parameter message [String] The message to pass to the exception.
		# @yields {|duration| ...} The block to execute with a timeout.
		def timeout_after(duration, exception, message, &block)
			with_timeout(duration, exception, message) do
				yield duration
			end
		end
		
		# Handle fork in the child process. This method is called automatically when `Process.fork` is invoked on Ruby versions < 4 and cleans up the scheduler state. On Ruby 4+, the scheduler is automatically cleaned up by the Ruby runtime.
		#
		# The child process starts with a clean slate - no scheduler is set. Users can create a new scheduler if needed.
		#
		# The inherited profiler, children, selector and timers are dropped without being closed. After this, {#closed?} returns true.
		#
		# @public Since *Async v2.35*.
		def process_fork
			@profiler = nil
			@children = nil
			@selector = nil
			@timers = nil
			
			# Close the scheduler:
			# NOTE(review): if `Fiber.set_scheduler(nil)` invokes #scheduler_close on this instance, #run will raise ClosedError (the selector is nil) unless `$!` is set — confirm the fork handler accounts for this.
			Fiber.set_scheduler(nil)
		end
	end
end