1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
|
require 'monitor'
# Ruby 1.9 specific fixes.
unless RUBY_VERSION < '1.9'
require 'god/compat19'
end
module God
# The TimedEvent class represents an event in the future. This class is used
# by the drivers to schedule upcoming conditional tests and other scheduled
# events.
#
# Events are ordered by their due time (see #<=>), and Comparable supplies
# <, <=, ==, >= and > on top of that ordering.
class TimedEvent
  include Comparable

  # The Time at which this event is due.
  attr_accessor :at

  # Instantiate a new TimedEvent that will be triggered after the specified
  # delay.
  #
  # delay - The optional Numeric number of seconds from now at which to
  #         trigger (default: 0).
  def initialize(delay = 0)
    self.at = Time.now + delay
  end

  # Is the current event due (current time >= event time)?
  #
  # Returns true if the event is due, false if not.
  def due?
    Time.now >= at
  end

  # Compare this event to another by due time.
  #
  # other - The other TimedEvent (any object responding to #at).
  #
  # Returns -1 if this event is due before the other, 0 if the two events are
  #   due at the same time, 1 if this event is due after the other, or nil
  #   if other cannot be compared (it does not respond to #at).
  def <=>(other)
    # Follow the Comparable convention of answering nil for foreign objects
    # so that `event == nil` is simply false instead of raising.
    return nil unless other.respond_to?(:at)

    at <=> other.at
  end
end
# A DriverEvent is a TimedEvent that carries a Task and a Condition. It is the
# primary mechanism by which poll conditions get scheduled.
class DriverEvent < TimedEvent
  # Build a DriverEvent that becomes due after the given delay.
  #
  # delay     - The Numeric delay, handed to TimedEvent#initialize.
  # task      - The Task associated with this event.
  # condition - The Condition associated with this event.
  def initialize(delay, task, condition)
    super(delay)
    @task, @condition = task, condition
  end

  # Handle this event by asking the associated task to poll the stored
  # condition.
  #
  # Returns the result of the task's handle_poll (the driver loop in this
  #   file ignores it).
  def handle_event
    @task.handle_poll(@condition)
  end
end
# A DriverOperation is a TimedEvent created with a zero delay, i.e. due as
# soon as possible. It is used to execute an arbitrary method on the
# associated Task.
class DriverOperation < TimedEvent
  # Build a DriverOperation.
  #
  # task - The Task upon which to operate.
  # name - The Symbol name of the method to call.
  # args - The Array of arguments to send to the method.
  def initialize(task, name, args)
    super(0)
    @task, @name, @args = task, name, args
  end

  # Handle the operation that was issued asynchronously by sending the stored
  # method name, with the stored arguments splatted, to the task.
  #
  # Returns the result of the invoked method (the driver loop in this file
  #   ignores it).
  def handle_event
    @task.send(@name, *@args)
  end
end
# The DriverEventQueue is a simple queue that holds TimedEvent instances in
# order to maintain the schedule of upcoming events.
#
# push, pop and shutdown coordinate through a Monitor and a condition
# variable created from it; empty?, clear and length read or modify the
# underlying Array directly without taking the monitor.
class DriverEventQueue
  # Initialize a DriverEventQueue.
  def initialize
    @shutdown = false
    @events = []
    @monitor = Monitor.new
    @resource = @monitor.new_cond
  end

  # Wake any sleeping threads after setting the sentinel.
  #
  # Returns nothing.
  def shutdown
    @monitor.synchronize do
      @shutdown = true
      @resource.broadcast
    end
  end

  # Wait until the queue has something due, pop it off the queue, and return
  # it.
  #
  # The queue state is re-examined after every wake-up, so a waiter that is
  # woken by shutdown, by a push of an event that is not due yet, or by a
  # timeout that expires after the queue was cleared never returns nil or an
  # event ahead of its due time.
  #
  # Returns the popped event.
  # Raises ThreadError if the queue is empty and shutdown has been requested.
  def pop
    @monitor.synchronize do
      loop do
        if @events.empty?
          raise ThreadError, "queue empty" if @shutdown
          @resource.wait
        else
          delay = @events.first.at - Time.now
          # The head of the queue is due: stop waiting and hand it out.
          break if delay <= 0
          @resource.wait(delay)
        end
      end

      @events.shift
    end
  end

  # Add an event to the queue, wake any waiters if what we added needs to
  # happen sooner than the next pending event.
  #
  # event - The event to enqueue; it must be sortable against the events
  #         already queued and respond to #at.
  #
  # Returns nothing.
  def push(event)
    @monitor.synchronize do
      @events << event
      @events.sort!

      # If we've sorted the events and found the one we're adding is at
      # the front, it will likely need to run before the next due date.
      @resource.signal if @events.first == event
    end
  end

  # Returns true if the queue is empty, false if not.
  def empty?
    @events.empty?
  end

  # Clear the queue.
  #
  # Returns nothing.
  def clear
    @events.clear
  end

  # Returns the Integer length of the queue.
  def length
    @events.length
  end

  alias size length
end
# The Driver class is responsible for scheduling all of the events for a
# given Task. It owns a DriverEventQueue and a Thread that drains it.
class Driver
  # The Thread running the driver loop.
  attr_reader :thread

  # Instantiate a new Driver and start the scheduler loop to handle events.
  #
  # task - The Task this Driver belongs to.
  def initialize(task)
    @task = task
    @events = God::DriverEventQueue.new
    @thread = Thread.new { run_event_loop }
  end

  # Check if we're in the driver context.
  #
  # Returns true if the calling thread is the driver thread, false if not.
  def in_driver_context?
    @thread == Thread.current
  end

  # Clear all events for this Driver.
  #
  # Returns the result of clearing the underlying queue.
  def clear_events
    @events.clear
  end

  # Shutdown the DriverEventQueue threads.
  #
  # Returns the result of shutting down the underlying queue.
  def shutdown
    @events.shutdown
  end

  # Queue an asynchronous message.
  #
  # name - The Symbol name of the operation.
  # args - An optional Array of arguments (default: []).
  #
  # Returns the result of pushing onto the underlying queue.
  def message(name, args = [])
    operation = DriverOperation.new(@task, name, args)
    @events.push(operation)
  end

  # Create and schedule a new DriverEvent.
  #
  # condition - The Condition.
  # delay     - The Numeric number of seconds to delay (default: interval
  #             defined in condition).
  #
  # Returns the result of pushing onto the underlying queue.
  def schedule(condition, delay = condition.interval)
    applog(nil, :debug, "driver schedule #{condition} in #{delay} seconds")
    event = DriverEvent.new(delay, @task, condition)
    @events.push(event)
  end

  private

  # Body of the driver thread: keep popping events and handling them until
  # the queue raises ThreadError. Any other exception is logged at :fatal
  # level and the loop carries on with the next event.
  #
  # Returns nothing.
  def run_event_loop
    loop do
      begin
        next_event = @events.pop
        next_event.handle_event
      rescue ThreadError
        # queue is empty
        break
      rescue Object => error
        details = [error.class, error.message, error.backtrace.join("\n")]
        message = format("Unhandled exception in driver loop - (%s): %s\n%s", *details)
        applog(nil, :fatal, message)
      end
    end
  end
end
end
|