#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import argparse
import glob
import logging
import os
import re
# Configure logging when the module is loaded: records from DEBUG upwards are emitted,
# each prefixed with its level name in square brackets.
logging.basicConfig(format="[%(levelname)s] - %(message)s", level=logging.DEBUG)
# getLogger() without a name returns the root logger; the functions in this file report through it.
log = logging.getLogger()
def parse_args():
    """Parse the command line and check that the ONNX Runtime root is a directory.

    Returns:
        The parsed `argparse.Namespace`; its `ort_root` attribute is an existing directory.

    Raises:
        argparse.ArgumentError: if the value given for --ort-root is not a directory.
    """
    cli = argparse.ArgumentParser(
        description="Find optimizers that involve operators which may need an update to the supported opset versions."
    )
    # Keep the action object so a failed directory check can be attributed to this option.
    ort_root_action = cli.add_argument(
        "--ort-root",
        "-o",
        required=True,
        type=str,
        help="The root directory of the ONNX Runtime repository to search.",
    )

    parsed = cli.parse_args()
    if os.path.isdir(parsed.ort_root):
        return parsed

    raise argparse.ArgumentError(ort_root_action, f"{parsed.ort_root} is not a valid directory")
def get_call_args_from_file(filename: str, function_or_declaration: str) -> list[str]:
    """
    Search a file for all function calls or declarations that match the provided name.
    Requires both the opening '(' and closing ')' to be on the same line.
    Handles multiple calls being on the same line.

    Args:
        filename: Path of the file to scan.
        function_or_declaration: Name to look for. It is passed to `re.finditer`, so it is
            interpreted as a regular expression.

    Returns:
        For every match, the text between the first '(' after the match and the first ')'
        after that '(', in the order the matches occur in the file. Matches without both
        brackets on the same line are logged as errors and skipped.
    """
    results = []
    with open(filename) as f:
        # Number lines from 1 so the logged location is the line an editor would show.
        for line_num, line in enumerate(f, start=1):
            for match in re.finditer(function_or_declaration, line):
                # check we have both the opening and closing brackets for the function call/declaration.
                # if we do we have all the arguments
                start = line.find("(", match.end())
                # Only a ')' that comes after the '(' can close the argument list.
                end = line.find(")", start + 1) if start != -1 else -1

                if end != -1:
                    results.append(line[start + 1 : end])
                else:
                    # TODO: handle automatically by merging lines
                    log.error(
                        "Call/Declaration is split over multiple lines. Please check manually. "
                        f"File:{filename} Line:{line_num}"
                    )

    return results
def get_multiline_call_args_from_file(filename: str, function_or_declaration: str) -> list[str]:
    """
    Search a file for all function calls or declarations that match the provided name.
    Allows the opening '(' and closing ')' to be split across multiple lines.
    Supports a single call per line.

    Args:
        filename: Path of the file to scan.
        function_or_declaration: Name to look for. Matched as a plain substring.

    Returns:
        For every match, the text between the first '(' and the first ')' of the call, with
        continuation lines stripped of surrounding whitespace and concatenated. A call that
        is still open at the end of the file is not returned.
    """
    results = []
    with open(filename) as f:
        function_and_args = None
        for line in f:
            if not function_and_args:
                # look for new match
                match_at = line.find(function_or_declaration)
                if match_at == -1:
                    continue
                function_and_args = line[match_at:].strip()
                # The accumulated text starts at the match, so the search for ')' must start
                # at 0. Reusing the offset within `line` would skip the closing bracket
                # whenever the text before the name is longer than the call.
                search_from = 0
            else:
                # append to existing line and look for closing ')' in the newly added text only
                search_from = len(function_and_args)
                function_and_args += line.strip()

            end = function_and_args.find(")", search_from)
            if end != -1:
                start_args = function_and_args.find("(")
                results.append(function_and_args[start_args + 1 : end])
                function_and_args = None

    return results
def _add_if_newer(domain: str, op: str, opset: int, op_to_opset: dict[str, int]):
key = domain + "." + op
if key not in op_to_opset or op_to_opset[key] < opset:
op_to_opset[key] = opset
def get_latest_ort_op_versions(root_dir):
    """Find the entries for the latest opset for each operator.

    Reads the CPU and CUDA contrib kernel registration files under `root_dir` and returns
    a dict mapping '<domain>.<op>' to the highest opset found in a kernel class declaration.
    """
    # ONNX operators are handled by get_latest_onnx_op_versions, so only the current
    # registrations of the internal kernels are read here.
    registration_files = [
        os.path.join(root_dir, relative_path)
        for relative_path in (
            "onnxruntime/contrib_ops/cpu/cpu_contrib_kernels.cc",
            "onnxruntime/contrib_ops/cuda/cuda_contrib_kernels.cc",
        )
    ]

    # (macro name, position of the operator name within the macro's arguments)
    # e.g. class ONNX_OPERATOR_KERNEL_CLASS_NAME(kCpuExecutionProvider, kOnnxDomain, 11, Clip);
    #      class ONNX_OPERATOR_TYPED_KERNEL_CLASS_NAME(kCpuExecutionProvider, kOnnxDomain, 11, float, ArgMax);
    macros = (
        ("ONNX_OPERATOR_KERNEL_CLASS_NAME", 3),
        ("ONNX_OPERATOR_TYPED_KERNEL_CLASS_NAME", 4),
    )

    latest = {}
    for path in registration_files:
        for macro, op_index in macros:
            for call in get_multiline_call_args_from_file(path, macro):
                fields = [field.strip() for field in call.split(",")]
                # fields[1] is the domain and fields[2] the opset in both macro forms
                _add_if_newer(fields[1], fields[op_index], int(fields[2]), latest)

    return latest
def get_latest_onnx_op_versions(root_dir):
    """Get the latest versions of the ONNX operators from the ONNX headers.

    Returns a dict mapping '<ORT domain constant>.<op>' to the highest opset declared in
    the two ONNX operator set headers under `root_dir`.
    """
    headers = [
        os.path.join(root_dir, relative_path)
        for relative_path in (
            # operators with domain of 'Onnx'
            "cmake/external/onnx/onnx/defs/operator_sets.h",
            # ML operators with domain of 'OnnxML'
            "cmake/external/onnx/onnx/defs/operator_sets_ml.h",
        )
    ]

    latest = {}
    for header in headers:
        # e.g. fn(GetOpSchema<ONNX_OPERATOR_SET_SCHEMA_CLASS_NAME(Onnx, 17, LayerNormalization)>());
        #      fn(GetOpSchema<ONNX_OPERATOR_SET_SCHEMA_CLASS_NAME(OnnxML, 3, TreeEnsembleClassifier)>());
        for call in get_multiline_call_args_from_file(header, "ONNX_OPERATOR_SET_SCHEMA_CLASS_NAME"):
            fields = [field.strip() for field in call.split(",")]
            # translate the ONNX domain name into the matching ORT constant
            ort_domain = "kMLDomain" if fields[0] == "OnnxML" else "kOnnxDomain"
            _add_if_newer(ort_domain, fields[2], int(fields[1]), latest)

    return latest
def find_potential_issues(root_dir, op_to_opset):
    """Report optimizer version checks that do not end at the latest known opset.

    Scans every .cc and .h file below onnxruntime/core/optimizer in `root_dir` for calls to
    graph_utils::IsSupportedOptypeVersionAndDomain. For each call the last listed version
    is compared with the entry for '<domain>.<op>' in `op_to_opset`:
      - a warning is logged when the two differ,
      - an error is logged when the operator name is not a string literal or when
        `op_to_opset` has no entry for the operator.
    Nothing is returned.
    """
    optimizer_dir = os.path.join(root_dir, "onnxruntime/core/optimizer")
    files = [
        path
        for pattern in ("/**/*.cc", "/**/*.h")
        for path in glob.glob(optimizer_dir + pattern, recursive=True)
    ]

    for file in files:
        for call in get_call_args_from_file(file, "graph_utils::IsSupportedOptypeVersionAndDomain"):
            # Need to handle multiple comma separated version numbers, and the optional domain argument.
            # e.g. IsSupportedOptypeVersionAndDomain(node, "MaxPool", {1, 8, 10})
            #      IsSupportedOptypeVersionAndDomain(node, "FusedConv", {1}, kMSDomain)
            # Only the first two commas are split on; the rest holds '{versions}[, domain]'.
            pieces = call.split(",", 2)
            op = pieces[1].strip()
            is_string_literal = op.startswith('"') and op.endswith('"')
            if not is_string_literal:
                log.error(f"Symbolic name of '{op}' found for op. Please check manually. File:{file}")
                continue

            remainder = pieces[2]
            brace_open = remainder.find("{")
            brace_close = remainder.find("}")
            last_version = remainder[brace_open + 1 : brace_close].split(",")[-1].strip()

            # a comma after the closing brace introduces the optional domain argument
            domain_comma = remainder.find(",", brace_close)
            domain = "kOnnxDomain" if domain_comma == -1 else remainder[domain_comma + 1 :].strip()

            # from here on `op` is the lookup key: domain plus the name without its quotes
            op = domain + "." + op[1:-1]
            if op not in op_to_opset:
                log.error(f"Failed to find version information for {op}. File:{file}")
                continue

            latest = op_to_opset[op]
            if int(latest) != int(last_version):
                log.warning(
                    f"Newer opset found for {op}. Latest:{latest} Optimizer support ends at {last_version}. File:{file}"
                )
if __name__ == "__main__":
    # Script entry point: gather the latest known opset per operator, then scan the optimizers.
    arguments = parse_args()

    ort_to_opset_map = get_latest_ort_op_versions(arguments.ort_root)
    onnx_op_to_opset_map = get_latest_onnx_op_versions(arguments.ort_root)

    # merge the two maps; an entry from the ONNX headers replaces a kernel entry with the same key
    op_to_opset_map = dict(ort_to_opset_map)
    op_to_opset_map.update(onnx_op_to_opset_map)

    find_potential_issues(arguments.ort_root, op_to_opset_map)