File: rdb_wrapper.py

package info (click to toggle)
chromium 138.0.7204.183-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 6,071,908 kB
  • sloc: cpp: 34,937,088; ansic: 7,176,967; javascript: 4,110,704; python: 1,419,953; asm: 946,768; xml: 739,971; pascal: 187,324; sh: 89,623; perl: 88,663; objc: 79,944; sql: 50,304; cs: 41,786; fortran: 24,137; makefile: 21,806; php: 13,980; tcl: 13,166; yacc: 8,925; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (93 lines) | stat: -rw-r--r-- 3,253 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#!/usr/bin/env vpython3
# Copyright (c) 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import contextlib
import json
import os
import requests

# Constants describing TestStatus for ResultDB
STATUS_PASS = 'PASS'
STATUS_FAIL = 'FAIL'
STATUS_CRASH = 'CRASH'
STATUS_ABORT = 'ABORT'
STATUS_SKIP = 'SKIP'

# ResultDB limits failure reasons to 1024 characters.
# NOTE(review): confirm whether the server-side limit is counted in characters
# or in UTF-8 bytes; this module measures it in characters (len() of a str).
_FAILURE_REASON_LENGTH_LIMIT = 1024

# Message to use at the end of a truncated failure reason.
_FAILURE_REASON_TRUNCATE_TEXT = '\n...\nFailure reason was truncated.'


def _truncate_failure_reason(failure_reason):
    """Returns failure_reason shortened to fit _FAILURE_REASON_LENGTH_LIMIT.

    Reasons that already fit are returned unchanged. Longer reasons keep
    their leading text and end with _FAILURE_REASON_TRUNCATE_TEXT, so the
    returned string is never longer than the limit.
    """
    if len(failure_reason) <= _FAILURE_REASON_LENGTH_LIMIT:
        return failure_reason
    # Reserve room for the marker so that prefix + marker == limit. Slicing
    # relative to the end of the string (as in [:-len(marker)]) would only
    # drop a fixed number of characters and leave long reasons over the limit.
    keep = _FAILURE_REASON_LENGTH_LIMIT - len(_FAILURE_REASON_TRUNCATE_TEXT)
    return failure_reason[:keep] + _FAILURE_REASON_TRUNCATE_TEXT


class ResultSink(object):
    """Posts presubmit function results to a ResultSink endpoint.

    Args:
        session: object with a requests.Session-like post(url, json=...)
            method, used to send each result.
        url (str): URL that test results are posted to.
        prefix (str): string prepended to every reported function name to
            form the test ID.
    """

    def __init__(self, session, url, prefix):
        self._session = session
        self._url = url
        self._prefix = prefix

    def report(self, function_name, status, elapsed_time, failure_reason=None):
        """Reports the result and elapsed time of a presubmit function call.

        Args:
            function_name (str): The name of the presubmit function
            status: the status to report the function call with
            elapsed_time: the time taken to invoke the presubmit function
            failure_reason (str or None): if set, the failure reason. Reasons
                longer than _FAILURE_REASON_LENGTH_LIMIT are truncated to fit
                and end with _FAILURE_REASON_TRUNCATE_TEXT.
        """
        tr = {
            'testId': self._prefix + function_name,
            'status': status,
            # Only a passing status counts as an expected result.
            'expected': status == STATUS_PASS,
            'duration': '{:.9f}s'.format(elapsed_time)
        }
        if failure_reason:
            tr['failureReason'] = {
                'primaryErrorMessage': _truncate_failure_reason(failure_reason)
            }
        self._session.post(self._url, json={'testResults': [tr]})


@contextlib.contextmanager
def client(prefix):
    """Yields a client for ResultSink.

    This is a context manager that yields a client for ResultSink,
    if LUCI_CONTEXT with a section of result_sink is present. When the context
    is closed, all the connections to the SinkServer are closed.

    Args:
        prefix: A prefix to be added to the test ID of reported function names.
        The format for this is
            presubmit:gerrit_host/folder/to/repo:path/to/file/
        for example,
            presubmit:chromium-review.googlesource.com/chromium/src/:services/viz/  # pylint: disable=line-too-long
    Yields:
        An instance of ResultSink() if the luci context is present. None,
        otherwise.
    """
    luci_ctx = os.environ.get('LUCI_CONTEXT')
    if not luci_ctx:
        yield None
        return

    # Read the context file and close it before yielding, so the file handle
    # is not held open for the whole duration of the caller's `with` body.
    with open(luci_ctx) as f:
        sink_ctx = json.load(f).get('result_sink')
    if not sink_ctx:
        yield None
        return

    url = 'http://{0}/prpc/luci.resultsink.v1.Sink/ReportTestResults'.format(
        sink_ctx['address'])
    # The session is closed when the caller leaves the context.
    with requests.Session() as s:
        s.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'Authorization': 'ResultSink {0}'.format(sink_ctx['auth_token'])
        }
        yield ResultSink(s, url, prefix)