File: exceptions.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (143 lines) | stat: -rw-r--r-- 4,954 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from typing import Optional

from moto.core.exceptions import ServiceException


class RedshiftClientError(ServiceException):
    pass


class ClusterNotFoundError(RedshiftClientError):
    def __init__(self, cluster_identifier: str):
        super().__init__("ClusterNotFound", f"Cluster {cluster_identifier} not found.")


class ClusterSubnetGroupNotFoundError(RedshiftClientError):
    def __init__(self, subnet_identifier: str):
        super().__init__(
            "ClusterSubnetGroupNotFoundFault",
            f"Subnet group {subnet_identifier} not found.",
        )


class ClusterSecurityGroupNotFoundError(RedshiftClientError):
    code = "ClusterSecurityGroupNotFound"
    message = "The cluster security group name does not refer to an existing cluster security group."

    def __init__(self, group_identifier: Optional[str] = None):
        if group_identifier:
            super().__init__(f"Security group {group_identifier} not found.")


class ClusterParameterGroupNotFoundError(RedshiftClientError):
    def __init__(self, group_identifier: str):
        super().__init__(
            "ClusterParameterGroupNotFound",
            f"ClusterParameterGroup not found: {group_identifier}",
        )


class InvalidSubnetError(RedshiftClientError):
    def __init__(self, subnet_identifier: list[str]):
        super().__init__("InvalidSubnet", f"Subnet {subnet_identifier} not found.")


class SnapshotCopyGrantAlreadyExistsFaultError(RedshiftClientError):
    def __init__(self, snapshot_copy_grant_name: str):
        super().__init__(
            "SnapshotCopyGrantAlreadyExistsFault",
            "Cannot create the snapshot copy grant because a grant "
            f"with the identifier '{snapshot_copy_grant_name}' already exists",
        )


class SnapshotCopyGrantNotFoundFaultError(RedshiftClientError):
    def __init__(self, snapshot_copy_grant_name: str):
        super().__init__(
            "SnapshotCopyGrantNotFoundFault",
            f"Snapshot copy grant not found: {snapshot_copy_grant_name}",
        )


class ClusterSnapshotNotFoundError(RedshiftClientError):
    def __init__(self, snapshot_identifier: str):
        super().__init__(
            "ClusterSnapshotNotFound", f"Snapshot {snapshot_identifier} not found."
        )


class ClusterSnapshotAlreadyExistsError(RedshiftClientError):
    def __init__(self, snapshot_identifier: str):
        super().__init__(
            "ClusterSnapshotAlreadyExists",
            "Cannot create the snapshot because a snapshot with the "
            f"identifier {snapshot_identifier} already exists",
        )


class InvalidParameterValueError(RedshiftClientError):
    def __init__(self, message: str):
        super().__init__("InvalidParameterValue", message)


class ResourceNotFoundFaultError(RedshiftClientError):
    def __init__(
        self,
        resource_type: Optional[str] = None,
        resource_name: Optional[str] = None,
        message: Optional[str] = None,
    ):
        if resource_type and not resource_name:
            msg = f"resource of type '{resource_type}' not found."
        else:
            msg = f"{resource_type} ({resource_name}) not found."
        if message:
            msg = message
        super().__init__("ResourceNotFoundFault", msg)


class SnapshotCopyDisabledFaultError(RedshiftClientError):
    def __init__(self, cluster_identifier: str):
        super().__init__(
            "SnapshotCopyDisabledFault",
            f"Cannot modify retention period because snapshot copy is disabled on Cluster {cluster_identifier}.",
        )


class SnapshotCopyAlreadyDisabledFaultError(RedshiftClientError):
    def __init__(self, cluster_identifier: str):
        super().__init__(
            "SnapshotCopyAlreadyDisabledFault",
            f"Snapshot Copy is already disabled on Cluster {cluster_identifier}.",
        )


class SnapshotCopyAlreadyEnabledFaultError(RedshiftClientError):
    def __init__(self, cluster_identifier: str):
        super().__init__(
            "SnapshotCopyAlreadyEnabledFault",
            f"Snapshot Copy is already enabled on Cluster {cluster_identifier}.",
        )


class ClusterAlreadyExistsFaultError(RedshiftClientError):
    def __init__(self) -> None:
        super().__init__("ClusterAlreadyExists", "Cluster already exists")


class InvalidParameterCombinationError(RedshiftClientError):
    def __init__(self, message: str):
        super().__init__("InvalidParameterCombination", message)


class UnknownSnapshotCopyRegionFaultError(RedshiftClientError):
    def __init__(self, message: str):
        super().__init__("UnknownSnapshotCopyRegionFault", message)


class InvalidClusterSnapshotStateFaultError(RedshiftClientError):
    def __init__(self, snapshot_identifier: str):
        super().__init__(
            "InvalidClusterSnapshotState",
            f"Cannot delete the snapshot {snapshot_identifier} because only manual snapshots may be deleted",
        )