File: test_rdw.py

package info (click to toggle)
python-vehicle 2.2.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 432 kB
  • sloc: python: 395; makefile: 8; sh: 5
file content (107 lines) | stat: -rw-r--r-- 3,415 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""Tests for the vehicle Library."""
# pylint: disable=protected-access
import asyncio

import aiohttp
import pytest
from aresponses import Response, ResponsesMockServer

from vehicle import RDW
from vehicle.const import Dataset
from vehicle.exceptions import RDWConnectionError, RDWError


async def test_json_request(aresponses: ResponsesMockServer) -> None:
    """Test that a mocked JSON reply is returned unchanged by a request."""
    expected_body = '{"status": "ok"}'
    mocked_reply = aresponses.Response(
        status=200,
        headers={"Content-Type": "application/json"},
        text=expected_body,
    )
    aresponses.add(
        "opendata.rdw.nl",
        "/resource/m9d7-ebf2.json",
        "GET",
        mocked_reply,
    )

    async with aiohttp.ClientSession() as client_session:
        client = RDW(session=client_session)
        body = await client._request(Dataset.PLATED_VEHICLES)
        # The raw text registered on the mock is what the request must return.
        assert body == expected_body
        await client.close()


async def test_internal_session(aresponses: ResponsesMockServer) -> None:
    """Test a request through an RDW client created without a session."""
    expected_body = '{"status": "ok"}'
    mocked_reply = aresponses.Response(
        status=200,
        headers={"Content-Type": "application/json"},
        text=expected_body,
    )
    aresponses.add(
        "opendata.rdw.nl",
        "/resource/m9d7-ebf2.json",
        "GET",
        mocked_reply,
    )

    # No session is passed in; the client is used as an async context manager.
    async with RDW() as client:
        body = await client._request(Dataset.PLATED_VEHICLES)
        assert body == expected_body


async def test_timeout(aresponses: ResponsesMockServer) -> None:
    """Test that a reply slower than the request timeout raises.

    The mocked handler sleeps for 2 seconds while the client is created with
    ``request_timeout=1``; the request is expected to fail with
    ``RDWConnectionError``.
    """

    # Faking a timeout by sleeping
    # NOTE(review): aresponses appears to call handlers with the incoming
    # request, so the ``aiohttp.ClientResponse`` annotation looks off - confirm.
    async def response_handler(_: aiohttp.ClientResponse) -> Response:
        """Delay the mocked reply so the client gives up first."""
        await asyncio.sleep(2)
        return aresponses.Response(body="Goodmorning!")

    aresponses.add(
        "opendata.rdw.nl",
        "/resource/m9d7-ebf2.json",
        "GET",
        response_handler,
    )

    async with aiohttp.ClientSession() as session:
        rdw = RDW(session=session, request_timeout=1)
        # The call itself must raise, so there is no return value to assert on.
        with pytest.raises(RDWConnectionError):
            await rdw._request(Dataset.PLATED_VEHICLES)


async def test_http_error400(aresponses: ResponsesMockServer) -> None:
    """Test that an HTTP 404 response makes the request raise RDWError."""
    aresponses.add(
        "opendata.rdw.nl",
        "/resource/m9d7-ebf2.json",
        "GET",
        aresponses.Response(text="OMG PUPPIES!", status=404),
    )

    async with aiohttp.ClientSession() as session:
        rdw = RDW(session=session)
        # The call itself must raise, so there is no return value to assert on.
        with pytest.raises(RDWError):
            await rdw._request(Dataset.PLATED_VEHICLES)


async def test_unexpected_response(aresponses: ResponsesMockServer) -> None:
    """Test that a 200 reply with a plain text body raises RDWError.

    The mocked response sets no JSON ``Content-Type`` header, unlike the
    successful-request tests in this module.
    """
    aresponses.add(
        "opendata.rdw.nl",
        "/resource/m9d7-ebf2.json",
        "GET",
        aresponses.Response(text="OMG PUPPIES!", status=200),
    )

    async with aiohttp.ClientSession() as session:
        rdw = RDW(session=session)
        # The call itself must raise, so there is no return value to assert on.
        with pytest.raises(RDWError):
            await rdw._request(Dataset.PLATED_VEHICLES)


async def test_license_plate_normalization() -> None:
    """Test that differently formatted plates normalize to one form."""
    # Dashed, bare, differently grouped, and padded/mixed-case variants.
    raw_plates = ("AB-12-34", "AB1234", "AB1-23-4", " Ab1-23-4 ")
    for raw_plate in raw_plates:
        assert RDW.normalize_license_plate(raw_plate) == "AB1234"