File: cpu.py

package info (click to toggle)
python-plyer 2.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,808 kB
  • sloc: python: 13,395; sh: 217; makefile: 177
file content (252 lines) | stat: -rw-r--r-- 6,646 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
'''
Module of Windows API for plyer.cpu.
'''

from ctypes import (
    c_ulonglong, c_ulong, byref,
    Structure, POINTER, Union, windll, create_string_buffer,
    sizeof, cast, c_void_p, c_uint32
)
from ctypes.wintypes import (
    BYTE, DWORD, WORD
)

from plyer.facades import CPU


# kernel32.dll handle; this module calls GetLogicalProcessorInformation
# and GetLastError through it
KERNEL = windll.kernel32
# error code the buffer-size probing call (made without a buffer) is
# expected to leave behind
ERROR_INSUFFICIENT_BUFFER = 0x0000007A


class CacheType:
    '''
    Values of the Win API PROCESSOR_CACHE_TYPE enum.
    '''

    # numbered consecutively from zero, in declaration order
    unified, instruction, data, trace = range(4)


class RelationshipType:
    '''
    Values of the Win API LOGICAL_PROCESSOR_RELATIONSHIP enum.

    Each value names what a set of logical processors is sharing:

    * processor_core - a single core
    * numa_node - a single NUMA node
    * cache - a cache
    * processor_package - a physical package
    * group - a processor group
    * all - logical processor info for all groups
    '''

    # the first five values are numbered consecutively from zero
    (
        processor_core,
        numa_node,
        cache,
        processor_package,
        group,
    ) = range(5)

    # logical proc info for all groups
    all = 0xffff


class CacheDescriptor(Structure):
    '''
    Win API CACHE_DESCRIPTOR struct.

    ctypes lays the members out in the order they are listed in
    ``_fields_``, so both the order and the types have to stay in sync
    with the C declaration.
    '''

    _fields_ = [
        # compared against 1, 2 and 3 when the caches are counted
        ('Level', BYTE),
        ('Associativity', BYTE),
        ('LineSize', WORD),
        ('Size', DWORD),
        # presumably a PROCESSOR_CACHE_TYPE value (see CacheType) - confirm
        ('Type', DWORD)
    ]


class ProcessorCore(Structure):
    '''
    Win API ProcessorCore struct.

    Used as the ``ProcessorCore`` member of ``SystemLPIUnion``.
    '''

    # single one-byte member
    _fields_ = [('Flags', BYTE)]


class NumaNode(Structure):
    '''
    Win API NumaNode struct.

    Used as the ``NumaNode`` member of ``SystemLPIUnion``.
    '''

    # single DWORD member
    _fields_ = [('NodeNumber', DWORD)]


class SystemLPIUnion(Union):
    '''
    Win API SYSTEM_LOGICAL_PROCESSOR_INFORMATION union without name.

    Being a union, all members overlap the same memory.
    NOTE(review): which member is meaningful presumably depends on the
    ``Relationship`` value of the enclosing struct - confirm against the
    Win API documentation.
    '''

    _fields_ = [
        ('ProcessorCore', ProcessorCore),
        ('NumaNode', NumaNode),
        ('Cache', CacheDescriptor),
        ('Reserved', c_ulonglong)
    ]


class SystemLPI(Structure):
    '''
    Win API SYSTEM_LOGICAL_PROCESSOR_INFORMATION struct.

    One record of the array filled in by GetLogicalProcessorInformation.
    ``sizeof`` of this class is used as the stride when walking that
    array, so the layout has to match the C declaration exactly.
    '''

    _fields_ = [
        # ULONG_PTR: unsigned integer as wide as a pointer. c_ulong is
        # always 32 bits on Windows, which makes the struct too small on
        # 64bit and shifts every following member, so pick by pointer size
        (
            'ProcessorMask',
            c_ulonglong if sizeof(c_void_p) == 8 else c_ulong
        ),
        # LOGICAL_PROCESSOR_RELATIONSHIP value, see RelationshipType
        ('Relationship', c_ulong),
        ('LPI', SystemLPIUnion)
    ]


class WinCPU(CPU):
    '''
    Implementation of Windows CPU API.

    Every value is derived from the records returned by the Win API
    GetLogicalProcessorInformation call, filtered by relationship type.
    '''

    @staticmethod
    def _countbits(mask):
        '''
        Count the bits set in a processor mask.

        Only the lowest ULONG_PTR-sized part of ``mask`` is inspected,
        i.e. as many bits as a pointer has.

        :param mask: integer bit mask
        :return: number of set bits within the inspected width
        '''
        # ULONG_PTR is an unsigned integer as wide as a pointer
        # https://docs.microsoft.com/en-us/windows/
        # desktop/WinProg/windows-data-types
        # note: not a pointer per-se, != PULONG_PTR
        width = sizeof(c_void_p) * 8

        # drop everything above the mask width, then count the ones;
        # plain integer arithmetic, no float division involved
        # https://stackoverflow.com/a/1746642/5994041
        return bin(mask & ((1 << width) - 1)).count('1')

    def _logprocinfo(self, relationship):
        '''
        Tally the logical processor information records of one kind.

        :param relationship: ``RelationshipType`` value selecting the
            records to count
        :return: dict with the keys ``relationship`` (number of matching
            records), ``mask`` (set bits summed over their processor
            masks) and ``L1``, ``L2``, ``L3`` (matching records per cache
            level, only counted for ``RelationshipType.cache``)
        :raises OSError: if the Win API call does not behave as expected
        '''
        get_logical_process_info = KERNEL.GetLogicalProcessorInformation

        # first call with no structure to get the required buffer size;
        # it is expected to fail with ERROR_INSUFFICIENT_BUFFER
        buff_length = c_ulong(0)
        result = get_logical_process_info(None, byref(buff_length))
        # NOTE(review): ctypes documents use_last_error/get_last_error()
        # as the reliable way to read this code; KERNEL is created
        # outside of this class, so the direct call is kept here
        error = KERNEL.GetLastError()

        # explicit checks instead of asserts, those vanish with python -O
        if result or error != ERROR_INSUFFICIENT_BUFFER:
            raise OSError(
                'GetLogicalProcessorInformation size query failed '
                '(result={}, error={})'.format(result, error)
            )
        if not buff_length.value:
            raise OSError(
                'GetLogicalProcessorInformation reported an empty buffer'
            )

        # create buffer from the real winapi buffer length
        buff = create_string_buffer(buff_length.value)

        # call again with buffer pointer + the same length as arguments
        result = get_logical_process_info(buff, byref(buff_length))
        if not result:
            raise OSError(
                'GetLogicalProcessorInformation failed '
                '(error={})'.format(KERNEL.GetLastError())
            )

        # memory size of one LPI struct in the array of LPI structs
        record_size = sizeof(SystemLPI)
        values = dict.fromkeys(
            ('relationship', 'mask', 'L1', 'L2', 'L3'), 0
        )
        cache_keys = {1: 'L1', 2: 'L2', 3: 'L3'}

        # walk whole records only, a trailing partial record would
        # otherwise be read past the end of the returned data
        last_start = buff_length.value - record_size
        for start in range(0, last_start + 1, record_size):
            # view the record in place instead of casting a copied slice
            slpi = SystemLPI.from_buffer(buff, start)

            if slpi.Relationship != relationship:
                continue

            values['relationship'] += 1
            values['mask'] += self._countbits(slpi.ProcessorMask)

            # the union members overlap, so the cache descriptor is only
            # read for cache records; for other kinds the same bytes
            # hold unrelated data
            if relationship == RelationshipType.cache:
                key = cache_keys.get(slpi.LPI.Cache.Level)
                if key is not None:
                    values[key] += 1

        return values

    def _sockets(self):
        '''
        Return the number of physical CPU sockets (or slots) on
        motherboard, i.e. the count of processor package records.
        '''
        return self._logprocinfo(
            RelationshipType.processor_package
        )['relationship']

    def _physical(self):
        '''
        Return the number of cores, i.e. the count of processor core
        records.
        '''
        return self._logprocinfo(
            RelationshipType.processor_core
        )['relationship']

    def _logical(self):
        '''
        Return the number of logical processors (cores * threads).

        A hyperthreaded core has more than one bit set in its processor
        mask, so the set bits of all core records are summed.
        '''
        return self._logprocinfo(
            RelationshipType.processor_core
        )['mask']

    def _cache(self):
        '''
        Return a dict with the L1, L2 and L3 cache counts.
        '''
        result = self._logprocinfo(
            RelationshipType.cache
        )
        return {key: result[key] for key in ('L1', 'L2', 'L3')}

    def _numa(self):
        '''
        Return the number of NUMA nodes, i.e. the count of NUMA node
        records.
        '''
        return self._logprocinfo(
            RelationshipType.numa_node
        )['relationship']


def instance():
    '''
    Create the Windows CPU backend object for the facade proxy.
    '''
    backend = WinCPU()
    return backend


# Resources:
# GetLogicalProcessorInformation
# https://msdn.microsoft.com/en-us/library/ms683194(v=vs.85).aspx

# SYSTEM_LOGICAL_PROCESSOR_INFORMATION
# https://msdn.microsoft.com/en-us/library/ms686694(v=vs.85).aspx

# LOGICAL_PROCESSOR_RELATIONSHIP enum (0 - 4, 0xffff)
# https://msdn.microsoft.com/2ada52f0-70ec-4146-9ef7-9af3b08996f9

# CACHE_DESCRIPTOR struct
# https://msdn.microsoft.com/38cfa605-831c-45ef-a99f-55f42b2b56e9

# PROCESSOR_CACHE_TYPE
# https://msdn.microsoft.com/23044f67-e944-43c2-8c75-3d2fba87cb3c

# C example
# https://msdn.microsoft.com/en-us/904d2d35-f419-4e8f-a689-f39ed926644c