1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
|
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from knack.log import get_logger
from knack.util import CLIError
from azext_devops.devops_sdk.exceptions import AzureDevOpsClientRequestError
from azext_devops.devops_sdk.v5_0.graph.models import (JsonPatchOperation,
GraphSubjectLookup,
GraphSubjectLookupKey)
from azext_devops.dev.common.identities import resolve_identity_as_id
from azext_devops.dev.common.services import (get_graph_client,
get_project_id_from_name,
resolve_instance_and_project,
resolve_instance)
from .security_group_helper import (GraphGroupVstsCreationContext,
GraphGroupMailAddressCreationContext,
GraphGroupOriginIdCreationContext)
# Module-level logger obtained from knack and named after this module.
logger = get_logger(__name__)
def list_groups(scope='project', project=None, continuation_token=None,
                subject_types=None, organization=None, detect=None):
    """ List all the groups in a project or organization
    :param scope: List groups at project or organization level.
    :type scope: str
    :param continuation_token: If there are more results that can't be returned in a single page, the result set
           will contain a continuation token for retrieval of the next set of results.
    :type continuation_token: str
    :param subject_types: A comma separated list of user subject subtypes to reduce the retrieved results.
           You can give initial part of descriptor [before the dot] as a filter e.g. vssgp,aadgp
    :type subject_types: [str]
    :rtype: :class:`<PagedGraphGroups> <azure.devops.v5_0.graph.models.PagedGraphGroups>`
    """
    # Only the 'project' scope resolves a project; every other scope value resolves the organization alone.
    if scope != 'project':
        organization = resolve_instance(detect=detect, organization=organization)
    else:
        organization, project = resolve_instance_and_project(
            detect=detect, organization=organization, project=project)
    graph_client = get_graph_client(organization)

    # Whenever a project is known (resolved above or passed in explicitly), the listing is
    # narrowed to the descriptor looked up for that project's id.
    project_descriptor = None
    if project is not None:
        project_descriptor = get_descriptor_from_storage_key(
            get_project_id_from_name(organization, project), graph_client)

    # The comma separated filter is handed to the client as a list.
    subject_type_filter = subject_types.split(',') if subject_types is not None else None
    return graph_client.list_groups(scope_descriptor=project_descriptor,
                                    continuation_token=continuation_token,
                                    subject_types=subject_type_filter)
def create_group(name=None, description=None, origin_id=None, email_id=None,
                 groups=None, scope='project', project=None, organization=None, detect=None):
    """Create a new Azure DevOps group.
    :param name: Name of Azure DevOps group.
    :type name: str
    :param description: Description of Azure DevOps group.
    :type description: str
    :param origin_id: Create new group using the OriginID as a reference to an existing group
           from an external AD or AAD backed provider. Required if name or email-id is missing.
    :type origin_id: str
    :param email_id: Create new group using the mail address as a reference to an existing group
           from an external AD or AAD backed provider. Required if name or origin-id is missing.
    :type email_id: str
    :param groups: A comma separated list of descriptors referencing groups you want the newly created
           group to join.
    :type groups: [str]
    :param scope: Create group at project or organization level.
    :type scope: str
    :rtype: :class:`<GraphGroup> <azure.devops.v5_0.graph.models.GraphGroup>`
    """
    # Only the 'project' scope resolves a project; every other scope value resolves the organization alone.
    if scope != 'project':
        organization = resolve_instance(detect=detect, organization=organization)
    else:
        organization, project = resolve_instance_and_project(
            detect=detect, organization=organization, project=project)
    graph_client = get_graph_client(organization)

    # name, origin-id and email-id are mutually exclusive: exactly one of them must be supplied.
    supplied = [value for value in (name, origin_id, email_id) if value is not None]
    if len(supplied) != 1:
        raise CLIError('Provide exactly one argument out of name, origin-id or email-id.')

    # Exactly one identifier is set here, so the first non-None one selects the creation context.
    # The description is only forwarded when the group is created by name.
    if name is not None:
        creation_context = GraphGroupVstsCreationContext(display_name=name, description=description)
    elif origin_id is not None:
        creation_context = GraphGroupOriginIdCreationContext(origin_id=origin_id)
    else:
        creation_context = GraphGroupMailAddressCreationContext(mail_address=email_id)

    # Whenever a project is known, the group is created under the descriptor looked up for its id.
    project_descriptor = None
    if project is not None:
        project_descriptor = get_descriptor_from_storage_key(
            get_project_id_from_name(organization, project), graph_client)

    parent_descriptors = groups.split(',') if groups is not None else None
    return graph_client.create_group(creation_context=creation_context,
                                     scope_descriptor=project_descriptor,
                                     group_descriptors=parent_descriptors)
def get_group(id, organization=None, detect=None):  # pylint: disable=redefined-builtin
    """Show group details.
    :param id: Descriptor of the group.
    :type id: str
    :rtype: :class:`<GraphGroup> <azure.devops.v5_0.graph.models.GraphGroup>`
    """
    # Resolve the organization first, then ask its graph client for the group.
    resolved_organization = resolve_instance(detect=detect, organization=organization)
    return get_graph_client(resolved_organization).get_group(group_descriptor=id)
def update_group(id, name=None, description=None, organization=None, detect=None):  # pylint: disable=redefined-builtin
    """Update name AND/OR description for an Azure DevOps group.
    :param id: Descriptor of the group.
    :type id: str
    :param name: New name for Azure DevOps group.
    :type name: str
    :param description: New description for Azure DevOps group.
    :type description: str
    :rtype: :class:`<GraphGroup> <azure.devops.v5_0.graph.models.GraphGroup>`
    """
    # One 'replace' operation per supplied field, display name first.
    requested_changes = (('/displayName', name), ('/description', description))
    patch_document = [_create_patch_operation('replace', path, value)
                      for path, value in requested_changes
                      if value is not None]
    # Nothing to change: fail before resolving the organization or creating a client.
    if not patch_document:
        raise CLIError('Either name or description argument must be provided.')

    resolved_organization = resolve_instance(detect=detect, organization=organization)
    graph_client = get_graph_client(resolved_organization)
    return graph_client.update_group(group_descriptor=id, patch_document=patch_document)
def delete_group(id, organization=None, detect=None):  # pylint: disable=redefined-builtin
    """Delete an Azure DevOps group.
    :param id: Descriptor of the group.
    :type id: str
    """
    # Resolve the organization first, then delete through its graph client; the client's
    # result is handed back to the caller unchanged.
    resolved_organization = resolve_instance(detect=detect, organization=organization)
    return get_graph_client(resolved_organization).delete_group(group_descriptor=id)
def list_memberships(id, relationship='members', organization=None, detect=None):  # pylint: disable=redefined-builtin
    """List memberships for a group or user.
    :param id: Group descriptor or User Email whose membership details are required.
    :type id: str
    :rtype: [GraphMembership]
    """
    organization = resolve_instance(detect=detect, organization=organization)
    subject_descriptor = id
    graph_client = get_graph_client(organization)
    # A value containing '@', or one without any '.', is resolved through the identity helper
    # and converted to a descriptor; every other value is used as the descriptor directly.
    if not ('@' not in id and '.' in id):
        storage_key = resolve_identity_as_id(id, organization)
        subject_descriptor = get_descriptor_from_storage_key(storage_key, graph_client)

    # 'memberof' walks up to the containers of the subject; any other relationship walks down
    # to its members.
    wants_containers = relationship == 'memberof'
    direction = 'up' if wants_containers else 'down'
    memberships = graph_client.list_memberships(subject_descriptor=subject_descriptor, direction=direction)

    # Pick the other end of each membership and look up the details of those subjects.
    lookup_keys = [
        GraphSubjectLookupKey(entry.container_descriptor if wants_containers else entry.member_descriptor)
        for entry in memberships
    ]
    return graph_client.lookup_subjects(subject_lookup=GraphSubjectLookup(lookup_keys=lookup_keys))
def add_membership(member_id, group_id, organization=None, detect=None):
    """Add membership.
    :param member_id: Descriptor of the group or Email Id of the user to be added.
           User should already be a part of the organization. Use `az devops user add` command to add an user to organization.
    :type member_id: str
    :param group_id: Descriptor of the group to which member needs to be added.
    :type group_id: str
    :rtype: :class:`<GraphMembership> <azure.devops.v5_0.graph.models.GraphMembership>`
    """
    organization = resolve_instance(detect=detect, organization=organization)
    graph_client = get_graph_client(organization)
    member_descriptor = member_id
    # A value containing '@', or one without any '.', is resolved through the identity helper
    # and converted to a descriptor; every other value is used as the descriptor directly.
    if not ('@' not in member_id and '.' in member_id):
        storage_key = resolve_identity_as_id(member_id, organization)
        member_descriptor = get_descriptor_from_storage_key(storage_key, graph_client)

    new_membership = graph_client.add_membership(subject_descriptor=member_descriptor,
                                                 container_descriptor=group_id)

    # Return the looked-up details of both ends of the new membership: container first, then member.
    lookup_keys = [
        GraphSubjectLookupKey(new_membership.container_descriptor),
        GraphSubjectLookupKey(new_membership.member_descriptor),
    ]
    return graph_client.lookup_subjects(subject_lookup=GraphSubjectLookup(lookup_keys=lookup_keys))
def remove_membership(member_id, group_id, organization=None, detect=None):
    """Remove membership.
    :param member_id: Descriptor of the group or Email Id of the user to be removed.
    :type member_id: str
    :param group_id: Descriptor of the group from which member needs to be removed.
    :type group_id: str
    """
    organization = resolve_instance(detect=detect, organization=organization)
    client = get_graph_client(organization)
    subject_descriptor = member_id
    # A value containing '@', or one without any '.', is resolved through the identity helper
    # and converted to a descriptor; every other value is used as the descriptor directly.
    if '@' in member_id or '.' not in member_id:
        member_id = resolve_identity_as_id(member_id, organization)
        subject_descriptor = get_descriptor_from_storage_key(member_id, client)

    # Only a failed existence check is reported as a missing membership.
    try:
        client.check_membership_existence(subject_descriptor=subject_descriptor,
                                          container_descriptor=group_id)
    except AzureDevOpsClientRequestError as ex:
        logger.debug(ex, exc_info=True)
        raise CLIError("Membership doesn't exists.")

    # The membership exists at this point, so a failure of the removal itself is surfaced with
    # the service's own error text (still as a CLIError) instead of being mislabelled as missing.
    try:
        membership_details = client.remove_membership(subject_descriptor=subject_descriptor,
                                                      container_descriptor=group_id)
    except AzureDevOpsClientRequestError as ex:
        logger.debug(ex, exc_info=True)
        raise CLIError(ex)
    return membership_details
def get_descriptor_from_storage_key(storage_key, client):
    """Return the result of ``client.get_descriptor`` for the given storage key.
    :param storage_key: Storage key passed through to the client unchanged.
    :param client: Object exposing a ``get_descriptor(storage_key)`` method.
    """
    return client.get_descriptor(storage_key)
def _create_patch_operation(op, path, value):
    """Build a JsonPatchOperation carrying the given op, path and value."""
    operation = JsonPatchOperation()
    operation.op, operation.path, operation.value = op, path, value
    return operation
|