1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
require 'concurrent/constants'
require 'concurrent/channel/buffer/base'
require 'concurrent/atomic/atomic_reference'
module Concurrent
class Channel
module Buffer
# A blocking buffer with a size of zero. An item can only be put onto
# the buffer when a thread is waiting to take. Similarly, an item can
# only be put onto the buffer when a thread is waiting to put. When
# either {#put} or {#take} is called and there is no corresponding call
# in progress, the call will block indefinitely. Any other calls to the
# same method will queue behind the first call and block as well. As
# soon as a corresponding put/take call is made an exchange will occur
# and the first blocked call will return.
class Unbuffered < Base
# @!macro channel_buffer_size_reader
def size
synchronize do
putting.empty? ? 0 : 1
end
end
# @!macro channel_buffer_empty_question
def empty?
size == 0
end
# @!macro channel_buffer_full_question
def full?
!empty?
end
# @!macro channel_buffer_put
#
# Items can only be put onto the buffer when one or more threads are
# waiting to {#take} items off the buffer. When there is a thread
# waiting to take an item this method will give its item and return
# immediately. When there are no threads waiting to take, this method
# will block. As soon as a thread calls `take` the exchange will
# occur and this method will return.
def put(item)
mine = synchronize do
return false if ns_closed?
ref = Concurrent::AtomicReference.new(item)
if taking.empty?
putting.push(ref)
else
taken = taking.shift
taken.value = item
ref.value = nil
end
ref
end
loop do
return true if mine.value.nil?
Thread.pass
end
end
# @!macro channel_buffer_offer
#
# Items can only be put onto the buffer when one or more threads are
# waiting to {#take} items off the buffer. When there is a thread
# waiting to take an item this method will give its item and return
# `true` immediately. When there are no threads waiting to take or the
# buffer is closed, this method will return `false` immediately.
def offer(item)
synchronize do
return false if ns_closed? || taking.empty?
taken = taking.shift
taken.value = item
true
end
end
# @!macro channel_buffer_take
#
# Items can only be taken from the buffer when one or more threads are
# waiting to {#put} items onto the buffer. When there is a thread
# waiting to put an item this method will take that item and return it
# immediately. When there are no threads waiting to put, this method
# will block. As soon as a thread calls `pur` the exchange will occur
# and this method will return.
def take
mine = synchronize do
return Concurrent::NULL if ns_closed? && putting.empty?
ref = Concurrent::AtomicReference.new(nil)
if putting.empty?
taking.push(ref)
else
put = putting.shift
ref.value = put.value
put.value = nil
end
ref
end
loop do
item = mine.value
return item if item
Thread.pass
end
end
# @!macro channel_buffer_poll
#
# Items can only be taken off the buffer when one or more threads are
# waiting to {#put} items onto the buffer. When there is a thread
# waiting to put an item this method will take the item and return
# it immediately. When there are no threads waiting to put or the
# buffer is closed, this method will return `Concurrent::NULL` immediately.
def poll
synchronize do
return Concurrent::NULL if putting.empty?
put = putting.shift
value = put.value
put.value = nil
value
end
end
# @!macro channel_buffer_next
#
# Items can only be taken from the buffer when one or more threads are
# waiting to {#put} items onto the buffer. This method exhibits the
# same blocking behavior as {#take}.
#
# @see #take
def next
item = take
more = (item != Concurrent::NULL)
return item, more
end
private
def putting() @putting; end
def taking() @taking; end
# @!macro channel_buffer_initialize
def ns_initialize
# one will always be empty
@putting = []
@taking = []
self.closed = false
self.capacity = 1
end
end
end
end
end
|