File: time_slide.py

package info (click to toggle)
python-pyhanko-certvalidator 0.26.3-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,956 kB
  • sloc: python: 9,254; sh: 47; makefile: 4
file content (444 lines) | stat: -rw-r--r-- 16,260 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
import asyncio
from datetime import datetime, timedelta
from typing import Iterable, List, Optional, Set, Tuple

from asn1crypto import algos, keys, x509

from pyhanko_certvalidator._state import ValProcState
from pyhanko_certvalidator.errors import (
    DisallowedAlgorithmError,
    InsufficientPOEError,
    InsufficientRevinfoError,
    RevokedError,
)
from pyhanko_certvalidator.ltv.types import (
    ValidationTimingInfo,
    ValidationTimingParams,
)
from pyhanko_certvalidator.path import ValidationPath
from pyhanko_certvalidator.policy_decl import (
    AlgorithmUsagePolicy,
    CertRevTrustPolicy,
    RevocationCheckingRule,
)
from pyhanko_certvalidator.revinfo.archival import RevinfoContainer
from pyhanko_certvalidator.revinfo.manager import RevinfoManager
from pyhanko_certvalidator.revinfo.validate_crl import (
    CRLOfInterest,
    _check_cert_on_crl_and_delta,
    _CRLErrs,
    collect_relevant_crls_with_paths,
)
from pyhanko_certvalidator.revinfo.validate_ocsp import (
    OCSPResponseOfInterest,
    _check_ocsp_status,
    collect_relevant_responses_with_paths,
)
from pyhanko_certvalidator.util import ConsList

__all__ = ['time_slide', 'ades_gather_prima_facie_revinfo']


async def ades_gather_prima_facie_revinfo(
    path: ValidationPath,
    revinfo_manager: RevinfoManager,
    control_time: datetime,
    revocation_checking_rule: RevocationCheckingRule,
) -> Tuple[List[CRLOfInterest], List[OCSPResponseOfInterest]]:
    """
    Gather potentially relevant revocation information for the leaf
    certificate of a candidate validation path.
    Only the scope of the revocation information will be checked, no
    detailed validation will occur.

    :param path:
        The candidate validation path.
    :param revinfo_manager:
        The revocation info manager.
    :param control_time:
        The time horizon that serves as a relevance cutoff.
    :param revocation_checking_rule:
        Revocation info rule controlling which kind(s) of revocation
        information will be fetched.
    :return:
        A 2-element tuple containing a list of the fetched CRLs and
        OCSP responses, respectively.
    """

    cert = path.leaf
    if revocation_checking_rule.ocsp_relevant:
        ocsp_result = await collect_relevant_responses_with_paths(
            cert, path, revinfo_manager, control_time
        )
        ocsps = ocsp_result.responses
    else:
        ocsps = []

    if revocation_checking_rule.crl_relevant:
        crl_result = await collect_relevant_crls_with_paths(
            cert, path, revinfo_manager, control_time
        )
        crls = crl_result.crls
    else:
        crls = []
    return crls, ocsps


def _tails(path: ValidationPath):
    cur_path = path
    yield cur_path, True
    while cur_path.pkix_len > 1:
        cur_path = cur_path.copy_and_drop_leaf()
        yield cur_path, False


def _apply_algo_policy(
    algo_policy: AlgorithmUsagePolicy,
    algo_used: algos.SignedDigestAlgorithm,
    control_time: datetime,
    public_key: keys.PublicKeyInfo,
    val_proc_state: ValProcState,
):
    sig_constraint = algo_policy.signature_algorithm_allowed(
        algo_used, control_time, public_key
    )
    algo_name = algo_used['algorithm'].native
    if not sig_constraint.allowed:
        if sig_constraint.not_allowed_after:
            # rewind the clock up until the point where the algorithm
            # was actually permissible
            control_time = min(control_time, sig_constraint.not_allowed_after)
        else:
            msg = (
                f"Algorithm {algo_name} is banned outright without "
                f"time constraints."
            )
            if sig_constraint.failure_reason is not None:
                msg += f" Reason: {sig_constraint.failure_reason}"
            raise DisallowedAlgorithmError.from_state(
                msg,
                val_proc_state,
                banned_since=None,
            )
    return control_time


def _update_control_time_for_unrevoked(
    control_time: datetime,
    revinfo_container: RevinfoContainer,
    rev_trust_policy: CertRevTrustPolicy,
    time_tolerance: timedelta,
):
    # if the cert is not on the list, we need the freshness check
    usability = revinfo_container.usable_at(
        rev_trust_policy,
        ValidationTimingParams(
            timing_info=ValidationTimingInfo(
                validation_time=control_time,
                best_signature_time=control_time,
                point_in_time_validation=True,
            ),
            time_tolerance=time_tolerance,
        ),
    )
    issuance_date = revinfo_container.issuance_date
    if not usability.rating.usable_ades:
        # set the control time to the issuance date / last usable date
        # (note: the TOO_NEW check is to prevent problems
        #  with freshness policies involving cooldown periods,
        #  which aren't really supported in the time sliding
        #  algorithm, but hey)
        # NOTE: the spec mandates using the issuance date here, but I believe
        # that's wrong: the last date at which the revinfo is still considered
        # fresh should be used instead. This distinction matters, since
        # (especially when CRLs are used) the issuance date of the revinfo
        # is often before the signature time.
        cutoff_date = usability.last_usable_at or issuance_date
        if cutoff_date is not None:
            control_time = min(cutoff_date, control_time)
    return control_time


def _update_control_time(
    revoked_date: Optional[datetime],
    control_time: datetime,
    revinfo_container: RevinfoContainer,
    algo_policy: Optional[AlgorithmUsagePolicy],
    issuer_public_key: keys.PublicKeyInfo,
    val_proc_state: ValProcState,
):
    if revoked_date:
        # this means we have to update control_time
        control_time = min(revoked_date, control_time)
    algo_used = revinfo_container.revinfo_sig_mechanism_used
    if algo_policy is not None and algo_used is not None:
        control_time = _apply_algo_policy(
            algo_policy,
            algo_used,
            control_time,
            issuer_public_key,
            val_proc_state,
        )
    return control_time


async def _time_slide(
    path: ValidationPath,
    init_control_time: datetime,
    revinfo_manager: RevinfoManager,
    rev_trust_policy: CertRevTrustPolicy,
    algo_usage_policy: Optional[AlgorithmUsagePolicy],
    # TODO use policy objects
    time_tolerance: timedelta,
    cert_stack: ConsList[bytes],
    path_stack: ConsList[ValidationPath],
) -> datetime:
    control_time = init_control_time
    checking_policy = rev_trust_policy.revocation_checking_policy

    # For zero-length paths, there is nothing to check
    if path.pkix_len == 0:
        return init_control_time

    # The ETSI algorithm requires us to collect revinfo for each
    # cert in the path, starting with the first (after the root).
    # Since our revinfo collection methods require paths instead of individual
    # certs, we instead loop over partial paths
    partial_paths = list(reversed(list(_tails(path))))
    poe_manager = revinfo_manager.poe_manager
    for current_path, is_ee in partial_paths:
        crls, ocsps = await ades_gather_prima_facie_revinfo(
            current_path,
            revinfo_manager=revinfo_manager,
            control_time=control_time,
            revocation_checking_rule=(
                checking_policy.ee_certificate_rule
                if is_ee
                else checking_policy.intermediate_ca_cert_rule
            ),
        )
        cert = current_path.leaf
        new_cert_stack = cert_stack.cons(cert.dump())
        new_path_stack = path_stack.cons(path)

        proc_state = ValProcState(cert_path_stack=new_path_stack)

        if poe_manager[cert] > control_time:
            raise InsufficientPOEError.from_state(
                f"No proof of existence available for certificate "
                f"{cert.subject.human_friendly} at control time "
                f"{control_time.isoformat()}.",
                proc_state,
            )
        if not crls and not ocsps:
            if isinstance(cert, x509.Certificate):
                ident = cert.subject.human_friendly
            else:
                ident = "attribute certificate"

            # don't raise an error for revo-exempt certs (OCSP responders)
            if cert.ocsp_no_check_value is None:
                raise InsufficientRevinfoError.from_state(
                    f"No revocation info from before {control_time.isoformat()}"
                    f" found for certificate {ident}.",
                    proc_state,
                )

        once_revoked = False
        most_recent_crl = None
        # We always take the chain of trust of a CRL/OCSP response
        # at face value
        for crl_of_interest in crls:
            # skip CRLs that are no longer relevant
            issued = crl_of_interest.crl.issuance_date
            if (
                not issued
                or issued > control_time
                or poe_manager[crl_of_interest.crl] > control_time
            ):
                continue
            sub_paths = crl_of_interest.prov_paths

            # recurse into the paths associated with the CRL and adjust
            # the control time accordingly
            # don't bother checking issuers that already appear
            # in the chain of trust that we're currently looking into
            sub_path_skip_list: Set[bytes] = set(new_cert_stack) | set(
                cert.dump() for cert in current_path
            )
            sub_path_control_times = await asyncio.gather(
                *(
                    _time_slide(
                        crl_path.path,
                        control_time,
                        revinfo_manager,
                        rev_trust_policy,
                        algo_usage_policy,
                        time_tolerance,
                        cert_stack=new_cert_stack,
                        path_stack=new_path_stack,
                    )
                    for crl_path in sub_paths
                    if (
                        crl_path.path.leaf
                        and crl_path.path.leaf.dump() not in sub_path_skip_list
                    )
                )
            )
            control_time = min([control_time, *sub_path_control_times])

            for candidate_crl_path in sub_paths:
                revoked_date, revoked_reason = _check_cert_on_crl_and_delta(
                    crl_issuer=candidate_crl_path.path.leaf,
                    cert=cert,
                    certificate_list_cont=crl_of_interest.crl,
                    delta_certificate_list_cont=candidate_crl_path.delta,
                    errs=_CRLErrs(),
                )
                crl_iss_cert = candidate_crl_path.path.leaf
                assert isinstance(crl_iss_cert, x509.Certificate)

                once_revoked |= revoked_date is not None

                crl_container = crl_of_interest.crl
                if (
                    most_recent_crl is None
                    or most_recent_crl.issuance_date
                    < crl_container.issuance_date
                ):
                    most_recent_crl = crl_container
                control_time = _update_control_time(
                    revoked_date,
                    control_time,
                    revinfo_container=crl_container,
                    algo_policy=algo_usage_policy,
                    issuer_public_key=crl_iss_cert.public_key,
                    val_proc_state=proc_state,
                )

        most_recent_ocsp = None
        for ocsp_of_interest in ocsps:
            ocsp_container = ocsp_of_interest.ocsp_response
            issued = ocsp_container.issuance_date
            if (
                not issued
                or issued > control_time
                or poe_manager[ocsp_of_interest.ocsp_response] > control_time
            ):
                continue

            control_time = await _time_slide(
                ocsp_of_interest.prov_path,
                control_time,
                revinfo_manager,
                rev_trust_policy,
                algo_usage_policy,
                time_tolerance,
                cert_stack=new_cert_stack,
                path_stack=new_path_stack,
            )
            try:
                _check_ocsp_status(
                    ocsp_response=ocsp_container,
                    proc_state=ValProcState(cert_path_stack=new_path_stack),
                    control_time=control_time,
                )
                revoked_date = None
            except RevokedError as e:
                revoked_date = e.revocation_dt

            once_revoked |= revoked_date is not None
            ocsp_iss_cert = ocsp_of_interest.prov_path.leaf
            assert isinstance(ocsp_iss_cert, x509.Certificate)
            if (
                most_recent_ocsp is None
                or most_recent_ocsp.issuance_date < issued
            ):
                most_recent_ocsp = ocsp_container
            control_time = _update_control_time(
                revoked_date,
                control_time,
                revinfo_container=ocsp_container,
                algo_policy=algo_usage_policy,
                issuer_public_key=ocsp_iss_cert.public_key,
                val_proc_state=proc_state,
            )
        # check the algorithm constraints for the certificate itself
        if algo_usage_policy is not None:
            leaf_ca = list(current_path.iter_authorities())[-1]
            control_time = _apply_algo_policy(
                algo_usage_policy,
                cert['signature_algorithm'],
                control_time,
                leaf_ca.public_key,
                val_proc_state=proc_state,
            )

        # (c) if the certificate was not marked as revoked -> update
        # based on the freshness of the most recent piece of revinfo
        if not once_revoked:
            revinfo_items: Iterable[RevinfoContainer] = [
                x for x in (most_recent_ocsp, most_recent_crl) if x is not None
            ]
            most_recent_revinfo = max(
                revinfo_items,
                key=lambda x: x.issuance_date or control_time,
                default=None,
            )
            if most_recent_revinfo is not None:
                control_time = _update_control_time_for_unrevoked(
                    control_time=control_time,
                    revinfo_container=most_recent_revinfo,
                    rev_trust_policy=rev_trust_policy,
                    time_tolerance=time_tolerance,
                )

    return control_time


async def time_slide(
    path: ValidationPath,
    init_control_time: datetime,
    revinfo_manager: RevinfoManager,
    rev_trust_policy: CertRevTrustPolicy,
    algo_usage_policy: Optional[AlgorithmUsagePolicy],
    time_tolerance: timedelta,
) -> datetime:
    """
    Execute the ETSI EN 319 102-1 time slide algorithm against the given path.

    .. warning::
        This is incubating internal API.

    .. note::
        This implementation will also attempt to take into account chains of
        trust of indirect CRLs. This is not a requirement of the specification,
        but also somewhat unlikely to arise in practice in cases where AdES
        compliance actually matters.

    :param path:
        The prospective validation path against which to execute the time slide
        algorithm.
    :param init_control_time:
        The initial control time, typically the current time.
    :param revinfo_manager:
        The revocation info manager.
    :param rev_trust_policy:
        The trust policy for revocation information.
    :param algo_usage_policy:
        The algorithm usage policy.
    :param time_tolerance:
        The tolerance to apply when evaluating time-related constraints.
    :return:
        The resulting control time.
    """
    return await _time_slide(
        path,
        init_control_time,
        revinfo_manager,
        rev_trust_policy,
        algo_usage_policy,
        time_tolerance,
        cert_stack=ConsList.empty(),
        path_stack=ConsList.empty(),
    )