File: exceptions.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (58 lines) | stat: -rw-r--r-- 2,093 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import json
from typing import Optional

from moto.core.exceptions import JsonRESTError


class OpensearchIngestionExceptions(JsonRESTError):
    """Base class shared by the exception types defined in this module."""


class PipelineAlreadyExistsException(OpensearchIngestionExceptions):
    """Error of type ``ResourceAlreadyExistsException`` naming a pipeline."""

    def __init__(self, pipeline_name: str):
        """Build the error for *pipeline_name*.

        Args:
            pipeline_name: Name interpolated into the error message.
        """
        error_type = "ResourceAlreadyExistsException"
        text = f"Pipeline with given name {pipeline_name} already exists"
        super().__init__(error_type, text)
        # ``self.message`` is not assigned in this class; presumably the base
        # initializer stores it. The description is then overwritten with a
        # JSON object that carries only that message.
        body = {"message": self.message}
        self.description = json.dumps(body)


class PipelineInvalidStateException(OpensearchIngestionExceptions):
    """Error of type ``ConflictException`` describing a disallowed pipeline status."""

    def __init__(
        self, action: str, valid_states: list[str], current_state: Optional[str]
    ):
        """Build the error message from the action and the statuses involved.

        Args:
            action: Text placed after "eligible for" in the message.
            valid_states: Statuses listed in the message; the list is
                interpolated as-is, so it appears in its ``str()`` form.
            current_state: Status reported as the current one (may be ``None``).
        """
        error_type = "ConflictException"
        # Adjacent literals concatenate into a single message string.
        text = (
            "Only pipelines with one of the following statuses are eligible "
            f"for {action}: {valid_states}. "
            f"The current status is {current_state}."
        )
        super().__init__(error_type, text)
        # ``self.message`` is not assigned in this class; presumably the base
        # initializer stores it. The description is then overwritten with a
        # JSON object that carries only that message.
        body = {"message": self.message}
        self.description = json.dumps(body)


class PipelineNotFoundException(OpensearchIngestionExceptions):
    """Error of type ``ResourceNotFoundException`` naming a pipeline."""

    def __init__(self, pipeline_name: str):
        """Build the error for *pipeline_name*.

        Args:
            pipeline_name: Name interpolated into the error message.
        """
        error_type = "ResourceNotFoundException"
        text = f"Pipeline {pipeline_name} could not be found."
        super().__init__(error_type, text)
        # ``self.message`` is not assigned in this class; presumably the base
        # initializer stores it. The description is then overwritten with a
        # JSON object that carries only that message.
        body = {"message": self.message}
        self.description = json.dumps(body)


class PipelineValidationException(OpensearchIngestionExceptions):
    """Error of type ``ValidationException`` carrying a caller-supplied message."""

    def __init__(self, message: str):
        """Build the error with *message* passed through unchanged.

        Args:
            message: Text handed to the base initializer as the error message.
        """
        error_type = "ValidationException"
        super().__init__(error_type, message)
        # Reads ``self.message`` rather than the argument, so the description
        # reflects whatever the base initializer stored (presumably the same
        # text).
        body = {"message": self.message}
        self.description = json.dumps(body)


class InvalidVPCOptionsException(OpensearchIngestionExceptions):
    """Error of type ``ValidationException`` prefixed with "Invalid VpcOptions"."""

    def __init__(self, message: str) -> None:
        """Build the error, prepending the VpcOptions prefix to *message*.

        Args:
            message: Detail text appended after ``"Invalid VpcOptions: "``.
        """
        error_type = "ValidationException"
        text = f"Invalid VpcOptions: {message}"
        super().__init__(error_type, text)
        # ``self.message`` is read back after the base initializer runs
        # (presumably it stores the prefixed text); the description is
        # overwritten with a JSON object carrying only that message.
        body = {"message": self.message}
        self.description = json.dumps(body)


class SubnetNotFoundException(InvalidVPCOptionsException):
    """VpcOptions error for a subnet ID reported as not existing."""

    def __init__(self, subnet_id: str) -> None:
        """Build the error for *subnet_id*.

        Args:
            subnet_id: Subnet ID interpolated into the detail text.
        """
        detail = f"The subnet ID {subnet_id} does not exist"
        super().__init__(detail)


class SecurityGroupNotFoundException(InvalidVPCOptionsException):
    """VpcOptions error for a security group reported as not existing."""

    def __init__(self, sg_id: str) -> None:
        """Build the error for *sg_id*.

        Args:
            sg_id: Security group identifier interpolated into the detail text.
        """
        detail = f"The security group {sg_id} does not exist"
        super().__init__(detail)