1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
|
"""Implements a Windows Specific TokenCache, and provides auxiliary helper types."""
import ctypes
from ctypes import wintypes
# Native entry points used by this module, resolved once at import time.
_KERNEL32 = ctypes.windll.kernel32
_LOCAL_FREE = _KERNEL32.LocalFree
# NOTE(review): the ctypes documentation recommends loading DLLs with use_last_error=True and
# reading ctypes.get_last_error() for a reliable per-call error code - consider migrating.
_GET_LAST_ERROR = _KERNEL32.GetLastError

# memcpy from the C runtime. Its parameter types are declared explicitly, as suggested by
# https://github.com/AzureAD/microsoft-authentication-extensions-for-python/issues/85 # pylint: disable=line-too-long
# and matching the prototype documented at
# https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/memcpy-wmemcpy?view=msvc-160 # pylint: disable=line-too-long
_MEMCPY = ctypes.cdll.msvcrt.memcpy
_MEMCPY.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_size_t]

# Data Protection API functions exported by crypt32.
_CRYPT32 = ctypes.windll.crypt32
_CRYPT_PROTECT_DATA = _CRYPT32.CryptProtectData
_CRYPT_UNPROTECT_DATA = _CRYPT32.CryptUnprotectData

# Flag value handed to both DPAPI calls in this module.
_CRYPTPROTECT_UI_FORBIDDEN = 0x1
class DataBlob(ctypes.Structure):  # pylint: disable=too-few-public-methods
    """A wrapper for interacting with the _CRYPTOAPI_BLOB type and its many aliases. This type is
    exposed from Wincrypt.h in XP and above.

    The memory associated with a DataBlob itself does not need to be freed, as the Python runtime
    will correctly clean it up. However, depending on the data it points at, it may still need to be
    freed. For instance, memory created by ctypes.create_string_buffer is already managed, and needs
    to not be freed. However, memory allocated by CryptProtectData and CryptUnprotectData must have
    LocalFree called on pbData.

    See documentation for this type at:
    https://msdn.microsoft.com/en-us/7a06eae5-96d8-4ece-98cb-cf0710d2ddbd
    """

    # cbData: number of bytes addressed by pbData.
    # pbData: pointer to the first byte of the payload.
    _fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_char))]

    def raw(self):
        # type: () -> bytes
        """Copies the message from the DataBlob in natively allocated memory into Python controlled
        memory.

        The copy is exactly cbData bytes long, so embedded NUL bytes are preserved. The native
        memory is left untouched; freeing it (when required) remains the caller's job.

        :return A byte array that matches what is stored in native-memory."""
        cb_data = int(self.cbData)
        if not cb_data:
            # Nothing to copy; also avoids handing a possibly-NULL pointer to native code.
            return b""
        # ctypes.string_at performs the sized copy itself, so no C runtime memcpy binding and no
        # intermediate scratch buffer are needed.
        return ctypes.string_at(self.pbData, cb_data)
# Human-readable text for the error codes this module reports, keyed by the code as a signed
# 32-bit integer. Keys came from real world observation, values came from winerror.h
# (http://errors (Microsoft internal)).
_err_description = {
    # 0x8009000B when read as an unsigned 32-bit value.
    -2146893813: "Key not valid for use in specified state.",
    # 0x80090345 when read as an unsigned 32-bit value.
    -2146892987: "".join([
        "The requested operation cannot be completed. ",
        "The computer must be trusted for delegation and ",
        "the current user account must be configured to allow delegation. ",
        "See also https://docs.microsoft.com/en-us/windows/security/threat-protection/security-policy-settings/enable-computer-and-user-accounts-to-be-trusted-for-delegation",
    ]),
    13: "The data is invalid.",
}
# This code is modeled from a StackOverflow question, which can be found here:
# https://stackoverflow.com/questions/463832/using-dpapi-with-python
class WindowsDataProtectionAgent(object):
    """A mechanism for interacting with the Windows DP API Native library, e.g. Crypt32.dll.

    When an entropy string is supplied at construction time, it is passed along with every
    protect/unprotect call made through the instance.
    """

    def __init__(self, entropy=None):
        # type: (str) -> None
        """Prepares the optional entropy blob.

        :param entropy: Optional text. When non-empty it is UTF-8 encoded and wrapped in a
            DataBlob that is handed to every DPAPI call; otherwise no entropy is used.
        """
        self._entropy_blob = None
        if entropy:
            entropy_utf8 = entropy.encode('utf-8')
            blob_buffer = ctypes.create_string_buffer(entropy_utf8, len(entropy_utf8))
            self._entropy_blob = DataBlob(len(entropy_utf8), blob_buffer)

    def _run_dpapi(self, crypt_function, payload, description):
        """Runs one DPAPI call over payload and returns the produced bytes.

        Shared by protect() and unprotect(), which differ only in the native function, the
        description argument and the text/bytes conversion at the edges.

        :param crypt_function: The native function to call (CryptProtectData or
            CryptUnprotectData).
        :param payload: The bytes to hand to the native function.
        :param description: Value for the function's second argument, or None.
        :return A Python-owned copy of the bytes the native function produced.
        :raises OSError: If the native call reports failure. winerror carries the code read
            from GetLastError, and the message comes from _err_description when the code is
            known (empty otherwise).
        """
        # Exact-size buffer: no trailing NUL is appended, so payloads containing NUL bytes
        # are passed through unchanged.
        payload_buffer = ctypes.create_string_buffer(payload, len(payload))
        payload_blob = DataBlob(len(payload), payload_buffer)
        result = DataBlob()
        if self._entropy_blob is not None:
            entropy = ctypes.byref(self._entropy_blob)
        else:
            entropy = None
        succeeded = crypt_function(
            ctypes.byref(payload_blob),
            description,
            entropy,
            None,
            None,
            _CRYPTPROTECT_UI_FORBIDDEN,
            ctypes.byref(result))
        if not succeeded:
            # Read the error code straight away, before anything else can overwrite it.
            err_code = _GET_LAST_ERROR()
            raise OSError(None, _err_description.get(err_code, ''), None, err_code)
        try:
            return result.raw()
        finally:
            # The output buffer was allocated by the native call and must be released with
            # LocalFree, even if copying it out failed.
            _LOCAL_FREE(result.pbData)

    def protect(self, message):
        # type: (str) -> bytes
        """Encrypts a message.

        :param message: The text to encrypt; it is UTF-8 encoded before encryption.
        :return cipher text holding the original message.
        :raises OSError: If CryptProtectData reports failure."""
        return self._run_dpapi(
            _CRYPT_PROTECT_DATA,
            message.encode('utf-8'),
            u"python_data")  # pylint: disable=redundant-u-string-prefix

    def unprotect(self, cipher_text):
        # type: (bytes) -> str
        """Decrypts cipher text that is provided.

        :param cipher_text: The bytes to decrypt.
        :return The original message hidden in the cipher text.
        :raises OSError: If CryptUnprotectData reports failure."""
        return self._run_dpapi(_CRYPT_UNPROTECT_DATA, cipher_text, None).decode('utf-8')
|