## Examples
The simplest use of an actor is asynchronous execution,
although `Promises.future { 1 + 1 }` is better suited to that purpose.
```ruby
actor = Concurrent::ErlangActor.spawn(type: :on_thread, name: 'addition') { 1 + 1 }
# => #<Concurrent::ErlangActor::Pid:0x000002 addition running>
actor.terminated.value! # => 2
```
Let's send some messages and maintain some internal state,
which is what actors are good at.
```ruby
actor = Concurrent::ErlangActor.spawn(type: :on_thread, name: 'sum') do
  sum = 0 # internal state
  # receive and sum the messages until the actor gets :done
  while true
    message = receive
    break if message == :done
    # if the message is asked and not only told,
    # reply with the current sum (has no effect if actor was not asked)
    reply sum += message
  end
  # The final value of the actor
  sum
end
# => #<Concurrent::ErlangActor::Pid:0x000003 sum running>
```
A message can be either told to the actor asynchronously or asked.
The `ask` method blocks until the actor replies.
```ruby
# tell returns immediately, returning the actor
actor.tell(1).tell(1)
# => #<Concurrent::ErlangActor::Pid:0x000003 sum running>
# blocks, waiting for the answer
actor.ask 10 # => 12
# stop the actor
actor.tell :done
# => #<Concurrent::ErlangActor::Pid:0x000003 sum running>
# The final value of the actor
actor.terminated.value! # => 12
```
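
To make the difference between telling and asking concrete, here is a minimal plain-Ruby sketch built only on `Thread` and `Queue`. It is a model for illustration, not how `ErlangActor` is implemented: `ToyActor`, `final_value`, and the private reply queue are hypothetical names and assumptions.

```ruby
require 'thread'

# Hypothetical toy model of tell vs. ask; NOT the ErlangActor implementation.
class ToyActor
  def initialize
    @mailbox = Queue.new
    sum = 0 # internal state, only touched by the actor's thread
    @thread = Thread.new do
      loop do
        message, reply_to = @mailbox.pop # blocks until a message arrives
        break if message == :done
        sum += message
        reply_to << sum if reply_to # reply only when the sender asked
      end
      sum # the final value of the actor
    end
  end

  # Fire and forget: enqueue the message and return self for chaining.
  def tell(message)
    @mailbox << [message, nil]
    self
  end

  # Enqueue the message with a private reply queue, then block on it.
  def ask(message)
    reply_to = Queue.new
    @mailbox << [message, reply_to]
    reply_to.pop
  end

  # Wait for the actor's thread to finish and return its last value.
  def final_value
    @thread.value
  end
end

toy = ToyActor.new
toy.tell(1).tell(1)
answer = toy.ask(10) # => 12
toy.tell(:done)
final = toy.final_value # => 12
```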
### Actor types
There are two types of actors.
The type is specified with the `type:` keyword argument when calling spawn,
`Concurrent::ErlangActor.spawn(type: :on_thread, ...` or
`Concurrent::ErlangActor.spawn(type: :on_pool, ...`.
The main difference is in how the `receive` method returns.
- With `:on_thread`, it blocks the thread until a message is available,
  then either returns the message or calls the provided block first.
- With `:on_pool`, it has to release the thread back to the pool on the
  `receive` call. Therefore the call to `receive` ends the execution of
  the current scope. It has to be given a block or blocks that act as
  continuations and are called when a message is available.
Let's have a look at how the bodies of actors differ between the types:
```ruby
ping = Concurrent::ErlangActor.spawn(type: :on_thread) { reply receive }
# => #<Concurrent::ErlangActor::Pid:0x000004 running>
ping.ask 42 # => 42
```
It first calls `receive`, which blocks the thread of the actor.
When it returns, the received message is passed as an argument to `reply`,
which sends the same value back to the `ask` method.
Then the actor terminates normally, because there is nothing else to do.

However, when running on a pool, a block with the code to be evaluated
after the message is received has to be provided.
```ruby
ping = Concurrent::ErlangActor.spawn(type: :on_pool) { receive { |m| reply m } }
# => #<Concurrent::ErlangActor::Pid:0x000005 running>
ping.ask 42 # => 42
```
It starts by calling `receive`, which remembers the given block for later
execution and stops executing the current scope.
Later, when a message becomes available, the previously provided block is
called with the message. The result of the block is the final value of the
normally terminated actor.
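
The continuation idea can be sketched in plain Ruby. The following is a hypothetical model, not the library's implementation: `ToyPoolActor` and its methods are invented for illustration, it is not thread-safe, and it runs the continuation on the caller's thread where a real pool would schedule it on a worker. Unlike the real `receive`, the toy one cannot abort the current scope, so it has to be the last call in the body.

```ruby
# Hypothetical toy model of continuation-style receive; NOT ErlangActor itself.
class ToyPoolActor
  attr_reader :value

  def initialize(&body)
    @continuation = nil
    @terminated = false
    @value = nil
    run { instance_exec(&body) } # runs the body up to its receive call
  end

  # Remember the block and return at once; no thread waits for the message.
  def receive(&block)
    @continuation = block
    nil
  end

  # Deliver a message by running the stored continuation with it.
  def tell(message)
    return self if @terminated
    block, @continuation = @continuation, nil
    run { instance_exec(message, &block) }
    self
  end

  def terminated?
    @terminated
  end

  private

  # If the executed code registered no new continuation, the actor is done
  # and the result of that code becomes its final value.
  def run
    result = yield
    return unless @continuation.nil?
    @terminated = true
    @value = result
  end
end

ping = ToyPoolActor.new { receive { |m| m * 2 } }
still_waiting = !ping.terminated? # true: waiting for a message, nothing blocked
ping.tell(21)
result = ping.value # 42, the result of the continuation block
```
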
The direct blocking style of `:on_thread` is simpler to write and more
straightforward, but it has limitations. Each `:on_thread` actor creates a Thread,
which takes time and resources.
There is also a limit on the number of threads a Ruby process can create,
so you may hit it and fail to create more threads and therefore more actors.
Since an `:on_pool` actor runs on a pool of threads, its creation
is faster and cheaper, and it does not create new threads.
Therefore the number of actors is limited only by available RAM.
To simplify: if you need only a few actors, `:on_thread` is fine.
However, if you will be creating hundreds of actors, or
they will be short-lived, `:on_pool` should be used.
### Receiving messages
The simplest message receive:
```ruby
actor = Concurrent::ErlangActor.spawn(type: :on_thread) { receive }
# => #<Concurrent::ErlangActor::Pid:0x000006 running>
actor.tell :m
# => #<Concurrent::ErlangActor::Pid:0x000006 running>
actor.terminated.value! # => :m
```
This also works for an actor on a pool,
because if no block is given, a default block `{ |v| v }` is used.
```ruby
actor = Concurrent::ErlangActor.spawn(type: :on_pool) { receive { |v| v } }
# => #<Concurrent::ErlangActor::Pid:0x000007 running>
# which can be written simply as
actor = Concurrent::ErlangActor.spawn(type: :on_pool) { receive }
# => #<Concurrent::ErlangActor::Pid:0x000008 running>
actor.tell :m
# => #<Concurrent::ErlangActor::Pid:0x000008 running>
actor.terminated.value! # => :m
```
The received message type can be limited.
```ruby
Concurrent::ErlangActor.
  spawn(type: :on_thread) { receive(Numeric).succ }.
  tell('junk'). # ignored message
  tell(42).
  terminated.value! # => 43
```
On a pool, `receive` requires a block.
```ruby
Concurrent::ErlangActor.
  spawn(type: :on_pool) { receive(Numeric) { |v| v.succ } }.
  tell('junk'). # ignored message
  tell(42).
  terminated.value! # => 43
```
By the way, a body written for an on-pool actor works for an on-thread actor
as well.
```ruby
Concurrent::ErlangActor.
  spawn(type: :on_thread) { receive(Numeric) { |v| v.succ } }.
  tell('junk'). # ignored message
  tell(42).
  terminated.value! # => 43
```
The `receive` method can also be used to dispatch based on the received message.
```ruby
actor = Concurrent::ErlangActor.spawn(type: :on_thread) do
  while true
    receive(on(Symbol) { |s| reply s.to_s },
            on(And[Numeric, -> v { v >= 0 }]) { |v| reply v.succ },
            # put last works as else
            on(ANY) do |v|
              reply :bad_message
              terminate [:bad_message, v]
            end)
  end
end
# => #<Concurrent::ErlangActor::Pid:0x000009 running>
actor.ask 1 # => 2
actor.ask 2 # => 3
actor.ask :value # => "value"
# this malformed message will terminate the actor
actor.ask -1 # => :bad_message
# the actor is no longer alive, so ask fails
actor.ask "junk" rescue $!
# => #<Concurrent::ErlangActor::NoActor: #<Concurrent::ErlangActor::Pid:0x000009 terminated because of [:bad_message, -1]>>
actor.terminated.result # => [false, nil, [:bad_message, -1]]
```
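
The dispatch logic can be pictured as an ordered rule table. The sketch below is plain Ruby and rests on an assumption: that rules are tried in order and that matchers are compared with `===`, which is what the use of classes and lambdas in `on(...)` suggests. The names `rules` and `dispatch` are hypothetical, and `Object` stands in for `ANY`.

```ruby
# Hypothetical rule table: [matcher, handler] pairs, checked top to bottom.
rules = [
  [Symbol, ->(s) { s.to_s }],
  [->(v) { v.is_a?(Numeric) && v >= 0 }, ->(v) { v.succ }],
  [Object, ->(v) { [:bad_message, v] }] # put last, works as else
]

# Find the first rule whose matcher accepts the message and run its handler.
# Class#=== checks the type, Proc#=== calls the lambda with the message.
dispatch = lambda do |message|
  _matcher, handler = rules.find { |matcher, _handler| matcher === message }
  handler.call(message)
end

dispatch.call(:value) # => "value"
dispatch.call(1)      # => 2
dispatch.call(-1)     # => [:bad_message, -1]
```
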
The same can be done for an actor on a pool.
Since it cannot loop, it calls the `body` method repeatedly.
```ruby
module Behaviour
  def body
    receive(on(Symbol) do |s|
              reply s.to_s
              body # call again
            end,
            on(And[Numeric, -> v { v >= 0 }]) do |v|
              reply v.succ
              body # call again
            end,
            # put last works as else
            on(ANY) do |v|
              reply :bad_message
              terminate [:bad_message, v]
            end)
  end
end # => :body
actor = Concurrent::ErlangActor.spawn(type: :on_pool, environment: Behaviour) { body }
# => #<Concurrent::ErlangActor::Pid:0x00000a running>
actor.ask 1 # => 2
actor.ask 2 # => 3
actor.ask :value # => "value"
# this malformed message will terminate the actor
actor.ask -1 # => :bad_message
# the actor is no longer alive, so ask fails
actor.ask "junk" rescue $!
# => #<Concurrent::ErlangActor::NoActor: #<Concurrent::ErlangActor::Pid:0x00000a terminated because of [:bad_message, -1]>>
actor.terminated.result # => [false, nil, [:bad_message, -1]]
```
Since the behavior is stable in this case, we can simplify with the `:keep` option,
which keeps the receive rules until another `receive` call
replaces them.
```ruby
actor = Concurrent::ErlangActor.spawn(type: :on_pool) do
  receive(on(Symbol) { |s| reply s.to_s },
          on(And[Numeric, -> v { v >= 0 }]) { |v| reply v.succ },
          # put last works as else
          on(ANY) do |v|
            reply :bad_message
            terminate [:bad_message, v]
          end,
          keep: true)
end
# => #<Concurrent::ErlangActor::Pid:0x00000b running>
actor.ask 1 # => 2
actor.ask 2 # => 3
actor.ask :value # => "value"
# this malformed message will terminate the actor
actor.ask -1 # => :bad_message
# the actor is no longer alive, so ask fails
actor.ask "junk" rescue $!
# => #<Concurrent::ErlangActor::NoActor: #<Concurrent::ErlangActor::Pid:0x00000b terminated because of [:bad_message, -1]>>
actor.terminated.result # => [false, nil, [:bad_message, -1]]
```
### Erlang behaviour
The actor matches Erlang processes in behaviour.
Therefore it supports the usual Erlang actor linking, monitoring, exit behaviour, etc.
```ruby
actor = Concurrent::ErlangActor.spawn(type: :on_thread) do
  spawn(link: true) do # equivalent of spawn_link in Erlang
    terminate :err # equivalent of exit in Erlang
  end
  trap # equivalent of process_flag(trap_exit, true)
  receive
end
# => #<Concurrent::ErlangActor::Pid:0x00000c running>
actor.terminated.value!
# => #<Concurrent::ErlangActor::Terminated:0x00000d
# @from=
# #<Concurrent::ErlangActor::Pid:0x00000e terminated because of err>,
# @reason=:err>
```
The methods have the same or very similar names so they are easy to find.
The one exception to the original Erlang naming is `exit`.
To avoid clashing with `Kernel#exit`, it's called `terminate`.

Until more information is available here, the chapters listed below from
the book [Learn You Some Erlang](https://learnyousomeerlang.com)
are an excellent source of information.
The Ruby ErlangActor implementation has the same behaviour.
- [Links](https://learnyousomeerlang.com/errors-and-processes#links)
- [It's a trap](https://learnyousomeerlang.com/errors-and-processes#its-a-trap)
- [Monitors](https://learnyousomeerlang.com/errors-and-processes#monitors)
If anything behaves differently than in Erlang, please file an issue.
### Chapters or points to be added
* More Erlang behaviour examples.
* The mailbox can be bounded in size,
  in which case `tell` and `ask` block until there is space available in the mailbox.
  Useful for building systems with backpressure.
* `#tell_op` and `#ask_op` method examples, integration with promises.
* Best practice: always use a timeout,
  and do something if the message does not arrive; don't leave the actor stuck.
* Best practice: drop and log unrecognized messages,
  or be even more defensive and terminate.
* Environment definition for actors.
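
Until the bounded-mailbox chapter exists, the backpressure idea from the list above can be illustrated with Ruby's standard `SizedQueue`. This is only a plain-Ruby sketch of the concept, not the `ErlangActor` mailbox API; the names `mailbox`, `consumer`, and `received` are hypothetical.

```ruby
require 'thread'

mailbox = SizedQueue.new(2) # hypothetical mailbox with room for two messages
mailbox << 1
mailbox << 2 # the mailbox is now full

# With nobody consuming, a non-blocking push fails instead of waiting.
full = begin
  mailbox.push(3, true)
  false
rescue ThreadError
  true
end

received = []
consumer = Thread.new do
  # Pop messages until :done arrives; each pop frees a slot for a sender.
  while (message = mailbox.pop) != :done
    received << message
  end
end

# A blocking push waits only until the consumer has freed a slot.
mailbox << 3
mailbox << :done
consumer.join
```

A sender that is faster than the consumer is slowed down to the consumer's pace, which is the backpressure effect a bounded mailbox is meant to provide.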