1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
|
require File.join(File.dirname(__FILE__), '..', 'lib', 'ffi-rzmq')
require 'thread'
# Within a single process, we start a transmitter thread with a PUB (publisher)
# socket and a receiver thread with a SUB (subscription) socket. We measure the
# *throughput* between these sockets. A high-water mark (HWM) is *not* set, so the
# publisher queue is free to grow to the size of memory without dropping packets.
#
# This example also illustrates how a single context can be shared amongst several
# threads. Sharing a single context also allows a user to specify the "inproc"
# transport in addition to "tcp" and "ipc".
#
# % ruby throughput_measurement.rb tcp://127.0.0.1:5555 1024 1_000_000
#
# % ruby throughput_measurement.rb inproc://lm_sock 1024 1_000_000
#
# Three positional arguments are required: the endpoint, the message size in
# bytes, and the number of messages. Print the usage line and stop otherwise.
if ARGV.size < 3
  puts "usage: ruby throughput_measurement.rb <connect-to> <message-size> <roundtrip-count>"
  exit
end

link, raw_size, raw_count = ARGV
message_size = raw_size.to_i
count = raw_count.to_i
# Raises a RuntimeError when an API call reports failure.
#
# rc - numeric result code of the call; any value >= 0 counts as success.
#
# Returns nil on success. The error message embeds the caller's backtrace.
def assert(rc)
  return if rc >= 0

  raise "Last API call failed at #{caller(1)}"
end
# Create the single context that every socket in this process shares.
# The error class lives in the ZMQ namespace; a bare `ContextError` would raise
# NameError from the rescue clause and hide the real failure.
begin
  master_context = ZMQ::Context.new
rescue ZMQ::ContextError
  STDERR.puts "Failed to allocate context or socket!"
  raise
end
# Subscriber side of the throughput measurement.
#
# Opens a SUB socket on the given context, connects it to the endpoint and
# times how long it takes to receive the expected number of messages.
class Receiver
  # context - object whose #socket(type) returns the SUB socket
  #           (the script passes its shared ZMQ::Context).
  # link    - endpoint string to connect to.
  # size    - message size in bytes (stored; not otherwise used by this class).
  # count   - number of messages to receive.
  # stats   - collector responding to #record_elapsed.
  #
  # Re-raises ZMQ::ContextError if the socket cannot be allocated, and raises
  # via `assert` if any socket setup call returns a negative result code.
  def initialize(context, link, size, count, stats)
    @context = context
    @link = link
    @size = size
    @count = count
    @stats = stats

    begin
      @socket = @context.socket(ZMQ::SUB)
    rescue ZMQ::ContextError
      # The exception class must be namespaced; a bare `ContextError` does not
      # resolve here and would turn the real failure into a NameError.
      STDERR.puts "Failed to allocate SUB socket!"
      raise
    end

    assert(@socket.setsockopt(ZMQ::LINGER, 100))
    # Subscribe with an empty prefix.
    assert(@socket.setsockopt(ZMQ::SUBSCRIBE, ""))
    assert(@socket.connect(@link))
  end

  # Receives @count messages, reports the elapsed time to the stats collector
  # and closes the socket.
  #
  # The first message is received outside the timed section; only the
  # remaining @count - 1 receives are timed.
  def run
    msg = ZMQ::Message.new
    assert(@socket.recvmsg(msg))

    elapsed = elapsed_microseconds do
      (@count - 1).times { assert(@socket.recvmsg(msg)) }
    end

    @stats.record_elapsed(elapsed)
    assert(@socket.close)
  end

  # Runs the given block and returns how long it took, in microseconds (Float).
  #
  # Uses the monotonic clock so the measurement is not affected by wall-clock
  # adjustments while the block is running.
  def elapsed_microseconds
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
    (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) * 1_000_000
  end
end
# Publisher side of the throughput measurement.
#
# Opens a PUB socket on the given context, binds it to the endpoint and sends
# a fixed number of equally sized messages.
class Transmitter
  # context - object whose #socket(type) returns the PUB socket
  #           (the script passes its shared ZMQ::Context).
  # link    - endpoint string to bind to.
  # size    - payload size in bytes of every message.
  # count   - number of messages to send.
  #
  # Re-raises ZMQ::ContextError if the socket cannot be allocated, and raises
  # via `assert` if any socket setup call returns a negative result code.
  def initialize(context, link, size, count)
    @context = context
    @link = link
    @size = size
    @count = count

    begin
      @socket = @context.socket(ZMQ::PUB)
    rescue ZMQ::ContextError
      # The exception class must be namespaced; a bare `ContextError` does not
      # resolve here and would turn the real failure into a NameError.
      STDERR.puts "Failed to allocate PUB socket!"
      raise
    end

    assert(@socket.setsockopt(ZMQ::LINGER, 100))
    assert(@socket.bind(@link))
  end

  # Sends @count messages, each carrying @size '0' characters.
  #
  # Returns nil.
  def run
    # NOTE(review): presumably this pause gives subscribers time to connect
    # before publishing starts - confirm.
    sleep 1

    contents = '0' * @size
    # A new Message object is built for every send, as in the original loop.
    @count.times do
      assert(@socket.sendmsg(ZMQ::Message.new(contents)))
    end
    nil
  end

  # Closes the PUB socket; raises via `assert` if the close call fails.
  def close
    assert(@socket.close)
  end
end
# Collects elapsed-time samples and prints a throughput report for each one.
class Stats
  # size  - message size in bytes.
  # count - number of messages each sample covers.
  def initialize(size, count)
    @size = size
    @count = count
    @mutex = Mutex.new
    @elapsed = []
  end

  # Stores one elapsed-time sample (microseconds). Guarded by a mutex so it
  # can be called from several threads.
  def record_elapsed(elapsed)
    @mutex.synchronize { @elapsed << elapsed }
  end

  # Prints message size, message count and mean throughput (msg/s and Mb/s)
  # for every recorded sample, followed by a blank line.
  def output
    # Read the samples under the same lock that guards writes.
    samples = @mutex.synchronize { @elapsed.dup }

    samples.each do |elapsed|
      # Work in floats so integer samples do not truncate the Mb/s figure, and
      # treat a non-positive duration as one microsecond so a degenerate sample
      # cannot raise ZeroDivisionError or FloatDomainError.
      micros = elapsed.to_f
      micros = 1.0 unless micros > 0

      throughput = @count * 1_000_000 / micros
      megabits = throughput * @size * 8 / 1_000_000

      puts "message size: %i [B]" % @size
      puts "message count: %i" % @count
      puts "mean throughput: %i [msg/s]" % throughput
      puts "mean throughput: %.3f [Mb/s]" % megabits
      puts
    end
  end
end
# Wire the benchmark together: one shared stats collector, one transmitter
# bound on the main thread, then a worker thread for the transmitter and one
# for each receiver.
stats = Stats.new(message_size, count)
transmitter = Transmitter.new(master_context, link, message_size, count)

threads = [Thread.new { transmitter.run }]

1.times do
  threads << Thread.new do
    # Each receiver is created inside its own thread.
    Receiver.new(master_context, link, message_size, count, stats).run
  end
end

# Wait for every worker, then close the publisher, print the report and
# terminate the shared context.
threads.each(&:join)
transmitter.close
stats.output
master_context.terminate
|