File: serialthread.rb

package info (click to toggle)
mikutter 5.0.4%2Bdfsg1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 9,700 kB
  • sloc: ruby: 21,307; sh: 181; makefile: 19
file content (95 lines) | stat: -rw-r--r-- 2,465 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# -*- coding: utf-8 -*-

require_relative 'utils'

require 'delayer'
require 'delayer/deferred'
require 'set'
require 'thread'
require 'timeout'

# 渡されたブロックを順番に実行するクラス
class SerialThreadGroup
  QueueExpire = Class.new(Timeout::Error)

  # ブロックを同時に処理する個数。最大でこの数だけThreadが作られる
  attr_accessor :max_threads

  @@force_exit = false

  def initialize(max_threads: 1, deferred: nil)
    @lock = Monitor.new
    @queue = Queue.new
    @max_threads = max_threads
    @deferred_class = deferred
    @thread_pool = Set.new
  end

  # 実行するブロックを新しく登録する
  # ==== Args
  # [proc] 実行するブロック
  def push(proc=nil, &block)
    proc ||= block
    promise = @deferred_class && @deferred_class.new(true)
    return promise if @@force_exit
    @lock.synchronize{
      @queue.push(proc: proc, promise: promise)
      new_thread if 0 == @queue.num_waiting and @thread_pool.size < max_threads }
    promise
  end
  alias new push

  # 処理中なら真
  def busy?
    @thread_pool.any?{ |t| :run == t.status.to_sym } end

  # 全てのserial threadの実行をキャンセルする。終了時の処理用
  def self.force_exit!
    notice "all Serial Thread Group jobs canceled."
    @@force_exit = true end

  private

  # Threadが必要なら一つ立ち上げる。
  # これ以上Threadが必要ない場合はtrueを返す。
  def flush
    return true if @@force_exit
    @lock.synchronize{
      @thread_pool.delete_if{ |t| not t.alive? }
      if @thread_pool.size > max_threads
        return true
      elsif 0 == @queue.num_waiting and @thread_pool.size < max_threads
        new_thread end }
    false end

  def new_thread
    return if @@force_exit
    @thread_pool << Thread.new do
      while node = Timeout.timeout(1, QueueExpire){ @queue.pop }
        break if @@force_exit
        result = node[:proc].call
        node[:promise].call(result) if node[:promise]
        break if flush
        debugging_wait
        Thread.pass
      end
    rescue QueueExpire, ThreadError
    rescue Object => e
      if node[:promise]
        node[:promise].fail(e)
      else
        error e
        abort
      end
    ensure
      @lock.synchronize do
        @thread_pool.delete(Thread.current)
      end
    end
  end

end

# SerialThreadGroup のインスタンス。
# 同時実行数は1固定
SerialThread = SerialThreadGroup.new