1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
|
require "thread"
module Sequel
class Worker < Thread
attr_reader :queue
attr_reader :errors
def initialize(db = nil)
@queue = Queue.new
@errors = []
t = self
t.abort_on_exception = true
@transaction = !db.nil?
db ? super {db.transaction {t.work}} : super {t.work}
end
def work
loop {next_job}
rescue Sequel::Error::WorkerStop # signals the worker thread to stop
ensure
raise Sequel::Error::Rollback if @transaction && !@errors.empty?
end
def busy?
@cur || !@queue.empty?
end
def async(proc = nil, &block)
@queue << (proc || block)
self
end
alias_method :add, :async
alias_method :<<, :async
def join
while busy?
sleep 0.1
end
self.raise Error::WorkerStop
super
end
private
def next_job
@cur = @queue.pop
@cur.call
rescue Error::WorkerStop => e
raise e
rescue Exception => e
@errors << e
ensure
@cur = nil
end
end
end
|