1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
|
# -*- coding: utf-8 -*-
require_relative 'utils'
require 'delayer'
require 'delayer/deferred'
require 'set'
require 'thread'
require 'timeout'
# 渡されたブロックを順番に実行するクラス
class SerialThreadGroup
QueueExpire = Class.new(Timeout::Error)
# ブロックを同時に処理する個数。最大でこの数だけThreadが作られる
attr_accessor :max_threads
@@force_exit = false
def initialize(max_threads: 1, deferred: nil)
@lock = Monitor.new
@queue = Queue.new
@max_threads = max_threads
@deferred_class = deferred
@thread_pool = Set.new
end
# 実行するブロックを新しく登録する
# ==== Args
# [proc] 実行するブロック
def push(proc=nil, &block)
proc ||= block
promise = @deferred_class && @deferred_class.new(true)
return promise if @@force_exit
@lock.synchronize{
@queue.push(proc: proc, promise: promise)
new_thread if 0 == @queue.num_waiting and @thread_pool.size < max_threads }
promise
end
alias new push
# 処理中なら真
def busy?
@thread_pool.any?{ |t| :run == t.status.to_sym } end
# 全てのserial threadの実行をキャンセルする。終了時の処理用
def self.force_exit!
notice "all Serial Thread Group jobs canceled."
@@force_exit = true end
private
# Threadが必要なら一つ立ち上げる。
# これ以上Threadが必要ない場合はtrueを返す。
def flush
return true if @@force_exit
@lock.synchronize{
@thread_pool.delete_if{ |t| not t.alive? }
if @thread_pool.size > max_threads
return true
elsif 0 == @queue.num_waiting and @thread_pool.size < max_threads
new_thread end }
false end
def new_thread
return if @@force_exit
@thread_pool << Thread.new do
while node = Timeout.timeout(1, QueueExpire){ @queue.pop }
break if @@force_exit
result = node[:proc].call
node[:promise].call(result) if node[:promise]
break if flush
debugging_wait
Thread.pass
end
rescue QueueExpire, ThreadError
rescue Object => e
if node[:promise]
node[:promise].fail(e)
else
error e
abort
end
ensure
@lock.synchronize do
@thread_pool.delete(Thread.current)
end
end
end
end
# SerialThreadGroup のインスタンス。
# 同時実行数は1固定
SerialThread = SerialThreadGroup.new
|