File: extend.rb

package info (click to toggle)
ruby-delayer 1.2.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 192 kB
  • sloc: ruby: 947; makefile: 3; sh: 2
file content (273 lines) | stat: -rw-r--r-- 7,014 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# frozen_string_literal: true

require 'set'

module Delayer
  attr_reader :priority

  class Bucket
    attr_accessor :first, :last, :priority_of, :stashed

    def initialize(first, last, priority_of, stashed)
      @first = first
      @last = last
      @priority_of = priority_of
      @stashed = stashed
    end

    def stash_size
      s = stashed
      if s
        1 + s.stash_size
      else
        0
      end
    end
  end

  def self.included(klass)
    klass.class_eval do
      extend Extend
    end
  end

  def initialize(priority = self.class.instance_eval { @default_priority }, *_args, delay: 0, &proc)
    self.class.validate_priority priority
    @priority = priority
    if delay == 0
      @procedure = Procedure.new(self, &proc)
    else
      @procedure = DelayedProcedure.new(self, delay: delay, &proc)
    end
  end

  # Cancel this job
  # ==== Exception
  # Delayer::AlreadyExecutedError :: if already called run()
  # ==== Return
  # self
  def cancel
    @procedure.cancel
    self
  end

  module Extend
    attr_accessor :expire
    attr_reader :exception

    def self.extended(klass)
      klass.class_eval do
        @busy = false
        @expire = 0
        @remain_hook = nil
        @exception = nil
        @remain_received = false
        @lock = Monitor.new
        @bucket = Bucket.new(nil, nil, {}, nil)
        @last_reserve = nil
        @reserves = Set.new
      end
    end

    def pop_reserve(start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC))
      if @last_reserve&.reserve_at&.<=(start_time)
        lock.synchronize do
          while @last_reserve&.reserve_at&.<=(start_time)
            @last_reserve.register
            @last_reserve = @reserves.min
            @reserves.delete(@last_reserve)
          end
        end
      end
    end

    # Run registered jobs.
    # ==== Args
    # [current_expire] expire for processing (secs, 0=unexpired)
    # ==== Return
    # self
    def run(current_expire = @expire)
      start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC).to_f
      pop_reserve(start_time)
      if current_expire == 0
        run_once_without_pop_reserve until empty?
      else
        @end_time = end_time = start_time + @expire
        run_once_without_pop_reserve while !empty? && (end_time >= Process.clock_gettime(Process::CLOCK_MONOTONIC))
        @end_time = nil
      end
      if @remain_hook
        @remain_received = !empty?
        @remain_hook.call if @remain_received
      end
    rescue Exception => e
      @exception = e
      raise e
    end

    def expire?
      !!@end_time&.<(Time.new.to_f)
    end

    # Run a job and forward pointer.
    # ==== Return
    # self
    def run_once
      pop_reserve
      run_once_without_pop_reserve
    end

    private def run_once_without_pop_reserve
      if @bucket.first
        @busy = true
        procedure = forward
        procedure = forward while @bucket.first && procedure&.canceled?
        if procedure && !procedure.canceled?
          procedure.run
        end
      end
    ensure
      @busy = false
    end

    # Return if some jobs processing now.
    # ==== Args
    # [args]
    # ==== Return
    # true if Delayer processing job
    def busy?
      @busy
    end

    # Return true if no jobs has.
    # ==== Return
    # true if no jobs has.
    def empty?
      !@bucket.first
    end

    # Return remain jobs quantity.
    # ==== Return
    # Count of remain jobs
    def size(node = @bucket.first)
      if node
        1 + size(node.next)
      else
        0
      end
    end

    # register new job.
    # ==== Args
    # [procedure] job(Delayer::Procedure)
    # ==== Return
    # self
    def register(procedure)
      priority = procedure.delayer.priority
      lock.synchronize do
        last_pointer = get_prev_point(priority)
        if last_pointer
          @bucket.priority_of[priority] = last_pointer.break procedure
        else
          procedure.next = @bucket.first
          @bucket.priority_of[priority] = @bucket.first = procedure
        end
        @bucket.last = @bucket.priority_of[priority] if @bucket.last
        if @remain_hook && !@remain_received
          @remain_received = true
          @remain_hook.call
        end
      end
      self
    end

    # Register reserved job.
    # It does not execute immediately.
    # it calls register() in _procedure.reserve_at_.
    # ==== Args
    # [procedure] job(Delayer::DelayedProcedure)
    # ==== Return
    # self
    def reserve(procedure)
      lock.synchronize do
        if @last_reserve
          if @last_reserve > procedure
            @reserves.add(@last_reserve)
            @last_reserve = procedure
          else
            @reserves.add(procedure)
          end
        else
          @last_reserve = procedure
        end
      end
      self
    end

    def register_remain_hook(&proc)
      @remain_hook = proc
    end

    def get_prev_point(priority)
      if @bucket.priority_of[priority]
        @bucket.priority_of[priority]
      else
        @priorities.index(priority)&.yield_self do |index|
          next_index = index - 1
          get_prev_point @priorities[next_index] if next_index >= 0
        end
      end
    end

    def validate_priority(symbol)
      unless @priorities.include? symbol
        raise Delayer::InvalidPriorityError, "undefined priority '#{symbol}'"
      end
    end

    # DelayerのStashレベルをインクリメントする。
    # このメソッドが呼ばれたら、その時存在するジョブは退避され、stash_exit!が呼ばれるまで実行されない。
    def stash_enter!
      @bucket = Bucket.new(nil, nil, {}, @bucket)
      self
    end

    # DelayerのStashレベルをデクリメントする。
    # このメソッドを呼ぶ前に、現在のレベルに存在するすべてのジョブを実行し、Delayer#empty?がtrueを返すような状態になっている必要がある。
    # ==== Raises
    # [Delayer::NoLowerLevelError] stash_enter!が呼ばれていない時
    # [Delayer::RemainJobsError] ジョブが残っているのにこのメソッドを呼んだ時
    def stash_exit!
      stashed = @bucket.stashed
      raise Delayer::NoLowerLevelError, 'stash_exit! called in level 0.' unless stashed
      raise Delayer::RemainJobsError, 'Current level has remain jobs. It must be empty current level jobs in call this method.' unless empty?

      @bucket = stashed
    end

    # 現在のDelayer Stashレベルを返す。
    def stash_level
      @bucket.stash_size
    end

    private

    def forward
      lock.synchronize do
        prev = @bucket.first
        raise 'Current bucket not found' unless prev
        nex = @bucket.first = prev.next
        @bucket.last = nil unless nex
        @bucket.priority_of.each do |priority, pointer|
          @bucket.priority_of[priority] = nex if prev == pointer
        end
        prev.next = nil
        prev
      end
    end

    def lock
      @lock
    end
  end
end