1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
|
# Best Practices
This guide gives an overview of best practices for using Async.
## Use a top-level `Sync` to denote the root of your program
`Async{}` has two uses: it creates an event loop if one doesn't exist, and it creates a task which runs asynchronously with respect to the parent scope. However, the top level `Async{}` block will be synchronous because it creates the event loop. In some programs, you do not care about executing asynchronously, but you still want your code to run in an event loop. `Sync{}` exists to do this efficiently.
```ruby
require "async"
class Packages
def initialize(urls)
@urls = urls
end
def fetch
# A common use case is to make functions which appear synchronous, but internally use asynchronous execution:
Sync do |task|
@urls.map do |url|
task.async do
fetch(url)
end
end.map(&:wait)
end
end
end
```
`Sync{...}` is semantically equivalent to `Async{}.wait`, but it is more efficient. It is the preferred way to run code in an event loop at the top level of your program or to ensure some code runs in an event loop without creating a new task. The name `Sync` means "Synchronous Async", indicating that it runs synchronously with respect to the outer scope, but still allows for asynchronous execution within it.
### Current Task
In some scenarios, it can be invalid to call a method outside of an event loop, for example a top level `Async{...}` can block forever, which might be unexpected.
```ruby
def wait(queue)
Async do
queue.pop
end
end
```
You can force callers of a method to only call the method within an asynchronous context by using a keyword argument `parent: Async::Task.current`. If no task is present, this will raise an exception.
```ruby
def wait(queue, parent: Async::Task.current)
parent.async do
queue.pop
end
end
```
This expresses the intent to the caller that this method should only be invoked from within an asynchonous task. In addition, it allows the caller to substitute other parent objects, like semaphores or barriers, which can be useful for managing concurrency.
## Use barriers to manage unbounded concurrency
Barriers provide a way to manage an unbounded number of tasks. The top-level `Barrier` method creates a barrier with built-in load management using an `Async::Idler`.
```ruby
Barrier do |barrier|
items.each do |item|
barrier.async do
process(item)
end
end
end
```
The barrier will automatically wait for all tasks to complete and stop any outstanding tasks when the block exits. By default, it uses an `Async::Idler` to prevent system overload by scheduling tasks when the system load is below 80%.
If you want to process tasks in order of completion, you can explicitly call `wait` with a block:
```ruby
Barrier do |barrier|
items.each do |item|
barrier.async do
process(item)
end
end
# Process the tasks in order of completion:
barrier.wait do |task|
result = task.wait
# Do something with result.
# If you don't want to wait for any more tasks you can break:
break
end
end
```
To disable load management (not recommended for unbounded concurrency), you can pass `parent: nil`:
```ruby
Barrier(parent: nil) do |barrier|
# No load management - creates tasks as fast as possible
items.each do |item|
barrier.async do
process(item)
end
end
end
```
## Use a semaphore to limit the number of concurrent tasks
Semaphores allow you to limit the level of concurrency to a fixed number of tasks. When using semaphores with barriers, the barrier should be the root of your task hierarchy, and the semaphore should be a child of the barrier:
```ruby
Barrier(parent: nil) do |barrier|
semaphore = Async::Semaphore.new(4, parent: barrier)
items.each do |item|
semaphore.async do
process(item)
end
end
end
```
In this example, we use `parent: nil` for the barrier to disable load management, since the semaphore already provides concurrency control. The semaphore limits execution to 4 concurrent tasks, and the barrier ensures all tasks are stopped when the block exits.
### Idler
Idlers are like semaphores but with a limit defined by current processor utilization. In other words, an idler will schedule work up to a specific ratio of idle/busy time in the scheduler.
The top-level `Barrier` method uses an idler by default, making it safe for unbounded concurrency:
```ruby
Barrier do |barrier| # Uses Async::Idler.new(0.8) by default
work.each do |work|
barrier.async do
work.call
end
end
end
```
You can also use an idler directly without a barrier:
```ruby
Async do
# Create an idler that will aim for a load average of 80%:
idler = Async::Idler.new(0.8)
# Some list of work to be done:
work.each do |work|
idler.async do
# Do the work:
work.call
end
end
end
```
The idler will try to schedule as much work such that the load of the scheduler stays at around 80% saturation.
## Use queues to share data between tasks
Queues allow you to share data between tasks without the risk of data corruption or deadlocks.
```ruby
Async do |task|
queue = Async::Queue.new
reader = task.async do
while chunk = socket.gets
queue.push(chunk)
end
# After this point, we won't be able to add items to the queue, and popping items will eventually result in nil once all items are dequeued:
queue.close
end
# Process items from the queue:
while line = queue.pop
process(line)
end
end
```
The above program may have unbounded memory use, so it can be a good idea to use a limited queue with back-pressure:
```ruby
Async do |task|
queue = Async::LimitedQueue.new(8)
# Everything else is the same from the queue example, except that the pushing onto the queue will block once 8 items are buffered.
end
```
## Use timeouts for operations that might block forever
General timeouts can be imposed by using `task.with_timeout(duration)`.
```ruby
Async do |task|
# This will raise an Async::TimeoutError after 1 second:
task.with_timeout(1) do |timeout|
# Timeout#duration= can be used to adjust the duration of the timeout.
# Timeout#cancel can be used to cancel the timeout completely.
sleep 10
end
end
```
It can be especially important to impose timeouts when processing user-provided data.
|