File: test_20_messages.py

package info (click to toggle)
eccodes-python 2%3A1.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 400 kB
  • sloc: python: 2,695; ansic: 262; makefile: 83
file content (229 lines) | stat: -rw-r--r-- 6,802 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import os.path

import numpy as np
import pytest

from eccodes import messages


SAMPLE_DATA_FOLDER = os.path.join(os.path.dirname(__file__), "sample-data")
TEST_DATA = os.path.join(SAMPLE_DATA_FOLDER, "era5-levels-members.grib")


def _test_Message_read():
    with open(TEST_DATA) as file:
        res1 = messages.Message.from_file(file)

    assert res1.message_get("paramId") == 129
    assert res1["paramId"] == 129
    assert list(res1)[0] == "globalDomain"
    assert list(res1.message_grib_keys("time"))[0] == "dataDate"
    assert "paramId" in res1
    assert len(res1) > 100

    with pytest.raises(KeyError):
        res1["non-existent-key"]

    assert res1.message_get("non-existent-key", default=1) == 1

    res2 = messages.Message.from_message(res1)
    for (k2, v2), (k1, v1) in zip(res2.items(), res1.items()):
        assert k2 == k1
        if isinstance(v2, np.ndarray) or isinstance(v1, np.ndarray):
            assert np.allclose(v2, v1)
        else:
            assert v2 == v1

    with open(TEST_DATA) as file:
        with pytest.raises(EOFError):
            while True:
                messages.Message.from_file(file)


def test_Message_write(tmpdir):
    res = messages.Message.from_sample_name("regular_ll_pl_grib2")
    assert res["gridType"] == "regular_ll"

    res.message_set("Ni", 20)
    assert res["Ni"] == 20

    res["iDirectionIncrementInDegrees"] = 1.0
    assert res["iDirectionIncrementInDegrees"] == 1.0

    res.message_set("gridType", "reduced_gg")
    assert res["gridType"] == "reduced_gg"

    res["pl"] = [2.0, 3.0]
    assert np.allclose(res["pl"], [2.0, 3.0])

    # warn on errors
    res["centreDescription"] = "DUMMY"
    assert res["centreDescription"] != "DUMMY"
    res["edition"] = -1
    assert res["edition"] != -1

    # ignore errors
    res.errors = "ignore"
    res["centreDescription"] = "DUMMY"
    assert res["centreDescription"] != "DUMMY"

    # raise errors
    res.errors = "raise"
    with pytest.raises(KeyError):
        res["centreDescription"] = "DUMMY"

    with pytest.raises(NotImplementedError):
        del res["gridType"]

    out = tmpdir.join("test.grib")
    with open(str(out), "wb") as file:
        res.write(file)


def _test_ComputedKeysMessage_read():
    computed_keys = {
        "ref_time": (lambda m: str(m["dataDate"]) + str(m["dataTime"]), None),
        "error_key": (lambda m: 1 / 0, None),
        "centre": (lambda m: -1, lambda m, v: None),
    }
    with open(TEST_DATA) as file:
        res = messages.ComputedKeysMessage.from_file(file, computed_keys=computed_keys)

    assert res["paramId"] == 129
    assert res["ref_time"] == "201701010"
    assert len(res) > 100
    assert res["centre"] == -1

    with pytest.raises(ZeroDivisionError):
        res["error_key"]


def test_ComputedKeysMessage_write():
    computed_keys = {
        "ref_time": (lambda m: "%s%04d" % (m["dataDate"], m["dataTime"]), None),
        "error_key": (lambda m: 1 / 0, None),
        "centre": (lambda m: -1, lambda m, v: None),
    }
    res = messages.ComputedKeysMessage.from_sample_name(
        "regular_ll_pl_grib2", computed_keys=computed_keys
    )
    res["dataDate"] = 20180101
    res["dataTime"] = 0
    assert res["ref_time"] == "201801010000"

    res["centre"] = 1


def test_compat_create_exclusive(tmpdir):
    test_file = tmpdir.join("file.grib.idx")

    try:
        with messages.compat_create_exclusive(str(test_file)):
            raise RuntimeError("Test remove")
    except RuntimeError:
        pass

    with messages.compat_create_exclusive(str(test_file)) as file:
        file.write(b"Hi!")

    with pytest.raises(OSError):
        with messages.compat_create_exclusive(str(test_file)) as file:
            file.write(b"Hi!")


def _test_FileIndex():
    res = messages.FileIndex.from_filestream(
        messages.FileStream(TEST_DATA), ["paramId"]
    )
    assert res["paramId"] == [129, 130]
    assert len(res) == 1
    assert list(res) == ["paramId"]
    assert res.first()

    with pytest.raises(ValueError):
        res.getone("paramId")

    with pytest.raises(KeyError):
        res["non-existent-key"]

    subres = res.subindex(paramId=130)

    assert subres.get("paramId") == [130]
    assert subres.getone("paramId") == 130
    assert len(subres) == 1


def _test_FileIndex_from_indexpath_or_filestream(tmpdir):
    grib_file = tmpdir.join("file.grib")

    with open(TEST_DATA, "rb") as file:
        grib_file.write_binary(file.read())

    # create index file
    res = messages.FileIndex.from_indexpath_or_filestream(
        messages.FileStream(str(grib_file)), ["paramId"]
    )
    assert isinstance(res, messages.FileIndex)

    # read index file
    res = messages.FileIndex.from_indexpath_or_filestream(
        messages.FileStream(str(grib_file)), ["paramId"]
    )
    assert isinstance(res, messages.FileIndex)

    # do not read nor create the index file
    res = messages.FileIndex.from_indexpath_or_filestream(
        messages.FileStream(str(grib_file)), ["paramId"], indexpath=""
    )
    assert isinstance(res, messages.FileIndex)

    # can't create nor read index file
    res = messages.FileIndex.from_indexpath_or_filestream(
        messages.FileStream(str(grib_file)),
        ["paramId"],
        indexpath=str(tmpdir.join("non-existent-folder").join("non-existent-file")),
    )
    assert isinstance(res, messages.FileIndex)

    # trigger mtime check
    grib_file.remove()
    with open(TEST_DATA, "rb") as file:
        grib_file.write_binary(file.read())

    res = messages.FileIndex.from_indexpath_or_filestream(
        messages.FileStream(str(grib_file)), ["paramId"]
    )
    assert isinstance(res, messages.FileIndex)


def _test_FileIndex_errors():
    class MyMessage(messages.ComputedKeysMessage):
        computed_keys = {"error_key": lambda m: 1 / 0}

    stream = messages.FileStream(TEST_DATA, message_class=MyMessage)
    res = messages.FileIndex.from_filestream(stream, ["paramId", "error_key"])
    assert res["paramId"] == [129, 130]
    assert len(res) == 2
    assert list(res) == ["paramId", "error_key"]
    assert res["error_key"] == ["undef"]


def _test_FileStream():
    res = messages.FileStream(TEST_DATA)
    leader = res.first()
    assert len(leader) > 100
    assert sum(1 for _ in res) == leader["count"]
    assert len(res.index(["paramId"])) == 1

    # __file__ is not a GRIB, but contains the "GRIB" string, so it is a very tricky corner case
    res = messages.FileStream(str(__file__))
    with pytest.raises(EOFError):
        res.first()

    res = messages.FileStream(str(__file__), errors="ignore")
    with pytest.raises(EOFError):
        res.first()

    # res = messages.FileStream(str(__file__), errors='raise')
    # with pytest.raises(bindings.EcCodesError):
    #     res.first()