File: reserver.rb

package info (click to toggle)
mikutter 5.1.0%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 9,780 kB
  • sloc: ruby: 22,912; sh: 186; makefile: 21
file content (162 lines) | stat: -rw-r--r-- 3,831 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# -*- coding: utf-8 -*-
=begin rdoc
=Reserver - 指定時間にブロックを実行
指定された時間に一度だけブロックを実行します。調整できる単位は秒です。
アプリケーションが終了するなど、予期せぬ自体が起こると実行されないことがあります。
=end

require 'set'
require 'delegate'

class Reserver < Delegator
  WakeUp = Class.new(Timeout::Error)
  Add = Data.define(:reserver)
  Delete = Data.define(:reserver)
  @queue = Thread::Queue.new

  attr_reader :time, :thread_class
  alias __getobj__ time

  def initialize(time, thread: Thread, &proc)
    raise ArgumentError.new('Block necessary for Reserver.new') unless block_given?
    @proc = proc
    @thread_class = thread
    @sequence = :wait
    case time
    when Time
      @time = time.freeze
    when String
      @time = (Time.parse time).freeze
    when Integer
      @time = (Time.new + time).freeze
    else
      raise ArgumentError.new('first argument must be Integer, String or Time')
    end
    Reserver.register(self)
  end

  # コンストラクタに渡したブロックのProcオブジェクトを返す
  def to_proc
    @proc
  end

  # Reserverの実行をキャンセルする。
  # 実行がキャンセルされたReserverはスケジューラから削除され、その時刻になってもブロックが実行されない。
  # このメソッドを呼ぶと、このインスタンスはfreezeされる。
  # 既に実行が完了しているかキャンセルされたものに対して呼んでも何も起きない。
  def cancel
    if !finished?
      @sequence = :cancel
      freeze
      Reserver.unregister(self)
    end
    self
  rescue defined?(FrozenError) ? FrozenError : RuntimeError
  end

  # このReserverを実行する時間になっていれば true を返す
  def expired?
    sleep_time <= 0
  end

  # このReserverを何秒後に実行するかを返す
  def sleep_time
    time - Time.now
  end

  # このReserverの処理が既に完了している場合には true を返す
  def finished?
    %i<complete canceled>.include?(@sequence)
  end

  def inspect
    "#<#{self.class} #{@sequence} in #{@proc.source_location&.join(':')} at #{time}>"
  end

  # 内部で呼ぶためのメソッドなので呼ばないでください
  def complete
    @sequence = :complete
    freeze
  rescue defined?(FrozenError) ? FrozenError : RuntimeError
  end

  class << self
    def register(reserver)
      @queue.push(Add.new(reserver))
    end

    def unregister(reserver)
      @queue.push(Delete.new(reserver))
    end

    private

    attr_reader :reservers

    def sorted_reservers
      if @sorted
        reservers
      else
        @sorted = true
        reservers.sort_by!(&:time)
      end
    end

    def wait_expired
      next_reserver = fetch
      if next_reserver
        if !next_reserver.expired?
          Timeout.timeout(1 + next_reserver.sleep_time / 2, WakeUp) do
            wait
          end
        end
      else
        wait
      end
    rescue WakeUp
      # ɛ ʘɞʘ ɜ
    end

    def wait
      operation = @queue.pop
      case operation
      when Add
        push(operation.reserver)
      when Delete
        reservers.delete(operation.reserver)
      end
    end

    def fetch
      sorted_reservers[0]
    end

    def pop
      sorted_reservers.shift
    end

    def push(new)
      reservers.unshift(new)
      @sorted = false
    end

    def execute(reserver)
      if !reserver.finished?
        reserver.thread_class.new(&reserver)
        reserver.complete
      end
    end
  end

  @reservers = Array.new
  Thread.new do
    begin
      loop do
        wait_expired
        execute(pop) if fetch&.expired?
      end
    ensure
      @queue.close
    end
  end.abort_on_exception = true
end