1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
|
"""Asynchronous client for the Open-Meteo API."""
# pylint: disable=protected-access
import aiohttp
import pytest
from aresponses import ResponsesMockServer
from yarl import URL
from open_meteo import OpenMeteo
from open_meteo.exceptions import OpenMeteoError
async def test_json_request(aresponses: ResponsesMockServer) -> None:
    """Check that a mocked JSON reply is handed back by ``_request``.

    A 200 response with a JSON content type is registered for
    ``GET example.com/api/``; the client, given an explicit aiohttp
    session, must return the body as the same string.
    """
    mocked_reply = aresponses.Response(
        status=200,
        headers={"Content-Type": "application/json"},
        text='{"status": "ok"}',
    )
    aresponses.add("example.com", "/api/", "GET", mocked_reply)

    endpoint = URL("http://example.com/api/")
    async with aiohttp.ClientSession() as session:
        client = OpenMeteo(session=session)
        payload = await client._request(endpoint)
        assert payload == '{"status": "ok"}'
async def test_internal_session(aresponses: ResponsesMockServer) -> None:
    """Check a request made without supplying an aiohttp session.

    The client is created with no ``session`` argument and used as an
    async context manager; the mocked response body must still come back
    as the same string.
    """
    mocked_reply = aresponses.Response(
        status=200,
        headers={"Content-Type": "application/json"},
        text='{"status": "ok"}',
    )
    aresponses.add("example.com", "/api/", "GET", mocked_reply)

    endpoint = URL("http://example.com/api/")
    async with OpenMeteo() as client:
        payload = await client._request(endpoint)
        assert payload == '{"status": "ok"}'
async def test_http_error400(aresponses: ResponsesMockServer) -> None:
    """Test that an HTTP 4xx response raises OpenMeteoError.

    Despite the ``400`` in the test name, the mocked endpoint replies with
    a 404 status.
    """
    aresponses.add(
        "example.com",
        "/api/",
        "GET",
        aresponses.Response(text="OMG PUPPIES!", status=404),
    )

    async with aiohttp.ClientSession() as session:
        open_meteo = OpenMeteo(session=session)
        # The call is expected to raise, so its return value is never
        # inspected; wrapping it in ``assert`` would only obscure a
        # "did not raise" failure behind an AssertionError.
        with pytest.raises(OpenMeteoError):
            await open_meteo._request(URL("http://example.com/api/"))
async def test_http_error500(aresponses: ResponsesMockServer) -> None:
    """Test that an HTTP 500 response raises OpenMeteoError.

    The mocked endpoint replies with status 500 and a JSON body, so the
    error must be raised even though the payload has a JSON content type.
    """
    aresponses.add(
        "example.com",
        "/api/",
        "GET",
        aresponses.Response(
            body=b'{"status":"nok"}',
            status=500,
            headers={"Content-Type": "application/json"},
        ),
    )

    async with aiohttp.ClientSession() as session:
        open_meteo = OpenMeteo(session=session)
        # The call is expected to raise, so its return value is never
        # inspected; wrapping it in ``assert`` would only obscure a
        # "did not raise" failure behind an AssertionError.
        with pytest.raises(OpenMeteoError):
            await open_meteo._request(URL("http://example.com/api/"))
|