Class: Concurrent::Promises::Channel
- Inherits: Synchronization::Object
  - Object
  - Synchronization::Object
  - Concurrent::Promises::Channel
- Defined in: lib/concurrent/edge/promises.rb
Constant Summary
- UNLIMITED = Object.new
Default size of the Channel; makes it accept an unlimited number of messages.
Instance Method Summary
- (Channel) initialize(size = UNLIMITED)
constructor
A channel to pass messages between promises.
- (Future) pop(probe = Concurrent::Promises.resolvable_future)
Returns a future which will be fulfilled with a value from the channel when one is available.
- (Future) push(message)
Returns a future which will be fulfilled when the message is added to the channel.
- (String) to_s
(also: #inspect)
Short string representation.
Constructor Details
- (Channel) initialize(size = UNLIMITED)
A channel to pass messages between promises. The size is limited to support back pressure.
# File 'lib/concurrent/edge/promises.rb', line 2005
def initialize(size = UNLIMITED)
  super()
  @Size = size
  # TODO (pitr-ch 26-Dec-2016): replace with lock-free implementation
  @Mutex = Mutex.new
  @Probes = []
  @Messages = []
  @PendingPush = []
end
Instance Method Details
- (Future) pop(probe = Concurrent::Promises.resolvable_future)
Returns a future which will be fulfilled with a value from the channel when one is available.
# File 'lib/concurrent/edge/promises.rb', line 2044
def pop(probe = Concurrent::Promises.resolvable_future)
  # TODO (pitr-ch 26-Dec-2016): improve performance
  pop_for_select(probe).then(&:last)
end
- (Future) push(message)
Returns a future which will be fulfilled when the message is added to the channel. Its value is the message.
# File 'lib/concurrent/edge/promises.rb', line 2019
def push(message)
  @Mutex.synchronize do
    while true
      if @Probes.empty?
        if @Size > @Messages.size
          # There is room: buffer the message and report success at once.
          @Messages.push message
          return Promises.fulfilled_future message
        else
          # The channel is full: park the push until room is freed.
          pushed = Promises.resolvable_future
          @PendingPush.push [message, pushed]
          return pushed.with_hidden_resolvable
        end
      else
        # A consumer is already waiting: hand the message over directly.
        probe = @Probes.shift
        if probe.fulfill [self, message], false
          return Promises.fulfilled_future(message)
        end
      end
    end
  end
end
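The back-pressure behaviour described above (deliver to a waiting consumer, otherwise buffer, otherwise hold the push as pending) can be illustrated with a small stdlib-only model. This is a minimal sketch, not the library's implementation: the class `SketchChannel`, its block-based callbacks, and the symbolic return values are all hypothetical and stand in for the futures that the real `push` and `pop` return. It also assumes a size of at least 1 and callbacks that do not call back into the channel.

```ruby
# Hypothetical, simplified model of a bounded channel. It only mirrors the
# three branches of push; it is NOT Concurrent::Promises::Channel.
class SketchChannel
  def initialize(size)
    @size = size          # assumed to be an Integer >= 1 in this sketch
    @mutex = Mutex.new
    @messages = []        # buffered messages
    @pending_push = []    # [message, on_accepted] pairs waiting for room
    @probes = []          # blocks of consumers waiting for a message
  end

  # Returns :delivered, :buffered or :pending. The optional block is called
  # once a pending message is finally admitted to the buffer.
  def push(message, &on_accepted)
    @mutex.synchronize do
      if (probe = @probes.shift)
        probe.call(message)            # a consumer was already waiting
        :delivered
      elsif @messages.size < @size
        @messages.push(message)        # room available
        :buffered
      else
        @pending_push.push([message, on_accepted]) # back pressure
        :pending
      end
    end
  end

  # Returns :received or :waiting. The block receives the message.
  def pop(&probe)
    @mutex.synchronize do
      if @messages.empty?
        @probes.push(probe)
        :waiting
      else
        message = @messages.shift
        # Room was freed, so admit the oldest pending push, if any.
        if (pending = @pending_push.shift)
          pending_message, on_accepted = pending
          @messages.push(pending_message)
          on_accepted&.call(pending_message)
        end
        probe.call(message)
        :received
      end
    end
  end
end

channel = SketchChannel.new(1)
channel.push(:a)                                   # => :buffered
channel.push(:b) { |m| puts "accepted #{m}" }      # => :pending
channel.pop { |m| puts "got #{m}" }                # => :received
```

In the model, the second push stays pending until a pop frees a slot, which is the point of limiting the size: a producer learns that its message has not been accepted yet and can slow down.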
- (String) to_s Also known as: inspect
Returns a short string representation.
# File 'lib/concurrent/edge/promises.rb', line 2069
def to_s
  format '<#%s:0x%x size:%s>', self.class, object_id << 1, @Size
end