# Tasks
This guide explains how asynchronous tasks work and how to use them.
## Overview
Tasks are the smallest unit of sequential code execution in {ruby Async}. Tasks can create other tasks, and Async tracks the parent-child relationship between tasks. When a parent task is stopped, it will also stop all its children tasks. The reactor always starts with one root task.
```mermaid
graph LR
R[Reactor] --> WS
WS[Web Server Task] --> R1[Request 1 Task]
WS --> R2[Request 2 Task]
R1 --> Q1[Database Query Task]
R1 --> H1[HTTP Client Request Task]
R2 --> H2[HTTP Client Request Task]
R2 --> H3[HTTP Client Request Task]
```
### How are they different from fibers?
A fiber is a lightweight unit of execution that can be suspended and resumed at specific points. After a fiber is suspended, it can be resumed later at the same point with the same execution state. Because only one fiber can execute at a time, they are often referred to as a mechanism for cooperative concurrency.
A task provides extra functionality on top of fibers. A task behaves like a promise: it either succeeds with a value or fails with an exception. Tasks keep track of their parent-child relationships, and when a parent task is stopped, it will also stop all its children tasks. This makes it easier to create complex programs with many concurrent tasks.
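To illustrate the difference, here is a minimal sketch using only Ruby's built-in `Fiber` class (no Async required): execution suspends at `Fiber.yield` and resumes at exactly that point on the next `resume`, preserving all local state.

```ruby
fiber = Fiber.new do
	value = Fiber.yield(:suspended) # Suspend; :suspended is returned by the first resume.
	"resumed with #{value}"         # The last expression is returned by the final resume.
end

puts fiber.resume         # Runs until Fiber.yield; prints "suspended".
puts fiber.resume(:hello) # Continues after Fiber.yield; prints "resumed with hello".
```

A task builds on this: instead of being resumed manually, it is scheduled by the reactor, and it records its outcome (value, exception, or stopped) so that other tasks can wait on it.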
### Why does Async manipulate tasks and not fibers?
The {ruby Async::Scheduler} actually works directly with fibers for most operations and isn't aware of tasks. However, the reactor does maintain a tree of tasks for the purpose of managing task and reactor life-cycle. For example, stopping a parent task will stop all its children tasks, and the reactor will exit when all tasks are finished.
## Task Lifecycle
Tasks represent units of work which are executed according to the following state transition diagram:
```mermaid
stateDiagram-v2
[*] --> initialized : Task.new
initialized --> running : run
running --> failed : unhandled StandardError-derived exception
running --> complete : user code finished
running --> stopped : stop
initialized --> stopped : stop
failed --> [*]
complete --> [*]
stopped --> [*]
```
Tasks are created in the `initialized` state, and are run by the reactor. During execution, a task can either `complete` successfully, become `failed` with an unhandled `StandardError`-derived exception, or be explicitly `stopped`. In all of these cases, you can wait for the task's result using {ruby Async::Task#wait}.
1. In the case the task successfully completed, the result will be whatever value was generated by the last expression in the task.
2. In the case the task failed with an unhandled `StandardError`-derived exception, waiting on the task will re-raise the exception.
3. In the case the task was stopped, the result will be `nil`.
## Starting A Task
At any point in your program, you can start a reactor and a root task using the {ruby Kernel::Async} method:
```ruby
Async do
1.upto(3) do |i|
sleep(i)
puts "Hello World #{i}"
end
end
```
This program prints "Hello World" 3 times. Before printing, it sleeps for 1, then 2, then 3 seconds. The total execution time is 6 seconds because the program executes sequentially.
By using a nested task, we can ensure that each iteration of the loop creates a new task which runs concurrently.
```ruby
Async do
1.upto(3) do |i|
Async do
sleep(i)
puts "Hello World #{i}"
end
end
end
```
Instead of taking 6 seconds, this program takes 3 seconds in total. The main loop executes, rapidly creating 3 child tasks, and then each child task sleeps for 1, 2 and 3 seconds respectively before printing "Hello World".
```mermaid
graph LR
R[Reactor] --> TT[Initial Task]
TT --> H1[Hello World 1 Task]
TT --> H2[Hello World 2 Task]
TT --> H3[Hello World 3 Task]
```
By constructing your program correctly, it's easy to implement concurrent map-reduce:
```ruby
Async do
# Map (create several concurrent tasks)
users_size = Async{User.size}
posts_size = Async{Post.size}
# Reduce (wait for and merge the results)
average = posts_size.wait / users_size.wait
puts "#{users_size.wait} users created #{average} posts on average."
end
```
### Performance Considerations
Task creation and execution has been heavily optimised. Do not trade program complexity to avoid creating tasks; the cost will almost always exceed the gain.
Do consider using the correct concurrency primitives like {ruby Async::Semaphore}, {ruby Async::Barrier}, etc., to ensure your program is well-behaved in the presence of large inputs (i.e. don't create an unbounded number of tasks).
## Starting a Limited Number of Tasks
When processing potentially unbounded data, you may want to limit the concurrency using {ruby Async::Semaphore}.
```ruby
Async do
# Create a semaphore with a limit of 2:
semaphore = Async::Semaphore.new(2)
file.each_line do |line|
semaphore.async do
# Only two tasks at most will be allowed to execute concurrently:
process(line)
end
end
end
```
## Waiting for Tasks
Waiting for a single task is trivial: simply invoke {ruby Async::Task#wait}. To wait for multiple tasks, you may either {ruby Async::Task#wait} on each in turn, or you may want to use a {ruby Async::Barrier}. You can use {ruby Async::Barrier#async} to create multiple child tasks, and wait for them all to complete using {ruby Async::Barrier#wait}.
```ruby
barrier = Async::Barrier.new
Async do
jobs.each do |job|
barrier.async do
# ... process job ...
end
end
# Wait for all jobs to complete:
barrier.wait
end
```
### Waiting for the First N Tasks
Occasionally, you may need to wait for only the first task (or the first several tasks) to complete. You can pass a block to {ruby Async::Barrier#wait}, which yields each task as it completes, to control this:
```ruby
Async do
barrier = Async::Barrier.new
begin
jobs.each do |job|
barrier.async do
# ... process job ...
end
end
# Wait for the first two jobs to complete:
done = []
barrier.wait do |task|
done << task.wait
# If you don't want to wait for any more tasks you can break:
break if done.size >= 2
end
ensure
# The remainder of the tasks will be stopped:
barrier.stop
end
end
```
### Combining a Barrier with a Semaphore
{ruby Async::Barrier} and {ruby Async::Semaphore} are designed to be compatible with each other, and with other tasks that nest `#async` invocations. There are other similar situations where you may want to pass in a parent task, e.g. {ruby Async::IO::Endpoint#bind}.
~~~ ruby
barrier = Async::Barrier.new
semaphore = Async::Semaphore.new(2, parent: barrier)
begin
jobs.each do |job|
semaphore.async do
# ... process job ...
end
end
# Wait until all jobs are done:
barrier.wait
ensure
# Stop any remaining jobs:
barrier.stop
end
~~~
## Stopping a Task
When a task completes execution, it will enter the `complete` state (or the `failed` state if it raises an unhandled exception).
There are various situations where you may want to stop a task ({ruby Async::Task#stop}) before it completes. The most common case is shutting down a server. A more complex example: you may fan out tens or hundreds of requests, wait for a subset to complete (e.g. the first 5, or all those that complete within a given deadline), and then stop (terminate/cancel) the remaining operations.
Using a variation of the above program as an example, let's create several tasks and then stop them all before they complete:
```ruby
Async do
tasks = 3.times.map do |i|
Async do
sleep(i)
puts "Hello World #{i}"
end
end
# Stop all the above tasks:
tasks.each(&:stop)
end
```
### Stopping all Tasks held in a Barrier
To stop (terminate/cancel) all the tasks held in a barrier:
```ruby
barrier = Async::Barrier.new
Async do
tasks = 3.times.map do |i|
barrier.async do
sleep(i)
puts "Hello World #{i}"
end
end
barrier.stop
end
```
Unless your tasks all rescue and suppress `StandardError`-derived exceptions, be sure to call {ruby Async::Barrier#stop} in an `ensure` block to stop any remaining tasks:
```ruby
barrier = Async::Barrier.new
Async do
tasks = 3.times.map do |i|
barrier.async do
sleep(i)
puts "Hello World #{i}"
end
end
begin
barrier.wait
ensure
barrier.stop
end
end
```
## Resource Management
In order to ensure your resources are cleaned up correctly, make sure you wrap resources appropriately, e.g.:
~~~ ruby
Async do
begin
socket = connect(remote_address) # May raise Async::Stop
socket.write(...) # May raise Async::Stop
socket.read(...) # May raise Async::Stop
ensure
socket.close if socket
end
end
~~~
As tasks run synchronously until they yield back to the reactor, you can guarantee this model works correctly. While in theory `IO#autoclose` allows you to automatically close file descriptors when they go out of scope via the GC, it may produce unpredictable behaviour (exhaustion of file descriptors, flushing data at odd times), so it's not recommended.
## Exception Handling
{ruby Async::Task} captures and logs exceptions. All unhandled exceptions will cause the enclosing task to enter the `failed` state. Non-`StandardError` exceptions are re-raised immediately and will cause the reactor to exit. This ensures that exceptions will always be visible and cause the program to fail appropriately.
~~~ ruby
require 'async'
task = Async do
# Exception will be logged and task will be failed.
raise "Boom"
end
puts task.status # failed
puts task.wait # raises RuntimeError: Boom
~~~
### Propagating Exceptions
If a task has finished due to an exception, calling `Task#wait` will re-raise the exception.
~~~ ruby
require 'async'
Async do
task = Async do
raise "Boom"
end
begin
task.wait # Re-raises above exception.
rescue
puts "It went #{$!}!"
end
end
~~~
## Timeouts
You can wrap asynchronous operations in a timeout. This allows you to put an upper bound on how long the enclosed code will run vs. potentially blocking indefinitely. If the enclosed code hasn't completed by the timeout, it will be interrupted with an {ruby Async::TimeoutError} exception.
~~~ ruby
require 'async'
Async do |task|
task.with_timeout(1) do
sleep(100)
rescue Async::TimeoutError
puts "I timed out 99 seconds early!"
end
end
~~~
### Periodic Timers
Sometimes you need to do some recurring work in a loop. Often it's best to measure the periodic delay end-to-start, so that your process always takes a break between iterations and doesn't risk spending 100% of its time on the periodic work. In this case, simply call {ruby sleep} between iterations:
~~~ ruby
require 'async'
period = 30
Async do |task|
loop do
puts Time.now
# ... process job ...
sleep(period)
end
end
~~~
If you need a periodic timer that runs start-to-start, you can keep track of the `run_next` time using the monotonic clock:
~~~ ruby
require 'async'
period = 30
Async do |task|
run_next = Async::Clock.now
loop do
run_next += period
puts Time.now
# ... process job ...
if (remaining = run_next - Async::Clock.now) > 0
sleep(remaining)
end
end
end
~~~
## Reactor Lifecycle
Generally, the reactor's event loop will not exit until all tasks complete. This is informed by {ruby Async::Task#finished?} which checks if the current node has completed execution, which also includes all children. However, there is one exception to this rule: tasks flagged as being `transient` ({ruby Async::Node#transient?}).
### Transient Tasks
Tasks which are flagged as `transient` are identical to normal tasks, except for one key difference: they do not keep the reactor alive. They are useful for operations which are not directly related to application concurrency, but are instead an implementation detail of the application. For example, a task which is monitoring and maintaining a connection pool, pruning unused connections or possibly ensuring those connections are periodically checked for activity (ping/pong, etc). If all *other* tasks are completed, and only transient tasks remain at the root of the reactor, the reactor should exit.
#### How To Create Transient Tasks
Specify the `transient` option when creating a task:
```ruby
@pruner = Async(transient: true) do
loop do
sleep(1)
prune_connection_pool
end
end
```
Transient tasks are similar to normal tasks, except for the following differences:
1. They are not considered by {ruby Async::Task#finished?}, so they will not keep the reactor alive. Instead, they are stopped (with a {ruby Async::Stop} exception) when all other (non-transient) tasks are finished.
2. As soon as a parent task is finished, any transient child tasks will be moved up to be children of the parent's parent. This ensures that they never keep a sub-tree alive.
3. Similarly, if you `stop` a task, any transient child tasks will be moved up the tree as above rather than being stopped.
The purpose of transient tasks is when a task is an implementation detail of an object or instance, rather than a concurrency process. Some examples of transient tasks:
- A task which is reading or writing data on behalf of a stateful connection object, e.g. HTTP/2 frame reader, Redis cache invalidation, etc.
- A task which is monitoring and maintaining a connection pool, pruning unused connections or possibly ensuring those connections are periodically checked for activity (ping/pong, etc).
- A background worker or batch processing job which is independent of any specific operation, and is lazily created.
- A cache system which needs periodic expiration / revalidation of data/values.
Here is an example that caches the current time string: the string has only one-second granularity, yet you could be handling thousands of requests per second. The background task that updates the cache is an implementation detail, so it is marked as `transient`.
```ruby
require "async"
require "thread/local" # thread-local gem.
class TimeStringCache
extend Thread::Local # defines `instance` class method that lazy-creates a separate instance per thread
def initialize
@current_time_string = nil
end
def current_time_string
refresh!
return @current_time_string
end
private
def refresh!
@refresh ||= Async(transient: true) do
loop do
@current_time_string = Time.now.to_s
sleep(1)
end
ensure
# When the reactor terminates all tasks, `Async::Stop` will be raised from `sleep` and this code will be invoked. By clearing `@refresh`, we ensure that the task will be recreated if needed again:
@refresh = nil
end
end
end
Async do
p TimeStringCache.instance.current_time_string
end
```
Upon exiting the top-level `Async` block, the `@refresh` task will be stopped and the instance variable reset to `nil`. Bear in mind, you should not share these instances across threads; doing so would require some form of mutual exclusion.