1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
|
#!/usr/bin/python3
# Copyright (c) 2008-2025 the MRtrix3 contributors.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Covered Software is provided under this License on an "as is"
# basis, without warranty of any kind, either expressed, implied, or
# statutory, including, without limitation, warranties that the
# Covered Software is free of defects, merchantable, fit for a
# particular purpose or non-infringing.
# See the Mozilla Public License v. 2.0 for more details.
#
# For more details, see http://www.mrtrix.org/.
import os, re, sys, threading
# Everything that follows the colon on the command line is withheld from
# argparse by usage(); it is kept in this module-level list so that execute()
# can still get at the individual command tokens.
CMDSPLIT = []
def usage(cmdline): #pylint: disable=unused-variable
    """Declare the for_each command-line interface and split sys.argv at the colon.

    The author, synopsis, description, usage examples and arguments are
    registered on ``cmdline``. Afterwards ``sys.argv`` is rewritten so that
    everything following the first ':' is collapsed into one space-joined
    string (the "command" positional argument); the individual tokens are
    retained in the module-level CMDSPLIT list.

    Args:
        cmdline: object on which set_author(), set_synopsis(),
            add_description(), add_example_usage() and add_argument() are
            called.

    Raises:
        SystemExit: with status 1 if nothing follows the colon, or if more
            than one argument was given but no colon is present.
    """
    global CMDSPLIT
    cmdline.set_author('Robert E. Smith (robert.smith@florey.edu.au) and David Raffelt (david.raffelt@florey.edu.au)')
    cmdline.set_synopsis('Perform some arbitrary processing step for each of a set of inputs')
    cmdline.add_description('This script greatly simplifies various forms of batch processing by enabling the execution of a command (or set of commands) independently for each of a set of inputs. Part of the way that this is achieved is by providing basic text substitutions, which simplify the formation of valid command strings based on the unique components of the input strings on which the script is instructed to execute. The available substitutions are listed below (note that the -test command-line option can be used to ensure correct command string formation prior to actually executing the commands):')
    cmdline.add_description(' - IN: The full matching pattern, including leading folders. For example, if the target list contains a file "folder/image.mif", any occurrence of "IN" will be substituted with "folder/image.mif".')
    cmdline.add_description(' - NAME: The basename of the matching pattern. For example, if the target list contains a file "folder/image.mif", any occurrence of "NAME" will be substituted with "image.mif".')
    cmdline.add_description(' - PRE: The prefix of the input pattern (the basename stripped of its extension). For example, if the target list contains a file "folder/my.image.mif.gz", any occurrence of "PRE" will be substituted with "my.image".')
    cmdline.add_description(' - UNI: The unique part of the input after removing any common prefix and common suffix. For example, if the target list contains files: "folder/001dwi.mif", "folder/002dwi.mif", "folder/003dwi.mif", any occurrence of "UNI" will be substituted with "001", "002", "003".')
    cmdline.add_description('Note that due to a limitation of the Python "argparse" module, any command-line OPTIONS that the user intends to provide specifically to the for_each script must appear BEFORE providing the list of inputs on which for_each is intended to operate. While command-line options provided as such will be interpreted specifically by the for_each script, any command-line options that are provided AFTER the COLON separator will form part of the executed COMMAND, and will therefore be interpreted as command-line options having been provided to that underlying command.')
    cmdline.add_example_usage('Demonstration of basic usage syntax',
                              'for_each folder/*.mif : mrinfo IN',
                              'This will run the "mrinfo" command for every .mif file present in "folder/". Note that the compulsory colon symbol is used to separate the list of items on which for_each is being instructed to operate, from the command that is intended to be run for each input.')
    cmdline.add_example_usage('Multi-threaded use of for_each',
                              'for_each -nthreads 4 freesurfer/subjects/* : recon-all -subjid NAME -all',
                              'In this example, for_each is instructed to run the FreeSurfer command \'recon-all\' for all subjects within the \'subjects\' directory, with four subjects being processed in parallel at any one time. Whenever processing of one subject is completed, processing for a new unprocessed subject will commence. This technique is useful for improving the efficiency of running single-threaded commands on multi-core systems, as long as the system possesses enough memory to support such parallel processing. Note that in the case of multi-threaded commands (which includes many MRtrix3 commands), it is generally preferable to permit multi-threaded execution of the command on a single input at a time, rather than processing multiple inputs in parallel.')
    cmdline.add_example_usage('Excluding specific inputs from execution',
                              'for_each *.nii -exclude 001.nii : mrconvert IN PRE.mif',
                              'Particularly when a wildcard is used to define the list of inputs for for_each, it is possible in some instances that this list will include one or more strings for which execution should in fact not be performed; for instance, if a command has already been executed for one or more files, and then for_each is being used to execute the same command for all other files. In this case, the -exclude option can be used to effectively remove an item from the list of inputs that would otherwise be included due to the use of a wildcard (and can be used more than once to exclude more than one string). In this particular example, mrconvert is instructed to perform conversions from NIfTI to MRtrix image formats, for all except the first image in the directory. Note that any usages of this option must appear AFTER the list of inputs. Note also that the argument following the -exclude option can alternatively be a regular expression, in which case any inputs for which a match to the expression is found will be excluded from processing.')
    cmdline.add_example_usage('Testing the command string substitution',
                              'for_each -test * : mrconvert IN PRE.mif',
                              'By specifying the -test option, the script will print to the terminal the results of text substitutions for all of the specified inputs, but will not actually execute those commands. It can therefore be used to verify that the script is receiving the intended set of inputs, and that the text substitutions on those inputs lead to the intended command strings.')
    cmdline.add_example_usage('Utilising shell operators within the command substitution',
                              'for_each * : tensor2metric IN/dwi.mif - "|" tensor2metric - -fa IN/fa.mif',
                              'In this example, if the double-quotes were NOT placed around the pipe operator, then the shell would take the sum total output of the for_each script and pipe that to a single invocation of the tensor2metric command. Since in this example it is instead desired for the pipe operator to be a part of the command string that is executed multiple times by the for_each script, it must be escaped using double-quotes.')
    cmdline.add_argument('inputs', help='Each of the inputs for which processing should be run', nargs='+')
    cmdline.add_argument('colon', help='Colon symbol (":") delimiting the for_each inputs & command-line options from the actual command to be executed', type=str, choices=[':'])
    cmdline.add_argument('command', help='The command string to run for each input, containing any number of substitutions listed in the Description section', type=str)
    cmdline.add_argument('-exclude', help='Exclude one specific input string / all strings matching a regular expression from being processed (see Example Usage)', action='append', metavar='"regex"', nargs=1)
    cmdline.add_argument('-test', help='Test the operation of the for_each script, by printing the command strings following string substitution but not actually executing them', action='store_true', default=False)

    # Usage of for_each needs to be handled slightly differently here:
    # We want argparse to parse only the contents of the command-line before the colon symbol,
    # as these are the items that pertain to the invocation of the for_each script;
    # anything after the colon should instead form a part of the command that
    # for_each is responsible for executing
    try:
        index = sys.argv.index(':')
    except ValueError:
        # No colon at all: only an error if the user supplied more than a
        # single argument; otherwise sys.argv is left untouched.
        if len(sys.argv) > 2:
            sys.stderr.write('Erroneous usage: A colon must be used to separate for_each inputs from the command to be executed\n')
            sys.exit(1)
        return

    CMDSPLIT = sys.argv[index+1:]
    # Slicing past the end of a list yields an empty list rather than raising
    # IndexError, so the "nothing after the colon" case must be tested for
    # explicitly.
    if not CMDSPLIT:
        sys.stderr.write('Erroneous usage: No command specified (colon separator cannot be the last entry provided)\n')
        sys.exit(1)
    sys.argv = sys.argv[:index+1]
    sys.argv.append(' '.join(CMDSPLIT))
# Module-level state, so that the worker function execute_parallel() can reach it
class Shared(object):
    """Lock-protected cursor used to hand out jobs one at a time."""

    def __init__(self):
        self.stop = False
        self.lock = threading.Lock()
        self._job_index = 0

    def next(self, jobs):
        """Return the next job not yet handed out from jobs, or None if there is none.

        The cursor is advanced while holding the lock; once the final job has
        been handed out, the stop flag is set.
        """
        with self.lock:
            position = self._job_index
            if position >= len(jobs):
                return None
            self._job_index = position + 1
            self.stop = (self._job_index == len(jobs))
            return jobs[position]


shared = Shared() #pylint: disable=invalid-name
# Text keys that execute() searches for within the command string
KEYLIST = ['IN', 'NAME', 'PRE', 'UNI']
def execute(): #pylint: disable=unused-variable
    """Form one command per input through key substitution, then run them all.

    Steps, in order:
      1. Drop any inputs selected by -exclude, either by exact string match
         or by a regular expression search.
      2. Compute the common prefix and common suffix across the remaining
         inputs (used for the UNI substitution).
      3. Check that at least one key from KEYLIST appears in the command.
      4. Build the substituted command for every input; with -test, print
         those commands and return without running anything.
      5. Run the commands, either sequentially or across app.NUM_THREADS
         threads, recording the output text and return code of each.
      6. Report the outputs of failed jobs (or of all jobs when
         app.VERBOSITY exceeds 1).

    Raises:
        MRtrixError: if no inputs remain after exclusion, if none of the
            KEYLIST keys appears in the command, or if one or more jobs did
            not complete successfully.
    """
    from mrtrix3 import ANSI, MRtrixError #pylint: disable=no-name-in-module, import-outside-toplevel
    from mrtrix3 import app, run #pylint: disable=no-name-in-module, import-outside-toplevel

    inputs = app.ARGS.inputs
    app.debug('All inputs: ' + str(inputs))
    app.debug('Command: ' + str(app.ARGS.command))
    app.debug('CMDSPLIT: ' + str(CMDSPLIT))

    if app.ARGS.exclude:
        app.ARGS.exclude = [ exclude[0] for exclude in app.ARGS.exclude ] # To deal with argparse's action=append. Always guaranteed to be only one argument since nargs=1
        app.debug('To exclude: ' + str(app.ARGS.exclude))
        exclude_unmatched = [ ]
        to_exclude = [ ]
        for exclude in app.ARGS.exclude:
            # An exact match against an input takes priority over regex interpretation
            if exclude in inputs:
                to_exclude.append(exclude)
                continue
            try:
                re_object = re.compile(exclude)
            except re.error:
                app.debug('Exclude string "' + exclude + '" did not compile as regex')
                exclude_unmatched.append(exclude)
                continue
            regex_hits = [ ]
            for arg in inputs:
                search_result = re_object.search(arg)
                # Zero-length matches are not counted as hits
                if search_result and search_result.group():
                    regex_hits.append(arg)
            if regex_hits:
                app.debug('Inputs excluded via regex "' + exclude + '": ' + str(regex_hits))
                to_exclude.extend(regex_hits)
            else:
                app.debug('Compiled exclude regex "' + exclude + '" had no hits')
                exclude_unmatched.append(exclude)
        if exclude_unmatched:
            app.warn('Item' + ('s' if len(exclude_unmatched) > 1 else '') + ' specified via -exclude did not result in item exclusion, whether by direct match or compilation as regex: ' + str('\'' + exclude_unmatched[0] + '\'' if len(exclude_unmatched) == 1 else exclude_unmatched))
        inputs = [ arg for arg in inputs if arg not in to_exclude ]
        if not inputs:
            raise MRtrixError('No inputs remaining after application of exclusion criteri' + ('on' if len(app.ARGS.exclude) == 1 else 'a'))
        app.debug('Inputs after exclusion: ' + str(inputs))

    common_prefix = os.path.commonprefix(inputs)
    # The common suffix is obtained as the common prefix of the reversed strings
    common_suffix = os.path.commonprefix([i[::-1] for i in inputs])[::-1]
    app.debug('Common prefix: ' + common_prefix if common_prefix else 'No common prefix')
    app.debug('Common suffix: ' + common_suffix if common_suffix else 'No common suffix')

    for entry in CMDSPLIT:
        if os.path.exists(entry):
            keys_present = [ key for key in KEYLIST if key in entry ]
            if keys_present:
                app.warn('Performing text substitution of ' + str(keys_present) + ' within command: "' + entry + '"; but the original text exists as a path on the file system... is this a problematic filesystem path?')

    if not any(key in entry for entry in CMDSPLIT for key in KEYLIST):
        raise MRtrixError('None of the unique for_each keys ' + str(KEYLIST) + ' appear in command string "' + app.ARGS.command + '"; no substitution can occur')

    # Single regex matching any one of the keys; used so that all keys are
    # substituted in one pass over each command token
    key_regex = re.compile('|'.join(re.escape(key) for key in KEYLIST))

    class Entry(object):
        """One input string, its substitution values, and the resulting command."""

        def __init__(self, input_text):
            self.input_text = input_text
            self.sub_in = input_text
            self.sub_name = os.path.basename(input_text.rstrip('/'))
            # Remove a trailing ".gz" suffix only. str.rstrip('.gz') would
            # instead strip any trailing run of the characters '.', 'g' and
            # 'z' (turning e.g. "seg" into "se").
            name_without_gz = self.sub_name[:-len('.gz')] if self.sub_name.endswith('.gz') else self.sub_name
            self.sub_pre = os.path.splitext(name_without_gz)[0]
            if common_suffix:
                self.sub_uni = input_text[len(common_prefix):-len(common_suffix)]
            else:
                self.sub_uni = input_text[len(common_prefix):]

            self.substitutions = { 'IN': self.sub_in, 'NAME': self.sub_name, 'PRE': self.sub_pre, 'UNI': self.sub_uni }
            app.debug('Input text: ' + input_text)
            app.debug('Substitutions: ' + str(self.substitutions))

            def lookup(match):
                return self.substitutions[match.group(0)]

            self.cmd = [ ]
            for entry in CMDSPLIT:
                # All keys are replaced in a single pass, so that text
                # inserted for one key (e.g. an input path that happens to
                # contain "PRE") is never itself rewritten by another key.
                entry = key_regex.sub(lookup, entry)
                if ' ' in entry:
                    entry = '"' + entry + '"'
                self.cmd.append(entry)
            app.debug('Resulting command: ' + str(self.cmd))

            # Both remain None until the job has been run
            self.outputtext = None
            self.returncode = None

    jobs = [ Entry(i) for i in inputs ]

    if app.ARGS.test:
        app.console('Command strings for ' + str(len(jobs)) + ' jobs:')
        for job in jobs:
            sys.stderr.write(ANSI.execute + 'Input:' + ANSI.clear + ' "' + job.input_text + '"\n')
            sys.stderr.write(ANSI.execute + 'Command:' + ANSI.clear + ' ' + ' '.join(job.cmd) + '\n')
        return

    parallel = app.NUM_THREADS is not None and app.NUM_THREADS > 1

    def progress_string():
        """Text for the progress bar: completed count, execution mode, error count."""
        text = str(sum(1 if job.returncode is not None else 0 for job in jobs)) + \
               '/' + \
               str(len(jobs)) + \
               ' jobs completed ' + \
               ('across ' + str(app.NUM_THREADS) + ' threads' if parallel else 'sequentially')
        fail_count = sum(bool(job.returncode) for job in jobs)
        if fail_count:
            text += ' (' + str(fail_count) + ' errors)'
        return text

    progress = app.ProgressBar(progress_string(), len(jobs))

    def run_job(job):
        """Run the command of one job, storing its output text and return code."""
        try:
            # The command tokens are joined into one string and run with
            # shell=True (the usage examples rely on shell operators such as
            # "|" forming part of the command)
            result = run.command(' '.join(job.cmd), shell=True)
            job.outputtext = result.stdout + result.stderr
            job.returncode = 0
        except run.MRtrixCmdError as exception:
            job.outputtext = str(exception)
            job.returncode = exception.returncode
        except Exception as exception: # pylint: disable=broad-except
            # Any other failure is recorded against the job instead of
            # aborting the whole batch
            job.outputtext = str(exception)
            job.returncode = 1

    def execute_parallel():
        """Worker loop: keep claiming and running jobs until none remain."""
        while not shared.stop:
            my_job = shared.next(jobs)
            if not my_job:
                return
            run_job(my_job)
            with shared.lock:
                progress.increment(progress_string())

    if parallel:
        threads = [ ]
        # One fewer thread than requested is spawned, since the calling
        # thread also runs the worker loop
        for _ in range(1, app.NUM_THREADS):
            thread = threading.Thread(target=execute_parallel)
            thread.start()
            threads.append(thread)
        execute_parallel()
        for thread in threads:
            thread.join()
    else:
        for job in jobs:
            run_job(job)
            progress.increment(progress_string())
    progress.done()

    assert all(job.returncode is not None for job in jobs)
    fail_count = sum(bool(job.returncode) for job in jobs)
    if fail_count:
        app.warn(str(fail_count) + ' of ' + str(len(jobs)) + ' jobs did not complete successfully')
        if fail_count > 1:
            app.warn('Outputs from failed commands:')
            sys.stderr.write(app.EXEC_NAME + ':\n')
        else:
            app.warn('Output from failed command:')
        for job in jobs:
            if job.returncode:
                if job.outputtext:
                    app.warn('For input "' + job.sub_in + '" (returncode = ' + str(job.returncode) + '):')
                    for line in job.outputtext.splitlines():
                        sys.stderr.write(' ' * (len(app.EXEC_NAME)+2) + line + '\n')
                else:
                    app.warn('No output from command for input "' + job.sub_in + '" (return code = ' + str(job.returncode) + ')')
                if fail_count > 1:
                    sys.stderr.write(app.EXEC_NAME + ':\n')
        raise MRtrixError(str(fail_count) + ' of ' + str(len(jobs)) + ' jobs did not complete successfully')

    if app.VERBOSITY > 1:
        if any(job.outputtext for job in jobs):
            sys.stderr.write(app.EXEC_NAME + ':\n')
            for job in jobs:
                if job.outputtext:
                    app.console('Output of command for input "' + job.sub_in + '":')
                    for line in job.outputtext.splitlines():
                        sys.stderr.write(' ' * (len(app.EXEC_NAME)+2) + line + '\n')
                else:
                    app.console('No output from command for input "' + job.sub_in + '"')
                sys.stderr.write(app.EXEC_NAME + ':\n')
        else:
            app.console('No output from command for any inputs')

    app.console('Script reported successful completion for all inputs')
# Execute the script
# The mrtrix3 package is imported only here, at the very end of the file,
# after usage() and execute() have been defined.
# NOTE(review): presumably mrtrix3.execute() locates and invokes the usage()
# and execute() functions of this module - confirm against the mrtrix3 library.
import mrtrix3
mrtrix3.execute() #pylint: disable=no-member
|