File: validations.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (153 lines) | stat: -rw-r--r-- 5,282 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""Route53ResolverBackend validations that result in ValidationException.

Note that validation error messages are accumulated and raised together
in a single exception.
"""

import re
from typing import Any, Optional

from moto.route53resolver.exceptions import RRValidationException


def validate_args(validators: list[tuple[str, Any]]) -> None:
    """Run the matching validator for every (field name, value) pair.

    Each printable field name selects one of this module's validate_*
    functions, which is called with the field value and returns a
    constraint message (an empty string when the value is acceptable).

    Every failure is collected before anything is raised, so a single
    RRValidationException carries all (field name, value, message)
    triples.  A field name without a registered validator raises KeyError.
    """
    checkers = {
        "creatorRequestId": validate_creator_request_id,
        "direction": validate_direction,
        "domainName": validate_domain_name,
        "ipAddresses": validate_ip_addresses,
        "ipAddresses.subnetId": validate_subnets,
        "maxResults": validate_max_results,
        "name": validate_name,
        "resolverEndpointId": validate_endpoint_id,
        "resolverRuleAssociationId": validate_rule_association_id,
        "resolverRuleId": validate_rule_id,
        "ruleType": validate_rule_type,
        "securityGroupIds": validate_security_group_ids,
        "targetIps.port": validate_target_port,
        "vPCId": validate_vpc_id,
    }

    # A match statement (python 3.10) could eventually replace both this
    # lookup table and the individual functions.
    failures = []
    for field, candidate in validators:
        check = checkers[field]
        problem = check(candidate)  # type: ignore
        if not problem:
            continue
        failures.append((field, candidate, problem))

    if failures:
        raise RRValidationException(failures)


def validate_creator_request_id(value: Optional[str]) -> str:
    """Return an error message if the creator_request_id is too long.

    A missing or empty value is accepted; otherwise the value may be at
    most 255 characters.  An empty string means the value is valid.
    """
    if not value or len(value) <= 255:
        return ""
    return "have length less than or equal to 255"


def validate_direction(value: Optional[str]) -> str:
    """Return an error message if direction is not an allowed value.

    A missing or empty value is accepted; anything else must be exactly
    "INBOUND" or "OUTBOUND".  An empty string means the value is valid.
    """
    if not value or value in ("INBOUND", "OUTBOUND"):
        return ""
    return "satisfy enum value set: [INBOUND, OUTBOUND]"


def validate_domain_name(value: str) -> str:
    """Return an error message if the domain_name exceeds 256 characters.

    An empty string is returned when the length is acceptable.  Unlike the
    validators for optional fields, the value is measured unconditionally.
    """
    return "have length less than or equal to 256" if len(value) > 256 else ""


def validate_endpoint_id(value: Optional[str]) -> str:
    """Return an error message if the resolver endpoint id is too long.

    A missing or empty value is accepted; otherwise the id may be at most
    64 characters.  An empty string means the value is valid.
    """
    too_long = bool(value) and len(value) > 64
    if too_long:
        return "have length less than or equal to 64"
    return ""


def validate_ip_addresses(value: str) -> str:
    """Return an error message if value holds more than 10 items.

    Only len(value) is inspected; an empty string is returned when it is
    10 or fewer.
    NOTE(review): the parameter is annotated as str, but the field name
    suggests a collection of IP address entries -- confirm against callers.
    """
    count = len(value)
    if count > 10:
        message = "have length less than or equal to 10"
    else:
        message = ""
    return message


def validate_max_results(value: Optional[int]) -> str:
    """Return an error message if the requested maximum exceeds 100.

    A missing or zero value is accepted.  An empty string means the value
    is valid.
    """
    over_limit = bool(value) and value > 100
    return "have length less than or equal to 100" if over_limit else ""


def validate_name(value: Optional[str]) -> str:
    """Return an error message if name fails to match constraints.

    A missing or empty name is accepted.  Otherwise the name must be at
    most 64 characters, must not consist solely of digits, and may only
    contain letters, digits, hyphens, underscores, apostrophes and spaces.

    Returns the text of the first violated constraint, or an empty string
    when the name is valid.
    """
    if not value:
        return ""
    if len(value) > 64:
        return "have length less than or equal to 64"

    name_pattern = r"^(?!^[0-9]+$)([a-zA-Z0-9-_' ']+)$"
    # fullmatch() rather than match(): "$" also matches just before a
    # trailing newline, so match() would wrongly accept a name like "abc\n".
    if not re.fullmatch(name_pattern, value):
        return rf"satisfy regular expression pattern: {name_pattern}"
    return ""


def validate_rule_association_id(value: Optional[str]) -> str:
    """Return an error message if the rule association id is too long.

    A missing or empty value is accepted; otherwise the id may be at most
    64 characters.  An empty string means the value is valid.
    """
    exceeds = bool(value) and len(value) > 64
    return "have length less than or equal to 64" if exceeds else ""


def validate_rule_id(value: Optional[str]) -> str:
    """Return an error message if the resolver rule id is too long.

    A missing or empty value is accepted; otherwise the id may be at most
    64 characters.  An empty string means the value is valid.
    """
    if not value:
        return ""
    if len(value) <= 64:
        return ""
    return "have length less than or equal to 64"


def validate_rule_type(value: Optional[str]) -> str:
    """Return an error message if rule_type is not an allowed value.

    A missing or empty value is accepted; anything else must be exactly
    "FORWARD", "SYSTEM" or "RECURSIVE".  An empty string means the value
    is valid.
    """
    allowed = ("FORWARD", "SYSTEM", "RECURSIVE")
    if not value or value in allowed:
        return ""
    return "satisfy enum value set: [FORWARD, SYSTEM, RECURSIVE]"


def validate_security_group_ids(value: list[str]) -> str:
    """Return an error message if any security group id is too long.

    Every id may be at most 64 characters.  The returned message also
    mentions a minimum length of 1, but only the upper bound is checked
    here.  An empty string means all ids are acceptable.
    """
    # The number of ids is not checked here: too many security group IDs
    # is an InvalidParameterException rather than a validation error.
    if any(len(group_id) > 64 for group_id in value):
        return (
            "have length less than or equal to 64 and Member must have "
            "length greater than or equal to 1"
        )
    return ""


def validate_subnets(value: list[dict[str, Any]]) -> str:
    """Return an error message if any subnet id exceeds 32 characters.

    The "SubnetId" entry of every dict in value is read first, then each
    id is measured.  An empty string means all ids are acceptable.
    """
    # Collect every id up front so a missing "SubnetId" key surfaces before
    # any length check is made.
    subnet_ids = [entry["SubnetId"] for entry in value]
    if any(len(subnet_id) > 32 for subnet_id in subnet_ids):
        return "have length less than or equal to 32"
    return ""


def validate_target_port(value: Optional[dict[str, int]]) -> str:
    """Return an error message if the target's port exceeds 65535.

    value is a single target address mapping.  A missing or empty mapping
    is accepted, as is a mapping without a "Port" entry, so a target that
    omits the port no longer fails with KeyError.

    Returns the violated constraint text, or an empty string when the
    port is absent or within range.
    """
    if not value:
        return ""
    # "Port" may be absent from the mapping; only validate it when given.
    port = value.get("Port")
    if port is not None and port > 65535:
        return "have value less than or equal to 65535"
    return ""


def validate_vpc_id(value: str) -> str:
    """Return an error message if the VPC id exceeds 64 characters.

    An empty string is returned when the length is acceptable.  The value
    is measured unconditionally.
    """
    if len(value) <= 64:
        return ""
    return "have length less than or equal to 64"