Class: Concurrent::Promises::Channel

Inherits:
Synchronization::Object
Defined in:
lib/concurrent/edge/promises.rb

Constant Summary

UNLIMITED = Object.new

    Default size of the Channel. It makes the channel accept an unlimited number of messages.
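The size check in push compares the configured size against the number of buffered messages, so a bare sentinel object needs some way to compare greater than any integer. The following is a minimal sketch of one way to do that. It is an assumption for illustration, not necessarily how the library defines UNLIMITED, and the constant name UNBOUNDED is hypothetical.

```ruby
# Hypothetical sentinel; named UNBOUNDED so it is not mistaken for the
# library's own UNLIMITED constant.
UNBOUNDED = Object.new
UNBOUNDED.singleton_class.class_eval do
  include Comparable

  # Always report "greater than", so `UNBOUNDED > n` is true for any n.
  def <=>(_other)
    1
  end

  def to_s
    'unlimited'
  end
end

buffered = [:a, :b, :c]

# A sentinel size always has room, a numeric size only while below the limit.
UNBOUNDED_HAS_ROOM = UNBOUNDED > buffered.size
BOUNDED_HAS_ROOM   = 2 > buffered.size
```

With a sentinel like this, the same comparison works unchanged for bounded and unbounded channels.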

Constructor Details

- (Channel) initialize(size = UNLIMITED)

A channel for passing messages between promises. The size can be limited to support back pressure.

Parameters:

  • size (Integer, UNLIMITED) (defaults to: UNLIMITED)

    the maximum number of messages stored in the channel.



# File 'lib/concurrent/edge/promises.rb', line 2005

def initialize(size = UNLIMITED)
  super()
  @Size        = size
  # TODO (pitr-ch 26-Dec-2016): replace with lock-free implementation
  @Mutex       = Mutex.new
  @Probes      = []
  @Messages    = []
  @PendingPush = []
end

Instance Method Details

- (Future) pop(probe = Concurrent::Promises.resolvable_future)

Returns a future which will be fulfilled with a value from the channel when one is available.

Parameters:

  • probe (ResolvableFuture) (defaults to: Concurrent::Promises.resolvable_future)

    the future which will be fulfilled with a channel value

Returns:

  • (Future)

    the probe; its value will be the message once one is available.



# File 'lib/concurrent/edge/promises.rb', line 2044

def pop(probe = Concurrent::Promises.resolvable_future)
  # TODO (pitr-ch 26-Dec-2016): improve performance
  pop_for_select(probe).then(&:last)
end
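The `.then(&:last)` step makes sense once you see that push fulfills a waiting probe with a `[channel, message]` pair. A small sketch of that unwrapping, using plain arrays and hypothetical helper names (`message_of`, `source_of`) rather than real futures:

```ruby
# Hypothetical stand-in: a symbol instead of a real channel object.
# This is the shape of the value a probe is fulfilled with.
DELIVERY = [:channel_a, 'hello'].freeze

# What pop keeps: only the message, i.e. the last element of the pair.
def message_of(delivery)
  delivery.last
end

# What a select-style caller could additionally inspect: the source channel.
def source_of(delivery)
  delivery.first
end

message_of(DELIVERY) # the message
source_of(DELIVERY)  # the channel that delivered it
```

Keeping the channel in the pair presumably lets one probe be registered on several channels at once, which is an assumption based on the name pop_for_select.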

- (Future) push(message)

Returns a future which will be fulfilled when the message is added to the channel. Its value is the message.

Parameters:

  • message (Object)

Returns:

  • (Future)

    a future fulfilled with the message once it has been added to the channel.



# File 'lib/concurrent/edge/promises.rb', line 2019

def push(message)
  @Mutex.synchronize do
    while true
      if @Probes.empty?
        if @Size > @Messages.size
          @Messages.push message
          return Promises.fulfilled_future message
        else
          pushed = Promises.resolvable_future
          @PendingPush.push [message, pushed]
          return pushed.with_hidden_resolvable
        end
      else
        probe = @Probes.shift
        if probe.fulfill [self, message], false
          return Promises.fulfilled_future(message)
        end
      end
    end
  end
end
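The three branches of push (hand off to a waiting probe, buffer while there is room, otherwise park the push as pending) can be modelled without the library. The sketch below is a toy under stated assumptions: TinyFuture and TinyChannel are hypothetical names, the pop shown here is a guess at how consuming a message frees room for a pending push, and it assumes a size of at least 1.

```ruby
# Toy stand-in for a resolvable future: it can be fulfilled exactly once.
class TinyFuture
  attr_reader :value

  def initialize
    @fulfilled = false
    @value = nil
  end

  def fulfilled?
    @fulfilled
  end

  # Returns true if this call fulfilled the future, false if it already was.
  def fulfill(value)
    return false if @fulfilled
    @value = value
    @fulfilled = true
  end
end

# Toy channel mirroring the branch structure of push shown above.
class TinyChannel
  def initialize(size)
    @size = size
    @mutex = Mutex.new
    @probes = []        # futures of consumers waiting for a message
    @messages = []      # buffered messages
    @pending_push = []  # [message, future] pairs waiting for room
  end

  def push(message)
    @mutex.synchronize do
      loop do
        if @probes.empty?
          pushed = TinyFuture.new
          if @size > @messages.size
            @messages.push message          # room available: buffer it
            pushed.fulfill message
          else
            @pending_push.push [message, pushed] # full: apply back pressure
          end
          return pushed
        else
          probe = @probes.shift
          # Skip probes that were already fulfilled elsewhere.
          next unless probe.fulfill(message)
          done = TinyFuture.new
          done.fulfill message
          return done
        end
      end
    end
  end

  # Assumed consumer side: take a buffered message or wait as a probe.
  def pop
    @mutex.synchronize do
      probe = TinyFuture.new
      if @messages.empty?
        @probes.push probe
      else
        probe.fulfill @messages.shift
        # Room was freed, so admit the oldest pending push, if any.
        unless @pending_push.empty?
          message, pushed = @pending_push.shift
          @messages.push message
          pushed.fulfill message
        end
      end
      probe
    end
  end
end

channel = TinyChannel.new(1)
first   = channel.push(:a) # buffered, so its future is already fulfilled
second  = channel.push(:b) # channel is full, so this future stays pending
taken   = channel.pop      # takes :a and admits :b into the buffer
```

A producer that waits on the future returned by push before sending more is what turns the pending branch into back pressure.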

- (String) to_s Also known as: inspect

Returns a short string representation.

Returns:

  • (String)

    a short string representation.



# File 'lib/concurrent/edge/promises.rb', line 2069

def to_s
  format '<#%s:0x%x size:%s>', self.class, object_id << 1, @Size
end
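To see what the format string in to_s produces, here is a small sketch that applies the same template to fixed values. The helper name `short_repr` and its arguments are hypothetical; the real method uses the object's class, its shifted object_id, and the configured size.

```ruby
# Hypothetical helper reproducing only the format string used by to_s.
def short_repr(klass, address, size)
  format '<#%s:0x%x size:%s>', klass, address, size
end

short_repr(String, 255, 10)          # => "<#String:0xff size:10>"
short_repr(Array, 4096, 'unlimited') # => "<#Array:0x1000 size:unlimited>"
```

The `%x` directive renders the integer in hexadecimal, and `%s` calls to_s on the class and on the size, so a non-numeric size is printed as whatever its to_s returns.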