File: cyclic_barrier.rb

package info (click to toggle)
ruby-concurrent 1.0.0-3
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 3,872 kB
  • ctags: 3,093
  • sloc: ruby: 26,166; java: 6,028; ansic: 282; makefile: 4
file content (108 lines) | stat: -rw-r--r-- 3,543 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
require 'concurrent/synchronization'

module Concurrent

  # A synchronization aid that allows a set of threads to all wait for each
  # other to reach a common barrier point.
  #
  # The barrier tracks its state per "generation": each time the barrier is
  # fulfilled or reset a fresh generation is started, so the same barrier
  # object can be reused for the next round of waiters.
  class CyclicBarrier < Synchronization::LockableObject

    # Holds the status of one round of waiting. The status is `:waiting`
    # while the round is open and is changed to `:fulfilled`, `:reset` or
    # `:broken` when the round ends.
    #
    # @!visibility private
    Generation = Struct.new(:status)
    private_constant :Generation

    # Create a new `CyclicBarrier` that waits for `parties` threads
    #
    # @param [Integer] parties the number of parties
    # @yield an optional block that will be executed after the last thread
    #  arrives and before the others are released
    #
    # @raise [ArgumentError] if `parties` is not an integer or is less than one
    def initialize(parties, &block)
      # `Integer` is used instead of `Fixnum`: `Fixnum` was deprecated in
      # Ruby 2.4 and the constant no longer exists in Ruby 3.2+.
      if !parties.is_a?(Integer) || parties < 1
        raise ArgumentError, 'parties must be an integer greater than or equal to one'
      end
      super(&nil)
      synchronize { ns_initialize parties, &block }
    end

    # @return [Integer] the number of threads needed to pass the barrier
    def parties
      synchronize { @parties }
    end

    # @return [Integer] the number of threads currently waiting on the barrier
    def number_waiting
      synchronize { @number_waiting }
    end

    # Blocks on the barrier until the number of waiting threads is equal to
    # `parties` or until `timeout` is reached or `reset` is called.
    # If a block has been passed to the constructor, it will be executed once
    # by the last arrived thread before releasing the others.
    #
    # @param [Numeric, nil] timeout the number of seconds to wait for the
    #  remaining parties or `nil` to block indefinitely
    # @return [Boolean] `true` if all `parties` arrived, else `false` on
    #  `timeout` or on `reset` or if the barrier is already broken
    def wait(timeout = nil)
      synchronize do

        # A broken barrier rejects new waiters immediately.
        return false unless @generation.status == :waiting

        @number_waiting += 1

        if @number_waiting == @parties
          # Last arrival: run the optional action, then close this generation
          # (which also starts the next one and wakes the other waiters).
          # NOTE(review): if the action raises, the exception propagates and
          # this generation is never marked done -- confirm that is intended.
          @action.call if @action
          ns_generation_done @generation, :fulfilled
          true
        else
          # Remember the generation we joined; `@generation` may be replaced
          # by a new one before this thread wakes up.
          generation = @generation
          if ns_wait_until(timeout) { generation.status != :waiting }
            generation.status == :fulfilled
          else
            # Timed out: mark the generation broken WITHOUT starting a new
            # one, so the barrier stays broken until `reset` is called.
            ns_generation_done generation, :broken, false
            false
          end
        end
      end
    end

    # Resets the barrier to its initial state by ending the current
    # generation with status `:reset` and starting a fresh one.
    # If there is at least one waiting thread, it will be woken up and its
    # `wait` call will return false.
    # If the barrier is broken, this method restores it to the original state.
    #
    # @return [Object] the result of the internal broadcast; presumably not
    #  meaningful to callers -- do not rely on it
    def reset
      synchronize { ns_generation_done @generation, :reset }
    end

    # Reports whether the current generation is no longer accepting waiters.
    #
    # In this implementation the barrier is left broken when at least one
    # thread timed out on the `wait` method. `reset` and a fulfilled `wait`
    # both start a new generation, so the barrier is not broken after them.
    #
    # A broken barrier can be restored using `reset`, but it's safer to
    # create a new one.
    # @return [Boolean] true if the barrier is broken otherwise false
    def broken?
      synchronize { @generation.status != :waiting }
    end

    protected

    # Ends `generation` with the given `status` and wakes all waiting threads.
    # Must be called while holding the lock.
    #
    # @param [Generation] generation the generation to finish
    # @param [Symbol] status the final status to record on the generation
    # @param [Boolean] continue when true a new `:waiting` generation is
    #  started; when false the barrier is left on the finished generation
    def ns_generation_done(generation, status, continue = true)
      generation.status = status
      ns_next_generation if continue
      ns_broadcast
    end

    # Starts a new generation in the `:waiting` state with no waiters.
    # Must be called while holding the lock.
    def ns_next_generation
      @generation     = Generation.new(:waiting)
      @number_waiting = 0
    end

    # Stores the configuration and opens the first generation.
    # Must be called while holding the lock.
    #
    # @param [Integer] parties the number of threads needed to pass the barrier
    # @param [Proc, nil] block optional action run by the last arriving thread
    def ns_initialize(parties, &block)
      @parties = parties
      @action  = block
      ns_next_generation
    end
  end
end