1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
|
import os
import string
from typing import Any
from urllib.parse import urlparse
import yaml
from moto.moto_api._internal import mock_random as random
from moto.utilities.utils import get_partition
from .exceptions import ValidationError
def generate_stack_id(stack_name: str, region: str, account: str) -> str:
random_id = random.uuid4()
return f"arn:{get_partition(region)}:cloudformation:{region}:{account}:stack/{stack_name}/{random_id}"
def generate_changeset_id(
changeset_name: str, region_name: str, account_id: str
) -> str:
random_id = random.uuid4()
return f"arn:{get_partition(region_name)}:cloudformation:{region_name}:{account_id}:changeSet/{changeset_name}/{random_id}"
def generate_stackset_id(stackset_name: str) -> str:
random_id = random.uuid4()
return f"{stackset_name}:{random_id}"
def generate_stackset_arn(stackset_id: str, region_name: str, account_id: str) -> str:
return f"arn:{get_partition(region_name)}:cloudformation:{region_name}:{account_id}:stackset/{stackset_id}"
def random_suffix() -> str:
size = 12
chars = list(range(10)) + list(string.ascii_uppercase)
return "".join(str(random.choice(chars)) for x in range(size))
def yaml_tag_constructor(loader: Any, tag: Any, node: Any) -> Any:
"""convert shorthand intrinsic function to full name"""
def _f(loader: Any, tag: Any, node: Any) -> Any:
if tag == "!GetAtt":
if isinstance(node.value, list):
return node.value
return node.value.split(".")
elif type(node) is yaml.SequenceNode:
return loader.construct_sequence(node)
else:
return node.value
if tag == "!Ref":
key = "Ref"
else:
key = f"Fn::{tag[1:]}"
return {key: _f(loader, tag, node)}
def validate_template_cfn_lint(template: str) -> list[Any]:
# Importing cfnlint adds a significant overhead, so we keep it local
try:
# Compatibility for cfn-lint 0.x
# Fail fast with `cfnlint.core.configure_logging` which is removed in cfn-lint 1.x
from cfnlint.core import configure_logging, get_rules, run_checks
from cfnlint.decode import decode
# Save the template to a temporary file -- cfn-lint requires a file
filename = "file.tmp"
with open(filename, "w") as file:
file.write(template)
abs_filename = os.path.abspath(filename)
# decode handles both yaml and json
try:
template, matches = decode(abs_filename, False)
except TypeError:
# As of cfn-lint 0.39.0, the second argument (ignore_bad_template) was dropped
# https://github.com/aws-cloudformation/cfn-python-lint/pull/1580
template, matches = decode(abs_filename)
# Set cfn-lint to info
configure_logging(None)
# Initialize the ruleset to be applied (no overrules, no excludes)
rules = get_rules([], [], [])
# Use us-east-1 region (spec file) for validation
regions = ["us-east-1"]
# Process all the rules and gather the errors
return run_checks(abs_filename, template, rules, regions)
except ImportError:
# Compatibility for cfn-lint 1.x
from cfnlint.api import lint
from cfnlint.config import configure_logging
from cfnlint.core import get_rules
# Set cfn-lint to info
configure_logging(None, False)
# Initialize the ruleset to be applied (no overrules, no excludes)
rules = get_rules([], [], [])
# Use us-east-1 region (spec file) for validation
regions = ["us-east-1"]
# Process all the rules and gather the errors
return lint(template, rules, regions)
def get_stack_from_s3_url(template_url: str, account_id: str, partition: str) -> str:
from moto.s3.models import s3_backends
template_url_parts = urlparse(template_url)
if "localhost" in template_url:
bucket_name, key_name = template_url_parts.path.lstrip("/").split("/", 1)
else:
if template_url_parts.netloc.endswith(
"amazonaws.com"
) and template_url_parts.netloc.startswith("s3"):
# Handle when S3 url uses amazon url with bucket in path
# Also handles getting region as technically s3 is region'd
# region = template_url.netloc.split('.')[1]
bucket_name, key_name = template_url_parts.path.lstrip("/").split("/", 1)
else:
bucket_name = template_url_parts.netloc.split(".")[0]
key_name = template_url_parts.path.lstrip("/")
key = s3_backends[account_id][partition].get_object(bucket_name, key_name)
return key.value.decode("utf-8") # type: ignore[union-attr]
def validate_create_change_set(change_set_name: str) -> None:
if not (change_set_name and change_set_name[0].isalpha()):
raise ValidationError(f"Invalid change set name: {change_set_name}")
if not all(c.isalnum() or c == "-" for c in change_set_name):
raise ValidationError(f"Invalid change set name: {change_set_name}")
if len(change_set_name) > 128:
raise ValidationError(
f"Change set name exceeds 128 characters: {change_set_name}"
)
# Additional validations can be added here later
return
|