File: FlameProfiler.py

package info (click to toggle)
uranium 5.0.0-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 5,304 kB
  • sloc: python: 31,765; sh: 132; makefile: 12
file content (240 lines) | stat: -rw-r--r-- 7,405 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# Copyright (c) 2022 Ultimaker B.V.
# Uranium is released under the terms of the LGPLv3 or higher.

import time
import math
import os
import threading
from contextlib import contextmanager
import functools
from typing import List, Callable, Any

from PyQt6.QtCore import pyqtSlot as PyQt6PyqtSlot
from UM.Logger import Logger
# A simple profiler which produces data suitable for viewing as a flame graph
# when using the Big Flame Graph plugin.
#
# An example of code which uses this profiling data is this Cura plugin:
# https://github.com/sedwards2009/cura-big-flame-graph
#
# Set the environment variable URANIUM_FLAME_PROFILER to something before
# starting the application to make the profiling code available.


def enabled() -> bool:
    """Report whether the flame profiler has been switched on.

    The profiler is switched on by defining the URANIUM_FLAME_PROFILER environment
    variable. Only its presence matters; an empty value counts as well.

    :return: :type{bool} True if the environment variable is defined.
    """

    return os.environ.get("URANIUM_FLAME_PROFILER") is not None


# Read by isRecordingProfile(); switched on and off by updateProfileConfig() when a
# start or stop request is pending.
record_profile = False  # Flag to keep track of whether we are recording data.


# Profiling data is built up as a tree of these kinds of nodes. Each node
# has a name, start time, end time, and a list of child nodes which are
# the other functions/methods which were called by this function.
class _ProfileCallNode:
    def __init__(self, name, line_number, start_time, end_time, children):
        self.__name = name
        self.__line_number = line_number
        self.__start_time = start_time
        self.__end_time = end_time
        self.__children = children if children is not None else [] # type: List[_ProfileCallNode]

    def getStartTime(self):
        return self.__start_time

    def getEndTime(self):
        return self.__end_time

    def getDuration(self):
        return self.__end_time - self.__start_time

    def toJSON(self, root=False):
        if root:
            return """
{
  "c": {
    "callStats": """ + self._plainToJSON() + """,
    "sampleIterval": 1,
    "objectName": "Cura",
    "runTime": """ + str(self.getDuration()) + """,
    "totalSamples": """ + str(self.getDuration()) + """
  },
  "version": "0.34"
}
"""
        else:
            return self._plainToJSON()

    def _plainToJSON(self):
        return '''{
"stack": [
  "''' + self.__name + '''",
  "Code: ''' + self.__name + '''",
  ''' + str(self.__line_number) + ''',
  ''' + str(self.getDuration()) + '''
],
"sampleCount": '''+ str(self.getDuration()) + ''',
"children": [
    ''' + ",\n".join( [kid.toJSON() for kid in self.__children]) + '''
]
}
'''
# Stack of lists of finished call nodes. Element 0 collects the top level calls;
# profileCall() pushes a fresh list while a profiled block runs, to collect that
# block's child calls, and pops it again afterwards.
child_accu_stack = [ [] ]   # type: List[List[_ProfileCallNode]]
# Pending requests. These are set by clearProfileData(), startRecordingProfileData()
# and stopRecordingProfileData() and are applied later by updateProfileConfig().
clear_profile_requested = False
record_profile_requested = False
stop_record_profile_requested = False


def getProfileData():
    """Build a single root node out of everything recorded so far.

    The top level calls are padded with unnamed filler nodes wherever there is a gap
    between them. They are then placed under an unnamed root node which runs from the
    start of the first call to the end of the last call.

    :return: :type{_ProfileCallNode} The root node, or None if nothing has been recorded.
    """

    top_level_calls = child_accu_stack[0]
    if not top_level_calls:
        return None

    first_call, last_call = top_level_calls[0], top_level_calls[-1]
    begin = first_call.getStartTime()
    finish = last_call.getEndTime()
    return _ProfileCallNode("", 0, begin, finish, _fillInProfileSpaces(begin, finish, top_level_calls))


def clearProfileData() -> None:
    """Request that all recorded profile data is thrown away.

    This only raises a flag. The data itself is discarded by updateProfileConfig().
    """

    global clear_profile_requested

    clear_profile_requested = True


def startRecordingProfileData() -> None:
    """Request that recording of profile data is started.

    Any pending stop request is withdrawn first. Recording itself is switched on by
    updateProfileConfig().
    """

    global record_profile_requested, stop_record_profile_requested

    stop_record_profile_requested = False
    record_profile_requested = True


def stopRecordingProfileData() -> None:
    """Request that recording of profile data is stopped.

    This only raises a flag. Recording itself is switched off by updateProfileConfig().
    """

    global stop_record_profile_requested

    stop_record_profile_requested = True


def _fillInProfileSpaces(start_time, end_time, profile_call_list):
    """Pad a list of calls with filler nodes so that a time span is covered without gaps.

    An unnamed filler node is inserted in front of every call which does not start in
    the same millisecond in which the previous call (or the span itself) ended. A last
    filler is added if the final call does not end in the same millisecond as the span.

    :param start_time: The start of the span to cover.
    :param end_time: The end of the span to cover.
    :param profile_call_list: :type{List[_ProfileCallNode]} The calls, in the order they were made.
    :return: :type{List[_ProfileCallNode]} A new list with the calls and the fillers.
    """

    padded = []

    def addFiller(gap_start, gap_end):
        # Gaps are only filled when both ends fall into different milliseconds.
        if secondsToMS(gap_end) != secondsToMS(gap_start):
            padded.append(_ProfileCallNode("", 0, gap_start, gap_end, []))

    cursor = start_time
    for call in profile_call_list:
        addFiller(cursor, call.getStartTime())
        padded.append(call)
        cursor = call.getEndTime()

    addFiller(cursor, end_time)
    return padded


def secondsToMS(value):
    """Convert a time in seconds to whole milliseconds, rounding down.

    :param value: The time in seconds.
    :return: :type{int} The number of whole milliseconds.
    """

    milliseconds = value * 1000
    return math.floor(milliseconds)


@contextmanager
def profileCall(name):
    """Profile a block of code.

    Use this context manager to wrap and profile a block of code. When the profiler is
    not enabled the block is simply run without any bookkeeping.

    The bookkeeping is completed even when the wrapped block raises an exception, so
    that the stack of child lists stays balanced. The exception itself is not caught.

    :param name: :type{str} The name to use to identify this code in the profile report.
    """

    if not enabled():
        yield
        return

    start_time = time.perf_counter()
    # Calls profiled while the block runs are collected in this new list.
    child_accu_stack.append([])
    try:
        yield
    finally:
        end_time = time.perf_counter()

        # Always pop what was pushed above; skipping this when the block raises would
        # attribute every later call to the wrong parent.
        child_values = child_accu_stack.pop()
        if (end_time - start_time) > 0.001: # Filter out small durations (< 1ms)
            call_stat = _ProfileCallNode(name, 0, start_time, end_time,
                                         _fillInProfileSpaces(start_time, end_time, child_values))
            child_accu_stack[-1].append(call_stat)


def isRecordingProfile() -> bool:
    """Tell whether profiling information is being recorded for the calling thread.

    Recording is only ever reported for the main thread.

    :return: :type{bool} True if we are recording and the caller runs on the main thread.
    """

    if not record_profile:
        return False
    return threading.current_thread() is threading.main_thread()


def updateProfileConfig():
    """Apply the pending clear, start and stop requests.

    Nothing is done while a profiled call is in progress, i.e. while the stack of child
    lists holds more than its bottom element; the requests then stay pending. Otherwise
    the requests are handled in the order clear, start, stop and each flag is reset.
    """

    global child_accu_stack, record_profile
    global clear_profile_requested, record_profile_requested, stop_record_profile_requested

    # The active profiling config can only change when we are not nested inside profiled calls.
    if len(child_accu_stack) > 1:
        return

    if clear_profile_requested:
        clear_profile_requested = False
        child_accu_stack = [[]]

    if record_profile_requested:
        record_profile_requested = False
        record_profile = True
        Logger.log('d', 'Starting record record_profile_requested')

    if stop_record_profile_requested:
        stop_record_profile_requested = False
        record_profile = False
        Logger.log('d', 'Stopping record stop_record_profile_requested')


def profile(function):
    """Decorator which can be manually applied to methods to record profiling information.

    When the profiler is not enabled the function is returned unchanged. Otherwise a
    wrapper is returned which records the call under the function's qualified name
    whenever a recording is in progress.

    :param function: The function or method to profile.
    :return: The function itself, or the profiling wrapper around it.
    """

    if not enabled():
        return function

    @functools.wraps(function)
    def profiled(*args, **kwargs):
        if not isRecordingProfile():
            return function(*args, **kwargs)
        with profileCall(function.__qualname__):
            return function(*args, **kwargs)

    return profiled


def pyqtSlot(*args, **kwargs) -> Callable[..., Any]:
    """Drop in replacement for PyQt6's pyqtSlot decorator which records profiling information.

    All arguments are handed on to PyQt6's pyqtSlot. When the profiler is enabled the
    decorated function is first wrapped so that calls are recorded under the name
    "[SLOT] " followed by its qualified name whenever a recording is in progress.

    See the PyQt6 documentation for information about pyqtSlot.
    """

    profiling_enabled = enabled()

    def decorate(function):
        target = function
        if profiling_enabled:
            @functools.wraps(function)
            def wrapped(*args2, **kwargs2):
                if not isRecordingProfile():
                    return function(*args2, **kwargs2)
                with profileCall("[SLOT] "+ function.__qualname__):
                    return function(*args2, **kwargs2)

            target = wrapped
        return PyQt6PyqtSlot(*args, **kwargs)(target)

    return decorate