1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
|
require 'concurrent/constants'
require 'concurrent/channel/buffer/base'
module Concurrent
class Channel
module Buffer
# A buffer with a fixed internal capacity. Items can be put onto the
# buffer without blocking until the internal capacity is reached. Once
# the buffer is at capacity, subsequent calls to {#put} will block until
# an item is removed from the buffer, creating spare capacity.
class Buffered < Base
# @!macro channel_buffer_put
#
# New items can be put onto the buffer until the number of items in
# the buffer reaches the {#size} value specified during
# initialization.
def put(item)
loop do
synchronize do
if ns_closed?
return false
elsif !ns_full?
ns_put_onto_buffer(item)
return true
end
end
Thread.pass
end
end
# @!macro channel_buffer_offer
#
# New items can be put onto the buffer until the number of items in
# the buffer reaches the {#size} value specified during
# initialization.
def offer(item)
synchronize do
if ns_closed? || ns_full?
return false
else
ns_put_onto_buffer(item)
return true
end
end
end
# @!macro channel_buffer_take
def take
item, _ = self.next
item
end
# @!macro channel_buffer_next
def next
loop do
synchronize do
if ns_closed? && ns_empty?
return Concurrent::NULL, false
elsif !ns_empty?
item = buffer.shift
return item, true
end
end
Thread.pass
end
end
# @!macro channel_buffer_poll
def poll
synchronize do
if ns_empty?
Concurrent::NULL
else
buffer.shift
end
end
end
private
# @!macro channel_buffer_initialize
#
# @param [Integer] size the maximum capacity of the buffer; must be
# greater than zero.
# @raise [ArgumentError] when the size is zero (0) or less.
def ns_initialize(size)
raise ArgumentError.new('size must be greater than 0') if size.to_i <= 0
self.capacity = size.to_i
self.buffer = []
end
# @!macro channel_buffer_size_reader
def ns_size
buffer.size
end
# @!macro channel_buffer_empty_question
def ns_empty?
ns_size == 0
end
# @!macro channel_buffer_full_question
def ns_full?
ns_size == capacity
end
# @!macro channel_buffer_put
def ns_put_onto_buffer(item)
buffer.push(item)
end
end
end
end
end
|