1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
# frozen_string_literal: true
class RedisClient
class Cluster
module ConcurrentWorker
class OnDemand
def initialize(size:)
@q = SizedQueue.new(size)
end
def new_group(size:)
::RedisClient::Cluster::ConcurrentWorker::Group.new(
worker: self,
queue: SizedQueue.new(size),
size: size
)
end
def push(task)
@q << spawn_worker(task, @q)
end
def close
@q.clear
@q.close
nil
end
def inspect
"#<#{self.class.name} active: #{@q.size}, max: #{@q.max}>"
end
private
def spawn_worker(task, queue)
Thread.new(task, queue) do |t, q|
t.exec
q.pop
end
end
end
end
end
end
|