File: throughput_measurement.rb

package info (click to toggle)
ruby-ffi-rzmq 2.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 380 kB
  • ctags: 204
  • sloc: ruby: 2,945; makefile: 2
file content (166 lines) | stat: -rw-r--r-- 3,620 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
require File.join(File.dirname(__FILE__), '..', 'lib', 'ffi-rzmq')
require 'thread'

# Within a single process, we start up five threads. Main thread has a PUB (publisher)
# socket and the secondary threads have SUB (subscription) sockets. We measure the
# *throughput* between these sockets. A high-water mark (HWM) is *not* set, so the
# publisher queue is free to grow to the size of memory without dropping packets.
#
# This example also illustrates how a single context can be shared amongst several
# threads. Sharing a single context also allows a user to specify the "inproc"
# transport in addition to "tcp" and "ipc".
#
#  % ruby throughput_measurement.rb tcp://127.0.0.1:5555 1024 1_000_000
#
#  % ruby throughput_measurement.rb inproc://lm_sock 1024 1_000_000
#

if ARGV.length < 3
  puts "usage: ruby throughput_measurement.rb <connect-to> <message-size> <roundtrip-count>"
  exit
end

link = ARGV[0]
message_size = ARGV[1].to_i
count = ARGV[2].to_i

def assert(rc)
  raise "Last API call failed at #{caller(1)}" unless rc >= 0
end

begin
  master_context = ZMQ::Context.new
rescue ContextError => e
  STDERR.puts "Failed to allocate context or socket!"
  raise
end


class Receiver
  def initialize context, link, size, count, stats
    @context = context
    @link = link
    @size = size
    @count = count
    @stats = stats

    begin
      @socket = @context.socket(ZMQ::SUB)
    rescue ContextError => e
      STDERR.puts "Failed to allocate SUB socket!"
      raise
    end

    assert(@socket.setsockopt(ZMQ::LINGER, 100))
    assert(@socket.setsockopt(ZMQ::SUBSCRIBE, ""))

    assert(@socket.connect(@link))
  end

  def run
    msg = ZMQ::Message.new
    assert(@socket.recvmsg(msg))

    elapsed = elapsed_microseconds do
      (@count - 1).times do
        assert(@socket.recvmsg(msg))
      end
    end

    @stats.record_elapsed(elapsed)
    assert(@socket.close)
  end

  def elapsed_microseconds(&blk)
    start = Time.now
    yield
    ((Time.now - start) * 1_000_000)
  end
end

class Transmitter
  def initialize context, link, size, count
    @context = context
    @link = link
    @size = size
    @count = count

    begin
      @socket = @context.socket(ZMQ::PUB)
    rescue ContextError => e
      STDERR.puts "Failed to allocate PUB socket!"
      raise
    end

    assert(@socket.setsockopt(ZMQ::LINGER, 100))
    assert(@socket.bind(@link))
  end

  def run
    sleep 1
    contents = "#{'0' * @size}"

    i = 0
    while i < @count
      msg = ZMQ::Message.new(contents)
      assert(@socket.sendmsg(msg))
      i += 1
    end

  end
  
  def close
    assert(@socket.close)
  end
end

class Stats
  def initialize size, count
    @size = size
    @count = count
    
    @mutex = Mutex.new
    @elapsed = []
  end

  def record_elapsed(elapsed)
    @mutex.synchronize do
      @elapsed << elapsed
    end
  end

  def output
    @elapsed.each do |elapsed|
      throughput = @count * 1000000 / elapsed
      megabits = throughput * @size * 8 / 1000000

      puts "message size: %i [B]" % @size
      puts "message count: %i" % @count
      puts "mean throughput: %i [msg/s]" % throughput
      puts "mean throughput: %.3f [Mb/s]" % megabits
      puts
    end
  end
end

threads = []
stats = Stats.new message_size, count
transmitter = Transmitter.new(master_context, link, message_size, count)

threads << Thread.new do
  transmitter.run
end

1.times do
  threads << Thread.new do
    receiver = Receiver.new(master_context, link, message_size, count, stats)
    receiver.run
  end
end


threads.each {|t| t.join}
transmitter.close
stats.output

master_context.terminate