File: _win32.py

package info (click to toggle)
pyinstaller 6.18.0%2Bds-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 11,824 kB
  • sloc: python: 41,828; ansic: 12,123; makefile: 171; sh: 131; xml: 19
file content (333 lines) | stat: -rw-r--r-- 11,564 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# -----------------------------------------------------------------------------
# Copyright (c) 2023, PyInstaller Development Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# The full license is in the file COPYING.txt, distributed with this software.
#
# SPDX-License-Identifier: Apache-2.0
# -----------------------------------------------------------------------------

import ctypes
import ctypes.wintypes

# Constants from win32 headers

# Access right requested when opening the process token in _get_process_sid().
TOKEN_QUERY = 0x0008

# Token information classes understood by _get_process_sid().
TokenUser = 1  # from TOKEN_INFORMATION_CLASS enum
TokenAppContainerSid = 31  # from TOKEN_INFORMATION_CLASS enum

# Error code that the buffer-size probing call to GetTokenInformation is expected to fail with.
ERROR_INSUFFICIENT_BUFFER = 122

# Sentinel used to initialize handle variables before the handle has been opened.
INVALID_HANDLE = -1

# Flags passed to FormatMessageW in _win_error_to_message().
FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000

# Revision argument passed to ConvertStringSecurityDescriptorToSecurityDescriptorW in secure_mkdir().
SDDL_REVISION1 = 1

# Structures for ConvertSidToStringSidW
# A SID is only ever passed around by pointer here, so it is modelled as an opaque void pointer.
PSID = ctypes.wintypes.LPVOID


class SID_AND_ATTRIBUTES(ctypes.Structure):
    """
    ctypes declaration of the win32 SID_AND_ATTRIBUTES structure: a pointer to a SID, followed by attribute flags.

    Used as the sole member of TOKEN_USER.
    """
    _fields_ = [
        ("Sid", PSID),  # opaque pointer to the SID
        ("Attributes", ctypes.wintypes.DWORD),  # attribute flags; not read anywhere in this module
    ]


class TOKEN_USER(ctypes.Structure):
    """
    ctypes declaration of the win32 TOKEN_USER structure.

    In _get_process_sid(), the buffer filled by GetTokenInformation for the TokenUser information class is cast to a
    pointer to this structure, and the SID pointer is read from User.Sid.
    """
    _fields_ = [
        ("User", SID_AND_ATTRIBUTES),
    ]


# Pointer type used to reinterpret the raw token information buffer as TOKEN_USER.
PTOKEN_USER = ctypes.POINTER(TOKEN_USER)


class TOKEN_APPCONTAINER_INFORMATION(ctypes.Structure):
    """
    ctypes declaration of the win32 TOKEN_APPCONTAINER_INFORMATION structure.

    In _get_process_sid(), the buffer filled by GetTokenInformation for the TokenAppContainerSid information class is
    cast to a pointer to this structure, and the SID pointer is read from TokenAppContainer.
    """
    _fields_ = [
        ("TokenAppContainer", PSID),  # opaque pointer to the app container SID
    ]


# Pointer type used to reinterpret the raw token information buffer as TOKEN_APPCONTAINER_INFORMATION.
PTOKEN_APPCONTAINER_INFORMATION = ctypes.POINTER(TOKEN_APPCONTAINER_INFORMATION)

# SECURITY_ATTRIBUTES structure for CreateDirectoryW
# The security descriptor is only ever passed around by pointer here, so it is modelled as an opaque void pointer.
PSECURITY_DESCRIPTOR = ctypes.wintypes.LPVOID


class SECURITY_ATTRIBUTES(ctypes.Structure):
    """
    ctypes declaration of the win32 SECURITY_ATTRIBUTES structure.

    secure_mkdir() fills an instance of this structure and passes it to CreateDirectoryW, so that the directory is
    created with the desired security descriptor.
    """
    _fields_ = [
        ("nLength", ctypes.wintypes.DWORD),  # size of this structure; set via ctypes.sizeof() by secure_mkdir()
        ("lpSecurityDescriptor", PSECURITY_DESCRIPTOR),  # opaque pointer to the security descriptor
        ("bInheritHandle", ctypes.wintypes.BOOL),  # set to False by secure_mkdir()
    ]


# win32 API functions, bound via ctypes.
# NOTE: we do not use ctypes.windll.<dll_name> to avoid modifying its (global) function prototypes, which might affect
# user's code.
advapi32 = ctypes.WinDLL("advapi32")
kernel32 = ctypes.WinDLL("kernel32")

# Converts a SID to its string form. In this module, the returned string is released with LocalFree after its value
# has been copied.
advapi32.ConvertSidToStringSidW.restype = ctypes.wintypes.BOOL
advapi32.ConvertSidToStringSidW.argtypes = (
    PSID,  # [in] PSID Sid
    ctypes.POINTER(ctypes.wintypes.LPWSTR),  # [out] LPWSTR *StringSid
)

# Builds a security descriptor from its string (SDDL) representation. In this module, the returned descriptor is
# released with LocalFree once the directory has been created.
advapi32.ConvertStringSecurityDescriptorToSecurityDescriptorW.restype = ctypes.wintypes.BOOL
advapi32.ConvertStringSecurityDescriptorToSecurityDescriptorW.argtypes = (
    ctypes.wintypes.LPCWSTR,  # [in] LPCWSTR StringSecurityDescriptor
    ctypes.wintypes.DWORD,  # [in] DWORD StringSDRevision
    ctypes.POINTER(PSECURITY_DESCRIPTOR),  # [out] PSECURITY_DESCRIPTOR *SecurityDescriptor
    ctypes.wintypes.PULONG,  # [out] PULONG SecurityDescriptorSize
)

# Queries information about an access token. Called twice by _get_process_sid(): first with an empty buffer to obtain
# the required buffer size, then with a buffer of that size to retrieve the actual data.
advapi32.GetTokenInformation.restype = ctypes.wintypes.BOOL
advapi32.GetTokenInformation.argtypes = (
    ctypes.wintypes.HANDLE,  # [in] HANDLE TokenHandle
    ctypes.c_int,  # [in] TOKEN_INFORMATION_CLASS TokenInformationClass
    ctypes.wintypes.LPVOID,  # [out, optional] LPVOID TokenInformation
    ctypes.wintypes.DWORD,  # [in] DWORD TokenInformationLength
    ctypes.wintypes.PDWORD,  # [out] PDWORD ReturnLength
)

# Closes an open object handle; used to release the process token.
kernel32.CloseHandle.restype = ctypes.wintypes.BOOL
kernel32.CloseHandle.argtypes = (
    ctypes.wintypes.HANDLE,  # [in] HANDLE hObject
)

# Creates a directory, optionally with explicit security attributes.
kernel32.CreateDirectoryW.restype = ctypes.wintypes.BOOL
kernel32.CreateDirectoryW.argtypes = (
    ctypes.wintypes.LPCWSTR,  # [in] LPCWSTR lpPathName
    ctypes.POINTER(SECURITY_ATTRIBUTES),  # [in, optional] LPSECURITY_ATTRIBUTES lpSecurityAttributes
)

# Formats a message string; used to turn win32 error codes into human-readable text.
kernel32.FormatMessageW.restype = ctypes.wintypes.DWORD
kernel32.FormatMessageW.argtypes = (
    ctypes.wintypes.DWORD,  # [in] DWORD dwFlags
    ctypes.wintypes.LPCVOID,  # [in, optional] LPCVOID lpSource
    ctypes.wintypes.DWORD,  # [in] DWORD dwMessageId
    ctypes.wintypes.DWORD,  # [in] DWORD dwLanguageId
    ctypes.wintypes.LPWSTR,  # [out] LPWSTR lpBuffer
    ctypes.wintypes.DWORD,  # [in] DWORD nSize
    ctypes.wintypes.LPVOID,  # [in, optional] va_list *Arguments
)

kernel32.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
# kernel32.GetCurrentProcess has no arguments

kernel32.GetLastError.restype = ctypes.wintypes.DWORD
# kernel32.GetLastError has no arguments

# LocalFree returns HLOCAL (NULL on success, the passed handle on failure), not BOOL. Declaring the return type as a
# 32-bit integer would truncate the pointer-sized return value on 64-bit systems.
kernel32.LocalFree.restype = ctypes.wintypes.HLOCAL
kernel32.LocalFree.argtypes = (
    ctypes.wintypes.HLOCAL,  # [in] _Frees_ptr_opt_ HLOCAL hMem
)

# NOTE(review): OpenProcessToken is documented as part of advapi32; it is bound via kernel32 here - confirm that the
# export is available from kernel32 on all supported Windows versions.
kernel32.OpenProcessToken.restype = ctypes.wintypes.BOOL
kernel32.OpenProcessToken.argtypes = (
    ctypes.wintypes.HANDLE,  # [in] HANDLE ProcessHandle
    ctypes.wintypes.DWORD,  # [in] DWORD DesiredAccess
    ctypes.wintypes.PHANDLE,  # [out] PHANDLE TokenHandle
)


def _win_error_to_message(error_code):
    """
    Convert win32 error code to message.

    Args:
      error_code: win32 error code, for example as returned by GetLastError.

    Returns: message text with leading/trailing whitespace (including the trailing CR/LF) stripped, or None if the
        message could not be retrieved.
    """
    # FORMAT_MESSAGE_IGNORE_INSERTS from win32 headers. We do not pass any message arguments, so insert sequences
    # (e.g., %1) that appear in some system messages must be left as-is instead of being expanded; without this flag,
    # formatting such messages fails or reads a non-existent argument array.
    ignore_inserts_flag = 0x00000200

    message_wstr = ctypes.wintypes.LPWSTR(None)
    ret = kernel32.FormatMessageW(
        FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | ignore_inserts_flag,
        None,  # lpSource
        error_code,  # dwMessageId
        0x400,  # dwLanguageId = MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT)
        ctypes.cast(
            ctypes.byref(message_wstr),
            ctypes.wintypes.LPWSTR,
        ),  # pointer to LPWSTR due to FORMAT_MESSAGE_ALLOCATE_BUFFER; needs to be cast to LPWSTR
        64,  # due to FORMAT_MESSAGE_ALLOCATE_BUFFER, this is minimum number of characters to allocate
        None,  # Arguments; unused due to FORMAT_MESSAGE_IGNORE_INSERTS
    )
    if ret == 0:
        return None

    # Copy the text into a python string before releasing the buffer that was allocated by FormatMessageW.
    message = message_wstr.value
    kernel32.LocalFree(message_wstr)

    # Strip trailing CR/LF.
    if message:
        message = message.strip()
    return message


def _get_process_sid(token_information_class):
    """
    Obtain the SID from the current process by the given token information class.

    Args:
      token_information_class: Token information class identifying the SID that we're
          interested in. Only TokenUser and TokenAppContainerSid are supported.

    Returns: SID in its string form (if it could be fetched) or None if not available or on error. No exception is
        propagated to the caller; all failures (including unsupported token information class) result in None.
    """
    process_token = ctypes.wintypes.HANDLE(INVALID_HANDLE)

    # Explicitly track whether the token handle was opened. Comparing process_token.value against INVALID_HANDLE is
    # not reliable: ctypes reads pointer-sized handles back as unsigned integers (or None for NULL), so the value
    # never compares equal to -1, and CloseHandle would end up being called on a handle that was never opened.
    token_opened = False

    sid_str = None

    try:
        # Get access token for the current process
        ret = kernel32.OpenProcessToken(
            kernel32.GetCurrentProcess(),
            TOKEN_QUERY,
            ctypes.pointer(process_token),
        )
        if ret == 0:
            error_code = kernel32.GetLastError()
            raise RuntimeError(f"Failed to open process token! Error code: 0x{error_code:X}")
        token_opened = True

        # Query buffer size for sid
        token_info_size = ctypes.wintypes.DWORD(0)

        ret = advapi32.GetTokenInformation(
            process_token,
            token_information_class,
            None,
            0,
            ctypes.byref(token_info_size),
        )

        # We expect this call to fail with ERROR_INSUFFICIENT_BUFFER
        if ret == 0:
            error_code = kernel32.GetLastError()
            if error_code != ERROR_INSUFFICIENT_BUFFER:
                raise RuntimeError(f"Failed to query token information buffer size! Error code: 0x{error_code:X}")
        else:
            raise RuntimeError("Unexpected return value from GetTokenInformation!")

        # Allocate buffer
        token_info = ctypes.create_string_buffer(token_info_size.value)
        ret = advapi32.GetTokenInformation(
            process_token,
            token_information_class,
            token_info,
            token_info_size,
            ctypes.byref(token_info_size),
        )
        if ret == 0:
            error_code = kernel32.GetLastError()
            raise RuntimeError(f"Failed to query token information! Error code: 0x{error_code:X}")

        # Extract the SID pointer from the structure that corresponds to the requested token information class.
        if token_information_class == TokenUser:
            sid_ptr = ctypes.cast(token_info, PTOKEN_USER).contents.User.Sid
        elif token_information_class == TokenAppContainerSid:
            sid_ptr = ctypes.cast(token_info, PTOKEN_APPCONTAINER_INFORMATION).contents.TokenAppContainer
        else:
            raise ValueError(f"Unexpected token information class: {token_information_class}")

        # A NULL pointer means that there is no SID to convert; report it as not available.
        if sid_ptr:
            # Convert SID to string
            sid_wstr = ctypes.wintypes.LPWSTR(None)
            ret = advapi32.ConvertSidToStringSidW(sid_ptr, ctypes.pointer(sid_wstr))
            if ret == 0:
                error_code = kernel32.GetLastError()
                raise RuntimeError(f"Failed to convert SID to string! Error code: 0x{error_code:X}")

            # Copy the text into a python string before releasing the buffer allocated by ConvertSidToStringSidW.
            sid_str = sid_wstr.value
            kernel32.LocalFree(sid_wstr)
    except Exception:
        # Deliberately best-effort: callers treat None as "SID not available".
        sid_str = None
    finally:
        # Close the process token
        if token_opened:
            kernel32.CloseHandle(process_token)

    return sid_str


# Get and cache current user's SID
# This query runs once, when the module is imported. The value is None if the SID could not be obtained, in which
# case secure_mkdir() falls back to the generic "current owner" SID.
_user_sid = _get_process_sid(TokenUser)

# Get and cache current app container's SID (if any)
# This query also runs once at import. The value is None if the SID is not available or could not be obtained;
# secure_mkdir() adds it to the DACL only when it is set.
_app_container_sid = _get_process_sid(TokenAppContainerSid)


def secure_mkdir(dir_name):
    """
    Replacement for mkdir that limits the access to created directory to current user.

    Args:
      dir_name: path of the directory to create, given as str, bytes, or path-like object.

    Raises:
      RuntimeError: if the security descriptor could not be created.
      OSError: if the directory could not be created. The exception is constructed from the win32 error code, with
          dir_name (as passed by the caller) as the filename.
    """
    import os  # Function-scope import; needed only for path normalization below.

    # CreateDirectoryW is prototyped with LPCWSTR, which accepts only str. Normalize bytes and path-like objects the
    # same way os.mkdir() would, so that they are accepted as well.
    dir_name_wstr = os.fsdecode(dir_name)

    # Create security descriptor
    # Prefer actual user SID over SID S-1-3-4 (current owner), because at the time of writing, Wine does not properly
    # support the latter.
    user_sid = _user_sid or "S-1-3-4"

    # DACL descriptor (D):
    # ace_type;ace_flags;rights;object_guid;inherit_object_guid;account_sid;(resource_attribute)
    # - ace_type = SDDL_ACCESS_ALLOWED (A)
    # - rights = SDDL_FILE_ALL (FA)
    # - account_sid = current user (queried SID)
    security_desc_str = f"D:(A;;FA;;;{user_sid})"

    # If the app is running within an AppContainer, the app container SID has to be added to the DACL.
    # Otherwise our process will not have access to the temp dir.
    #
    # Quoting https://learn.microsoft.com/en-us/windows/win32/secauthz/implementing-an-appcontainer:
    # "The AppContainer SID is a persistent unique identifier for the appcontainer. ...
    #  To allow a single AppContainer to access a resource, add its AppContainerSID to the ACL for that resource."
    if _app_container_sid:
        security_desc_str += f"(A;;FA;;;{_app_container_sid})"
    security_desc = ctypes.wintypes.LPVOID(None)

    ret = advapi32.ConvertStringSecurityDescriptorToSecurityDescriptorW(
        security_desc_str,
        SDDL_REVISION1,
        ctypes.byref(security_desc),
        None,
    )
    if ret == 0:
        error_code = kernel32.GetLastError()
        raise RuntimeError(
            f"Failed to create security descriptor! Error code: 0x{error_code:X}, "
            f"message: {_win_error_to_message(error_code)}"
        )

    # From here on, the security descriptor must be released on every code path, including unexpected exceptions
    # raised by the ctypes call itself; hence the try/finally block.
    error_code = 0
    try:
        security_attr = SECURITY_ATTRIBUTES()
        security_attr.nLength = ctypes.sizeof(SECURITY_ATTRIBUTES)
        security_attr.lpSecurityDescriptor = security_desc
        security_attr.bInheritHandle = False

        # Create directory
        ret = kernel32.CreateDirectoryW(
            dir_name_wstr,
            security_attr,
        )
        if ret == 0:
            # Call failed; store error code immediately, to avoid it being overwritten in cleanup below.
            error_code = kernel32.GetLastError()
    finally:
        # Free security descriptor
        kernel32.LocalFree(security_desc)

    # Exit on success
    if ret != 0:
        return

    # Construct OSError from win error code
    error_message = _win_error_to_message(error_code)

    # Strip trailing dot to match error message from os.mkdir().
    if error_message and error_message[-1] == '.':
        error_message = error_message[:-1]

    raise OSError(
        None,  # errno
        error_message,  # strerror
        dir_name,  # filename
        error_code,  # winerror
        None,  # filename2
    )