File: test_feed.py

package info (click to toggle)
python-aio-geojson-generic-client 0.4-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 180 kB
  • sloc: python: 332; makefile: 4
file content (142 lines) | stat: -rw-r--r-- 4,927 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""Test for the generic GeoJSON feed."""
import asyncio
import datetime
from http import HTTPStatus

import aiohttp
import pytest
from aio_geojson_client.consts import UPDATE_OK

from aio_geojson_generic_client.feed import GenericFeed
from tests.utils import load_fixture


@pytest.mark.asyncio
async def test_update_ok(mock_aioresponse):
    """Test updating feed is ok."""
    home_coordinates = (-31.0, 151.0)
    mock_aioresponse.get(
        "https://www.rfs.nsw.gov.au/feeds/majorIncidents.json",
        status=HTTPStatus.OK,
        body=load_fixture("feed-1.json"),
    )

    async with aiohttp.ClientSession(loop=asyncio.get_running_loop()) as websession:
        feed = GenericFeed(
            websession,
            home_coordinates,
            "https://www.rfs.nsw.gov.au/feeds/majorIncidents.json",
        )
        assert (
            repr(feed) == "<GenericFeed("
            "home=(-31.0, 151.0), "
            "url=https://www.rfs.nsw.gov.au"
            "/feeds/majorIncidents.json, "
            "radius=None)>"
        )
        status, entries = await feed.update()
        assert status == UPDATE_OK
        assert entries is not None
        assert len(entries) == 4

        feed_entry = entries[0]
        assert feed_entry.title == "Title 1"
        assert feed_entry.external_id == "1234"
        assert feed_entry.coordinates == (-37.2345, 149.1234)
        assert round(abs(feed_entry.distance_to_home - 714.4), 1) == 0
        assert repr(feed_entry) == "<GenericFeedEntry(id=1234)>"
        assert feed_entry.publication_date == datetime.datetime(
            2018, 9, 21, 6, 30, tzinfo=datetime.timezone.utc
        )
        assert feed_entry.properties == {
            "title": "Title 1",
            "category": "Category 1",
            "guid": "1234",
            "pubDate": "21/09/2018 6:30:00 AM",
            "description": "ALERT LEVEL: Alert Level 1 <br />LOCATION: Location 1 <br />COUNCIL AREA: Council 1 <br />STATUS: Status 1 <br />TYPE: Type 1 <br />FIRE: Yes <br />SIZE: 10 ha <br />RESPONSIBLE AGENCY: Agency 1 <br />UPDATED: 21 Sep 2018 16:45",
        }

        feed_entry = entries[1]
        assert feed_entry is not None
        assert feed_entry.title == "Title 2"
        assert feed_entry.external_id == "Title 2"

        feed_entry = entries[2]
        assert feed_entry.external_id == hash((-37.2345, 149.1234))

        feed_entry = entries[3]
        assert feed_entry.title == "Badja Forest Rd, Countegany"
        assert feed_entry.geometries is not None
        assert len(feed_entry.geometries) == 4
        assert round(abs(feed_entry.distance_to_home - 578.5), 1) == 0


@pytest.mark.asyncio
async def test_empty_feed(mock_aioresponse):
    """Test updating feed is ok when feed does not contain any entries."""
    home_coordinates = (-41.2, 174.7)
    mock_aioresponse.get(
        "https://www.rfs.nsw.gov.au/feeds/majorIncidents.json",
        status=HTTPStatus.OK,
        body=load_fixture("feed-2.json"),
    )

    async with aiohttp.ClientSession(loop=asyncio.get_running_loop()) as websession:
        feed = GenericFeed(
            websession,
            home_coordinates,
            "https://www.rfs.nsw.gov.au/feeds/majorIncidents.json",
        )
        assert (
            repr(feed) == "<GenericFeed("
            "home=(-41.2, 174.7), "
            "url=https://www.rfs.nsw.gov.au"
            "/feeds/majorIncidents.json, "
            "radius=None)>"
        )
        status, entries = await feed.update()
        assert status == UPDATE_OK
        assert entries is not None
        assert len(entries) == 0
        assert feed.last_timestamp is None


@pytest.mark.asyncio
async def test_feed_entry_properties(mock_aioresponse):
    """Test updating feed is ok with focus on properties."""
    home_coordinates = (-41.2, 174.7)
    mock_aioresponse.get(
        "https://www.rfs.nsw.gov.au/feeds/majorIncidents.json",
        status=HTTPStatus.OK,
        body=load_fixture("feed-3.json"),
    )

    async with aiohttp.ClientSession(loop=asyncio.get_running_loop()) as websession:
        feed = GenericFeed(
            websession,
            home_coordinates,
            "https://www.rfs.nsw.gov.au/feeds/majorIncidents.json",
        )
        assert (
            repr(feed) == "<GenericFeed("
            "home=(-41.2, 174.7), "
            "url=https://www.rfs.nsw.gov.au"
            "/feeds/majorIncidents.json, "
            "radius=None)>"
        )
        status, entries = await feed.update()
        assert status == UPDATE_OK
        assert entries is not None
        assert len(entries) == 3

        feed_entry = entries[0]
        assert feed_entry.properties == {
            "property1": "value1",
            "property2": "value2",
        }

        feed_entry = entries[1]
        assert feed_entry.properties is None

        feed_entry = entries[2]
        assert feed_entry.properties is None