File: for_each

package info (click to toggle)
mrtrix3 3.0.8-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 15,300 kB
  • sloc: cpp: 130,470; python: 9,603; sh: 597; makefile: 62; xml: 47
file content (306 lines) | stat: -rwxr-xr-x 17,566 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
#!/usr/bin/python3

# Copyright (c) 2008-2025 the MRtrix3 contributors.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Covered Software is provided under this License on an "as is"
# basis, without warranty of any kind, either expressed, implied, or
# statutory, including, without limitation, warranties that the
# Covered Software is free of defects, merchantable, fit for a
# particular purpose or non-infringing.
# See the Mozilla Public License v. 2.0 for more details.
#
# For more details, see http://www.mrtrix.org/.


import os, re, sys, threading



# Since we're going to capture everything after the colon character and "hide" it from argparse,
#   we need to store the contents from there in a global so as for it to be accessible from execute()
CMDSPLIT = [ ]



def usage(cmdline): #pylint: disable=unused-variable
  global CMDSPLIT
  cmdline.set_author('Robert E. Smith (robert.smith@florey.edu.au) and David Raffelt (david.raffelt@florey.edu.au)')
  cmdline.set_synopsis('Perform some arbitrary processing step for each of a set of inputs')
  cmdline.add_description('This script greatly simplifies various forms of batch processing by enabling the execution of a command (or set of commands) independently for each of a set of inputs. Part of the way that this is achieved is by providing basic text substitutions, which simplify the formation of valid command strings based on the unique components of the input strings on which the script is instructed to execute. The available substitutions are listed below (note that the -test command-line option can be used to ensure correct command string formation prior to actually executing the commands):')
  cmdline.add_description('   - IN:   The full matching pattern, including leading folders. For example, if the target list contains a file "folder/image.mif", any occurrence of "IN" will be substituted with "folder/image.mif".')
  cmdline.add_description('   - NAME: The basename of the matching pattern. For example, if the target list contains a file "folder/image.mif", any occurrence of "NAME" will be substituted with "image.mif".')
  cmdline.add_description('   - PRE:  The prefix of the input pattern (the basename stripped of its extension). For example, if the target list contains a file "folder/my.image.mif.gz", any occurrence of "PRE" will be substituted with "my.image".')
  cmdline.add_description('   - UNI:  The unique part of the input after removing any common prefix and common suffix. For example, if the target list contains files: "folder/001dwi.mif", "folder/002dwi.mif", "folder/003dwi.mif", any occurrence of "UNI" will be substituted with "001", "002", "003".')
  cmdline.add_description('Note that due to a limitation of the Python "argparse" module, any command-line OPTIONS that the user intends to provide specifically to the for_each script must appear BEFORE providing the list of inputs on which for_each is intended to operate. While command-line options provided as such will be interpreted specifically by the for_each script, any command-line options that are provided AFTER the COLON separator will form part of the executed COMMAND, and will therefore be interpreted as command-line options having been provided to that underlying command.')
  cmdline.add_example_usage('Demonstration of basic usage syntax',
                            'for_each folder/*.mif : mrinfo IN',
                            'This will run the "mrinfo" command for every .mif file present in "folder/". Note that the compulsory colon symbol is used to separate the list of items on which for_each is being instructed to operate, from the command that is intended to be run for each input.')
  cmdline.add_example_usage('Multi-threaded use of for_each',
                            'for_each -nthreads 4 freesurfer/subjects/* : recon-all -subjid NAME -all',
                            'In this example, for_each is instructed to run the FreeSurfer command \'recon-all\' for all subjects within the \'subjects\' directory, with four subjects being processed in parallel at any one time. Whenever processing of one subject is completed, processing for a new unprocessed subject will commence. This technique is useful for improving the efficiency of running single-threaded commands on multi-core systems, as long as the system possesses enough memory to support such parallel processing. Note that in the case of multi-threaded commands (which includes many MRtrix3 commands), it is generally preferable to permit multi-threaded execution of the command on a single input at a time, rather than processing multiple inputs in parallel.')
  cmdline.add_example_usage('Excluding specific inputs from execution',
                            'for_each *.nii -exclude 001.nii : mrconvert IN PRE.mif',
                            'Particularly when a wildcard is used to define the list of inputs for for_each, it is possible in some instances that this list will include one or more strings for which execution should in fact not be performed; for instance, if a command has already been executed for one or more files, and then for_each is being used to execute the same command for all other files. In this case, the -exclude option can be used to effectively remove an item from the list of inputs that would otherwise be included due to the use of a wildcard (and can be used more than once to exclude more than one string). In this particular example, mrconvert is instructed to perform conversions from NIfTI to MRtrix image formats, for all except the first image in the directory. Note that any usages of this option must appear AFTER the list of inputs. Note also that the argument following the -exclude option can alternatively be a regular expression, in which case any inputs for which a match to the expression is found will be excluded from processing.')
  cmdline.add_example_usage('Testing the command string substitution',
                            'for_each -test * : mrconvert IN PRE.mif',
                            'By specifying the -test option, the script will print to the terminal the results of text substitutions for all of the specified inputs, but will not actually execute those commands. It can therefore be used to verify that the script is receiving the intended set of inputs, and that the text substitutions on those inputs lead to the intended command strings.')
  cmdline.add_example_usage('Utilising shell operators within the command substitution',
                            'for_each * : tensor2metric IN/dwi.mif - "|" tensor2metric - -fa IN/fa.mif',
                            'In this example, if the double-quotes were NOT placed around the pipe operator, then the shell would take the sum total output of the for_each script and pipe that to a single invocation of the tensor2metric command. Since in this example it is instead desired for the pipe operator to be a part of the command string that is executed multiple times by the for_each script, it must be escaped using double-quotes.')
  cmdline.add_argument('inputs',   help='Each of the inputs for which processing should be run', nargs='+')
  cmdline.add_argument('colon',    help='Colon symbol (":") delimiting the for_each inputs & command-line options from the actual command to be executed', type=str, choices=[':'])
  cmdline.add_argument('command',  help='The command string to run for each input, containing any number of substitutions listed in the Description section', type=str)
  cmdline.add_argument('-exclude', help='Exclude one specific input string / all strings matching a regular expression from being processed (see Example Usage)', action='append', metavar='"regex"', nargs=1)
  cmdline.add_argument('-test',    help='Test the operation of the for_each script, by printing the command strings following string substitution but not actually executing them', action='store_true', default=False)

  # Usage of for_each needs to be handled slightly differently here:
  # We want argparse to parse only the contents of the command-line before the colon symbol,
  #   as these are the items that pertain to the invocation of the for_each script;
  #   anything after the colon should instead form a part of the command that
  #   for_each is responsible for executing
  try:
    index = next(i for i,s in enumerate(sys.argv) if s == ':')
    try:
      CMDSPLIT = sys.argv[index+1:]
      sys.argv = sys.argv[:index+1]
      sys.argv.append(' '.join(CMDSPLIT))
    except IndexError:
      sys.stderr.write('Erroneous usage: No command specified (colon separator cannot be the last entry provided)\n')
      sys.exit(0)
  except StopIteration:
    if len(sys.argv) > 2:
      sys.stderr.write('Erroneous usage: A colon must be used to separate for_each inputs from the command to be executed\n')
      sys.exit(0)








# These need to be globals in order to be accessible from execute_parallel()
class Shared(object):
  def __init__(self):
    self._job_index = 0
    self.lock = threading.Lock()
    self.stop = False
  def next(self, jobs):
    job = None
    with self.lock:
      if self._job_index < len(jobs):
        job = jobs[self._job_index]
        self._job_index += 1
        self.stop = self._job_index == len(jobs)
    return job

shared = Shared() #pylint: disable=invalid-name



KEYLIST = [ 'IN', 'NAME', 'PRE', 'UNI' ]



def execute(): #pylint: disable=unused-variable
  from mrtrix3 import ANSI, MRtrixError #pylint: disable=no-name-in-module, import-outside-toplevel
  from mrtrix3 import app, run #pylint: disable=no-name-in-module, import-outside-toplevel

  inputs = app.ARGS.inputs
  app.debug('All inputs: ' + str(inputs))
  app.debug('Command: ' + str(app.ARGS.command))
  app.debug('CMDSPLIT: ' + str(CMDSPLIT))

  if app.ARGS.exclude:
    app.ARGS.exclude = [ exclude[0] for exclude in app.ARGS.exclude ] # To deal with argparse's action=append. Always guaranteed to be only one argument since nargs=1
    app.debug('To exclude: ' + str(app.ARGS.exclude))
    exclude_unmatched = [ ]
    to_exclude = [ ]
    for exclude in app.ARGS.exclude:
      if exclude in inputs:
        to_exclude.append(exclude)
      else:
        try:
          re_object = re.compile(exclude)
          regex_hits = [ ]
          for arg in inputs:
            search_result = re_object.search(arg)
            if search_result and search_result.group():
              regex_hits.append(arg)
          if regex_hits:
            app.debug('Inputs excluded via regex "' + exclude + '": ' + str(regex_hits))
            to_exclude.extend(regex_hits)
          else:
            app.debug('Compiled exclude regex "' + exclude + '" had no hits')
            exclude_unmatched.append(exclude)
        except re.error:
          app.debug('Exclude string "' + exclude + '" did not compile as regex')
          exclude_unmatched.append(exclude)
    if exclude_unmatched:
      app.warn('Item' + ('s' if len(exclude_unmatched) > 1 else '') + ' specified via -exclude did not result in item exclusion, whether by direct match or compilation as regex: ' + str('\'' + exclude_unmatched[0] + '\'' if len(exclude_unmatched) == 1 else exclude_unmatched))
    inputs = [ arg for arg in inputs if arg not in to_exclude ]
    if not inputs:
      raise MRtrixError('No inputs remaining after application of exclusion criteri' + ('on' if len(app.ARGS.exclude) == 1 else 'a'))
    app.debug('Inputs after exclusion: ' + str(inputs))

  common_prefix = os.path.commonprefix(inputs)
  common_suffix = os.path.commonprefix([i[::-1] for i in inputs])[::-1]
  app.debug('Common prefix: ' + common_prefix if common_prefix else 'No common prefix')
  app.debug('Common suffix: ' + common_suffix if common_suffix else 'No common suffix')

  for entry in CMDSPLIT:
    if os.path.exists(entry):
      keys_present = [ key for key in KEYLIST if key in entry ]
      if keys_present:
        app.warn('Performing text substitution of ' + str(keys_present) + ' within command: "' + entry + '"; but the original text exists as a path on the file system... is this a problematic filesystem path?')

  try:
    next(entry for entry in CMDSPLIT if any(key for key in KEYLIST if key in entry))
  except StopIteration:
    raise MRtrixError('None of the unique for_each keys ' + str(KEYLIST) + ' appear in command string "' + app.ARGS.command + '"; no substitution can occur')

  class Entry(object):
    def __init__(self, input_text):
      self.input_text = input_text
      self.sub_in = input_text
      self.sub_name = os.path.basename(input_text.rstrip('/'))
      self.sub_pre = os.path.splitext(self.sub_name.rstrip('.gz'))[0]
      if common_suffix:
        self.sub_uni = input_text[len(common_prefix):-len(common_suffix)]
      else:
        self.sub_uni = input_text[len(common_prefix):]

      self.substitutions = { 'IN': self.sub_in, 'NAME': self.sub_name, 'PRE': self.sub_pre, 'UNI': self.sub_uni }
      app.debug('Input text: ' + input_text)
      app.debug('Substitutions: ' + str(self.substitutions))

      self.cmd = [ ]
      for entry in CMDSPLIT:
        for (key, value) in self.substitutions.items():
          entry = entry.replace(key, value)
        if ' ' in entry:
          entry = '"' + entry + '"'
        self.cmd.append(entry)
      app.debug('Resulting command: ' + str(self.cmd))

      self.outputtext = None
      self.returncode = None

  jobs = [ ]
  for i in inputs:
    jobs.append(Entry(i))

  if app.ARGS.test:
    app.console('Command strings for ' + str(len(jobs)) + ' jobs:')
    for job in jobs:
      sys.stderr.write(ANSI.execute + 'Input:' + ANSI.clear + '   "' + job.input_text + '"\n')
      sys.stderr.write(ANSI.execute + 'Command:' + ANSI.clear + ' ' + ' '.join(job.cmd) + '\n')
    return

  parallel = app.NUM_THREADS is not None and app.NUM_THREADS > 1

  def progress_string():
    text = str(sum(1 if job.returncode is not None else 0 for job in jobs)) + \
        '/' + \
        str(len(jobs)) + \
        ' jobs completed ' + \
        ('across ' + str(app.NUM_THREADS) + ' threads' if parallel else 'sequentially')
    fail_count = sum(bool(job.returncode) for job in jobs)
    if fail_count:
      text += ' (' + str(fail_count) + ' errors)'
    return text

  progress = app.ProgressBar(progress_string(), len(jobs))

  def execute_parallel():
    while not shared.stop:
      my_job = shared.next(jobs)
      if not my_job:
        return
      try:
        result = run.command(' '.join(my_job.cmd), shell=True)
        my_job.outputtext = result.stdout + result.stderr
        my_job.returncode = 0
      except run.MRtrixCmdError as exception:
        my_job.outputtext = str(exception)
        my_job.returncode = exception.returncode
      except Exception as exception: # pylint: disable=broad-except
        my_job.outputtext = str(exception)
        my_job.returncode = 1
      with shared.lock:
        progress.increment(progress_string())

  if parallel:
    threads = [ ]
    for i in range (1, app.NUM_THREADS):
      thread = threading.Thread(target=execute_parallel)
      thread.start()
      threads.append(thread)
    execute_parallel()
    for thread in threads:
      thread.join()
  else:
    for job in jobs:
      try:
        result = run.command(' '.join(job.cmd), shell=True)
        job.outputtext = result.stdout + result.stderr
        job.returncode = 0
      except run.MRtrixCmdError as exception:
        job.outputtext = str(exception)
        job.returncode = exception.returncode
      except Exception as exception: # pylint: disable=broad-except
        job.outputtext = str(exception)
        job.returncode = 1
      progress.increment(progress_string())

  progress.done()

  assert all(job.returncode is not None for job in jobs)
  fail_count = sum(bool(job.returncode) for job in jobs)
  if fail_count:
    app.warn(str(fail_count) + ' of ' + str(len(jobs)) + ' jobs did not complete successfully')
    if fail_count > 1:
      app.warn('Outputs from failed commands:')
      sys.stderr.write(app.EXEC_NAME + ':\n')
    else:
      app.warn('Output from failed command:')
    for job in jobs:
      if job.returncode:
        if job.outputtext:
          app.warn('For input "' + job.sub_in + '" (returncode = ' + str(job.returncode) + '):')
          for line in job.outputtext.splitlines():
            sys.stderr.write(' ' * (len(app.EXEC_NAME)+2) + line + '\n')
        else:
          app.warn('No output from command for input "' + job.sub_in + '" (return code = ' + str(job.returncode) + ')')
        if fail_count > 1:
          sys.stderr.write(app.EXEC_NAME + ':\n')
    raise MRtrixError(str(fail_count) + ' of ' + str(len(jobs)) + ' jobs did not complete successfully')

  if app.VERBOSITY > 1:
    if any(job.outputtext for job in jobs):
      sys.stderr.write(app.EXEC_NAME + ':\n')
      for job in jobs:
        if job.outputtext:
          app.console('Output of command for input "' + job.sub_in + '":')
          for line in job.outputtext.splitlines():
            sys.stderr.write(' ' * (len(app.EXEC_NAME)+2) + line + '\n')
        else:
          app.console('No output from command for input "' + job.sub_in + '"')
        sys.stderr.write(app.EXEC_NAME + ':\n')
    else:
      app.console('No output from command for any inputs')

  app.console('Script reported successful completion for all inputs')






# Execute the script
import mrtrix3
mrtrix3.execute() #pylint: disable=no-member