File: reader.py

import warnings
import weakref

from asdf import constants
from asdf.exceptions import AsdfBlockIndexWarning, AsdfWarning, DelimiterNotFoundError

from . import io as bio
from .exceptions import BlockIndexError


class ReadBlock:
    """
    Represents an ASDF block read from a file.
    """

    def __init__(self, offset, fd, memmap, lazy_load, validate_checksum, header=None, data_offset=None, data=None):
        self.offset = offset  # after block magic bytes
        self._fd = weakref.ref(fd)
        self._header = header
        self.data_offset = data_offset
        self._data = data
        self._cached_data = None
        self.memmap = memmap
        self.lazy_load = lazy_load
        self.validate_checksum = validate_checksum
        if not lazy_load:
            self.load()

    def close(self):
        self._cached_data = None

    @property
    def loaded(self):
        return self._data is not None

    def load(self):
        """
        Load the block data (if it is not already loaded).

        Raises
        ------
        OSError
            If attempting to load from a closed file.
        """
        if self.loaded:
            return
        fd = self._fd()
        if fd is None or fd.is_closed():
            msg = "Attempt to load block from closed file"
            raise OSError(msg)
        position = fd.tell()
        _, self._header, self.data_offset, self._data = bio.read_block(
            fd, offset=self.offset, memmap=self.memmap, lazy_load=self.lazy_load
        )
        fd.seek(position)

    @property
    def data(self):
        """
        Read, parse and return data for an ASDF block.

        Returns
        -------
        data : ndarray
            A one-dimensional ndarray of dtype uint8 read from an ASDF block

        Raises
        ------
        ValueError
            If the header checksum does not match the checksum of the data
            and ``validate_checksum`` was set to True.
        """
        if not self.loaded:
            self.load()
        if callable(self._data):
            data = self._data()
        else:
            data = self._data
        if self.validate_checksum:
            checksum = bio.calculate_block_checksum(data)
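            # streamed blocks do not carry a meaningful checksum in their
            # header (the amount of data is open-ended when they are written),
            # so the comparison below is skipped for them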
            if not self._header["flags"] & constants.BLOCK_FLAG_STREAMED and checksum != self._header["checksum"]:
                msg = f"Block at {self.offset} does not match given checksum"
                raise ValueError(msg)
            # only validate data the first time it's read
            self.validate_checksum = False
        return data

    @property
    def cached_data(self):
        """
        Return cached data for an ASDF block.

        The first time this is called it may read data from the file
        (if lazy loaded). Subsequent calls will return the same
        ndarray.
        """
        if self._cached_data is None:
            self._cached_data = self.data
        return self._cached_data

    @property
    def header(self):
        """
        Get the block header. For a lazily loaded block, the header
        will be read from the file and cached the first time this
        is called.

        Returns
        -------
        header : dict
            Dictionary containing the read ASDF block header.
        """
        if not self.loaded:
            self.load()
        return self._header
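
# Illustrative sketch (not part of the module API): a ReadBlock is normally
# constructed by the reader functions below rather than by users directly.
# Assuming ``fd`` is an ``asdf.generic_io.GenericFile`` positioned just past a
# block's magic bytes, lazy access might look like:
#
#     block = ReadBlock(fd.tell(), fd, memmap=False, lazy_load=True, validate_checksum=False)
#     arr = block.cached_data   # reads and caches the block data on first use
#     arr2 = block.cached_data  # returns the same ndarray without re-reading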


def _read_blocks_serially(fd, memmap=False, lazy_load=False, validate_checksums=False, after_magic=False):
    """
    Read blocks serially from a file without looking for a block index.

    For parameter and return value descriptions see `read_blocks`.
    """
    blocks = []
    magic_len = len(constants.BLOCK_MAGIC)

    if not after_magic:
        # seek until the first magic is found
        try:
            fd.seek_until(b"(" + constants.BLOCK_MAGIC + b")", magic_len)
        except DelimiterNotFoundError:
            return blocks
        after_magic = True

    buff = constants.BLOCK_MAGIC
    while buff == constants.BLOCK_MAGIC:
        # read the block
        offset, header, data_offset, data = bio.read_block(fd, memmap=memmap, lazy_load=lazy_load)
        blocks.append(
            ReadBlock(
                offset, fd, memmap, lazy_load, validate_checksums, header=header, data_offset=data_offset, data=data
            )
        )
        if blocks[-1].header["flags"] & constants.BLOCK_FLAG_STREAMED:
            # a file can only have 1 streamed block and it must be at the end so we
            # can stop looking for more blocks
            return blocks

        # check for the next block
        buff = fd.read(magic_len)

    # check remaining bytes
    if buff == constants.INDEX_HEADER[: len(buff)]:
        # remaining bytes are the start of the block index
        return blocks
    if buff == b"\0" * len(buff):
        # remaining bytes are null
        return blocks
    msg = f"Read invalid bytes {buff!r} after blocks, your file might be corrupt"
    warnings.warn(msg, AsdfWarning)
    return blocks


def read_blocks(fd, memmap=False, lazy_load=False, validate_checksums=False, after_magic=False):
    """
    Read a sequence of ASDF blocks from a file.

    If the file is seekable (and ``lazy_load`` is True) an attempt will
    be made to find, read and parse a block index. If this fails, the
    blocks will be read serially. If parsing the block index
    succeeds, the first and last blocks will be read (to
    confirm that those portions of the index are correct). All
    other blocks will not be read until they are accessed.

    Parameters
    ----------
    fd : file or generic_io.GenericIO
        File to read. Reading will start at the current position.

    memmap : bool, optional, default False
        If true, memory map block data.

    lazy_load : bool, optional, default False
        If true, block data will be a callable that when executed
        will return the block data. See the ``lazy_load`` argument
        to ``asdf._block.io.read_block`` for more details.

    validate_checksums : bool, optional, default False
        When reading blocks compute the block data checksum and
        compare it to the checksum read from the block header.
        Note that this comparison will occur when the data is
        accessed if ``lazy_load`` was set to True.

    after_magic : bool, optional, default False
        If True, don't expect block magic bytes for the first block
        read from the file.

    Returns
    -------
    read_blocks : list of ReadBlock
        A list of ReadBlock instances.

    Raises
    ------
    OSError
        Invalid bytes encountered while reading blocks.

    ValueError
        A read block has an invalid checksum.
    """
    if not lazy_load or not fd.seekable():
        # load all blocks serially
        return _read_blocks_serially(fd, memmap, lazy_load, validate_checksums, after_magic)

    # try to find block index
    starting_offset = fd.tell()
    index_offset = bio.find_block_index(fd, starting_offset)
    if index_offset is None:
        # if failed, load all blocks serially
        fd.seek(starting_offset)
        return _read_blocks_serially(fd, memmap, lazy_load, validate_checksums, after_magic)

    # read the block index so that lazily loaded (empty) blocks can be set up
    try:
        block_index = bio.read_block_index(fd, index_offset)
    except BlockIndexError as e:
        # failed to read block index, fall back to serial reading
        msg = f"Failed to read block index, falling back to serial reading: {e!s}"
        warnings.warn(msg, AsdfBlockIndexWarning)
        fd.seek(starting_offset)
        return _read_blocks_serially(fd, memmap, lazy_load, validate_checksums, after_magic)
    # skip magic for each block
    magic_len = len(constants.BLOCK_MAGIC)
    blocks = [ReadBlock(offset + magic_len, fd, memmap, lazy_load, validate_checksums) for offset in block_index]
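    # none of these blocks have been read yet; they only know their offsets
    # taken from the (so far unverified) block index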
    try:
        # load first and last blocks to check if the index looks correct
        for index in (0, -1):
            fd.seek(block_index[index])
            buff = fd.read(magic_len)
            if buff != constants.BLOCK_MAGIC:
                msg = "Invalid block magic"
                raise OSError(msg)
            blocks[index].load()
    except (OSError, ValueError) as e:
        msg = f"Invalid block index contents for block {index}, falling back to serial reading: {e!s}"
        warnings.warn(msg, AsdfBlockIndexWarning)
        fd.seek(starting_offset)
        return _read_blocks_serially(fd, memmap, lazy_load, validate_checksums, after_magic)
    return blocks
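

# Illustrative usage sketch (hedged: the file name and ``block_section_offset``
# below are assumptions, and the file object must already be positioned at the
# start of the block section, i.e. just past the YAML tree):
#
#     from asdf import generic_io
#
#     with generic_io.get_file("example.asdf", "r") as fd:
#         fd.seek(block_section_offset)  # hypothetical offset of the first block magic
#         blocks = read_blocks(fd, lazy_load=True, validate_checksums=True)
#         first_block_data = blocks[0].data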