Class: Concurrent::Throttle

Inherits:
Synchronization::Object show all
Includes:
PromisesIntegration
Defined in:
lib-edge/concurrent/edge/throttle.rb

Overview

Note:

Edge Features are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.
  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
  • Edge features may also lack tests and documentation.
  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

A tool manage concurrency level of future tasks.

Examples:

data     = (1..5).to_a
db       = data.reduce({}) { |h, v| h.update v => v.to_s }
max_two  = Throttle.new 2

futures = data.map do |data|
  Promises.future(data) do |data|
    # un-throttled, concurrency level equal data.size
    data + 1
  end.then_throttled_by(max_two, db) do |v, db|
    # throttled, only 2 tasks executed at the same time
    # e.g. limiting access to db
    db[v]
  end
end

futures.map(&:value!) # => [2, 3, 4, 5, nil]
throttle.throttled_future(1) do |arg|
  arg.succ
end
throttle.throttled_future_chain do |trigger|
  trigger.
      # 2 throttled promises
      chain { 1 }.
      then(&:succ)
end
max_two = Throttle.new 2
10.times.map do
  Thread.new do
    max_two.throttled_block do
      # Only 2 at the same time
      do_stuff
    end
  end
end

Defined Under Namespace

Modules: PromisesIntegration

Instance Method Summary collapse

Constructor Details

#initialize(limit) ⇒ Throttle

New throttle.

Parameters:

  • limit (Integer)


61
62
63
64
65
66
# File 'lib-edge/concurrent/edge/throttle.rb', line 61

def initialize(limit)
  super()
  @Limit       = limit
  self.can_run = limit
  @Queue       = LockFreeQueue.new
end

Instance Method Details

#limitInteger

Returns The limit.

Returns:

  • (Integer)

    The limit.



69
70
71
# File 'lib-edge/concurrent/edge/throttle.rb', line 69

def limit
  @Limit
end

#releaseself

Has to be called once for each trigger after it is ok to execute another throttled task.

Returns:

  • (self)

See Also:



95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib-edge/concurrent/edge/throttle.rb', line 95

def release
  while true
    current_can_run = can_run
    if compare_and_set_can_run current_can_run, current_can_run + 1
      if current_can_run < 0
        # release called after trigger which pushed a trigger, busy wait is ok
        Thread.pass until (trigger = @Queue.pop)
        trigger.resolve
      end
      return self
    end
  end
end

#throttled_block { ... } ⇒ Object

Blocks current thread until the block can be executed.

Examples:

max_two = Throttle.new 2
10.times.map do
  Thread.new do
    max_two.throttled_block do
      # Only 2 at the same time
      do_stuff
    end
  end
end

Yields:

  • to throttled block

Yield Returns:

  • (Object)

    is used as a result of the method

Returns:

  • (Object)

    the result of the block



114
115
116
117
118
119
# File 'lib-edge/concurrent/edge/throttle.rb', line 114

def throttled_block(&block)
  trigger.wait
  block.call
ensure
  release
end

#throttled_future(*args, &task) ⇒ Promises::Future Originally defined in module PromisesIntegration

Behaves as Promises::FactoryMethods#future but the future is throttled.

Examples:

throttle.throttled_future(1) do |arg|
  arg.succ
end

Returns:

See Also:

#throttled_future_chain {|trigger| ... } ⇒ Promises::Event, Promises::Future Originally defined in module PromisesIntegration

Allows to throttle a chain of promises.

Examples:

throttle.throttled_future_chain do |trigger|
  trigger.
      # 2 throttled promises
      chain { 1 }.
      then(&:succ)
end

Yields:

  • (trigger)

    a trigger which has to be used to build up a chain of promises, the last one is result of the block. When the last one resolves, Concurrent::Throttle#release is called on the throttle.

Yield Parameters:

Yield Returns:

Returns:

#to_sString Also known as: inspect

Returns Short string representation.

Returns:

  • (String)

    Short string representation.



122
123
124
# File 'lib-edge/concurrent/edge/throttle.rb', line 122

def to_s
  format '%s limit:%s can_run:%d>', super[0..-2], @Limit, can_run
end

#triggerPromises::Event

New event which will be resolved when depending tasks can execute. Has to be used and after the critical work is done #release must be called exactly once.

Returns:

See Also:



77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib-edge/concurrent/edge/throttle.rb', line 77

def trigger
  while true
    current_can_run = can_run
    if compare_and_set_can_run current_can_run, current_can_run - 1
      if current_can_run > 0
        return Promises.resolved_event
      else
        event = Promises.resolvable_event
        @Queue.push event
        return event
      end
    end
  end
end