File: worker_pool.rb

package info (click to toggle)
ruby-bootsnap 1.22.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 520 kB
  • sloc: ruby: 3,637; ansic: 844; sh: 14; makefile: 9
file content (208 lines) | stat: -rw-r--r-- 5,447 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# frozen_string_literal: true

require "etc"
require "rbconfig"
require "io/wait" unless IO.method_defined?(:wait_readable)

module Bootsnap
  class CLI
    class WorkerPool
      class << self
        def create(size:, jobs:)
          size ||= default_size
          if size > 0 && Process.respond_to?(:fork)
            new(size: size, jobs: jobs)
          else
            Inline.new(jobs: jobs)
          end
        end

        def default_size
          nprocessors = Etc.nprocessors
          size = [nprocessors, cpu_quota&.to_i || nprocessors].min
          case size
          when 0, 1
            0
          else
            if fork_defunct?
              $stderr.puts "warning: faulty fork(2) detected, probably in cross platform docker builds. " \
                           "Disabling parallel compilation."
              0
            else
              size
            end
          end
        end

        def cpu_quota
          if RbConfig::CONFIG["target_os"].include?("linux")
            if File.exist?("/sys/fs/cgroup/cpu.max")
              # cgroups v2: https://docs.kernel.org/admin-guide/cgroup-v2.html#cpu-interface-files
              cpu_max = File.read("/sys/fs/cgroup/cpu.max")
              return nil if cpu_max.start_with?("max ") # no limit

              max, period = cpu_max.split.map(&:to_f)
              max / period
            elsif File.exist?("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us")
              # cgroups v1: https://kernel.googlesource.com/pub/scm/linux/kernel/git/glommer/memcg/+/cpu_stat/Documentation/cgroups/cpu.txt
              max = File.read("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us").to_i
              # If the cpu.cfs_quota_us is -1, cgroup does not adhere to any CPU time restrictions
              # https://docs.kernel.org/scheduler/sched-bwc.html#management
              return nil if max <= 0

              period = File.read("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_period_us").to_f
              max / period
            end
          end
        end

        def fork_defunct?
          return true unless ::Process.respond_to?(:fork)

          # Ref: https://github.com/rails/bootsnap/issues/495
          # The second forked process will hang on some QEMU environments
          r, w = IO.pipe
          pids = 2.times.map do
            ::Process.fork do
              exit!(true)
            end
          end
          w.close
          r.wait_readable(1) # Wait at most 1s

          defunct = false

          pids.each do |pid|
            _pid, status = ::Process.wait2(pid, ::Process::WNOHANG)
            if status.nil? # Didn't exit in 1s
              defunct = true
              Process.kill(:KILL, pid)
              ::Process.wait2(pid)
            end
          end

          defunct
        end
      end

      class Inline
        def initialize(jobs: {})
          @jobs = jobs
        end

        def push(job, *args)
          @jobs.fetch(job).call(*args)
          nil
        end

        def spawn
          # noop
        end

        def shutdown
          # noop
        end
      end

      class Worker
        attr_reader :to_io, :pid

        def initialize(jobs)
          @jobs = jobs
          @pipe_out, @to_io = IO.pipe(binmode: true)
          # Set the writer encoding to binary since IO.pipe only sets it for the reader.
          # https://github.com/rails/rails/issues/16514#issuecomment-52313290
          @to_io.set_encoding(Encoding::BINARY)

          @pid = nil
        end

        def write(message, block: true)
          payload = Marshal.dump(message)
          if block
            to_io.write(payload)
            true
          else
            to_io.write_nonblock(payload, exception: false) != :wait_writable
          end
        end

        def close
          to_io.close
        end

        def work_loop
          loop do
            job, *args = Marshal.load(@pipe_out)
            return if job == :exit

            @jobs.fetch(job).call(*args)
          end
        rescue IOError
          nil
        end

        def spawn
          @pid = Process.fork do
            to_io.close
            work_loop
            exit!(0)
          end
          @pipe_out.close
          true
        end
      end

      def initialize(size:, jobs: {})
        @size = size
        @jobs = jobs
        @queue = Queue.new
        @pids = []
      end

      def spawn
        @workers = @size.times.map { Worker.new(@jobs) }
        @workers.each(&:spawn)
        @dispatcher_thread = Thread.new { dispatch_loop }
        @dispatcher_thread.abort_on_exception = true
        true
      end

      def dispatch_loop
        loop do
          case job = @queue.pop
          when nil
            @workers.each do |worker|
              worker.write([:exit])
              worker.close
            end
            return true
          else
            unless @workers.sample.write(job, block: false)
              free_worker.write(job)
            end
          end
        end
      end

      def free_worker
        IO.select(nil, @workers)[1].sample
      end

      def push(*args)
        @queue.push(args)
        nil
      end

      def shutdown
        @queue.close
        @dispatcher_thread.join
        @workers.each do |worker|
          _pid, status = Process.wait2(worker.pid)
          return status.exitstatus unless status.success?
        end
        nil
      end
    end
  end
end