Class: Concurrent::ProcessingActor

Inherits:
Synchronization::Object show all
Defined in:
lib-edge/concurrent/edge/processing_actor.rb

Overview

Note:

Edge Features are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.
  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
  • Edge features may also lack tests and documentation.
  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

A new implementation of actor which also simulates the process, therefore it can be used in the same way as Erlang's actors but without occupying thread. A tens of thousands ProcessingActors can run at the same time sharing a thread pool.

Examples:

# Runs on a pool, does not consume 50_000 threads
actors = 50_000.times.map do |i|
  Concurrent::ProcessingActor.act(i) { |a, i| a.receive.then_on(:fast, i) { |m, i| m + i } }
end

actors.each { |a| a.tell 1 }
values = actors.map(&:termination).map(&:value)
values[0,5]                                        # => [1, 2, 3, 4, 5]
values[-5, 5]                                      # => [49996, 49997, 49998, 49999, 50000]

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.act(*args, &process) ⇒ ProcessingActor

Creates an actor.

Examples:

actor = Concurrent::ProcessingActor.act do |actor|
  actor.receive.then do |message|
    # the actor ends normally with message
    message
  end
end

actor.tell :a_message
    # => <#Concurrent::ProcessingActor:0x7fff11280560 termination:pending>
actor.termination.value! # => :a_message

Returns:

See Also:

  • Behaves the same way, but does not take mailbox as a first argument.


50
51
52
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 50

def self.act(*args, &process)
  act_listening Promises::Channel.new, *args, &process
end

.act_listening(channel, *args) {|*args| ... } ⇒ ProcessingActor

Creates an actor listening to a specified channel (mailbox).

Examples:

# TODO (pitr-ch 19-Jan-2017): actor with limited mailbox

Parameters:

  • args (Object)

    Arguments passed to the process.

  • channel (Promises::Channel)

    which serves as mailing box. The channel can have limited size to achieve backpressure.

Yields:

  • args to the process to get back a future which represents the actors execution.

Yield Parameters:

  • *args (Object)

Yield Returns:

Returns:



64
65
66
67
68
69
70
71
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 64

def self.act_listening(channel, *args, &process)
  actor = ProcessingActor.new channel
  Promises.
      future(actor, *args, &process).
      run.
      chain_resolvable(actor.instance_variable_get(:@Terminated))
  actor
end

Instance Method Details

#ask(message, answer = Promises.resolvable_future) ⇒ Promises::Future

TODO:

has to be nice also on the receive side, cannot make structure like this [message = [...], answer] all receives should receive something friendly

Simplifies common pattern when a message sender also requires an answer to the message from the actor. It appends a resolvable_future for the answer after the message.

Examples:

add_once_actor = Concurrent::ProcessingActor.act do |actor|
  actor.receive.then do |(a, b), answer|
    result = a + b
    answer.fulfill result
    # terminate with result value
    result
  end
end
# => <#Concurrent::ProcessingActor:0x7fcd1315f6e8 termination:pending>

add_once_actor.ask([1, 2]).value!                  # => 3
# fails the actor already added once
add_once_actor.ask(%w(ab cd)).reason
# => #<RuntimeError: actor terminated normally before answering with a value: 3>
add_once_actor.termination.value!                  # => 3

Parameters:

Returns:

  • (Promises::Future)

    a future which will be fulfilled with the answer to the message



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 129

def ask(message, answer = Promises.resolvable_future)
  tell [message, answer]
  # do not leave answers unanswered when actor terminates.
  Promises.any(
      Promises.fulfilled_future(:answer).zip(answer),
      Promises.fulfilled_future(:termination).zip(@Terminated)
  ).chain do |fulfilled, (which, value), (_, reason)|
    # TODO (pitr-ch 20-Jan-2017): we have to know which future was resolved
    # TODO (pitr-ch 20-Jan-2017): make the combinator programmable, so anyone can create what is needed
    # FIXME (pitr-ch 19-Jan-2017): ensure no callbacks are accumulated on @Terminated
    if which == :termination
      raise reason.nil? ? format('actor terminated normally before answering with a value: %s', value) : reason
    else
      fulfilled ? value : raise(reason)
    end
  end
end

#inspectString

Returns string representation.

Returns:

  • (String)

    string representation.



148
149
150
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 148

def inspect
  format '%s termination:%s>', super[0..-2], termination.state
end

#mailboxPromises::Channel

Returns actor's mailbox.

Returns:



25
26
27
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 25

def mailbox
  @Mailbox
end

#receive(probe = Promises.resolvable_future) ⇒ Promises::Future(Object)

Receives a message when available, used in the actor's process.

Returns:

  • (Promises::Future(Object))

    a future which will be fulfilled with a message from mailbox when it is available.



76
77
78
79
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 76

def receive(probe = Promises.resolvable_future)
  # TODO (pitr-ch 27-Dec-2016): patterns
  @Mailbox.pop probe
end

#tell(message) ⇒ Promises::Future(ProcessingActor)

Tells a message to the actor.

Parameters:

  • message (Object)

Returns:



102
103
104
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 102

def tell(message)
  @Mailbox.push(message).then(self) { |_, actor| actor }
end

#tell!(message) ⇒ self

Tells a message to the actor. May block current thread if the mailbox is full. #tell is a better option since it does not block. It's usually used to integrate with threading code.

Examples:

Thread.new(actor) do |actor|
  # ...
  actor.tell! :a_message # blocks until the message is told
  #   (there is a space for it in the channel)
  # ...
end

Parameters:

  • message (Object)

Returns:

  • (self)


93
94
95
96
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 93

def tell!(message)
  @Mailbox.push(message).wait!
  self
end

#terminationPromises::Future(Object)

Returns a future which is resolved when the actor ends its processing. It can either be fulfilled with a value when actor ends normally or rejected with a reason (exception) when actor fails.

Returns:

  • (Promises::Future(Object))

    a future which is resolved when the actor ends its processing. It can either be fulfilled with a value when actor ends normally or rejected with a reason (exception) when actor fails.



32
33
34
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 32

def termination
  @Terminated.with_hidden_resolvable
end