File: activerecord_queue.rb

package info (click to toggle)
stompserver 0.9.9gem-5
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 424 kB
  • sloc: ruby: 2,765; sh: 162; makefile: 3
file content (105 lines) | stat: -rw-r--r-- 3,079 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
## Queue implementation using ActiveRecord
##
## all messages are stored in a single table
## they are indexed by 'stomp_id' which is the stomp 'message-id' header
## which must be unique accross all queues
##
require 'stomp_server/queue/ar_message'
require 'yaml'

module StompServer
class ActiveRecordQueue
  attr_accessor :checkpoint_interval

  def initialize(configdir, storagedir)
    # Default configuration, use SQLite for simplicity
    db_params = {
      'adapter' => 'sqlite3',
      'database' => "#{storagedir}/stompserver_development"
    }
    # Load DB configuration
    db_config = "#{configdir}/database.yml"
    puts "reading from #{db_config}"
    if File.exist? db_config
      db_params.merge! YAML::load(File.open(db_config))
    end

    puts "using #{db_params['database']} DB"

    # Setup activerecord
    ActiveRecord::Base.establish_connection(db_params)
    # Development <TODO> fix this
    ActiveRecord::Base.logger = Logger.new(STDERR)
    ActiveRecord::Base.logger.level = Logger::INFO
    # we need the connection, it can't be done earlier
    ArMessage.reset_column_information
    reload_queues
    @stompid = StompServer::StompId.new
  end

  # Add a frame to the queue
  def enqueue(queue_name, frame)
    unless @frames[queue_name]
      @frames[queue_name] = {
        :last_index => 0,
        :frames => [],
      }
    end
    affect_msgid_and_store(frame, queue_name)
    @frames[queue_name][:frames] << frame
  end

  # Get and remove a frame from the queue
  def dequeue(queue_name)
    return nil unless @frames[queue_name] && !@frames[queue_name][:frames].empty?
    frame = @frames[queue_name][:frames].shift
    remove_from_store(frame.headers['message-id'])
    return frame
  end

  # Requeue the frame previously pending
  def requeue(queue_name, frame)
    @frames[queue_name][:frames] << frame
    ArMessage.create!(:stomp_id => frame.headers['message-id'],
                      :frame => frame)
  end

  # remove a frame from the store
  def remove_from_store(message_id)
    ArMessage.find_by_stomp_id(message_id).destroy
  end

  # store a frame (assigning it a message-id)
  def affect_msgid_and_store(frame, queue_name)
    msgid = assign_id(frame, queue_name)
    ArMessage.create!(:stomp_id => msgid, :frame => frame)
  end

  def message_for?(queue_name)
    @frames[queue_name] && !@frames[queue_name][:frames].empty?
  end

  def assign_id(frame, queue_name)
    msgid = @stompid[@frames[queue_name][:last_index] += 1]
    frame.headers['message-id'] = msgid
  end

  private
  def reload_queues
    @frames = Hash.new
    ArMessage.find(:all).each { |message|
      frame = message.frame
      destination = frame.dest
      msgid = message.stomp_id
      @frames[destination] ||= Hash.new
      @frames[destination][:frames] ||= Array.new
      @frames[destination][:frames] << frame
    }
    # compute base index for each destination
    @frames.each_pair { |destination,hash|
      hash[:last_index] = hash[:frames].map{|f|
        f.headers['message-id'].match(/(\d+)\Z/)[0].to_i}.max
    }
  end
end
end