File: exceptions.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (115 lines) | stat: -rw-r--r-- 3,366 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from moto.core.exceptions import ServiceException

from .constants import MAXIMUM_VISIBILITY_TIMEOUT


class SQSException(ServiceException):
    pass


class ReceiptHandleIsInvalid(SQSException):
    def __init__(self) -> None:
        super().__init__(
            "ReceiptHandleIsInvalid", "The input receipt handle is invalid."
        )


class MessageAttributesInvalid(SQSException):
    code = "InvalidParameterValue"

    def __init__(self, description: str):
        super().__init__(description)


class QueueDoesNotExist(SQSException):
    def __init__(self) -> None:
        super().__init__(
            "AWS.SimpleQueueService.NonExistentQueue",
            "The specified queue does not exist.",
        )


class QueueAlreadyExists(SQSException):
    def __init__(self, message: str):
        super().__init__("QueueAlreadyExists", message)


class EmptyBatchRequest(SQSException):
    def __init__(self, action: str = "Send") -> None:
        super().__init__(
            "AWS.SimpleQueueService.EmptyBatchRequest",
            f"There should be at least one {action}MessageBatchRequestEntry in the request.",
        )


class InvalidBatchEntryId(SQSException):
    def __init__(self) -> None:
        super().__init__(
            "InvalidBatchEntryId",
            "A batch entry id can only contain alphanumeric characters, "
            "hyphens and underscores. It can be at most 80 letters long.",
        )


class BatchRequestTooLong(SQSException):
    def __init__(self, length: int):
        # local import to avoid circular dependencies
        from .models import MAXIMUM_MESSAGE_LENGTH

        super().__init__(
            "BatchRequestTooLong",
            f"Batch requests cannot be longer than {MAXIMUM_MESSAGE_LENGTH} bytes. "
            f"You have sent {length} bytes.",
        )


class BatchEntryIdsNotDistinct(SQSException):
    def __init__(self, entry_id: str):
        super().__init__("BatchEntryIdsNotDistinct", f"Id {entry_id} repeated.")


class TooManyEntriesInBatchRequest(SQSException):
    def __init__(self, number: int):
        super().__init__(
            "TooManyEntriesInBatchRequest",
            f"Maximum number of entries per request are 10. You have sent {number}.",
        )


class InvalidAttributeName(SQSException):
    def __init__(self, attribute_name: str):
        super().__init__("InvalidAttributeName", f"Unknown Attribute {attribute_name}.")


class InvalidAttributeValue(SQSException):
    def __init__(self, attribute_name: str):
        super().__init__(
            "InvalidAttributeValue",
            f"Invalid value for the parameter {attribute_name}.",
        )


class InvalidParameterValue(SQSException):
    def __init__(self, message: str):
        super().__init__("InvalidParameterValue", message)


class MaxVisibilityTimeout(SQSException):
    code = "InvalidParameterValue"
    message = (
        f"Invalid request, maximum visibility timeout is {MAXIMUM_VISIBILITY_TIMEOUT}"
    )


class MissingParameter(SQSException):
    def __init__(self, parameter: str):
        super().__init__(
            "MissingParameter", f"The request must contain the parameter {parameter}."
        )


class OverLimit(SQSException):
    def __init__(self, count: int):
        super().__init__(
            "OverLimit", f"{count} Actions were found, maximum allowed is 7."
        )