Class: Concurrent::ProcessingActor
- Inherits:
-
Synchronization::Object
- Object
- Synchronization::Object
- Concurrent::ProcessingActor
- Defined in:
- lib-edge/concurrent/edge/processing_actor.rb
Overview
Edge Features are under active development and may change frequently.
- Deprecations are not added before incompatible changes.
- Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
- Edge features may also lack tests and documentation.
- Features developed in
concurrent-ruby-edge
are expected to move toconcurrent-ruby
when finalised.
A new implementation of actor which also simulates the process, therefore it can be used in the same way as Erlang's actors but without occupying thread. A tens of thousands ProcessingActors can run at the same time sharing a thread pool.
Class Method Summary collapse
-
.act(*args, &process) ⇒ ProcessingActor
Creates an actor.
-
.act_listening(channel, *args) {|*args| ... } ⇒ ProcessingActor
Creates an actor listening to a specified channel (mailbox).
Instance Method Summary collapse
-
#ask(message, answer = Promises.resolvable_future) ⇒ Promises::Future
Simplifies common pattern when a message sender also requires an answer to the message from the actor.
-
#inspect ⇒ String
String representation.
-
#mailbox ⇒ Promises::Channel
Actor's mailbox.
-
#receive(probe = Promises.resolvable_future) ⇒ Promises::Future(Object)
Receives a message when available, used in the actor's process.
-
#tell(message) ⇒ Promises::Future(ProcessingActor)
Tells a message to the actor.
-
#tell!(message) ⇒ self
Tells a message to the actor.
-
#termination ⇒ Promises::Future(Object)
A future which is resolved when the actor ends its processing.
Class Method Details
.act(*args, &process) ⇒ ProcessingActor
Creates an actor.
50 51 52 |
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 50 def self.act(*args, &process) act_listening Promises::Channel.new, *args, &process end |
.act_listening(channel, *args) {|*args| ... } ⇒ ProcessingActor
Creates an actor listening to a specified channel (mailbox).
64 65 66 67 68 69 70 71 |
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 64 def self.act_listening(channel, *args, &process) actor = ProcessingActor.new channel Promises. future(actor, *args, &process). run. chain_resolvable(actor.instance_variable_get(:@Terminated)) actor end |
Instance Method Details
#ask(message, answer = Promises.resolvable_future) ⇒ Promises::Future
has to be nice also on the receive side, cannot make structure like this [message = [...], answer] all receives should receive something friendly
Simplifies common pattern when a message sender also requires an answer to the message from the actor. It appends a resolvable_future for the answer after the message.
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 129 def ask(, answer = Promises.resolvable_future) tell [, answer] # do not leave answers unanswered when actor terminates. Promises.any( Promises.fulfilled_future(:answer).zip(answer), Promises.fulfilled_future(:termination).zip(@Terminated) ).chain do |fulfilled, (which, value), (_, reason)| # TODO (pitr-ch 20-Jan-2017): we have to know which future was resolved # TODO (pitr-ch 20-Jan-2017): make the combinator programmable, so anyone can create what is needed # FIXME (pitr-ch 19-Jan-2017): ensure no callbacks are accumulated on @Terminated if which == :termination raise reason.nil? ? format('actor terminated normally before answering with a value: %s', value) : reason else fulfilled ? value : raise(reason) end end end |
#inspect ⇒ String
Returns string representation.
148 149 150 |
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 148 def inspect format '%s termination:%s>', super[0..-2], termination.state end |
#mailbox ⇒ Promises::Channel
Returns actor's mailbox.
25 26 27 |
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 25 def mailbox @Mailbox end |
#receive(probe = Promises.resolvable_future) ⇒ Promises::Future(Object)
Receives a message when available, used in the actor's process.
76 77 78 79 |
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 76 def receive(probe = Promises.resolvable_future) # TODO (pitr-ch 27-Dec-2016): patterns @Mailbox.pop probe end |
#tell(message) ⇒ Promises::Future(ProcessingActor)
Tells a message to the actor.
102 103 104 |
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 102 def tell() @Mailbox.push().then(self) { |_, actor| actor } end |
#tell!(message) ⇒ self
Tells a message to the actor. May block current thread if the mailbox is full. #tell is a better option since it does not block. It's usually used to integrate with threading code.
93 94 95 96 |
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 93 def tell!() @Mailbox.push().wait! self end |
#termination ⇒ Promises::Future(Object)
Returns a future which is resolved when the actor ends its processing. It can either be fulfilled with a value when actor ends normally or rejected with a reason (exception) when actor fails.
32 33 34 |
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 32 def termination @Terminated.with_hidden_resolvable end |