1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
|
"""Asynchronous client for the Open-Meteo API."""
# pylint: disable=protected-access
import asyncio
import aiohttp
import pytest
from aresponses import Response, ResponsesMockServer
from yarl import URL
from open_meteo import OpenMeteo
from open_meteo.exceptions import OpenMeteoConnectionError, OpenMeteoError
async def test_json_request(aresponses: ResponsesMockServer) -> None:
"""Test JSON response is handled correctly."""
aresponses.add(
"example.com",
"/api/",
"GET",
aresponses.Response(
status=200,
headers={"Content-Type": "application/json"},
text='{"status": "ok"}',
),
)
async with aiohttp.ClientSession() as session:
open_meteo = OpenMeteo(session=session)
response = await open_meteo._request(URL("http://example.com/api/"))
assert response == '{"status": "ok"}'
async def test_internal_session(aresponses: ResponsesMockServer) -> None:
"""Test JSON response is handled correctly."""
aresponses.add(
"example.com",
"/api/",
"GET",
aresponses.Response(
status=200,
headers={"Content-Type": "application/json"},
text='{"status": "ok"}',
),
)
async with OpenMeteo() as open_meteo:
response = await open_meteo._request(URL("http://example.com/api/"))
assert response == '{"status": "ok"}'
async def test_timeout(aresponses: ResponsesMockServer) -> None:
"""Test request timeout."""
# Faking a timeout by sleeping
async def response_handler(_: aiohttp.ClientResponse) -> Response:
await asyncio.sleep(2)
return aresponses.Response(body="Goodmorning!")
aresponses.add("example.com", "/api/", "POST", response_handler)
async with aiohttp.ClientSession() as session:
open_meteo = OpenMeteo(
session=session,
request_timeout=1,
)
with pytest.raises(OpenMeteoConnectionError):
assert await open_meteo._request(URL("http://example.com/api/"))
async def test_http_error400(aresponses: ResponsesMockServer) -> None:
"""Test HTTP 404 response handling."""
aresponses.add(
"example.com",
"/api/",
"GET",
aresponses.Response(text="OMG PUPPIES!", status=404),
)
async with aiohttp.ClientSession() as session:
open_meteo = OpenMeteo(session=session)
with pytest.raises(OpenMeteoError):
assert await open_meteo._request(URL("http://example.com/api/"))
async def test_http_error500(aresponses: ResponsesMockServer) -> None:
"""Test HTTP 500 response handling."""
aresponses.add(
"example.com",
"/api/",
"GET",
aresponses.Response(
body=b'{"status":"nok"}',
status=500,
headers={"Content-Type": "application/json"},
),
)
async with aiohttp.ClientSession() as session:
open_meteo = OpenMeteo(session=session)
with pytest.raises(OpenMeteoError):
assert await open_meteo._request(URL("http://example.com/api/"))
|