1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
"""
Low-level input/output routines for AsdfFile instances
"""
import contextlib
import io
import pathlib
from . import _version, constants, generic_io, versioning, yamlutil
from ._block.manager import Manager as BlockManager
from .exceptions import DelimiterNotFoundError
from .tags.core import Software
def get_asdf_library_info():
"""
Get information about asdf to include in the asdf_library entry
in the Tree.
"""
return Software(
{
"name": "asdf",
"version": _version.version,
"homepage": "http://github.com/asdf-format/asdf",
"author": "The ASDF Developers",
},
)
def parse_header_line(line):
"""
Parses the header line in a ASDF file to obtain the ASDF version.
"""
parts = line.split()
if len(parts) != 2 or parts[0] != constants.ASDF_MAGIC:
msg = "Does not appear to be a ASDF file."
raise ValueError(msg)
try:
version = versioning.AsdfVersion(parts[1].decode("ascii"))
except ValueError as err:
msg = f"Unparsable version in ASDF file: {parts[1]}"
raise ValueError(msg) from err
if version != versioning._FILE_FORMAT_VERSION:
msg = f"Unsupported ASDF file format version {version}"
raise ValueError(msg)
return version
def read_header_line(gf):
try:
header_line = gf.read_until(b"\r?\n", 2, "newline", include=True)
except DelimiterNotFoundError as e:
msg = "Does not appear to be a ASDF file."
raise ValueError(msg) from e
return parse_header_line(header_line)
def read_comment_section(fd):
"""
Reads the comment section, between the header line and the
Tree or first block.
"""
content = fd.read_until(
b"(%YAML)|(" + constants.BLOCK_MAGIC + b")",
5,
"start of content",
include=False,
exception=False,
)
comments = []
lines = content.splitlines()
for line in lines:
if not line.startswith(b"#"):
msg = "Invalid content between header and tree"
raise ValueError(msg)
comments.append(line[1:].strip())
return comments
def find_asdf_version_in_comments(comments, default=None):
for comment in comments:
parts = comment.split()
if len(parts) == 2 and parts[0] == constants.ASDF_STANDARD_COMMENT:
try:
version = versioning.AsdfVersion(parts[1].decode("ascii"))
except ValueError:
pass
else:
return version
return default
@contextlib.contextmanager
def maybe_close(fd, mode=None, uri=None):
if mode not in [None, "r", "rw"]:
msg = f"Unrecognized asdf mode '{mode}'. Must be either 'r' or 'rw'"
raise ValueError(msg)
if isinstance(fd, (str, pathlib.Path)):
mode = mode or "r"
generic_file = generic_io.get_file(fd, mode=mode, uri=uri)
try:
yield generic_file
except Exception as e:
generic_file.close()
raise e
elif isinstance(fd, generic_io.GenericFile):
yield fd
else:
if mode is None:
# infer from fd
if isinstance(fd, io.IOBase):
mode = "rw" if fd.writable() else "r"
else:
mode = "r"
yield generic_io.get_file(fd, mode=mode, uri=uri)
def read_tree_and_blocks(gf, lazy_load, memmap, validate_checksums):
token = gf.read(4)
tree = None
blocks = BlockManager(uri=gf.uri, lazy_load=lazy_load, memmap=memmap, validate_checksums=validate_checksums)
if token == b"%YAM":
reader = gf.reader_until(
constants.YAML_END_MARKER_REGEX,
7,
"End of YAML marker",
include=True,
initial_content=token,
)
tree = yamlutil.load_tree(reader)
blocks.read(gf, after_magic=False)
elif token == constants.BLOCK_MAGIC:
blocks.read(gf, after_magic=True)
elif token != b"":
msg = "ASDF file appears to contain garbage after header."
raise OSError(msg)
return tree, blocks
def open_asdf(fd, uri=None, mode=None, lazy_load=True, memmap=False, validate_checksums=False):
with maybe_close(fd, mode, uri) as generic_file:
file_format_version = read_header_line(generic_file)
comments = read_comment_section(generic_file)
tree, blocks = read_tree_and_blocks(generic_file, lazy_load, memmap, validate_checksums)
return file_format_version, comments, tree, blocks
|