File: driver.rb

package info (click to toggle)
ruby-god 0.12.1-1
  • links: PTS
  • area: main
  • in suites: wheezy
  • size: 752 kB
  • sloc: ruby: 5,913; ansic: 217; makefile: 3
file content (238 lines) | stat: -rw-r--r-- 6,196 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
require 'monitor'

# Ruby 1.9 specific fixes.
unless RUBY_VERSION < '1.9'
  require 'god/compat19'
end

module God
  # The TimedEvent class represents an event in the future. This class is used
  # by the drivers to schedule upcoming conditional tests and other scheduled
  # events.
  class TimedEvent
    include Comparable

    # The Time at which this event is due.
    attr_accessor :at

    # Instantiate a new TimedEvent that will be triggered after the specified
    # delay.
    #
    # delay - The optional Numeric number of seconds from now at which to
    #         trigger (default: 0).
    def initialize(delay = 0)
      self.at = Time.now + delay
    end

    # Is the current event due (current time >= event time)?
    #
    # Returns true if the event is due, false if not.
    def due?
      Time.now >= at
    end

    # Compare this event to another by due time. Comparable derives ==, <, >,
    # etc. from this method.
    #
    # other - The other TimedEvent (any object that responds to #at).
    #
    # Returns -1 if this event is due before the other, 0 if the two events
    #   are due at the same time, 1 if this event is due after the other, or
    #   nil if other has no due time to compare against.
    def <=>(other)
      # Returning nil (instead of raising NoMethodError) keeps comparisons
      # such as `event == nil` well-behaved: Comparable#== then yields false.
      return nil unless other.respond_to?(:at)

      at <=> other.at
    end
  end

  # A DriverEvent is a TimedEvent with an associated Task and Condition. This
  # is the primary mechanism for poll conditions to be scheduled.
  class DriverEvent < TimedEvent
    # Build a DriverEvent that becomes due after the given delay.
    #
    # delay     - The Numeric number of seconds from now at which the event
    #             is due (handed to TimedEvent#initialize).
    # task      - The Task that will be asked to poll.
    # condition - The Condition to hand to the task when the event fires.
    def initialize(delay, task, condition)
      super(delay)
      @task, @condition = task, condition
    end

    # Fire the event: ask the stored task to poll the stored condition via
    # Task#handle_poll.
    #
    # Returns nothing of interest (the result of handle_poll is passed
    #   through unchanged).
    def handle_event
      @task.handle_poll(@condition)
    end
  end

  # A DriverOperation is a TimedEvent that is due as soon as possible. It is
  # used to execute an arbitrary method on the associated Task.
  class DriverOperation < TimedEvent
    # Build a DriverOperation. The delay is always zero, so the operation is
    # due immediately.
    #
    # task - The Task that will receive the method call.
    # name - The Symbol name of the method to invoke on the task.
    # args - The Array of arguments to splat into the call.
    def initialize(task, name, args)
      super(0)
      @task, @name, @args = task, name, args
    end

    # Perform the queued operation by sending the stored method name and
    # arguments to the task.
    #
    # Returns nothing of interest (the result of the invoked method is passed
    #   through unchanged).
    def handle_event
      @task.send(@name, *@args)
    end
  end

  # The DriverEventQueue is a simple queue that holds TimedEvent instances in
  # order to maintain the schedule of upcoming events.
  class DriverEventQueue
    # Initialize a DriverEventQueue.
    def initialize
      @shutdown = false
      @events = []
      @monitor = Monitor.new
      @resource = @monitor.new_cond
    end

    # Wake any sleeping threads after setting the sentinel. A thread blocked
    # in #pop on an empty queue will then raise ThreadError.
    #
    # Returns nothing.
    def shutdown
      @monitor.synchronize do
        @shutdown = true
        @resource.broadcast
      end
    end

    # Wait until the queue has something due, pop it off the queue, and return
    # it.
    #
    # The wait condition is re-evaluated after every wake-up. A wake-up may be
    # caused by #push placing an earlier (but not yet due) event at the front,
    # by #shutdown, by #clear having emptied the queue in the meantime, or by
    # the timed wait expiring; only in the last case is the front event
    # actually ready to be returned.
    #
    # Returns the popped event.
    # Raises ThreadError if the queue is empty and has been shut down.
    def pop
      @monitor.synchronize do
        loop do
          if @events.empty?
            raise ThreadError, "queue empty" if @shutdown
            @resource.wait
          else
            delay = @events.first.at - Time.now
            break if delay <= 0
            @resource.wait(delay)
          end
        end

        @events.shift
      end
    end

    # Add an event to the queue, wake any waiters if what we added needs to
    # happen sooner than the next pending event.
    #
    # event - The event to enqueue; it must respond to #at and be sortable
    #         against the other queued events.
    #
    # Returns nothing.
    def push(event)
      @monitor.synchronize do
        @events << event
        @events.sort!

        # If we've sorted the events and found the one we're adding is at
        # the front, it will likely need to run before the next due date.
        # Identity is checked (not ==) because events that merely share a due
        # time may compare equal.
        @resource.signal if @events.first.equal?(event)
      end
    end

    # Returns true if the queue is empty, false if not.
    def empty?
      @monitor.synchronize { @events.empty? }
    end

    # Clear the queue.
    #
    # Returns nothing.
    def clear
      @monitor.synchronize { @events.clear }
    end

    # Returns the Integer length of the queue.
    def length
      @monitor.synchronize { @events.length }
    end

    alias size length
  end

  # The Driver class is responsible for scheduling all of the events for a
  # given Task.
  class Driver
    # The Thread running the driver loop.
    attr_reader :thread

    # Instantiate a new Driver and start the scheduler loop to handle events.
    #
    # The loop runs in its own Thread: it pops the next due event from the
    # queue and handles it, until the queue signals (via ThreadError from
    # #pop) that it is empty and has been shut down.
    #
    # task - The Task this Driver belongs to.
    def initialize(task)
      @task = task
      @events = God::DriverEventQueue.new

      @thread = Thread.new do
        loop do
          begin
            begin
              event = @events.pop
            rescue ThreadError
              # The queue is empty and shut down: leave the driver loop. Only
              # #pop is covered here so that a ThreadError raised by an event
              # handler is logged below instead of silently ending the loop.
              break
            end

            # The queue may wake up without an event to hand out; skip it
            # rather than failing on nil.
            event.handle_event if event
          rescue Exception => e
            # Deliberate catch-all: a failing handler must not take the
            # driver thread down with it, so log and keep going.
            message = format("Unhandled exception in driver loop - (%s): %s\n%s",
                             e.class, e.message, Array(e.backtrace).join("\n"))
            applog(nil, :fatal, message)
          end
        end
      end
    end

    # Check if we're in the driver context.
    #
    # Returns true if in driver thread, false if not.
    def in_driver_context?
      Thread.current == @thread
    end

    # Clear all events for this Driver.
    #
    # Returns nothing.
    def clear_events
      @events.clear
    end

    # Shutdown the DriverEventQueue threads.
    #
    # Returns nothing.
    def shutdown
      @events.shutdown
    end

    # Queue an asynchronous message.
    #
    # name - The Symbol name of the operation.
    # args - An optional Array of arguments.
    #
    # Returns nothing.
    def message(name, args = [])
      @events.push(DriverOperation.new(@task, name, args))
    end

    # Create and schedule a new DriverEvent.
    #
    # condition - The Condition.
    # delay     - The Numeric number of seconds to delay (default: interval
    #             defined in condition).
    #
    # Returns nothing.
    def schedule(condition, delay = condition.interval)
      applog(nil, :debug, "driver schedule #{condition} in #{delay} seconds")

      @events.push(DriverEvent.new(delay, @task, condition))
    end
  end
end