1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
|
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
#
# SPDX-License-Identifier: MIT
"""A server suitable for running the OSCORE plug test series against it
See https://github.com/EricssonResearch/OSCOAP for the test suite
description."""
import sys
import asyncio
import logging
import argparse
from pathlib import Path
import aiocoap
import aiocoap.oscore as oscore
from aiocoap.oscore_sitewrapper import OscoreSiteWrapper
import aiocoap.error as error
from aiocoap.util.cli import AsyncCLIDaemon
import aiocoap.resource as resource
from aiocoap.credentials import CredentialsMap
from aiocoap.cli.common import add_server_arguments, server_context_from_arguments
from aiocoap.transports.oscore import OSCOREAddress
# In some nested combinations of unittest and coverage, the usually
# provided-by-default inclusion of local files does not work. Ensuring the
# local plugtest_common *can* be included.
import os.path
sys.path.append(os.path.dirname(__file__))
from plugtest_common import *
class PleaseUseOscore(error.ConstructionRenderableError):
code = aiocoap.UNAUTHORIZED
message = "This is an OSCORE plugtest, please use option %d"%aiocoap.numbers.optionnumbers.OptionNumber.OSCORE
def additional_verify_request_options(reference, request):
if request.opt.echo is not None:
# Silently accepting Echo as that's an artefact of B.1.2 recovery
reference.opt.echo = request.opt.echo
additional_verify("Request options as expected", reference.opt, request.opt)
class PlugtestResource(resource.Resource):
options = {}
expected_options = {}
async def render_get(self, request):
reference = aiocoap.Message(**self.expected_options)
if request.opt.observe is not None and 'observe' not in self.expected_options:
# workaround for test 4 hitting on Hello1
reference.opt.observe = request.opt.observe
additional_verify_request_options(reference, request)
return aiocoap.Message(payload=self.message.encode('ascii'), **self.options)
class Hello(PlugtestResource):
options = {'content_format': 0}
expected_options = {} # Uri-Path is stripped by the site
message = "Hello World!"
Hello1 = Hello # same, just registered with the site for protected access
class Hello2(Hello):
expected_options = {'uri_query': ['first=1']}
options = {'etag': b"\x2b", **Hello1.options}
class Hello3(Hello):
expected_options = {'accept': 0}
options = {'max_age': 5, **Hello1.options}
class Observe(PlugtestResource, aiocoap.interfaces.ObservableResource):
expected_options = {'observe': 0}
options = {'content_format': 0}
message = "one"
async def add_observation(self, request, serverobservation):
async def keep_entertained():
await asyncio.sleep(2)
serverobservation.trigger(aiocoap.Message(
mtype=aiocoap.CON, code=aiocoap.CONTENT,
payload=b"two", content_format=0,
))
await asyncio.sleep(2)
serverobservation.trigger(aiocoap.Message(
mtype=aiocoap.CON, code=aiocoap.INTERNAL_SERVER_ERROR,
payload=b"Terminate Observe", content_format=0,
))
t = asyncio.create_task(keep_entertained())
serverobservation.accept(t.cancel)
class Hello6(resource.Resource):
async def render_post(self, request):
additional_verify_request_options(aiocoap.Message(content_format=0), request)
additional_verify("Request payload as expected", request.payload, b"\x4a")
return aiocoap.Message(code=aiocoap.CHANGED, payload=b"\x4a", content_format=0)
class Hello7(resource.Resource):
async def render_put(self, request):
if request.opt.if_none_match:
print("This looks like test 10b")
additional_verify_request_options(aiocoap.Message(content_format=0, if_none_match=True), request)
additional_verify("Request payload as expected", request.payload, b"\x8a")
return aiocoap.Message(code=aiocoap.PRECONDITION_FAILED)
else:
print("This looks like test 9b")
additional_verify_request_options(aiocoap.Message(content_format=0, if_match=[b"{"]), request)
additional_verify("Request payload as expected", request.payload, b"z")
return aiocoap.Message(code=aiocoap.CHANGED)
class DeleteResource(resource.Resource):
async def render_delete(self, request):
additional_verify_request_options(aiocoap.Message(), request)
return aiocoap.Message(code=aiocoap.DELETED)
class BlockResource(PlugtestResource):
expected_options = {}
options = {'content_format': 0}
message = "This is a large resource\n" + "0123456789" * 101
class InnerBlockMixin:
# this might become general enough that it could replace response blockwise
# handler some day -- right now, i'm only doing the absolute minimum
# necessary to satisfy the use case
inner_default_szx = aiocoap.MAX_REGULAR_BLOCK_SIZE_EXP
async def render(self, request):
response = await super().render(request)
if request.opt.block2 is None:
szx = self.inner_default_szx
blockno = 0
else:
szx = request.opt.block2.size_exponent
blockno = request.opt.block2.block_number
return response._extract_block(blockno, szx)
class InnerBlockResource(InnerBlockMixin, BlockResource):
pass
class SeqnoManager(resource.ObservableResource):
def __init__(self, contextmap):
super().__init__()
self.contextmap = contextmap
for c in self.contextmap.values():
c.notification_hooks.append(self.updated_state)
async def render_get(self, request):
text = ""
for name in ('b', 'd'):
the_context = self.contextmap[':' + name]
# this direct access is technically outside the interface for a
# SecurityContext, but then again, there isn't one yet
text += """In context %s, next seqno is %d (persisted up to %d)\n""" % (name.upper(), the_context.sender_sequence_number, the_context.sequence_number_persisted)
if the_context.recipient_replay_window.is_initialized():
index = the_context.recipient_replay_window._index
bitfield = the_context.recipient_replay_window._bitfield
# Useless for the internal representation, but much more readable
while bitfield & 1:
bitfield >>= 1
index += 1
print(index, bitfield)
bitfield_values = [i + index for (i, v) in enumerate(bin(bitfield)[2:][::-1]) if v == '1']
text += """I've seen all sequence numbers lower than %d%s.""" % (
index,
", and also %s" % bitfield_values if bitfield else ""
)
else:
text += "The replay window is uninitialized"
text += "\n"
return aiocoap.Message(payload=text.encode('utf-8'), content_format=0)
async def render_put(self, request):
try:
number = int(request.payload.decode('utf8'))
except (ValueError, UnicodeDecodeError):
raise aiocoap.error.BadRequest("Only numeric values are accepted.")
raise NotImplementedError
class PlugtestSite(resource.Site):
def __init__(self, server_credentials):
super().__init__()
self.add_resource(['.well-known', 'core'], resource.WKCResource(self.get_resources_as_linkheader))
self.add_resource(['oscore', 'hello', 'coap'], Hello())
self.add_resource(['oscore', 'hello', '1'], Hello1())
self.add_resource(['oscore', 'hello', '2'], Hello2())
self.add_resource(['oscore', 'hello', '3'], Hello3())
self.add_resource(['oscore', 'hello', '6'], Hello6())
self.add_resource(['oscore', 'hello', '7'], Hello7())
self.add_resource(['oscore', 'observe1'], Observe())
self.add_resource(['oscore', 'observe2'], Observe())
self.add_resource(['oscore', 'test'], DeleteResource())
self.add_resource(['oscore', 'block', 'outer'], BlockResource())
self.add_resource(['oscore', 'block', 'inner'], InnerBlockResource())
self.add_resource(['sequence-numbers'], SeqnoManager(server_credentials))
class PlugtestServerProgram(AsyncCLIDaemon):
async def start(self):
p = argparse.ArgumentParser(description="Server for the OSCORE plug test. Requires a test number to be present.")
p.add_argument("contextdir", help="Directory name where to persist sequence numbers", type=Path)
p.add_argument('--verbose', help="Increase log level", action='store_true')
p.add_argument('--state-was-lost', help="Lose memory of the replay window, forcing B.1.2 recovery", action='store_true')
add_server_arguments(p)
opts = p.parse_args()
if opts.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.WARNING)
server_credentials = CredentialsMap()
server_credentials[':b'] = get_security_context('b', opts.contextdir / "b", opts.state_was_lost)
server_credentials[':d'] = get_security_context('d', opts.contextdir / "d", opts.state_was_lost)
site = PlugtestSite(server_credentials)
site = OscoreSiteWrapper(site, server_credentials)
self.context = await server_context_from_arguments(site, opts)
print("Plugtest server ready.")
sys.stdout.flush() # the unit tests might wait abundantly long for this otherwise
async def shutdown(self):
await self.context.shutdown()
if __name__ == "__main__":
PlugtestServerProgram.sync_main()
|