1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
#
# SPDX-License-Identifier: MIT
"""This tests advanced cases of blockwise transfer; simple sequential transfers
are covered in test_server.TestServer.test_replacing_resource."""
import asyncio
import unittest
import aiocoap
import aiocoap.defaults
from .test_server import (
WithTestServer,
WithClient,
no_warnings,
asynctest,
BigResource,
BasicTestingSite,
)
class BigChunkyResource(BigResource):
    """BigResource variant that lowers the requester's block size exponent.

    Before delegating to the inherited GET handler, the handler sets
    ``maximum_block_size_exp`` on the request's remote to 3, so that the
    tests can check whether that per-remote limit is honoured when the
    response is sent.
    """

    async def render_get(self, request):
        """Set the remote's maximum block size exponent to 3, then render
        the GET response exactly as the parent class does."""
        peer = request.remote
        peer.maximum_block_size_exp = 3
        rendered = await super().render_get(request)
        return rendered
class ChunkyTestingSite(BasicTestingSite):
    """The basic testing site plus a BigChunkyResource at ``/big/chunky``."""

    def __init__(self):
        """Build the basic site and register the chunky resource on it."""
        super().__init__()
        chunky_path = ["big", "chunky"]
        chunky_resource = BigChunkyResource()
        self.add_resource(chunky_path, chunky_resource)
class WithChunkyTestServer(WithTestServer):
    """Test-server fixture configured to use ChunkyTestingSite."""

    # Overrides the site class of the base fixture; presumably WithTestServer
    # instantiates it while setting up the server -- confirm in test_server.
    TestingSite = ChunkyTestingSite
class TestBlockwise(WithChunkyTestServer, WithClient):
    """Advanced blockwise transfer tests, run against the chunky test site.

    Most tests request or send a large body with a block size exponent of 3
    in effect and then verify, by counting the log records of incoming
    messages, that the transfer was split into 128-byte blocks.
    """

    # Substring identifying a log record written for a received message.
    _received_logmsg = "Incoming message <aiocoap.Message at"

    # Block size in bytes the tests expect when size exponent 3 is in effect
    # (the assertion messages below name the same figure).
    _expected_block_size = 128

    def _expected_block_count(self, body_length):
        """Return the number of ``_expected_block_size`` byte blocks needed
        to carry ``body_length`` bytes (ceiling division)."""
        size = self._expected_block_size
        return (body_length + size - 1) // size

    def _count_received_messages(self):
        """Count the captured log records that report an incoming message.

        A record counts when its ``msg`` contains ``_received_logmsg`` and
        its logger name is not ``"coap-server"``; the server's own records
        are thereby left out of the tally.
        """
        return sum(
            self._received_logmsg in record.msg
            for record in self.handler
            if record.name != "coap-server"
        )

    # tracked as https://github.com/chrysn/aiocoap/issues/58; behavior can be
    # successful more or less by chance
    @unittest.skip(
        "Simultaneous blockwise requests succeed more or less by chance; "
        "tracked as https://github.com/chrysn/aiocoap/issues/58"
    )
    @no_warnings
    @asynctest
    async def test_sequential(self):
        """Test whether the client serializes simultaneous block requests"""
        pattern1 = b"01234 first pattern" + b"01" * 1024
        pattern2 = b"01234 second pattern" + b"02" * 1024
        patterns = (pattern1, pattern2)
        uri = "coap://" + self.servernetloc + "/replacing/one"

        # Build both messages first, then start both requests at once.
        messages = [
            aiocoap.Message(uri=uri, code=aiocoap.POST, payload=pattern)
            for pattern in patterns
        ]
        pending = [self.client.request(message).response for message in messages]

        payloads = []
        for next_done in asyncio.as_completed(pending):
            response = await next_done
            self.assertTrue(
                response.code.is_successful(),
                "Simultaneous blockwise requests caused error.",
            )
            payloads.append(response.payload)

        # The expected responses are the posted patterns with every b"0"
        # replaced by b"O"; arrival order does not matter.
        expected = {pattern.replace(b"0", b"O") for pattern in patterns}
        self.assertSetEqual(set(payloads), expected)

    @no_warnings
    @asynctest
    async def test_client_hints(self):
        """Test whether a handle_blockwise=True request takes a block2 option
        set in it as a hint to start requesting with a low size right away

        That way of hinting at size requests is not documented, but understood
        and occasionally used; the test primarily serves to identify problems
        the server has with initially low block sizes."""
        request = aiocoap.Message(
            uri="coap://" + self.servernetloc + "/big",
            block2=(0, 0, 3),
            code=aiocoap.GET,
        )
        resp = await self.client.request(request).response

        self.assertEqual(resp.code, aiocoap.CONTENT, "Request was unsuccessful")
        self.assertEqual(
            self._count_received_messages(),
            self._expected_block_count(len(resp.payload)),
            "Response not chunked into 128 bytes",
        )

    @no_warnings
    @asynctest
    async def test_server_hints(self):
        """Test whether the needs_blockwise server mechanism considers size
        exponent limits of the remote.

        Mutating the remote as it is done in the BigChunkyResource is not
        documented, but helps verify that the information in
        maximum_block_size_exp is generally used."""
        request = aiocoap.Message(
            uri="coap://" + self.servernetloc + "/big/chunky",
            code=aiocoap.GET,
        )
        resp = await self.client.request(request).response

        self.assertEqual(resp.code, aiocoap.CONTENT, "Request was unsuccessful")
        self.assertEqual(
            self._count_received_messages(),
            self._expected_block_count(len(resp.payload)),
            "Response not chunked into 128 bytes",
        )

    @no_warnings
    @asynctest
    async def test_client_hints_block1(self):
        """Test whether a client can successfully indicate per-remote sizes
        even for the block1 phase."""
        # same as BigResource's content
        payload = b"0123456789----------" * 512
        reqmsg = aiocoap.Message(
            uri="coap://" + self.servernetloc + "/replacing/one",
            code=aiocoap.PUT,
            payload=payload,
        )
        # Limit the block size for this remote before the request is sent.
        reqmsg.remote.maximum_block_size_exp = 3

        resp = await self.client.request(reqmsg).response

        self.assertEqual(resp.code, aiocoap.CHANGED, "Request was unsuccessful")
        # The expected count is derived from the request payload: it is the
        # request body whose chunking is being checked here.
        self.assertEqual(
            self._count_received_messages(),
            self._expected_block_count(len(payload)),
            "Request not chunked into 128 bytes",
        )
|