1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
|
# -*- encoding: utf-8 -*-
require 'thread'
require 'digest/sha1'
require 'timeout'
require 'forwardable'
module Stomp
# Typical Stomp client class. Uses a listener thread to receive frames
# from the server, any thread can send.
#
# Receives all happen in one thread, so consider not doing much processing
# in that thread if you have much message volume.
class Client
# Forwardable supplies the def_delegators / def_delegator macros used below.
extend Forwardable
# Parameters hash
attr_reader :parameters
# login, passcode, port, host and ssl are forwarded to the underlying @connection.
def_delegators :@connection, :login, :passcode, :port, :host, :ssl
# reliable is forwarded to @parameters.
# NOTE(review): this calls @parameters.reliable, while the rest of this class
# reads @parameters with hash keys (e.g. @parameters[:logger]) - confirm that
# the parameters object actually responds to #reliable.
def_delegator :@parameters, :reliable
# A new Client object can be initialized using three forms:
#
# Hash (this is the recommended Client initialization method):
#
# hash = {
# :hosts => [
# {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
# {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
# ],
# # These are the default parameters and do not need to be set
# :reliable => true, # reliable (use failover)
# :initial_reconnect_delay => 0.01, # initial delay before reconnect (secs)
# :max_reconnect_delay => 30.0, # max delay before reconnect
# :use_exponential_back_off => true, # increase delay between reconnect attempts
# :back_off_multiplier => 2, # next delay multiplier
# :max_reconnect_attempts => 0, # retry forever, use # for maximum attempts
# :randomize => false, # do not randomize hosts hash before reconnect
# :connect_timeout => 0, # Timeout for TCP/TLS connects, use # for max seconds
# :connect_headers => {}, # user supplied CONNECT headers (req'd for Stomp 1.1+)
# :parse_timeout => 5, # IO::select wait time on socket reads
# :logger => nil, # user supplied callback logger instance
# :dmh => false, # do not support multihomed IPV4 / IPV6 hosts during failover
# :closed_check => true, # check first if closed in each protocol method
# :hbser => false, # raise on heartbeat send exception
# :stompconn => false, # Use STOMP instead of CONNECT
# :usecrlf => false, # Use CRLF command and header line ends (1.2+)
# :max_hbread_fails => 0, # Max HB read fails before retry. 0 => never retry
# :max_hbrlck_fails => 0, # Max HB read lock obtain fails before retry. 0 => never retry
# :fast_hbs_adjust => 0.0, # Fast heartbeat senders sleep adjustment, seconds, needed ...
# # For fast heartbeat senders. 'fast' == YMMV. If not
# # correct for your environment, expect unnecessary fail overs
# :connread_timeout => 0, # Timeout during CONNECT for read of CONNECTED/ERROR, secs
# :tcp_nodelay => true, # Turns on the TCP_NODELAY socket option; disables Nagle's algorithm
# :start_timeout => 0, # Timeout around Stomp::Client initialization
# :sslctx_newparm => nil, # Param for SSLContext.new
# :ssl_post_conn_check => true, # Further verify broker identity
# :nto_cmd_read => true, # No timeout on COMMAND read
# }
#
# e.g. c = Stomp::Client.new(hash)
#
# Positional parameters:
# login (String, default : '')
# passcode (String, default : '')
# host (String, default : 'localhost')
# port (Integer, default : 61613)
# reliable (Boolean, default : false)
# autoflush (Boolean, default : false)
#
# e.g. c = Stomp::Client.new('login', 'passcode', 'localhost', 61613, true)
#
# Stomp URL :
# A Stomp URL must begin with 'stomp://' and can be in one of the following forms:
#
# stomp://host:port
# stomp://host.domain.tld:port
# stomp://login:passcode@host:port
# stomp://login:passcode@host.domain.tld:port
#
# e.g. c = Stomp::Client.new(urlstring)
#
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false)
  # The first argument decides which form was used: each parser is tried in
  # turn and the first one returning a truthy value wins, with the positional
  # form as the final fallback.
  # NOTE(review): the parse_* helpers are defined elsewhere; presumably they
  # populate @parameters - confirm against their definitions.
  parse_hash_params(login) ||
    parse_stomp_url(login) ||
    parse_failover_url(login) ||
    parse_positional_params(login, passcode, host, port, reliable)

  @logger = @parameters[:logger] ||= Stomp::NullLogger.new
  @start_timeout = @parameters[:start_timeout] || 0
  # Remember the thread that constructed this client.
  @parameters[:client_main] = Thread.current
  check_arguments!

  # Only the connection start-up is covered by the start timeout; a timeout
  # is reported to the caller as a StartTimeoutException.
  begin
    Timeout.timeout(@start_timeout) do
      create_error_handler
      create_connection(autoflush)
      start_listeners
    end
  rescue Timeout::Error
    raise Stomp::Error::StartTimeoutException.new(@start_timeout)
  end
end
# Builds the lambda stored in @error_listener. When called with an ERROR
# frame it maps the frame body to an exception class, drops any receipt
# listener registered under the frame's 'receipt-id' header, and raises the
# exception in the thread that called create_error_handler.
def create_error_handler
  owner = Thread.current
  # Older rubies do not have Thread#report_on_exception=.
  owner.report_on_exception = false if owner.respond_to?(:report_on_exception=)

  @error_listener = lambda do |error|
    exception_class = case error.body
                      when /ResourceAllocationException/i then Stomp::Error::ProducerFlowControlException
                      when /ProtocolException/i then Stomp::Error::ProtocolException
                      else Stomp::Error::BrokerException
                      end
    failure = exception_class.new(error)

    receipt_id = error.headers['receipt-id']
    @receipt_listeners.delete(receipt_id) if receipt_id
    owner.raise failure
  end
end
# Creates the underlying Connection from @parameters, stores it in
# @connection and applies the requested autoflush setting to it.
def create_connection(autoflush)
  conn = Connection.new(@parameters)
  @connection = conn
  conn.autoflush = autoflush
end
private :create_connection
# open is syntactic sugar for 'Client.new', see 'initialize' for usage.
# All six positional parameters of 'initialize' are forwarded, including
# autoflush, so both entry points accept the same arguments.
def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false)
  Client.new(login, passcode, host, port, reliable, autoflush)
end
# Joins the listener thread of this client, optionally giving up after
# +limit+. Generally used to wait for a quit signal.
def join(limit = nil)
  listener = @listener_thread
  listener.join(limit)
end
# Begin starts work in a transaction by name.
# The call is passed straight through to the connection.
def begin(name, headers = {})
  conn = @connection
  conn.begin(name, headers)
end
# Abort aborts work in a transaction by name, then replays every message
# that was ack'd within that transaction to its listener.
#
# The replay list is detached from @replay_messages_by_txn before replaying.
# This releases the list once the transaction is over, and it means a
# listener that acks again in the same transaction adds to a fresh list
# instead of growing the list that is being iterated.
#
# Returns the replayed list, or nil when nothing was ack'd in the transaction.
def abort(name, headers = {})
  @connection.abort(name, headers)

  replay_list = @replay_messages_by_txn.delete(name)
  return unless replay_list

  # find_listener also calls the listener
  replay_list.each { |message| find_listener(message) }
end
# Commit commits work in a transaction by name.
#
# The messages remembered for replay are dropped first. They are looked up
# by the transaction name, the same key abort uses. A differing
# :transaction header is also honoured, as it was before.
def commit(name, headers = {})
  @replay_messages_by_txn.delete(name)
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id) if txn_id && txn_id != name
  @connection.commit(name, headers)
end
# Subscribe to a destination. A block is mandatory; it is stored as the
# callback listener for the subscription.
# Accepts a transaction header ( :transaction => 'some_transaction_id' ).
#
# Raises NoListenerGiven without a block, DestinationRequired without a
# destination and DuplicateSubscription when a listener is already
# registered under the same subscription id.
def subscribe(destination, headers = {})
  raise Stomp::Error::NoListenerGiven unless block_given?
  sym_headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination

  # The subscription id correlates incoming messages with this subscription,
  # see the SUBSCRIBE section of the protocol: http://stomp.github.com/.
  sub_id = build_subscription_id(destination, sym_headers)
  sym_headers = sym_headers.merge(:id => sub_id)
  raise Stomp::Error::DuplicateSubscription if @listeners[sub_id]

  @listeners[sub_id] = lambda { |msg| yield msg }
  @connection.subscribe(destination, sym_headers)
end
# Unsubscribe from a subscription by name.
#
# The subscription id is derived the same way subscribe derives it. After the
# connection has unsubscribed, the listener entry is removed from @listeners
# entirely, so the table does not accumulate dead keys and the same id can
# be subscribed again later.
#
# Raises DestinationRequired without a destination. Returns nil.
def unsubscribe(destination, headers = {})
  headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination
  headers = headers.merge(:id => build_subscription_id(destination, headers))
  @connection.unsubscribe(destination, headers)
  @listeners.delete(headers[:id])
  nil
end
# Acknowledge a message, used when a subscription has specified
# client acknowledgement ( connection.subscribe("/queue/a",{:ack => 'client'}).
# Accepts a transaction header ( :transaction => 'some_transaction_id' ).
# If a block is given a receipt is requested and passed to the block.
def ack(message, headers = {})
  txn_id = headers[:transaction]
  if txn_id
    # Remember messages ack'd in this transaction in case it is rolled back.
    @replay_messages_by_txn[txn_id] = [] if @replay_messages_by_txn[txn_id].nil?
    @replay_messages_by_txn[txn_id] << message
  end

  if block_given?
    receipt_id = register_receipt_listener(lambda { |r| yield r })
    headers = headers.merge(:receipt => receipt_id)
  end

  context = ack_context_for(message, headers)
  @connection.ack(context[:message_id], context[:headers])
end
# For posterity, we alias:
alias acknowledge ack
# Stomp 1.1+ NACK.
# The message id and headers are resolved by ack_context_for.
def nack(message, headers = {})
  ctx = ack_context_for(message, headers)
  @connection.nack(ctx[:message_id], ctx[:headers])
end
# Works out what an ACK / NACK for +message+ must carry at the current
# protocol level. Returns a hash with :message_id and :headers.
#
# * Stomp 1.2: the id is taken from the message's 'ack' header.
# * Stomp 1.1: the id is the 'message-id' header, and the message's
#   'subscription' header is merged into the outgoing headers.
# * otherwise: the id is the 'message-id' header.
def ack_context_for(message, headers)
  id_header = 'message-id'
  case protocol
  when Stomp::SPL_12
    id_header = 'ack'
  when Stomp::SPL_11
    headers = headers.merge(:subscription => message.headers['subscription'])
  end
  { :message_id => message.headers[id_header], :headers => headers }
end
# Unreceive a message, sending it back to its queue or to the DLQ.
# Message and options are handed to the connection unchanged.
def unreceive(message, options = {})
  conn = @connection
  conn.unreceive(message, options)
end
# Publishes message to destination.
# If a block is given a receipt will be requested and passed to the
# block on receipt.
# Accepts a transaction header ( :transaction => 'some_transaction_id' ).
# Raises DestinationRequired without a destination.
def publish(destination, message, headers = {})
  outgoing = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination

  if block_given?
    receipt_id = register_receipt_listener(lambda { |r| yield r })
    outgoing = outgoing.merge(:receipt => receipt_id)
  end

  @connection.publish(destination, message, outgoing)
end
# Return the broker's CONNECTED frame to the client, as held by the
# connection. Misnamed.
def connection_frame
  @connection.connection_frame
end
# Return any RECEIPT frame received by DISCONNECT, as held by the connection.
def disconnect_receipt
  @connection.disconnect_receipt
end
# open? tests if this client connection is open; the answer comes from the
# underlying connection.
def open?
  @connection.open?
end
# closed? tests if this client connection is closed; the answer comes from
# the underlying connection.
def closed?
  @connection.closed?
end
# jruby? tests if the connection has detected a JRuby environment.
# Returns the connection's jruby value.
def jruby?
  @connection.jruby
end
# close frees resources in use by this client. The listener thread is
# terminated first, then disconnect is called on the connection with the
# given headers.
def close(headers = {})
  listener = @listener_thread
  listener.exit
  @connection.disconnect(headers)
end
# running checks if the listener thread was created and is not dead.
# Returns true or false for an existing thread; when no listener thread
# exists the falsy @listener_thread value itself is returned.
def running
  listener = @listener_thread
  listener && (listener.status ? true : false)
end
# set_logger identifies a new callback logger. The logger is stored on the
# client and then handed to the connection as well.
def set_logger(logger)
  @logger = logger
  @connection.set_logger(@logger)
end
# protocol returns the current client's protocol level, as reported by the
# connection.
def protocol
  @connection.protocol
end
# valid_utf8? validates any given string for UTF8 compliance; the check is
# performed by the connection.
def valid_utf8?(s)
  conn = @connection
  conn.valid_utf8?(s)
end
# sha1 returns a SHA1 sum of a given string; the sum is produced by the
# connection.
def sha1(data)
  conn = @connection
  conn.sha1(data)
end
# uuid returns a type 4 UUID, obtained from the connection.
def uuid
  @connection.uuid
end
# hbsend_interval returns the connection's heartbeat send interval.
def hbsend_interval
  @connection.hbsend_interval
end
# hbrecv_interval returns the connection's heartbeat receive interval.
def hbrecv_interval
  @connection.hbrecv_interval
end
# hbsend_count returns the current connection's heartbeat send count.
def hbsend_count
  @connection.hbsend_count
end
# hbrecv_count returns the current connection's heartbeat receive count.
def hbrecv_count
  @connection.hbrecv_count
end
# Poll for asynchronous messages issued by broker.
# Return nil if no message is available, else the message.
def poll
  @connection.poll
end
# autoflush= sets the current connection's autoflush setting.
def autoflush=(af)
  conn = @connection
  conn.autoflush = af
end
# autoflush returns the current connection's autoflush setting.
def autoflush
  @connection.autoflush
end
end # Class
end # Module
|