File: stomp.rb

package info (click to toggle)
stompserver 0.9.9gem-5
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 424 kB
  • sloc: ruby: 2,765; sh: 162; makefile: 3
file content (186 lines) | stat: -rw-r--r-- 4,619 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186

module StompServer
module StompServer::Protocols
# Lower-cased STOMP client commands, as symbols, that process_frame accepts.
# Frozen so the whitelist cannot be modified at runtime.
VALID_COMMANDS = [:connect, :send, :subscribe, :unsubscribe, :begin, :commit, :abort, :ack, :disconnect].freeze

class Stomp < EventMachine::Connection

  # Accepts any constructor arguments and hands them, unchanged, to the
  # superclass constructor (EventMachine::Connection).
  def initialize(*args)
    super
  end

  # Sets up the per-connection state: an empty transaction table, a fresh
  # frame recognizer, and the "not yet connected" flag.
  def post_init
    @transactions = Hash.new
    @sfr = StompServer::StompFrameRecognizer.new
    @connected = false
  end

  # Receives a chunk of raw data for this connection and forwards it,
  # untouched, to stomp_receive_data.
  #
  # data:: the chunk as received
  def receive_data(data)
    stomp_receive_data data
  end
 
  # Feeds a chunk of raw data into the frame recognizer and processes every
  # frame that is available afterwards.
  #
  # Any StandardError raised while doing so is printed, reported to the client
  # in an ERROR frame, and the connection is closed once pending output has
  # been written. Only StandardError is rescued so that Interrupt, SystemExit
  # and other non-StandardError exceptions are not swallowed here.
  #
  # data:: the raw chunk to append to the recognizer
  def stomp_receive_data(data)
    puts "receive_data: #{data.inspect}" if $DEBUG
    @sfr << data
    process_frames
  rescue StandardError => e
    puts "err: #{e} #{e.backtrace.join("\n")}"
    send_error(e.to_s)
    close_connection_after_writing
  end
 
  # Processes one already-parsed frame with the same error handling as
  # stomp_receive_data.
  #
  # Any StandardError raised by process_frame is printed, reported to the
  # client in an ERROR frame, and the connection is closed once pending output
  # has been written. Only StandardError is rescued so that Interrupt,
  # SystemExit and other non-StandardError exceptions are not swallowed here.
  #
  # frame:: the frame to process
  def stomp_receive_frame(frame)
    puts "receive_frame: #{frame.inspect}" if $DEBUG
    process_frame(frame)
  rescue StandardError => e
    puts "err: #{e} #{e.backtrace.join("\n")}"
    send_error(e.to_s)
    close_connection_after_writing
  end
 
  # Drains the recognizer's frame list from the front, passing each frame to
  # process_frame in order. Stops as soon as shift yields nil (or false).
  def process_frames
    while (pending = @sfr.frames.shift)
      process_frame(pending)
    end
  end
  
  # Validates one frame and dispatches it to the handler for its command.
  #
  # The command is matched case-insensitively against VALID_COMMANDS. Frames
  # carrying a 'transaction' header go to handle_transaction; all others are
  # dispatched to the method named after the command (SEND goes to sendmsg).
  # If the frame has a 'receipt' header, a RECEIPT is sent afterwards.
  #
  # frame:: an object responding to #command and #headers
  #
  # Raises RuntimeError "Unhandled frame: <command>" for an unknown command
  # and RuntimeError "Not connected" for any command other than CONNECT
  # received before the connection has been marked as connected.
  def process_frame(frame)
    name = frame.command.downcase
    # Look the command up by its string form so that arbitrary client input is
    # never turned into a Symbol before it has been validated.
    cmd = VALID_COMMANDS.find { |known| known.to_s == name }
    raise "Unhandled frame: #{name}" unless cmd
    raise "Not connected" if !@connected && cmd != :connect

    if (trans = frame.headers['transaction'])
      handle_transaction(frame, trans, cmd)
    else
      # Object#send is the dispatch primitive used below, so the handler for
      # the SEND command is named sendmsg instead.
      cmd = :sendmsg if cmd == :send
      send(cmd, frame)
    end

    send_receipt(frame.headers['receipt']) if frame.headers['receipt']
  end
  
  # Handles a frame that carries a 'transaction' header.
  #
  # BEGIN, COMMIT and ABORT are dispatched straight to their handlers together
  # with the transaction id. Any other command is not executed now: the frame
  # is appended to the list stored for that transaction.
  #
  # frame:: the frame being handled
  # trans:: the transaction id taken from the frame's header
  # cmd::   the frame's command as a symbol
  #
  # Raises RuntimeError "transaction does not exist" when a non-control frame
  # names a transaction that is not in the table.
  def handle_transaction(frame, trans, cmd)
    case cmd
    when :begin, :commit, :abort
      send(cmd, frame, trans)
    else
      raise "transaction does not exist" unless @transactions.key?(trans)
      @transactions[trans].push(frame)
    end
  end
  
  # Handles a CONNECT frame.
  #
  # When the class-level @@auth_required flag is set, the frame must carry a
  # 'login' and a 'passcode' header, and the passcode must equal the entry
  # stored for that login in @@stompauth.authorized; otherwise RuntimeError
  # "Invalid Login" is raised. On success a CONNECTED frame is sent and the
  # connection is marked as connected.
  #
  # frame:: the CONNECT frame
  def connect(frame)
    if @@auth_required
      login = frame.headers['login']
      passcode = login && frame.headers['passcode']
      unless passcode && @@stompauth.authorized[login] == passcode
        raise "Invalid Login"
      end
    end
    puts "Connecting" if $DEBUG
    stomp_send_data(StompServer::StompFrame.new("CONNECTED", {'session' => 'wow'}))
    @connected = true
  end
  
  # Handles a SEND frame by routing it on its destination.
  #
  # Destinations matching %r|^/queue| are handed to the queue manager as they
  # are. Everything else is stamped with a 'message-id' header built from the
  # topic manager's next index and handed to the topic manager.
  #
  # frame:: the SEND frame
  def sendmsg(frame)
    return @@queue_manager.sendmsg(frame) if frame.dest =~ %r|^/queue|

    frame.headers['message-id'] = "msg-#stompcma-#{@@topic_manager.next_index}"
    @@topic_manager.sendmsg(frame)
  end
  
  # Handles a SUBSCRIBE frame by registering this connection with the manager
  # responsible for the destination.
  #
  # Destinations matching %r|^/queue| go to the queue manager, together with a
  # flag that is true only when the 'ack' header equals 'client'. All other
  # destinations go to the topic manager, which is not given the flag.
  #
  # frame:: the SUBSCRIBE frame
  def subscribe(frame)
    client_ack = frame.headers['ack'] == 'client'
    return @@topic_manager.subscribe(frame.dest, self) unless frame.dest =~ %r|^/queue|

    @@queue_manager.subscribe(frame.dest, self, client_ack)
  end
  
  # Handles an UNSUBSCRIBE frame: removes this connection's subscription from
  # the queue manager when the destination matches %r|^/queue|, otherwise from
  # the topic manager.
  #
  # frame:: the UNSUBSCRIBE frame
  def unsubscribe(frame)
    manager = (frame.dest =~ %r|^/queue|) ? @@queue_manager : @@topic_manager
    manager.unsubscribe(frame.dest, self)
  end
  
  # Handles a BEGIN frame by opening a new, empty transaction.
  #
  # frame:: the BEGIN frame (not inspected here)
  # trans:: the transaction id; required despite the nil default
  #
  # Raises RuntimeError "Missing transaction" when no id is given and
  # RuntimeError "transaction exists" when the id is already in the table.
  def begin(frame, trans=nil)
    raise "Missing transaction" if !trans
    raise "transaction exists" if @transactions.key?(trans)
    @transactions.store(trans, [])
  end
  
  # Handles a COMMIT frame by replaying the frames queued for the transaction.
  #
  # Each queued frame has its 'transaction' header removed and is then passed
  # to process_frame, in the order it was queued. Afterwards the transaction
  # is removed from the table.
  #
  # frame:: the COMMIT frame (not inspected here)
  # trans:: the transaction id; required despite the nil default
  #
  # Raises RuntimeError "Missing transaction" when no id is given and
  # RuntimeError "transaction does not exist" when the id is not in the table.
  def commit(frame, trans=nil)
    raise "Missing transaction" if !trans
    raise "transaction does not exist" unless @transactions.key?(trans)

    queued_frames = @transactions[trans]
    queued_frames.each do |queued|
      # Without the header the replayed frame is dispatched normally rather
      # than being queued again.
      queued.headers.delete('transaction')
      process_frame(queued)
    end
    @transactions.delete(trans)
  end
  
  # Handles an ABORT frame by discarding the transaction together with every
  # frame queued for it.
  #
  # frame:: the ABORT frame (not inspected here)
  # trans:: the transaction id; required despite the nil default
  #
  # Raises RuntimeError "Missing transaction" when no id is given and
  # RuntimeError "transaction does not exist" when the id is not in the table.
  def abort(frame, trans=nil)
    raise "Missing transaction" if !trans
    unless @transactions.key?(trans)
      raise "transaction does not exist"
    end
    @transactions.delete(trans)
  end
  
  # Handles an ACK frame by passing this connection and the frame on to the
  # queue manager.
  #
  # frame:: the ACK frame
  def ack(frame)
    queues = @@queue_manager
    queues.ack(self, frame)
  end
  
  # Handles a DISCONNECT frame: closes the connection once pending output has
  # been written.
  #
  # frame:: the DISCONNECT frame (not inspected here)
  def disconnect(frame)
    if $DEBUG
      puts "Polite disconnect"
    end
    close_connection_after_writing
  end

  # Cleans up after the connection has gone away: clears the connected flag
  # and tells the queue manager, then the topic manager, that this connection
  # has disconnected.
  def unbind
    if $DEBUG
      p "Unbind called"
    end
    @connected = false
    @@queue_manager.disconnect(self)
    @@topic_manager.disconnect(self)
  end
 
  # Reports the connection flag: false after post_init and unbind, true once
  # connect has completed.
  def connected?
    @connected
  end
 
  # Delivers a frame to this client as a MESSAGE. The frame's command is
  # overwritten in place before it is sent.
  #
  # msg:: the frame to deliver; its command attribute is changed
  def send_message(msg)
    stomp_send_data(msg.tap { |outgoing| outgoing.command = "MESSAGE" })
  end
    
  # Sends a RECEIPT frame whose 'receipt-id' header is the given id.
  #
  # id:: the receipt id to echo back
  def send_receipt(id)
    receipt_headers = { 'receipt-id' => id }
    send_frame("RECEIPT", receipt_headers)
  end
  
  # Sends an ERROR frame. The 'message' header is always the fixed text
  # 'See below'; the actual error text travels in the frame body.
  #
  # msg:: the error text to put in the body
  def send_error(msg)
    error_headers = {'message' => 'See below'}
    send_frame("ERROR", error_headers, msg)
  end
 
  # Writes a frame to the connection in its string form and, when $DEBUG is
  # set, prints what was sent. Always returns nil.
  #
  # frame:: the frame to send; only #to_s is called on it
  def stomp_send_data(frame)
    send_data(frame.to_s)
    return unless $DEBUG

    puts "Sending frame #{frame.to_s}"
  end

  # Builds a frame from its parts, adds a 'content-length' header, and sends it.
  #
  # The content length is the body's size in bytes, not characters, so bodies
  # containing multi-byte characters are described correctly. The header is
  # added to a copy, leaving the caller's headers hash unmodified.
  #
  # command:: the frame command, e.g. "RECEIPT" or "ERROR"
  # headers:: header name/value pairs; an existing 'content-length' entry is
  #           overridden in the frame that is sent
  # body::    the frame body
  def send_frame(command, headers={}, body='')
    outgoing_headers = headers.merge('content-length' => body.bytesize.to_s)
    response = StompServer::StompFrame.new(command, outgoing_headers, body)
    stomp_send_data(response)
  end
end

end
end