1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
|
## Queue implementation using ActiveRecord
##
## all messages are stored in a single table
## they are indexed by 'stomp_id' which is the stomp 'message-id' header
## which must be unique across all queues
##
require 'stomp_server/queue/ar_message'
require 'yaml'
module StompServer
  # Queue backend that persists every frame through the ArMessage model
  # while keeping an in-memory copy of the frames of each destination.
  #
  # In-memory layout of @frames:
  #   { queue_name => { :last_index => Integer, :frames => [frame, ...] } }
  # :last_index is the counter handed to the id generator when the next
  # message-id of that queue is assigned.
  class ActiveRecordQueue
    attr_accessor :checkpoint_interval

    # Connects to the database and reloads the frames already stored in it.
    #
    # configdir  - directory looked up for an optional database.yml whose
    #              entries override the default connection parameters
    # storagedir - directory used for the default SQLite database file
    def initialize(configdir, storagedir)
      # Default configuration, use SQLite for simplicity
      db_params = {
        'adapter' => 'sqlite3',
        'database' => "#{storagedir}/stompserver_development"
      }
      # Load DB configuration
      db_config = "#{configdir}/database.yml"
      puts "reading from #{db_config}"
      if File.exist? db_config
        # load_file opens and closes the file itself (no leaked handle).
        # An empty or non-mapping file yields a non-Hash value, which is
        # ignored instead of making merge! raise.
        overrides = YAML.load_file(db_config)
        db_params.merge!(overrides) if overrides.is_a?(Hash)
      end
      puts "using #{db_params['database']} DB"
      # Setup activerecord
      ActiveRecord::Base.establish_connection(db_params)
      # Development <TODO> fix this
      ActiveRecord::Base.logger = Logger.new(STDERR)
      ActiveRecord::Base.logger.level = Logger::INFO
      # we need the connection, it can't be done earlier
      ArMessage.reset_column_information
      reload_queues
      @stompid = StompServer::StompId.new
    end

    # Add a frame to the queue: a message-id is assigned, the frame is
    # stored, then appended to the in-memory list of queue_name.
    # Returns the in-memory frame list of the queue.
    def enqueue(queue_name, frame)
      queue = queue_for(queue_name)
      affect_msgid_and_store(frame, queue_name)
      queue[:frames] << frame
    end

    # Get and remove the oldest in-memory frame of the queue, deleting it
    # from the store as well. Returns nil when the queue is unknown or empty.
    def dequeue(queue_name)
      return nil unless message_for?(queue_name)
      frame = @frames[queue_name][:frames].shift
      remove_from_store(frame.headers['message-id'])
      frame
    end

    # Requeue the frame previously pending, reusing the message-id it
    # already carries. Returns the created ArMessage record.
    def requeue(queue_name, frame)
      # Store first, like enqueue does: if create! raises, the in-memory
      # queue is left untouched and stays in sync with the store.
      record = ArMessage.create!(:stomp_id => frame.headers['message-id'],
                                 :frame => frame)
      # queue_for creates the entry when queue_name has none yet.
      queue_for(queue_name)[:frames] << frame
      record
    end

    # remove a frame from the store; a message_id with no matching row is
    # ignored (returns nil) instead of raising on nil.
    def remove_from_store(message_id)
      message = ArMessage.find_by_stomp_id(message_id)
      message.destroy if message
    end

    # store a frame (assigning it a message-id)
    def affect_msgid_and_store(frame, queue_name)
      msgid = assign_id(frame, queue_name)
      ArMessage.create!(:stomp_id => msgid, :frame => frame)
    end

    # true when at least one frame is waiting in memory for queue_name.
    def message_for?(queue_name)
      queue = @frames[queue_name]
      !queue.nil? && !queue[:frames].empty?
    end

    # Bumps the queue counter, asks the id generator for the matching
    # message-id and writes it into the frame headers. Returns the id.
    def assign_id(frame, queue_name)
      queue = queue_for(queue_name)
      queue[:last_index] += 1
      frame.headers['message-id'] = @stompid[queue[:last_index]]
    end

    private

    # In-memory entry for queue_name, created empty on first use.
    def queue_for(queue_name)
      @frames[queue_name] ||= { :last_index => 0, :frames => [] }
    end

    # Rebuilds @frames from every stored message, grouped by the
    # destination of its frame.
    # NOTE(review): no ordering is requested from the store, so the order
    # of reloaded frames is whatever the database returns - confirm this
    # is acceptable.
    def reload_queues
      @frames = {}
      ArMessage.find(:all).each do |message|
        frame = message.frame
        queue_for(frame.dest)[:frames] << frame
      end
      # compute base index for each destination: the highest trailing
      # number found in the reloaded message-ids (an id without trailing
      # digits counts as 0 instead of raising)
      @frames.each_value do |queue|
        queue[:last_index] = queue[:frames].map { |stored|
          stored.headers['message-id'].to_s[/(\d+)\Z/, 1].to_i
        }.max
      end
    end
  end
end
|