File: upload_metrics.py

from __future__ import annotations

import datetime
import inspect
import os
import time
import uuid
from dataclasses import dataclass
from datetime import timezone
from typing import Any
from warnings import warn


# boto3 is an optional dependency. If it's not installed,
# we'll just not emit the metrics.
# Keeping this logic here so that callers don't have to
# worry about it.
EMIT_METRICS = False
try:
    from tools.stats.upload_stats_lib import upload_to_s3

    EMIT_METRICS = True
except ImportError as e:
    print(f"Unable to import boto3. Will not be emitting metrics.... Reason: {e}")


@dataclass
class EnvVarMetric:
    name: str
    env_var: str
    required: bool = True
    # Used to cast the value of the env_var to the correct type (defaults to str)
    type_conversion_fn: Any = None

    def value(self) -> Any:
        value = os.environ.get(self.env_var)

        # GitHub CI will set some env vars to an empty string
        DEFAULT_ENVVAR_VALUES = [None, ""]
        if value in DEFAULT_ENVVAR_VALUES:
            if not self.required:
                return None

            raise ValueError(
                f"Missing {self.name}. Please set the {self.env_var} "
                "environment variable to pass in this value."
            )

        if self.type_conversion_fn:
            return self.type_conversion_fn(value)
        return value
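

# Example (illustrative): reading a typed, optional env var, mirroring the
# pr_number entry in env_var_metrics below.
#
#   pr = EnvVarMetric("pr_number", "PR_NUMBER", required=False, type_conversion_fn=int)
#   pr.value()  # -> int if PR_NUMBER is set, None if it's unset or empty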


global_metrics: dict[str, Any] = {}


def add_global_metric(metric_name: str, metric_value: Any) -> None:
    """
    Adds stats that should be emitted with every metric by the current process.
    If the emit_metrics method specifies a metric with the same name, it will
    overwrite this value.
    """
    global_metrics[metric_name] = metric_value
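

# Illustrative usage (hypothetical names): a field registered here rides along
# with every later emit_metric() call from this process.
#
#   add_global_metric("example_field", "some_value")
#   emit_metric("example_metric", {"duration_s": 12.3})  # info includes example_field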


def emit_metric(
    metric_name: str,
    metrics: dict[str, Any],
) -> None:
    """
    Upload a metric to DynamoDB (and from there, the HUD backend database).

    Even if EMIT_METRICS is set to False, this function will still run the code to
    validate and shape the metrics, skipping just the upload.

    Parameters:
        metric_name:
            Name of the metric. Every unique metric should have a different name
            and be emitted just once per run attempt.
            Metrics are namespaced by their module and the function that emitted them.
        metrics: The actual data to record.

    Some default values are populated from environment variables, which must be set
    for metrics to be emitted. (If they're not set, this function becomes a noop):
    """

    if metrics is None:
        raise ValueError("You didn't ask to upload any metrics!")

    # Merge the given metrics with the global metrics, overwriting any duplicates
    # with the given metrics.
    metrics = {**global_metrics, **metrics}

    # We use these env vars to determine basic info about the workflow run.
    # By using env vars, we don't have to pass this info around to every function.
    # It also helps ensure that we only emit metrics during CI.
    env_var_metrics = [
        EnvVarMetric("repo", "GITHUB_REPOSITORY"),
        EnvVarMetric("workflow", "GITHUB_WORKFLOW"),
        EnvVarMetric("build_environment", "BUILD_ENVIRONMENT", required=False),
        EnvVarMetric("job", "GITHUB_JOB"),
        EnvVarMetric("test_config", "TEST_CONFIG", required=False),
        EnvVarMetric("pr_number", "PR_NUMBER", required=False, type_conversion_fn=int),
        EnvVarMetric("run_id", "GITHUB_RUN_ID", type_conversion_fn=int),
        EnvVarMetric("run_number", "GITHUB_RUN_NUMBER", type_conversion_fn=int),
        EnvVarMetric("run_attempt", "GITHUB_RUN_ATTEMPT", type_conversion_fn=int),
        EnvVarMetric("job_id", "JOB_ID", type_conversion_fn=int),
        EnvVarMetric("job_name", "JOB_NAME"),
    ]

    # Use info about the function that invoked this one as a namespace and a way to filter metrics.
    calling_frame = inspect.currentframe().f_back  # type: ignore[union-attr]
    calling_frame_info = inspect.getframeinfo(calling_frame)  # type: ignore[arg-type]
    calling_file = os.path.basename(calling_frame_info.filename)
    calling_module = inspect.getmodule(calling_frame).__name__  # type: ignore[union-attr]
    calling_function = calling_frame_info.function

    try:
        default_metrics = {
            "metric_name": metric_name,
            "calling_file": calling_file,
            "calling_module": calling_module,
            "calling_function": calling_function,
            "timestamp": datetime.datetime.now(timezone.utc).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
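            # Fold in the env-var-derived fields, skipping optional ones that
            # are unset. A required env var that is missing raises ValueError,
            # which is caught below.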
            **{m.name: m.value() for m in env_var_metrics if m.value()},
        }
    except ValueError as e:
        warn(f"Not emitting metrics for {metric_name}. {e}")
        return

    # Prefix the key with the metric name and a timestamp to reduce the chance
    # of a uuid1 name collision
    s3_key = f"{metric_name}_{int(time.time())}_{uuid.uuid1().hex}"

    if EMIT_METRICS:
        try:
            upload_to_s3(
                bucket_name="ossci-raw-job-status",
                key=f"ossci_uploaded_metrics/{s3_key}",
                docs=[{**default_metrics, "info": metrics}],
            )
        except Exception as e:
            # We don't want to fail the job if we can't upload the metric.
            # ValueErrors raised outside this try block still propagate, since
            # those indicate improperly configured metrics.
            warn(f"Error uploading metric {metric_name} to S3: {e}")
            return
    else:
        print(f"Not emitting metrics for {metric_name}. Boto wasn't imported.")