1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404
|
"""
Contains commands for lightweight text editing of an ASDF file.
"""
import io
import os
import re
import shutil
# Marked safe because the editor command is specified by an
# environment variable that the user controls.
import subprocess # nosec
import sys
import tempfile
import yaml
from asdf import constants, generic_io, schema, util
from asdf._asdf import AsdfFile
from asdf._block import io as bio
from asdf._block.exceptions import BlockIndexError
from .main import Command
__all__ = ["edit"]
# Fallback editor command: notepad on Windows, vi on every other platform.
DEFAULT_EDITOR = "notepad" if sys.platform.startswith("win") else "vi"
class Edit(Command):
    # Command-line plumbing for the ``edit`` subcommand.

    @classmethod
    def setup_arguments(cls, subparsers):
        """
        Register the ``edit`` subcommand and its arguments on ``subparsers``
        and return the parser that was created.
        """
        edit_parser = subparsers.add_parser(
            "edit",
            description="Edit the YAML portion of an ASDF file in-place.",
        )

        # The file to edit is the only argument.
        edit_parser.add_argument("filename", help="Path to an ASDF file.")

        # Record run() as the handler for this subcommand.
        edit_parser.set_defaults(func=cls.run)

        return edit_parser

    @classmethod
    def run(cls, args):
        """
        Run the edit subcommand on ``args.filename`` and return its result.
        """
        return edit(args.filename)
def read_yaml(fd):
    """
    Extract the YAML section from an open ASDF file.

    Parameters
    ----------
    fd : GenericFile

    Returns
    -------
    bytes
        YAML content, up to and including the end-of-YAML marker
    int
        total number of bytes available for the YAML area: the YAML
        content plus whatever was read between it and the next block
        magic (or the end of the data when no block magic is found)
    bool
        True if the file contains binary blocks
    """
    magic_length = len(constants.BLOCK_MAGIC)

    # Every ASDF file written by this library, including the binary
    # files of an exploded ASDF file, carries a YAML header, so a missing
    # end marker is simply allowed to raise. The standard does permit
    # files without a YAML section; revisit this if such files start
    # being produced.
    yaml_reader = fd.reader_until(
        constants.YAML_END_MARKER_REGEX,
        7,
        "End of YAML marker",
        include=True,
    )
    yaml_bytes = yaml_reader.read()

    # Consume whatever sits between the YAML and the first block magic;
    # with exception=False a missing block magic is not treated as an error.
    gap_reader = fd.reader_until(
        constants.BLOCK_MAGIC,
        magic_length,
        include=False,
        exception=False,
    )
    gap_bytes = gap_reader.read()

    # The block magic itself was not consumed (include=False), so peeking
    # tells us whether a block follows.
    has_blocks = fd.peek(magic_length) == constants.BLOCK_MAGIC

    return yaml_bytes, len(yaml_bytes) + len(gap_bytes), has_blocks
def write_edited_yaml_larger(path, new_content, version):
    """
    Rewrite an ASDF file, replacing the YAML portion with the
    specified YAML content and updating the block index if present.
    The file is assumed to contain binary blocks.

    The new file is assembled in a temporary file next to the original
    and then copied over it.

    Parameters
    ----------
    path : str
        Path to ASDF file
    new_content : bytes
        Updated YAML content
    version : object
        Not used by this function.
    """
    prefix = os.path.splitext(os.path.basename(path))[0] + "-"
    # Since the original file may be large, create the temporary
    # file in the same directory to avoid filling up the system
    # temporary area.
    temp_file = tempfile.NamedTemporaryFile(dir=os.path.dirname(path), prefix=prefix, suffix=".asdf", delete=False)
    try:
        # Only the name is needed; the file is reopened through generic_io.
        temp_file.close()

        with generic_io.get_file(temp_file.name, mode="w") as fd:
            fd.write(new_content)
            # Allocate additional space for future YAML updates:
            pad_length = util.calculate_padding(len(new_content), True, fd.block_size)
            fd.fast_forward(pad_length)

            # now copy over ASDF block contents

            with generic_io.get_file(path) as original_fd:
                # Locate the first block in the original file; the position
                # after seek_until is taken to be just past the magic bytes,
                # hence the subtraction.
                original_fd.seek_until(constants.BLOCK_MAGIC, len(constants.BLOCK_MAGIC))
                old_first_block_offset = original_fd.tell() - len(constants.BLOCK_MAGIC)
                new_first_block_offset = fd.tell()

                # check if the original file has a block index which we will need to update
                # as we're moving the blocks
                block_index_offset = bio.find_block_index(original_fd)
                if block_index_offset is None:
                    # No index: the blocks run to the end of the file.
                    block_index = None
                    original_fd.seek(0, generic_io.SEEK_END)
                    blocks_end = original_fd.tell()
                else:
                    # The blocks stop where the index starts.
                    blocks_end = block_index_offset
                    try:
                        block_index = bio.read_block_index(original_fd, block_index_offset)
                    except BlockIndexError:
                        # the original index was invalid
                        block_index = None

                # copy over blocks byte-for-byte from old_first_block_offset to blocks_end,
                # one chunk of at most block_size bytes at a time
                original_fd.seek(old_first_block_offset)
                block_size = min(fd.block_size, original_fd.block_size)
                n_bytes = blocks_end - old_first_block_offset
                for offset in range(0, n_bytes, block_size):
                    this_size = min(block_size, n_bytes - offset)
                    fd.write(original_fd.read(this_size))

                # update index: shift every entry by the distance the first block moved
                if block_index is not None:
                    offset = new_first_block_offset - old_first_block_offset
                    updated_block_index = [i + offset for i in block_index]
                    bio.write_block_index(fd, updated_block_index)

        # Copy the rewritten file's contents over the original. Note that
        # shutil.copy is a plain copy, not an atomic swap.
        shutil.copy(temp_file.name, path)

    finally:
        # The temporary file was created with delete=False, so remove it here.
        os.unlink(temp_file.name)
def write_edited_yaml(path, new_content, available_bytes):
    """
    Write new YAML content over the start of an ASDF file, in place.
    The content must fit in the space available.

    Parameters
    ----------
    path : str
        Path to ASDF file
    new_content : bytes
        Updated YAML content
    available_bytes : int
        Number of bytes available for YAML
    """
    # generic_io mode "rw" opens the file as "r+b", so existing data
    # beyond what is written here is left alone.
    with generic_io.get_file(path, mode="rw") as fd:
        fd.write(new_content)

        unused = available_bytes - len(new_content)
        if unused <= 0:
            return

        # Blank out the remainder of the YAML area with null bytes.
        fd.write(b"\0" * unused)
def edit(path):
    """
    Let the user edit the YAML portion of an ASDF file in a text editor.

    The YAML is copied to a temporary file, opened in the editor, checked
    after every editing session, and finally written back to the original
    file.

    Parameters
    ----------
    path : str
        Path to ASDF file

    Returns
    -------
    int or None
        1 when ``path`` is not an ASDF file or the user aborts, 0 when the
        edited text is identical to the original, None once the file has
        been updated.
    """

    def abort_requested():
        # Prompt shared by every error that cannot be forced through;
        # True means the user chose to abort.
        return request_input("(c)ontinue editing or (a)bort? ", ["c", "a"]) == "a"

    # Pull the YAML portion out of the original file.
    with generic_io.get_file(path, mode="r") as fd:
        if fd.peek(len(constants.ASDF_MAGIC)) != constants.ASDF_MAGIC:
            print(f"Error: '{path}' is not an ASDF file.")
            return 1

        original_content, available_bytes, contains_blocks = read_yaml(fd)

    original_asdf_version = parse_asdf_version(original_content)
    original_yaml_version = parse_yaml_version(original_content)

    stem = os.path.splitext(os.path.basename(path))[0]

    # delete=False because Windows will not let the editor process read
    # the file unless it is closed here first; it is removed by hand in
    # the finally clause instead.
    temp_file = tempfile.NamedTemporaryFile(prefix=stem + "-", suffix=".yaml", delete=False)
    try:
        # Hand the current YAML to the editor through the temporary file.
        temp_file.write(original_content)
        temp_file.close()

        # Keep reopening the editor until the result is acceptable or the
        # user gives up.
        while True:
            open_editor(temp_file.name)

            with open(temp_file.name, "rb") as edited:
                new_content = edited.read()

            if new_content == original_content:
                print("No changes made to file")
                return 0

            try:
                new_asdf_version = parse_asdf_version(new_content)
                new_yaml_version = parse_yaml_version(new_content)
            except Exception as e:
                print(f"Error: failed to parse ASDF header: {e!s}")
                if abort_requested():
                    return 1
                continue

            version_changed = (
                new_asdf_version != original_asdf_version or new_yaml_version != original_yaml_version
            )
            if version_changed:
                print("Error: cannot modify ASDF Standard or YAML version using this tool.")
                if abort_requested():
                    return 1
                continue

            try:
                # The edited text must still start with the ASDF magic.
                if new_content[: len(constants.ASDF_MAGIC)] != constants.ASDF_MAGIC:
                    raise ValueError("Does not appear to be a ASDF file.")

                # Loading the tagged tree doubles as a YAML syntax check.
                tagged_tree = util.load_yaml(io.BytesIO(new_content), tagged=True)

                # Validate the tagged tree against the schemas.
                ctx = AsdfFile(version=new_asdf_version)
                schema.validate(tagged_tree, ctx=ctx, reading=True)
            except yaml.YAMLError as e:
                print("Error: failed to parse updated YAML:")
                print_exception(e)
                if abort_requested():
                    return 1
                continue
            except schema.ValidationError as e:
                print("Warning: updated ASDF tree failed validation:")
                print_exception(e)
                choice = request_input("(c)ontinue editing, (f)orce update, or (a)bort? ", ["c", "f", "a"])
                if choice == "a":
                    return 1
                if choice == "c":
                    continue
                # "f": fall through and write the file despite the failure.
            except Exception as e:
                print("Error: failed to read updated file as ASDF:")
                print_exception(e)
                if abort_requested():
                    return 1
                continue

            # Either everything checked out or the user chose to force
            # the update; leave the loop and write the original file.
            break
    finally:
        os.unlink(temp_file.name)

    if len(new_content) <= available_bytes:
        # The existing YAML area is large enough.
        write_edited_yaml(path, new_content, available_bytes)
        return None

    if not contains_blocks:
        # Too big for the YAML area, but with no binary blocks the file
        # can simply grow.
        write_edited_yaml(path, new_content, len(new_content))
        return None

    # Too big for the YAML area and binary blocks follow it.
    print("Warning: updated YAML larger than allocated space. File must be rewritten.")
    if request_input("(c)ontinue or (a)bort? ", ["c", "a"]) == "a":
        return 1
    write_edited_yaml_larger(path, new_content, new_asdf_version)

    return None
def parse_asdf_version(content):
    """
    Extract the ASDF Standard version from YAML content.

    Parameters
    ----------
    content : bytes

    Returns
    -------
    asdf.versioning.AsdfVersion
        ASDF Standard version
    """
    # Use the generic_io wrapper as a context manager, like every other
    # get_file call in this module, so it is released even if parsing
    # the comment section raises.
    with generic_io.get_file(io.BytesIO(content)) as fd:
        comments = AsdfFile._read_comment_section(fd)
        return AsdfFile._find_asdf_version_in_comments(comments)
def parse_yaml_version(content):
    """
    Extract the YAML version from YAML content.

    Parameters
    ----------
    content : bytes

    Returns
    -------
    bytes
        YAML version string, with surrounding whitespace removed.

    Raises
    ------
    ValueError
        If the content has no line starting with ``%YAML``.
    """
    match = re.search(rb"^%YAML (.*)$", content, flags=re.MULTILINE)
    if match is None:
        msg = "YAML version number not found"
        raise ValueError(msg)

    # In MULTILINE mode "$" matches just before "\n", so with CRLF line
    # endings the captured group ends in "\r". Strip it so that the same
    # version compares equal regardless of the line-ending style the
    # user's editor happens to write.
    return match.group(1).strip()
def print_exception(e):
    """
    Print an exception, indented 4 spaces and elided if too many lines.

    Parameters
    ----------
    e : Exception
        Exception whose ``str()`` is printed. At most the first 20 lines
        are shown; anything beyond that is replaced by a single ``...``
        line.
    """
    lines = str(e).split("\n")
    if len(lines) > 20:
        lines = [*lines[:20], "..."]

    for line in lines:
        # Four-space indent, as promised by the docstring.
        print(f"    {line}")
def request_input(message, choices):
    """
    Request user input, repeating the prompt until a recognized choice
    is entered.

    Parameters
    ----------
    message : str
        Message to display
    choices : list of str
        List of recognized inputs. The user's answer is stripped and
        lowercased before it is compared against this list.

    Returns
    -------
    str
        The normalized answer, which is one of ``choices``.
    """
    # The loop only ends by returning a valid choice, so nothing is
    # needed after it.
    while True:
        choice = input(message).strip().lower()

        if choice in choices:
            return choice

        print(f"Invalid choice: {choice}")
def open_editor(path):
    """
    Launch an editor process with the file at path opened.

    The editor command comes from the ``EDITOR`` environment variable,
    falling back to ``DEFAULT_EDITOR`` when the variable is unset or
    empty. The call blocks until the editor process exits.

    Parameters
    ----------
    path : str
        Path of the file to open in the editor.

    Raises
    ------
    subprocess.CalledProcessError
        If the editor command exits with a non-zero status.
    """
    import shlex

    editor = os.environ.get("EDITOR") or DEFAULT_EDITOR

    # The editor command may carry its own arguments, so it is handed to
    # the shell as-is. The path, however, must be quoted: temporary file
    # names are derived from the name of the file being edited and may
    # contain spaces or shell metacharacters.
    if sys.platform.startswith("win"):
        quoted_path = f'"{path}"'
    else:
        quoted_path = shlex.quote(str(path))

    # Marked safe because the editor command is specified by an
    # environment variable that the user controls.
    subprocess.run(f"{editor} {quoted_path}", check=True, shell=True)  # noqa: S602
|