Class: Concurrent::ProcessingActor

Inherits:
Synchronization::Object show all
Defined in:
lib/concurrent/edge/processing_actor.rb

Overview

Note:

Edge Feature: Edge features are under active development and may change frequently. They are expected not to keep backward compatibility (there may also lack tests and documentation). Semantic versions will be obeyed though. Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when final.

A new implementation of actor which also simulates the process, therefore it can be used in the same way as Erlang's actors but without occupying thread. A tens of thousands ProcessingActors can run at the same time sharing a thread pool.

Examples:

# Runs on a pool, does not consume 50_000 threads
actors = 50_000.times.map do |i|
  Concurrent::ProcessingActor.act(i) { |a, i| a.receive.then_on(:fast, i) { |m, i| m + i } }
end

actors.each { |a| a.tell 1 }
values = actors.map(&:termination).map(&:value)
values[0,5]                                        # => [1, 2, 3, 4, 5]
values[-5, 5]                                      # => [49996, 49997, 49998, 49999, 50000]

Class Method Summary (collapse)

Instance Method Summary (collapse)

Class Method Details

+ (ProcessingActor) act(*args, &process)

Creates an actor.

Examples:

actor = Concurrent::ProcessingActor.act do |actor|
  actor.receive.then do |message|
    # the actor ends normally with message
    message
  end
end

actor.tell :a_message
    # => <#Concurrent::ProcessingActor:0x7fff11280560 termination:pending>
actor.termination.value! # => :a_message

Returns:

See Also:

  • Behaves the same way, but does not take mailbox as a first argument.


50
51
52
# File 'lib/concurrent/edge/processing_actor.rb', line 50

def self.act(*args, &process)
  act_listening Promises::Channel.new, *args, &process
end

+ (ProcessingActor) act_listening(channel, *args) {|*args| ... }

Creates an actor listening to a specified channel (mailbox).

Examples:

# TODO (pitr-ch 19-Jan-2017): actor with limited mailbox

Parameters:

  • args (Object)

    Arguments passed to the process.

  • channel (Promises::Channel)

    which serves as mailing box. The channel can have limited size to achieve backpressure.

Yields:

  • args to the process to get back a future which represents the actors execution.

Yield Parameters:

  • *args (Object)

Yield Returns:

Returns:



64
65
66
67
68
69
70
71
# File 'lib/concurrent/edge/processing_actor.rb', line 64

def self.act_listening(channel, *args, &process)
  actor = ProcessingActor.new channel
  Promises.
      future(actor, *args, &process).
      run.
      chain_resolvable(actor.instance_variable_get(:@Terminated))
  actor
end

Instance Method Details

- (Promises::Future) ask(message, answer = Promises.resolvable_future)

TODO:

has to be nice also on the receive side, cannot make structure like this [message = [...], answer] all receives should receive something friendly

Simplifies common pattern when a message sender also requires an answer to the message from the actor. It appends a resolvable_future for the answer after the message.

Examples:

add_once_actor = Concurrent::ProcessingActor.act do |actor|
  actor.receive.then do |(a, b), answer|
    result = a + b
    answer.fulfill result
    # terminate with result value
    result
  end
end
# => <#Concurrent::ProcessingActor:0x7fcd1315f6e8 termination:pending>

add_once_actor.ask([1, 2]).value!                  # => 3
# fails the actor already added once
add_once_actor.ask(%w(ab cd)).reason
# => #<RuntimeError: actor terminated normally before answering with a value: 3>
add_once_actor.termination.value!                  # => 3

Parameters:

Returns:

  • (Promises::Future)

    a future which will be fulfilled with the answer to the message



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/concurrent/edge/processing_actor.rb', line 129

def ask(message, answer = Promises.resolvable_future)
  tell [message, answer]
  # do not leave answers unanswered when actor terminates.
  Promises.any(
      Promises.fulfilled_future(:answer).zip(answer),
      Promises.fulfilled_future(:termination).zip(@Terminated)
  ).chain do |fulfilled, (which, value), (_, reason)|
    # TODO (pitr-ch 20-Jan-2017): we have to know which future was resolved
    # TODO (pitr-ch 20-Jan-2017): make the combinator programmable, so anyone can create what is needed
    # TODO (pitr-ch 19-Jan-2017): ensure no callbacks are accumulated on @Terminated
    if which == :termination
      raise reason.nil? ? format('actor terminated normally before answering with a value: %s', value) : reason
    else
      fulfilled ? value : raise(reason)
    end
  end
end

- (String) inspect

Returns string representation.

Returns:

  • (String)

    string representation.



148
149
150
# File 'lib/concurrent/edge/processing_actor.rb', line 148

def inspect
  format '<#%s:0x%x termination:%s>', self.class, object_id << 1, termination.state
end

- (Promises::Channel) mailbox

Returns actor's mailbox.

Returns:



25
26
27
# File 'lib/concurrent/edge/processing_actor.rb', line 25

def mailbox
  @Mailbox
end

- (Promises::Future(Object)) receive(probe = Promises.resolvable_future)

Receives a message when available, used in the actor's process.

Returns:

  • (Promises::Future(Object))

    a future which will be fulfilled with a message from mailbox when it is available.



76
77
78
79
# File 'lib/concurrent/edge/processing_actor.rb', line 76

def receive(probe = Promises.resolvable_future)
  # TODO (pitr-ch 27-Dec-2016): patterns
  @Mailbox.pop probe
end

- (Promises::Future(ProcessingActor)) tell(message)

Tells a message to the actor.

Parameters:

  • message (Object)

Returns:



102
103
104
# File 'lib/concurrent/edge/processing_actor.rb', line 102

def tell(message)
  @Mailbox.push(message).then(self) { |_, actor| actor }
end

- (self) tell!(message)

Tells a message to the actor. May block current thread if the mailbox is full. #tell is a better option since it does not block. It's usually used to integrate with threading code.

Examples:

Thread.new(actor) do |actor|
  # ...
  actor.tell! :a_message # blocks until the message is told
  #   (there is a space for it in the channel)
  # ...
end

Parameters:

  • message (Object)

Returns:

  • (self)


93
94
95
96
# File 'lib/concurrent/edge/processing_actor.rb', line 93

def tell!(message)
  @Mailbox.push(message).wait!
  self
end

- (Promises::Future(Object)) termination

Returns a future which is resolved when the actor ends its processing. It can either be fulfilled with a value when actor ends normally or rejected with a reason (exception) when actor fails.

Returns:

  • (Promises::Future(Object))

    a future which is resolved when the actor ends its processing. It can either be fulfilled with a value when actor ends normally or rejected with a reason (exception) when actor fails.



32
33
34
# File 'lib/concurrent/edge/processing_actor.rb', line 32

def termination
  @Terminated.with_hidden_resolvable
end