Class: Concurrent::Promises::Channel
- Inherits: Synchronization::Object
  - Object
  - Synchronization::Object
  - Concurrent::Promises::Channel
- Defined in: lib-edge/concurrent/edge/promises.rb
Overview
Note:
Edge Features are under active development and may change frequently.
- Deprecations are not added before incompatible changes.
- Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
- Edge features may also lack tests and documentation.
- Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.
Constant Summary
- UNLIMITED = ::Object.new
  Default size of the Channel; makes it accept an unlimited number of messages.
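This page does not show how a bare ::Object.new can take part in the size comparison made in #push (@Size > @Messages.size). One plausible approach, given here only as a sketch, is to give the sentinel a singleton <=> that always reports "greater". The names UNLIMITED_SKETCH and room_left? are hypothetical and are not part of the library.

```ruby
# Hypothetical sentinel, not the library's constant: it compares greater
# than any other value, so "limit > count" is always true for it.
UNLIMITED_SKETCH = Object.new
UNLIMITED_SKETCH.singleton_class.class_eval do
  include Comparable

  # Always report "greater than", whatever the other operand is.
  def <=>(_other)
    1
  end

  def to_s
    'unlimited'
  end
end

# Hypothetical helper mirroring the capacity check made before buffering.
def room_left?(limit, buffered_count)
  limit > buffered_count
end

unlimited_has_room = room_left?(UNLIMITED_SKETCH, 1_000_000)
bounded_has_room   = room_left?(2, 2)
```

With a sentinel like this, the same comparison serves both bounded and unbounded channels, so the push path needs no special case for the default size.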
Instance Method Summary
- #initialize(size = UNLIMITED) ⇒ Channel (constructor)
  A channel to pass messages between promises.
- #pop(probe = Concurrent::Promises.resolvable_future) ⇒ Future
  Returns a future which will be fulfilled with a value from the channel when one is available.
- #push(message) ⇒ Future
  Returns a future which will be fulfilled when the message is added to the channel.
- #to_s ⇒ String (also: #inspect)
  Short string representation.
Constructor Details
#initialize(size = UNLIMITED) ⇒ Channel
A channel to pass messages between promises. Pass a finite size to limit the number of buffered messages and support back pressure; the default, UNLIMITED, accepts any number of messages.
# File 'lib-edge/concurrent/edge/promises.rb', line 72
def initialize(size = UNLIMITED)
  super()
  @Size = size
  # TODO (pitr-ch 26-Dec-2016): replace with lock-free implementation
  @Mutex = Mutex.new
  @Probes = []
  @Messages = []
  @PendingPush = []
end
Instance Method Details
#pop(probe = Concurrent::Promises.resolvable_future) ⇒ Future
Returns a future which will be fulfilled with a value from the channel when one is available.
# File 'lib-edge/concurrent/edge/promises.rb', line 111
def pop(probe = Concurrent::Promises.resolvable_future)
  # TODO (pitr-ch 26-Dec-2016): improve performance
  pop_for_select(probe).then(&:last)
end
#push(message) ⇒ Future
Returns a future which will be fulfilled when the message is added to the channel. Its value is the message.
# File 'lib-edge/concurrent/edge/promises.rb', line 86
def push(message)
  @Mutex.synchronize do
    while true
      if @Probes.empty?
        if @Size > @Messages.size
          # Room in the buffer: store the message and resolve immediately.
          @Messages.push message
          return Promises.fulfilled_future message
        else
          # Buffer full: park the push until space is freed.
          pushed = Promises.resolvable_future
          @PendingPush.push [message, pushed]
          return pushed.with_hidden_resolvable
        end
      else
        # A consumer is waiting: hand the message over directly.
        probe = @Probes.shift
        if probe.fulfill [self, message], false
          return Promises.fulfilled_future(message)
        end
      end
    end
  end
end
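The three outcomes of #push (hand-off to a waiting consumer, buffering, or parking until space is freed) can be illustrated with a small standalone model. This is a sketch under stated assumptions, not the library's implementation: ToyFuture and ToyChannel are hypothetical names, the model has no locking, it assumes a size of at least 1, and its pop side is a guess, since pop_for_select is not shown on this page.

```ruby
# Toy stand-in for a resolvable future (hypothetical, single-threaded).
class ToyFuture
  attr_reader :value

  def initialize
    @resolved = false
    @value = nil
  end

  def resolved?
    @resolved
  end

  # Returns true if this call resolved the future, false if already resolved.
  def fulfill(value)
    return false if @resolved
    @value = value
    @resolved = true
  end
end

# Toy bounded channel (hypothetical); no mutex, size assumed to be >= 1.
class ToyChannel
  def initialize(size)
    @size = size
    @probes = []        # futures of consumers waiting for a message
    @messages = []      # buffered messages
    @pending_push = []  # [message, future] pairs waiting for free space
  end

  def push(message)
    future = ToyFuture.new
    # 1. A consumer is waiting: hand the message over directly.
    until @probes.empty?
      probe = @probes.shift
      if probe.fulfill(message)
        future.fulfill(message)
        return future
      end
    end
    if @size > @messages.size
      # 2. Room in the buffer: store the message, fulfilled at once.
      @messages.push(message)
      future.fulfill(message)
    else
      # 3. Buffer full: park the push; the future stays pending.
      @pending_push.push([message, future])
    end
    future
  end

  # Assumed pop behaviour: take a buffered message or wait as a probe.
  def pop
    probe = ToyFuture.new
    if @messages.empty?
      @probes.push(probe)
    else
      probe.fulfill(@messages.shift)
      # A slot was freed: admit the oldest parked push, if any.
      unless @pending_push.empty?
        message, pushed = @pending_push.shift
        @messages.push(message)
        pushed.fulfill(message)
      end
    end
    probe
  end
end

channel = ToyChannel.new(1)
first   = channel.push(:a)  # buffered, fulfilled immediately
second  = channel.push(:b)  # buffer full, stays pending
popped  = channel.pop       # takes :a and admits the parked :b
```

The pending future returned by the second push is what gives the producer back pressure: it can chain further work on that future and so slow down until a consumer frees a slot.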
#to_s ⇒ String Also known as: inspect
Returns a short string representation.
# File 'lib-edge/concurrent/edge/promises.rb', line 136
def to_s
  format '%s size:%s>', super[0..-2], @Size
end
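The super[0..-2] idiom above drops the closing ">" of the inherited representation so that extra fields can be appended before the bracket is closed again. A minimal sketch on a plain Ruby object follows; SizedThing is a hypothetical class, and the parent representation is assumed to be the default one from Object#to_s.

```ruby
# Hypothetical class, used only to show the string manipulation.
class SizedThing
  def initialize(size)
    @size = size
  end

  def to_s
    # Object#to_s yields something like "#<SizedThing:0x...>"; [0..-2]
    # removes the trailing ">" so " size:N>" can be appended.
    format '%s size:%s>', super[0..-2], @size
  end
end

text = SizedThing.new(3).to_s
```

The result keeps the class name and object address from the parent and ends with the size field.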