# Tasks

This guide explains how asynchronous tasks work and how to use them.

## Overview

Tasks are the smallest unit of sequential code execution in {ruby Async}. Tasks can create other tasks, and Async tracks the parent-child relationship between tasks. When a parent task is stopped, all of its child tasks are stopped too. The reactor always starts with one root task.

```mermaid
graph LR
    R[Reactor] --> WS
    WS[Web Server Task] --> R1[Request 1 Task]
    WS --> R2[Request 2 Task]

    R1 --> Q1[Database Query Task]
    R1 --> H1[HTTP Client Request Task]

    R2 --> H2[HTTP Client Request Task]
    R2 --> H3[HTTP Client Request Task]
```

### How are they different from fibers?

A fiber is a lightweight unit of execution that can be suspended and resumed at specific points. After a fiber is suspended, it can later be resumed from the same point with the same execution state. Because only one fiber in a given thread can execute at a time, and each fiber decides when to suspend itself, fibers are often described as a mechanism for cooperative concurrency.
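As a minimal plain-Ruby illustration (core `Fiber` only, no Async involved), the fiber below runs until `Fiber.yield`, hands a value back to its caller, and continues from exactly the same point on the next `resume`:

```ruby
# Each call to `resume` runs the fiber until it yields or finishes.
steps = []

fiber = Fiber.new do
	steps << :started
	# Suspend here and hand `:paused` back to the caller of `resume`:
	Fiber.yield(:paused)
	# Execution continues from this point on the next `resume`:
	steps << :resumed
	:finished
end

first = fiber.resume  # Runs until Fiber.yield and returns :paused.
second = fiber.resume # Runs to the end of the block and returns :finished.

p first, second, steps # Prints :paused, :finished and [:started, :resumed] on separate lines.
```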

A task provides extra functionality on top of fibers. A task behaves like a promise: it either succeeds with a value or fails with an exception. Tasks keep track of their parent-child relationships, and when a parent task is stopped, all of its child tasks are stopped too. This makes it easier to build complex programs with many concurrent tasks.

### Why does Async manipulate tasks and not fibers?

The {ruby Async::Scheduler} actually works directly with fibers for most operations and isn't aware of tasks. However, the reactor does maintain a tree of tasks for the purpose of managing the task and reactor lifecycle. For example, stopping a parent task will stop all of its child tasks, and the reactor will exit when all tasks are finished.

## Task Lifecycle

Tasks represent units of work which are executed according to the following state transition diagram:

```mermaid
stateDiagram-v2
    [*] --> initialized : Task.new
    initialized --> running : run
    
    running --> failed : unhandled StandardError-derived exception
    running --> complete : user code finished
    running --> stopped : stop

    initialized --> stopped : stop
    
    failed --> [*]
    complete --> [*]
    stopped --> [*]
```

Tasks are created in the `initialized` state and are run by the reactor. During execution, a task can either `complete` successfully, become `failed` with an unhandled `StandardError`-derived exception, or be explicitly `stopped`. In all of these cases, you can wait for the task to finish by using {ruby Async::Task#wait}:

1. If the task completed successfully, `wait` returns the value of the last expression evaluated in the task.
2. If the task failed with an unhandled `StandardError`-derived exception, `wait` re-raises that exception.
3. If the task was stopped, `wait` returns `nil`.

## Starting A Task

At any point in your program, you can start a reactor and a root task using the {ruby Kernel::Async} method:

```ruby
Async do
	1.upto(3) do |i|
		sleep(i)
		puts "Hello World #{i}"
	end
end
```

This program prints "Hello World" 3 times. Before printing, it sleeps for 1, then 2, then 3 seconds. The total execution time is 6 seconds because the program executes sequentially.

By using a nested task, we can ensure that each iteration of the loop creates a new task which runs concurrently.

```ruby
Async do
	1.upto(3) do |i|
		Async do
			sleep(i)
			puts "Hello World #{i}"
		end
	end
end
```

Instead of taking 6 seconds, this program takes 3 seconds in total. The main loop executes, rapidly creating 3 child tasks, and then each child task sleeps for 1, 2 and 3 seconds respectively before printing "Hello World".

```mermaid
graph LR
	R[Reactor] --> TT[Initial Task]
	
	TT --> H1[Hello World 1 Task]
	TT --> H2[Hello World 2 Task]
	TT --> H3[Hello World 3 Task]
```

By structuring your program this way, it's easy to implement a concurrent map-reduce:

```ruby
Async do
	# Map (create several concurrent tasks)
	users_size = Async{User.size}
	posts_size = Async{Post.size}
	
	# Reduce (wait for and merge the results)
	average = posts_size.wait / users_size.wait
	puts "#{users_size.wait} users created #{average} posts on average."
end
```

### Performance Considerations

Task creation and execution have been heavily optimised. Don't make your program more complex just to avoid creating tasks; the added complexity will almost always cost more than the overhead it saves.

Do consider using appropriate concurrency primitives, such as {ruby Async::Semaphore} and {ruby Async::Barrier}, to ensure your program is well-behaved in the presence of large inputs (i.e. don't create an unbounded number of tasks).

## Starting a Limited Number of Tasks

When processing potentially unbounded data, you may want to limit the concurrency using {ruby Async::Semaphore}.

```ruby
Async do
	# Create a semaphore with a limit of 2:
	semaphore = Async::Semaphore.new(2)
	
	file.each_line do |line|
		semaphore.async do
			# Only two tasks at most will be allowed to execute concurrently:
			process(line)
		end
	end
end
```

## Waiting for Tasks

Waiting for a single task is trivial: simply invoke {ruby Async::Task#wait}. To wait for multiple tasks, you may either {ruby Async::Task#wait} on each in turn, or you may want to use a {ruby Async::Barrier}. You can use {ruby Async::Barrier#async} to create multiple child tasks, and wait for them all to complete using {ruby Async::Barrier#wait}.

```ruby
barrier = Async::Barrier.new

Async do
	jobs.each do |job|
		barrier.async do
			# ... process job ...
		end
	end
	
	# Wait for all jobs to complete:
	barrier.wait
end
```

### Waiting for the First N Tasks

Occasionally, you may only need to wait for the first task (or first several tasks) to complete. If you pass a block to {ruby Async::Barrier#wait}, it yields each task as it finishes; you call {ruby Async::Task#wait} to get that task's result, and you can `break` once you have enough results:

```ruby
Async do
	barrier = Async::Barrier.new
	
	begin
		jobs.each do |job|
			barrier.async do
				# ... process job ...
			end
		end
		
		# Wait for the first two jobs to complete:
		done = []
		barrier.wait do |task|
			done << task.wait
			
			# If you don't want to wait for any more tasks you can break:
			break if done.size >= 2
		end
	ensure
		# The remainder of the tasks will be stopped:
		barrier.stop
	end
end
```

### Combining a Barrier with a Semaphore

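{ruby Async::Barrier} and {ruby Async::Semaphore} are designed to be compatible with each other, and with anything else that creates tasks via `#async`. Passing `parent: barrier` to the semaphore makes it create its tasks through the barrier, so the barrier can wait for, or stop, all of them. There are other similar situations where you may want to pass in a parent, e.g. {ruby Async::IO::Endpoint#bind}.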

~~~ ruby
Async do
	barrier = Async::Barrier.new
	# The semaphore creates its tasks via the barrier, so the barrier tracks them all:
	semaphore = Async::Semaphore.new(2, parent: barrier)
	
	begin
		jobs.each do |job|
			semaphore.async do
				# ... process job ...
			end
		end
		
		# Wait until all jobs are done:
		barrier.wait
	ensure
		# Stop any remaining jobs:
		barrier.stop
	end
end
~~~

## Stopping a Task

When a task completes execution, it will enter the `complete` state (or the `failed` state if it raises an unhandled exception).

There are various situations where you may want to stop a task ({ruby Async::Task#stop}) before it completes. The most common case is shutting down a server. A more complex example: you might fan out tens or hundreds of requests, wait for a subset of them to complete (e.g. the first 5, or all those that complete within a given deadline), and then stop (terminate/cancel) the remaining operations.

Using the above program as an example, let's stop all the tasks just after the first one completes.

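```ruby
Async do
	tasks = 3.times.map do |i|
		Async do
			sleep(i)
			puts "Hello World #{i}"
		end
	end
	
	# Wait for the first task (which sleeps for 0 seconds) to complete:
	tasks.first.wait
	
	# Stop the remaining tasks before they print anything:
	tasks.drop(1).each(&:stop)
end
```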

### Stopping all Tasks held in a Barrier

To stop (terminate/cancel) all the tasks held in a barrier:

```ruby
barrier = Async::Barrier.new

Async do
	tasks = 3.times.map do |i|
		barrier.async do
			sleep(i)
			puts "Hello World #{i}"
		end
	end
	
	barrier.stop
end
```

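If any task fails, {ruby Async::Barrier#wait} re-raises its exception, leaving the other tasks running. So unless all of your tasks rescue and suppress `StandardError`-derived exceptions, be sure to call {ruby Async::Barrier#stop} in an `ensure` block to stop the remaining tasks: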

```ruby
barrier = Async::Barrier.new

Async do
	tasks = 3.times.map do |i|
		barrier.async do
			sleep(i)
			puts "Hello World #{i}"
		end
	end
	
	begin
		barrier.wait
	ensure
		barrier.stop
	end
end
```

## Resource Management

To ensure your resources are cleaned up correctly, wrap their use in a `begin`/`ensure` block, e.g.:

~~~ ruby
Async do
	begin
		socket = connect(remote_address) # May raise Async::Stop

		socket.write(...) # May raise Async::Stop
		socket.read(...) # May raise Async::Stop
	ensure
		socket.close if socket
	end
end
~~~

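Because tasks run synchronously until they yield back to the reactor, a stop requested by another task is delivered as an {ruby Async::Stop} exception at one of those yield points (such as the `connect`, `write` and `read` calls above). The `ensure` clause then runs as usual, so this pattern reliably closes the socket. While in theory `IO#autoclose` allows file descriptors to be closed automatically when the GC collects the `IO` object, it may produce unpredictable behaviour (exhaustion of file descriptors, flushing data at odd times), so it's not recommended.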

## Exception Handling

{ruby Async::Task} captures and logs exceptions. All unhandled exceptions will cause the enclosing task to enter the `:failed` state. Non-`StandardError` exceptions are re-raised immediately and will cause the reactor to exit. This ensures that exceptions will always be visible and cause the program to fail appropriately.

~~~ ruby
require 'async'

task = Async do
	# Exception will be logged and task will be failed.
	raise "Boom"
end

puts task.status # failed
puts task.wait # raises RuntimeError: Boom
~~~

### Propagating Exceptions

If a task has finished due to an exception, calling `Task#wait` will re-raise the exception.

~~~ ruby
require 'async'

Async do
	task = Async do
		raise "Boom"
	end
	
	begin
		task.wait # Re-raises above exception.
	rescue => error
		puts "It went #{error.message}!"
	end
end
~~~

## Timeouts

You can wrap asynchronous operations in a timeout. This allows you to put an upper bound on how long the enclosed code will run, rather than potentially blocking indefinitely. If the enclosed code hasn't completed when the timeout expires, it will be interrupted with an {ruby Async::TimeoutError} exception.

~~~ ruby
require 'async'

Async do |task|
	task.with_timeout(1) do
		sleep(100)
	rescue Async::TimeoutError
		puts "I timed out 99 seconds early!"
	end
end
~~~

### Periodic Timers

Sometimes you need to do some recurring work in a loop. Often it's best to measure the periodic delay end-to-start, so that your process always takes a break between iterations and doesn't risk spending 100% of its time on the periodic work. In this case, simply call {ruby sleep} between iterations:

~~~ ruby
require 'async'

period = 30

Async do |task|
	loop do
		puts Time.now
		# ... process job ...
		sleep(period)
	end
end
~~~

If you need a periodic timer that runs start-to-start, you can keep track of the `run_next` time using the monotonic clock:

~~~ ruby
require 'async'

period = 30

Async do |task|
	run_next = Async::Clock.now
	loop do
		run_next += period
		puts Time.now
		# ... process job ...
		if (remaining = run_next - Async::Clock.now) > 0
			sleep(remaining)
		end
	end
end
~~~

## Reactor Lifecycle

Generally, the reactor's event loop will not exit until all tasks complete. This is determined by {ruby Async::Task#finished?}, which checks whether the current node and all of its children have completed execution. However, there is one exception to this rule: tasks flagged as `transient` ({ruby Async::Node#transient?}).

### Transient Tasks

Tasks which are flagged as `transient` are identical to normal tasks, except for one key difference: they do not keep the reactor alive. They are useful for operations which are not directly related to application concurrency, but are instead an implementation detail of the application, for example, a task which monitors and maintains a connection pool, pruning unused connections or periodically checking those connections for activity (ping/pong, etc). If all *other* tasks are completed and only transient tasks remain at the root of the reactor, the reactor will exit.

#### How To Create Transient Tasks

Specify the `transient` option when creating a task:

```ruby
@pruner = Async(transient: true) do
	loop do	
		sleep(1)
		prune_connection_pool
	end
end
```

Transient tasks are similar to normal tasks, except for the following differences:

1. They are not considered by {ruby Async::Task#finished?}, so they will not keep the reactor alive. Instead, they are stopped (with a {ruby Async::Stop} exception) when all other (non-transient) tasks are finished.
2. As soon as a parent task is finished, any transient child tasks will be moved up to be children of the parent's parent. This ensures that they never keep a sub-tree alive.
3. Similarly, if you `stop` a task, any transient child tasks will be moved up the tree as above rather than being stopped.

Transient tasks are intended for work that is an implementation detail of an object or instance, rather than part of the program's concurrent logic. Some examples of transient tasks:

- A task which is reading or writing data on behalf of a stateful connection object, e.g. HTTP/2 frame reader, Redis cache invalidation, etc.
- A task which is monitoring and maintaining a connection pool, pruning unused connections or possibly ensuring those connections are periodically checked for activity (ping/pong, etc).
- A background worker or batch processing job which is independent of any specific operation, and is lazily created.
- A cache system which needs periodic expiration / revalidation of data/values.

Here is an example that caches the current time string: the string only changes once per second, but you could be handling thousands of requests per second. The task that updates it in the background is an implementation detail, so it is marked as `transient`.

```ruby
require "async"
require "thread/local" # thread-local gem.

class TimeStringCache
	extend Thread::Local # defines `instance` class method that lazy-creates a separate instance per thread
	
	def initialize
		@current_time_string = nil
	end
	
	def current_time_string
		refresh!
		
		return @current_time_string
	end
	
	private
	
	def refresh!
		@refresh ||= Async(transient: true) do
			loop do
				@current_time_string = Time.now.to_s
				sleep(1)
			end
		ensure
			# When the reactor terminates all tasks, `Async::Stop` will be raised from `sleep` and this code will be invoked. By clearing `@refresh`, we ensure that the task will be recreated if needed again:
			@refresh = nil
		end
	end
end

Async do
	p TimeStringCache.instance.current_time_string
end
```

When the top-level `Async` block exits, the transient task is stopped and its `ensure` clause resets `@refresh` to `nil`. Bear in mind that you should not share these resources across threads; doing so would require some form of mutual exclusion.