File: renderImageItem.py

package info (click to toggle)
python-pyqtgraph 0.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 8,168 kB
  • sloc: python: 54,831; makefile: 128; ansic: 40; sh: 2
file content (218 lines) | stat: -rw-r--r-- 8,085 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import typing

import numpy as np
import numpy.typing as npt

import pyqtgraph as pg

try:
    import cupy as cp
except ImportError:
    cp = None

try:
    import numba
except ImportError:
    numba = None


def renderQImage(*args, **kwargs):
    """Build a throwaway row-major ImageItem, set its image, and render it.

    ``autoLevels`` defaults to False so that timing is not polluted by the
    min/max scan unless the caller explicitly asks for it.
    """
    kwargs.setdefault('autoLevels', False)
    item = pg.ImageItem(axisOrder='row-major')
    item.setImage(*args, **kwargs)
    item.render()

def prime(data, lut, levels):
    """Render a small 64x64 crop of *data* once to warm up the backend.

    Used before timing the numba / cupy paths so JIT compilation and GPU
    initialization costs are paid outside the measured region.  *lut* and
    *levels* are forwarded only when they are not None.
    """
    small = data[:64, :64]
    render_kwargs = {}
    if lut is not None:
        render_kwargs["lut"] = lut
    if levels is not None:
        render_kwargs["levels"] = levels
    renderQImage(small, **render_kwargs)  # prime the gpu


class Parameters(typing.NamedTuple):
    """Axes of the benchmark parameter grid, in ``TimeSuite.param_names`` order."""
    sizes: list[tuple[int, int]]          # image dimensions as (rows, cols)
    acceleration: list[str]               # backend names: 'numpy', 'numba', 'cupy'
    uses_levels: list[bool]               # whether setImage is given explicit levels
    dtypes: list[npt.DTypeLike]           # image dtypes under test
    channels: list[int]                   # 1 = grayscale, 3 = RGB, 4 = RGBA
    lut_lengths: list[npt.DTypeLike | None]  # LUT index dtype (sets LUT size), or no LUT

class TimeSuite:
    """asv benchmark suite for :class:`pyqtgraph.ImageItem` rendering.

    Each timed case renders one image through ``ImageItem.render()`` for a
    particular combination of size, acceleration backend (numpy / numba /
    cupy), levels usage, dtype, channel count and LUT length.  Combinations
    that bypass ``makeARGB`` (or that make no sense, e.g. a LUT on a
    multi-channel image) are skipped by raising ``NotImplementedError`` from
    ``setup()``, which is the asv convention for skipping a parameter
    combination.
    """

    unit = "seconds"
    param_names = ["size", "acceleration", "use_levels", "dtype", "channels", "lut_length"]
    params = Parameters(
        [
            # (256, 256),               # other sizes useful to test for
            # (512, 512),               # seeing performance scale
            # (1024, 1024),             # but not helpful for tracking history
            # (2048, 2048),             # so we test the most taxing size only
            # (3072, 3072),
            (4096, 4096)
        ],                              # size
        ["numpy"],     # acceleration
        [True, False],                  # use_levels
        ['uint8', 'uint16', 'float32'], # dtype
        [1, 3, 4],                      # channels
        ['uint8', 'uint16', None]       # lut_length
    )

    def __init__(self):
        # Placeholders; real arrays are installed by setup().
        self.data = np.empty((), dtype=np.uint8)
        self.lut = np.empty((), dtype=np.ubyte)

        # Advertise only the accelerations that are actually importable.
        # ``params`` is a class attribute shared by every instance, so guard
        # against duplicates: appending unconditionally (as the original code
        # did) grows the list each time the suite is instantiated.
        if numba is not None and 'numba' not in self.params.acceleration:
            self.params.acceleration.append('numba')

        if cp is not None and 'cupy' not in self.params.acceleration:
            self.params.acceleration.append('cupy')

        self.levels = None

    def teardown(self, *args, **kwargs):
        """Toggle off the acceleration config options enabled by setup()."""
        pg.setConfigOption("useNumba", False)
        pg.setConfigOption("useCupy", False)

    def setup_cache(self) -> dict:
        """Build LUTs and deterministic random image data once, up front.

        Returns a dict keyed by array-module name ('numpy' and, when
        available, 'cupy'); each entry holds
        ``{"lut": {lut_length: lut}, "data": {dtype: {channels: {size: array}}}}``.
        asv runs this once and passes the result to ``setup()``/``time_test``.
        """
        accelerations = [np]
        if cp is not None:
            accelerations.append(cp)
        cache = {}

        for xp in accelerations:
            cache[xp.__name__] = {"lut": {}, "data": {}}
            random_generator = xp.random.default_rng(42)  # answer to everything
            # LUT caching: per-channel linear ramps clipped to [0, 255],
            # plus a fully opaque alpha channel.
            c_map = xp.array([[-500.0, 255.0], [-255.0, 255.0], [0.0, 500.0]])
            for lut_length in self.params.lut_lengths:
                if lut_length is None:
                    continue
                # the LUT spans the full index range of its dtype
                bits = xp.dtype(lut_length).itemsize * 8
                # create the LUT
                lut = xp.zeros((2 ** bits, 4), dtype="ubyte")
                for i in range(3):
                    lut[:, i] = xp.clip(xp.linspace(c_map[i][0], c_map[i][1], 2 ** bits), 0, 255)
                lut[:, -1] = 255
                cache[xp.__name__]["lut"][lut_length] = lut

            # data caching: one seeded random image per dtype/channels/size
            for dtype in self.params.dtypes:
                cache[xp.__name__]["data"][dtype] = {}
                for channels in self.params.channels:
                    cache[xp.__name__]["data"][dtype][channels] = {}

                    for size in self.params.sizes:
                        size_with_channels = (size[0], size[1], channels) if channels != 1 else size
                        if xp.dtype(dtype) in (xp.float32, xp.float64):
                            data = random_generator.standard_normal(
                                size=size_with_channels,
                                dtype=dtype
                            )
                        else:
                            # integer images span the dtype's entire value range
                            iinfo = xp.iinfo(dtype)
                            data = random_generator.integers(
                                low=iinfo.min,
                                high=iinfo.max,
                                size=size_with_channels,
                                dtype=dtype,
                                endpoint=True
                            )
                        cache[xp.__name__]["data"][dtype][channels][size] = data
        return cache


    def setup(
            self,
            cache: dict,
            size: tuple[int, int],
            acceleration: str,
            use_levels: bool,
            dtype: npt.DTypeLike,
            channels: int,
            lut_length: typing.Optional[npt.DTypeLike]
    ):
        """Configure the backend and select cached data/LUT/levels.

        Raises NotImplementedError to make asv skip combinations that are
        unavailable, unsupported, or that would not exercise makeARGB.
        """
        xp = np
        if acceleration == "numba":
            if numba is None:
                # if numba is not available, skip it...
                raise NotImplementedError("numba not available")
            pg.setConfigOption("useNumba", True)
        elif acceleration == "cupy":
            if cp is None:
                # if cupy is not available, skip it...
                raise NotImplementedError("cupy not available")
            pg.setConfigOption("useCupy", True)
            xp = cp  # use cupy instead of numpy

        # does it even make sense to have a LUT with multiple channels?
        if lut_length is not None and channels != 1:
            raise NotImplementedError(
                f"{lut_length=} and {channels=} not implemented. LUT with multiple channels not supported."
            )

        # skip when the code paths bypass makeARGB
        if acceleration != "numpy":
            if xp.dtype(dtype) == xp.ubyte and not use_levels:
                if lut_length is None:
                    # Grayscale8, RGB888 or RGB[AX]8888
                    raise NotImplementedError(
                        f"{dtype=} and {use_levels=} not tested for {acceleration=} with {lut_length=}"
                    )
                elif channels == 1 and xp.dtype(lut_length) == xp.uint8:
                    # Indexed8
                    raise NotImplementedError(
                        f"{dtype=} and {use_levels=} not tested with {acceleration=} for {channels=} and {lut_length=}"
                    )

            elif xp.dtype(dtype) == xp.uint16 and not use_levels and lut_length is None:
                if channels == 1:
                    # Grayscale16
                    raise NotImplementedError(
                        f"{dtype=} {use_levels=} {lut_length=} and {channels=} not tested for {acceleration=}"
                    )
                elif channels == 4:
                    # RGBA64
                    raise NotImplementedError(
                        f"{dtype=} {use_levels=} {lut_length=} and {channels=} not tested with {acceleration=}"
                    )

        if use_levels:
            # levels chosen per-dtype to fall inside each dtype's typical range
            if xp.dtype(dtype) == xp.float32:
                self.levels = (-4.0, 4.0)
            elif xp.dtype(dtype) == xp.uint16:
                self.levels = (250, 3000)
            elif xp.dtype(dtype) == xp.uint8:
                self.levels = (20, 220)
            else:
                raise ValueError(
                    "dtype needs to be one of {'float32', 'uint8', 'uint16'}"
                )
        elif xp.dtype(dtype) in (xp.float32, xp.float64):
            # float images always need levels
            raise NotImplementedError(
                f"{use_levels=} {dtype=} is not supported. Float images always need levels."
            )
        else:
            self.levels = None

        if lut_length is None:
            self.lut = None
        else:
            self.lut = cache[xp.__name__]["lut"][lut_length]

        self.data = cache[xp.__name__]["data"][dtype][channels][size]
        if acceleration in {"numba", "cupy"}:
            # pay JIT-compilation / GPU-init costs before the timed region
            prime(self.data, self.lut, self.levels)

    def time_test(self, *args, **kwargs):
        """Render the prepared image once; this is the timed operation.

        The asv-supplied *args/**kwargs (the parameter combination) are
        intentionally ignored: everything needed was stashed on ``self``
        by setup().
        """
        render_kwargs = {}
        if self.lut is not None:
            render_kwargs["lut"] = self.lut
        if self.levels is not None:
            render_kwargs["levels"] = self.levels
        renderQImage(self.data, **render_kwargs)