1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332
|
from collections import defaultdict
from typing import Iterable, Optional, Set
from asn1crypto import x509
from ._state import ValProcState
from .errors import PathValidationError
def update_policy_tree(
certificate_policies,
valid_policy_tree: 'PolicyTreeRoot',
depth: int,
any_policy_uninhibited: bool,
) -> Optional['PolicyTreeRoot']:
"""
Internal method to update the policy tree during RFC 5280 validation.
"""
cert_any_policy = None
cert_policy_identifiers = set()
# Step 2 d 1
for policy in certificate_policies:
policy_identifier = policy['policy_identifier'].native
if policy_identifier == 'any_policy':
cert_any_policy = policy
continue
cert_policy_identifiers.add(policy_identifier)
policy_qualifiers = policy['policy_qualifiers']
policy_id_match = False
parent_any_policy = None
# Step 2 d 1 i
for node in valid_policy_tree.at_depth(depth - 1):
if node.valid_policy == 'any_policy':
parent_any_policy = node
if policy_identifier not in node.expected_policy_set:
continue
policy_id_match = True
node.add_child(
policy_identifier, policy_qualifiers, {policy_identifier}
)
# Step 2 d 1 ii
if not policy_id_match and parent_any_policy:
parent_any_policy.add_child(
policy_identifier, policy_qualifiers, {policy_identifier}
)
# Step 2 d 2
if cert_any_policy and any_policy_uninhibited:
for node in valid_policy_tree.at_depth(depth - 1):
for expected_policy_identifier in node.expected_policy_set:
if expected_policy_identifier not in cert_policy_identifiers:
node.add_child(
expected_policy_identifier,
cert_any_policy['policy_qualifiers'],
{expected_policy_identifier},
)
# Step 2 d 3
valid_policy_tree = _prune_policy_tree(valid_policy_tree, depth - 1)
return valid_policy_tree
def _prune_policy_tree(valid_policy_tree, depth):
for node in valid_policy_tree.walk_up(depth):
if not node.children:
node.parent.remove_child(node)
if not valid_policy_tree.children:
valid_policy_tree = None
return valid_policy_tree
def enumerate_policy_mappings(
mappings: Iterable[x509.PolicyMapping], proc_state: ValProcState
):
"""
Internal function to process policy mapping extension values into
a Python dictionary mapping issuer domain policies to the corresponding
policies in the subject policy domain.
"""
policy_map = defaultdict(set)
for mapping in mappings:
issuer_domain_policy = mapping['issuer_domain_policy'].native
subject_domain_policy = mapping['subject_domain_policy'].native
policy_map[issuer_domain_policy].add(subject_domain_policy)
# Step 3 a
if (
issuer_domain_policy == 'any_policy'
or subject_domain_policy == 'any_policy'
):
raise PathValidationError.from_state(
f"The path could not be validated because "
f"{proc_state.describe_cert()} contains "
f"a policy mapping for the \"any policy\"",
proc_state,
)
return policy_map
def apply_policy_mapping(
policy_map, valid_policy_tree, depth: int, policy_mapping_uninhibited: bool
):
"""
Internal function to apply the policy mapping to the current policy tree
in accordance with the algorithm in RFC 5280.
"""
for issuer_domain_policy, subject_domain_policies in policy_map.items():
# Step 3 b 1
if policy_mapping_uninhibited:
issuer_domain_policy_match = False
cert_any_policy = None
for node in valid_policy_tree.at_depth(depth):
if node.valid_policy == 'any_policy':
cert_any_policy = node
if node.valid_policy == issuer_domain_policy:
issuer_domain_policy_match = True
node.expected_policy_set = subject_domain_policies
if not issuer_domain_policy_match and cert_any_policy:
cert_any_policy.parent.add_child(
issuer_domain_policy,
cert_any_policy.qualifier_set,
subject_domain_policies,
)
# Step 3 b 2
else:
for node in valid_policy_tree.at_depth(depth):
if node.valid_policy == issuer_domain_policy:
node.parent.remove_child(node)
valid_policy_tree = _prune_policy_tree(valid_policy_tree, depth - 1)
return valid_policy_tree
def prune_unacceptable_policies(
path_length, valid_policy_tree, acceptable_policies
) -> Optional['PolicyTreeRoot']:
# Step 4 g iii 1: compute nodes that branch off any_policy
# In other words, find all policies that are valid and meaningful in
# the trust root(s) namespace. We don't care about what policy mapping
# transformed them into; that's taken care of by the validation
# algorithm.
# Note: set() consumes the iterator to avoid operating on the tree
# while iterating over it. Performance is probably not a concern
# anyhow.
valid_policy_node_set = set(valid_policy_tree.nodes_in_current_domain())
# Step 4 g iii 2: eliminate unacceptable policies
def _filter_acceptable():
for policy_node in valid_policy_node_set:
policy_id = policy_node.valid_policy
if policy_id == 'any_policy' or policy_id in acceptable_policies:
yield policy_id
else:
policy_node.parent.remove_child(policy_node)
# list of policies that were explicitly valid
valid_and_acceptable = set(_filter_acceptable())
# Step 4 g iii 3: if the final layer contains an anyPolicy node
# (there can be at most one), expand it out into acceptable policies
# that are not explicitly qualified already
try:
final_any_policy: PolicyTreeNode = next(
policy_node
for policy_node in valid_policy_tree.at_depth(path_length)
if policy_node.valid_policy == 'any_policy'
)
wildcard_parent = final_any_policy.parent
assert wildcard_parent is not None
wildcard_quals = final_any_policy.qualifier_set
for acceptable_policy in acceptable_policies - valid_and_acceptable:
wildcard_parent.add_child(
acceptable_policy, wildcard_quals, {acceptable_policy}
)
# prune the anyPolicy node
wildcard_parent.remove_child(final_any_policy)
except StopIteration:
pass
# Step 4 g iii 4: prune the policy tree
return _prune_policy_tree(valid_policy_tree, path_length - 1)
class PolicyTreeRoot:
"""
A generic policy tree node, used for the root node in the tree
"""
@classmethod
def init_policy_tree(cls, valid_policy, qualifier_set, expected_policy_set):
"""
Accepts values for a PolicyTreeNode that will be created at depth 0
:param valid_policy:
A unicode string of a policy name or OID
:param qualifier_set:
An instance of asn1crypto.x509.PolicyQualifierInfos
:param expected_policy_set:
A set of unicode strings containing policy names or OIDs
"""
root = PolicyTreeRoot()
root.add_child(valid_policy, qualifier_set, expected_policy_set)
return root
def __init__(self):
self.parent = None
self.children = []
def add_child(self, valid_policy, qualifier_set, expected_policy_set):
"""
Creates a new PolicyTreeNode as a child of this node
:param valid_policy:
A unicode string of a policy name or OID
:param qualifier_set:
An instance of asn1crypto.x509.PolicyQualifierInfos
:param expected_policy_set:
A set of unicode strings containing policy names or OIDs
"""
child = PolicyTreeNode(valid_policy, qualifier_set, expected_policy_set)
child.parent = self
self.children.append(child)
def remove_child(self, child):
"""
Removes a child from this node
:param child:
An instance of PolicyTreeNode
"""
self.children.remove(child)
def at_depth(self, depth) -> Iterable['PolicyTreeNode']:
"""
Returns a generator yielding all nodes in the tree at a specific depth
:param depth:
An integer >= 0 of the depth of nodes to yield
:return:
A generator yielding PolicyTreeNode objects
"""
for child in list(self.children):
if depth == 0:
yield child
else:
for grandchild in child.at_depth(depth - 1):
yield grandchild
def walk_up(self, depth):
"""
Returns a generator yielding all nodes in the tree at a specific depth,
or above. Yields nodes starting with leaves and traversing up to the
root.
:param depth:
An integer >= 0 of the depth of nodes to walk up from
:return:
A generator yielding PolicyTreeNode objects
"""
for child in list(self.children):
if depth != 0:
for grandchild in child.walk_up(depth - 1):
yield grandchild
yield child
def nodes_in_current_domain(self) -> Iterable['PolicyTreeNode']:
"""
Returns a generator yielding all nodes in the tree that are children
of an ``any_policy`` node.
"""
for child in self.children:
yield child
if child.valid_policy == 'any_policy':
yield from child.nodes_in_current_domain()
class PolicyTreeNode(PolicyTreeRoot):
"""
A policy tree node that is used for all nodes but the root
"""
def __init__(
self,
valid_policy: str,
qualifier_set: x509.PolicyQualifierInfos,
expected_policy_set: Set[str],
):
"""
:param valid_policy:
A unicode string of a policy name or OID
:param qualifier_set:
An instance of asn1crypto.x509.PolicyQualifierInfos
:param expected_policy_set:
A set of unicode strings containing policy names or OIDs
"""
super().__init__()
self.valid_policy = valid_policy
self.qualifier_set = qualifier_set
self.expected_policy_set = expected_policy_set
def path_to_root(self):
node = self
while node is not None:
yield node
node = node.parent
|