1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
|
# Copyright 2021 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module for working with BigQuery results."""
import collections
import datetime
import os
from collections import defaultdict
from typing import List, Tuple
# //testing imports.
from flake_suppressor_common import common_typing as ct
from flake_suppressor_common import data_types
from flake_suppressor_common import expectations
from flake_suppressor_common import tag_utils
from unexpected_passes_common import expectations as unexpected_expectations
# //third_party/catapult/third_party/typ imports.
from typ import expectations_parser
class ResultProcessor():
def __init__(self, expectations_processor: expectations.ExpectationProcessor):
self._expectations_processor = expectations_processor
def AggregateResults(self,
results: ct.QueryJsonType) -> ct.AggregatedResultsType:
"""Aggregates BigQuery results.
Also filters out any results that have already been suppressed.
Args:
results: Parsed JSON results from a BigQuery query.
Returns:
A map in the following format:
{
'test_suite': {
'test_name': {
'typ_tags_as_tuple': [ 'list', 'of', 'urls' ],
},
},
}
"""
results = self._ConvertJsonResultsToResultObjects(results)
results = self._FilterOutSuppressedResults(results)
aggregated_results = {}
for r in results:
build_url = 'http://ci.chromium.org/b/%s' % r.build_id
build_url_list = aggregated_results.setdefault(r.suite, {}).setdefault(
r.test, {}).setdefault(r.tags, [])
build_url_list.append(build_url)
return aggregated_results
def AggregateTestStatusResults(
self, results: ct.QueryJsonType) -> ct.AggregatedStatusResultsType:
"""Aggregates BigQuery results.
Also filters out any results that have already been suppressed.
Args:
results: Parsed JSON results from a BigQuery query.
Returns:
A map in the following format:
{
'test_suite': {
'test_name': {
('typ', 'tags', 'as', 'tuple'):
[ (status, url, date, is_slow, typ_expectations),
(status, url, date, is_slow, typ_expectations) ],
},
},
}
"""
results = self._ConvertJsonResultsToResultObjects(results)
results = self._FilterOutSuppressedResults(results)
aggregated_results = defaultdict(
lambda: defaultdict(lambda: defaultdict(list)))
for r in results:
build_url = 'http://ci.chromium.org/b/%s' % r.build_id
aggregated_results[r.suite][r.test][r.tags].append(
ct.ResultTupleType(r.status, build_url, r.date, r.is_slow,
r.typ_expectations))
return aggregated_results
def _ConvertJsonResultsToResultObjects(self, results: ct.QueryJsonType
) -> List[data_types.Result]:
"""Converts JSON BigQuery results to data_types.Result objects.
Args:
results: Parsed JSON results from a BigQuery query
Returns:
The contents of |results| as a list of data_types.Result objects.
"""
object_results = []
for r in results:
suite, test_name = self.GetTestSuiteAndNameFromResultDbName(r['name'])
build_id = r['id'].split('-')[-1]
typ_tags = tuple(tag_utils.TagUtils.RemoveIgnoredTags(r['typ_tags']))
status = None
date = None
is_slow = None
typ_expectations = None
if 'status' in r:
status = r['status']
if 'date' in r:
date = datetime.date.fromisoformat(r['date'])
if 'is_slow' in r:
is_slow = r['is_slow']
if 'typ_expectations' in r:
typ_expectations = r['typ_expectations']
object_results.append(
data_types.Result(suite, test_name, typ_tags, build_id, status, date,
is_slow, typ_expectations))
return object_results
def _FilterOutSuppressedResults(self, results: List[data_types.Result]
) -> List[data_types.Result]:
"""Filters out results that have already been suppressed in the repo.
Args:
results: A list of data_types.Result objects.
Returns:
|results| with any already-suppressed failures removed.
"""
# Get all the expectations.
origin_expectation_contents = (
self._expectations_processor.GetLocalCheckoutExpectationFileContents())
origin_expectations = collections.defaultdict(list)
for filename, contents in origin_expectation_contents.items():
list_parser = expectations_parser.TaggedTestListParser(contents)
for e in list_parser.expectations:
wildcard_type = (
unexpected_expectations.WildcardTypeFromTypExpectation(e))
expectation = data_types.Expectation(e.test, e.tags, e.raw_results,
wildcard_type, e.reason)
origin_expectations[filename].append(expectation)
# Discard any results that already have a matching expectation.
kept_results = []
for r in results:
expectation_filename = (
self._expectations_processor.GetExpectationFileForSuite(
r.suite, r.tags))
expectation_filename = os.path.basename(expectation_filename)
should_keep = True
for e in origin_expectations[expectation_filename]:
if e.AppliesToResult(r):
should_keep = False
break
if should_keep:
kept_results.append(r)
return kept_results
def GetTestSuiteAndNameFromResultDbName(self, result_db_name: str
) -> Tuple[str, str]:
raise NotImplementedError
|