1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
# -*- coding: utf-8 -*-
=begin rdoc
=Reserver - 指定時間にブロックを実行
指定された時間に一度だけブロックを実行します。調整できる単位は秒です。
アプリケーションが終了するなど、予期せぬ自体が起こると実行されないことがあります。
=end
require 'set'
require 'delegate'
class Reserver < Delegator
WakeUp = Class.new(Timeout::Error)
Add = Data.define(:reserver)
Delete = Data.define(:reserver)
@queue = Thread::Queue.new
attr_reader :time, :thread_class
alias __getobj__ time
def initialize(time, thread: Thread, &proc)
raise ArgumentError.new('Block necessary for Reserver.new') unless block_given?
@proc = proc
@thread_class = thread
@sequence = :wait
case time
when Time
@time = time.freeze
when String
@time = (Time.parse time).freeze
when Integer
@time = (Time.new + time).freeze
else
raise ArgumentError.new('first argument must be Integer, String or Time')
end
Reserver.register(self)
end
# コンストラクタに渡したブロックのProcオブジェクトを返す
def to_proc
@proc
end
# Reserverの実行をキャンセルする。
# 実行がキャンセルされたReserverはスケジューラから削除され、その時刻になってもブロックが実行されない。
# このメソッドを呼ぶと、このインスタンスはfreezeされる。
# 既に実行が完了しているかキャンセルされたものに対して呼んでも何も起きない。
def cancel
if !finished?
@sequence = :cancel
freeze
Reserver.unregister(self)
end
self
rescue defined?(FrozenError) ? FrozenError : RuntimeError
end
# このReserverを実行する時間になっていれば true を返す
def expired?
sleep_time <= 0
end
# このReserverを何秒後に実行するかを返す
def sleep_time
time - Time.now
end
# このReserverの処理が既に完了している場合には true を返す
def finished?
%i<complete canceled>.include?(@sequence)
end
def inspect
"#<#{self.class} #{@sequence} in #{@proc.source_location&.join(':')} at #{time}>"
end
# 内部で呼ぶためのメソッドなので呼ばないでください
def complete
@sequence = :complete
freeze
rescue defined?(FrozenError) ? FrozenError : RuntimeError
end
class << self
def register(reserver)
@queue.push(Add.new(reserver))
end
def unregister(reserver)
@queue.push(Delete.new(reserver))
end
private
attr_reader :reservers
def sorted_reservers
if @sorted
reservers
else
@sorted = true
reservers.sort_by!(&:time)
end
end
def wait_expired
next_reserver = fetch
if next_reserver
if !next_reserver.expired?
Timeout.timeout(1 + next_reserver.sleep_time / 2, WakeUp) do
wait
end
end
else
wait
end
rescue WakeUp
# ɛ ʘɞʘ ɜ
end
def wait
operation = @queue.pop
case operation
when Add
push(operation.reserver)
when Delete
reservers.delete(operation.reserver)
end
end
def fetch
sorted_reservers[0]
end
def pop
sorted_reservers.shift
end
def push(new)
reservers.unshift(new)
@sorted = false
end
def execute(reserver)
if !reserver.finished?
reserver.thread_class.new(&reserver)
reserver.complete
end
end
end
@reservers = Array.new
Thread.new do
begin
loop do
wait_expired
execute(pop) if fetch&.expired?
end
ensure
@queue.close
end
end.abort_on_exception = true
end
|