File: collective_loader.py

package info (click to toggle)
nvidia-cuda-toolkit 12.4.1-3
  • links: PTS, VCS
  • area: non-free
  • in suites: forky, sid
  • size: 18,505,836 kB
  • sloc: ansic: 203,477; cpp: 64,769; python: 34,699; javascript: 22,006; xml: 13,410; makefile: 3,085; sh: 2,343; perl: 352
file content (77 lines) | stat: -rw-r--r-- 2,711 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-NvidiaProprietary
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.

from nsys_recipe.data_service import DataService
from nsys_recipe.lib import exceptions


def filter_mapper_results(mapper_res):
    """Drop mapper outputs that carry no data, keeping files and results aligned.

    Parameters
    ----------
    mapper_res : iterable of (file, result) pairs
        Output of the mapper step. A pair whose result is None is
        treated as empty and discarded together with its file.

    Returns
    -------
    filtered_files : list of str
        Report files whose result was not None. Passing these on instead
        of the original file list avoids repeating the same warning/error
        messages when the same table is read again later.
    filtered_res : list
        The results that were not None, in the same order as
        ``filtered_files``.

    Raises
    ------
    exceptions.NoDataError
        If no pair has a result other than None.
    """
    # Keep the (file, result) pairing intact while filtering so the two
    # output lists stay index-aligned.
    kept = [(path, result) for path, result in mapper_res if result is not None]

    if not kept:
        raise exceptions.NoDataError

    filtered_files = [path for path, _ in kept]
    filtered_res = [result for _, result in kept]
    return filtered_files, filtered_res


class ProfileInfo:
    """Map/reduce helpers that gather profile duration and session start time."""

    @staticmethod
    def mapper_func(report_path):
        """Read the profile duration and session start time of one report.

        Parameters
        ----------
        report_path : str
            Report file handed to ``DataService``.

        Returns
        -------
        tuple
            ``(report_path, (profile_duration, session_time))`` when the
            tables could be read, or ``(None, None)`` when
            ``read_tables`` returns None.
        """
        tables = DataService(report_path).read_tables(
            {
                "ANALYSIS_DETAILS": ["duration"],
                "TARGET_INFO_SESSION_START_TIME": ["utcEpochNs"],
            }
        )
        if tables is None:
            return None, None

        # Only the first row (label 0) of each table is consulted.
        duration = tables["ANALYSIS_DETAILS"].at[0, "duration"]
        start_time = tables["TARGET_INFO_SESSION_START_TIME"].at[0, "utcEpochNs"]

        return report_path, (duration, start_time)

    @staticmethod
    def reducer_func(mapper_res):
        """Combine per-report ``(duration, session_time)`` pairs.

        Each duration is extended by the offset of its session start from
        the earliest session start, so all reports are measured from a
        common origin.
        NOTE(review): this assumes duration and session time share a unit
        (the column name suggests nanoseconds) -- confirm.

        Parameters
        ----------
        mapper_res : sequence of (duration, session_time) pairs
            Iterated twice, so it must not be a one-shot iterator.

        Returns
        -------
        tuple
            ``(max_adjusted_duration, min_session_time)``.
        """
        # Transpose the pairs; only the session-time column is needed here.
        _, session_times = zip(*mapper_res)
        earliest = min(session_times)

        longest = max(
            duration + (session - earliest) for duration, session in mapper_res
        )
        return longest, earliest

    @staticmethod
    def get_profile_info(context, input):
        """Get the maximum profile duration and the minimum session time.

        Runs ``mapper_func`` over ``input`` through ``context.map``, waits
        for the results with ``context.wait``, discards reports without
        data and reduces the remainder.

        Returns
        -------
        tuple
            ``(max_duration, min_session, filtered_files)`` where
            ``filtered_files`` lists the reports that yielded data.
        """
        pending = context.map(ProfileInfo.mapper_func, input)
        mapper_res = context.wait(pending)

        filtered_files, filtered_res = filter_mapper_results(mapper_res)
        max_duration, min_session = ProfileInfo.reducer_func(filtered_res)

        return max_duration, min_session, filtered_files