File: output_adapter.py

package info (click to toggle)
chromium 138.0.7204.157-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 6,071,864 kB
  • sloc: cpp: 34,936,859; ansic: 7,176,967; javascript: 4,110,704; python: 1,419,953; asm: 946,768; xml: 739,967; pascal: 187,324; sh: 89,623; perl: 88,663; objc: 79,944; sql: 50,304; cs: 41,786; fortran: 24,137; makefile: 21,806; php: 13,980; tcl: 13,166; yacc: 8,925; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (231 lines) | stat: -rw-r--r-- 9,834 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Adapter for printing legacy recipe output

TODO(https://crbug.com/326904531): This file is intended to be a temporary
workaround and should be replaced once this bug is resolved."""

import json
import logging
import os
import re
import sys

# Plain logger that writes straight to stdout. Propagation is turned off so
# records never reach the root logger's (rich) formatting, which we can't
# control for recipe output from here.
basic_logger = logging.getLogger('basic_logger')
basic_logger.propagate = False
basic_logger.addHandler(logging.StreamHandler(stream=sys.stdout))

class PassthroughAdapter:
  """Unfiltered adapter: every line of the recipe run is logged as-is."""

  def ProcessLine(self, line):
    """Logs `line` unchanged through basic_logger at DEBUG level."""
    level = logging.DEBUG
    basic_logger.log(level, line)


class LegacyOutputAdapter:
  """Interprets the legacy recipe run mode output and routes it to logging.

  This will filter, route and in some cases reformat the output to trace levels
  of logging. This will cause specific output (e.g. unfiltered step names) to
  always print to stdout or, when -v is passed, the step stdout will
  additionally be passed to the logging stdout. Note -vv will cause
  PassthroughAdapter to interpret results.

  Lines are fed one at a time to ProcessLine(). A line starting with
  SEED_STEP_TEXT selects the processor and log level used for the step; a
  line starting with STEP_CLOSED_TEXT resets both to their defaults. Step
  names are sliced out assuming the line ends with the '@@@' suffix, i.e.
  lines are expected without a trailing newline.
  """

  SEED_STEP_TEXT = '@@@SEED_STEP@'
  STEP_CLOSED_TEXT = '@@@STEP_CLOSED@@@'
  ANNOTATOR_PREFIX_SUFIX = '@@@'
  TRIGGER_STEP_PREFIX = 'test_pre_run.[trigger] '
  TRIGGER_LINK_TEXT = '@@@STEP_LINK@task UI:'
  # Special sub-log names added by the UTR recipe to surface to users.
  UTR_LOG_NAME = 'utr_log'

  # Width assumed when stdout is not attached to a terminal (or reports a
  # non-positive width), e.g. when output is piped or captured.
  _DEFAULT_TERMINAL_COLUMNS = 80
  # Lines consisting solely of a URL are sent through the root logger instead
  # of basic_logger (see _StdoutProcessLine).
  _URLISH_RE = re.compile(r'^http[s]?://\S+$')

  def __init__(self):
    """Sets up the regexes, the step routing tables and the parsing state."""
    self._trigger_link_re = re.compile(r'.+@(https://.+)@@@$')
    self._ninja_status_re = re.compile(r'\[(\d+)\/(\d+)\]')
    self._collect_wait_re = re.compile(
        r'.+prpc call (.+) swarming.v2.Tasks.ListTaskStates, stdin: '
        r'(\{"task_id": .+\})$'
    )
    self._result_links_re = re.compile(
        r'@@@STEP_LINK@shard (#\d+) test results@(https://[^@]+)@@@')

    self._current_proccess_fn = self._StepNameProcessLine
    # The first match is used. This allows us to filter parent steps while still
    # printing child steps by adding the child step name first. By default
    # _StepNameProcessLine will be used which prints the step name and its
    # stdout.
    self._step_to_processors = {
        'compile': self._ProcessCompileLine,
        'reclient compile': self._ProcessCompileLine,
        'test_pre_run.[trigger] ': self._ProcessTriggerLine,
        'collect tasks.wait for tasks': self._ProcessCollectLine,
        'download compilation outputs': self._PrintOnlyStepName,
    }
    # The first match is used. This allows us to filter parent steps while still
    # printing child steps by adding the child step name first. By default INFO
    # will be used which prints in non-verbose mode (i.e. no -v flag).
    self._step_to_log_level = {
        'lookup_builder_gn_args': logging.DEBUG,
        'git rev-parse': logging.DEBUG,
        'git diff to instrument': logging.DEBUG,
        'save paths of affected files': logging.DEBUG,
        'preprocess for reclient.start reproxy via bootstrap': logging.INFO,
        'preprocess for reclient': logging.DEBUG,
        'process clang crashes': logging.DEBUG,
        'compile confirm no-op': logging.DEBUG,
        'postprocess for reclient': logging.DEBUG,
        'setup_build': logging.DEBUG,
        'get compile targets for scripts': logging.DEBUG,
        'lookup GN args': logging.DEBUG,
        'install infra/tools/luci/isolate': logging.DEBUG,
        'find command lines': logging.DEBUG,
        'test_pre_run.install infra/tools/luci/swarming': logging.DEBUG,
        'isolate tests': logging.DEBUG,
        'read GN args': logging.DEBUG,
        'test_pre_run.[trigger] ': logging.INFO,
        'test_pre_run.': logging.DEBUG,
        'collect tasks.wait for tasks': logging.INFO,
        'collect tasks': logging.DEBUG,
        '$debug - all results': logging.DEBUG,
        'Test statistics': logging.DEBUG,
        'read gclient': logging.DEBUG,
        'write output_properties_file': logging.DEBUG,
        'prepare skylab tests.': logging.DEBUG,
        'update invocation instructions': logging.DEBUG,
    }
    # Setup logger for printing to the same line: an empty terminator stops
    # the handler from appending a newline after each record.
    # NOTE(review): a new handler is added on every instantiation, so creating
    # several adapters in one process duplicates single-line output.
    logger = logging.getLogger('single_line_logger')
    handler = logging.StreamHandler(sys.stdout)
    handler.terminator = ''
    logger.addHandler(handler)
    logger.propagate = False

    # os.get_terminal_size() raises OSError when stdout is not a terminal;
    # fall back to a default width instead of failing to construct. The width
    # is also kept strictly positive because ProcessLine divides by it.
    try:
      columns, _ = os.get_terminal_size()
    except OSError:
      columns = self._DEFAULT_TERMINAL_COLUMNS
    if columns <= 0:
      columns = self._DEFAULT_TERMINAL_COLUMNS

    self._last_line = ''
    self._last_line_teriminal_lines = 0
    self._current_log_level = logging.DEBUG
    self._single_line_logger = logger
    self._terminal_columns = columns
    self._current_step_name = ''
    self._dot_count = 0

  def _PrintCurrentStepName(self, log_level):
    """Logs the current step name at `log_level` via the root logger."""
    logging.log(log_level, '\n[cyan]Running: %s[/]', self._current_step_name)

  def _StdoutProcessLine(self, line):
    """Logs step stdout, dropping annotator ('@@@'-prefixed) lines.

    Lines belonging to the UTR sub-log are unwrapped first so their payload is
    surfaced like regular stdout.
    """
    # Pass through any non-engine or utr-log text.
    utr_prefix = f'@@@STEP_LOG_LINE@{self.UTR_LOG_NAME}@'
    if line.startswith(utr_prefix):
      # '-3' corresponds to the trailing @@@ on every sub-log line.
      line = line[len(utr_prefix):-3]
    if line.startswith(self.ANNOTATOR_PREFIX_SUFIX):
      return
    if self._URLISH_RE.match(line):
      # Bare URLs go through the root logger rather than basic_logger.
      logging.log(self._current_log_level, line)
    else:
      basic_logger.log(self._current_log_level, line)

  def _StepNameProcessLine(self, line):
    """Default processor: prints the step name followed by its stdout."""
    if line.startswith(self.SEED_STEP_TEXT):
      # The step name is printed at the step's own log level, so steps mapped
      # to DEBUG stay hidden unless verbose output is enabled.
      self._PrintCurrentStepName(self._current_log_level)
      return
    self._StdoutProcessLine(line)

  def _PrintOnlyStepName(self, line):
    """Prints the step name at INFO and discards everything else."""
    if line.startswith(self.SEED_STEP_TEXT):
      self._PrintCurrentStepName(logging.INFO)

  def _ProcessTriggerLine(self, line):
    """Handles trigger steps: registers result parsers and prints task links."""
    seed_prefix = self.SEED_STEP_TEXT + self.TRIGGER_STEP_PREFIX
    if line.startswith(seed_prefix):
      # The step names for tests don't have any identifying keywords so the
      # result step parsers need to be installed at trigger time.
      # The test name ends at the first ' (' if present, otherwise at the
      # trailing '@@@'.
      if ' (' in line:
        name_end = line.index(' (')
      else:
        name_end = -len(self.ANNOTATOR_PREFIX_SUFIX)
      test_name = line[len(seed_prefix):name_end]
      # An empty name would prefix-match every later step name, so skip it.
      if test_name:
        self._step_to_processors[test_name] = self._ProcessResult
        self._step_to_log_level[test_name] = logging.DEBUG
    elif line.startswith(self.TRIGGER_LINK_TEXT):
      matches = self._trigger_link_re.match(line)
      if matches:
        task_name = self._current_step_name[len(self.TRIGGER_STEP_PREFIX):]
        basic_logger.log(self._current_log_level,
                         f'Triggered {task_name}: ' + matches[1])
    else:
      self._StdoutProcessLine(line)

  def _ProcessCompileLine(self, line):
    """Handles compile steps, collapsing ninja status lines onto one line."""
    if line.startswith(self.SEED_STEP_TEXT):
      self._PrintCurrentStepName(logging.INFO)
      return
    if self._ninja_status_re.match(line):
      level = self._current_log_level
      # Remove the last line which might be multiple on the terminal:
      # '\33[2K' erases the current terminal line and '\33[A' moves the cursor
      # up one line. _last_line_teriminal_lines still describes the previous
      # line here because ProcessLine updates it after the processor runs.
      self._single_line_logger.log(level, '\33[2K')
      for _ in range(self._last_line_teriminal_lines - 1):
        self._single_line_logger.log(level, '\33[A\33[2K')
      self._single_line_logger.log(level, '\r' + line)
      self._single_line_logger.handlers[0].flush()
      return
    if self._last_line.startswith('['):
      # The previous status line was written without a newline; end it before
      # printing regular output.
      basic_logger.log(self._current_log_level, '')
    self._StdoutProcessLine(line)

  def _ProcessCollectLine(self, line):
    """Handles the collect-wait step by printing a single-line progress note."""
    if line.startswith(self.SEED_STEP_TEXT):
      self._PrintCurrentStepName(logging.INFO)
      return
    matches = self._collect_wait_re.match(line)
    if matches:
      # The second group is the JSON request body of the ListTaskStates call;
      # its 'task_id' entry is counted as the number of outstanding shards.
      task_ids = json.loads(matches[2])['task_id']
      # Cycle between 1 and 5 trailing dots to show progress.
      self._dot_count = (self._dot_count % 5) + 1
      self._single_line_logger.log(
          self._current_log_level,
          f'\33[2K\rStill waiting on: {len(task_ids)} shard(s)' +
          '.' * self._dot_count)
      return
    if line == self.STEP_CLOSED_TEXT:
      self._single_line_logger.log(self._current_log_level,
                                   '\33[2K\rStill waiting on: 0 shard(s)...')
      # Terminate the single-line output with a newline.
      basic_logger.log(self._current_log_level, '')

  def _ProcessResult(self, line):
    """Prints the per-shard test result links of a test's result step."""
    matches = self._result_links_re.match(line)
    if matches:
      basic_logger.log(self._current_log_level,
                       'Test results for %s shard %s: %s',
                       self._current_step_name, matches[1], matches[2])

  def ProcessLine(self, line):
    """Routes one line of recipe output to the processor of the current step.

    Args:
      line: A single line of recipe output.
    """
    # If we're in a new step see if it needs to be parsed differently
    if line.startswith(self.SEED_STEP_TEXT):
      self._current_step_name = line[len(self.SEED_STEP_TEXT
                                         ):-len(self.ANNOTATOR_PREFIX_SUFIX)]
      self._current_proccess_fn = self._get_processor(self._current_step_name)
      self._current_log_level = self._get_log_level(self._current_step_name)
    self._current_proccess_fn(line)
    self._last_line = line
    # Number of terminal rows the line occupies once wrapped (at least one).
    self._last_line_teriminal_lines = (
        max(len(line) - 1, 0) // self._terminal_columns + 1)
    if line.startswith(self.STEP_CLOSED_TEXT):
      # Text outside of steps will use the last processor otherwise
      self._current_log_level = logging.DEBUG
      self._current_proccess_fn = self._StepNameProcessLine

  @staticmethod
  def _lookup_by_step_name(table, step_name, default):
    """Returns the `table` value for `step_name`.

    An exact key match wins; otherwise the first key (in insertion order) that
    is a prefix of `step_name` is used; otherwise `default` is returned.
    """
    if step_name in table:
      return table[step_name]
    for match_name, value in table.items():
      if step_name.startswith(match_name):
        return value
    return default

  def _get_processor(self, step_name):
    """Returns the line processor for `step_name` (default: step name+stdout)."""
    return self._lookup_by_step_name(self._step_to_processors, step_name,
                                     self._StepNameProcessLine)

  def _get_log_level(self, step_name):
    """Returns the log level for `step_name` (default: logging.INFO)."""
    return self._lookup_by_step_name(self._step_to_log_level, step_name,
                                     logging.INFO)