1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
|
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from knack.util import CLIError
from knack.log import get_logger
from azext_devops.devops_sdk.v5_0.security.models import (AccessControlEntry)
from azext_devops.dev.common.services import (get_security_client,
resolve_instance)
from azext_devops.dev.common.identities import (get_identity_descriptor_from_subject_descriptor,
resolve_identity_as_identity_descriptor)
from .security_permission_helper import PermissionDetails
logger = get_logger(__name__)
def list_namespaces(local_only=False, organization=None, detect=None):
""" List all available namespaces for an organization.
:param local_only: If true, retrieve only local security namespaces.
:type local_only: bool
"""
organization = resolve_instance(detect=detect, organization=organization)
client = get_security_client(organization)
response = client.query_security_namespaces(local_only=local_only)
return response
def show_namespace(namespace_id, organization=None, detect=None):
""" Show details of permissions available in each namespace.
"""
organization = resolve_instance(detect=detect, organization=organization)
client = get_security_client(organization)
response = _get_permission_types(client, namespace_id)
return response
def list_tokens(namespace_id, subject, token=None,
recurse=False, organization=None, detect=None):
""" List tokens for given user/group and namespace.
:param recurse: If true and this is a hierarchical namespace, return child ACLs of the specified token.
:type recurse: bool
"""
organization = resolve_instance(detect=detect, organization=organization)
client = get_security_client(organization)
subject = _resolve_subject_as_identity_descriptor(subject, organization)
response = _query_permissions(client, namespace_id, subject, token, recurse)
return response
def show_permissions(namespace_id, subject, token, organization=None, detect=None):
""" Show permissions for given token, namespace and user/group.
"""
organization = resolve_instance(detect=detect, organization=organization)
client = get_security_client(organization)
subject = _resolve_subject_as_identity_descriptor(subject, organization)
list_response = _query_permissions(client, namespace_id, subject, token, False)
permissions_types = _get_permission_types(client, namespace_id)
resolved_permissions_response = _resolve_bits(list_response, permissions_types)
response = _update_json(list_response, resolved_permissions_response)
return response
def reset_all_permissions(namespace_id, subject, token, organization=None, detect=None):
""" Clear all permissions of this token for a user/group.
"""
organization = resolve_instance(detect=detect, organization=organization)
client = get_security_client(organization)
subject = _resolve_subject_as_identity_descriptor(subject, organization)
response = client.remove_access_control_entries(security_namespace_id=namespace_id,
token=token, descriptors=subject)
return response
def reset_permissions(namespace_id, permission_bit, subject, token, organization=None, detect=None):
""" Reset permission for given permission bit(s)
:param permission_bit: Permission bit or addition of permission bits which needs to be reset
for given user/group and token.
:type permission_bit:int
"""
organization = resolve_instance(detect=detect, organization=organization)
client = get_security_client(organization)
subject = _resolve_subject_as_identity_descriptor(subject, organization)
client.remove_permission(security_namespace_id=namespace_id, permissions=permission_bit,
descriptor=subject, token=token)
# get the effective permission list for this namespace , token
list_response = _query_permissions(client, namespace_id, subject, token, False)
permissions_types = _get_permission_types(client, namespace_id)
resolved_permissions_response = _resolve_bits(list_response, permissions_types, permission_bit)
response = _update_json(list_response, resolved_permissions_response)
return response
def update_permissions(namespace_id, subject, token, merge=True, allow_bit=0, deny_bit=0,
organization=None, detect=None):
""" Assign allow or deny permission to given user/group.
"""
if allow_bit == 0 and deny_bit == 0:
raise CLIError('Either --allow-bit or --deny-bit parameter should be provided.')
organization = resolve_instance(detect=detect, organization=organization)
client = get_security_client(organization)
subject = _resolve_subject_as_identity_descriptor(subject, organization)
container_object = {}
aces_list = []
ace_object = AccessControlEntry(descriptor=subject, allow=allow_bit, deny=deny_bit)
aces_list.append(ace_object)
container_object['token'] = token
if merge:
container_object['merge'] = True
else:
container_object['merge'] = False
container_object['accessControlEntries'] = aces_list
client.set_access_control_entries(security_namespace_id=namespace_id, container=container_object)
allow_bit = allow_bit & (~deny_bit)
changed_bits = allow_bit + deny_bit
list_response = _query_permissions(client, namespace_id, subject, token, False)
permissions_types = _get_permission_types(client, namespace_id)
resolved_permissions_response = _resolve_bits(list_response, permissions_types, changed_bits)
response = _update_json(list_response, resolved_permissions_response)
return response
def _resolve_bits(response, permissions_types, changed_bits=0):
inherited_allow = 0
inherited_deny = 0
effective_allow = 0
effective_deny = 0
if len(response) > 1 or len(response[0].aces_dictionary) > 1:
raise CLIError('Multiple entries found in acesDictionary. Please filter the response by token.')
acl = response[0]
ace = list(acl.aces_dictionary.values())[0]
allow_bit = ace.allow
deny_bit = ace.deny
if acl.include_extended_info is True:
if ace.extended_info.effective_allow is not None:
effective_allow = ace.extended_info.effective_allow
if ace.extended_info.effective_deny is not None:
effective_deny = ace.extended_info.effective_deny
if acl.include_extended_info is True:
inherited_allow = allow_bit ^ effective_allow
inherited_deny = deny_bit ^ effective_deny
# If changed_bits is zero, display all permissions
if changed_bits == 0:
total_permission_types = len(permissions_types[0].actions)
last_permission_bit_value = permissions_types[0].actions[total_permission_types - 1].bit
changed_bits = 2 * last_permission_bit_value - 1
permission_response = []
for item in permissions_types[0].actions:
if changed_bits & item.bit:
permission_value_string = None
if effective_deny and item.bit & effective_deny:
permission_value_string = 'Deny'
if inherited_deny & item.bit:
permission_value_string = 'Deny (inherited)'
elif effective_allow and item.bit & effective_allow:
permission_value_string = 'Allow'
if inherited_allow & item.bit:
permission_value_string = 'Allow (inherited)'
else:
permission_value_string = 'Not set'
permission_obj = PermissionDetails()
permission_obj.bit = item.bit
permission_obj.name = item.name
permission_obj.display_name = item.display_name
permission_obj.effective_permission = permission_value_string
permission_response.append(permission_obj)
return permission_response
def _update_json(original_response, permissions_response):
response = []
for acl in original_response:
acl_value = acl.serialize()
for ace in acl_value['acesDictionary']:
acl_value['acesDictionary'][ace]['resolvedPermissions'] = permissions_response
response.append(acl_value)
return response
def _get_permission_types(client, namespace_id):
response = client.query_security_namespaces(security_namespace_id=namespace_id)
return response
def _query_permissions(client, namespace_id, subject, token, recurse):
list_response = client.query_access_control_lists(security_namespace_id=namespace_id,
token=token, descriptors=subject,
include_extended_info=True, recurse=recurse)
return list_response
def _resolve_subject_as_identity_descriptor(subject, organization):
if '@' in subject:
subject = resolve_identity_as_identity_descriptor(identity_filter=subject, organization=organization)
elif '.' in subject:
# try to solve graph subject descriptor for groups
subject = get_identity_descriptor_from_subject_descriptor(subject_descriptor=subject, organization=organization)
return subject
|