File: exchanger.rb

package info (click to toggle)
ruby-concurrent 1.1.6%2Bdfsg-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 30,284 kB
  • sloc: ruby: 30,875; java: 6,117; javascript: 1,114; ansic: 288; makefile: 10; sh: 6
file content (352 lines) | stat: -rw-r--r-- 12,997 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
require 'concurrent/constants'
require 'concurrent/errors'
require 'concurrent/maybe'
require 'concurrent/atomic/atomic_reference'
require 'concurrent/atomic/count_down_latch'
require 'concurrent/utility/engine'
require 'concurrent/utility/monotonic_time'

module Concurrent

  # @!macro exchanger
  #
  #   A synchronization point at which threads can pair and swap elements within
  #   pairs. Each thread presents some object on entry to the exchange method,
  #   matches with a partner thread, and receives its partner's object on return.
  #
  #   @!macro thread_safe_variable_comparison
  #
  #   This implementation is very simple, using only a single slot for each
  #   exchanger (unlike more advanced implementations which use an "arena").
  #   This approach will work perfectly fine when there are only a few threads
  #   accessing a single `Exchanger`. Beyond a handful of threads the performance
  #   will degrade rapidly due to contention on the single slot, but the algorithm
  #   will remain correct.
  #
  #   @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Exchanger.html java.util.concurrent.Exchanger
  #   @example
  #
  #     exchanger = Concurrent::Exchanger.new
  #
  #     threads = [
  #       Thread.new { puts "first: " << exchanger.exchange('foo', 1) }, #=> "first: bar"
  #       Thread.new { puts "second: " << exchanger.exchange('bar', 1) } #=> "second: foo"
  #     ]
  #     threads.each {|t| t.join(2) }

  # @!visibility private
  class AbstractExchanger < Synchronization::Object

    # Unique sentinel returned by {#do_exchange} to signal a timeout. It is
    # always recognized by object identity (`equal?`), never by `==`, so the
    # check cannot be influenced by an exchanged object's own `==` method.
    #
    # @!visibility private
    CANCEL = ::Object.new
    private_constant :CANCEL

    def initialize
      super
    end

    # @!macro exchanger_method_do_exchange
    #
    #   Waits for another thread to arrive at this exchange point (unless the
    #   current thread is interrupted), and then transfers the given object to
    #   it, receiving its object in return. The timeout value indicates the
    #   approximate number of seconds the method should block while waiting
    #   for the exchange. When the timeout value is `nil` the method will
    #   block indefinitely.
    #
    #   @param [Object] value the value to exchange with another thread
    #   @param [Numeric, nil] timeout in seconds, `nil` blocks indefinitely
    #
    # @!macro exchanger_method_exchange
    #
    #   In some edge cases when a `timeout` is given a return value of `nil` may be
    #   ambiguous. Specifically, if `nil` is a valid value in the exchange it will
    #   be impossible to tell whether `nil` is the actual return value or if it
    #   signifies timeout. When `nil` is a valid value in the exchange consider
    #   using {#exchange!} or {#try_exchange} instead.
    #
    #   @return [Object] the value exchanged by the other thread or `nil` on timeout
    def exchange(value, timeout = nil)
      received = do_exchange(value, timeout)
      # Identity comparison: `==` would be dispatched to the partner's object.
      CANCEL.equal?(received) ? nil : received
    end

    # @!macro exchanger_method_do_exchange
    # @!macro exchanger_method_exchange_bang
    #
    #   On timeout a {Concurrent::TimeoutError} exception will be raised.
    #
    #   @return [Object] the value exchanged by the other thread
    #   @raise [Concurrent::TimeoutError] on timeout
    def exchange!(value, timeout = nil)
      received = do_exchange(value, timeout)
      # Identity comparison: `==` would be dispatched to the partner's object.
      raise Concurrent::TimeoutError if CANCEL.equal?(received)

      received
    end

    # @!macro exchanger_method_do_exchange
    # @!macro exchanger_method_try_exchange
    #
    #   The return value will be a {Concurrent::Maybe} set to `Just` on success or
    #   `Nothing` on timeout.
    #
    #   @return [Concurrent::Maybe] on success a `Just` maybe will be returned with
    #     the item exchanged by the other thread as `#value`; on timeout a
    #     `Nothing` maybe will be returned with {Concurrent::TimeoutError} as `#reason`
    #
    #   @example
    #
    #     exchanger = Concurrent::Exchanger.new
    #
    #     result = exchanger.try_exchange(:foo, 0.5)
    #
    #     if result.just?
    #       puts result.value #=> :bar
    #     else
    #       puts 'timeout'
    #     end
    def try_exchange(value, timeout = nil)
      received = do_exchange(value, timeout)
      # Identity comparison: `==` would be dispatched to the partner's object.
      if CANCEL.equal?(received)
        Concurrent::Maybe.nothing(Concurrent::TimeoutError)
      else
        Concurrent::Maybe.just(received)
      end
    end

    private

    # Template method that subclasses override with the actual exchange
    # mechanism; this base version always raises.
    #
    # @!macro exchanger_method_do_exchange
    #
    # @return [Object, CANCEL] the value exchanged by the other thread; {CANCEL} on timeout
    # @raise [NotImplementedError] always, unless overridden by a subclass
    def do_exchange(value, timeout)
      raise NotImplementedError
    end
  end

  # @!macro internal_implementation_note
  # @!visibility private
  class RubyExchanger < AbstractExchanger
    # A simplified version of java.util.concurrent.Exchanger written by
    # Doug Lea, Bill Scherer, and Michael Scott with assistance from members
    # of JCP JSR-166 Expert Group and released to the public domain. It does
    # not include the arena or the multi-processor spin loops.
    # http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/util/concurrent/Exchanger.java

    safe_initialization!

    # The unit of exchange. A node carries the item its owner offers, an
    # atomic `value` (the "hole") that starts out `nil` and is later set
    # either to the partner's item or to CANCEL, and a latch the owner waits
    # on until a partner counts it down.
    class Node < Concurrent::Synchronization::Object
      attr_atomic :value
      safe_initialization!

      # @param [Object] item the object this node's owner offers for exchange
      def initialize(item)
        super()
        @Item      = item
        @Latch     = Concurrent::CountDownLatch.new
        self.value = nil
      end

      # @return [Concurrent::CountDownLatch] the latch the occupier waits on
      def latch
        @Latch
      end

      # @return [Object] the item offered by the node's owner
      def item
        @Item
      end
    end
    private_constant :Node

    def initialize
      super
    end

    private

    attr_atomic(:slot)

    # @!macro exchanger_method_do_exchange
    #
    # @return [Object, CANCEL] the value exchanged by the other thread; {CANCEL} on timeout
    def do_exchange(value, timeout)

      # ALGORITHM
      #
      # From the original Java version:
      #
      # > The basic idea is to maintain a "slot", which is a reference to
      # > a Node containing both an Item to offer and a "hole" waiting to
      # > get filled in.  If an incoming "occupying" thread sees that the
      # > slot is null, it CAS'es (compareAndSets) a Node there and waits
      # > for another to invoke exchange.  That second "fulfilling" thread
      # > sees that the slot is non-null, and so CASes it back to null,
      # > also exchanging items by CASing the hole, plus waking up the
      # > occupying thread if it is blocked.  In each case CAS'es may
      # > fail because a slot at first appears non-null but is null upon
      # > CAS, or vice-versa.  So threads may need to retry these
      # > actions.
      #
      # This version:
      #
      # An exchange occurs between an "occupier" thread and a "fulfiller" thread.
      # The "slot" is used to setup this interaction. The first thread in the
      # exchange puts itself into the slot (occupies) and waits for a fulfiller.
      # The second thread removes the occupier from the slot and attempts to
      # perform the exchange. Removing the occupier also frees the slot for
      # another occupier/fulfiller pair.
      #
      # Because the occupier and the fulfiller are operating independently and
      # because there may be contention with other threads, any failed operation
      # indicates contention. Both the occupier and the fulfiller operate within
      # spin loops. Any failed actions along the happy path will cause the thread
      # to repeat the loop and try again.
      #
      # When a timeout value is given the thread must be cognizant of time spent
      # in the spin loop. The remaining time is checked every loop. When the time
      # runs out the thread will exit.
      #
      # A "node" is the data structure used to perform the exchange. Only the
      # occupier's node is necessary. It's the node used for the exchange.
      # Each node has an "item," a "hole" (self), and a "latch." The item is the
      # node's initial value. It never changes. It's what the fulfiller returns on
      # success. The occupier's hole is where the fulfiller puts its item. It's the
      # item that the occupier returns on success. The latch is used for synchronization.
      # Because a thread may act as either an occupier or fulfiller (or possibly
      # both in periods of high contention) every thread creates a node when
      # the exchange method is first called.
      #
      # The following steps occur within the spin loop. If any actions fail
      # the thread will loop and try again, so long as there is time remaining.
      # If time runs out the thread will return CANCEL.
      #
      # Check the slot for an occupier:
      #
      #   * If the slot is empty try to occupy
      #   * If the slot is full try to fulfill
      #
      # Attempt to occupy:
      #
      #   * Attempt to CAS myself into the slot
      #   * Go to sleep and wait to be woken by a fulfiller
      #   * If the sleep is successful then the fulfiller completed its happy path
      #     - Return the value from my hole (the value given by the fulfiller)
      #   * When the sleep fails (time ran out) attempt to cancel the operation
      #     - Attempt to CAS myself out of the slot
      #     - If successful there is no contention
      #       - Return CANCEL
      #     - On failure, I am competing with a fulfiller
      #       - Attempt to CAS my hole to CANCEL
      #       - On success
      #         - Let the fulfiller deal with my cancel
      #         - Return CANCEL
      #       - On failure the fulfiller has completed its happy path
      #         - Return the value from my hole (the fulfiller's value)
      #
      # Attempt to fulfill:
      #
      # * Attempt to CAS the occupier out of the slot
      #   - On failure loop again
      # * Attempt to CAS my item into the occupier's hole
      #   - On failure the occupier is trying to cancel
      #     - Loop again
      #   - On success we are on the happy path
      #     - Wake the sleeping occupier
      #     - Return the occupier's item

      value  = NULL if value.nil? # The sentinel allows nil to be a valid value
      me     = Node.new(value) # create my node in case I need to occupy
      end_at = Concurrent.monotonic_time + timeout.to_f # The time to give up

      result = loop do
        other = slot
        if other && compare_and_set_slot(other, nil)
          # try to fulfill
          if other.compare_and_set_value(nil, value)
            # happy path
            other.latch.count_down
            break other.item
          end
        elsif other.nil? && compare_and_set_slot(nil, me)
          # try to occupy
          timeout = end_at - Concurrent.monotonic_time if timeout
          if me.latch.wait(timeout)
            # happy path
            break me.value
          else
            # attempt to remove myself from the slot
            if compare_and_set_slot(me, nil)
              break CANCEL
            elsif !me.compare_and_set_value(nil, CANCEL)
              # I've failed to block the fulfiller
              break me.value
            end
            # Otherwise my hole now holds CANCEL; fall through to the
            # deadline check below.
          end
        end
        break CANCEL if timeout && Concurrent.monotonic_time >= end_at
      end

      # Translate the nil stand-in back to nil. Compare by identity so that
      # `==` is never dispatched to the exchanged object.
      NULL.equal?(result) ? nil : result
    end
  end

  if Concurrent.on_jruby?

    # @!macro internal_implementation_note
    # @!visibility private
    class JavaExchanger < AbstractExchanger

      # Wraps a fresh java.util.concurrent.Exchanger instance.
      def initialize
        @exchanger = java.util.concurrent.Exchanger.new
      end

      private

      # Delegates the exchange to the wrapped Java exchanger. The call is made
      # inside `Synchronization::JRuby.sleep_interruptibly`; a `timeout` given
      # in seconds is passed to Java as milliseconds, and a Java
      # TimeoutException is translated into the CANCEL sentinel.
      #
      # @!macro exchanger_method_do_exchange
      #
      # @return [Object, CANCEL] the value exchanged by the other thread; {CANCEL} on timeout
      def do_exchange(value, timeout)
        received = nil
        Synchronization::JRuby.sleep_interruptibly do
          received =
            if timeout.nil?
              @exchanger.exchange(value)
            else
              @exchanger.exchange(value, 1000 * timeout, java.util.concurrent.TimeUnit::MILLISECONDS)
            end
        end
        received
      rescue java.util.concurrent.TimeoutException
        CANCEL
      end
    end
  end

  # Backend class selected once at load time: the Java-backed exchanger when
  # running on JRuby, the pure-Ruby exchanger otherwise.
  #
  # @!visibility private
  # @!macro internal_implementation_note
  ExchangerImplementation = Concurrent.on_jruby? ? JavaExchanger : RubyExchanger
  private_constant :ExchangerImplementation

  # @!macro exchanger
  class Exchanger < ExchangerImplementation

    # This class defines no methods of its own; everything is inherited through
    # ExchangerImplementation. The YARD directives below exist only to attach
    # documentation for those inherited methods to this public class.

    # @!method initialize
    #   Creates exchanger instance

    # @!method exchange(value, timeout = nil)
    #   @!macro exchanger_method_do_exchange
    #   @!macro exchanger_method_exchange

    # @!method exchange!(value, timeout = nil)
    #   @!macro exchanger_method_do_exchange
    #   @!macro exchanger_method_exchange_bang

    # @!method try_exchange(value, timeout = nil)
    #   @!macro exchanger_method_do_exchange
    #   @!macro exchanger_method_try_exchange
  end
end