1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
|
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from knack.util import CLIError
from .file_cache import get_cli_cache
from .uuid import is_uuid
from .services import get_connection_data, get_identity_client
def resolve_identity_as_id(identity_filter, organization):
"""Takes an identity name, email, alias, or id, and returns the id.
"""
if identity_filter is None or is_uuid(identity_filter):
return identity_filter
if identity_filter.lower() == ME:
return get_current_identity(organization).id
identity = resolve_identity(identity_filter, organization)
if identity is not None:
return identity.id
return None
def resolve_identity_as_identity_descriptor(identity_filter, organization):
"""Takes an identity name, email, alias, or id, and returns the id.
"""
if identity_filter is None:
return identity_filter
if identity_filter.lower() == ME:
return get_current_identity(organization).descriptor
identity = resolve_identity(identity_filter, organization)
if identity is not None:
return identity.descriptor
return None
def resolve_identity_as_display_name(identity_filter, organization):
"""Takes an identity name, email, alias, or id, and returns the display name.
"""
identity = resolve_identity(identity_filter, organization)
if identity is not None:
if identity_filter.lower() == ME:
return get_current_identity(organization).provider_display_name
return get_display_name_from_identity(identity)
return None
def resolve_identity(identity_filter, organization):
"""Takes an identity name, email, alias, or id, and returns the identity.
"""
if identity_filter is None:
return None
if identity_filter.lower() == ME:
return get_current_identity(organization)
identity_client = get_identity_client(organization)
if identity_filter.find(' ') > 0 or identity_filter.find('@') > 0:
identities = identity_client.read_identities(search_filter='General',
filter_value=identity_filter)
if identities is None or not identities:
identities = identity_client.read_identities(search_filter='DirectoryAlias',
filter_value=identity_filter)
else:
identities = identity_client.read_identities(search_filter='DirectoryAlias',
filter_value=identity_filter)
if identities is None or not identities:
identities = identity_client.read_identities(search_filter='General',
filter_value=identity_filter)
if not identities:
raise CLIError('Could not resolve identity: ' + identity_filter)
if len(identities) > 1:
# prefer users with same domain
identities_with_tenant = []
for identity in identities:
if 'Domain' in identity.properties and '$value' in identity.properties['Domain']:
current_user = get_current_identity(organization)
if 'Domain' in current_user.properties and '$value' in current_user.properties['Domain']\
and identity.properties['Domain']['$value'] ==\
current_user.properties['Domain']['$value']:
identities_with_tenant.append(identity)
if len(identities_with_tenant) == 1:
return identities_with_tenant[0]
raise CLIError('There are multiple identities found for "' + identity_filter + '" '
'Please provide a more specific identifier for this identity.')
return identities[0]
def get_current_identity(organization):
return get_connection_data(organization).authenticated_user
def get_identities(organization, identity_ids):
identity_client = get_identity_client(organization)
return identity_client.read_identities(identity_ids=identity_ids)
def ensure_display_names_in_cache(organization, identity_ids):
ids_to_look_up = []
for identity_id in identity_ids:
if not _display_name_cache[identity_id]:
ids_to_look_up.append(identity_id)
if ids_to_look_up:
resolved_identities = get_identities(organization, ','.join(ids_to_look_up))
for identity in resolved_identities:
_display_name_cache[identity.id] = get_display_name_from_identity(identity)
def get_display_name_from_identity_id(organization, identity_id):
if not _display_name_cache[identity_id]:
ensure_display_names_in_cache(organization, [identity_id])
if _display_name_cache[identity_id]:
return _display_name_cache[identity_id]
return None
def get_display_name_from_identity(identity):
if identity.custom_display_name is not None and identity.custom_display_name != '':
return identity.custom_display_name
return identity.provider_display_name
def get_account_from_identity(identity):
if 'Account' in identity.properties and '$value' in identity.properties['Account']:
return identity.properties['Account']['$value']
return identity.provider_display_name
def get_identity_descriptor_from_subject_descriptor(subject_descriptor, organization):
identity_client = get_identity_client(organization)
identities = identity_client.read_identities(subject_descriptors=subject_descriptor)
if identities:
return identities[0].descriptor
return subject_descriptor
ME = 'me'
_display_name_cache = get_cli_cache('identity_display_names', 3600 * 6)
|