1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
|
# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
#
# SPDX-License-Identifier: MIT
import unittest
from .test_server import WithClient, WithTestServer, TestServer
import secrets
import aiocoap.defaults
oscore_modules = aiocoap.defaults.oscore_missing_modules()
if not oscore_modules:
import aiocoap.oscore
import aiocoap.oscore_sitewrapper
import cbor2
_skip_unless_oscore = unittest.skipIf(
oscore_modules, "Modules missing for running OSCORE tests: %s" % (oscore_modules,)
)
class WithGroupKeys(unittest.IsolatedAsyncioTestCase):
def _set_up_algorithms(self):
self.alg_aead = aiocoap.oscore.algorithms[aiocoap.oscore.DEFAULT_ALGORITHM]
self.alg_group_enc = aiocoap.oscore.algorithms[aiocoap.oscore.DEFAULT_ALGORITHM]
self.hashfun = aiocoap.oscore.hashfunctions[aiocoap.oscore.DEFAULT_HASHFUNCTION]
self.alg_countersign = aiocoap.oscore.Ed25519()
self.alg_pairwise_key_agreement = aiocoap.oscore.EcdhSsHkdf256()
async def asyncSetUp(self):
self._set_up_algorithms()
group_id = b"G"
participants = [b"", b"\x01", b"longname"]
private_keys, creds = zip(
*(self.alg_countersign.generate_with_ccs() for _ in participants)
)
master_secret = secrets.token_bytes(64)
master_salt = b"PoCl4"
# This would only be processed when there is actual contact with the GM
gm_cred = b"dummy credential"
self.groups = [
aiocoap.oscore.SimpleGroupContext(
self.alg_aead,
self.hashfun,
self.alg_countersign,
self.alg_group_enc,
self.alg_pairwise_key_agreement,
group_id,
master_secret,
master_salt,
participants[i],
private_keys[i],
creds[i],
{
participants[j]: creds[j]
for j, _ in enumerate(participants)
if i != j
},
gm_cred,
group_manager_cred_fmt="dummy",
)
for i, _ in enumerate(participants)
]
await super().asyncSetUp()
class WithGroupServer(WithTestServer, WithGroupKeys):
async def asyncSetUp(self):
await super().asyncSetUp()
server_credentials = aiocoap.credentials.CredentialsMap()
server_credentials[":a"] = self.groups[0]
self.server.serversite = aiocoap.oscore_sitewrapper.OscoreSiteWrapper(
self.server.serversite, server_credentials
)
class WithGroupClient(WithClient, WithGroupKeys):
async def asyncSetUp(self):
await super().asyncSetUp()
self.client.client_credentials["coap://%s/*" % self.servernetloc] = self.groups[
1
]
@_skip_unless_oscore
class TestGroupOscore(TestServer, WithGroupServer, WithGroupClient):
pass
@_skip_unless_oscore
class TestGroupOscoreWithPairwise(TestGroupOscore):
async def asyncSetUp(self):
await super().asyncSetUp()
for k, v in self.client.client_credentials.items():
self.client.client_credentials[k] = v.pairwise_for(self.groups[0].sender_id)
@_skip_unless_oscore
class TestDifferentLengths(TestGroupOscore):
def _set_up_algorithms(self):
super()._set_up_algorithms()
# Override Group Encryption Algorithm to have 16 bytes nonce rather than the 13 of alg_aead
self.alg_group_enc = aiocoap.oscore.A128CBC
# For the different length tests, it completely suffices to just run one.
for _testname in dir(TestDifferentLengths):
if not _testname.startswith("test_"):
continue
if _testname != "test_big_resource":
setattr(TestDifferentLengths, _testname, None)
assert any(
getattr(TestDifferentLengths, _t) is not None
for _t in dir(TestDifferentLengths)
if _t.startswith("test_")
), "Removing tests left none"
@_skip_unless_oscore
class TestDifferentLengthsWithPairwise(
TestDifferentLengths, TestGroupOscoreWithPairwise
):
pass
del TestServer
|