1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
# -*- encoding: utf-8 -*-
require 'stomp'
require 'tmpdir'
# Focus on this gem's capabilities.
require 'memory_profiler'
# require 'memory-profiler'
if Kernel.respond_to?(:require_relative)
require_relative("stomp_adhoc_common")
require_relative("payload_generator")
else
$LOAD_PATH << File.dirname(__FILE__)
require "stomp_adhoc_common"
require("payload_generator")
end
include Stomp11Common
# Next of testing around issue #121.
# Different memory profiler gem.
# Use Stomp#connection to merely send
class Issue121Examp01Conn
attr_reader :connection, :session
# Initialize.
def initialize(topic = false)
@connection, @session, @topic = nil, nil, topic
@nmsgs = nmsgs()
@queue = make_destination("issue121/test_01_conn")
@id = "issue121_01_conn"
@getreceipt = conn_receipt()
#
@cmin, @cmax = 1292, 67782 # From the issue discussion
PayloadGenerator::initialize(min= @cmin, max= @cmax)
@ffmts = "%16.6f"
#
mps = 5.6 # see issue discussion
@to, @nmts, @nts, @umps = 0.0, Time.now.to_f, @nmsgs, mps
@tslt = 1.0 / @umps
end # initialize
# Startup
def start
#
connect_hdrs = {"accept-version" => "1.1,1.2",
"host" => virt_host,
}
#
connect_hash = { :hosts => [
{:login => login(), :passcode => passcode(), :host => host(), :port => port()},
],
:connect_headers => connect_hdrs,
}
#
@connection = Stomp::Connection.new(connect_hash)
puts "START: Connection Connect complete"
raise "START: Connection failed!!" unless @connection.open?
raise "START: Unexpected protocol level!!" if @connection.protocol == Stomp::SPL_10
cf = @connection.connection_frame
puts "START: Connection frame\n#{cf}"
raise "START: Connect error!!: #{cf.body}" if @connection.connection_frame.command == Stomp::CMD_ERROR
@session = @connection.connection_frame.headers['session']
puts "START: Queue/Topic Name: #{@queue}"
puts "START: Session: #{@session}"
puts "START: NMSGS: #{@nmsgs}"
puts "START: Receipt: #{@getreceipt}"
puts "START: Wanted Messages Per Second: #{@umps}"
puts "START: Sleep Time: #{@tslt}"
$stdout.flush
end # start
#
def shutdown
@connection.disconnect()
#
te = Time.now.to_f
et = te - @nmts
avgsz = @to / @nts
mps = @nts.to_f / et
#
fet = sprintf(@ffmts, et)
favgsz = sprintf(@ffmts, avgsz)
fmps = sprintf(@ffmts, mps)
#
sep = "=" * 72
puts sep
puts "\tNumber of payloads generated: #{@nts}"
puts "\tMin Length: #{@cmin}, Max Length: #{@cmax}"
puts "\tAVG_SIZE: #{favgsz}, ELAPS_SEC: #{fet}(seconds)"
puts "\tNMSGS_PER_SEC: #{fmps}"
puts sep
#
puts "SHUT: Shutdown complete"
$stdout.flush
end # shutdown
#
def msg_handler
m = "Message: "
nm = 0
@nmsgs.times do |n|
nm += 1
puts "MSH: NEXT MESSAGE NUMBER: #{nm}"; $stdout.flush
mo = PayloadGenerator::payload()
@to += mo.bytesize()
if @getreceipt
uuid = @connection.uuid()
puts "MSH: Receipt id wanted is #{uuid}"
hs = {:session => @session, :receipt => uuid}
else
hs = {:session => @session}
end
# Move data out the door
@connection.publish(@queue, mo, hs)
if @getreceipt
r = @connection.receive()
puts "MSH: received receipt, id is #{r.headers['receipt-id']}"
raise if uuid != r.headers['receipt-id']
end
#
puts "MSH: start user sleep"
sleep @tslt # see issue discussion
puts "MSH: end user sleep"
$stdout.flush
end # @nmsgs.times do
puts "MSH: end of msg_handler"
$stdout.flush
end # msg_handler
end # class
#
1.times do |i|
rpt = MemoryProfiler.report do
e = Issue121Examp01Conn.new
e.start
e.msg_handler
# No subscribes here, just msg_handler
# See discussion in issue #121
e.shutdown
end
n = Time.now
nf = "memory_profiler-ng"
nf << n.strftime("%Y%m%dT%H%M%S.%N%Z")
where_name = File::join(Dir::tmpdir(), nf)
rpt.pretty_print(to_file: where_name )
# sleep 1
end
#
|