1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
|
"""Test for the GeoNet NZ Volcanic Alert Level GeoJSON feed."""
import asyncio
from http import HTTPStatus
import aiohttp
import pytest
from aio_geojson_client.consts import UPDATE_OK
from aio_geojson_geonetnz_volcano.consts import ATTRIBUTION
from aio_geojson_geonetnz_volcano.feed import GeonetnzVolcanoFeed
from tests.utils import load_fixture
@pytest.mark.asyncio
async def test_update_ok(mock_aioresponse):
"""Test updating feed is ok."""
home_coordinates = (-41.2, 174.7)
mock_aioresponse.get(
"https://api.geonet.org.nz/volcano/val",
status=HTTPStatus.OK,
body=load_fixture("val-1.json"),
)
async with aiohttp.ClientSession(loop=asyncio.get_running_loop()) as websession:
feed = GeonetnzVolcanoFeed(websession, home_coordinates)
assert (
repr(feed) == "<GeonetnzVolcanoFeed(home=(-41.2, 174.7), "
"url=https://api.geonet.org.nz/volcano/val, "
"radius=None)>"
)
status, entries = await feed.update()
assert status == UPDATE_OK
assert entries is not None
assert len(entries) == 3
feed_entry = entries[0]
assert feed_entry is not None
assert feed_entry.title == "Volcano 1"
assert feed_entry.external_id == "volcano1"
assert feed_entry.coordinates[0] == pytest.approx(-38.784)
assert feed_entry.coordinates[1] == pytest.approx(175.896)
assert round(abs(feed_entry.distance_to_home - 287.3), 1) == 0
assert repr(feed_entry) == "<GeonetnzVolcanoFeedEntry(id=volcano1)>"
assert feed_entry.attribution == ATTRIBUTION
assert feed_entry.alert_level == 0
assert feed_entry.activity == "No volcanic unrest."
assert feed_entry.hazards == "Volcanic environment hazards."
feed_entry = entries[1]
assert feed_entry is not None
assert feed_entry.title == "Volcano 2"
assert feed_entry.external_id == "volcano2"
feed_entry = entries[2]
assert feed_entry is not None
@pytest.mark.asyncio
async def test_empty_feed(mock_aioresponse):
"""Test updating feed is ok when feed does not contain any entries."""
home_coordinates = (-41.2, 174.7)
mock_aioresponse.get(
"https://api.geonet.org.nz/volcano/val",
status=HTTPStatus.OK,
body=load_fixture("val-2.json"),
)
async with aiohttp.ClientSession(loop=asyncio.get_running_loop()) as websession:
feed = GeonetnzVolcanoFeed(websession, home_coordinates)
assert (
repr(feed) == "<GeonetnzVolcanoFeed(home=(-41.2, 174.7), "
"url=https://api.geonet.org.nz/volcano/val, "
"radius=None)>"
)
status, entries = await feed.update()
assert status == UPDATE_OK
assert entries is not None
assert len(entries) == 0
assert feed.last_timestamp is None
|