1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
module StompServer
class Queue
attr_accessor :checkpoint_interval
def initialize(directory='.stompserver', delete_empty=true)
@stompid = StompServer::StompId.new
@delete_empty = delete_empty
@directory = directory
Dir.mkdir(@directory) unless File.directory?(@directory)
if File.exist?("#{@directory}/qinfo")
qinfo = Hash.new
File.open("#{@directory}/qinfo", "rb") { |f| qinfo = Marshal.load(f.read)}
@queues = qinfo[:queues]
@frames = qinfo[:frames]
else
@queues = Hash.new
@frames = Hash.new
end
@queues.keys.each do |dest|
puts "Queue #{dest} size=#{@queues[dest][:size]} enqueued=#{@queues[dest][:enqueued]} dequeued=#{@queues[dest][:dequeued]}" if $DEBUG
end
puts "Queue initialized in #{@directory}"
# Cleanup dead queues and save the state of the queues every so often. Alternatively we could save the queue state every X number
# of frames that are put in the queue. Should probably also read it after saving it to confirm integrity.
# Removed, this badly corrupt the queue when stopping with messages
#EventMachine::add_periodic_timer 1800, proc {@queues.keys.each {|dest| close_queue(dest)};save_queue_state }
end
def stop
puts "Shutting down Queue"
@queues.keys.each {|dest| close_queue(dest)}
@queues.keys.each do |dest|
puts "Queue #{dest} size=#{@queues[dest][:size]} enqueued=#{@queues[dest][:enqueued]} dequeued=#{@queues[dest][:dequeued]}" if $DEBUG
end
save_queue_state
end
def save_queue_state
now=Time.now
@next_save ||=now
if now >= @next_save
puts "Saving Queue State" if $DEBUG
qinfo = {:queues => @queues, :frames => @frames}
# write then rename to make sure this is atomic
File.open("#{@directory}/qinfo.new", "wb") { |f| f.write Marshal.dump(qinfo)}
File.rename("#{@directory}/qinfo.new","#{@directory}/qinfo")
@next_save=now+checkpoint_interval
end
end
def monitor
stats = Hash.new
@queues.keys.each do |dest|
stats[dest] = {'size' => @queues[dest][:size], 'enqueued' => @queues[dest][:enqueued], 'dequeued' => @queues[dest][:dequeued]}
end
stats
end
def close_queue(dest)
if @queues[dest][:size] == 0 and @queues[dest][:frames].size == 0 and @delete_empty
_close_queue(dest)
@queues.delete(dest)
@frames.delete(dest)
puts "Queue #{dest} removed." if $DEBUG
end
end
def open_queue(dest)
@queues[dest] = Hash.new
@frames[dest] = Hash.new
@queues[dest][:size] = 0
@queues[dest][:frames] = Array.new
@queues[dest][:msgid] = 1
@queues[dest][:enqueued] = 0
@queues[dest][:dequeued] = 0
@queues[dest][:exceptions] = 0
_open_queue(dest)
puts "Created queue #{dest}" if $DEBUG
end
def requeue(dest,frame)
open_queue(dest) unless @queues.has_key?(dest)
msgid = frame.headers['message-id']
if frame.headers['max-exceptions'] and @frames[dest][msgid][:exceptions] >= frame.headers['max-exceptions'].to_i
enqueue("/queue/deadletter",frame)
return
end
writeframe(dest,frame,msgid)
@queues[dest][:frames].unshift(msgid)
@frames[dest][msgid][:exceptions] += 1
@queues[dest][:dequeued] -= 1
@queues[dest][:exceptions] += 1
@queues[dest][:size] += 1
save_queue_state
return true
end
def enqueue(dest,frame)
open_queue(dest) unless @queues.has_key?(dest)
msgid = assign_id(frame, dest)
writeframe(dest,frame,msgid)
@queues[dest][:frames].push(msgid)
@frames[dest][msgid] = Hash.new
@frames[dest][msgid][:exceptions] =0
@frames[dest][msgid][:client_id] = frame.headers['client-id'] if frame.headers['client-id']
@frames[dest][msgid][:expires] = frame.headers['expires'] if frame.headers['expires']
@queues[dest][:msgid] += 1
@queues[dest][:enqueued] += 1
@queues[dest][:size] += 1
save_queue_state
return true
end
def dequeue(dest)
return false unless message_for?(dest)
msgid = @queues[dest][:frames].shift
frame = readframe(dest,msgid)
@queues[dest][:size] -= 1
@queues[dest][:dequeued] += 1
@queues[dest].delete(msgid)
close_queue(dest)
save_queue_state
return frame
end
def message_for?(dest)
return (@queues.has_key?(dest) and (!@queues[dest][:frames].empty?))
end
def writeframe(dest,frame,msgid)
_writeframe(dest,frame,msgid)
end
def readframe(dest,msgid)
_readframe(dest,msgid)
end
def assign_id(frame, dest)
msg_id = @queues[dest].nil? ? 1 : @queues[dest][:msgid]
frame.headers['message-id'] = @stompid[msg_id]
end
end
end
|