File: in_parallel.rb

package info (click to toggle)
ruby-in-parallel 1.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 224 kB
  • sloc: ruby: 487; sh: 11; makefile: 3
file content (402 lines) | stat: -rw-r--r-- 17,675 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
require_relative 'parallel_logger'
require_relative 'parallel_enumerable'
require 'tempfile'

module InParallel
  include ParallelLogger

  class InParallelExecutor
    # Seconds between 'Waiting for child processes.' debug messages while waiting on children.
    # 0 or < 0 means no signaling.
    @@parallel_signal_interval = 30
    # Seconds a wait may last before the children are interrupted and an error is raised,
    # unless the caller supplies its own timeout.
    @@parallel_default_timeout = 1800

    # One hash per forked child that still has to be collected (built by _execute_in_parallel).
    @@process_infos   = []
    # Proxy/binding pairs queued by run_in_background(false) and resolved by wait_for_processes.
    @@background_objs = []
    # Counter used to build the unique "unresolved_parallel_result_N" placeholders.
    @@result_id       = 0
    # NOTE(review): not referenced anywhere else in the visible code.
    @@pids            = []
    # PID of the process that loaded this file; the proxy only forks while running in that process.
    @@main_pid        = Process.pid

    # @return [Array<Hash>] the bookkeeping hashes of the children that are still being tracked
    def self.process_infos
      @@process_infos
    end

    # @return [Integer] PID recorded when this class was loaded
    def self.main_pid
      @@main_pid
    end

    # @return [Int] default timeout, in seconds, applied when a caller passes none
    def self.parallel_default_timeout
      @@parallel_default_timeout
    end

    # @param [Int] value new default timeout in seconds
    def self.parallel_default_timeout=(value)
      @@parallel_default_timeout = value
    end

    # @return [Object] the logger used by the class methods of this executor
    def self.logger
      @@logger
    end

    # @param [Object] value logger to use; it is sent #debug, #info, #warn and #error
    def self.logger=(value)
      @@logger = value
    end

    # Forks one child process per method call made inside the block and blocks until all of them
    # have completed (see wait_for_processes).
    #
    # Example - will spawn 2 processes, (1 for each method) wait until they both complete, and log STDOUT:
    #   InParallel.run_in_parallel do
    #     @result_1 = method1
    #     @result_2 = method2
    #   end
    # NOTE: Only supports assigning instance variables within the block, not local variables
    #
    # @param [Int] timeout Time in seconds to wait before giving up on a child process
    # @param [Boolean] kill_all_on_error Whether to interrupt the remaining children as soon as one reports an error
    # @return the result of wait_for_processes, or the block's own value when fork is not supported
    def self.run_in_parallel(timeout = @@parallel_default_timeout, kill_all_on_error = false, &block)
      # if fork is not supported
      return block.call unless fork_supported?

      # The block is evaluated against a proxy so every method call in it is intercepted and forked.
      caller_binding = block.binding
      proxy          = BlankBindingParallelProxy.new(caller_binding)
      proxy.instance_eval(&block)
      wait_for_processes(proxy, caller_binding, timeout, kill_all_on_error)
    end

    # Runs all methods within the block in parallel in the background
    #
    # Example - Will spawn a process in the background for each method and return immediately:
    #   InParallel.run_in_background do
    #     @result_1 = method1
    #     @result_2 = method2
    #   end
    #   # Do something else here before waiting for the process to complete
    #
    #   # Optionally wait for the processes to complete before continuing.
    #   # Otherwise use run_in_background(true) to clean up the process status and output immediately.
    #   wait_for_processes(self)
    #
    # NOTE: wait_for_processes must be called to allow instance variables in the calling object to be set,
    # otherwise @result_1 will evaluate to "unresolved_parallel_result_0"
    #
    # @param [Boolean] ignore_result When true, every process forked by the block is detached and
    #   forgotten; when false, the processes stay tracked so a later wait_for_processes collects them.
    # @return [nil, String, Object] nil when ignore_result is true; otherwise the placeholder string of
    #   the last method forked by the block (nil if the block forked nothing). When fork is not
    #   supported the block is simply called and its value is returned unless ignore_result is true.
    def self.run_in_background(ignore_result = true, &block)
      unless fork_supported?
        # if fork is not supported
        result = block.call
        return ignore_result ? nil : result
      end

      # Remember how many processes were already tracked so that only the ones forked by
      # THIS block are touched below.
      already_tracked = @@process_infos.count
      proxy           = BlankBindingParallelProxy.new(block.binding)
      proxy.instance_eval(&block)
      spawned_count   = [@@process_infos.count - already_tracked, 0].max

      if ignore_result
        # Forget every process this block forked, not just the last one, so none of them
        # leaks into the results of a later wait_for_processes.
        @@process_infos.pop(spawned_count).each do |process_info|
          Process.detach(process_info[:pid])
          # Nobody will ever read this child's result; release the parent's end of the pipe.
          process_info[:result].close unless process_info[:result].closed?
        end
        return nil
      end

      @@background_objs << { :proxy => proxy, :target => block.binding }
      spawned_count > 0 ? @@process_infos.last[:tmp_result] : nil
    end

    # Waits for all processes to complete and logs STDOUT and STDERR in chunks from any processes that were triggered from this Parallel class
    #
    # While waiting, an INT handler is installed that interrupts the children; the handler that was
    # active before the call is put back once the wait loop ends, and a received interrupt is then
    # re-sent to this process so it reaches that previous handler.
    #
    # @param [Object] proxy - The instance of the proxy class that the method was executed within (probably only useful when called by run_in_background)
    # @param [Object] binding - The binding of the block to assign return values to instance variables (probably only useful when called by run_in_background)
    # @param [Int] timeout Time in seconds to wait before giving up on a child process
    # @param [Boolean] kill_all_on_error Whether to wait for all processes to complete, or fail immediately - killing all other forked processes - when one process errors.
    # @return [Array, nil] the values produced by result_lookup when a binding is given, an empty
    #   array when it is not, or nil when fork is not available
    # @raise [Exception] a copy of an exception reported by a child, or a RuntimeError when the
    #   timeout elapsed; raised only after every tracked child has been collected
    def self.wait_for_processes(proxy = self, binding = nil, timeout = nil, kill_all_on_error = false)
      # Without fork nothing can have been spawned; leave the signal handlers alone.
      return unless Process.respond_to?(:fork)

      raise_error = nil
      timeout     ||= @@parallel_default_timeout
      send_int    = false
      previous_int_handler = trap(:INT) do
        # Can't use logger inside of trap
        puts "Warning, recieved interrupt.  Processing child results and exiting."
        send_int = true
        kill_child_processes
      end

      # Custom process to wait so that we can do things like time out, and kill child processes if
      # one process returns with an error before the others complete.
      results_map = Array.new(@@process_infos.count)
      start_time  = Time.now
      timer       = start_time
      begin
        until @@process_infos.empty?
          if @@parallel_signal_interval > 0 && Time.now > timer + @@parallel_signal_interval
            @@logger.debug 'Waiting for child processes.'
            timer = Time.now
          end
          if Time.now > start_time + timeout
            kill_child_processes
            raise_error = ::RuntimeError.new("Child process ran longer than timeout of #{timeout}")
          end

          # Wake up at least every half second so the timeout and the progress message are serviced.
          ready = IO.select(@@process_infos.map { |info| info[:result] }, nil, nil, 0.5)
          next unless ready

          ready.first.each do |reader|
            process_info = @@process_infos.find { |info| info[:result] == reader }
            process_info[:result_buffer] << reader.read
            next unless reader.eof?

            raw_result = process_info[:result_buffer].string
            # the process completed, get the result and rethrow on error.
            begin
              # Print the STDOUT and STDERR for each process with signals for start and end
              @@logger.info "------ Begin output for #{process_info[:method_sym]} - #{process_info[:pid]}"
              # Content from the other thread will already be pre-pended with log stuff (info, warn, date/time, etc)
              # So don't use logger, just use puts.
              # File.readlines closes the file again, unlike File.new(...).readlines.
              puts "  " + File.readlines(process_info[:std_out]).join("  ")
              @@logger.info "------ Completed output for #{process_info[:method_sym]} - #{process_info[:pid]}"
              # The payload was written by our own forked child, so it is loaded as trusted data.
              marshalled_result = (raw_result.nil? || raw_result.empty?) ? raw_result : Marshal.load(raw_result)
              # Kill all other processes and let them log their stdout before re-raising
              # if a child process raised an error.
              if marshalled_result.is_a?(Exception)
                raise_error = marshalled_result.dup
                kill_child_processes if kill_all_on_error
                marshalled_result = nil
              end
              results_map[process_info[:index]] = { process_info[:tmp_result] => marshalled_result }
            ensure
              File.delete(process_info[:std_out]) if File.exist?(process_info[:std_out])
              # close the read end pipe
              process_info[:result].close unless process_info[:result].closed?
              @@process_infos.delete(process_info)
            end
          end
        end
      ensure
        # Put back whatever INT handling was active before this call so the handler above does not
        # outlive the wait (trap returns nil for handlers it cannot represent; fall back to DEFAULT).
        trap(:INT, previous_int_handler || 'DEFAULT')
      end

      results = []

      # pass in the 'self' from the block.binding which is the instance of the class
      # that contains the initial binding call.
      # This gives us access to the instance variables from that context.
      results = result_lookup(proxy, binding, results_map) if binding

      # If there are background_objs AND results, don't return the background obj results
      # (which would mess up expected results from each_in_parallel),
      # but do process their results in case they are assigned to instance variables
      @@background_objs.each { |obj| result_lookup(obj[:proxy], obj[:target], results_map) }
      @@background_objs.clear

      # The previous handler is active again, so this re-delivers the interrupt to it.
      Process.kill("INT", Process.pid) if send_int
      raise raise_error unless raise_error.nil?
      results
    end

    # Forks a child process that evaluates the block against +obj+, captures the child's STDOUT/STDERR
    # in a file and sends its return value (or the exception it raised) back through a pipe.
    # Not meant to be called directly; the proxy's method_missing calls it once per intercepted method.
    #
    # @param [Object] method_sym label for the call; used in the log message and stored under :method_sym
    # @param [Object] obj object the block is instance_eval'd against inside the child
    # @param [Block] block the code to run in the child
    # @return [Hash] the bookkeeping hash that was appended to @@process_infos for this child
    def self._execute_in_parallel(method_sym, obj = self, &block)
      ret_val                   = nil
      # Communicate the return value of the method or block
      read_result, write_result = IO.pipe
      # The per-child output files live in ./tmp, relative to the current working directory.
      Dir.mkdir('tmp') unless Dir.exist? 'tmp'
      pid                       = fork do
        # Everything inside this block runs in the child process only.
        stdout_file = File.new("tmp/pp_#{Process.pid}", 'w')
        exit_status = 0
        trap(:INT) do
          # Can't use logger inside of trap
          puts "Warning: Interrupt received in child process; exiting #{Process.pid}"
          kill_child_processes
          # NOTE(review): this is a non-local return out of the forked block; how the child ends up
          # exiting depends on where the interrupt lands (the ensure below calls exit) - confirm intent.
          return
        end

        # IO buffer is 64kb, which isn't much... if debug logging is turned on,
        # this can be exceeded before a process completes.
        # Storing output in file rather than using IO.pipe
        STDOUT.reopen(stdout_file)
        STDERR.reopen(stdout_file)

        begin
          # close subprocess's copy of read_result since it only needs to write
          read_result.close
          ret_val = obj.instance_eval(&block)
          ret_val = strip_singleton(ret_val)
          # In case there are other types that can't be dumped
          begin
            # Write the result to the write_result IO stream.
            # A nil result writes nothing, so the parent reads an empty string for it.
            Marshal.dump(ret_val, write_result) unless ret_val.nil?
          rescue StandardError => err
            @@logger.warn "Warning: return value from child process #{ret_val} " +
                              "could not be transferred to parent process: #{err.message}"
          end
        rescue Exception => err
          # Exception (not just StandardError) is rescued on purpose: whatever went wrong is sent
          # to the parent, which decides whether to re-raise it.
          @@logger.error "Error in process #{Process.pid}: #{err.message}"
          # Return the error if an error is rescued so we can re-throw in the main process.
          Marshal.dump(err, write_result)
          exit_status = 1
        ensure
          # Always close the write end and terminate the child here so it never continues
          # into the parent's code path below.
          write_result.close
          exit exit_status
        end
      end

      # From here on only the parent process is running.
      @@logger.info "Forked process for #{method_sym} - PID = '#{pid}'"
      # The parent only reads; closing its copy of the write end leaves the child as the sole writer.
      write_result.close
      # Process.detach starts a thread that waits for the child, so the exited child is reaped
      # without this code having to call the blocking Process.wait functions.
      wait_thread  = Process.detach(pid)
      # store the IO object with the STDOUT and waiting thread for each pid
      process_info = { :wait_thread => wait_thread,
                       :pid         => pid,
                       :method_sym  => method_sym,
                       :std_out     => "tmp/pp_#{pid}",
                       :result      => read_result,
                       :tmp_result  => "unresolved_parallel_result_#{@@result_id}",
                       :result_buffer => StringIO.new,
                       :index       => @@process_infos.count }
      @@process_infos.push(process_info)
      @@result_id += 1
      process_info
    end

    # Reports whether this Ruby can fork, remembering a positive answer in @@supported.
    # Logs a warning on every call made while fork is unavailable.
    # @return [Boolean] true when Process responds to :fork
    def self.fork_supported?
      @@supported ||= Process.respond_to?(:fork)
      return @@supported if @@supported

      @@logger.warn 'Warning: Fork is not supported on this OS, executing block normally'
      @@supported
    end

    # Sends INT to every child process that is still being tracked in @@process_infos.
    def self.kill_child_processes
      @@process_infos.each do |info|
        child_pid = info[:pid]
        # Send INT to each child process so it returns and can print stdout and stderr to console before exiting.
        begin
          Process.kill("INT", child_pid)
        rescue Errno::ESRCH
          # If one of the other processes has completed in the very short time before we try to kill it, handle the exception
          next
        end
      end
    end
    private_class_method :kill_child_processes

    # Prepares a return value for Marshal.dump by removing singleton state from it and, recursively,
    # from every object reachable through its instance variables.
    #
    # Objects that carry singleton methods are replaced by a dup (which does not copy them);
    # instance variables defined on an object's singleton class are removed.
    # Every object is processed only once, so object graphs that reference themselves
    # (parent <-> child links and the like) no longer recurse without end.
    #
    # @param [Object] obj the value to clean up
    # @param [Hash] seen identity map of objects already handled to their cleaned-up counterpart;
    #   callers normally leave this at its default
    # @return [Object] obj itself, or its dup when obj had singleton methods
    def self.strip_singleton(obj, seen = {}.compare_by_identity)
      return seen[obj] if seen.key?(obj)

      stripped = obj
      stripped = obj.dup unless obj.nil? || obj.singleton_methods.empty?
      # Register before descending so a back-reference to obj resolves to the cleaned-up object.
      seen[obj]      = stripped
      seen[stripped] = stripped

      begin
        stripped.singleton_class.class_eval do
          instance_variables.each { |v| remove_instance_variable(v) }
        end
      rescue TypeError # if no singleton_class exists for the object it raises a TypeError
      end

      # Recursively check any objects assigned to instance variables for singleton methods, or variables
      stripped.instance_variables.each do |v|
        stripped.instance_variable_set(v, strip_singleton(stripped.instance_variable_get(v), seen))
      end
      stripped
    end
    private_class_method :strip_singleton

    # Private method to lookup results from the results_map and replace the
    # temp values with actual return values
    #
    # Every instance variable of the proxy that still holds a placeholder string gets the matching
    # real value assigned on the target object. All variables holding the same placeholder are
    # resolved (e.g. after `@a = @b = some_method`), not just the first one found.
    #
    # @param [Object] proxy_obj object whose instance variables hold the placeholders;
    #   falls back to the target object when nil
    # @param [Binding] target_obj binding whose `self` receives the resolved instance variables
    # @param [Array<Hash>] results_map one single-pair hash per process: placeholder => real value
    # @return [Array] the real values in results_map order
    def self.result_lookup(proxy_obj, target_obj, results_map)
      target_obj = eval('self', target_obj)
      proxy_obj  ||= target_obj
      vars       = proxy_obj.instance_variables
      results_map.map do |tmp_result|
        placeholder, value = tmp_result.first
        vars.each do |var|
          next unless proxy_obj.instance_variable_get(var) == placeholder
          target_obj.instance_variable_set(var, value)
        end
        value
      end
    end

    private_class_method :result_lookup

    # Stand-in receiver for the block given to run_in_parallel / run_in_background.
    # The block is instance_eval'd against an instance of this class, so every method call that
    # Kernel does not answer lands in method_missing and is executed in a forked process instead.
    class BlankBindingParallelProxy < BasicObject
      # Don't worry about running methods like puts or other basic stuff in parallel
      include ::Kernel

      # @param [Binding] obj binding of the caller's block; its `self` is the real receiver
      def initialize(obj)
        @object    = obj
        @result_id = 0
      end

      # All methods within the block should show up as missing (unless defined in :Kernel)
      # @return [String, nil] the placeholder for the forked call's result, or nil when this is
      #   not the process recorded as main_pid (nothing is forked in that case)
      def method_missing(method_sym, *args, &block)
        return nil unless InParallelExecutor.main_pid == ::Process.pid

        label        = "'#{method_sym}' #{caller.first}"
        receiver     = @object.eval('self')
        process_info = InParallelExecutor._execute_in_parallel(label, receiver) do
          send(method_sym, *args, &block)
        end
        process_info[:tmp_result]
      end
    end
  end

  # Hand the module-level @logger to the executor so its class methods can log through it.
  # NOTE(review): at this point @logger is an instance variable of the InParallel module object
  # itself; presumably ParallelLogger sets it up - confirm, otherwise nil is assigned here.
  InParallelExecutor.logger = @logger

  # Gets how many seconds to wait between logging a 'Waiting for child processes.'
  # The value is read straight from the executor's class variable, because InParallelExecutor
  # defines no reader method for it.
  # @return [Int] the interval in seconds; 0 or less means the message is never logged
  def parallel_signal_interval
    InParallelExecutor.class_variable_get(:@@parallel_signal_interval)
  end

  # Sets how many seconds to wait between logging a 'Waiting for child processes.'
  # The value is written straight to the executor's class variable, because InParallelExecutor
  # defines no writer method for it.
  # @param [Int] value Time in seconds to wait before logging 'Waiting for child processes.'
  def parallel_signal_interval=(value)
    InParallelExecutor.class_variable_set(:@@parallel_signal_interval, value)
  end

  # Gets how many seconds to wait before timing out a forked child process and raising an exception
  # Delegates to InParallelExecutor.parallel_default_timeout.
  # @return [Int] the default timeout in seconds
  def parallel_default_timeout
    InParallelExecutor.parallel_default_timeout
  end

  # Sets how many seconds to wait before timing out a forked child process and raising an exception
  # Delegates to InParallelExecutor.parallel_default_timeout=, so the new value is shared by
  # every object that uses this module.
  # @param [Int] value Time in seconds to wait before timing out and raising an exception
  def parallel_default_timeout=(value)
    InParallelExecutor.parallel_default_timeout = value
  end

  # Executes each method within a block in a different process and waits for all of them.
  #
  # Example - Will fork one process per method and wait for both to finish:
  #   run_in_parallel do
  #     @result_1 = method1
  #     @result_2 = method2
  #   end
  # NOTE - Only instance variables can be assigned the return values of the methods within the block. Local variables will not be assigned any values.
  # @param [Int] timeout Time in seconds to wait before giving up on a child process;
  #   the executor's default timeout is used when nil
  # @param [Boolean] kill_all_on_error Whether to wait for all processes to complete, or fail immediately - killing all other forked processes - when one process errors.
  # @param [Block] block This method will yield to a block of code passed by the caller
  # @return [Array<Result>, Result] the return values of each method within the block
  def run_in_parallel(timeout=nil, kill_all_on_error = false, &block)
    effective_timeout = timeout || InParallelExecutor.parallel_default_timeout
    InParallelExecutor.run_in_parallel(effective_timeout, kill_all_on_error, &block)
  end

  # Forks a process for each method within a block and returns immediately.
  #
  # Example 1 - Will fork a process in the background to execute each method and return immediately:
  #   run_in_background do
  #     @result_1 = method1
  #     @result_2 = method2
  #   end
  #
  # Example 2 - Will fork a process in the background to execute each method, return immediately, then later
  # wait for the process to complete, printing its STDOUT and assigning return values to instance variables:
  #   run_in_background(false) do
  #     @result_1 = method1
  #     @result_2 = method2
  #   end
  #   # Do something else here before waiting for the process to complete
  #
  #   wait_for_processes
  # NOTE: must call wait_for_processes to allow instance variables within the block to be set, otherwise results will evaluate to "unresolved_parallel_result_X"
  # @param [Boolean] ignore_result True if you do not care about the STDOUT or return value of the methods executing in the background
  # @param [Block] block This method will yield to a block of code passed by the caller
  # @return [nil, String, Object] whatever InParallelExecutor.run_in_background returns: nil when
  #   ignore_result is true, otherwise the "unresolved_parallel_result_X" placeholder of the last
  #   forked method (or the block's own value when fork is not supported)
  def run_in_background(ignore_result = true, &block)
    InParallelExecutor.run_in_background(ignore_result, &block)
  end

  # Waits for all processes started by run_in_background to complete execution, then prints STDOUT and assigns return values to instance variables.  See :run_in_background
  # No proxy or binding is handed to the executor, so instance variables are resolved only for
  # the blocks that run_in_background(false) queued earlier.
  # @param [Int] timeout Time in seconds to wait before giving up on a child process;
  #   the executor's default timeout is used when nil
  # @param [Boolean] kill_all_on_error Whether to wait for all processes to complete, or fail immediately - killing all other forked processes - when one process errors.
  # @return whatever InParallelExecutor.wait_for_processes returns for a call without a binding
  def wait_for_processes(timeout=nil, kill_all_on_error = false)
    effective_timeout = timeout || InParallelExecutor.parallel_default_timeout
    InParallelExecutor.wait_for_processes(nil, nil, effective_timeout, kill_all_on_error)
  end
end