File: worker.rb

package info (click to toggle)
libsequel-core-ruby 1.5.1-1
  • links: PTS
  • area: main
  • in suites: lenny
  • size: 648 kB
  • ctags: 840
  • sloc: ruby: 10,949; makefile: 36
file content (58 lines) | stat: -rw-r--r-- 1,067 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
require "thread"

module Sequel

  class Worker < Thread
      
    attr_reader :queue
    attr_reader :errors
  
    def initialize(db = nil)
      @queue = Queue.new
      @errors = []
      t = self
      t.abort_on_exception = true
      @transaction = !db.nil?
      db ? super {db.transaction {t.work}} : super {t.work}
    end
    
    def work
      loop {next_job}
    rescue Sequel::Error::WorkerStop # signals the worker thread to stop
    ensure
      raise Sequel::Error::Rollback if @transaction && !@errors.empty?
    end
    
    def busy?
      @cur || !@queue.empty?
    end
  
    def async(proc = nil, &block)
      @queue << (proc || block)
      self
    end
    alias_method :add, :async
    alias_method :<<, :async
  
    def join
      while busy?
        sleep 0.1
      end
      self.raise Error::WorkerStop
      super
    end

    private
    def next_job
      @cur = @queue.pop
      @cur.call
    rescue Error::WorkerStop => e
      raise e
    rescue Exception => e
      @errors << e
    ensure
      @cur = nil
    end
  end

end