File: plugtest_common.py

package info (click to toggle)
aiocoap 0.4.14-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 1,756 kB
  • sloc: python: 16,846; makefile: 23; sh: 9
file content (78 lines) | stat: -rw-r--r-- 2,401 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
#
# SPDX-License-Identifier: MIT

import shutil
from pathlib import Path

import cbor2 as cbor

from aiocoap import oscore

# Directory next to this module that holds the base contexts;
# get_security_context() copies `contextdir / contextname` from here.
contextdir = Path(__file__).parent / "common-context"


class LoggingFilesystemSecurityContext(oscore.FilesystemSecurityContext):
    """Filesystem security context that prints each external AAD it extracts.

    The printed line shows the AAD both as a hex string (wrapped in a
    ``bytes.fromhex(...)`` expression) and as its CBOR-decoded value.
    """

    def _extract_external_aad(self, message, request_id, local_is_sender):
        """Delegate to the parent class, print the result, and return it
        unchanged."""
        external_aad = super()._extract_external_aad(
            message, request_id, local_is_sender
        )
        hex_form = external_aad.hex()
        decoded = cbor.loads(external_aad)
        print("Verify: External AAD: bytes.fromhex(%r), %r" % (hex_form, decoded))
        return external_aad


class NotifyingPlugtestSecurityContext(oscore.FilesystemSecurityContext):
    """Filesystem security context that calls registered hooks on changes.

    Callables appended to ``notification_hooks`` are invoked without arguments
    whenever ``post_seqnoincrease`` or ``_replay_window_changed`` runs.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the parent context and start with no hooks registered."""
        super().__init__(*args, **kwargs)
        # Zero-argument callables, invoked in list order by notify().
        self.notification_hooks = []

    def notify(self):
        """Call every registered notification hook, in registration order."""
        for hook in self.notification_hooks:
            hook()

    def post_seqnoincrease(self):
        """Run the parent's handling first, then fire the hooks."""
        super().post_seqnoincrease()
        self.notify()

    def _replay_window_changed(self):
        """Run the parent's handling first, then fire the hooks."""
        super()._replay_window_changed()
        self.notify()


class PlugtestFilesystemSecurityContext(
    LoggingFilesystemSecurityContext, NotifyingPlugtestSecurityContext
):
    """Security context combining external-AAD logging with change hooks.

    Adds nothing of its own; all behavior comes from the two base classes.
    """


def get_security_context(
    contextname, contextcopy: Path, simulate_unclean_shutdown=False
):
    """Load a plugtest security context from a working copy of a base context.

    If nothing exists at contextcopy yet, the base context (disambiguated by
    contextname in "ab", "cd") is copied there from below contextdir, creating
    missing parent directories on the way. An already existing copy is used
    as it is. The copy is then loaded as a PlugtestFilesystemSecurityContext,
    which is returned.

    With the simulate_unclean_shutdown parameter set to True, any existing
    replay window is removed from the loaded state."""
    copy_missing = not contextcopy.exists()
    if copy_missing:
        contextcopy.parent.mkdir(parents=True, exist_ok=True)
        template = contextdir / contextname
        shutil.copytree(template, contextcopy)

        print("Context %s copied to %s" % (contextname, contextcopy))

    context = PlugtestFilesystemSecurityContext(contextcopy)

    if simulate_unclean_shutdown:
        # Blank out both pieces of replay window state on the loaded context.
        for field in ("_index", "_bitfield"):
            setattr(context.recipient_replay_window, field, None)

    return context


def additional_verify(description, lhs, rhs):
    """Print whether lhs equals rhs, labelled with description.

    This only reports the outcome on standard output; a mismatch neither
    raises nor changes the return value (which is always None).

    description is rendered with ``%s`` and may be any object, including a
    tuple."""
    if lhs == rhs:
        # Wrap in a one-element tuple: a bare tuple-valued description would
        # otherwise be unpacked as several format arguments and raise
        # TypeError, unlike in the failure branch below.
        print("Additional verify passed: %s" % (description,))
    else:
        print("Additional verify failed (%s != %s): %s" % (lhs, rhs, description))