File: throttle_spec.rb

package info (click to toggle)
ruby-concurrent 1.1.6%2Bdfsg-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 30,284 kB
  • sloc: ruby: 30,875; java: 6,117; javascript: 1,114; ansic: 288; makefile: 10; sh: 6
file content (94 lines) | stat: -rw-r--r-- 3,043 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
require 'concurrent/edge/promises'
require 'thread'

RSpec.describe 'Concurrent' do
  describe 'Throttle' do
    specify 'acquiring' do
      throttle = Concurrent::Throttle.new 2
      expect(throttle.max_capacity).to eq 2
      expect(throttle.available_capacity).to eq 2

      expect(throttle.try_acquire).to be_truthy
      expect(throttle.max_capacity).to eq 2
      expect(throttle.available_capacity).to eq 1

      thread = in_thread { throttle.acquire; throttle.release; :ok }
      expect(thread.value).to eq :ok

      expect(throttle.try_acquire).to be_truthy
      expect(throttle.max_capacity).to eq 2
      expect(throttle.available_capacity).to eq 0

      thread1 = in_thread { throttle.acquire(2); throttle.release; :ok }
      thread2 = in_thread { throttle.acquire(0.01) { :ok } }
      thread3 = in_thread { throttle.acquire(2) { :ok } }
      is_sleeping thread1
      expect(thread2.value).to be_falsey
      is_sleeping thread3

      expect(throttle.try_acquire).to be_falsey
      expect(throttle.max_capacity).to eq 2
      expect(throttle.available_capacity).to eq(0)
      expect(throttle.send(:capacity)).to eq(-3)

      throttle.release

      expect(throttle.max_capacity).to eq 2
      expect(throttle.available_capacity).to eq 0
      expect(thread1.value).to eq :ok
      expect(thread3.value).to eq :ok
    end

    specify '#to_s' do
      throttle = Concurrent::Throttle.new 2
      expect(throttle.to_s).to match(/Throttle.*available 2 of 2/)
    end

    specify '#on' do
      throttle = Concurrent::Throttle.new 2
      io_proxy = throttle.on :io
      expect(throttle.on(:io)).to eq io_proxy

      expect(io_proxy.can_overflow?).to eq Concurrent.executor(:io).can_overflow?
      expect(io_proxy.serialized?).to eq Concurrent.executor(:io).serialized?

      # cache only one proxy
      fast_proxy = throttle.on :fast
      expect(throttle.on(:io)).to eq io_proxy
      expect(throttle.on(:fast)).not_to eq fast_proxy
    end

    specify 'capacity limited' do
      limit    = 4
      throttle = Concurrent::Throttle.new limit
      counter  = Concurrent::AtomicFixnum.new
      testing  = -> i do
        counter.increment
        sleep rand * 0.02 + 0.02
        # returns less then 3 since it's throttled
        v = counter.value
        counter.decrement
        v
      end

      result = Concurrent::Promises.zip(
          *20.times.map { |i| throttle.future(i, &testing) }
      ).value!
      expect(result.all? { |v| v <= limit }).to be_truthy, result.to_s

      result = Array.new(20) do |i1|
        Thread.new(i1) { |i2| throttle.acquire { testing.call i2 } }
      end.map(&:value)
      expect(result.all? { |v| v <= limit }).to be_truthy, result.to_s

      throttled_futures = 20.times.map do |i|
        Concurrent::Promises.
            fulfilled_future(i).
            then_on(throttle.on(:io), &testing)
      end

      result = Concurrent::Promises.zip(*throttled_futures).value!
      expect(result.all? { |v| v <= limit }).to be_truthy, result.to_s
    end
  end
end