"""
New commandline interface that supports ResolvedToolContracts and emitting ToolContracts.

There are three use cases:

- running from an argparse instance
- running from a Resolved Tool Contract (RTC)
- emitting a ToolContract (TC)

This is to be done in the following steps:

- de-serializing the RTC (this should ideally be done via avro, not an ad hoc
  JSON file; with avro, the Java and C++ classes can be generated, and Python
  can load the RTC as a dict with a well-defined schema)
- loading and running an RTC from the commandline to call the tool's main func
- generating/emitting a TC from a common commandline parser interface that
  builds both the TC and the standard argparse instance
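
Example (sketch) of the argparse use case, assuming the wrappers defined below
are exported from pbcommand.cli and that ``run_my_tool`` is a hypothetical
func(args) -> int supplied by the tool:

    import logging
    import sys

    from pbcommand.cli import (get_default_argparser_with_base_opts,
                               pacbio_args_runner)
    from pbcommand.utils import setup_log

    def main(argv=sys.argv[1:]):
        p = get_default_argparser_with_base_opts("0.1.0", "Example tool")
        return pacbio_args_runner(argv, p, run_my_tool,
                                  logging.getLogger(__name__), setup_log)

    if __name__ == "__main__":
        sys.exit(main())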
"""

import argparse
import errno
import json
import logging
import os
import shutil
import sys
import time
import traceback

import pbcommand
from pbcommand.models import ResourceTypes, PacBioAlarm
from pbcommand.models.report import Report, Attribute
from pbcommand.common_options import add_base_options, add_nproc_option
from pbcommand.utils import get_parsed_args_log_level, get_peak_memory_usage


def _add_version(p, version):
    p.version = version
    p.add_argument('--version',
                   action="version",
                   help="show program's version number and exit")
    return p


def get_default_argparser(version, description):
    """
    Everyone should use this to create an argparse.ArgumentParser instance.

    *This should be updated to add the required base options.*

    :param version: Version of your tool
    :param description: Description of your tool
    :return: a parser with only --version added
    :rtype: ArgumentParser
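
    Example (a sketch; the ``fasta_in`` argument is hypothetical):

        p = get_default_argparser("1.0.0", "My example tool")
        p.add_argument("fasta_in", help="Path to the input FASTA file")
        args = p.parse_args(["in.fasta"])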
    """
    p = argparse.ArgumentParser(description=description,
                                formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # Explicitly adding here to have only --version (not -v)
    return _add_version(p, version)


def get_default_argparser_with_base_opts(
        version, description, default_level="INFO", nproc=None):
    """Return a parser with the default log-related options.

    If you don't want the default log output to go to stdout, set the default
    log level to "ERROR". This essentially suppresses all output to stdout;
    the default behavior will only emit to stderr, i.e., a '--quiet' default
    mode:

        my-tool --my-opt=1234 file_in.txt

    To override the default behavior and get a chattier stdout:

        my-tool --my-opt=1234 --log-level=INFO file_in.txt

    Or write the log to an explicit file and leave stdout untouched:

        my-tool --my-opt=1234 --log-level=DEBUG --log-file=file.log file_in.txt
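
    Example call (a sketch; the version and description strings are made up):

        p = get_default_argparser_with_base_opts(
            "1.0.0", "My example tool", default_level="ERROR", nproc=1)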
    """
    p = add_base_options(
        get_default_argparser(
            version,
            description),
        default_level=default_level)
    if nproc is not None:
        p = add_nproc_option(p)
    return p


def write_task_report(run_time, nproc, exit_code, maxrss):
    """Write a simple task-level report (host, system, nproc, runtime, exit
    code, peak RSS) to task-report.json in the current directory."""
    attributes = [
        Attribute("host", value=os.uname()[1]),
        Attribute("system", value=os.uname()[0]),
        Attribute("nproc", value=nproc),
        Attribute("run_time", value=run_time),
        Attribute("exit_code", value=exit_code),
        Attribute("maxrss", value=maxrss)
    ]
    report = Report("workflow_task",
                    title="Workflow Task Report",
                    attributes=attributes,
                    tags=("internal",))
    report.write_json("task-report.json")


def _pacbio_main_runner(alog, setup_log_func, exe_main_func, *args, **kwargs):
    """
    Runs a general func and logs the results. The return value is expected to
    be an (int) exit code.

    :param alog: a logger instance
    :param setup_log_func: F(alog, level=value, file_name=value, formatter=value) or None
    :param exe_main_func: a CLI exe func that must return an int exit code;
        exe_main_func(args) => Int, where args is parsed from p.parse_args()
    :param args: parsed args from the parser (passed through to exe_main_func)

    :return: Exit code of the callable func
    :rtype: int
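
    Example of a conforming exe_main_func (a sketch; ``run_my_tool`` and the
    ``fasta_in`` attribute are hypothetical):

        def run_my_tool(args):
            # args is the argparse.Namespace returned by parser.parse_args()
            print(args.fasta_in)
            return 0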
    """
    started_at = time.time()

    pargs = args[0]
    # default logging level
    level = logging.INFO
    if 'level' in kwargs:
        level = kwargs.pop('level')
    else:
        level = get_parsed_args_log_level(pargs)

    # None will default to stdout
    log_file = getattr(pargs, 'log_file', None)

    # Currently, only logging to stdout is supported. More customization would
    # require more commandline options in the base parser (e.g., --log-file,
    # --log-formatter)
    log_options = dict(level=level, file_name=log_file)

    base_dir = os.getcwd()

    dump_alarm_on_error = False
    if "dump_alarm_on_error" in kwargs:
        dump_alarm_on_error = kwargs.pop("dump_alarm_on_error")
    is_cromwell_environment = bool(
        os.environ.get(
            "SMRT_PIPELINE_BUNDLE_DIR",
            None)) and "cromwell-executions" in base_dir
    dump_alarm_on_error = dump_alarm_on_error and is_cromwell_environment

    # The setup log func must adhere to the pbcommand.utils.setup_log func
    # signature.
    # FIXME. This should use the more concrete F(file_name_or_name, level, formatter)
    # signature of setup_logger
    if setup_log_func is not None and alog is not None:
        setup_log_func(alog, **log_options)
        alog.info("Using pbcommand v{v}".format(v=pbcommand.get_version()))
        alog.info(
            "completed setting up logger with {f}".format(
                f=setup_log_func))
        alog.info("log opts {d}".format(d=log_options))

    if dump_alarm_on_error:
        alog.info(
            "This command appears to be running as part of a Cromwell workflow")
        alog.info("Additional output files may be generated")

    try:
        # The code in exe_main_func should catch any exceptions. The try/except
        # here is a fail-safe to make sure the program doesn't crash and that
        # the exit code is logged.
        return_code = exe_main_func(*args, **kwargs)
        run_time = time.time() - started_at
    except Exception as e:
        run_time = time.time() - started_at
        if alog is not None:
            alog.error(e, exc_info=True)
        else:
            traceback.print_exc(file=sys.stderr)
        if dump_alarm_on_error:
            PacBioAlarm.dump_error(
                file_name=os.path.join(base_dir, "alarms.json"),
                exception=e,
                info="".join(traceback.format_exc()),
                message=str(e),
                name=e.__class__.__name__,
                severity=logging.ERROR)
        # We should have a standard map of exit codes to Int
        if isinstance(e, IOError):
            return_code = 1
        else:
            return_code = 2

    maxrss = get_peak_memory_usage()
    if is_cromwell_environment:
        alog.info("Writing task report to task-report.json")
        nproc = getattr(pargs, "nproc", 1)
        write_task_report(run_time, nproc, return_code, maxrss)
    _d = dict(r=return_code, s=run_time)
    if alog is not None:
        alog.info(f"Max RSS (kB): {maxrss}")
        alog.info("exiting with return code {r} in {s:.2f} sec.".format(**_d))
    return return_code


def pacbio_args_runner(argv, parser, args_runner_func, alog, setup_log_func,
                       dump_alarm_on_error=True):
    """Parse argv with the supplied parser and run args_runner_func on the
    parsed args via _pacbio_main_runner, returning an int exit code."""
    # For tools that haven't yet implemented the ToolContract API
    args = parser.parse_args(argv)
    return _pacbio_main_runner(alog, setup_log_func, args_runner_func, args,
                               dump_alarm_on_error=dump_alarm_on_error)