1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
|
from moto.core.exceptions import ServiceException
from .constants import MAXIMUM_VISIBILITY_TIMEOUT
class SQSException(ServiceException):
    """Common parent of the SQS exception classes defined in this module.

    Adds nothing to ``ServiceException``; it only provides a shared base type.
    """
class ReceiptHandleIsInvalid(SQSException):
    """Error ``ReceiptHandleIsInvalid``: the input receipt handle is invalid."""

    def __init__(self) -> None:
        # Fixed pair of positional arguments handed to the base constructor.
        error_code = "ReceiptHandleIsInvalid"
        error_text = "The input receipt handle is invalid."
        super().__init__(error_code, error_text)
class MessageAttributesInvalid(SQSException):
    """SQS error declaring the class-level code ``InvalidParameterValue``.

    The caller supplies ``description``, which is forwarded unchanged as the
    only positional argument of the base constructor.
    """

    code = "InvalidParameterValue"

    def __init__(self, description: str):
        # ``code`` is declared on the class rather than passed here.
        super().__init__(description)
class QueueDoesNotExist(SQSException):
    """Error ``AWS.SimpleQueueService.NonExistentQueue``: the queue does not exist."""

    def __init__(self) -> None:
        # Fixed pair of positional arguments handed to the base constructor.
        error_code = "AWS.SimpleQueueService.NonExistentQueue"
        error_text = "The specified queue does not exist."
        super().__init__(error_code, error_text)
class QueueAlreadyExists(SQSException):
    """Error ``QueueAlreadyExists`` with a caller-provided message."""

    def __init__(self, message: str):
        error_code = "QueueAlreadyExists"
        # ``message`` is passed through untouched.
        super().__init__(error_code, message)
class EmptyBatchRequest(SQSException):
    """Error ``AWS.SimpleQueueService.EmptyBatchRequest`` for a batch without entries.

    ``action`` is interpolated into the entry type named in the message
    (``<action>MessageBatchRequestEntry``) and defaults to ``"Send"``.
    """

    def __init__(self, action: str = "Send") -> None:
        error_code = "AWS.SimpleQueueService.EmptyBatchRequest"
        error_text = f"There should be at least one {action}MessageBatchRequestEntry in the request."
        super().__init__(error_code, error_text)
class InvalidBatchEntryId(SQSException):
    """Error ``InvalidBatchEntryId`` describing the allowed batch entry id format."""

    def __init__(self) -> None:
        # Two adjacent literals are concatenated into a single message.
        explanation = (
            "A batch entry id can only contain alphanumeric characters, "
            "hyphens and underscores. It can be at most 80 letters long."
        )
        super().__init__("InvalidBatchEntryId", explanation)
class BatchRequestTooLong(SQSException):
    """Error ``BatchRequestTooLong`` reporting the limit and the size that was sent.

    ``length`` is the byte count quoted back in the message; the limit comes
    from ``MAXIMUM_MESSAGE_LENGTH`` in ``.models``.
    """

    def __init__(self, length: int):
        # Imported at call time: a module-level import of ``.models`` would
        # create a circular dependency.
        from .models import MAXIMUM_MESSAGE_LENGTH

        explanation = (
            f"Batch requests cannot be longer than {MAXIMUM_MESSAGE_LENGTH} bytes. "
            f"You have sent {length} bytes."
        )
        super().__init__("BatchRequestTooLong", explanation)
class BatchEntryIdsNotDistinct(SQSException):
    """Error ``BatchEntryIdsNotDistinct`` naming the entry id that was repeated."""

    def __init__(self, entry_id: str):
        error_text = f"Id {entry_id} repeated."
        super().__init__("BatchEntryIdsNotDistinct", error_text)
class TooManyEntriesInBatchRequest(SQSException):
    """Error ``TooManyEntriesInBatchRequest`` reporting how many entries were sent.

    The limit of 10 is part of the message text; ``number`` is the count
    quoted back in the message.
    """

    def __init__(self, number: int):
        error_code = "TooManyEntriesInBatchRequest"
        error_text = (
            f"Maximum number of entries per request are 10. You have sent {number}."
        )
        super().__init__(error_code, error_text)
class InvalidAttributeName(SQSException):
    """Error ``InvalidAttributeName`` naming the unknown attribute."""

    def __init__(self, attribute_name: str):
        error_text = f"Unknown Attribute {attribute_name}."
        super().__init__("InvalidAttributeName", error_text)
class InvalidAttributeValue(SQSException):
    """Error ``InvalidAttributeValue`` naming the parameter whose value is invalid."""

    def __init__(self, attribute_name: str):
        error_text = f"Invalid value for the parameter {attribute_name}."
        super().__init__("InvalidAttributeValue", error_text)
class InvalidParameterValue(SQSException):
    """Error ``InvalidParameterValue`` with a caller-provided message."""

    def __init__(self, message: str):
        error_code = "InvalidParameterValue"
        # ``message`` is passed through untouched.
        super().__init__(error_code, message)
class MaxVisibilityTimeout(SQSException):
    """SQS error declaring ``code`` and ``message`` as class attributes.

    No constructor is defined here. The message embeds
    ``MAXIMUM_VISIBILITY_TIMEOUT`` and is built once, when the class body runs.
    """

    # Literal prefix and interpolated limit are joined by implicit concatenation.
    message = (
        "Invalid request, maximum visibility timeout is "
        f"{MAXIMUM_VISIBILITY_TIMEOUT}"
    )
    code = "InvalidParameterValue"
class MissingParameter(SQSException):
    """Error ``MissingParameter`` naming the parameter the request must contain."""

    def __init__(self, parameter: str):
        error_text = f"The request must contain the parameter {parameter}."
        super().__init__("MissingParameter", error_text)
class OverLimit(SQSException):
    """Error ``OverLimit`` reporting how many actions were found.

    ``count`` is quoted back in the message; the maximum of 7 is fixed in the
    message text.
    """

    def __init__(self, count: int):
        error_text = f"{count} Actions were found, maximum allowed is 7."
        super().__init__("OverLimit", error_text)
|