1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
|
"""A generic persistence layer, optionally encrypted on Windows, OSX, and Linux.
Should a certain encryption is unavailable, exception will be raised at run-time,
rather than at import time.
By successfully creating and using a certain persistence object,
app developer would naturally know whether the data are protected by encryption.
"""
import abc
import os
import errno
import hashlib
import logging
import sys
try:
from pathlib import Path # Built-in in Python 3
except ImportError:
from pathlib2 import Path # An extra lib for Python 2
try:
ABC = abc.ABC
except AttributeError: # Python 2.7, abc exists, but not ABC
ABC = abc.ABCMeta("ABC", (object,), {"__slots__": ()}) # type: ignore
logger = logging.getLogger(__name__)
def _mkdir_p(path):
"""Creates a directory, and any necessary parents.
If the path provided is an existing file, this function raises an exception.
:param path: The directory name that should be created.
"""
if not path:
return # NO-OP
if sys.version_info >= (3, 2):
os.makedirs(path, exist_ok=True)
return
# This fallback implementation is based on a Stack Overflow question:
# https://stackoverflow.com/questions/600268/mkdir-p-functionality-in-python
# Known issue: it won't work when the path is a root folder like "C:\\"
try:
os.makedirs(path)
except OSError as exp:
if exp.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise
def _auto_hash(input_string):
return hashlib.sha256(input_string.encode('utf-8')).hexdigest()
# We do not aim to wrap every os-specific exception.
# Here we standardize only the most common ones,
# otherwise caller would need to catch os-specific underlying exceptions.
class PersistenceError(IOError): # Use IOError rather than OSError as base,
"""The base exception for persistence."""
# because historically an IOError was bubbled up and expected.
# https://github.com/AzureAD/microsoft-authentication-extensions-for-python/blob/0.2.2/msal_extensions/token_cache.py#L38
# Now we want to maintain backward compatibility even when using Python 2.x
# It makes no difference in Python 3.3+ where IOError is an alias of OSError.
def __init__(self, err_no=None, message=None, location=None): # pylint: disable=useless-super-delegation
super(PersistenceError, self).__init__(err_no, message, location)
class PersistenceNotFound(PersistenceError):
"""This happens when attempting BasePersistence.load() on a non-existent persistence instance"""
def __init__(self, err_no=None, message=None, location=None):
super(PersistenceNotFound, self).__init__(
err_no=errno.ENOENT,
message=message or "Persistence not found",
location=location)
class PersistenceEncryptionError(PersistenceError):
"""This could be raised by persistence.save()"""
class PersistenceDecryptionError(PersistenceError):
"""This could be raised by persistence.load()"""
def build_encrypted_persistence(location):
"""Build a suitable encrypted persistence instance based your current OS.
If you do not need encryption, then simply use ``FilePersistence`` constructor.
"""
# Does not (yet?) support fallback_to_plaintext flag,
# because the persistence on Windows and macOS do not support built-in trial_run().
if sys.platform.startswith('win'):
return FilePersistenceWithDataProtection(location)
if sys.platform.startswith('darwin'):
return KeychainPersistence(location)
if sys.platform.startswith('linux'):
return LibsecretPersistence(location)
raise RuntimeError("Unsupported platform: {}".format(sys.platform)) # pylint: disable=consider-using-f-string
class BasePersistence(ABC):
"""An abstract persistence defining the common interface of this family"""
is_encrypted = False # Default to False. To be overridden by sub-classes.
@abc.abstractmethod
def save(self, content):
# type: (str) -> None
"""Save the content into this persistence"""
raise NotImplementedError
@abc.abstractmethod
def load(self):
# type: () -> str
"""Load content from this persistence.
Could raise PersistenceNotFound if no save() was called before.
"""
raise NotImplementedError
@abc.abstractmethod
def time_last_modified(self):
"""Get the last time when this persistence has been modified.
Could raise PersistenceNotFound if no save() was called before.
"""
raise NotImplementedError
@abc.abstractmethod
def get_location(self):
"""Return the file path which this persistence stores (meta)data into"""
raise NotImplementedError
def _open(location):
return os.open(location, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o600)
# The 600 seems no-op on NTFS/Windows, and that is fine
class FilePersistence(BasePersistence):
"""A generic persistence, storing data in a plain-text file"""
def __init__(self, location):
if not location:
raise ValueError("Requires a file path")
self._location = os.path.expanduser(location)
_mkdir_p(os.path.dirname(self._location))
def save(self, content):
# type: (str) -> None
"""Save the content into this persistence"""
with os.fdopen(_open(self._location), 'w+') as handle:
handle.write(content)
def load(self):
# type: () -> str
"""Load content from this persistence"""
try:
with open(self._location, 'r') as handle: # pylint: disable=unspecified-encoding
return handle.read()
except EnvironmentError as exp: # EnvironmentError in Py 2.7 works across platform
if exp.errno == errno.ENOENT:
raise PersistenceNotFound(
message=(
"Persistence not initialized. "
"You can recover by calling a save() first."),
location=self._location,
)
raise
def time_last_modified(self):
try:
return os.path.getmtime(self._location)
except EnvironmentError as exp: # EnvironmentError in Py 2.7 works across platform
if exp.errno == errno.ENOENT:
raise PersistenceNotFound(
message=(
"Persistence not initialized. "
"You can recover by calling a save() first."),
location=self._location,
)
raise
def touch(self):
"""To touch this file-based persistence without writing content into it"""
Path(self._location).touch() # For os.path.getmtime() to work
def get_location(self):
return self._location
class FilePersistenceWithDataProtection(FilePersistence):
"""A generic persistence with data stored in a file,
protected by Win32 encryption APIs on Windows"""
is_encrypted = True
def __init__(self, location, entropy=''):
"""Initialization could fail due to unsatisfied dependency"""
# pylint: disable=import-outside-toplevel
from .windows import WindowsDataProtectionAgent
self._dp_agent = WindowsDataProtectionAgent(entropy=entropy)
super(FilePersistenceWithDataProtection, self).__init__(location)
def save(self, content):
# type: (str) -> None
try:
data = self._dp_agent.protect(content)
except OSError as exception:
raise PersistenceEncryptionError(
err_no=getattr(exception, "winerror", None), # Exists in Python 3 on Windows
message="Encryption failed: {} Consider disable encryption.".format(exception),
)
with os.fdopen(_open(self._location), 'wb+') as handle:
handle.write(data)
def load(self):
# type: () -> str
try:
with open(self._location, 'rb') as handle:
data = handle.read()
except EnvironmentError as exp: # EnvironmentError in Py 2.7 works across platform
if exp.errno == errno.ENOENT:
raise PersistenceNotFound(
message=(
"Persistence not initialized. "
"You can recover by calling a save() first."),
location=self._location,
)
logger.exception(
"DPAPI error likely caused by file content not previously encrypted. "
"App developer should migrate by calling save(plaintext) first.")
raise
try:
return self._dp_agent.unprotect(data)
except OSError as exception:
raise PersistenceDecryptionError(
err_no=getattr(exception, "winerror", None), # Exists in Python 3 on Windows
message="Decryption failed: {} "
"App developer may consider this guidance: "
"https://github.com/AzureAD/microsoft-authentication-extensions-for-python/wiki/PersistenceDecryptionError" # pylint: disable=line-too-long
.format(exception),
location=self._location,
)
class KeychainPersistence(BasePersistence):
"""A generic persistence with data stored in,
and protected by native Keychain libraries on OSX"""
is_encrypted = True
def __init__(self, signal_location, service_name=None, account_name=None):
"""Initialization could fail due to unsatisfied dependency.
:param signal_location: See :func:`persistence.LibsecretPersistence.__init__`
"""
from .osx import Keychain, KeychainError # pylint: disable=import-outside-toplevel
self._file_persistence = FilePersistence(signal_location) # Favor composition
self._Keychain = Keychain # pylint: disable=invalid-name
self._KeychainError = KeychainError # pylint: disable=invalid-name
default_service_name = "msal-extensions" # This is also our package name
self._service_name = service_name or default_service_name
self._account_name = account_name or _auto_hash(signal_location)
def save(self, content):
with self._Keychain() as locker:
locker.set_generic_password(
self._service_name, self._account_name, content)
self._file_persistence.touch() # For time_last_modified()
def load(self):
with self._Keychain() as locker:
try:
return locker.get_generic_password(
self._service_name, self._account_name)
except self._KeychainError as ex: # pylint: disable=invalid-name
if ex.exit_status == self._KeychainError.ITEM_NOT_FOUND:
# This happens when a load() is called before a save().
# We map it into cross-platform error for unified catching.
raise PersistenceNotFound(
location="Service:{} Account:{}".format( # pylint: disable=consider-using-f-string
self._service_name, self._account_name),
message=(
"Keychain persistence not initialized. "
"You can recover by call a save() first."),
)
raise # We do not intend to hide any other underlying exceptions
def time_last_modified(self):
return self._file_persistence.time_last_modified()
def get_location(self):
return self._file_persistence.get_location()
class LibsecretPersistence(BasePersistence):
"""A generic persistence with data stored in,
and protected by native libsecret libraries on Linux"""
is_encrypted = True
def __init__(self, signal_location, schema_name=None, attributes=None, **kwargs):
"""Initialization could fail due to unsatisfied dependency.
:param string signal_location:
Besides saving the real payload into encrypted storage,
this class will also touch this signal file.
Applications may listen a FileSystemWatcher.Changed event for reload.
https://docs.microsoft.com/en-us/dotnet/api/system.io.filesystemwatcher.changed?view=netframework-4.8#remarks
:param string schema_name: See :func:`libsecret.LibSecretAgent.__init__`
:param dict attributes: See :func:`libsecret.LibSecretAgent.__init__`
"""
# pylint: disable=import-outside-toplevel
from .libsecret import ( # This uncertain import is deferred till runtime
LibSecretAgent, trial_run)
trial_run()
self._agent = LibSecretAgent(
schema_name or _auto_hash(signal_location), attributes or {}, **kwargs)
self._file_persistence = FilePersistence(signal_location) # Favor composition
def save(self, content):
if self._agent.save(content):
self._file_persistence.touch() # For time_last_modified()
def load(self):
data = self._agent.load()
if data is None:
# Lower level libsecret would return None when found nothing. Here
# in persistence layer, we convert it to a unified error for consistence.
raise PersistenceNotFound(message=(
"Keyring persistence not initialized. "
"You can recover by call a save() first."))
return data
def time_last_modified(self):
return self._file_persistence.time_last_modified()
def get_location(self):
return self._file_persistence.get_location()
# We could also have a KeyringPersistence() which can then be used together
# with a FilePersistence to achieve
# https://github.com/AzureAD/microsoft-authentication-extensions-for-python/issues/12
# But this idea is not pursued at this time.
|