File: logdog_wrapper.py

package info (click to toggle)
chromium 139.0.7258.127-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 6,122,068 kB
  • sloc: cpp: 35,100,771; ansic: 7,163,530; javascript: 4,103,002; python: 1,436,920; asm: 946,517; xml: 746,709; pascal: 187,653; perl: 88,691; sh: 88,436; objc: 79,953; sql: 51,488; cs: 44,583; fortran: 24,137; makefile: 22,147; tcl: 15,277; php: 13,980; yacc: 8,984; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (171 lines) | stat: -rwxr-xr-x 5,715 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/env vpython3
# Copyright 2016 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrapper for adding logdog streaming support to swarming tasks."""

import argparse
import contextlib
import json
import logging
import os
import shutil
import signal
import subprocess
import sys
import tempfile

# Directory three levels above the one containing this file; the third_party/
# paths below are resolved relative to it.
_SRC_PATH = os.path.abspath(os.path.join(
    os.path.dirname(__file__), '..', '..', '..'))
# Extend sys.path so the `devil` imports that follow can be resolved from the
# in-tree catapult copy.
sys.path.append(os.path.join(_SRC_PATH, 'third_party', 'catapult', 'devil'))
# NOTE(review): nothing in this file imports py_utils directly; presumably it
# is needed by devil itself -- confirm before removing.
sys.path.append(os.path.join(_SRC_PATH, 'third_party', 'catapult', 'common',
                             'py_utils'))

from devil.utils import signal_handler
from devil.utils import timeout_retry

# Value passed to the logdog butler's `-output` flag.
OUTPUT = 'logdog'
# Host passed to the butler's `-coordinator-host` flag and exported to the test
# process as LOGDOG_COORDINATOR_HOST.
COORDINATOR_HOST = 'luci-logdog.appspot.com'
# Used as `max_tries` (with `wait_period=1`) when waiting for the butler to
# exit after it has been sent terminate().
LOGDOG_TERMINATION_TIMEOUT = 30


def CommandParser():
  """Builds the argument parser for this wrapper's own command line.

  --target and --script are mutually exclusive ways of naming what to run,
  and --logdog-bin-cmd optionally points at the logdog butler binary. Option
  abbreviations are disabled (allow_abbrev=False).

  Returns:
    The configured argparse.ArgumentParser. Nothing is parsed here.
  """
  # Tail shared verbatim by the --target and --script help strings.
  fallback_note = (' If neither target nor script are set, any extra args '
                   'passed to this script are assumed to be the full test '
                   'command to run.')
  butler_help = ('Location of the logdog butler binary. Will attempt to find '
                 'it on PATH if not specified. If not found, this script will '
                 'be a no-op and simply passthrough to the test command.')

  cli = argparse.ArgumentParser(allow_abbrev=False)
  what_to_run = cli.add_mutually_exclusive_group()
  for flag, noun in (('--target', 'test'), ('--script', 'script')):
    what_to_run.add_argument(
        flag, help='The %s target to be run.' % noun + fallback_note)
  cli.add_argument('--logdog-bin-cmd', help=butler_help)
  return cli


def CreateStopTestsMethod(proc):
  """Returns a signal handler that relays the received signal to |proc|.

  Args:
    proc: Object exposing send_signal(signum), e.g. a subprocess.Popen.

  Returns:
    A callable taking (signum, frame). It logs the signal at error level and
    then forwards it through proc.send_signal; the frame is ignored.
  """

  def StopTests(signum, _frame):
    signal_text = str(signum)
    logging.error('Forwarding signal %s to test process', signal_text)
    proc.send_signal(signum)

  return StopTests


@contextlib.contextmanager
def NoLeakingProcesses(popen):
  """Context manager that kills |popen| on exit if it is still running.

  Args:
    popen: A subprocess.Popen-like object, or None to make this a no-op.

  Yields:
    The |popen| argument, unchanged.
  """
  try:
    yield popen
  finally:
    _KillIfStillRunning(popen)


def _KillIfStillRunning(popen):
  """Best-effort kill of |popen|; an OSError is logged, not raised."""
  if popen is None:
    return
  try:
    # poll() returning None means the process has not exited yet.
    if popen.poll() is None:
      popen.kill()
  except OSError:
    logging.warning('Failed to kill %s. Process may be leaked.',
                    str(popen.pid))


def GetProjectFromLuciContext():
  """Return the "project" from LUCI_CONTEXT.

  The LUCI_CONTEXT environment variable names a JSON file. That file contains
  a section "realm.name" whose value follows the format "<project>:<realm>".
  This method parses and returns the "project" part.

  Falls back to "chromium" when LUCI_CONTEXT is unset, the file cannot be read
  or parsed, or the document does not have a non-empty string at realm.name
  (e.g. "realm" is null or the top-level value is not an object).

  Returns:
    The project name as a string.
  """
  default_project = 'chromium'
  ctx_path = os.environ.get('LUCI_CONTEXT')
  if not ctx_path:
    return default_project

  try:
    # JSON is UTF-8; do not depend on the locale's default encoding.
    with open(ctx_path, encoding='utf-8') as f:
      luci_ctx = json.load(f)
  except (OSError, ValueError):
    # Unreadable file or invalid JSON (decode errors are ValueErrors too).
    return default_project

  # Validate the shape step by step so that an unexpected but parseable
  # document falls back instead of raising AttributeError.
  realm = luci_ctx.get('realm') if isinstance(luci_ctx, dict) else None
  realm_name = realm.get('name') if isinstance(realm, dict) else None
  if isinstance(realm_name, str) and realm_name:
    return realm_name.split(':')[0]
  return default_project


def main():
  """Runs the wrapped test command, alongside a logdog butler if available.

  The test command is derived from --target, --script, or, when neither is
  given, the arguments the parser did not recognize. If a logdog butler binary
  is available it is started in `serve` mode on a unix socket inside a
  temporary directory, and the LOGDOG_* variables describing that stream
  server are added to the test process environment. While waiting for the
  test process, a SIGTERM handler that forwards the signal to it is installed
  through devil's SignalHandler.

  Exits through parser.error() when there is no test command to run, or when
  SWARMING_TASK_ID is set but no usable butler binary can be located.

  Returns:
    The exit code of the test process.
  """
  parser = CommandParser()
  args, extra_cmd_args = parser.parse_known_args(sys.argv[1:])

  logging.basicConfig(level=logging.INFO)
  if args.target:
    test_cmd = [os.path.join('bin', 'run_%s' % args.target), '-v']
    test_cmd += extra_cmd_args
  elif args.script:
    test_cmd = [args.script]
    test_cmd += extra_cmd_args
  else:
    test_cmd = extra_cmd_args

  if not test_cmd:
    # subprocess.Popen cannot launch an empty argv; report a usage error up
    # front instead of an opaque exception after the butler has been started.
    parser.error('No test command to run. Specify --target, --script, or '
                 'pass the full test command as extra arguments.')

  test_env = dict(os.environ)
  logdog_cmd = []
  logdog_butler_bin = args.logdog_bin_cmd
  if os.environ.get('SWARMING_TASK_ID'):
    # On swarming the butler is mandatory: fall back to PATH lookup and fail
    # hard if no existing binary is found.
    logdog_butler_bin = logdog_butler_bin or shutil.which('logdog_butler')
    if not logdog_butler_bin or not os.path.exists(logdog_butler_bin):
      parser.error('Either --logdog-bin-cmd must be specified and valid or '
                   '"logdog_butler" must be on PATH if running on swarming.')

  # The temporary directory holds the butler's unix socket and must outlive
  # both child processes, so everything below runs inside it.
  with tempfile.TemporaryDirectory() as temp_directory:
    if logdog_butler_bin:
      if os.path.exists(logdog_butler_bin):
        streamserver_uri = 'unix:%s' % os.path.join(temp_directory,
                                                    'butler.sock')
        prefix = os.path.join('android', 'swarming', 'logcats',
                              os.environ.get('SWARMING_TASK_ID', ""))
        project = GetProjectFromLuciContext()

        logdog_cmd = [
            logdog_butler_bin, '-project', project, '-output', OUTPUT,
            '-prefix', prefix, '-coordinator-host', COORDINATOR_HOST, 'serve',
            '-streamserver-uri', streamserver_uri
        ]
        # Tell the test process how to reach the stream server.
        test_env.update({
            'LOGDOG_STREAM_PROJECT': project,
            'LOGDOG_STREAM_PREFIX': prefix,
            'LOGDOG_STREAM_SERVER_PATH': streamserver_uri,
            'LOGDOG_COORDINATOR_HOST': COORDINATOR_HOST,
        })
      else:
        # Only reachable off swarming; there a missing binary is not fatal
        # and the test simply runs without a butler.
        logging.warning('--logdog-bin-cmd specified, but binary was not found.'
                        ' Will not be using logdog butler server.')

    logdog_proc = None
    if logdog_cmd:
      logdog_proc = subprocess.Popen(logdog_cmd)

    # The outer guard covers the butler even if starting the test process
    # raises; the inner guard covers the test process itself.
    with NoLeakingProcesses(logdog_proc):
      with NoLeakingProcesses(
          subprocess.Popen(test_cmd, env=test_env)) as test_proc:
        with signal_handler.SignalHandler(signal.SIGTERM,
                                          CreateStopTestsMethod(test_proc)):
          result = test_proc.wait()
          if logdog_proc:
            def logdog_stopped():
              return logdog_proc.poll() is not None

            # Ask the butler to shut down, then poll for it to exit.
            logdog_proc.terminate()
            timeout_retry.WaitFor(logdog_stopped, wait_period=1,
                                  max_tries=LOGDOG_TERMINATION_TIMEOUT)

            # If logdog_proc hasn't finished by this point, allow
            # NoLeakingProcesses to kill it.


  return result


# Script entry point: the value returned by main() becomes the exit status.
if __name__ == '__main__':
  sys.exit(main())