File: erlang_actor.out.md

package info (click to toggle)
ruby-concurrent 1.1.6%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 30,284 kB
  • sloc: ruby: 30,875; java: 6,117; ansic: 288; makefile: 9; sh: 6
file content (300 lines) | stat: -rw-r--r-- 11,049 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
## Examples

The simplest example is to use the actor as an asynchronous execution.
Although, `Promises.future { 1 + 1 }` is better suited for that purpose.

```ruby
actor = Concurrent::ErlangActor.spawn(type: :on_thread, name: 'addition') { 1 + 1 }
# => #<Concurrent::ErlangActor::Pid:0x000002 addition running>
actor.terminated.value!                  # => 2
```

Let's send some messages and maintain some internal state 
which is what actors are good for.

```ruby
actor = Concurrent::ErlangActor.spawn(type: :on_thread, name: 'sum') do
  sum = 0 # internal state
  # receive and sum the messages until the actor gets :done
  while true
    message = receive
    break if message == :done
    # if the message is asked and not only told, 
    # reply with the current sum (has no effect if actor was not asked)
    reply sum += message   
  end
  # The final value of the actor
  sum
end
# => #<Concurrent::ErlangActor::Pid:0x000003 sum running>
```

The actor can be either told a message asynchronously, 
or asked. The ask method will block until actor replies.

```ruby
# tell returns immediately returning the actor 
actor.tell(1).tell(1)
# => #<Concurrent::ErlangActor::Pid:0x000003 sum running>
# blocks, waiting for the answer 
actor.ask 10                             # => 12
# stop the actor
actor.tell :done
# => #<Concurrent::ErlangActor::Pid:0x000003 sum running>
# The final value of the actor 
actor.terminated.value!                  # => 12
```

### Actor types

There are two types of actors. 
The type is specified when calling spawn as a first argument, 
`Concurrent::ErlangActor.spawn(type: :on_thread, ...` or 
`Concurrent::ErlangActor.spawn(type: :on_pool, ...`.

The main difference is in how receive method returns.
 
-   `:on_thread` it blocks the thread until message is available, 
    then it returns or calls the provided block first. 
 
-   However, `:on_pool` it has to free up the thread on the receive 
    call back to the pool. Therefore the call to receive ends the 
    execution of current scope. The receive has to be given block
    or blocks that act as a continuations and are called 
    when there is message available.
 
Let's have a look at how the bodies of actors differ between the types:

```ruby
ping = Concurrent::ErlangActor.spawn(type: :on_thread) { reply receive }
# => #<Concurrent::ErlangActor::Pid:0x000004 running>
ping.ask 42                              # => 42
```

It first calls receive, which blocks the thread of the actor. 
When it returns the received message is passed an an argument to reply,
which replies the same value back to the ask method. 
Then the actor terminates normally, because there is nothing else to do.

However when running on pool a block with code which should be evaluated 
after the message is received has to be provided. 

```ruby
ping = Concurrent::ErlangActor.spawn(type: :on_pool) { receive { |m| reply m } }
# => #<Concurrent::ErlangActor::Pid:0x000005 running>
ping.ask 42                              # => 42
```

It starts by calling receive which will remember the given block for later
execution when a message is available and stops executing the current scope.
Later when a message becomes available the previously provided block is given
the message and called. The result of the block is the final value of the 
normally terminated actor.

The direct blocking style of `:on_thread` is simpler to write and more straight
forward however it has limitations. Each `:on_thread` actor creates a Thread 
taking time and resources. 
There is also a limited number of threads the Ruby process can create 
so you may hit the limit and fail to create more threads and therefore actors.  

Since the `:on_pool` actor runs on a poll of threads, its creations 
is faster and cheaper and it does not create new threads. 
Therefore there is no limit (only RAM) on how many actors can be created.

To simplify, if you need only few actors `:on_thread` is fine. 
However if you will be creating hundreds of actors or 
they will be short-lived `:on_pool` should be used.      

### Receiving messages

Simplest message receive.

```ruby
actor = Concurrent::ErlangActor.spawn(type: :on_thread) { receive }
# => #<Concurrent::ErlangActor::Pid:0x000006 running>
actor.tell :m
# => #<Concurrent::ErlangActor::Pid:0x000006 running>
actor.terminated.value!                  # => :m
```

which also works for actor on pool, 
because if no block is given it will use a default block `{ |v| v }` 

```ruby
actor = Concurrent::ErlangActor.spawn(type: :on_pool) { receive { |v| v } }
# => #<Concurrent::ErlangActor::Pid:0x000007 running>
# can simply be following
actor = Concurrent::ErlangActor.spawn(type: :on_pool) { receive }
# => #<Concurrent::ErlangActor::Pid:0x000008 running>
actor.tell :m
# => #<Concurrent::ErlangActor::Pid:0x000008 running>
actor.terminated.value!                  # => :m
```

The received message type can be limited.

```ruby
Concurrent::ErlangActor.
  spawn(type: :on_thread) { receive(Numeric).succ }.
  tell('junk'). # ignored message
  tell(42).
  terminated.value!                      # => 43
```

On pool it requires a block.

```ruby
Concurrent::ErlangActor.
  spawn(type: :on_pool) { receive(Numeric) { |v| v.succ } }.
  tell('junk'). # ignored message
  tell(42).
  terminated.value!                      # => 43
```

By the way, the body written for on pool actor will work for on thread actor 
as well. 

```ruby
Concurrent::ErlangActor.
  spawn(type: :on_thread) { receive(Numeric) { |v| v.succ } }.
  tell('junk'). # ignored message
  tell(42).
  terminated.value!                      # => 43
```

The `receive` method can be also used to dispatch based on the received message.

```ruby
actor = Concurrent::ErlangActor.spawn(type: :on_thread) do
  while true
    receive(on(Symbol) { |s| reply s.to_s },
            on(And[Numeric, -> v { v >= 0 }]) { |v| reply v.succ },
            # put last works as else
            on(ANY) do |v| 
              reply :bad_message
              terminate [:bad_message, v]
            end)            
  end 
end
# => #<Concurrent::ErlangActor::Pid:0x000009 running>
actor.ask 1                              # => 2
actor.ask 2                              # => 3
actor.ask :value                         # => "value"
# this malformed message will terminate the actor
actor.ask -1                             # => :bad_message
# the actor is no longer alive, so ask fails
actor.ask "junk" rescue $!
# => #<Concurrent::ErlangActor::NoActor: #<Concurrent::ErlangActor::Pid:0x000009 terminated because of [:bad_message, -1]>>
actor.terminated.result                  # => [false, nil, [:bad_message, -1]]
```

And a same thing for the actor on pool. 
Since it cannot loop it will call the body method repeatedly.

```ruby
module Behaviour
  def body
    receive(on(Symbol) do |s| 
              reply s.to_s 
              body # call again  
            end,
            on(And[Numeric, -> v { v >= 0 }]) do |v| 
              reply v.succ
              body # call again 
            end,
            # put last works as else
            on(ANY) do |v| 
              reply :bad_message
              terminate [:bad_message, v]
            end)  
  end
end                                      # => :body

actor = Concurrent::ErlangActor.spawn(type: :on_pool, environment: Behaviour) { body }
# => #<Concurrent::ErlangActor::Pid:0x00000a running>
actor.ask 1                              # => 2
actor.ask 2                              # => 3
actor.ask :value                         # => "value"
# this malformed message will terminate the actor
actor.ask -1                             # => :bad_message
# the actor is no longer alive, so ask fails
actor.ask "junk" rescue $!
# => #<Concurrent::ErlangActor::NoActor: #<Concurrent::ErlangActor::Pid:0x00000a terminated because of [:bad_message, -1]>>
actor.terminated.result                  # => [false, nil, [:bad_message, -1]]
```

Since the behavior is stable in this case we can simplify with the `:keep` option
that will keep the receive rules until another receive is called
replacing the kept rules.

```ruby
actor = Concurrent::ErlangActor.spawn(type: :on_pool) do
  receive(on(Symbol) { |s| reply s.to_s },
          on(And[Numeric, -> v { v >= 0 }]) { |v| reply v.succ },
          # put last works as else
          on(ANY) do |v| 
            reply :bad_message
            terminate [:bad_message, v]
          end,
          keep: true)            
end
# => #<Concurrent::ErlangActor::Pid:0x00000b running>
actor.ask 1                              # => 2
actor.ask 2                              # => 3
actor.ask :value                         # => "value"
# this malformed message will terminate the actor
actor.ask -1                             # => :bad_message
# the actor is no longer alive, so ask fails
actor.ask "junk" rescue $!
# => #<Concurrent::ErlangActor::NoActor: #<Concurrent::ErlangActor::Pid:0x00000b terminated because of [:bad_message, -1]>>
actor.terminated.result                  # => [false, nil, [:bad_message, -1]]
```

### Erlang behaviour

The actor matches Erlang processes in behaviour. 
Therefore it supports the usual Erlang actor linking, monitoring, exit behaviour, etc.

```ruby
actor = Concurrent::ErlangActor.spawn(type: :on_thread) do
  spawn(link: true) do # equivalent of spawn_link in Erlang
    terminate :err # equivalent of exit in Erlang    
  end
  trap # equivalent of process_flag(trap_exit, true) 
  receive  
end
# => #<Concurrent::ErlangActor::Pid:0x00000c running>
actor.terminated.value!
# => #<Concurrent::ErlangActor::Terminated:0x00000d
#     @from=
#      #<Concurrent::ErlangActor::Pid:0x00000e terminated because of err>,
#     @reason=:err>
```

The methods have same or very similar name to be easily found. 
The one exception from the original Erlang naming is exit.
To avoid clashing with `Kernel#exit` it's called `terminate`. 

Until there is more information available here, the chapters listed below from 
a book [lern you some Erlang](https://learnyousomeerlang.com) 
are excellent source of information. 
The Ruby ErlangActor implementation has same behaviour. 

-   [Links](https://learnyousomeerlang.com/errors-and-processes#links)
-   [It's a trap](https://learnyousomeerlang.com/errors-and-processes#its-a-trap)
-   [Monitors](https://learnyousomeerlang.com/errors-and-processes#monitors)

If anything behaves differently than in Erlang, please file an issue.

### Chapters or points to be added

*   More erlang behaviour examples.
*   The mailbox can be bounded in size, 
    then the tell and ask will block until there is space available in the mailbox.
    Useful for building systems with backpressure.
*   `#tell_op` and `ask_op` method examples, integration with promises.
*   Best practice: always use timeout, 
    and do something if the message does not arrive, don't leave the actor stuck.
*   Best practice: drop and log unrecognized messages, 
    or be even more defensive and terminate.
*   Environment definition for actors.