1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664
|
# frozen_string_literal: true
# Released under the MIT License.
# Copyright, 2020-2025, by Samuel Williams.
# Copyright, 2020, by Jun Jiang.
# Copyright, 2021, by Julien Portalier.
# Copyright, 2025, by Shopify Inc.
require_relative "clock"
require_relative "task"
require_relative "timeout"
require_relative "fork_handler"
require "io/event"
require "console"
require "resolv"
module Async
begin
require "fiber/profiler"
Profiler = Fiber::Profiler
rescue LoadError
# Fiber::Profiler is not available.
Profiler = nil
end
# Handles scheduling of fibers. Implements the fiber scheduler interface.
class Scheduler < Node
WORKER_POOL = ENV.fetch("ASYNC_SCHEDULER_WORKER_POOL", nil).then do |value|
value == "true" ? true : nil
end
# Raised when an operation is attempted on a closed scheduler.
class ClosedError < RuntimeError
# Create a new error.
#
# @parameter message [String] The error message.
def initialize(message = "Scheduler is closed!")
super
end
end
# Whether the fiber scheduler is supported.
# @public Since *Async v1*.
def self.supported?
true
end
# Used to augment the scheduler to add support for blocking operations.
module BlockingOperationWait
# Wait for the given work to be executed.
#
# @public Since *Async v2.21* and *Ruby v3.4*.
# @asynchronous May be non-blocking.
#
# @parameter work [Proc] The work to execute on a background thread.
# @returns [Object] The result of the work.
def blocking_operation_wait(work)
@worker_pool.call(work)
end
end
private_constant :BlockingOperationWait
if ::IO::Event.const_defined?(:WorkerPool)
WorkerPool = ::IO::Event::WorkerPool
else
WorkerPool = nil
end
# Create a new scheduler.
#
# @public Since *Async v1*.
# @parameter parent [Node | Nil] The parent node to use for task hierarchy.
# @parameter selector [IO::Event::Selector] The selector to use for event handling.
def initialize(parent = nil, selector: nil, profiler: Profiler&.default, worker_pool: WORKER_POOL)
super(parent)
@selector = selector || ::IO::Event::Selector.new(Fiber.current)
@profiler = profiler
@interrupted = false
@blocked = 0
@busy_time = 0.0
@idle_time = 0.0
@timers = ::IO::Event::Timers.new
if worker_pool == true
@worker_pool = WorkerPool&.new
else
@worker_pool = worker_pool
end
if @worker_pool
self.singleton_class.prepend(BlockingOperationWait)
end
end
# Compute the scheduler load according to the busy and idle times that are updated by the run loop.
#
# @returns [Float] The load of the scheduler. 0.0 means no load, 1.0 means fully loaded or over-loaded.
def load
total_time = @busy_time + @idle_time
# If the total time is zero, then the load is zero:
return 0.0 if total_time.zero?
# We normalize to a 1 second window:
if total_time > 1.0
ratio = 1.0 / total_time
@busy_time *= ratio
@idle_time *= ratio
# We don't need to divide here as we've already normalised it to a 1s window:
return @busy_time
else
return @busy_time / total_time
end
end
# Invoked when the fiber scheduler is being closed.
#
# Executes the run loop until all tasks are finished, then closes the scheduler.
def scheduler_close(error = $!)
# If the execution context (thread) was handling an exception, we want to exit as quickly as possible:
unless error
self.run
end
ensure
self.close
end
# Terminate all child tasks.
def terminate
# If that doesn't work, take more serious action:
@children&.each do |child|
child.terminate
end
return @children.nil?
end
# Terminate all child tasks and close the scheduler.
# @public Since *Async v1*.
def close
unless @children.nil?
self.run_loop do
until self.terminate
self.run_once!
end
end
end
Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0
ensure
# We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it.
if selector = @selector
@selector = nil
selector.close
end
if worker_pool = @worker_pool
@worker_pool = nil
worker_pool.close
end
consume
end
# @returns [Boolean] Whether the scheduler has been closed.
# @public Since *Async v1*.
def closed?
@selector.nil?
end
# @returns [String] A description of the scheduler.
def to_s
"\#<#{self.description} #{@children&.size || 0} children (#{stopped? ? 'stopped' : 'running'})>"
end
# Interrupt the event loop and cause it to exit.
# @asynchronous May be called from any thread.
def interrupt
@interrupted = true
@selector&.wakeup
end
# Transfer from the calling fiber to the event loop.
def transfer
@selector.transfer
end
# Yield the current fiber and resume it on the next iteration of the event loop.
def yield
@selector.yield
end
# Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
# @parameter fiber [Fiber | Object] The object to be resumed on the next iteration of the run-loop.
def push(fiber)
@selector.push(fiber)
end
# Raise an exception on a specified fiber with the given arguments.
#
# This internally schedules the current fiber to be ready, before raising the exception, so that it will later resume execution.
#
# @parameter fiber [Fiber] The fiber to raise the exception on.
# @parameter *arguments [Array] The arguments to pass to the fiber.
def raise(...)
@selector.raise(...)
end
# Resume execution of the specified fiber.
#
# @parameter fiber [Fiber] The fiber to resume.
# @parameter arguments [Array] The arguments to pass to the fiber.
def resume(fiber, *arguments)
@selector.resume(fiber, *arguments)
end
# Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call {unblock} must be performed to allow this fiber to continue.
#
# @public Since *Async v2*.
# @asynchronous May only be called on same thread as fiber scheduler.
#
# @parameter blocker [Object] The object that is blocking the fiber.
# @parameter timeout [Float | Nil] The maximum time to block, or if nil, indefinitely.
def block(blocker, timeout)
# $stderr.puts "block(#{blocker}, #{Fiber.current}, #{timeout})"
fiber = Fiber.current
if timeout
timer = @timers.after(timeout) do
if fiber.alive?
fiber.transfer(false)
end
end
end
begin
@blocked += 1
@selector.transfer
ensure
@blocked -= 1
end
ensure
timer&.cancel!
end
# Unblock a fiber that was previously blocked.
#
# @public Since *Async v2* and *Ruby v3.1*.
# @asynchronous May be called from any thread.
#
# @parameter blocker [Object] The object that was blocking the fiber.
# @parameter fiber [Fiber] The fiber to unblock.
def unblock(blocker, fiber)
# Fiber.blocking{$stderr.puts "unblock(#{blocker}, #{fiber})"}
# This operation is protected by the GVL:
if selector = @selector
selector.push(fiber)
selector.wakeup
end
end
# Sleep for the specified duration.
#
# @public Since *Async v2* and *Ruby v3.1*.
# @asynchronous May be non-blocking.
#
# @parameter duration [Numeric | Nil] The time in seconds to sleep, or if nil, indefinitely.
def kernel_sleep(duration = nil)
# Fiber.blocking{$stderr.puts "kernel_sleep(#{duration}, #{Fiber.current})"}
if duration
self.block(nil, duration)
else
self.transfer
end
end
# Resolve the address of the given hostname.
#
# @public Since *Async v2*.
# @asynchronous May be non-blocking.
#
# @parameter hostname [String] The hostname to resolve.
def address_resolve(hostname)
# On some platforms, hostnames may contain a device-specific suffix (e.g. %en0). We need to strip this before resolving.
# See <https://github.com/socketry/async/issues/180> for more details.
hostname = hostname.split("%", 2).first
::Resolv.getaddresses(hostname)
end
# Wait for the specified IO to become ready for the specified events.
#
# @public Since *Async v2*.
# @asynchronous May be non-blocking.
#
# @parameter io [IO] The IO object to wait on.
# @parameter events [Integer] The events to wait for, e.g. `IO::READABLE`, `IO::WRITABLE`, etc.
# @parameter timeout [Float | Nil] The maximum time to wait, or if nil, indefinitely.
def io_wait(io, events, timeout = nil)
fiber = Fiber.current
if timeout
# If an explicit timeout is specified, we expect that the user will handle it themselves:
timer = @timers.after(timeout) do
fiber.transfer
end
elsif timeout = io.timeout
# Otherwise, if we default to the io's timeout, we raise an exception:
timer = @timers.after(timeout) do
fiber.raise(::IO::TimeoutError, "Timeout (#{timeout}s) while waiting for IO to become ready!")
end
end
return @selector.io_wait(fiber, io, events)
ensure
timer&.cancel!
end
if ::IO::Event::Support.buffer?
# Read from the specified IO into the buffer.
#
# @public Since *Async v2* and Ruby with `IO::Buffer` support.
# @asynchronous May be non-blocking.
#
# @parameter io [IO] The IO object to read from.
# @parameter buffer [IO::Buffer] The buffer to read into.
# @parameter length [Integer] The minimum number of bytes to read.
# @parameter offset [Integer] The offset within the buffer to read into.
def io_read(io, buffer, length, offset = 0)
fiber = Fiber.current
if timeout = io.timeout
timer = @timers.after(timeout) do
fiber.raise(::IO::TimeoutError, "Timeout (#{timeout}s) while waiting for IO to become readable!")
end
end
@selector.io_read(fiber, io, buffer, length, offset)
ensure
timer&.cancel!
end
if RUBY_ENGINE != "ruby" || RUBY_VERSION >= "3.3.1"
# Write the specified buffer to the IO.
#
# @public Since *Async v2* and *Ruby v3.3.1* with `IO::Buffer` support.
# @asynchronous May be non-blocking.
#
# @parameter io [IO] The IO object to write to.
# @parameter buffer [IO::Buffer] The buffer to write from.
# @parameter length [Integer] The minimum number of bytes to write.
# @parameter offset [Integer] The offset within the buffer to write from.
def io_write(io, buffer, length, offset = 0)
fiber = Fiber.current
if timeout = io.timeout
timer = @timers.after(timeout) do
fiber.raise(::IO::TimeoutError, "Timeout (#{timeout}s) while waiting for IO to become writable!")
end
end
@selector.io_write(fiber, io, buffer, length, offset)
ensure
timer&.cancel!
end
end
end
# Used to defer stopping the current task until later.
class FiberInterrupt
# Create a new stop later operation.
#
# @parameter task [Task] The task to stop later.
def initialize(fiber, exception)
@fiber = fiber
@exception = exception
end
# @returns [Boolean] Whether the task is alive.
def alive?
@fiber.alive?
end
# Transfer control to the operation - this will stop the task.
def transfer
# Fiber.blocking{$stderr.puts "FiberInterrupt#transfer(#{@fiber}, #{@exception})"}
@fiber.raise(@exception)
end
end
# Raise an exception on the specified fiber, waking up the event loop if necessary.
def fiber_interrupt(fiber, exception)
# Fiber.blocking{$stderr.puts "fiber_interrupt(#{fiber}, #{exception})"}
unblock(nil, FiberInterrupt.new(fiber, exception))
end
# Wait for the specified process ID to exit.
#
# @public Since *Async v2*.
# @asynchronous May be non-blocking.
#
# @parameter pid [Integer] The process ID to wait for.
# @parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`.
# @returns [Process::Status] A process status instance.
# @asynchronous May be non-blocking..
def process_wait(pid, flags)
return @selector.process_wait(Fiber.current, pid, flags)
end
# Wait for the specified IOs to become ready for the specified events.
#
# @public Since *Async v2.25*.
# @asynchronous May be non-blocking.
def io_select(...)
Thread.new do
# Don't make unnecessary output, since we will propagate the exception:
Thread.current.report_on_exception = false
::IO.select(...)
end.value
end
# Run one iteration of the event loop.
#
# When terminating the event loop, we already know we are finished. So we don't need to check the task tree. This is a logical requirement because `run_once` ignores transient tasks. For example, a single top level transient task is not enough to keep the reactor running, but during termination we must still process it in order to terminate child tasks.
#
# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
# @returns [Boolean] Whether there is more work to do.
private def run_once!(timeout = nil)
start_time = Async::Clock.now
interval = @timers.wait_interval
# If there is no interval to wait (thus no timers), and no tasks, we could be done:
if interval.nil?
# Allow the user to specify a maximum interval if we would otherwise be sleeping indefinitely:
interval = timeout
elsif interval < 0
# We have timers ready to fire, don't sleep in the selctor:
interval = 0
elsif timeout and interval > timeout
interval = timeout
end
begin
@selector.select(interval)
rescue Errno::EINTR
# Ignore.
end
@timers.fire
# Compute load:
end_time = Async::Clock.now
total_duration = end_time - start_time
idle_duration = @selector.idle_duration
busy_duration = total_duration - idle_duration
@busy_time += busy_duration
@idle_time += idle_duration
# The reactor still has work to do:
return true
end
# Run one iteration of the event loop.
#
# @public Since *Async v1*.
# @asynchronous Must be invoked from blocking (root) fiber.
#
# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
# @returns [Boolean] Whether there is more work to do.
def run_once(timeout = nil)
Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?
if self.finished?
self.stop
end
# If we are finished, we stop the task tree and exit:
if @children.nil?
return false
end
return run_once!(timeout)
end
# Checks and clears the interrupted state of the scheduler.
#
# @returns [Boolean] Whether the reactor has been interrupted.
private def interrupted?
if @interrupted
@interrupted = false
return true
end
if Thread.pending_interrupt?
return true
end
return false
end
# Stop all children, including transient children.
#
# @public Since *Async v1*.
def stop
@children&.each do |child|
child.stop
end
end
private def run_loop(&block)
interrupt = nil
begin
# In theory, we could use Exception here to be a little bit safer, but we've only shown the case for SignalException to be a problem, so let's not over-engineer this.
Thread.handle_interrupt(::SignalException => :never) do
until self.interrupted?
# If we are finished, we need to exit:
break unless yield
end
end
rescue Interrupt => interrupt
# If an interrupt did occur during an iteration of the event loop, we need to handle it. More specifically, `self.stop` is not safe to interrupt without potentially corrupting the task tree.
Thread.handle_interrupt(::SignalException => :never) do
Console.debug(self) do |buffer|
buffer.puts "Scheduler interrupted: #{interrupt.inspect}"
self.print_hierarchy(buffer)
end
self.stop
end
retry
end
# If the event loop was interrupted, and we finished exiting normally (due to the interrupt), we need to re-raise the interrupt so that the caller can handle it too.
if interrupt
Kernel.raise(interrupt)
end
end
# Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided.
#
# Forwards all parameters to {#async} if a block is given.
#
# @public Since *Async v1*.
#
# @yields {|task| ...} The top level task, if a block is given.
# @returns [Task] The initial task that was scheduled into the reactor.
def run(...)
Kernel.raise ClosedError if @selector.nil?
begin
@profiler&.start
initial_task = self.async(...) if block_given?
self.run_loop do
run_once
end
return initial_task
ensure
@profiler&.stop
end
end
# Start an asynchronous task within the specified reactor. The task will be executed until the first blocking call, at which point it will yield and and this method will return.
#
# @public Since *Async v1*.
# @asynchronous May context switch immediately to new task.
# @deprecated Use {#run} or {Task#async} instead.
#
# @yields {|task| ...} Executed within the task.
# @returns [Task] The task that was scheduled into the reactor.
def async(*arguments, **options, &block)
# Since this method is called by `run`, this warning is too excessive:
# warn("Async::Scheduler#async is deprecated. Use `run` or `Task#async` instead.", uplevel: 1, category: :deprecated) if $VERBOSE
Kernel.raise ClosedError if @selector.nil?
task = Task.new(Task.current? || self, **options, &block)
task.run(*arguments)
return task
end
# Create a new fiber and return it without starting execution.
# @returns [Fiber] The fiber that was created.
def fiber(...)
return async(...).fiber
end
# Invoke the block, but after the specified timeout, raise {TimeoutError} in any currenly blocking operation. If the block runs to completion before the timeout occurs or there are no non-blocking operations after the timeout expires, the code will complete without any exception.
#
# @public Since *Async v1*.
# @asynchronous May raise an exception at any interruption point (e.g. blocking operations).
#
# @parameter duration [Numeric] The time in seconds, in which the task should complete.
# @parameter exception [Class] The exception class to raise.
# @parameter message [String] The message to pass to the exception.
# @yields {|timeout| ...} The block to execute with a timeout.
def with_timeout(duration, exception = TimeoutError, message = "execution expired", &block)
fiber = Fiber.current
timer = @timers.after(duration) do
if fiber.alive?
fiber.raise(exception, message)
end
end
if block.arity.zero?
yield
else
yield Timeout.new(@timers, timer)
end
ensure
timer&.cancel!
end
# Invoke the block, but after the specified timeout, raise the specified exception with the given message. If the block runs to completion before the timeout occurs or there are no non-blocking operations after the timeout expires, the code will complete without any exception.
#
# @public Since *Async v1* and *Ruby v3.1*. May be invoked from `Timeout.timeout`.
# @asynchronous May raise an exception at any interruption point (e.g. blocking operations).
#
# @parameter duration [Numeric] The time in seconds, in which the task should complete.
# @parameter exception [Class] The exception class to raise.
# @parameter message [String] The message to pass to the exception.
# @yields {|duration| ...} The block to execute with a timeout.
def timeout_after(duration, exception, message, &block)
with_timeout(duration, exception, message) do
yield duration
end
end
# Handle fork in the child process. This method is called automatically when `Process.fork` is invoked on Ruby versions < 4 and cleans up the scheduler state. On Ruby 4+, the scheduler is automatically cleaned up by the Ruby runtime.
#
# The child process starts with a clean slate - no scheduler is set. Users can create a new scheduler if needed.
#
# @public Since *Async v2.35*.
def process_fork
@profiler = nil
@children = nil
@selector = nil
@timers = nil
# Close the scheduler:
Fiber.set_scheduler(nil)
end
end
end
|