Class: Concurrent::Promises::Channel

Inherits:
Synchronization::Object
Defined in:
lib-edge/concurrent/edge/promises.rb

Overview

Note:

Edge Features are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.
  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
  • Edge features may also lack tests and documentation.
  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

Constant Summary

UNLIMITED = ::Object.new

Default size of the Channel; makes it accept an unlimited number of messages.
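
The #push source further down compares @Size > @Messages.size, so whatever object stands for "no limit" has to answer comparison with integers. The definition of that behaviour is not shown on this page; the following is only a sketch of one way such a sentinel could be built, and NO_LIMIT is a hypothetical name, not the library's constant.

```ruby
# Hypothetical stand-in for a "no limit" sentinel; NOT the library's definition.
NO_LIMIT = Object.new
NO_LIMIT.singleton_class.class_eval do
  include Comparable

  # Always report "greater than", whatever the sentinel is compared with.
  def <=>(_other)
    1
  end

  def to_s
    'unlimited'
  end
end

# The sentinel exceeds any message count, so a size check never fails.
puts NO_LIMIT > 1_000_000
puts NO_LIMIT.to_s
```

With a sentinel like this, a size check written for integers keeps working unchanged when no limit is set.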

Constructor Details

#initialize(size = UNLIMITED) ⇒ Channel

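A channel to pass messages between promises. Its size can be limited to support back pressure: once the channel holds size messages, the future returned by #push stays pending until the message is added to the channel.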

Parameters:

  • size (Integer, UNLIMITED) (defaults to: UNLIMITED)

    the maximum number of messages stored in the channel.



# File 'lib-edge/concurrent/edge/promises.rb', line 72

def initialize(size = UNLIMITED)
  super()
  @Size        = size
  # TODO (pitr-ch 26-Dec-2016): replace with lock-free implementation
  @Mutex       = Mutex.new
  @Probes      = []
  @Messages    = []
  @PendingPush = []
end

Instance Method Details

#pop(probe = Concurrent::Promises.resolvable_future) ⇒ Future

Returns a future which will be fulfilled with a value from the channel when one is available.

Parameters:

  • probe (ResolvableFuture) (defaults to: Concurrent::Promises.resolvable_future)

    the future which will be fulfilled with a channel value

Returns:

  • (Future)

    the probe; its value will be the message once one is available.



# File 'lib-edge/concurrent/edge/promises.rb', line 111

def pop(probe = Concurrent::Promises.resolvable_future)
  # TODO (pitr-ch 26-Dec-2016): improve performance
  pop_for_select(probe).then(&:last)
end
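
The #push source on this page fulfills a probe with the pair [self, message], so the value that pop_for_select yields appears to be a [channel, message] pair, and then(&:last) keeps only the message. A minimal plain-Ruby sketch of that extraction step follows; message_only is a hypothetical helper and symbols stand in for real Channel objects.

```ruby
# Hypothetical helper mirroring `then(&:last)`:
# keep only the message of a [channel, message] pair.
def message_only(pair)
  pair.last
end

# Symbols stand in for Channel objects in this sketch.
pair = [:orders_channel, 'order-1']

puts message_only(pair)      # the message part of the pair
puts pair.first              # the channel part, which #pop discards
```

Keeping the channel in the pair is what lets a caller that waits on several channels tell which one produced the message; #pop waits on only one channel, so it can drop that part.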

#push(message) ⇒ Future

Returns a future which is fulfilled when the message is added to the channel. Its value is the message.

Parameters:

  • message (Object)

Returns:

  • (Future)

    a future which is fulfilled with the message once it is added to the channel.


# File 'lib-edge/concurrent/edge/promises.rb', line 86

def push(message)
  @Mutex.synchronize do
    while true
      if @Probes.empty?
        if @Size > @Messages.size
          @Messages.push message
          return Promises.fulfilled_future message
        else
          pushed = Promises.resolvable_future
          @PendingPush.push [message, pushed]
          return pushed.with_hidden_resolvable
        end
      else
        probe = @Probes.shift
        if probe.fulfill [self, message], false
          return Promises.fulfilled_future(message)
        end
      end
    end
  end
end
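
The method above has three outcomes: the message is handed to a waiting probe, stored in the buffer, or parked as a pending push when the buffer is full. The following single-threaded model is a sketch of that decision only. SketchChannel and wait_for_message are hypothetical names, symbols replace the returned futures, and the model leaves out the mutex and the retry that happens when a probe was already resolved elsewhere.

```ruby
# Simplified, single-threaded model of the push decision; NOT the library class.
class SketchChannel
  attr_reader :messages, :pending_push

  def initialize(size)
    @size         = size
    @waiting      = [] # consumers that arrived before any message
    @messages     = [] # buffered messages
    @pending_push = [] # messages that did not fit into the buffer
  end

  # Register a consumer block; it plays the role of a probe.
  def wait_for_message(&consumer)
    @waiting << consumer
  end

  def push(message)
    unless @waiting.empty?
      @waiting.shift.call(message) # hand the message over directly
      return :delivered
    end

    if @size > @messages.size
      @messages << message         # room left in the buffer
      :buffered
    else
      @pending_push << message     # buffer full: back pressure
      :pending
    end
  end
end

received = []
channel  = SketchChannel.new(1)
channel.wait_for_message { |message| received << message }

p channel.push('a') # :delivered, a consumer was waiting
p channel.push('b') # :buffered, the buffer had room
p channel.push('c') # :pending, the buffer of size 1 is full
```

In the real method the :pending case corresponds to returning a future that is not yet resolved, which is how a producer can notice that it is running ahead of its consumers.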

#to_s ⇒ String Also known as: inspect

Returns a short string representation.

Returns:

  • (String)

    a short string representation.



# File 'lib-edge/concurrent/edge/promises.rb', line 136

def to_s
  format '%s size:%s>', super[0..-2], @Size
end
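
The expression super[0..-2] drops the last character of the inherited string, presumably its closing ">", so that the size can be appended before a new ">". A small sketch of the same pattern with hypothetical classes (SketchBase and SketchSized are not part of the library, and the inherited string here is made up):

```ruby
# Hypothetical parent whose to_s ends with '>' like a typical inspect string.
class SketchBase
  def to_s
    '#<SketchBase:0x0001 pending>'
  end
end

# Hypothetical subclass that appends its size inside the brackets.
class SketchSized < SketchBase
  def initialize(size)
    @size = size
  end

  def to_s
    # super[0..-2] is the parent's string without its final character.
    format '%s size:%s>', super[0..-2], @size
  end
end

puts SketchSized.new(2).to_s # prints "#<SketchBase:0x0001 pending size:2>"
```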