1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372
|
"""
Contains commands for lightweight text editing of an ASDF file.
"""
import io
import os
import re
import shlex
import shutil

# Marked safe because the editor command is specified by an
# environment variable that the user controls.
import subprocess  # nosec
import sys
import tempfile

import yaml

from .. import constants, generic_io, schema, util
from ..asdf import AsdfFile, open_asdf
from ..block import BlockManager
from .main import Command
# Public API of this module: only the edit() function is exported by
# a star-import.
__all__ = ["edit"]
# Editor launched when the EDITOR environment variable is not set:
# notepad on Windows platforms, vi everywhere else.
DEFAULT_EDITOR = "notepad" if sys.platform.startswith("win") else "vi"
class Edit(Command):
    """
    Command-line entry point for the ``edit`` subcommand.
    """

    @classmethod
    def setup_arguments(cls, subparsers):
        """
        Register the ``edit`` subcommand with the argument parser.

        Parameters
        ----------
        subparsers
            Sub-parser collection (as returned by
            ``ArgumentParser.add_subparsers``) to add the command to.

        Returns
        -------
        The parser created for the ``edit`` subcommand.
        """
        edit_parser = subparsers.add_parser(
            "edit",
            description="Edit the YAML portion of an ASDF file in-place.",
        )
        # The only argument is the file to edit.
        edit_parser.add_argument("filename", help="Path to an ASDF file.")
        # Dispatch to run() when this subcommand is selected.
        edit_parser.set_defaults(func=cls.run)
        return edit_parser

    @classmethod
    def run(cls, args):
        """
        Run the edit subcommand on ``args.filename`` and return
        whatever `edit` returns.
        """
        return edit(args.filename)
def read_yaml(fd):
    """
    Read the YAML portion of an open ASDF file.

    Parameters
    ----------
    fd : GenericFile
        Open ASDF file, positioned at its start.

    Returns
    -------
    bytes
        YAML content, up to and including the end-of-YAML marker
    int
        total number of bytes available for the YAML area (the YAML
        content plus whatever follows it up to the first block magic)
    bool
        True if the file contains binary blocks
    """
    # All ASDF files produced by this library, even the binary files
    # of an exploded ASDF file, include a YAML header, so an error is
    # simply allowed to propagate if the end marker can't be found.
    # Revisit this if someone starts producing files without a
    # YAML section, which the standard permits but is not possible
    # with current software.
    yaml_reader = fd.reader_until(
        constants.YAML_END_MARKER_REGEX,
        7,
        "End of YAML marker",
        include=True,
    )
    content = yaml_reader.read()

    # Everything between the YAML end marker and the first block magic
    # (exclusive) also counts as space available to the YAML area.
    # No error is raised when no block magic is found.
    magic_length = len(constants.BLOCK_MAGIC)
    gap_reader = fd.reader_until(
        constants.BLOCK_MAGIC,
        magic_length,
        include=False,
        exception=False,
    )
    gap = gap_reader.read()

    # If the next bytes are the block magic, binary blocks follow.
    contains_blocks = fd.peek(magic_length) == constants.BLOCK_MAGIC

    available_bytes = len(content) + len(gap)
    return content, available_bytes, contains_blocks
def write_edited_yaml_larger(path, new_content, version):
    """
    Rewrite an ASDF file, replacing the YAML portion with the
    specified YAML content and updating the block index if present.
    The file is assumed to contain binary blocks.

    The new file is first assembled in a temporary file located in the
    same directory as the original, and its contents are then copied
    over the original file.

    Parameters
    ----------
    path : str
        Path to ASDF file
    new_content : bytes
        Updated YAML content
    version
        ASDF Standard version, passed to ``AsdfFile(version=...)`` to
        build the context used for reading and writing the blocks
    """
    prefix = os.path.splitext(os.path.basename(path))[0] + "-"
    # Since the original file may be large, create the temporary
    # file in the same directory to avoid filling up the system
    # temporary area.
    temp_file = tempfile.NamedTemporaryFile(dir=os.path.dirname(path), prefix=prefix, suffix=".asdf", delete=False)
    try:
        # Only the reserved name is needed; the file is reopened
        # through generic_io below.
        temp_file.close()

        with generic_io.get_file(temp_file.name, mode="w") as fd:
            fd.write(new_content)
            # Allocate additional space for future YAML updates:
            pad_length = util.calculate_padding(len(new_content), True, fd.block_size)
            fd.fast_forward(pad_length)

            with generic_io.get_file(path) as original_fd:
                # Consume the file up to the first block, which must exist
                # as a precondition to using this method.
                original_fd.seek_until(
                    constants.BLOCK_MAGIC,
                    len(constants.BLOCK_MAGIC),
                )
                ctx = AsdfFile(version=version)
                blocks = BlockManager(ctx, copy_arrays=False, lazy_load=False)
                blocks.read_internal_blocks(original_fd, past_magic=True, validate_checksums=False)
                blocks.finish_reading_internal_blocks()
                blocks.write_internal_blocks_serial(fd)
                blocks.write_block_index(fd, ctx)
                blocks.close()

            # the file needs to be closed here to release all memmaps
            original_fd.close()

        # Overwrite the original file with the rewritten version.  This
        # is an in-place copy, not an atomic swap.  copyfile() is used
        # rather than copy() so that only the data is transferred:
        # copy() would also stamp the temporary file's restrictive
        # permission bits onto the user's file.
        shutil.copyfile(temp_file.name, path)
    finally:
        os.unlink(temp_file.name)
def write_edited_yaml(path, new_content, available_bytes):
    """
    Overwrite the start of an ASDF file with the specified YAML
    content. The content must fit in the space available; any
    space left over is filled with null bytes.

    Parameters
    ----------
    path : str
        Path to ASDF file
    new_content : bytes
        Updated YAML content
    available_bytes : int
        Number of bytes available for YAML
    """
    unused_bytes = available_bytes - len(new_content)

    # generic_io mode "rw" opens the file as "r+b":
    with generic_io.get_file(path, mode="rw") as fd:
        fd.write(new_content)
        if unused_bytes > 0:
            # Blank out the remainder of the YAML area.
            fd.write(b"\0" * unused_bytes)
def edit(path):
    """
    Copy the YAML portion of an ASDF file to a temporary file, present
    the file to the user for editing, then update the original file
    with the modified YAML.

    Parameters
    ----------
    path : str
        Path to ASDF file

    Returns
    -------
    int or None
        1 if the file is not an ASDF file or the user aborts,
        0 if the user made no changes, and None after the file
        has been updated.
    """
    # Extract the YAML portion of the original file:
    with generic_io.get_file(path, mode="r") as fd:
        if util.get_file_type(fd) != util.FileType.ASDF:
            print(f"Error: '{path}' is not an ASDF file.")
            return 1

        original_content, available_bytes, contains_blocks = read_yaml(fd)

    original_asdf_version = parse_asdf_version(original_content)
    original_yaml_version = parse_yaml_version(original_content)

    # Prompt shared by every recoverable error below.
    retry_prompt = "(c)ontinue editing or (a)bort? "
    retry_choices = ["c", "a"]

    stem = os.path.splitext(os.path.basename(path))[0]

    # We can't use the temporary file's automatic delete because
    # Windows won't allow reading the file from the editor process
    # unless it is closed here.
    scratch = tempfile.NamedTemporaryFile(prefix=stem + "-", suffix=".yaml", delete=False)
    try:
        # Write the YAML to a temporary path:
        scratch.write(original_content)
        scratch.close()

        # Loop so that the user can correct errors in the edited file:
        while True:
            open_editor(scratch.name)

            with open(scratch.name, "rb") as edited:
                new_content = edited.read()

            if new_content == original_content:
                print("No changes made to file")
                return 0

            try:
                new_asdf_version = parse_asdf_version(new_content)
                new_yaml_version = parse_yaml_version(new_content)
            except Exception as e:
                print(f"Error: failed to parse ASDF header: {str(e)}")
                if request_input(retry_prompt, retry_choices) == "a":
                    return 1
                continue

            version_changed = new_asdf_version != original_asdf_version or new_yaml_version != original_yaml_version
            if version_changed:
                print("Error: cannot modify ASDF Standard or YAML version using this tool.")
                if request_input(retry_prompt, retry_choices) == "a":
                    return 1
                continue

            try:
                # Blocks are not read during validation, so this will not raise
                # an error even though we're only opening the YAML portion of
                # the file.
                with open_asdf(io.BytesIO(new_content), _force_raw_types=True):
                    pass
            except yaml.YAMLError as e:
                print("Error: failed to parse updated YAML:")
                print_exception(e)
                if request_input(retry_prompt, retry_choices) == "a":
                    return 1
                continue
            except schema.ValidationError as e:
                print("Warning: updated ASDF tree failed validation:")
                print_exception(e)
                choice = request_input("(c)ontinue editing, (f)orce update, or (a)bort? ", ["c", "f", "a"])
                if choice == "a":
                    return 1
                if choice == "c":
                    continue
                # "f": fall through and write the file despite the
                # validation failure.
            except Exception as e:
                print("Error: failed to read updated file as ASDF:")
                print_exception(e)
                if request_input(retry_prompt, retry_choices) == "a":
                    return 1
                continue

            # We've either opened the file without error, or
            # the user has agreed to ignore validation errors.
            # Leave the loop so that the original file can be updated.
            break
    finally:
        os.unlink(scratch.name)

    if len(new_content) <= available_bytes:
        # File has sufficient space allocated in the YAML area.
        write_edited_yaml(path, new_content, available_bytes)
    elif not contains_blocks:
        # File does not have sufficient space, but there are
        # no binary blocks, so we can just expand the file.
        write_edited_yaml(path, new_content, len(new_content))
    else:
        # File does not have sufficient space, and binary blocks
        # are present.
        print("Warning: updated YAML larger than allocated space. File must be rewritten.")
        if request_input("(c)ontinue or (a)bort? ", ["c", "a"]) == "a":
            return 1
        write_edited_yaml_larger(path, new_content, new_asdf_version)
def parse_asdf_version(content):
    """
    Extract the ASDF Standard version from YAML content.

    Parameters
    ----------
    content : bytes
        YAML portion of an ASDF file, including its leading
        comment section.

    Returns
    -------
    asdf.versioning.AsdfVersion
        ASDF Standard version
    """
    # Wrap the bytes in a generic_io file so that AsdfFile's own
    # comment-section reader can be reused.
    header_fd = generic_io.get_file(io.BytesIO(content))
    comment_section = AsdfFile._read_comment_section(header_fd)
    return AsdfFile._find_asdf_version_in_comments(comment_section)
def parse_yaml_version(content):
    """
    Extract the YAML version from YAML content.

    Parameters
    ----------
    content : bytes
        YAML text containing a ``%YAML <version>`` directive line.

    Returns
    -------
    bytes
        YAML version string: the rest of the first line that starts
        with ``%YAML ``.

    Raises
    ------
    ValueError
        If no ``%YAML`` directive line is present.
    """
    directive = re.search(b"^%YAML (.*)$", content, flags=re.MULTILINE)
    if directive is not None:
        return directive[1]
    raise ValueError("YAML version number not found")
def print_exception(e):
    """
    Print an exception, indented 4 spaces and elided if too many lines.

    Parameters
    ----------
    e : Exception
        Exception whose message (``str(e)``) is printed.  Only the
        first 20 lines of the message are shown; when lines are
        dropped, a final ``...`` line marks the truncation.
    """
    lines = str(e).split("\n")
    if len(lines) > 20:
        lines = lines[:20] + ["..."]
    for line in lines:
        # Four-space indent, as promised by the docstring, so the
        # message stands out from the heading printed by the caller.
        print(f"    {line}")
def request_input(message, choices):
    """
    Prompt the user until one of the recognized answers is given.

    Parameters
    ----------
    message : str
        Message to display
    choices : list of str
        List of recognized inputs

    Returns
    -------
    str
        The user's answer, stripped of surrounding whitespace and
        lower-cased, which is guaranteed to be in ``choices``.
    """
    while True:
        answer = input(message).strip().lower()
        if answer not in choices:
            print(f"Invalid choice: {answer}")
            continue
        return answer
def open_editor(path):
    """
    Launch an editor process with the file at path opened, and wait
    for it to exit.

    The editor command is read from the ``EDITOR`` environment
    variable; when that is unset or empty, ``DEFAULT_EDITOR`` is used.

    Parameters
    ----------
    path : str
        Path of the file to open in the editor.

    Raises
    ------
    subprocess.CalledProcessError
        If the editor command exits with a non-zero status.
    """
    editor = os.environ.get("EDITOR") or DEFAULT_EDITOR

    # The editor setting is handed to the shell verbatim so that it may
    # include its own arguments, but the path must be quoted: it is
    # derived from the name of the user's file and may contain spaces
    # or shell metacharacters.
    if sys.platform.startswith("win"):
        quoted_path = subprocess.list2cmdline([path])
    else:
        quoted_path = shlex.quote(path)

    # Marked safe because the editor command is specified by an
    # environment variable that the user controls.
    subprocess.run(f"{editor} {quoted_path}", check=True, shell=True)  # nosec
|