1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
|
module StompServer
  # In-memory message queue keyed by destination name.
  #
  # Frames are kept in per-destination arrays and handed out in FIFO order.
  # Nothing is persisted; all state lives in this object's instance variables.
  class MemoryQueue
    # Not read or written anywhere else in this class.
    # NOTE(review): presumably kept for interface parity with other queue
    # backends -- confirm against callers.
    attr_accessor :checkpoint_interval

    # Sets up empty queues and counters and prints a start-up notice to stdout.
    def initialize
      # Monotonic counter; its current value is used to derive message ids.
      @frame_index = 0
      @stompid = StompServer::StompId.new
      # Counters are created lazily so that a destination which has only ever
      # been fed through #requeue still has valid stats when it is dequeued
      # or monitored (previously that path raised NoMethodError on nil).
      @stats = Hash.new { |stats, dest| stats[dest] = { :enqueued => 0, :dequeued => 0 } }
      # Plain hash on purpose: reads for unknown destinations must not create
      # keys, otherwise #monitor would start listing destinations that never
      # held a message.
      @messages = {}
      puts "MemoryQueue initialized"
    end

    # No-op; this class holds no resources that need releasing.
    def stop
    end

    # Builds a snapshot of every destination that has held at least one frame.
    #
    # @return [Hash] destination => {'size', 'enqueued', 'dequeued'} counters
    def monitor
      snapshot = {}
      @messages.each do |dest, queue|
        counters = @stats[dest]
        snapshot[dest] = {
          'size' => queue.size,
          'enqueued' => counters[:enqueued],
          'dequeued' => counters[:dequeued]
        }
      end
      snapshot
    end

    # Removes and returns the oldest frame queued for +dest+.
    #
    # @param dest [Object] destination key
    # @return [Object, false] the frame, or false when nothing is queued
    def dequeue(dest)
      queue = @messages[dest]
      frame = queue && queue.shift
      return false unless frame

      @stats[dest][:dequeued] += 1
      frame
    end

    # Stamps +frame+ with a new message id, counts it and appends it to the
    # queue for +dest+.
    #
    # @param dest [Object] destination key
    # @param frame [Object] frame exposing a +headers+ hash
    # @return [Array] the destination's queue after the append
    def enqueue(dest, frame)
      @frame_index += 1
      @stats[dest][:enqueued] += 1
      assign_id(frame, dest)
      requeue(dest, frame)
    end

    # Appends +frame+ to the queue for +dest+ without assigning an id or
    # touching the enqueued counter.
    #
    # @param dest [Object] destination key
    # @param frame [Object] frame to append
    # @return [Array] the destination's queue after the append
    def requeue(dest, frame)
      # Append in place instead of `+=`, which copied the whole queue each time.
      (@messages[dest] ||= []) << frame
    end

    # @param dest [Object] destination key
    # @return [Boolean] true when at least one frame is queued for +dest+
    def message_for?(dest)
      queue = @messages[dest]
      !queue.nil? && !queue.empty?
    end

    # Writes the 'message-id' header using the current frame index.
    #
    # @param frame [Object] frame exposing a +headers+ hash
    # @param dest [Object] unused here; kept so the signature stays unchanged
    def assign_id(frame, dest)
      frame.headers['message-id'] = @stompid[@frame_index]
    end
  end
end
|