File: policy_tree.py

package info (click to toggle)
python-pyhanko-certvalidator 0.26.3-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,956 kB
  • sloc: python: 9,254; sh: 47; makefile: 4
file content (332 lines) | stat: -rw-r--r-- 10,970 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
from collections import defaultdict
from typing import Iterable, Optional, Set

from asn1crypto import x509

from ._state import ValProcState
from .errors import PathValidationError


def update_policy_tree(
    certificate_policies,
    valid_policy_tree: 'PolicyTreeRoot',
    depth: int,
    any_policy_uninhibited: bool,
) -> Optional['PolicyTreeRoot']:
    """
    Internal method to update the policy tree during RFC 5280 validation.
    """

    cert_any_policy = None
    cert_policy_identifiers = set()

    # Step 2 d 1
    for policy in certificate_policies:
        policy_identifier = policy['policy_identifier'].native

        if policy_identifier == 'any_policy':
            cert_any_policy = policy
            continue

        cert_policy_identifiers.add(policy_identifier)

        policy_qualifiers = policy['policy_qualifiers']

        policy_id_match = False
        parent_any_policy = None

        # Step 2 d 1 i
        for node in valid_policy_tree.at_depth(depth - 1):
            if node.valid_policy == 'any_policy':
                parent_any_policy = node
            if policy_identifier not in node.expected_policy_set:
                continue
            policy_id_match = True
            node.add_child(
                policy_identifier, policy_qualifiers, {policy_identifier}
            )

        # Step 2 d 1 ii
        if not policy_id_match and parent_any_policy:
            parent_any_policy.add_child(
                policy_identifier, policy_qualifiers, {policy_identifier}
            )

    # Step 2 d 2
    if cert_any_policy and any_policy_uninhibited:
        for node in valid_policy_tree.at_depth(depth - 1):
            for expected_policy_identifier in node.expected_policy_set:
                if expected_policy_identifier not in cert_policy_identifiers:
                    node.add_child(
                        expected_policy_identifier,
                        cert_any_policy['policy_qualifiers'],
                        {expected_policy_identifier},
                    )

    # Step 2 d 3
    valid_policy_tree = _prune_policy_tree(valid_policy_tree, depth - 1)
    return valid_policy_tree


def _prune_policy_tree(valid_policy_tree, depth):
    for node in valid_policy_tree.walk_up(depth):
        if not node.children:
            node.parent.remove_child(node)
    if not valid_policy_tree.children:
        valid_policy_tree = None
    return valid_policy_tree


def enumerate_policy_mappings(
    mappings: Iterable[x509.PolicyMapping], proc_state: ValProcState
):
    """
    Internal function to process policy mapping extension values into
    a Python dictionary mapping issuer domain policies to the corresponding
    policies in the subject policy domain.
    """
    policy_map = defaultdict(set)
    for mapping in mappings:
        issuer_domain_policy = mapping['issuer_domain_policy'].native
        subject_domain_policy = mapping['subject_domain_policy'].native

        policy_map[issuer_domain_policy].add(subject_domain_policy)

        # Step 3 a
        if (
            issuer_domain_policy == 'any_policy'
            or subject_domain_policy == 'any_policy'
        ):
            raise PathValidationError.from_state(
                f"The path could not be validated because "
                f"{proc_state.describe_cert()} contains "
                f"a policy mapping for the \"any policy\"",
                proc_state,
            )

    return policy_map


def apply_policy_mapping(
    policy_map, valid_policy_tree, depth: int, policy_mapping_uninhibited: bool
):
    """
    Internal function to apply the policy mapping to the current policy tree
    in accordance with the algorithm in RFC 5280.
    """

    for issuer_domain_policy, subject_domain_policies in policy_map.items():
        # Step 3 b 1
        if policy_mapping_uninhibited:
            issuer_domain_policy_match = False
            cert_any_policy = None

            for node in valid_policy_tree.at_depth(depth):
                if node.valid_policy == 'any_policy':
                    cert_any_policy = node
                if node.valid_policy == issuer_domain_policy:
                    issuer_domain_policy_match = True
                    node.expected_policy_set = subject_domain_policies

            if not issuer_domain_policy_match and cert_any_policy:
                cert_any_policy.parent.add_child(
                    issuer_domain_policy,
                    cert_any_policy.qualifier_set,
                    subject_domain_policies,
                )

        # Step 3 b 2
        else:
            for node in valid_policy_tree.at_depth(depth):
                if node.valid_policy == issuer_domain_policy:
                    node.parent.remove_child(node)
            valid_policy_tree = _prune_policy_tree(valid_policy_tree, depth - 1)
    return valid_policy_tree


def prune_unacceptable_policies(
    path_length, valid_policy_tree, acceptable_policies
) -> Optional['PolicyTreeRoot']:
    # Step 4 g iii 1: compute nodes that branch off any_policy
    #  In other words, find all policies that are valid and meaningful in
    #  the trust root(s) namespace. We don't care about what policy mapping
    #  transformed them into; that's taken care of by the validation
    #  algorithm.
    #  Note: set() consumes the iterator to avoid operating on the tree
    #  while iterating over it. Performance is probably not a concern
    #  anyhow.
    valid_policy_node_set = set(valid_policy_tree.nodes_in_current_domain())

    # Step 4 g iii 2: eliminate unacceptable policies
    def _filter_acceptable():
        for policy_node in valid_policy_node_set:
            policy_id = policy_node.valid_policy
            if policy_id == 'any_policy' or policy_id in acceptable_policies:
                yield policy_id
            else:
                policy_node.parent.remove_child(policy_node)

    # list of policies that were explicitly valid
    valid_and_acceptable = set(_filter_acceptable())

    # Step 4 g iii 3: if the final layer contains an anyPolicy node
    # (there can be at most one), expand it out into acceptable policies
    # that are not explicitly qualified already
    try:
        final_any_policy: PolicyTreeNode = next(
            policy_node
            for policy_node in valid_policy_tree.at_depth(path_length)
            if policy_node.valid_policy == 'any_policy'
        )
        wildcard_parent = final_any_policy.parent
        assert wildcard_parent is not None
        wildcard_quals = final_any_policy.qualifier_set
        for acceptable_policy in acceptable_policies - valid_and_acceptable:
            wildcard_parent.add_child(
                acceptable_policy, wildcard_quals, {acceptable_policy}
            )
        # prune the anyPolicy node
        wildcard_parent.remove_child(final_any_policy)
    except StopIteration:
        pass

    # Step 4 g iii 4: prune the policy tree
    return _prune_policy_tree(valid_policy_tree, path_length - 1)


class PolicyTreeRoot:
    """
    A generic policy tree node, used for the root node in the tree
    """

    @classmethod
    def init_policy_tree(cls, valid_policy, qualifier_set, expected_policy_set):
        """
        Accepts values for a PolicyTreeNode that will be created at depth 0

        :param valid_policy:
            A unicode string of a policy name or OID

        :param qualifier_set:
            An instance of asn1crypto.x509.PolicyQualifierInfos

        :param expected_policy_set:
            A set of unicode strings containing policy names or OIDs
        """
        root = PolicyTreeRoot()
        root.add_child(valid_policy, qualifier_set, expected_policy_set)
        return root

    def __init__(self):
        self.parent = None
        self.children = []

    def add_child(self, valid_policy, qualifier_set, expected_policy_set):
        """
        Creates a new PolicyTreeNode as a child of this node

        :param valid_policy:
            A unicode string of a policy name or OID

        :param qualifier_set:
            An instance of asn1crypto.x509.PolicyQualifierInfos

        :param expected_policy_set:
            A set of unicode strings containing policy names or OIDs
        """

        child = PolicyTreeNode(valid_policy, qualifier_set, expected_policy_set)
        child.parent = self
        self.children.append(child)

    def remove_child(self, child):
        """
        Removes a child from this node

        :param child:
            An instance of PolicyTreeNode
        """

        self.children.remove(child)

    def at_depth(self, depth) -> Iterable['PolicyTreeNode']:
        """
        Returns a generator yielding all nodes in the tree at a specific depth

        :param depth:
            An integer >= 0 of the depth of nodes to yield

        :return:
            A generator yielding PolicyTreeNode objects
        """

        for child in list(self.children):
            if depth == 0:
                yield child
            else:
                for grandchild in child.at_depth(depth - 1):
                    yield grandchild

    def walk_up(self, depth):
        """
        Returns a generator yielding all nodes in the tree at a specific depth,
        or above. Yields nodes starting with leaves and traversing up to the
        root.

        :param depth:
            An integer >= 0 of the depth of nodes to walk up from

        :return:
            A generator yielding PolicyTreeNode objects
        """

        for child in list(self.children):
            if depth != 0:
                for grandchild in child.walk_up(depth - 1):
                    yield grandchild
            yield child

    def nodes_in_current_domain(self) -> Iterable['PolicyTreeNode']:
        """
        Returns a generator yielding all nodes in the tree that are children
        of an ``any_policy`` node.
        """

        for child in self.children:
            yield child
            if child.valid_policy == 'any_policy':
                yield from child.nodes_in_current_domain()


class PolicyTreeNode(PolicyTreeRoot):
    """
    A policy tree node that is used for all nodes but the root
    """

    def __init__(
        self,
        valid_policy: str,
        qualifier_set: x509.PolicyQualifierInfos,
        expected_policy_set: Set[str],
    ):
        """
        :param valid_policy:
            A unicode string of a policy name or OID

        :param qualifier_set:
            An instance of asn1crypto.x509.PolicyQualifierInfos

        :param expected_policy_set:
            A set of unicode strings containing policy names or OIDs
        """
        super().__init__()

        self.valid_policy = valid_policy
        self.qualifier_set = qualifier_set
        self.expected_policy_set = expected_policy_set

    def path_to_root(self):
        node = self
        while node is not None:
            yield node
            node = node.parent