1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-NvidiaProprietary
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.
import math
import numpy as np
import pandas as pd
def get_bin_size(bin_number, max_duration):
    """Return the size of one bin, rounded up to the next integer.

    Parameters
    ----------
    bin_number : int
        Number of bins the duration is split into.
    max_duration : int or float
        Total duration that the bins have to cover.
    """
    exact_size = max_duration / bin_number
    return math.ceil(exact_size)
def generate_bin_list(bin_number, bin_size, include_last=False):
    """Build the list of lower boundaries of consecutive bins.

    Parameters
    ----------
    bin_number : int
        Total number of bins.
    bin_size : int
        Size of each bin.
    include_last : bool
        When True, the upper boundary of the last bin is appended, so the
        result holds ``bin_number + 1`` values instead of ``bin_number``.
    """
    boundary_count = bin_number
    if include_last:
        boundary_count += 1
    return [bin_size * index for index in range(boundary_count)]
def group_overlapping_ranges(range_df):
    """Label every range with the identifier of its overlap group.

    The ranges are ordered by their "start" value. A range opens a new group
    only when it starts strictly after the latest "end" of all ranges that
    precede it in that order; otherwise it joins the current group.

    Parameters
    ----------
    range_df : pandas.DataFrame
        Frame with "start" and "end" columns.

    Returns
    -------
    pandas.Series
        Integer group identifiers, ordered by "start" and carrying the
        index labels of the corresponding rows of ``range_df``.
    """
    ordered = range_df.sort_values("start")
    # Latest end seen among the ranges that come before each row.
    previous_max_end = ordered["end"].cummax().shift()
    opens_new_group = ordered["start"] > previous_max_end
    return opens_new_group.cumsum()
def consolidate_ranges(range_df):
    """Merge overlapping time ranges into single ranges.

    Every group of overlapping ranges is collapsed into one row that keeps
    the earliest "start" and the latest "end" found in that group.

    Parameters
    ----------
    range_df : pandas.DataFrame
        Frame with "start" and "end" columns.

    Returns
    -------
    pandas.DataFrame
        One row per overlap group with "start" and "end" columns, indexed
        by the group identifier.
    """
    group_ids = group_overlapping_ranges(range_df)
    per_group = range_df.groupby(group_ids)
    return per_group.agg({"start": "min", "end": "max"})
def _calculate_bin_info(starts, ends, bin_size):
# Scale the start and end times by the bin size.
start_scaled = starts / bin_size
end_scaled = ends / bin_size
# Calculate the bin index for each start and end.
start_bins = np.floor(start_scaled).astype(int)
end_bins = np.floor(end_scaled).astype(int)
# Calculate the clipped start and end values to ensure they don't exceed
# the bin boundaries.
ends = np.minimum(np.ceil(start_scaled), end_scaled)
starts = np.maximum(np.floor(end_scaled), start_scaled)
# Calculate the coverage percentage.
start_percents = ends - start_scaled
end_percents = end_scaled - starts
return start_bins, end_bins, start_percents, end_percents
def _rectify_pct_inplace(bin_pcts, bin_size, profile_duration, session_offset):
    """Adjust ``bin_pcts`` in place for the limits of the profiling session.

    The session is treated as the range from ``session_offset`` to
    ``profile_duration``. The bins holding its first and last instants may
    be only partly inside the session, so their values are divided by the
    fraction of the bin that the session covers. Bins entirely outside the
    session are set to NaN.

    Parameters
    ----------
    bin_pcts : numpy.ndarray
        Per-bin values, modified in place.
    bin_size : int or float
        Size of each bin.
    profile_duration : int or float
        End of the profiling session.
    session_offset : int or float
        Start of the profiling session.
    """
    first_bin, last_bin, first_fraction, last_fraction = _calculate_bin_info(
        session_offset, profile_duration, bin_size
    )

    # A zero fraction is skipped so that we never divide by zero.
    if first_fraction != 0:
        bin_pcts[first_bin] /= first_fraction

    # When the session fits in a single bin, that bin has already been
    # rescaled above and must not be rescaled a second time.
    session_spans_bins = first_bin != last_bin
    if session_spans_bins and last_fraction != 0:
        bin_pcts[last_bin] /= last_fraction

    # Everything before the first bin or after the last bin of the session
    # carries no information.
    bin_pcts[:first_bin] = np.nan
    bin_pcts[last_bin + 1 :] = np.nan
def get_zero_bin_pcts(bin_size, bin_num, profile_duration, session_offset):
    """Return per-bin values that are zero inside the profiling session.

    An array of ``bin_num`` zeros is created and then passed through the
    same session rectification as computed values, so bins that lie outside
    the profiling session end up as NaN.
    """
    zero_pcts = np.zeros(bin_num)
    _rectify_pct_inplace(zero_pcts, bin_size, profile_duration, session_offset)
    return zero_pcts
def calculate_bin_pcts(
    df, bin_size, bin_num, profile_duration, session_offset, value_key=None
):
    """Calculate the percentage for each bin.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with "start" and "end" columns describing the ranges.
    bin_size : int or float
        Size of each bin.
    bin_num : int
        Number of bins.
    profile_duration : int or float
        End of the profiling session.
    session_offset : int or float
        Start of the profiling session.
    value_key : str, optional
        Column of ``df`` whose values weight each range. When omitted, every
        range has a weight of one.

    Returns
    -------
    numpy.ndarray
        One value per bin, expressed in percent and rounded to one decimal.
        Bins outside the profiling session are NaN.
    """
    # Work on plain arrays: every index computed below is positional, whereas
    # indexing a Series with integers is label-based and would pick the wrong
    # rows (or fail) when ``df`` does not carry a default 0..n-1 index.
    if value_key:
        values = df[value_key].to_numpy()
    else:
        values = np.ones(df.shape[0])

    start_bins, end_bins, start_percents, end_percents = _calculate_bin_info(
        df["start"].to_numpy(), df["end"].to_numpy(), bin_size
    )

    # Ranges that fall in a single bin: the start and end fractions are
    # identical, so the start arrays are used.
    in_single_bin = end_bins == start_bins
    bin_pcts = np.bincount(
        start_bins[in_single_bin],
        weights=start_percents[in_single_bin] * values[in_single_bin],
        minlength=bin_num,
    ).astype(float)

    # Ranges that span several bins: partial coverage for the first and last
    # bins, full coverage for every bin in between.
    for i in np.flatnonzero(~in_single_bin):
        first_bin = start_bins[i]
        last_bin = end_bins[i]
        weight = values[i]
        bin_pcts[first_bin] += start_percents[i] * weight
        bin_pcts[first_bin + 1 : last_bin] += weight
        # A range ending exactly on a bin boundary covers none of its end
        # bin. Skipping it avoids an out-of-bounds access when that boundary
        # is the upper edge of the last bin.
        if end_percents[i] != 0:
            bin_pcts[last_bin] += end_percents[i] * weight

    _rectify_pct_inplace(bin_pcts, bin_size, profile_duration, session_offset)
    return (bin_pcts * 100).round(1)
def calculate_overlapping_ranges(df1, df2):
    """Calculate overlapping ranges from two dataframes.

    Parameters
    ----------
    df1, df2 : pandas.DataFrame
        Frames with "start" and "end" columns. Neither frame is modified.
        NOTE(review): the boundary pairing below appears to assume that the
        ranges inside each frame do not overlap one another -- confirm with
        the callers.

    Returns
    -------
    pandas.DataFrame
        "start" and "end" of every positive-length overlap between a range
        of ``df1`` and a range of ``df2``, in ascending order.

    Raises
    ------
    RuntimeError
        If the number of distinct overlap starts differs from the number of
        distinct overlap ends.
    """
    # assign() returns tagged copies, so the caller's frames are not left
    # with an extra "type" column.
    tagged_df1 = df1.assign(type="df1")
    tagged_df2 = df2.assign(type="df2")

    all_df = pd.concat([tagged_df1, tagged_df2]).reset_index(drop=True)
    all_df["group"] = group_overlapping_ranges(all_df)

    group_df1 = all_df[all_df["type"] == "df1"]
    group_df2 = all_df[all_df["type"] == "df2"]

    # Ranges that have no shared groups are excluded, as they cannot overlap
    # with other ranges. Within a group the merge pairs every df1 range with
    # every df2 range, so each candidate pair is exactly one merged row.
    merged_df = pd.merge(
        group_df1, group_df2, on="group", suffixes=("_df1", "_df2")
    )

    # Intersect each pair row by row. Ranges from different groups never
    # overlap, so no cross-row comparison is needed.
    overlap_start = np.maximum(
        merged_df["start_df1"].to_numpy(), merged_df["start_df2"].to_numpy()
    )
    overlap_end = np.minimum(
        merged_df["end_df1"].to_numpy(), merged_df["end_df2"].to_numpy()
    )

    # Keep only the pairs that overlap for a positive length.
    has_overlap = (overlap_end - overlap_start) > 0
    overlap_start = np.unique(overlap_start[has_overlap])
    overlap_end = np.unique(overlap_end[has_overlap])

    if len(overlap_start) != len(overlap_end):
        raise RuntimeError("Start and end counts do not match.")

    return pd.DataFrame({"start": overlap_start, "end": overlap_end})
|