File: identities.py

package info (click to toggle)
azure-devops-cli-extension 1.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 20,384 kB
  • sloc: python: 160,782; xml: 198; makefile: 56; sh: 51
file content (141 lines) | stat: -rw-r--r-- 5,948 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------


from knack.util import CLIError
from .file_cache import get_cli_cache
from .uuid import is_uuid
from .services import get_connection_data, get_identity_client


def resolve_identity_as_id(identity_filter, organization):
    """Takes an identity name, email, alias, or id, and returns the id.
    """
    if identity_filter is None or is_uuid(identity_filter):
        return identity_filter
    if identity_filter.lower() == ME:
        return get_current_identity(organization).id
    identity = resolve_identity(identity_filter, organization)
    if identity is not None:
        return identity.id
    return None


def resolve_identity_as_identity_descriptor(identity_filter, organization):
    """Takes an identity name, email, alias, or id, and returns the id.
    """
    if identity_filter is None:
        return identity_filter
    if identity_filter.lower() == ME:
        return get_current_identity(organization).descriptor
    identity = resolve_identity(identity_filter, organization)
    if identity is not None:
        return identity.descriptor
    return None


def resolve_identity_as_display_name(identity_filter, organization):
    """Takes an identity name, email, alias, or id, and returns the display name.
    """
    identity = resolve_identity(identity_filter, organization)
    if identity is not None:
        if identity_filter.lower() == ME:
            return get_current_identity(organization).provider_display_name
        return get_display_name_from_identity(identity)
    return None


def resolve_identity(identity_filter, organization):
    """Takes an identity name, email, alias, or id, and returns the identity.
    """
    if identity_filter is None:
        return None

    if identity_filter.lower() == ME:
        return get_current_identity(organization)

    identity_client = get_identity_client(organization)
    if identity_filter.find(' ') > 0 or identity_filter.find('@') > 0:
        identities = identity_client.read_identities(search_filter='General',
                                                     filter_value=identity_filter)
        if identities is None or not identities:
            identities = identity_client.read_identities(search_filter='DirectoryAlias',
                                                         filter_value=identity_filter)
    else:
        identities = identity_client.read_identities(search_filter='DirectoryAlias',
                                                     filter_value=identity_filter)
        if identities is None or not identities:
            identities = identity_client.read_identities(search_filter='General',
                                                         filter_value=identity_filter)
    if not identities:
        raise CLIError('Could not resolve identity: ' + identity_filter)
    if len(identities) > 1:
        # prefer users with same domain
        identities_with_tenant = []
        for identity in identities:
            if 'Domain' in identity.properties and '$value' in identity.properties['Domain']:
                current_user = get_current_identity(organization)
                if 'Domain' in current_user.properties and '$value' in current_user.properties['Domain']\
                        and identity.properties['Domain']['$value'] ==\
                        current_user.properties['Domain']['$value']:
                    identities_with_tenant.append(identity)
        if len(identities_with_tenant) == 1:
            return identities_with_tenant[0]
        raise CLIError('There are multiple identities found for "' + identity_filter + '" '
                       'Please provide a more specific identifier for this identity.')

    return identities[0]


def get_current_identity(organization):
    return get_connection_data(organization).authenticated_user


def get_identities(organization, identity_ids):
    identity_client = get_identity_client(organization)
    return identity_client.read_identities(identity_ids=identity_ids)


def ensure_display_names_in_cache(organization, identity_ids):
    ids_to_look_up = []
    for identity_id in identity_ids:
        if not _display_name_cache[identity_id]:
            ids_to_look_up.append(identity_id)
    if ids_to_look_up:
        resolved_identities = get_identities(organization, ','.join(ids_to_look_up))
        for identity in resolved_identities:
            _display_name_cache[identity.id] = get_display_name_from_identity(identity)


def get_display_name_from_identity_id(organization, identity_id):
    if not _display_name_cache[identity_id]:
        ensure_display_names_in_cache(organization, [identity_id])
    if _display_name_cache[identity_id]:
        return _display_name_cache[identity_id]
    return None


def get_display_name_from_identity(identity):
    if identity.custom_display_name is not None and identity.custom_display_name != '':
        return identity.custom_display_name
    return identity.provider_display_name


def get_account_from_identity(identity):
    if 'Account' in identity.properties and '$value' in identity.properties['Account']:
        return identity.properties['Account']['$value']
    return identity.provider_display_name


def get_identity_descriptor_from_subject_descriptor(subject_descriptor, organization):
    identity_client = get_identity_client(organization)
    identities = identity_client.read_identities(subject_descriptors=subject_descriptor)
    if identities:
        return identities[0].descriptor
    return subject_descriptor


ME = 'me'
_display_name_cache = get_cli_cache('identity_display_names', 3600 * 6)