File: wait_group_spec.cr

package info (click to toggle)
crystal 1.14.0%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 24,384 kB
  • sloc: javascript: 6,400; sh: 695; makefile: 269; ansic: 121; python: 105; cpp: 77; xml: 32
file content (198 lines) | stat: -rw-r--r-- 4,324 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
require "spec"
require "wait_group"

private def block_until_pending_waiter(wg)
  while wg.@waiting.empty?
    Fiber.yield
  end
end

private def forge_counter(wg, value)
  wg.@counter.set(value)
end

describe WaitGroup do
  describe "#add" do
    it "can't decrement to a negative counter" do
      wg = WaitGroup.new
      wg.add(5)
      wg.add(-3)
      expect_raises(RuntimeError, "Negative WaitGroup counter") { wg.add(-5) }
    end

    it "resumes waiters when reaching negative counter" do
      wg = WaitGroup.new(1)
      spawn do
        block_until_pending_waiter(wg)
        wg.add(-2)
      rescue RuntimeError
      end
      expect_raises(RuntimeError, "Negative WaitGroup counter") { wg.wait }
    end

    it "can't increment after reaching negative counter" do
      wg = WaitGroup.new
      forge_counter(wg, -1)

      # check twice, to make sure the waitgroup counter wasn't incremented back
      # to a positive value!
      expect_raises(RuntimeError, "Negative WaitGroup counter") { wg.add(5) }
      expect_raises(RuntimeError, "Negative WaitGroup counter") { wg.add(3) }
    end
  end

  describe "#done" do
    it "can't decrement to a negative counter" do
      wg = WaitGroup.new
      wg.add(1)
      wg.done
      expect_raises(RuntimeError, "Negative WaitGroup counter") { wg.done }
    end

    it "resumes waiters when reaching negative counter" do
      wg = WaitGroup.new(1)
      spawn do
        block_until_pending_waiter(wg)
        forge_counter(wg, 0)
        wg.done
      rescue RuntimeError
      end
      expect_raises(RuntimeError, "Negative WaitGroup counter") { wg.wait }
    end
  end

  describe "#wait" do
    it "immediately returns when counter is zero" do
      channel = Channel(Nil).new(1)

      spawn do
        wg = WaitGroup.new(0)
        wg.wait
        channel.send(nil)
      end

      select
      when channel.receive
        # success
      when timeout(1.second)
        fail "expected #wait to not block the fiber"
      end
    end

    it "immediately raises when counter is negative" do
      wg = WaitGroup.new(0)
      expect_raises(RuntimeError) { wg.done }
      expect_raises(RuntimeError, "Negative WaitGroup counter") { wg.wait }
    end

    it "raises when counter is positive after wake up" do
      wg = WaitGroup.new(1)
      waiter = Fiber.current

      spawn do
        block_until_pending_waiter(wg)
        waiter.enqueue
      end

      expect_raises(RuntimeError, "Positive WaitGroup counter (early wake up?)") { wg.wait }
    end
  end

  it "waits until concurrent executions are finished" do
    wg1 = WaitGroup.new
    wg2 = WaitGroup.new

    8.times do
      wg1.add(16)
      wg2.add(16)
      exited = Channel(Bool).new(16)

      16.times do
        spawn do
          wg1.done
          wg2.wait
          exited.send(true)
        end
      end

      wg1.wait

      16.times do
        select
        when exited.receive
          fail "WaitGroup released group too soon"
        else
        end
        wg2.done
      end

      16.times do
        select
        when x = exited.receive
          x.should eq(true)
        when timeout(1.millisecond)
          fail "Expected channel to receive value"
        end
      end
    end
  end

  it "increments the counter from executing fibers" do
    wg = WaitGroup.new(16)
    extra = Atomic(Int32).new(0)

    16.times do
      spawn do
        wg.add(2)

        2.times do
          spawn do
            extra.add(1)
            wg.done
          end
        end

        wg.done
      end
    end

    wg.wait
    extra.get.should eq(32)
  end

  it "takes a block to WaitGroup.wait" do
    fiber_count = 10
    completed = Array.new(fiber_count) { false }

    WaitGroup.wait do |wg|
      fiber_count.times do |i|
        wg.spawn { completed[i] = true }
      end
    end

    completed.should eq [true] * 10
  end

  # the test takes far too much time for the interpreter to complete
  {% unless flag?(:interpreted) %}
    it "stress add/done/wait" do
      wg = WaitGroup.new

      1000.times do
        counter = Atomic(Int32).new(0)

        2.times do
          wg.add(1)

          spawn do
            counter.add(1)
            wg.done
          end
        end

        wg.wait
        counter.get.should eq(2)
      end
    end
  {% end %}
end