Class: Concurrent::ProcessingActor
- Inherits:
-
Synchronization::Object
- Object
- Synchronization::Object
- Concurrent::ProcessingActor
- Defined in:
- lib/concurrent/edge/processing_actor.rb
Overview
Edge Feature: Edge features are under active development and may change frequently. They are expected not to
keep backward compatibility (there may also lack tests and documentation). Semantic versions will
be obeyed though. Features developed in concurrent-ruby-edge
are expected to move
to concurrent-ruby
when final.
A new implementation of actor which also simulates the process, therefore it can be used in the same way as Erlang's actors but without occupying thread. A tens of thousands ProcessingActors can run at the same time sharing a thread pool.
Class Method Summary (collapse)
-
+ (ProcessingActor) act(*args, &process)
Creates an actor.
-
+ (ProcessingActor) act_listening(channel, *args) {|*args| ... }
Creates an actor listening to a specified channel (mailbox).
Instance Method Summary (collapse)
-
- (Promises::Future) ask(message, answer = Promises.resolvable_future)
Simplifies common pattern when a message sender also requires an answer to the message from the actor.
-
- (String) inspect
String representation.
-
- (Promises::Channel) mailbox
Actor's mailbox.
-
- (Promises::Future(Object)) receive(probe = Promises.resolvable_future)
Receives a message when available, used in the actor's process.
-
- (Promises::Future(ProcessingActor)) tell(message)
Tells a message to the actor.
-
- (self) tell!(message)
Tells a message to the actor.
-
- (Promises::Future(Object)) termination
A future which is resolved when the actor ends its processing.
Class Method Details
+ (ProcessingActor) act(*args, &process)
Creates an actor.
50 51 52 |
# File 'lib/concurrent/edge/processing_actor.rb', line 50 def self.act(*args, &process) act_listening Promises::Channel.new, *args, &process end |
+ (ProcessingActor) act_listening(channel, *args) {|*args| ... }
Creates an actor listening to a specified channel (mailbox).
64 65 66 67 68 69 70 71 |
# File 'lib/concurrent/edge/processing_actor.rb', line 64 def self.act_listening(channel, *args, &process) actor = ProcessingActor.new channel Promises. future(actor, *args, &process). run. chain_resolvable(actor.instance_variable_get(:@Terminated)) actor end |
Instance Method Details
- (Promises::Future) ask(message, answer = Promises.resolvable_future)
has to be nice also on the receive side, cannot make structure like this [message = [...], answer] all receives should receive something friendly
Simplifies common pattern when a message sender also requires an answer to the message from the actor. It appends a resolvable_future for the answer after the message.
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/concurrent/edge/processing_actor.rb', line 129 def ask(, answer = Promises.resolvable_future) tell [, answer] # do not leave answers unanswered when actor terminates. Promises.any( Promises.fulfilled_future(:answer).zip(answer), Promises.fulfilled_future(:termination).zip(@Terminated) ).chain do |fulfilled, (which, value), (_, reason)| # TODO (pitr-ch 20-Jan-2017): we have to know which future was resolved # TODO (pitr-ch 20-Jan-2017): make the combinator programmable, so anyone can create what is needed # TODO (pitr-ch 19-Jan-2017): ensure no callbacks are accumulated on @Terminated if which == :termination raise reason.nil? ? format('actor terminated normally before answering with a value: %s', value) : reason else fulfilled ? value : raise(reason) end end end |
- (String) inspect
Returns string representation.
148 149 150 |
# File 'lib/concurrent/edge/processing_actor.rb', line 148 def inspect format '<#%s:0x%x termination:%s>', self.class, object_id << 1, termination.state end |
- (Promises::Channel) mailbox
Returns actor's mailbox.
25 26 27 |
# File 'lib/concurrent/edge/processing_actor.rb', line 25 def mailbox @Mailbox end |
- (Promises::Future(Object)) receive(probe = Promises.resolvable_future)
Receives a message when available, used in the actor's process.
76 77 78 79 |
# File 'lib/concurrent/edge/processing_actor.rb', line 76 def receive(probe = Promises.resolvable_future) # TODO (pitr-ch 27-Dec-2016): patterns @Mailbox.pop probe end |
- (Promises::Future(ProcessingActor)) tell(message)
Tells a message to the actor.
102 103 104 |
# File 'lib/concurrent/edge/processing_actor.rb', line 102 def tell() @Mailbox.push().then(self) { |_, actor| actor } end |
- (self) tell!(message)
Tells a message to the actor. May block current thread if the mailbox is full. #tell is a better option since it does not block. It's usually used to integrate with threading code.
93 94 95 96 |
# File 'lib/concurrent/edge/processing_actor.rb', line 93 def tell!() @Mailbox.push().wait! self end |
- (Promises::Future(Object)) termination
Returns a future which is resolved when the actor ends its processing. It can either be fulfilled with a value when actor ends normally or rejected with a reason (exception) when actor fails.
32 33 34 |
# File 'lib/concurrent/edge/processing_actor.rb', line 32 def termination @Terminated.with_hidden_resolvable end |