Class: Concurrent::Throttle

Inherits:
Synchronization::Object
Includes:
PromisesIntegration
Defined in:
lib/concurrent/edge/throttle.rb

Overview

A tool to manage the concurrency level of future tasks.

Examples:

data     = (1..5).to_a
db       = data.reduce({}) { |h, v| h.update v => v.to_s }
max_two  = Throttle.new 2

futures = data.map do |data|
  Promises.future(data) do |data|
    # un-throttled, concurrency level equal data.size
    data + 1
  end.then_throttled_by(max_two, db) do |v, db|
    # throttled, only 2 tasks executed at the same time
    # e.g. limiting access to db
    db[v]
  end
end

futures.map(&:value!) # => ["2", "3", "4", "5", nil]

throttle.throttled_future(1) do |arg|
  arg.succ
end

throttle.throttled_future_chain do |trigger|
  trigger.
      # 2 throttled promises
      chain { 1 }.
      then(&:succ)
end

max_two = Throttle.new 2
10.times.map do
  Thread.new do
    max_two.throttled_block do
      # Only 2 at the same time
      do_stuff
    end
  end
end
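
To make "only 2 at the same time" concrete, here is a minimal stdlib-only sketch that bounds concurrency and records the peak number of threads inside the critical section. It is not how Concurrent::Throttle is implemented: the SizedQueue used as a token bucket, the variable names, and the sleep standing in for do_stuff are all assumptions made for illustration.

```ruby
# Stdlib-only illustration; NOT the Concurrent::Throttle implementation.
limit  = 2
slots  = SizedQueue.new(limit)   # push blocks once `limit` tokens are held
lock   = Mutex.new
active = 0
peak   = 0

threads = 10.times.map do
  Thread.new do
    slots.push(:token)           # take a slot (blocks while both are in use)
    begin
      lock.synchronize do
        active += 1
        peak = [peak, active].max
      end
      sleep 0.01                 # hypothetical stand-in for do_stuff
      lock.synchronize { active -= 1 }
    ensure
      slots.pop                  # free the slot for the next waiting thread
    end
  end
end
threads.each(&:join)

puts "peak concurrency: #{peak}"
```

The recorded peak never exceeds the limit, which is the same guarantee throttled_block gives for its block.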

Defined Under Namespace

Modules: PromisesIntegration

Instance Method Summary

Constructor Details

- (Throttle) initialize(limit)

New throttle.

Parameters:

  • limit (Integer)


# File 'lib/concurrent/edge/throttle.rb', line 59

def initialize(limit)
  super()
  @Limit       = limit
  self.can_run = limit
  @Queue       = LockFreeQueue.new
end

Instance Method Details

- (Integer) limit

Returns the limit.

Returns:

  • (Integer)

    The limit.



# File 'lib/concurrent/edge/throttle.rb', line 67

def limit
  @Limit
end

- (self) release

Must be called exactly once for each trigger, as soon as it is OK to execute another throttled task.

Returns:

  • (self)

See Also:



# File 'lib/concurrent/edge/throttle.rb', line 93

def release
  while true
    current_can_run = can_run
    if compare_and_set_can_run current_can_run, current_can_run + 1
      if current_can_run < 0
        Thread.pass until (trigger = @Queue.pop)
        trigger.resolve
      end
      return self
    end
  end
end
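
The counter bookkeeping in the listing above can be easier to follow in a single-threaded model. The sketch below is an assumption-laden simplification: CounterModel is a hypothetical class, plain hashes stand in for Promises events, an Array stands in for the lock-free queue, and there is no compare-and-set because nothing runs concurrently. It only illustrates how can_run moves and why a negative previous value means a waiter must be resolved.

```ruby
# Single-threaded model of the counter protocol; hypothetical names, no atomics.
class CounterModel
  attr_reader :can_run

  def initialize(limit)
    @can_run = limit
    @waiting = []                # FIFO of pending "events" (plain hashes here)
  end

  # Mirrors #trigger: always decrement, then decide based on the old value.
  def trigger
    old = @can_run
    @can_run -= 1
    event = { resolved: old > 0 }
    @waiting.push(event) unless event[:resolved]
    event
  end

  # Mirrors #release: always increment; a negative old value means a waiter exists.
  def release
    old = @can_run
    @can_run += 1
    @waiting.shift[:resolved] = true if old < 0
    self
  end
end

model = CounterModel.new(2)
a, b, c = 3.times.map { model.trigger }
p [a[:resolved], b[:resolved], c[:resolved]] # => [true, true, false]
p model.can_run                              # => -1
model.release                                # one running task finishes
p c[:resolved]                               # => true
p model.can_run                              # => 0
```

In this model, a negative can_run is the number of queued waiters, and each release hands its slot directly to the oldest one.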

- (Object) throttled_block { ... }

Blocks the current thread until the block can be executed.

Examples:

max_two = Throttle.new 2
10.times.map do
  Thread.new do
    max_two.throttled_block do
      # Only 2 at the same time
      do_stuff
    end
  end
end

Yields:

  • to the throttled block

Yield Returns:

  • (Object)

    used as the result of the method

Returns:

  • (Object)

    the result of the block



# File 'lib/concurrent/edge/throttle.rb', line 111

def throttled_block(&block)
  trigger.wait
  block.call
ensure
  release
end
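
The ensure clause in the listing above is what keeps a raising block from leaking a slot. The following sketch shows that pattern in isolation, under stated assumptions: SlotCounter and with_slot are hypothetical names, and taking a slot is reduced to decrementing a counter instead of waiting on a trigger.

```ruby
# Hypothetical stand-in for a throttle; it only counts free slots.
class SlotCounter
  attr_reader :free

  def initialize(limit)
    @free = limit
  end

  def with_slot
    @free -= 1        # take a slot (the real method waits on a trigger here)
    yield             # the block's value becomes the method's value
  ensure
    @free += 1        # always give the slot back, even if the block raised
  end
end

slots  = SlotCounter.new(2)
result = slots.with_slot { :done }
p result      # => :done
begin
  slots.with_slot { raise ArgumentError, "boom" }
rescue ArgumentError => e
  p e.message # => "boom"
end
p slots.free  # => 2
```

Because ensure does not replace the method's return value, the caller still receives the result of the block.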

- (Promises::Future) throttled_future(*args, &task) Originally defined in module PromisesIntegration

Behaves as Promises::FactoryMethods#future but the future is throttled.

Examples:

throttle.throttled_future(1) do |arg|
  arg.succ
end

Returns:

See Also:

- (Promises::Event, Promises::Future) throttled_future_chain {|trigger| ... } Originally defined in module PromisesIntegration

Allows throttling a chain of promises.

Examples:

throttle.throttled_future_chain do |trigger|
  trigger.
      # 2 throttled promises
      chain { 1 }.
      then(&:succ)
end

Yields:

  • (trigger)

    a trigger which has to be used to build up a chain of promises; the last promise in the chain is the result of the block. When that last promise resolves, Concurrent::Throttle#release is called on the throttle.

Yield Parameters:

Yield Returns:

Returns:

- (String) to_s Also known as: inspect

Returns a short string representation.

Returns:

  • (String)

    Short string representation.



# File 'lib/concurrent/edge/throttle.rb', line 119

def to_s
  format '<#%s:0x%x limit:%s can_run:%d>', self.class, object_id << 1, @Limit, can_run
end

- (Promises::Event) trigger

Returns a new event which is resolved when the dependent tasks can execute. The event has to be used, and after the critical work is done #release must be called exactly once.

Returns:

See Also:



# File 'lib/concurrent/edge/throttle.rb', line 75

def trigger
  while true
    current_can_run = can_run
    if compare_and_set_can_run current_can_run, current_can_run - 1
      if current_can_run > 0
        return Promises.resolved_event
      else
        event = Promises.resolvable_event
        @Queue.push event
        return event
      end
    end
  end
end
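
The while-true loop in the listing above is a compare-and-set retry loop: read the current value, try to swap in the new one, and start over if another thread changed it in between. Below is a rough stdlib-only sketch of that idiom. CasCell is a hypothetical class that emulates compare-and-set with a Mutex purely for demonstration; the real class relies on an atomic field instead.

```ruby
# Hypothetical CAS cell emulated with a Mutex; real code would use an atomic field.
class CasCell
  def initialize(value)
    @value = value
    @lock  = Mutex.new
  end

  def get
    @lock.synchronize { @value }
  end

  # Sets the value to `new` only if it still equals `expected`.
  def compare_and_set(expected, new)
    @lock.synchronize do
      return false unless @value == expected
      @value = new
      true
    end
  end
end

cell = CasCell.new(0)
threads = 4.times.map do
  Thread.new do
    1000.times do
      loop do
        current = cell.get
        # Retry from a fresh read whenever another thread won the race.
        break if cell.compare_and_set(current, current - 1)
      end
    end
  end
end
threads.each(&:join)
p cell.get # => -4000
```

Every decrement is applied exactly once even when reads go stale, which is why the trigger and release loops can update can_run without holding a lock.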