File: scheduler.rb

package info (click to toggle)
ruby-rack-timeout 0.7.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212 kB
  • sloc: ruby: 515; makefile: 4
file content (153 lines) | stat: -rw-r--r-- 6,092 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
require_relative "namespace"
require_relative "monotonic_time"

# Runs code at a later time
#
# Basic usage:
#
#     Scheduler.run_in(5) { do_stuff }  # <- calls do_stuff 5 seconds from now
#
# Scheduled events run in sequence in a separate thread, the main thread continues on.
# That means you may need to #join the scheduler if the main thread is only waiting on scheduled events to run.
#
#     Scheduler.join
#
# Basic usage is through a singleton instance, its methods are available as class methods, as shown above.
# One could also instantiate separate instances which would get you separate run threads, but generally there's no point in it.
class Rack::Timeout::Scheduler
  MAX_IDLE_SECS = 30 # how long the runner thread is allowed to live doing nothing
  include Rack::Timeout::MonotonicTime # gets us the #fsecs method

  # stores a proc to run later, and the time it should run at
  class RunEvent < Struct.new(:monotime, :proc)
    def initialize(*args)
      @cancelled = false
      super(*args)
    end

    def cancel!
      @cancelled = true
    end

    def cancelled?
      !!@cancelled
    end

    def run!
      return if @cancelled
      proc.call(self)
    end
  end

  class RepeatEvent < RunEvent
    def initialize(monotime, proc, every)
      @start = monotime
      @every = every
      @iter  = 0
      super(monotime, proc)
    end

    def run!
      super
    ensure
      self.monotime = @start + @every * (@iter += 1) until monotime >= Rack::Timeout::MonotonicTime.fsecs
    end
  end

  def initialize
    @runner    = nil
    @events    = []         # array of `RunEvent`s
    @mx_events = Mutex.new  # mutex to change said array
    @mx_runner = Mutex.new  # mutex for creating a runner thread
  end


  private

  # returns the runner thread, creating it if needed
  def runner
    @mx_runner.synchronize {
      return @runner unless @runner.nil? || !@runner.alive?
      @joined = false
      @runner = Thread.new { run_loop! }
    }
  end

  # the actual runner thread loop
  def run_loop!
    Thread.current.abort_on_exception = true                       # always be aborting
    sleep_for, run, last_run = nil, nil, fsecs                     # sleep_for: how long to sleep before next run; last_run: time of last run; run: just initializing it outside of the synchronize scope, will contain events to run now
    loop do                                                        # begin event reader loop
      @mx_events.synchronize {                                     #
        @events.reject!(&:cancelled?)                              # get rid of cancelled events
        if @events.empty?                                          # if there are no further events …
          return if @joined                                        # exit the run loop if this runner thread has been joined (the thread will die and the join will return)
          return if fsecs - last_run > MAX_IDLE_SECS               # exit the run loop if done nothing for the past MAX_IDLE_SECS seconds
          sleep_for = MAX_IDLE_SECS                                # sleep for MAX_IDLE_SECS (mind it that we get awaken when new events are scheduled)
        else                                                       #
          sleep_for = [@events.map(&:monotime).min - fsecs, 0].max # if we have events, set to sleep until it's time for the next one to run. (the max bit ensure we don't have negative sleep times)
        end                                                        #
        @mx_events.sleep sleep_for                                 # do sleep
                                                                   #
        now = fsecs                                                #
        run, defer = @events.partition { |ev| ev.monotime <= now } # separate events to run now and events to run later
        defer += run.select { |ev| ev.is_a? RepeatEvent }          # repeat events both run and are deferred
        @events.replace(defer)                                     # keep only events to run later
      }                                                            #
                                                                   #
      next if run.empty?                                           # done here if there's nothing to run now
      run.sort_by(&:monotime).each { |ev| ev.run! }                # run the events scheduled to run now
      last_run = fsecs                                             # store that we did run things at this time, go immediately on to the next loop iteration as it may be time to run more things
    end
  end


  public

  # waits on the runner thread to finish
  def join
    @joined = true
    runner.join
  end

  # adds a RunEvent struct to the run schedule
  def schedule(event)
    @mx_events.synchronize { @events << event }
    runner.run  # wakes up the runner thread so it can recalculate sleep length taking this new event into consideration
    return event
  end

  # reschedules an event by the given number of seconds. can be negative to run sooner.
  # returns nil and does nothing if the event is not already in the queue (might've run already), otherwise updates the event time in-place; returns the updated event.
  def delay(event, secs)
    @mx_events.synchronize {
      return unless @events.include? event
      event.monotime += secs
      runner.run
      return event
    }
  end

  # schedules a block to run in the given number of seconds; returns the created event object
  def run_in(secs, &block)
    schedule RunEvent.new(fsecs + secs, block)
  end

  # schedules a block to run every x seconds; returns the created event object
  def run_every(seconds, &block)
    schedule RepeatEvent.new(fsecs, block, seconds)
  end


  ### Singleton access

  # accessor to the singleton instance
  def self.singleton
    @singleton ||= new
  end

  # define public instance methods as class methods that delegate to the singleton instance
  instance_methods(false).each do |m|
    define_singleton_method(m) { |*a, &b| singleton.send(m, *a, &b) }
  end
end