File: queue.rb

package info (click to toggle)
stompserver 0.9.9gem-5
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 424 kB
  • sloc: ruby: 2,765; sh: 162; makefile: 3
file content (148 lines) | stat: -rw-r--r-- 4,654 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148

module StompServer
class Queue
  attr_accessor :checkpoint_interval
  def initialize(directory='.stompserver', delete_empty=true)
    @stompid = StompServer::StompId.new
    @delete_empty = delete_empty
    @directory = directory
    Dir.mkdir(@directory) unless File.directory?(@directory)
    if File.exist?("#{@directory}/qinfo")
      qinfo = Hash.new
      File.open("#{@directory}/qinfo", "rb") { |f| qinfo = Marshal.load(f.read)}
      @queues = qinfo[:queues]
      @frames = qinfo[:frames]
    else
      @queues = Hash.new
      @frames = Hash.new
    end

    @queues.keys.each do |dest|
      puts "Queue #{dest} size=#{@queues[dest][:size]} enqueued=#{@queues[dest][:enqueued]} dequeued=#{@queues[dest][:dequeued]}" if $DEBUG
    end

    puts "Queue initialized in #{@directory}"

    # Cleanup dead queues and save the state of the queues every so often.  Alternatively we could save the queue state every X number
    # of frames that are put in the queue.  Should probably also read it after saving it to confirm integrity.
    # Removed, this badly corrupt the queue when stopping with messages
    #EventMachine::add_periodic_timer 1800, proc {@queues.keys.each {|dest| close_queue(dest)};save_queue_state }
  end

  def stop
    puts "Shutting down Queue"

    @queues.keys.each {|dest| close_queue(dest)}
    @queues.keys.each do |dest|
      puts "Queue #{dest} size=#{@queues[dest][:size]} enqueued=#{@queues[dest][:enqueued]} dequeued=#{@queues[dest][:dequeued]}" if $DEBUG
    end
    save_queue_state
  end

  def save_queue_state
    now=Time.now
    @next_save ||=now
    if now >= @next_save
      puts "Saving Queue State" if $DEBUG
      qinfo = {:queues => @queues, :frames => @frames}
      # write then rename to make sure this is atomic
      File.open("#{@directory}/qinfo.new", "wb") { |f| f.write Marshal.dump(qinfo)}
      File.rename("#{@directory}/qinfo.new","#{@directory}/qinfo")
      @next_save=now+checkpoint_interval
    end
  end

  def monitor
    stats = Hash.new
    @queues.keys.each do |dest|
      stats[dest] = {'size' => @queues[dest][:size], 'enqueued' => @queues[dest][:enqueued], 'dequeued' => @queues[dest][:dequeued]}
    end
    stats
  end

  def close_queue(dest)
    if @queues[dest][:size] == 0 and @queues[dest][:frames].size == 0 and @delete_empty
      _close_queue(dest)
      @queues.delete(dest)
      @frames.delete(dest)
      puts "Queue #{dest} removed." if $DEBUG
    end
  end

  def open_queue(dest)
    @queues[dest] = Hash.new
    @frames[dest] = Hash.new
    @queues[dest][:size] = 0
    @queues[dest][:frames] = Array.new
    @queues[dest][:msgid] = 1
    @queues[dest][:enqueued] = 0
    @queues[dest][:dequeued] = 0
    @queues[dest][:exceptions] = 0
    _open_queue(dest)
    puts "Created queue #{dest}" if $DEBUG
  end

  def requeue(dest,frame)
    open_queue(dest) unless @queues.has_key?(dest)
    msgid = frame.headers['message-id']
    if frame.headers['max-exceptions'] and @frames[dest][msgid][:exceptions] >= frame.headers['max-exceptions'].to_i
      enqueue("/queue/deadletter",frame)
      return
    end
    writeframe(dest,frame,msgid)
    @queues[dest][:frames].unshift(msgid)
    @frames[dest][msgid][:exceptions] += 1
    @queues[dest][:dequeued] -= 1
    @queues[dest][:exceptions] += 1
    @queues[dest][:size] += 1
    save_queue_state
    return true
  end

  def enqueue(dest,frame)
    open_queue(dest) unless @queues.has_key?(dest)
    msgid = assign_id(frame, dest)
    writeframe(dest,frame,msgid)
    @queues[dest][:frames].push(msgid)
    @frames[dest][msgid] = Hash.new
    @frames[dest][msgid][:exceptions] =0
    @frames[dest][msgid][:client_id] = frame.headers['client-id'] if frame.headers['client-id']
    @frames[dest][msgid][:expires] = frame.headers['expires'] if frame.headers['expires']
    @queues[dest][:msgid] += 1
    @queues[dest][:enqueued] += 1
    @queues[dest][:size] += 1
    save_queue_state
    return true
  end

  def dequeue(dest)
    return false unless message_for?(dest)
    msgid = @queues[dest][:frames].shift
    frame = readframe(dest,msgid)
    @queues[dest][:size] -= 1
    @queues[dest][:dequeued] += 1
    @queues[dest].delete(msgid)
    close_queue(dest)
    save_queue_state
    return frame
  end

  def message_for?(dest)
    return (@queues.has_key?(dest) and (!@queues[dest][:frames].empty?))
  end

  def writeframe(dest,frame,msgid)
    _writeframe(dest,frame,msgid)
  end

  def readframe(dest,msgid)
    _readframe(dest,msgid)
  end

  def assign_id(frame, dest)
    msg_id = @queues[dest].nil? ? 1 : @queues[dest][:msgid] 
    frame.headers['message-id'] = @stompid[msg_id] 
  end
end
end