1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
|
"""Test for the generic GeoJSON feed."""
import asyncio
import datetime
from http import HTTPStatus
import aiohttp
import pytest
from aio_geojson_client.consts import UPDATE_OK
from aio_geojson_generic_client.feed import GenericFeed
from tests.utils import load_fixture
@pytest.mark.asyncio
async def test_update_ok(mock_aioresponse):
    """Verify a successful update of a feed containing four entries.

    The mocked endpoint serves the ``feed-1.json`` fixture. The test checks
    the feed's repr, the update status, and selected attributes of each of
    the four returned entries.
    """
    feed_url = "https://www.rfs.nsw.gov.au/feeds/majorIncidents.json"
    home_coordinates = (-31.0, 151.0)
    mock_aioresponse.get(
        feed_url,
        status=HTTPStatus.OK,
        body=load_fixture("feed-1.json"),
    )

    async with aiohttp.ClientSession(loop=asyncio.get_running_loop()) as websession:
        feed = GenericFeed(websession, home_coordinates, feed_url)
        expected_repr = (
            "<GenericFeed("
            "home=(-31.0, 151.0), "
            "url=https://www.rfs.nsw.gov.au"
            "/feeds/majorIncidents.json, "
            "radius=None)>"
        )
        assert repr(feed) == expected_repr

        status, entries = await feed.update()
        assert status == UPDATE_OK
        assert entries is not None
        assert len(entries) == 4

        # First entry: every exposed attribute is checked.
        first = entries[0]
        assert first.title == "Title 1"
        assert first.external_id == "1234"
        assert first.coordinates == (-37.2345, 149.1234)
        # Distance must match the expected value once rounded to one decimal.
        deviation = abs(first.distance_to_home - 714.4)
        assert round(deviation, 1) == 0
        assert repr(first) == "<GenericFeedEntry(id=1234)>"
        expected_published = datetime.datetime(
            2018, 9, 21, 6, 30, tzinfo=datetime.timezone.utc
        )
        assert first.publication_date == expected_published
        expected_properties = {
            "title": "Title 1",
            "category": "Category 1",
            "guid": "1234",
            "pubDate": "21/09/2018 6:30:00 AM",
            "description": "ALERT LEVEL: Alert Level 1 <br />LOCATION: Location 1 <br />COUNCIL AREA: Council 1 <br />STATUS: Status 1 <br />TYPE: Type 1 <br />FIRE: Yes <br />SIZE: 10 ha <br />RESPONSIBLE AGENCY: Agency 1 <br />UPDATED: 21 Sep 2018 16:45",
        }
        assert first.properties == expected_properties

        # Second entry: its external id is expected to equal its title.
        second = entries[1]
        assert second is not None
        assert second.title == "Title 2"
        assert second.external_id == "Title 2"

        # Third entry: its external id is expected to be the hash of a
        # coordinate tuple.
        third = entries[2]
        assert third.external_id == hash((-37.2345, 149.1234))

        # Fourth entry: exposes four geometries.
        fourth = entries[3]
        assert fourth.title == "Badja Forest Rd, Countegany"
        assert fourth.geometries is not None
        assert len(fourth.geometries) == 4
        deviation = abs(fourth.distance_to_home - 578.5)
        assert round(deviation, 1) == 0
@pytest.mark.asyncio
async def test_empty_feed(mock_aioresponse):
    """Verify that updating from a feed without entries still succeeds.

    The mocked endpoint serves the ``feed-2.json`` fixture. The update is
    expected to report ``UPDATE_OK`` with an empty entry list, and the feed's
    ``last_timestamp`` is expected to be ``None``.
    """
    feed_url = "https://www.rfs.nsw.gov.au/feeds/majorIncidents.json"
    home_coordinates = (-41.2, 174.7)
    mock_aioresponse.get(
        feed_url,
        status=HTTPStatus.OK,
        body=load_fixture("feed-2.json"),
    )

    async with aiohttp.ClientSession(loop=asyncio.get_running_loop()) as websession:
        feed = GenericFeed(websession, home_coordinates, feed_url)
        expected_repr = (
            "<GenericFeed("
            "home=(-41.2, 174.7), "
            "url=https://www.rfs.nsw.gov.au"
            "/feeds/majorIncidents.json, "
            "radius=None)>"
        )
        assert repr(feed) == expected_repr

        status, entries = await feed.update()
        assert status == UPDATE_OK
        # An empty feed yields an empty collection, not None.
        assert entries is not None
        assert len(entries) == 0
        assert feed.last_timestamp is None
@pytest.mark.asyncio
async def test_feed_entry_properties(mock_aioresponse):
    """Verify the ``properties`` attribute of entries after an update.

    The mocked endpoint serves the ``feed-3.json`` fixture, which is expected
    to produce three entries: the first with a two-item properties mapping,
    the remaining two with ``properties`` equal to ``None``.
    """
    feed_url = "https://www.rfs.nsw.gov.au/feeds/majorIncidents.json"
    home_coordinates = (-41.2, 174.7)
    mock_aioresponse.get(
        feed_url,
        status=HTTPStatus.OK,
        body=load_fixture("feed-3.json"),
    )

    async with aiohttp.ClientSession(loop=asyncio.get_running_loop()) as websession:
        feed = GenericFeed(websession, home_coordinates, feed_url)
        expected_repr = (
            "<GenericFeed("
            "home=(-41.2, 174.7), "
            "url=https://www.rfs.nsw.gov.au"
            "/feeds/majorIncidents.json, "
            "radius=None)>"
        )
        assert repr(feed) == expected_repr

        status, entries = await feed.update()
        assert status == UPDATE_OK
        assert entries is not None
        assert len(entries) == 3

        # Only the first entry carries a properties mapping.
        expected_properties = {
            "property1": "value1",
            "property2": "value2",
        }
        assert entries[0].properties == expected_properties

        # The second and third entries expose no properties at all.
        for index in (1, 2):
            assert entries[index].properties is None
|