import json
import os
import time
from typing import Any, Dict, List, Set, Tuple
from ..util.setting import (
CompilerType,
JSON_FOLDER_BASE_DIR,
TestList,
TestPlatform,
TestStatusType,
)
from ..util.utils import (
detect_compiler_type,
print_error,
print_time,
related_to_test_list,
)
from .parser.coverage_record import CoverageRecord
from .parser.gcov_coverage_parser import GcovCoverageParser
from .parser.llvm_coverage_parser import LlvmCoverageParser
from .print_report import (
file_oriented_report,
html_oriented_report,
line_oriented_report,
)
# coverage_records: Dict[str, LineInfo] = {}
# Module-level accumulators shared across the parse / update / report stages.
# Maps file path -> set of line numbers seen as covered by at least one test.
covered_lines: Dict[str, Set[int]] = {}
# Maps file path -> set of line numbers reported as uncovered; pruned against
# covered_lines by update_set() before reporting.
uncovered_lines: Dict[str, Set[int]] = {}
# Buckets json files by how cleanly they parsed (see get_json_obj's statuses).
tests_type: TestStatusType = {"success": set(), "partial": set(), "fail": set()}
def transform_file_name(
    file_path: str, interested_folders: List[str], platform: TestPlatform
) -> str:
    """Normalize a coverage file path for reporting.

    Strips compiler-variant suffixes (.DEFAULT.cpp / .AVX.cpp / .AVX2.cpp),
    then, if the user specified interested folders, truncates the path so it
    begins at the first matching folder; otherwise, on the OSS platform,
    strips the local pytorch checkout prefix.

    Args:
        file_path: path as reported by the coverage tool.
        interested_folders: folders the user cares about (may be empty).
        platform: the test platform (OSS or internal).

    Returns:
        The transformed (shortened) file path.
    """
    remove_patterns: Set[str] = {".DEFAULT.cpp", ".AVX.cpp", ".AVX2.cpp"}
    for pattern in remove_patterns:
        file_path = file_path.replace(pattern, "")
    # if user has specified an interested folder, cut the path at that folder
    if interested_folders:
        for folder in interested_folders:
            if folder in file_path:
                return file_path[file_path.find(folder) :]
    # remove pytorch base folder path
    if platform == TestPlatform.OSS:
        from package.oss.utils import get_pytorch_folder

        pytorch_folder = get_pytorch_folder()  # fixed local typo: pytorch_foler
        assert file_path.startswith(pytorch_folder)
        file_path = file_path[len(pytorch_folder) + 1 :]
    return file_path
def is_intrested_file(
    file_path: str, interested_folders: List[str], platform: TestPlatform
) -> bool:
    """Decide whether a file should be included in the coverage report.

    Filters out cuda / generated / build artifacts, files outside the
    pytorch checkout (OSS only), and — when the user gave interested
    folders — any file not under one of those folders.

    Args:
        file_path: path as reported by the coverage tool.
        interested_folders: folders the user cares about (may be empty).
        platform: the test platform (OSS or internal).

    Returns:
        True if the file is of interest for reporting.
    """
    ignored_patterns = ["cuda", "aten/gen_aten", "aten/aten_", "build/"]
    # any() takes the generator directly; no need to build a list first
    if any(pattern in file_path for pattern in ignored_patterns):
        return False
    # ignore files that do not belong to pytorch
    if platform == TestPlatform.OSS:
        from package.oss.utils import get_pytorch_folder

        if not file_path.startswith(get_pytorch_folder()):
            return False
    # if user has specified interested folders, keep only files under them
    if interested_folders:
        for folder in interested_folders:
            # fixed local typo: intersted_folder_path
            interested_folder_path = folder if folder.endswith("/") else f"{folder}/"
            if interested_folder_path in file_path:
                return True
        return False
    else:
        return True
def get_json_obj(json_file: str) -> Tuple[Any, int]:
    """Read the first parsable json object out of *json_file*.

    llvm/gcov sometimes writes error text ("fail to find coverage data")
    before the real json payload, so unparsable leading lines are skipped.

    Returns:
        (json_obj, status) where status is
        0 -- clean read: the file starts with the full coverage json
        1 -- partial: some leading lines were garbage, but json was found
        2 -- failure: no line could be parsed (json_obj is None)
    """
    saw_garbage = False
    with open(json_file) as fp:
        for raw_line in fp:
            try:
                parsed = json.loads(raw_line)
            except json.JSONDecodeError:
                saw_garbage = True
            else:
                return parsed, 1 if saw_garbage else 0
    return None, 2
def parse_json(json_file: str, platform: TestPlatform) -> List[CoverageRecord]:
    """Parse one coverage json file into a list of coverage records.

    Also files the json under the matching module-level tests_type bucket.

    Raises:
        RuntimeError: when the file contains no coverage data at all.
    """
    print("start parse:", json_file)
    json_obj, read_status = get_json_obj(json_file)
    # map get_json_obj's status code onto a tests_type bucket name
    bucket = {0: "success", 1: "partial"}.get(read_status, "fail")
    tests_type[bucket].add(json_file)
    if bucket == "fail":
        raise RuntimeError(
            "Fail to do code coverage! Fail to load json file: ", json_file
        )
    coverage_records: List[CoverageRecord] = []
    compiler = detect_compiler_type(platform)
    if compiler == CompilerType.CLANG:
        coverage_records = LlvmCoverageParser(json_obj).parse("fbcode")
    elif compiler == CompilerType.GCC:
        coverage_records = GcovCoverageParser(json_obj).parse()
    return coverage_records
def parse_jsons(
    test_list: TestList, interested_folders: List[str], platform: TestPlatform
) -> None:
    """Walk JSON_FOLDER_BASE_DIR, parse each relevant coverage json and
    merge its records into the module-level coverage dictionaries.

    Args:
        test_list: tests whose coverage jsons should be analyzed.
        interested_folders: folders the user cares about (may be empty).
        platform: the test platform (OSS or internal).
    """
    # the compiler type depends only on the platform — hoist it out of the
    # per-file loop instead of re-detecting it for every json
    cov_type = detect_compiler_type(platform)
    for path, _, file_list in os.walk(JSON_FOLDER_BASE_DIR):
        for file_name in file_list:
            if not file_name.endswith(".json"):
                continue
            # if compiler is clang, we only analyze related jsons / when
            # compiler is gcc, we analyze all jsons
            if cov_type == CompilerType.CLANG and not related_to_test_list(
                file_name, test_list
            ):
                continue
            json_file = os.path.join(path, file_name)
            try:
                coverage_records = parse_json(json_file, platform)
            except RuntimeError:
                # best-effort: report the bad file and keep going
                print_error("Fail to load json file: ", json_file)
                continue
            # collect information from each target's export file and merge
            # them together:
            update_coverage(coverage_records, interested_folders, platform)
def update_coverage(
    coverage_records: List[CoverageRecord],
    interested_folders: List[str],
    platform: TestPlatform,
) -> None:
    """Merge a batch of coverage records into the module-level
    covered_lines / uncovered_lines dictionaries, skipping files that are
    not of interest."""
    for record in coverage_records:
        info = record.to_dict()
        raw_path = info["filepath"]
        if not is_intrested_file(raw_path, interested_folders, platform):
            continue
        # transform file name: remote/13223/caffe2/aten -> caffe2/aten
        file_path = transform_file_name(raw_path, interested_folders, platform)
        # ensure both dictionaries have an entry for this file
        covered = covered_lines.setdefault(file_path, set())
        uncovered = uncovered_lines.setdefault(file_path, set())
        # merge this record's line ranges; either range may be None
        if info["covered_lines"] is not None:
            covered.update(info["covered_lines"])
        if info["uncovered_lines"] is not None:
            uncovered.update(info["uncovered_lines"])
def update_set() -> None:
    """Drop every line known to be covered from the same file's uncovered
    set, so the two module-level dictionaries are disjoint per file."""
    for file_path, covered in covered_lines.items():
        # in-place set difference (equivalent to difference_update)
        uncovered_lines[file_path] -= covered
def summarize_jsons(
    test_list: TestList,
    interested_folders: List[str],
    coverage_only: List[str],
    platform: TestPlatform,
) -> None:
    """Entry point: collect all coverage jsons and emit the reports.

    GCC coverage goes straight to the html report; clang (llvm) coverage is
    parsed, de-duplicated, and rendered as line- and file-oriented reports.
    """
    start_time = time.time()
    if detect_compiler_type(platform) == CompilerType.GCC:
        html_oriented_report()
    else:
        parse_jsons(test_list, interested_folders, platform)
        update_set()
        # both report flavors consume the exact same arguments
        report_args = (
            test_list,
            tests_type,
            interested_folders,
            coverage_only,
            covered_lines,
            uncovered_lines,
        )
        line_oriented_report(*report_args)
        file_oriented_report(*report_args)
    print_time("summary jsons take time: ", start_time)