File: reader.py

package info (click to toggle)
eccodes-python 2%3A1.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 2,668 kB
  • sloc: python: 2,939; ansic: 268; makefile: 83
file content (128 lines) | stat: -rw-r--r-- 3,371 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import eccodes
import gribapi
from gribapi import ffi

from .message import GRIBMessage


class ReaderBase:
    def __init__(self):
        self._peeked = None

    def __iter__(self):
        return self

    def __next__(self):
        if self._peeked is not None:
            msg = self._peeked
            self._peeked = None
            return msg
        handle = self._next_handle()
        if handle is None:
            raise StopIteration
        return GRIBMessage(handle)

    def _next_handle(self):
        raise NotImplementedError

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def peek(self):
        """Return the next available message without consuming it"""
        if self._peeked is None:
            handle = self._next_handle()
            if handle is not None:
                self._peeked = GRIBMessage(handle)
        return self._peeked


class FileReader(ReaderBase):
    """Read messages from a file"""

    def __init__(self, path):
        super().__init__()
        self.file = open(path, "rb")

    def _next_handle(self):
        return eccodes.codes_new_from_file(self.file, eccodes.CODES_PRODUCT_GRIB)

    def __enter__(self):
        self.file.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        return self.file.__exit__(exc_type, exc_value, traceback)


class MemoryReader(ReaderBase):
    """Read messages from memory"""

    def __init__(self, buf):
        super().__init__()
        self.buf = buf

    def _next_handle(self):
        if self.buf is None:
            return None
        handle = eccodes.codes_new_from_message(self.buf)
        self.buf = None
        return handle


@ffi.callback("long(*)(void*, void*, long)")
def pyread_callback(payload, buf, length):
    stream = ffi.from_handle(payload)
    read = stream.read(length)
    n = len(read)
    ffi.buffer(buf, length)[:n] = read
    return n if n > 0 else -1  # -1 means EOF


try:
    cstd = ffi.dlopen(None)  # Raises OSError on Windows
    ffi.cdef("void free(void* pointer);")
    ffi.cdef(
        "void* wmo_read_any_from_stream_malloc(void*, long (*stream_proc)(void*, void*, long), size_t*, int*);"
    )
except OSError:
    cstd = None


def codes_new_from_stream(stream):
    if cstd is None:
        raise OSError("This feature is not supported on Windows")
    sh = ffi.new_handle(stream)
    length = ffi.new("size_t*")
    err = ffi.new("int*")
    err, buf = gribapi.err_last(gribapi.lib.wmo_read_any_from_stream_malloc)(
        sh, pyread_callback, length
    )
    buf = ffi.gc(buf, cstd.free, size=length[0])
    if err:
        if err != gribapi.lib.GRIB_END_OF_FILE:
            gribapi.GRIB_CHECK(err)
        return None

    # TODO: remove the extra copy?
    handle = gribapi.lib.grib_handle_new_from_message_copy(ffi.NULL, buf, length[0])
    if handle == ffi.NULL:
        return None
    else:
        return gribapi.put_handle(handle)


class StreamReader(ReaderBase):
    """Read messages from a stream (an object with a ``read`` method)"""

    def __init__(self, stream):
        if cstd is None:
            raise OSError("This feature is not supported on Windows")
        super().__init__()
        self.stream = stream

    def _next_handle(self):
        return codes_new_from_stream(self.stream)