File: util.py

package info (click to toggle)
chromium 139.0.7258.138-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,120,676 kB
  • sloc: cpp: 35,100,869; ansic: 7,163,530; javascript: 4,103,002; python: 1,436,920; asm: 946,517; xml: 746,709; pascal: 187,653; perl: 88,691; sh: 88,436; objc: 79,953; sql: 51,488; cs: 44,583; fortran: 24,137; makefile: 22,147; tcl: 15,277; php: 13,980; yacc: 8,984; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (160 lines) | stat: -rw-r--r-- 5,251 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# Copyright 2019 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import calendar
import datetime
import json
import logging
import os

import requests  # pylint: disable=import-error

import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool

import sys

# Identifiers for the test path layouts understood by SplitTestPath():
# Telemetry paths are split on the first '/', gtest paths on the first '.'.
TELEMETRY_TEST_PATH_FORMAT = 'telemetry'
GTEST_TEST_PATH_FORMAT = 'gtest'


def ApplyInParallel(function, work_list, on_failure=None):
  """Apply a function to all values in work_list in parallel.

  The calls run on a thread pool and this function blocks until every call
  has finished. Return values of |function| are discarded. An exception
  raised by |function| is logged together with its stack trace and does not
  prevent the remaining values from being processed.

  Args:
    function: A function with one argument.
    work_list: Any iterable with arguments for the function. Iterables that
      do not support len(), such as generators, are accepted as well.
    on_failure: An optional function with one argument. It is called with
      the offending argument whenever |function| raises an exception.
  """
  if not work_list:
    return

  # Materialize the input: sizing the pool needs a length, which generators
  # and other lazy iterables do not provide.
  work_items = list(work_list)
  if not work_items:
    return

  try:
    # Note that this is speculatively halved as an attempt to fix
    # crbug.com/953365.
    cpu_count = multiprocessing.cpu_count() // 2
    if sys.platform == 'win32':
      # TODO(crbug.com/40755900) - we can't use more than 56
      # cores on Windows or Python3 may hang.
      cpu_count = min(cpu_count, 56)

  except NotImplementedError:
    # Some platforms can raise a NotImplementedError from cpu_count()
    logging.warning('cpu_count() not implemented.')
    cpu_count = 4

  # Halving the count of a single-core machine yields 0, but a pool needs at
  # least one worker.
  pool = ThreadPool(max(1, min(cpu_count, len(work_items))))

  # Not every callable has a __name__ (e.g. functools.partial objects), so
  # fall back to its repr to keep the failure handler itself from raising.
  function_name = getattr(function, '__name__', repr(function))

  def function_with_try(arg):
    try:
      function(arg)
    except Exception:  # pylint: disable=broad-except
      # logging exception here is the only way to get a stack trace since
      # multiprocessing's pool implementation does not save that data. See
      # crbug.com/953365.
      logging.exception('Exception while running %s', function_name)
      if on_failure:
        on_failure(arg)

  try:
    # The result iterator is intentionally not consumed; close() + join()
    # wait for all submitted work to complete.
    pool.imap_unordered(function_with_try, work_items)
    pool.close()
    pool.join()
  finally:
    pool.terminate()


def SplitTestPath(test_result, test_path_format):
  """Break the 'testPath' of a test result into suite name and case name.

  The path is cut at the first occurrence of a separator that depends on
  the path format:
    Telemetry: '{benchmark_name}/{story_name}', cut at '/', e.g.
      'system_health.common_desktop/load:news:cnn:2020'.
    Gtest: '{test_suite_name}.{test_case_name}', cut at '.', e.g.
      'ZeroToFiveSequence/LuciTestResultParameterizedTest.Variant'.

  Args:
    test_result: A dict with a 'testPath' entry.
    test_path_format: Either TELEMETRY_TEST_PATH_FORMAT or
      GTEST_TEST_PATH_FORMAT.

  Returns:
    A two-element list: [test suite name, test case name].

  Raises:
    ValueError: If the format is unknown or the path lacks the separator.
  """
  known_formats = (
      (TELEMETRY_TEST_PATH_FORMAT, '/'),
      (GTEST_TEST_PATH_FORMAT, '.'),
  )
  for format_name, format_delimiter in known_formats:
    if test_path_format == format_name:
      delimiter = format_delimiter
      break
  else:
    raise ValueError('Unknown test path format: %s' % test_path_format)

  full_path = test_result['testPath']
  if delimiter in full_path:
    # Only the first delimiter separates suite from case; any later ones
    # stay part of the case name.
    suite_name, _, case_name = full_path.partition(delimiter)
    return [suite_name, case_name]

  raise ValueError('Invalid test path: %s' % full_path)


def IsoTimestampToEpoch(timestamp):
  """Convert ISO formatted time to seconds since epoch.

  Args:
    timestamp: A string like '2009-02-13T23:31:30.5Z' or
      '2009-02-13T23:31:30Z'; the fractional seconds part is optional.

  Returns:
    Seconds since the epoch as a float, with the timestamp read as UTC.

  Raises:
    ValueError: If the timestamp matches neither of the two layouts.
  """
  with_fraction = '%Y-%m-%dT%H:%M:%S.%fZ'
  without_fraction = '%Y-%m-%dT%H:%M:%SZ'
  try:
    parsed = datetime.datetime.strptime(timestamp, with_fraction)
  except ValueError:
    # No fractional part present; retry with the whole-second layout.
    parsed = datetime.datetime.strptime(timestamp, without_fraction)

  # timegm() treats the time tuple as UTC but drops sub-second precision,
  # so the microseconds are added back separately.
  whole_seconds = calendar.timegm(parsed.timetuple())
  fraction = parsed.microsecond / 1e6
  return whole_seconds + fraction


def SetUnexpectedFailure(test_result):
  """Mark a test result as an unexpected failure after a processing error.

  The dict is modified in place: 'status' becomes 'FAIL' and 'expected'
  becomes False. The failure is also reported through logging.error().

  Args:
    test_result: A dict describing a test result; must have a 'testPath'.
  """
  test_result.update(status='FAIL', expected=False)
  failed_path = test_result['testPath']
  logging.error('Processing failed for test %s', failed_path)


def TryUploadingResultToResultSink(results):
  """Report test results to ResultSink if one is configured.

  The sink is looked up in the 'result_sink' section of the JSON file that
  the LUCI_CONTEXT environment variable points to. If the variable is not
  set or the file has no such section, nothing is uploaded and the function
  returns quietly. Otherwise all results are converted and sent to the sink
  in a single HTTP POST request; raise_for_status() is called on the
  response to surface an unsuccessful upload.

  Args:
    results: An iterable of test result dicts. Each one must provide
      'testPath', 'expected' and 'status'; 'runDuration', 'tags' and
      'outputArtifacts' are forwarded when present.
  """

  def _SummaryHtmlFor(output_artifacts):
    # Using test log as the summary. It is stored in an artifact named
    # logs.txt.
    if 'logs.txt' not in output_artifacts:
      return ''
    return '<p><text-artifact artifact-id="logs.txt"></p>'

  def _ArtifactsFor(output_artifacts):
    # Only the file path of each artifact is forwarded to the sink.
    return {
        name: {'filePath': details['filePath']}
        for name, details in output_artifacts.items()
    }

  def _ToSinkResult(test_case):
    sink_result = {
        'testId': test_case['testPath'],
        'expected': test_case['expected'],
        'status': test_case['status']
    }
    # TODO: go/result-sink#test-result-json-object lists that specifying
    # testMetadata with location info can help with breaking down flaky
    # tests. We don't have the file location currently in test results.
    if 'runDuration' in test_case:
      # Durations look like '1.5s'; re-emit them with nanosecond precision.
      seconds = float(test_case['runDuration'].rstrip('s'))
      sink_result['duration'] = '%.9fs' % seconds
    if 'tags' in test_case:
      sink_result['tags'] = test_case['tags']
    if 'outputArtifacts' in test_case:
      output_artifacts = test_case['outputArtifacts']
      sink_result['summaryHtml'] = _SummaryHtmlFor(output_artifacts)
      sink_result['artifacts'] = _ArtifactsFor(output_artifacts)
    return sink_result

  try:
    with open(os.environ['LUCI_CONTEXT']) as context_file:
      sink = json.load(context_file)['result_sink']
  except KeyError:
    # Either LUCI_CONTEXT is unset or it has no result_sink section.
    return

  sink_results = [_ToSinkResult(test_case) for test_case in results]
  endpoint = ('http://%s/prpc/luci.resultsink.v1.Sink/ReportTestResults' %
              sink['address'])
  request_headers = {
      'Content-Type': 'application/json',
      'Accept': 'application/json',
      'Authorization': 'ResultSink %s' % sink['auth_token'],
  }
  payload = json.dumps({'testResults': sink_results})
  response = requests.post(url=endpoint, headers=request_headers,
                           data=payload)
  response.raise_for_status()