# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-NvidiaProprietary
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.
import pandas as pd
from collections import namedtuple
from nsys_recipe.lib import summary
PaceInfo = namedtuple("PaceInfo", ["filename", "pace_df", "stats_df", "session_start"])


def get_session_start_time(session_start_df):
    return session_start_df.at[0, "utcEpochNs"]


def filter_by_pace_name(range_df, pace_col, pace_name):
    filtered_range_df = range_df[range_df[pace_col] == pace_name]
    return filtered_range_df.reset_index(drop=True)


def compute_pace_stats_dfs(range_df, pace_col):
    # Filter out incomplete ranges. Copy so that the added columns do not
    # mutate a view of the caller's dataframe (SettingWithCopyWarning).
    pace_df = range_df[range_df["start"].notnull() & range_df["end"].notnull()].copy()
    pace_df["duration"] = pace_df["end"] - pace_df["start"]

    pace_gdf = pace_df.groupby(pace_col)
    stats_df = summary.describe_duration(pace_gdf["duration"])

    # Calculate the difference between the 'start' column and the previous
    # row's 'end' column. The first row's delta is measured from time 0.
    pace_df["delta"] = pace_df["start"] - pace_df["end"].shift(fill_value=0)
    pace_df["delta_accum"] = pace_df["delta"].cumsum()
    pace_df["duration_accum"] = pace_df["duration"].cumsum()

    # Drop the name column, which contains the same value for all rows, and
    # reset the index.
    pace_df = pace_df.drop(columns=[pace_col]).reset_index(drop=True)

    return pace_df, stats_df


def apply_time_offset(session_starts, pace_dfs):
    # Synchronize session start times by shifting each session relative to
    # the earliest one.
    global_min_start = min(session_starts)
    for pace_df, session_start in zip(pace_dfs, session_starts):
        session_offset = session_start - global_min_start
        pace_df["start"] = pace_df["start"] + session_offset
        pace_df["end"] = pace_df["end"] + session_offset


def describe_delta(df):
    agg_df = df.agg(["min", "max", "count", "std", "mean", "sum"])
    quantile_df = df.quantile([0.25, 0.5, 0.75])
    quantile_df.index = ["25%", "50%", "75%"]

    # Transpose the concatenated dataframe to have the statistics as columns.
    stats_df = pd.concat([agg_df, quantile_df]).T
    stats_df = summary.format_columns(stats_df)

    return stats_df


def split_columns_as_dataframes(pace_dfs):
    # We want the pace info in individual dataframes per column, not per rank.
    pace_df_by_column = {}

    cols = ["start", "end", "duration_accum", "delta_accum", "duration", "delta"]
    for col in cols:
        rank_column_value_map = {
            # Parquet requires string column names.
            str(rank): pace_df[col]
            for rank, pace_df in enumerate(pace_dfs)
        }
        rank_column_value_df = pd.DataFrame(rank_column_value_map)
        rank_column_value_df = rank_column_value_df.rename_axis(
            index="Iteration", columns="Rank"
        )
        pace_df_by_column[col] = rank_column_value_df

    delta_df = pace_df_by_column["delta"]
    # 'delta_df' has ranks as columns and iterations as the index. Transpose
    # it to get the statistics per iteration instead of per rank.
    pace_df_by_column["delta_stats"] = describe_delta(delta_df.T)

    return pace_df_by_column
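# Illustrative sketch (not part of the recipe): the delta/accumulation math in
# `compute_pace_stats_dfs` applied inline to made-up range data, using plain
# pandas only (the `summary` helpers are omitted).

```python
import pandas as pd

# Three hypothetical completed ranges with start/end timestamps.
range_df = pd.DataFrame(
    {
        "name": ["iter", "iter", "iter"],
        "start": [0, 15, 32],
        "end": [10, 30, 40],
    }
)

# Keep only complete ranges and copy to avoid mutating a view.
pace_df = range_df[range_df["start"].notnull() & range_df["end"].notnull()].copy()
pace_df["duration"] = pace_df["end"] - pace_df["start"]
# Gap between each range's start and the previous range's end (time 0 for
# the first row).
pace_df["delta"] = pace_df["start"] - pace_df["end"].shift(fill_value=0)
pace_df["delta_accum"] = pace_df["delta"].cumsum()
pace_df["duration_accum"] = pace_df["duration"].cumsum()

print(pace_df[["duration", "delta", "duration_accum", "delta_accum"]])
```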