1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
"""Tests for the vehicle Library."""
# pylint: disable=protected-access
import asyncio
import aiohttp
import pytest
from aresponses import Response, ResponsesMockServer
from vehicle import RDW
from vehicle.const import Dataset
from vehicle.exceptions import RDWConnectionError, RDWError
async def test_json_request(aresponses: ResponsesMockServer) -> None:
"""Test JSON response is handled correctly."""
aresponses.add(
"opendata.rdw.nl",
"/resource/m9d7-ebf2.json",
"GET",
aresponses.Response(
status=200,
headers={"Content-Type": "application/json"},
text='{"status": "ok"}',
),
)
async with aiohttp.ClientSession() as session:
rdw = RDW(session=session)
response = await rdw._request(Dataset.PLATED_VEHICLES)
assert response == '{"status": "ok"}'
await rdw.close()
async def test_internal_session(aresponses: ResponsesMockServer) -> None:
"""Test JSON response is handled correctly."""
aresponses.add(
"opendata.rdw.nl",
"/resource/m9d7-ebf2.json",
"GET",
aresponses.Response(
status=200,
headers={"Content-Type": "application/json"},
text='{"status": "ok"}',
),
)
async with RDW() as rdw:
response = await rdw._request(Dataset.PLATED_VEHICLES)
assert response == '{"status": "ok"}'
async def test_timeout(aresponses: ResponsesMockServer) -> None:
"""Test request timeout."""
# Faking a timeout by sleeping
async def response_handler(_: aiohttp.ClientResponse) -> Response:
"""Response handler for this test."""
await asyncio.sleep(2)
return aresponses.Response(body="Goodmorning!")
aresponses.add(
"opendata.rdw.nl",
"/resource/m9d7-ebf2.json",
"GET",
response_handler,
)
async with aiohttp.ClientSession() as session:
rdw = RDW(session=session, request_timeout=1)
with pytest.raises(RDWConnectionError):
assert await rdw._request(Dataset.PLATED_VEHICLES)
async def test_http_error400(aresponses: ResponsesMockServer) -> None:
"""Test HTTP 404 response handling."""
aresponses.add(
"opendata.rdw.nl",
"/resource/m9d7-ebf2.json",
"GET",
aresponses.Response(text="OMG PUPPIES!", status=404),
)
async with aiohttp.ClientSession() as session:
rdw = RDW(session=session)
with pytest.raises(RDWError):
assert await rdw._request(Dataset.PLATED_VEHICLES)
async def test_unexpected_response(aresponses: ResponsesMockServer) -> None:
"""Test unexpected response handling."""
aresponses.add(
"opendata.rdw.nl",
"/resource/m9d7-ebf2.json",
"GET",
aresponses.Response(text="OMG PUPPIES!", status=200),
)
async with aiohttp.ClientSession() as session:
rdw = RDW(session=session)
with pytest.raises(RDWError):
assert await rdw._request(Dataset.PLATED_VEHICLES)
async def test_license_plate_normalization() -> None:
"""Test normalization of license plates."""
assert RDW.normalize_license_plate("AB-12-34") == "AB1234"
assert RDW.normalize_license_plate("AB1234") == "AB1234"
assert RDW.normalize_license_plate("AB1-23-4") == "AB1234"
assert RDW.normalize_license_plate(" Ab1-23-4 ") == "AB1234"
|