File: _io.py

package info (click to toggle)
python-asdf 4.4.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 7,060 kB
  • sloc: python: 24,224; makefile: 123
file content (155 lines) | stat: -rw-r--r-- 4,584 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""
Low-level input/output routines for AsdfFile instances
"""

import contextlib
import io
import pathlib

from . import _version, constants, generic_io, versioning, yamlutil
from ._block.manager import Manager as BlockManager
from .exceptions import DelimiterNotFoundError
from .tags.core import Software


def get_asdf_library_info():
    """
    Build the ``Software`` record describing this asdf library.

    The record carries the library name, the installed version (taken
    from ``_version.version``), the project homepage and the author
    string; it is meant for the asdf_library entry in the Tree.
    """
    library_metadata = {
        "name": "asdf",
        "version": _version.version,
        "homepage": "http://github.com/asdf-format/asdf",
        "author": "The ASDF Developers",
    }
    return Software(library_metadata)


def parse_header_line(line):
    """
    Extract and validate the file format version from an ASDF header line.

    Parameters
    ----------
    line : bytes
        The header line. It must split into exactly two
        whitespace-separated fields: the ASDF magic token followed by
        an ASCII version string.

    Returns
    -------
    versioning.AsdfVersion
        The parsed file format version.

    Raises
    ------
    ValueError
        If the line does not have the expected two-field shape or magic
        token, if the version field cannot be parsed, or if the parsed
        version differs from ``versioning._FILE_FORMAT_VERSION``.
    """
    fields = line.split()
    # The field count is checked first so a malformed line is rejected
    # before the magic token is ever compared.
    looks_like_header = len(fields) == 2 and fields[0] == constants.ASDF_MAGIC
    if not looks_like_header:
        msg = "Does not appear to be a ASDF file."
        raise ValueError(msg)

    raw_version = fields[1]
    try:
        version = versioning.AsdfVersion(raw_version.decode("ascii"))
    except ValueError as err:
        # A failed ASCII decode lands here too: UnicodeDecodeError is a
        # ValueError subclass.
        msg = f"Unparsable version in ASDF file: {raw_version}"
        raise ValueError(msg) from err

    if version != versioning._FILE_FORMAT_VERSION:
        msg = f"Unsupported ASDF file format version {version}"
        raise ValueError(msg)

    return version


def read_header_line(gf):
    """
    Read the leading line of ``gf`` and parse it as an ASDF header.

    The line is read up to and including its newline (``\\n`` or
    ``\\r\\n``) and handed to `parse_header_line`, whose result is
    returned.

    Raises
    ------
    ValueError
        If no newline delimiter is found, or if `parse_header_line`
        rejects the line.
    """
    try:
        raw_header = gf.read_until(b"\r?\n", 2, "newline", include=True)
    except DelimiterNotFoundError as exc:
        # Report a missing newline the same way as any other non-ASDF input.
        msg = "Does not appear to be a ASDF file."
        raise ValueError(msg) from exc

    return parse_header_line(raw_header)


def read_comment_section(fd):
    """
    Read the comment lines that sit between the header line and the
    Tree or first block.

    Everything up to (but not including) the ``%YAML`` marker or the
    block magic token is consumed. Each line must start with ``#``; the
    marker is dropped and surrounding whitespace stripped.

    Returns
    -------
    list of bytes
        The comment bodies, in file order.

    Raises
    ------
    ValueError
        If any line in the section does not start with ``#``.
    """

    def _comment_body(raw_line):
        # Drop the leading '#' marker, rejecting anything that is not a comment.
        if raw_line.startswith(b"#"):
            return raw_line[1:].strip()
        msg = "Invalid content between header and tree"
        raise ValueError(msg)

    content_start = b"(%YAML)|(" + constants.BLOCK_MAGIC + b")"
    section = fd.read_until(
        content_start,
        5,
        "start of content",
        include=False,
        exception=False,
    )

    return [_comment_body(raw_line) for raw_line in section.splitlines()]


def find_asdf_version_in_comments(comments, default=None):
    """
    Look for an ASDF standard version declaration among ``comments``.

    A comment matches when it splits into exactly two fields, the first
    being ``constants.ASDF_STANDARD_COMMENT`` and the second an ASCII
    version string. The first comment whose version parses wins;
    comments with an unparsable version are skipped.

    Returns
    -------
    versioning.AsdfVersion or the value of ``default``
        The first successfully parsed version, or ``default`` when no
        comment yields one.
    """
    for entry in comments:
        fields = entry.split()
        if len(fields) != 2:
            continue

        keyword, raw_version = fields
        if keyword != constants.ASDF_STANDARD_COMMENT:
            continue

        try:
            return versioning.AsdfVersion(raw_version.decode("ascii"))
        except ValueError:
            # Best effort: ignore an unparsable version and keep scanning.
            continue

    return default


@contextlib.contextmanager
def maybe_close(fd, mode=None, uri=None):
    """
    Context manager yielding a generic file object for ``fd``.

    - A ``str`` or ``pathlib.Path`` is opened via ``generic_io.get_file``
      (mode defaults to ``"r"``). The opened file is closed only if the
      body of the ``with`` block raises an ``Exception``; on success it
      is left open.
    - A ``generic_io.GenericFile`` is yielded unchanged.
    - Anything else is wrapped with ``generic_io.get_file``. When
      ``mode`` is None it is inferred: ``"rw"`` for a writable
      ``io.IOBase``, otherwise ``"r"``.

    Raises
    ------
    ValueError
        If ``mode`` is not one of None, ``"r"`` or ``"rw"``.
    """
    if mode not in (None, "r", "rw"):
        msg = f"Unrecognized asdf mode '{mode}'. Must be either 'r' or 'rw'"
        raise ValueError(msg)

    if isinstance(fd, (str, pathlib.Path)):
        opened = generic_io.get_file(fd, mode=mode or "r", uri=uri)
        try:
            yield opened
        except Exception:
            # We opened this file ourselves, so release it before propagating.
            opened.close()
            raise
        return

    if isinstance(fd, generic_io.GenericFile):
        yield fd
        return

    if mode is None:
        # Infer the mode from the object we were handed.
        is_writable = isinstance(fd, io.IOBase) and fd.writable()
        mode = "rw" if is_writable else "r"
    yield generic_io.get_file(fd, mode=mode, uri=uri)


def read_tree_and_blocks(gf, lazy_load, memmap, validate_checksums):
    """
    Read the YAML tree (if any) and the blocks that follow the comment
    section of ``gf``.

    The next four bytes decide what comes next:

    - ``%YAM``: a YAML tree is loaded up to its end marker, then blocks
      are read.
    - the block magic token: there is no tree; blocks are read directly.
    - nothing (empty read): there is neither a tree nor block data.

    Returns
    -------
    tuple
        ``(tree, blocks)`` where ``tree`` is the loaded tree or None and
        ``blocks`` is the block manager created for this file.

    Raises
    ------
    OSError
        If the four bytes are anything other than the cases above.
    """
    token = gf.read(4)
    blocks = BlockManager(
        uri=gf.uri,
        lazy_load=lazy_load,
        memmap=memmap,
        validate_checksums=validate_checksums,
    )

    if token == b"%YAM":
        # Hand the already-consumed bytes back to the YAML reader.
        yaml_reader = gf.reader_until(
            constants.YAML_END_MARKER_REGEX,
            7,
            "End of YAML marker",
            include=True,
            initial_content=token,
        )
        tree = yamlutil.load_tree(yaml_reader)
        blocks.read(gf, after_magic=False)
        return tree, blocks

    if token == constants.BLOCK_MAGIC:
        # The magic token has been consumed already; tell the reader so.
        blocks.read(gf, after_magic=True)
    elif token != b"":
        msg = "ASDF file appears to contain garbage after header."
        raise OSError(msg)

    return None, blocks


def open_asdf(fd, uri=None, mode=None, lazy_load=True, memmap=False, validate_checksums=False):
    """
    Read an ASDF file's header, comments, tree and blocks from ``fd``.

    ``fd``, ``mode`` and ``uri`` are passed to `maybe_close` to obtain
    the file object; ``lazy_load``, ``memmap`` and ``validate_checksums``
    are forwarded to `read_tree_and_blocks`.

    Returns
    -------
    tuple
        ``(file_format_version, comments, tree, blocks)``.
    """
    with maybe_close(fd, mode, uri) as gf:
        # The three sections must be consumed in file order.
        version = read_header_line(gf)
        comment_lines = read_comment_section(gf)
        tree, blocks = read_tree_and_blocks(gf, lazy_load, memmap, validate_checksums)
    return version, comment_lines, tree, blocks