1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
#!/usr/bin/env ruby
#$: << File.expand_path('../../lib', __FILE__)
require 'benchmark'
require 'optparse'
require 'thread'
require 'rspec/expectations'
require 'concurrent/actor'
class ActorStressTester
include ::RSpec::Matchers
TESTS_PER_RUN = 5
THREADS_PER_TEST = 10
LOOPS_PER_THREAD = 25
class Ping < Concurrent::Actor::Context
def initialize(queue)
@queue = queue
end
def on_message(message)
case message
when :child
Concurrent::Actor::Utils::AdHoc.spawn(:pong, @queue) do |queue|
-> m { queue << m }
end
else
@queue << message
message
end
end
end
def initialize(opts = {})
@tests = opts.fetch(:tests, TESTS_PER_RUN)
@threads = opts.fetch(:threads, THREADS_PER_TEST)
@loops = opts.fetch(:loops, LOOPS_PER_THREAD)
end
def run
plural = ->(number){ number == 1 ? '' : 's' }
puts "Running #{@tests} test#{plural.call(@tests)} " +
"with #{@threads} thread#{plural.call(@threads)} each " +
"and #{@loops} loop#{plural.call(@loops)} per thread..."
Benchmark.bmbm do |bm|
@tests.times do
bm.report do
test(@threads, @loops)
end
end
end
end
def test(threads, loops)
(1..threads).collect do
Thread.new do
loops.times do
queue = Queue.new
actor = Ping.spawn(:ping, queue)
core = Concurrent::Actor.root.send(:core)
children = core.instance_variable_get(:@children)
expect(children).to include(actor)
actor << 'a' << 1
expect(queue.pop).to eq 'a'
expect(actor.ask(2).value).to eq 2
expect(actor.parent).to eq Concurrent::Actor.root
expect(Concurrent::Actor.root.path).to eq '/'
expect(actor.path).to eq '/ping'
child = actor.ask(:child).value
expect(child.path).to eq '/ping/pong'
queue.clear
child.ask(3)
expect(queue.pop).to eq 3
actor << :terminate!
#expect(actor.ask(:blow_up).wait).to be_rejected
expect(actor.ask(:blow_up).wait).to be_failed
terminate_actors(actor, child)
end
end
end.each(&:join)
end
def terminate_actors(*actors)
actors.each do |actor|
unless actor.ask!(:terminated?)
actor.ask!(:terminate!)
end
end
end
end
# def trace!
# set_trace_func proc { |event, file, line, id, binding, classname|
# # thread = eval('Thread.current', binding).object_id.to_s(16)
# printf "%8s %20s %20s %s %s:%-2d\n", event, id, classname, nil, file, line
# }
# yield
# ensure
# set_trace_func nil
# end
if $0 == __FILE__
options = {}
OptionParser.new do |opts|
opts.banner = "Usage: #{File.basename(__FILE__)} [options]"
opts.on("--tests=TESTS", "Number of tests per run") do |value|
options[:tests] = value.to_i
end
opts.on("--threads=THREADS", "Number of threads per test") do |value|
options[:threads] = value.to_i
end
opts.on("--loops=LOOPS", "Number of loops per thread") do |value|
options[:loops] = value.to_i
end
opts.on("-h", "--help", "Prints this help") do
puts opts
exit
end
end.parse!
ActorStressTester.new(options).run
end
|