1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
|
# Copyright 2019 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import calendar
import datetime
import json
import logging
import os
import requests # pylint: disable=import-error
import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
import sys
TELEMETRY_TEST_PATH_FORMAT = 'telemetry'
GTEST_TEST_PATH_FORMAT = 'gtest'
def ApplyInParallel(function, work_list, on_failure=None):
"""Apply a function to all values in work_list in parallel.
Args:
function: A function with one argument.
work_list: Any iterable with arguments for the function.
on_failure: A function to run in case of a failure.
"""
if not work_list:
return
try:
# Note that this is speculatively halved as an attempt to fix
# crbug.com/953365.
cpu_count = multiprocessing.cpu_count() // 2
if sys.platform == 'win32':
# TODO(crbug.com/40755900) - we can't use more than 56
# cores on Windows or Python3 may hang.
cpu_count = min(cpu_count, 56)
except NotImplementedError:
# Some platforms can raise a NotImplementedError from cpu_count()
logging.warning('cpu_count() not implemented.')
cpu_count = 4
pool = ThreadPool(min(cpu_count, len(work_list)))
def function_with_try(arg):
try:
function(arg)
except Exception: # pylint: disable=broad-except
# logging exception here is the only way to get a stack trace since
# multiprocessing's pool implementation does not save that data. See
# crbug.com/953365.
logging.exception('Exception while running %s' % function.__name__)
if on_failure:
on_failure(arg)
try:
pool.imap_unordered(function_with_try, work_list)
pool.close()
pool.join()
finally:
pool.terminate()
def SplitTestPath(test_result, test_path_format):
""" Split a test path into test suite name and test case name.
Telemetry and Gtest have slightly different test path formats.
Telemetry uses '{benchmark_name}/{story_name}', e.g.
'system_health.common_desktop/load:news:cnn:2020'.
Gtest uses '{test_suite_name}.{test_case_name}', e.g.
'ZeroToFiveSequence/LuciTestResultParameterizedTest.Variant'
"""
if test_path_format == TELEMETRY_TEST_PATH_FORMAT:
separator = '/'
elif test_path_format == GTEST_TEST_PATH_FORMAT:
separator = '.'
else:
raise ValueError('Unknown test path format: %s' % test_path_format)
test_path = test_result['testPath']
if separator not in test_path:
raise ValueError('Invalid test path: %s' % test_path)
return test_path.split(separator, 1)
def IsoTimestampToEpoch(timestamp):
"""Convert ISO formatted time to seconds since epoch."""
try:
dt = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
except ValueError:
dt = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
return calendar.timegm(dt.timetuple()) + dt.microsecond / 1e6
def SetUnexpectedFailure(test_result):
"""Update fields of a test result in a case of processing failure."""
test_result['status'] = 'FAIL'
test_result['expected'] = False
logging.error('Processing failed for test %s', test_result['testPath'])
def TryUploadingResultToResultSink(results):
def buildSummaryHtml(artifacts):
# Using test log as the summary. It is stored in an artifact named logs.txt.
if 'logs.txt' in artifacts:
summary_html = '<p><text-artifact artifact-id="logs.txt"></p>'
else:
summary_html = ''
return summary_html
def buildArtifacts(artifacts):
artifacts_result = {}
for artifact_id, artifact in artifacts.items():
artifacts_result[artifact_id] = {'filePath': artifact['filePath']}
return artifacts_result
def parse(results):
test_results = []
for test_case in results:
test_result = {
'testId': test_case['testPath'],
'expected': test_case['expected'],
'status': test_case['status']
}
# TODO: go/result-sink#test-result-json-object listed that specifying
# testMetadata with location info can helped with breaking down flaky
# tests. We don't have the file location currently in test results.
if 'runDuration' in test_case:
test_result['duration'] = '%.9fs' % float(
test_case['runDuration'].rstrip('s'))
if 'tags' in test_case:
test_result['tags'] = test_case['tags']
if 'outputArtifacts' in test_case:
test_result['summaryHtml'] = buildSummaryHtml(
test_case['outputArtifacts'])
test_result['artifacts'] = buildArtifacts(test_case['outputArtifacts'])
test_results.append(test_result)
return test_results
try:
with open(os.environ['LUCI_CONTEXT']) as f:
sink = json.load(f)['result_sink']
except KeyError:
return
test_results = parse(results)
res = requests.post(
url='http://%s/prpc/luci.resultsink.v1.Sink/ReportTestResults' %
sink['address'],
headers={
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': 'ResultSink %s' % sink['auth_token'],
},
data=json.dumps({'testResults': test_results}))
res.raise_for_status()
|