File: actor_stress_test.rb

package info (click to toggle)
ruby-concurrent 1.1.6%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 30,284 kB
  • sloc: ruby: 30,875; java: 6,117; ansic: 288; makefile: 9; sh: 6
file content (140 lines) | stat: -rwxr-xr-x 3,305 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env ruby

#$: << File.expand_path('../../lib', __FILE__)

require 'benchmark'
require 'optparse'
require 'thread'
require 'rspec/expectations'

require 'concurrent/actor'

class ActorStressTester
  include ::RSpec::Matchers

  TESTS_PER_RUN = 5
  THREADS_PER_TEST = 10
  LOOPS_PER_THREAD = 25

  class Ping < Concurrent::Actor::Context
    def initialize(queue)
      @queue = queue
    end

    def on_message(message)
      case message
      when :child
        Concurrent::Actor::Utils::AdHoc.spawn(:pong, @queue) do |queue|
          -> m { queue << m }
        end
      else
        @queue << message
        message
      end
    end
  end

  def initialize(opts = {})
    @tests = opts.fetch(:tests, TESTS_PER_RUN)
    @threads = opts.fetch(:threads, THREADS_PER_TEST)
    @loops = opts.fetch(:loops, LOOPS_PER_THREAD)
  end

  def run
    plural = ->(number){ number == 1 ? '' : 's' }

    puts "Running #{@tests} test#{plural.call(@tests)} " +
      "with #{@threads} thread#{plural.call(@threads)} each " +
      "and #{@loops} loop#{plural.call(@loops)} per thread..."

    Benchmark.bmbm do |bm|
      @tests.times do
        bm.report do
          test(@threads, @loops)
        end
      end
    end
  end

  def test(threads, loops)
    (1..threads).collect do
      Thread.new do
        loops.times do

          queue = Queue.new
          actor = Ping.spawn(:ping, queue)

          core = Concurrent::Actor.root.send(:core)
          children = core.instance_variable_get(:@children)
          expect(children).to include(actor)

          actor << 'a' << 1
          expect(queue.pop).to eq 'a'
          expect(actor.ask(2).value).to eq 2

          expect(actor.parent).to eq Concurrent::Actor.root
          expect(Concurrent::Actor.root.path).to eq '/'
          expect(actor.path).to eq '/ping'

          child = actor.ask(:child).value
          expect(child.path).to eq '/ping/pong'

          queue.clear
          child.ask(3)
          expect(queue.pop).to eq 3

          actor << :terminate!
          #expect(actor.ask(:blow_up).wait).to be_rejected
          expect(actor.ask(:blow_up).wait).to be_failed
          terminate_actors(actor, child)
        end
      end
    end.each(&:join)
  end

  def terminate_actors(*actors)
    actors.each do |actor|
      unless actor.ask!(:terminated?)
        actor.ask!(:terminate!)
      end
    end
  end
end

# def trace!
#   set_trace_func proc { |event, file, line, id, binding, classname|
#     # thread = eval('Thread.current', binding).object_id.to_s(16)
#     printf "%8s %20s %20s %s %s:%-2d\n", event, id, classname, nil, file, line
#   }
#   yield
# ensure
#   set_trace_func nil
# end

if $0 == __FILE__

  options = {}

  OptionParser.new do |opts|
    opts.banner = "Usage: #{File.basename(__FILE__)} [options]"

    opts.on("--tests=TESTS", "Number of tests per run") do |value|
      options[:tests] = value.to_i
    end

    opts.on("--threads=THREADS", "Number of threads per test") do |value|
      options[:threads] = value.to_i
    end

    opts.on("--loops=LOOPS", "Number of loops per thread") do |value|
      options[:loops] = value.to_i
    end

    opts.on("-h", "--help", "Prints this help") do
      puts opts
      exit
    end
  end.parse!

  ActorStressTester.new(options).run
end