1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
|
require 'concurrent/constants'
require 'concurrent/errors'
require 'concurrent/maybe'
require 'concurrent/atomic/atomic_reference'
require 'concurrent/atomic/count_down_latch'
require 'concurrent/utility/engine'
require 'concurrent/utility/monotonic_time'
module Concurrent
# @!macro exchanger
#
# A synchronization point at which threads can pair and swap elements within
# pairs. Each thread presents some object on entry to the exchange method,
# matches with a partner thread, and receives its partner's object on return.
#
# @!macro thread_safe_variable_comparison
#
# This implementation is very simple, using only a single slot for each
# exchanger (unlike more advanced implementations which use an "arena").
# This approach will work perfectly fine when there are only a few threads
# accessing a single `Exchanger`. Beyond a handful of threads the performance
# will degrade rapidly due to contention on the single slot, but the algorithm
# will remain correct.
#
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Exchanger.html java.util.concurrent.Exchanger
# @example
#
# exchanger = Concurrent::Exchanger.new
#
# threads = [
# Thread.new { puts "first: " << exchanger.exchange('foo', 1) }, #=> "first: bar"
# Thread.new { puts "second: " << exchanger.exchange('bar', 1) } #=> "second: foo"
# ]
# threads.each {|t| t.join(2) }
# @!visibility private
class AbstractExchanger < Synchronization::Object
# @!visibility private
CANCEL = ::Object.new
private_constant :CANCEL
def initialize
super
end
# @!macro exchanger_method_do_exchange
#
# Waits for another thread to arrive at this exchange point (unless the
# current thread is interrupted), and then transfers the given object to
# it, receiving its object in return. The timeout value indicates the
# approximate number of seconds the method should block while waiting
# for the exchange. When the timeout value is `nil` the method will
# block indefinitely.
#
# @param [Object] value the value to exchange with another thread
# @param [Numeric, nil] timeout in seconds, `nil` blocks indefinitely
#
# @!macro exchanger_method_exchange
#
# In some edge cases when a `timeout` is given a return value of `nil` may be
# ambiguous. Specifically, if `nil` is a valid value in the exchange it will
# be impossible to tell whether `nil` is the actual return value or if it
# signifies timeout. When `nil` is a valid value in the exchange consider
# using {#exchange!} or {#try_exchange} instead.
#
# @return [Object] the value exchanged by the other thread or `nil` on timeout
def exchange(value, timeout = nil)
(value = do_exchange(value, timeout)) == CANCEL ? nil : value
end
# @!macro exchanger_method_do_exchange
# @!macro exchanger_method_exchange_bang
#
# On timeout a {Concurrent::TimeoutError} exception will be raised.
#
# @return [Object] the value exchanged by the other thread
# @raise [Concurrent::TimeoutError] on timeout
def exchange!(value, timeout = nil)
if (value = do_exchange(value, timeout)) == CANCEL
raise Concurrent::TimeoutError
else
value
end
end
# @!macro exchanger_method_do_exchange
# @!macro exchanger_method_try_exchange
#
# The return value will be a {Concurrent::Maybe} set to `Just` on success or
# `Nothing` on timeout.
#
# @return [Concurrent::Maybe] on success a `Just` maybe will be returned with
# the item exchanged by the other thread as `#value`; on timeout a
# `Nothing` maybe will be returned with {Concurrent::TimeoutError} as `#reason`
#
# @example
#
# exchanger = Concurrent::Exchanger.new
#
# result = exchanger.exchange(:foo, 0.5)
#
# if result.just?
# puts result.value #=> :bar
# else
# puts 'timeout'
# end
def try_exchange(value, timeout = nil)
if (value = do_exchange(value, timeout)) == CANCEL
Concurrent::Maybe.nothing(Concurrent::TimeoutError)
else
Concurrent::Maybe.just(value)
end
end
private
# @!macro exchanger_method_do_exchange
#
# @return [Object, CANCEL] the value exchanged by the other thread; {CANCEL} on timeout
def do_exchange(value, timeout)
raise NotImplementedError
end
end
# @!macro internal_implementation_note
# @!visibility private
class RubyExchanger < AbstractExchanger
# A simplified version of java.util.concurrent.Exchanger written by
# Doug Lea, Bill Scherer, and Michael Scott with assistance from members
# of JCP JSR-166 Expert Group and released to the public domain. It does
# not include the arena or the multi-processor spin loops.
# http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/util/concurrent/Exchanger.java
safe_initialization!
class Node < Concurrent::Synchronization::Object
attr_atomic :value
safe_initialization!
def initialize(item)
super()
@Item = item
@Latch = Concurrent::CountDownLatch.new
self.value = nil
end
def latch
@Latch
end
def item
@Item
end
end
private_constant :Node
def initialize
super
end
private
attr_atomic(:slot)
# @!macro exchanger_method_do_exchange
#
# @return [Object, CANCEL] the value exchanged by the other thread; {CANCEL} on timeout
def do_exchange(value, timeout)
# ALGORITHM
#
# From the original Java version:
#
# > The basic idea is to maintain a "slot", which is a reference to
# > a Node containing both an Item to offer and a "hole" waiting to
# > get filled in. If an incoming "occupying" thread sees that the
# > slot is null, it CAS'es (compareAndSets) a Node there and waits
# > for another to invoke exchange. That second "fulfilling" thread
# > sees that the slot is non-null, and so CASes it back to null,
# > also exchanging items by CASing the hole, plus waking up the
# > occupying thread if it is blocked. In each case CAS'es may
# > fail because a slot at first appears non-null but is null upon
# > CAS, or vice-versa. So threads may need to retry these
# > actions.
#
# This version:
#
# An exchange occurs between an "occupier" thread and a "fulfiller" thread.
# The "slot" is used to setup this interaction. The first thread in the
# exchange puts itself into the slot (occupies) and waits for a fulfiller.
# The second thread removes the occupier from the slot and attempts to
# perform the exchange. Removing the occupier also frees the slot for
# another occupier/fulfiller pair.
#
# Because the occupier and the fulfiller are operating independently and
# because there may be contention with other threads, any failed operation
# indicates contention. Both the occupier and the fulfiller operate within
# spin loops. Any failed actions along the happy path will cause the thread
# to repeat the loop and try again.
#
# When a timeout value is given the thread must be cognizant of time spent
# in the spin loop. The remaining time is checked every loop. When the time
# runs out the thread will exit.
#
# A "node" is the data structure used to perform the exchange. Only the
# occupier's node is necessary. It's the node used for the exchange.
# Each node has an "item," a "hole" (self), and a "latch." The item is the
# node's initial value. It never changes. It's what the fulfiller returns on
# success. The occupier's hole is where the fulfiller put its item. It's the
# item that the occupier returns on success. The latch is used for synchronization.
# Because a thread may act as either an occupier or fulfiller (or possibly
# both in periods of high contention) every thread creates a node when
# the exchange method is first called.
#
# The following steps occur within the spin loop. If any actions fail
# the thread will loop and try again, so long as there is time remaining.
# If time runs out the thread will return CANCEL.
#
# Check the slot for an occupier:
#
# * If the slot is empty try to occupy
# * If the slot is full try to fulfill
#
# Attempt to occupy:
#
# * Attempt to CAS myself into the slot
# * Go to sleep and wait to be woken by a fulfiller
# * If the sleep is successful then the fulfiller completed its happy path
# - Return the value from my hole (the value given by the fulfiller)
# * When the sleep fails (time ran out) attempt to cancel the operation
# - Attempt to CAS myself out of the hole
# - If successful there is no contention
# - Return CANCEL
# - On failure, I am competing with a fulfiller
# - Attempt to CAS my hole to CANCEL
# - On success
# - Let the fulfiller deal with my cancel
# - Return CANCEL
# - On failure the fulfiller has completed its happy path
# - Return th value from my hole (the fulfiller's value)
#
# Attempt to fulfill:
#
# * Attempt to CAS the occupier out of the slot
# - On failure loop again
# * Attempt to CAS my item into the occupier's hole
# - On failure the occupier is trying to cancel
# - Loop again
# - On success we are on the happy path
# - Wake the sleeping occupier
# - Return the occupier's item
value = NULL if value.nil? # The sentinel allows nil to be a valid value
me = Node.new(value) # create my node in case I need to occupy
end_at = Concurrent.monotonic_time + timeout.to_f # The time to give up
result = loop do
other = slot
if other && compare_and_set_slot(other, nil)
# try to fulfill
if other.compare_and_set_value(nil, value)
# happy path
other.latch.count_down
break other.item
end
elsif other.nil? && compare_and_set_slot(nil, me)
# try to occupy
timeout = end_at - Concurrent.monotonic_time if timeout
if me.latch.wait(timeout)
# happy path
break me.value
else
# attempt to remove myself from the slot
if compare_and_set_slot(me, nil)
break CANCEL
elsif !me.compare_and_set_value(nil, CANCEL)
# I've failed to block the fulfiller
break me.value
end
end
end
break CANCEL if timeout && Concurrent.monotonic_time >= end_at
end
result == NULL ? nil : result
end
end
if Concurrent.on_jruby?
# @!macro internal_implementation_note
# @!visibility private
class JavaExchanger < AbstractExchanger
def initialize
@exchanger = java.util.concurrent.Exchanger.new
end
private
# @!macro exchanger_method_do_exchange
#
# @return [Object, CANCEL] the value exchanged by the other thread; {CANCEL} on timeout
def do_exchange(value, timeout)
result = nil
if timeout.nil?
Synchronization::JRuby.sleep_interruptibly do
result = @exchanger.exchange(value)
end
else
Synchronization::JRuby.sleep_interruptibly do
result = @exchanger.exchange(value, 1000 * timeout, java.util.concurrent.TimeUnit::MILLISECONDS)
end
end
result
rescue java.util.concurrent.TimeoutException
CANCEL
end
end
end
# @!visibility private
# @!macro internal_implementation_note
ExchangerImplementation = case
when Concurrent.on_jruby?
JavaExchanger
else
RubyExchanger
end
private_constant :ExchangerImplementation
# @!macro exchanger
class Exchanger < ExchangerImplementation
# @!method initialize
# Creates exchanger instance
# @!method exchange(value, timeout = nil)
# @!macro exchanger_method_do_exchange
# @!macro exchanger_method_exchange
# @!method exchange!(value, timeout = nil)
# @!macro exchanger_method_do_exchange
# @!macro exchanger_method_exchange_bang
# @!method try_exchange(value, timeout = nil)
# @!macro exchanger_method_do_exchange
# @!macro exchanger_method_try_exchange
end
end
|