File: pace.py

package info (click to toggle)
nvidia-cuda-toolkit 12.4.1-3
  • links: PTS, VCS
  • area: non-free
  • in suites: forky, sid
  • size: 18,505,836 kB
  • sloc: ansic: 203,477; cpp: 64,769; python: 34,699; javascript: 22,006; xml: 13,410; makefile: 3,085; sh: 2,343; perl: 352
file content (95 lines) | stat: -rw-r--r-- 3,573 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-NvidiaProprietary
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.

import pandas as pd

from collections import namedtuple

from nsys_recipe.lib import summary

# Immutable record grouping a filename with its pace dataframe, its stats
# dataframe and its session start value.
# NOTE(review): presumably one instance is built per input report file --
# confirm against the callers.
PaceInfo = namedtuple("PaceInfo", ["filename", "pace_df", "stats_df", "session_start"])


def get_session_start_time(session_start_df):
    """Return the "utcEpochNs" value stored at row label 0.

    Args:
        session_start_df: Dataframe holding a "utcEpochNs" column and a row
            whose index label is 0.

    Returns:
        The scalar found in the "utcEpochNs" column at row label 0.
    """
    utc_epoch_values = session_start_df["utcEpochNs"]
    return utc_epoch_values.at[0]


def filter_by_pace_name(range_df, pace_col, pace_name):
    """Keep only the rows whose 'pace_col' value equals 'pace_name'.

    Args:
        range_df: Dataframe to filter; it is left untouched.
        pace_col: Name of the column compared against 'pace_name'.
        pace_name: Value a row must hold in 'pace_col' to be kept.

    Returns:
        A new dataframe with the matching rows and a fresh 0..n-1 index.
    """
    is_match = range_df[pace_col] == pace_name
    return range_df.loc[is_match].reset_index(drop=True)


def compute_pace_stats_dfs(range_df, pace_col):
    """Derive the per-range pace columns and the duration statistics.

    Args:
        range_df: Dataframe with "start" and "end" columns plus the
            'pace_col' column. It is not modified.
        pace_col: Name of the column used to group the durations. It is
            dropped from the returned pace dataframe.

    Returns:
        A '(pace_df, stats_df)' tuple. 'pace_df' holds the complete ranges
        with the added "duration", "delta", "delta_accum" and
        "duration_accum" columns and a fresh 0..n-1 index. 'stats_df' is
        the result of 'summary.describe_duration' applied to the durations
        grouped by 'pace_col'.
    """
    # Filter out incomplete ranges. The explicit copy makes 'pace_df' an
    # independent dataframe, so the column assignments below do not trigger
    # pandas' SettingWithCopyWarning and never depend on view/copy rules.
    is_complete = range_df["start"].notnull() & range_df["end"].notnull()
    pace_df = range_df[is_complete].copy()

    pace_df["duration"] = pace_df["end"] - pace_df["start"]
    pace_gdf = pace_df.groupby(pace_col)
    stats_df = summary.describe_duration(pace_gdf["duration"])

    # Calculate the difference in values between the 'start' column and the
    # previous row's 'end' column. The first row has no predecessor, so it
    # is compared against 0.
    pace_df["delta"] = pace_df["start"] - pace_df["end"].shift(fill_value=0)
    pace_df["delta_accum"] = pace_df["delta"].cumsum()
    pace_df["duration_accum"] = pace_df["duration"].cumsum()

    # Drop the name column that contains the same value for all rows and
    # reset index.
    pace_df = pace_df.drop(columns=[pace_col]).reset_index(drop=True)

    return pace_df, stats_df


def apply_time_offset(session_starts, pace_dfs):
    """Shift every pace dataframe onto a common timeline, in place.

    Each dataframe's "start" and "end" columns are increased by the
    difference between its own session start and the smallest session
    start.

    Args:
        session_starts: Session start values, one per dataframe.
        pace_dfs: Dataframes with "start" and "end" columns, paired
            positionally with 'session_starts'. They are modified in place.
    """
    # Synchronize session start times against the earliest one.
    earliest_start = min(session_starts)
    for own_start, df in zip(session_starts, pace_dfs):
        shift = own_start - earliest_start
        for col in ("start", "end"):
            df[col] = df[col] + shift


def describe_delta(df):
    """Summarize every column of 'df' as one row of statistics.

    Args:
        df: Dataframe whose columns are summarized individually.

    Returns:
        The output of 'summary.format_columns' applied to a dataframe that
        has one row per column of 'df' and the statistics "min", "max",
        "count", "std", "mean", "sum", "25%", "50%" and "75%" as columns.
    """
    quartiles = df.quantile([0.25, 0.5, 0.75])
    quartiles.index = ["25%", "50%", "75%"]
    aggregates = df.agg(["min", "max", "count", "std", "mean", "sum"])

    # Stack both tables, then transpose so the statistics become columns.
    combined = pd.concat([aggregates, quartiles])
    return summary.format_columns(combined.T)


def split_columns_as_dataframes(pace_dfs):
    """Regroup per-rank pace dataframes into one dataframe per column.

    Args:
        pace_dfs: Pace dataframes, one per rank. The rank is the position
            of the dataframe in this sequence.

    Returns:
        A dict mapping each of "start", "end", "duration_accum",
        "delta_accum", "duration" and "delta" to a dataframe that has the
        ranks as columns (axis named "Rank") and the iterations as index
        (axis named "Iteration"), plus a "delta_stats" entry holding the
        result of 'describe_delta' on the transposed "delta" dataframe.
    """
    column_names = (
        "start",
        "end",
        "duration_accum",
        "delta_accum",
        "duration",
        "delta",
    )

    def _gather_across_ranks(col):
        # Parquet must have string column names.
        per_rank = {str(rank): rank_df[col] for rank, rank_df in enumerate(pace_dfs)}
        return pd.DataFrame(per_rank).rename_axis(index="Iteration", columns="Rank")

    # We want the pace info in individual dataframes per column and not per
    # rank.
    frames_by_column = {col: _gather_across_ranks(col) for col in column_names}

    # The "delta" dataframe has ranks as columns and iterations as the
    # index. Transposing it yields the statistics per iteration instead of
    # per rank.
    frames_by_column["delta_stats"] = describe_delta(frames_by_column["delta"].T)

    return frames_by_column