File: BatchProcessor.rb

package info (click to toggle)
tj3 3.8.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 5,048 kB
  • sloc: ruby: 36,481; javascript: 1,113; sh: 19; makefile: 17
file content (353 lines) | stat: -rw-r--r-- 12,835 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
#!/usr/bin/env ruby -w
# encoding: UTF-8
#
# = BatchProcessor.rb -- The TaskJuggler III Project Management Software
#
# Copyright (c) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014
#               by Chris Schlaeger <cs@taskjuggler.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of version 2 of the GNU General Public License as
# published by the Free Software Foundation.
#

require 'thread'
require 'monitor'

class TaskJuggler

  # The JobInfo class is just a storage container for some batch job related
  # pieces of information. It contains things like a job id, the process id,
  # the stdout data and the like.
  class JobInfo

    attr_reader :jobId, :block, :tag
    attr_accessor :pid, :retVal, :stdoutP, :stdoutC, :stdout, :stdoutEOT,
                  :stderrP, :stderrC, :stderr, :stderrEOT

    # Create the record for a single batch job. +jobId+ is the number that
    # identifies the job, +block+ is the code to be run for it and +tag+ is an
    # arbitrary object the submitter can use to recognize the job again.
    def initialize(jobId, block, tag)
      # The job id. A unique number that is used by the BatchProcessor objects
      # to identify jobs.
      @jobId = jobId
      # This is the block of code to be run as external process.
      @block = block
      # The tag can really be anything that the user of BatchProcessor needs
      # to uniquely identify the job.
      @tag = tag
      # The process ID and the return value are not known yet. They are set
      # through their accessors later on.
      @pid = nil
      @retVal = nil
      # The pipe to transfer stdout data from the child to the parent. Both
      # ends stay nil until openPipes() is called.
      @stdoutP = @stdoutC = nil
      # The stdout output of the child
      @stdout = ''
      # This flag is set to true when the EOT character has been received.
      # It must use the same name as the :stdoutEOT accessor, otherwise the
      # accessor reports nil instead of false for a fresh job.
      @stdoutEOT = false
      # The pipe to transfer stderr data from the child to the parent. Both
      # ends stay nil until openPipes() is called.
      @stderrP = @stderrC = nil
      # The stderr output of the child
      @stderr = ''
      # This flag is set to true when the EOT character has been received.
      @stderrEOT = false
    end

    # Create one pipe for stdout and one for stderr. IO.pipe returns the read
    # end first, so the *P attributes hold the read ends and the *C attributes
    # hold the write ends.
    def openPipes
      @stdoutP, @stdoutC = IO.pipe
      @stderrP, @stderrC = IO.pipe
    end

  end

  # The BatchProcessor class can be used to run code blocks of the program as
  # a separate process. Multiple pieces of code can be submitted to be
  # executed in parallel. The number of CPU cores to use is limited at object
  # creation time. The submitted jobs will be queued and scheduled to the
  # given number of CPUs. The usage model is simple. Create a BatchProcessor
  # object. Use BatchProcessor#queue to submit all the jobs and then use
  # BatchProcessor#wait to wait for completion and to process the results.
  class BatchProcessor

    # Create a BatchProcessor object. +maxCpuCores+ limits the number of
    # simultaneously spawned processes.
    def initialize(maxCpuCores)
      # Let an exception in any thread abort the program. Note that this is a
      # setting of the Thread class and not of this object.
      Thread.abort_on_exception = true

      # Upper limit for the number of jobs that may be running at a time.
      @maxCpuCores = maxCpuCores

      # Number of jobs that were submitted and number of jobs that were
      # handed to the post-processing block. The current @jobsIn value is
      # also used as ID for the next submitted job.
      @jobsIn = 0
      @jobsOut = 0

      # The monitor that guards the job containers and pipe tables below.
      @lock = Monitor.new

      # Jobs travel through the following containers in this order.
      # 1. Submitted by queue(), waiting for the launcher thread to fork them.
      @toRunQueue = []
      # 2. Forked jobs. The Hash maps the PID to the JobInfo object.
      @runningJobs = {}
      # 3. Jobs that still have to complete their writing.
      @spoolingJobs = []
      # 4. Jobs that wait() picks up, passes to the post-processing block and
      #    then forgets about.
      @toDropQueue = []

      # All IO objects that data is received from and a Hash to look up the
      # JobInfo object for each of these IO objects.
      @pipes = []
      @pipeToJob = {}

      # Setting this flag to true tells the threads to finish.
      @terminate = false
      # Time in seconds that a thread sleeps when it has nothing to do. It
      # must leave enough room for a context switch between a forked-off
      # process and this process. Too large values hurt the throughput.
      @timeout = 0.02
    end

    # Add a new job to the job queue. +tag+ is some data that the caller can
    # use to identify the job upon completion. +block+ is a Ruby code block to
    # be executed in a separate process.
    def queue(tag = nil, &block)
      @lock.synchronize do
        # Once wait() has handed out the first result, no further jobs are
        # accepted for the current run.
        if @jobsOut > 0
          raise 'You cannot call queue() while wait() is running!'
        end

        # The very first job of a run brings up the three helper threads.
        if @jobsIn == 0
          # Takes JobInfo objects from the @toRunQueue and forks a process
          # for each of them.
          @launcher = Thread.new { launcher }
          # Waits for child processes to end and records their results.
          @receiver = Thread.new { receiver }
          # Copies what the child processes write to $stdout and $stderr into
          # the matching JobInfo objects.
          @grabber = Thread.new { grabber }
        end

        # All data of the job lives in a JobInfo object. The current counter
        # value serves as job ID before the counter is advanced.
        newJob = JobInfo.new(@jobsIn, block, tag)
        @jobsIn += 1
        # Hand the job over to the launcher thread.
        @toRunQueue << newJob
      end
    end

    # Wait for all jobs to complete. The code block will get the JobInfo
    # objects for each job to pick up the results.
    def wait
      # Don't wait if there are no jobs. In this case no helper threads have
      # been started either.
      return if @jobsIn == 0

      # When we have taken as many jobs out of the @toDropQueue as have been
      # submitted, we're done.
      while @lock.synchronize { @jobsOut < @jobsIn }
        job = @lock.synchronize do
          finished = @toDropQueue.pop
          if finished
            @jobsOut += 1
            # Call the post-processing block that was passed to wait() with
            # the JobInfo object as argument. The block is optional. A caller
            # that only wants to wait for completion can omit it. Note that
            # the block runs while @lock is held.
            yield(finished) if block_given?
          end
          finished
        end

        # No finished job was available. Give the helper threads some time
        # before checking again.
        sleep(@timeout) unless job
      end

      # Signal threads to stop
      @terminate = true
      # Wait for threads to finish
      @launcher.join
      @receiver.join
      @grabber.join

      # Reset some variables so we can reuse the object for further job runs.
      @jobsIn = @jobsOut = 0
      @terminate = false

      # Make sure all data structures are empty and clean. This raises a
      # RuntimeError if something was left behind.
      check
    end

    private

    # This function runs in a separate thread to pop JobInfo items from the
    # @toRunQueue and create child processes for them.
    def launcher
      # Run until the terminate flag is set.
      until @terminate
        # Only take a job from the @toRunQueue when a CPU core is available.
        job = @lock.synchronize do
          @toRunQueue.pop if @runningJobs.length < @maxCpuCores
        end

        if job
          @lock.synchronize { startJob(job) }
        else
          # We have no jobs in the @toRunQueue or all CPU cores in use already.
          sleep(@timeout)
        end
      end
    end

    # Open the pipes for +job+, register them for reading and fork the child
    # process that runs the job. The caller must hold @lock.
    def startJob(job)
      job.openPipes
      # Add the receiver end of the pipe to the pipes Arrays.
      @pipes << job.stdoutP
      @pipes << job.stderrP
      # Map the pipe end to this JobInfo object.
      @pipeToJob[job.stdoutP] = job
      @pipeToJob[job.stderrP] = job

      pid = fork { runJobInChild(job) }
      job.pid = pid
      # Save the process ID in the PID to JobInfo hash.
      @runningJobs[pid] = job
    end

    # This is executed in the child process only. It connects $stdout and
    # $stderr to the pipes of +job+, calls the code block of the job and exits
    # the process with the return value of the block.
    def runJobInChild(job)
      $stdout.reopen(job.stdoutC)
      job.stdoutC.close
      $stderr.reopen(job.stderrC)
      job.stderrC.close

      # Exit status that is used when the block does not return normally.
      retVal = 1
      begin
        # Call the Ruby code block
        retVal = job.block.call
      rescue SystemExit => e
        # The block called exit(). Use the requested status.
        retVal = e.status
      rescue StandardError, ScriptError => e
        # Report the problem through the stderr pipe so the parent gets to
        # see it.
        $stderr.puts("#{e.class}: #{e.message}")
        $stderr.puts(e.backtrace) if e.backtrace
      ensure
        # Send EOT character to mark the end of the text. This must happen
        # no matter how the block ended. The parent process keeps its own
        # copies of the write ends open until it has seen the EOT on both
        # pipes. Without the EOT the job would never be reported as done.
        $stdout.putc 4
        $stdout.close
        $stderr.putc 4
        $stderr.close
      end
      # Now exit the child process and return the return value of the
      # block as process return value.
      exit retVal
    end

    # This function runs in a separate thread to wait for completed jobs. It
    # waits for the process completion and stores the result in the
    # corresponding JobInfo object. Aborted jobs are pushed to the
    # @toDropQueue while completed jobs are pushed to the @spoolingJobs queue.
    def receiver
      until @terminate
        begin
          # Block until the next child process has ended.
          childPid, status = Process.wait2
        rescue Errno::ECHILD
          # There are no child processes right now. Try again a bit later.
          sleep(@timeout)
          next
        end
        next unless childPid && status

        @lock.synchronize do
          # Look up the JobInfo object for the process ID. The blocks passed
          # to queue() or wait() may have forked child processes of their
          # own. Their PIDs are not in the Hash and are simply skipped.
          finished = @runningJobs[childPid]
          unless finished.nil?
            # The job is no longer running.
            @runningJobs.delete(childPid)
            # Keep the exit status of the process.
            finished.retVal = status.exitstatus
            if status.signaled?
              # A job that was ended by a signal will probably not send an
              # EOT. It is moved to the @toDropQueue right away.
              cleanPipes(finished)
              @toDropQueue.push(finished)
            else
              # All other jobs are parked in the @spoolingJobs list until
              # their output has been collected.
              @spoolingJobs << finished
            end
          end
        end
      end
    end

    # This function runs in a separate thread to pick up the $stdout and
    # $stderr outputs of the child processes. It stores them in the JobInfo
    # object that corresponds to each child process.
    def grabber
      until @terminate
        # Keep collecting as long as select() reports pending output. Each
        # pass reads a single character from every pipe that has data. The
        # inner loop ends with the first select() call that times out.
        pending = true
        while pending
          pending = @lock.synchronize do
            selected = IO.select(@pipes, nil, nil, @timeout)
            if selected
              # selected[0] holds the pipes that are ready for reading.
              selected[0].each do |pipe|
                # The job that this pipe belongs to.
                owner = @pipeToJob[pipe]

                # The pipe carries the standard output of the job.
                if pipe == owner.stdoutP
                  # A closed pipe is treated like a received EOT character.
                  char = pipe.closed? ? ?\004 : pipe.read_nonblock(1)
                  if char == ?\004
                    owner.stdoutEOT = true
                  else
                    owner.stdout << char
                  end
                end

                # The pipe carries the error output of the job.
                if pipe == owner.stderrP
                  # A closed pipe is treated like a received EOT character.
                  char = pipe.closed? ? ?\004 : pipe.read_nonblock(1)
                  if char == ?\004
                    owner.stderrEOT = true
                  else
                    owner.stderr << char
                  end
                end
              end
            end
            selected
          end
          sleep(@timeout) unless pending
        end

        # Move at most one job per pass from the @spoolingJobs list to the
        # @toDropQueue. A job qualifies when the end of text was seen on both
        # its stdout and its stderr pipe.
        @lock.synchronize do
          complete = @spoolingJobs.find { |j| j.stdoutEOT && j.stderrEOT }
          if complete
            @spoolingJobs.delete(complete)
            cleanPipes(complete)
            @toDropQueue.push(complete)
          end
        end
      end
    end

    # Forget about the pipes of +job+: take the read ends out of the @pipes
    # list and the @pipeToJob map, close all four pipe ends and clear the pipe
    # attributes of the job.
    def cleanPipes(job)
      # Only the read ends are registered for reading.
      [ job.stdoutP, job.stderrP ].each do |readEnd|
        @pipes.delete(readEnd)
        @pipeToJob.delete(readEnd)
      end

      # Close both ends of the stdout pipe first, then those of the stderr
      # pipe.
      [ job.stdoutC, job.stdoutP, job.stderrC, job.stderrP ].each(&:close)

      job.stdoutP = job.stdoutC = nil
      job.stderrP = job.stderrC = nil
    end

    # Verify that no job and no pipe was left behind. The containers are
    # inspected in the order of the list below and a RuntimeError is raised
    # for the first one that still has content. Returns nil if all of them
    # are empty.
    def check
      [
        [ @toRunQueue, "toRunQueue not empty!" ],
        [ @runningJobs, "runningJobs list not empty!" ],
        [ @spoolingJobs, "spoolingJobs list not empty!" ],
        [ @toDropQueue, "toDropQueue not empty!" ],
        [ @pipes, "pipe list not empty!" ],
        [ @pipeToJob, "pipe map not empty!" ]
      ].each do |container, complaint|
        raise complaint unless container.empty?
      end
      nil
    end

  end

end