File: profiler.py

package info (click to toggle)
python-vispy 0.15.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,868 kB
  • sloc: python: 59,799; javascript: 6,800; makefile: 69; sh: 6
file content (135 lines) | stat: -rw-r--r-- 4,695 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# -*- coding: utf-8 -*-
# Copyright (c) Vispy Development Team. All Rights Reserved.
# Distributed under the (new) BSD License. See LICENSE.txt for more info.
# Adapted from PyQtGraph
import sys
from . import ptime
from .. import config


class Profiler(object):
    """Simple profiler allowing directed, hierarchical measurement of time
    intervals.

    By default, profilers are disabled.  To enable profiling, set the
    environment variable `VISPYPROFILE` to a comma-separated list of
    fully-qualified names of profiled functions.

    Calling a profiler registers a message (defaulting to an increasing
    counter) that contains the time elapsed since the last call.  When the
    profiler is about to be garbage-collected, the messages are passed to the
    outer profiler if one is running, or printed to stdout otherwise.

    If `delayed` is set to False, messages are immediately printed instead.

    Example:
        def function(...):
            profiler = Profiler()
            ... do stuff ...
            profiler('did stuff')
            ... do other stuff ...
            profiler('did other stuff')
            # profiler is garbage-collected and flushed at function end

    If this function is a method of class C, setting `VISPYPROFILE` to
    "C.function" (without the module name) will enable this profiler.

    For regular functions, use the qualified name of the function, stripping
    only the initial "vispy.." prefix from the module.
    """

    _profilers = (config['profile'].split(",") if config['profile'] is not None
                  else [])

    _depth = 0
    _msgs = []
    # set this flag to disable all or individual profilers at runtime
    disable = False

    class DisabledProfiler(object):
        def __init__(self, *args, **kwds):
            pass

        def __call__(self, *args):
            pass

        def finish(self):
            pass

        def mark(self, msg=None):
            pass

    _disabled_profiler = DisabledProfiler()

    def __new__(cls, msg=None, disabled='env', delayed=True):
        """Optionally create a new profiler based on caller's qualname."""
        if (disabled is True or 
                (disabled == 'env' and len(cls._profilers) == 0)):
            return cls._disabled_profiler

        # determine the qualified name of the caller function
        caller_frame = sys._getframe(1)
        try:
            caller_object_type = type(caller_frame.f_locals["self"])
        except KeyError:  # we are in a regular function
            qualifier = caller_frame.f_globals["__name__"].split(".", 1)[1]
        else:  # we are in a method
            qualifier = caller_object_type.__name__
        func_qualname = qualifier + "." + caller_frame.f_code.co_name
        if (disabled == 'env' and func_qualname not in cls._profilers and
                'all' not in cls._profilers):  # don't do anything
            return cls._disabled_profiler
        # create an actual profiling object
        cls._depth += 1
        obj = super(Profiler, cls).__new__(cls)
        obj._name = msg or func_qualname
        obj._delayed = delayed
        obj._mark_count = 0
        obj._finished = False
        obj._firstTime = obj._last_time = ptime.time()
        obj._new_msg("> Entering " + obj._name)
        return obj

    def __call__(self, msg=None, *args):
        """Register or print a new message with timing information."""
        if self.disable:
            return
        if msg is None:
            msg = str(self._mark_count)
        self._mark_count += 1
        new_time = ptime.time()
        elapsed = (new_time - self._last_time) * 1000
        self._new_msg("  " + msg + ": %0.4f ms", *(args + (elapsed,)))
        self._last_time = new_time

    def mark(self, msg=None):
        self(msg)

    def _new_msg(self, msg, *args):
        msg = "  " * (self._depth - 1) + msg
        if self._delayed:
            self._msgs.append((msg, args))
        else:
            self.flush()
            print(msg % args)

    def __del__(self):
        self.finish()

    def finish(self, msg=None):
        """Add a final message; flush the message list if no parent profiler."""
        if self._finished or self.disable:
            return        
        self._finished = True
        if msg is not None:
            self(msg)
        self._new_msg("< Exiting %s, total time: %0.4f ms", 
                      self._name, (ptime.time() - self._firstTime) * 1000)
        type(self)._depth -= 1
        if self._depth < 1:
            self.flush()

    def flush(self):
        if self._msgs:
            print("\n".join([m[0] % m[1] for m in self._msgs]))
            type(self)._msgs = []