1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
|
# -*- coding: utf-8 -*-
=begin rdoc
コールバック機能付きキュー。コンストラクタに渡したブロックが、pushされた値の集合を引数に呼ばれる。
コールバックは、以下の条件の何れかを満たしたときに動く。
1. キューの個数が _max_ を超えたとき
2. キューに _expire_ 秒以上値の追加が無かったとき
=end
require 'thread'
require 'timeout'
class TimeLimitedQueue < Queue
extend Gem::Deprecate
TLQGroup = ThreadGroup.new
WaitingExpire = Class.new(Timeout::Error)
# 一度にキューを処理する上限を取得設定する
attr_accessor :max
# キューの待ち時間のリミットを取得設定する
attr_accessor :expire
# コールバックに渡すためのクラスを取得設定する。
# 通常Arrayだが、Setにすれば同じ値が同時に二つ入らない代わりに、高速に処理される。
# メソッド _push_ を実装しているクラスを指定する。
attr_accessor :storage_class
alias :strage_class :storage_class
deprecate :strage_class, "storage_class", 2019, 10
alias :strage_class= :storage_class=
deprecate :strage_class=, "storage_class=", 2019, 10
attr_reader :thread # :nodoc:
END{
TLQGroup.list.each{ |thread|
thread.kill if thread.alive?
thread[:queue].instance_eval{ callback } if thread[:queue] } }
def initialize(max=1024, expire=5, storage_class=Array, proc=nil, &block) # :yield: data
@thread = nil
@callback = proc || block
@max = max
@expire = expire
@storage_class = storage_class
@stock = @storage_class.new
super()
end
# 値 _value_ をキューに追加する。
def push(value)
result = super(value)
pushed_event
result end
undef_method(:enq, :<<)
private
# 待機中のスレッド内の処理
def waiting_proc
TLQGroup.add(Thread.current)
loop do
catch(:write) do
loop do
if @stock.size > max
throw :write
end
Timeout.timeout(expire, WaitingExpire){ @stock << pop }
rescue WaitingExpire
throw :write
end
end
callback if not @stock.empty?
break if empty?
end
end
def callback
# @stock.push(pop) while not empty?
stock = @stock.to_a
@callback.call(stock[0, max].freeze)
@stock = @storage_class.new(stock[max, stock.size] || []) end
# キューに値が追加された時のイベント
def pushed_event
if not(@thread and @thread.alive?)
@thread = Thread.new(&method(:waiting_proc))
TLQGroup.add(@thread)
@thread[:queue] = self
@thread.abort_on_exception = true
end end
end
|