1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
|
module Clockwork
class Manager
class NoHandlerDefined < RuntimeError; end
attr_reader :config
def initialize
@events = []
@callbacks = {}
@config = default_configuration
@handler = nil
@mutex = Mutex.new
@condvar = ConditionVariable.new
@finish = false
end
def thread_available?
Thread.list.select { |t| t['creator'] == self }.count < config[:max_threads]
end
def configure
yield(config)
if config[:sleep_timeout] < 1
config[:logger].warn 'sleep_timeout must be >= 1 second'
end
end
def default_configuration
{ :sleep_timeout => 1, :logger => Logger.new(STDOUT), :thread => false, :max_threads => 10 }
end
def handler(&block)
@handler = block if block_given?
raise NoHandlerDefined unless @handler
@handler
end
def error_handler(&block)
@error_handler = block if block_given?
@error_handler if instance_variable_defined?("@error_handler")
end
def on(event, options={}, &block)
raise "Unsupported callback #{event}" unless [:before_tick, :after_tick, :before_run, :after_run].include?(event.to_sym)
(@callbacks[event.to_sym]||=[]) << block
end
def every(period, job='unnamed', options={}, &block)
if job.is_a?(Hash) and options.empty?
options = job
job = "unnamed"
end
if options[:at].respond_to?(:each)
every_with_multiple_times(period, job, options, &block)
else
register(period, job, block, options)
end
end
def fire_callbacks(event, *args)
@callbacks[event].nil? || @callbacks[event].all? { |h| h.call(*args) }
end
def run
log "Starting clock for #{@events.size} events: [ #{@events.map(&:to_s).join(' ')} ]"
sig_read, sig_write = IO.pipe
(%w[INT TERM HUP] & Signal.list.keys).each do |sig|
trap sig do
sig_write.puts(sig)
end
end
run_tick_loop
while io = IO.select([sig_read])
sig = io.first[0].gets.chomp
handle_signal(sig)
end
end
def handle_signal(sig)
logger.debug "Got #{sig} signal"
case sig
when 'INT'
shutdown
when 'TERM'
# Heroku sends TERM signal, and waits 10 seconds before exit
graceful_shutdown
when 'HUP'
graceful_shutdown
end
end
def shutdown
logger.info 'Shutting down'
stop_tick_loop
exit(0)
end
def graceful_shutdown
logger.info 'Gracefully shutting down'
stop_tick_loop
wait_tick_loop_finishes
exit(0)
end
def stop_tick_loop
@finish = true
end
def wait_tick_loop_finishes
@mutex.synchronize do # wait by synchronize
@condvar.signal
end
end
def run_tick_loop
Thread.new do
@mutex.synchronize do
until @finish
tick
interval = config[:sleep_timeout] - Time.now.subsec + 0.001
@condvar.wait(@mutex, interval) if interval > 0
end
end
end
end
def tick(t=Time.now)
if (fire_callbacks(:before_tick))
events = events_to_run(t)
events.each do |event|
if (fire_callbacks(:before_run, event, t))
event.run(t)
fire_callbacks(:after_run, event, t)
end
end
end
fire_callbacks(:after_tick)
events
end
def logger
config[:logger]
end
def log_error(e)
config[:logger].error(e)
end
def handle_error(e)
error_handler.call(e) if error_handler
end
def log(msg)
config[:logger].info(msg)
end
private
def events_to_run(t)
@events.select do |event|
begin
event.run_now?(t)
rescue => e
log_error(e)
handle_error(e)
false
end
end
end
def register(period, job, block, options)
event = Event.new(self, period, job, block || handler, options)
@events << event
event
end
def every_with_multiple_times(period, job, options={}, &block)
each_options = options.clone
options[:at].each do |at|
each_options[:at] = at
register(period, job, block, each_options)
end
end
end
end
|