File: security_group.py

package info (click to toggle)
azure-devops-cli-extension 1.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 20,384 kB
  • sloc: python: 160,782; xml: 198; makefile: 56; sh: 51
file content (236 lines) | stat: -rw-r--r-- 11,977 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from knack.log import get_logger
from knack.util import CLIError
from azext_devops.devops_sdk.exceptions import AzureDevOpsClientRequestError
from azext_devops.devops_sdk.v5_0.graph.models import (JsonPatchOperation,
                                                       GraphSubjectLookup,
                                                       GraphSubjectLookupKey)
from azext_devops.dev.common.identities import resolve_identity_as_id
from azext_devops.dev.common.services import (get_graph_client,
                                              get_project_id_from_name,
                                              resolve_instance_and_project,
                                              resolve_instance)
from .security_group_helper import (GraphGroupVstsCreationContext,
                                    GraphGroupMailAddressCreationContext,
                                    GraphGroupOriginIdCreationContext)

logger = get_logger(__name__)


def list_groups(scope='project', project=None, continuation_token=None,
                subject_types=None, organization=None, detect=None):
    """ List all the groups in a project or organization
    :param scope: List groups at project or organization level.
    :type scope: str
    :param continuation_token : If there are more results that can't be returned in a single page, the result set
                                will contain a continuation token for retrieval of the next set of results.
    :type continuation_token: str
    :param subject_types: A comma separated list of user subject subtypes to reduce the retrieved results.
                          You can give initial part of descriptor [before the dot] as a filter e.g. vssgp,aadgp
    :type subject_types: [str]
    :rtype: :class:`<PagedGraphGroups> <azure.devops.v5_0.graph.models.PagedGraphGroups>`
    """
    if scope == 'project':
        organization, project = resolve_instance_and_project(detect=detect,
                                                             organization=organization,
                                                             project=project)
    else:
        organization = resolve_instance(detect=detect, organization=organization)
    client = get_graph_client(organization)
    scope_descriptor = None
    if project is not None:
        project_id = get_project_id_from_name(organization, project)
        scope_descriptor = get_descriptor_from_storage_key(project_id, client)
    if subject_types is not None:
        subject_types = subject_types.split(',')
    group_list_response = client.list_groups(scope_descriptor=scope_descriptor,
                                             continuation_token=continuation_token, subject_types=subject_types)
    return group_list_response


def create_group(name=None, description=None, origin_id=None, email_id=None,
                 groups=None, scope='project', project=None, organization=None, detect=None):
    """
    :param name: Name of Azure DevOps group.
    :type name: str
    :param description: Description of Azure DevOps group.
    :type description: str
    :param origin_id: Create new group using the OriginID as a reference to an existing group
                      from an external AD or AAD backed provider. Required if name or email-id is missing.
    :type origin_id: str
    :param email_id: Create new group using the mail address as a reference to an existing group
                     from an external AD or AAD backed provider. Required if name or origin-id is missing.
    :type email_id: str
    :param groups: A comma separated list of descriptors referencing groups you want the newly created
                   group to join.
    :type groups: [str]
    :param scope: Create group at project or organization level.
    :type scope: str
    :rtype: :class:`<GraphGroup> <azure.devops.v5_0.graph.models.GraphGroup>`
    """
    if scope == 'project':
        organization, project = resolve_instance_and_project(detect=detect,
                                                             organization=organization,
                                                             project=project)
    else:
        organization = resolve_instance(detect=detect, organization=organization)
    client = get_graph_client(organization)
    if name is not None and origin_id is None and email_id is None:
        group_creation_context = GraphGroupVstsCreationContext(display_name=name, description=description)
    elif origin_id is not None and email_id is None and name is None:
        group_creation_context = GraphGroupOriginIdCreationContext(origin_id=origin_id)
    elif email_id is not None and name is None and origin_id is None:
        group_creation_context = GraphGroupMailAddressCreationContext(mail_address=email_id)
    else:
        raise CLIError('Provide exactly one argument out of name, origin-id or email-id.')
    scope_descriptor = None
    if project is not None:
        project_id = get_project_id_from_name(organization, project)
        scope_descriptor = get_descriptor_from_storage_key(project_id, client)
    if groups is not None:
        groups = groups.split(',')
    group_details = client.create_group(creation_context=group_creation_context,
                                        scope_descriptor=scope_descriptor, group_descriptors=groups)
    return group_details


def get_group(id, organization=None, detect=None):  # pylint: disable=redefined-builtin
    """Show group details.
    :param id: Descriptor of the group.
    :type id: str
    :rtype: :class:`<GraphGroup> <azure.devops.v5_0.graph.models.GraphGroup>`
    """
    organization = resolve_instance(detect=detect, organization=organization)
    client = get_graph_client(organization)
    group_details = client.get_group(group_descriptor=id)
    return group_details


def update_group(id, name=None, description=None, organization=None, detect=None):  # pylint: disable=redefined-builtin
    """Update name AND/OR description for an Azure DevOps group.
    :param id: Descriptor of the group.
    :type id: str
    :param name: New name for Azure DevOps group.
    :type name: str
    :param description: New description for Azure DevOps group.
    :type description: str
    :rtype: :class:`<GraphGroup> <azure.devops.v5_0.graph.models.GraphGroup>`
    """
    if name is None and description is None:
        raise CLIError('Either name or description argument must be provided.')
    patch_document = []
    if name is not None:
        patch_document.append(_create_patch_operation('replace', '/displayName', name))
    if description is not None:
        patch_document.append(_create_patch_operation('replace', '/description', description))
    organization = resolve_instance(detect=detect, organization=organization)
    client = get_graph_client(organization)
    update_group_details = client.update_group(group_descriptor=id, patch_document=patch_document)
    return update_group_details


def delete_group(id, organization=None, detect=None):  # pylint: disable=redefined-builtin
    """Delete an Azure DevOps group.
    :param id: Descriptor of the group.
    :type id: str
    """
    organization = resolve_instance(detect=detect, organization=organization)
    client = get_graph_client(organization)
    delete_group_details = client.delete_group(group_descriptor=id)
    return delete_group_details


def list_memberships(id, relationship='members', organization=None, detect=None):  # pylint: disable=redefined-builtin
    """List memberships for a group or user.
    :param id: Group descriptor or User Email whose membership details are required.
    :type id: str
    :rtype: [GraphMembership]
    """
    organization = resolve_instance(detect=detect, organization=organization)
    subject_descriptor = id
    client = get_graph_client(organization)
    if '@' in id or '.' not in id:
        id = resolve_identity_as_id(id, organization)
        subject_descriptor = get_descriptor_from_storage_key(id, client)
    direction = 'down'
    if relationship == 'memberof':
        direction = 'up'
    membership_list = client.list_memberships(subject_descriptor=subject_descriptor, direction=direction)
    lookup_keys = []
    for members in membership_list:
        if relationship == 'memberof':
            key = GraphSubjectLookupKey(members.container_descriptor)
        else:
            key = GraphSubjectLookupKey(members.member_descriptor)
        lookup_keys.append(key)
    subject_lookup = GraphSubjectLookup(lookup_keys=lookup_keys)
    members_details = client.lookup_subjects(subject_lookup=subject_lookup)
    return members_details


def add_membership(member_id, group_id, organization=None, detect=None):
    """Add membership.
    :param member_id: Descriptor of the group or Email Id of the user to be added.
    User should already be a part of the organization. Use `az devops user add` command to add an user to organization.
    :type member_id: str
    :param group_id: Descriptor of the group to which member needs to be added.
    :type group_id: str
    :rtype: :class:`<GraphMembership> <azure.devops.v5_0.graph.models.GraphMembership>`
    """
    organization = resolve_instance(detect=detect, organization=organization)
    client = get_graph_client(organization)
    subject_descriptor = member_id
    if '@' in member_id or '.' not in member_id:
        member_id = resolve_identity_as_id(member_id, organization)
        subject_descriptor = get_descriptor_from_storage_key(member_id, client)
    membership_details = client.add_membership(subject_descriptor=subject_descriptor,
                                               container_descriptor=group_id)
    lookup_keys = []
    container = GraphSubjectLookupKey(membership_details.container_descriptor)
    subject = GraphSubjectLookupKey(membership_details.member_descriptor)
    lookup_keys.append(container)
    lookup_keys.append(subject)
    subject_lookup = GraphSubjectLookup(lookup_keys=lookup_keys)
    membership_details = client.lookup_subjects(subject_lookup=subject_lookup)
    return membership_details


def remove_membership(member_id, group_id, organization=None, detect=None):
    """Remove membership.
    :param member_id: Descriptor of the group or Email Id of the user to be removed.
    :type member_id: str
    :param group_id: Descriptor of the group from which member needs to be removed.
    :type group_id: str
    """
    organization = resolve_instance(detect=detect, organization=organization)
    client = get_graph_client(organization)
    subject_descriptor = member_id
    if '@' in member_id or '.' not in member_id:
        member_id = resolve_identity_as_id(member_id, organization)
        subject_descriptor = get_descriptor_from_storage_key(member_id, client)
    try:
        client.check_membership_existence(subject_descriptor=subject_descriptor,
                                          container_descriptor=group_id)
        membership_details = client.remove_membership(subject_descriptor=subject_descriptor,
                                                      container_descriptor=group_id)
    except AzureDevOpsClientRequestError as ex:
        logger.debug(ex, exc_info=True)
        raise CLIError("Membership doesn't exists.")
    return membership_details


def get_descriptor_from_storage_key(storage_key, client):
    descriptor = client.get_descriptor(storage_key)
    return descriptor


def _create_patch_operation(op, path, value):
    patch_operation = JsonPatchOperation()
    patch_operation.op = op
    patch_operation.path = path
    patch_operation.value = value
    return patch_operation