File: rpcbaseerrors.py

package info (click to toggle)
python-telethon 1.42.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,520 kB
  • sloc: python: 16,285; javascript: 200; makefile: 16; sh: 11
file content (131 lines) | stat: -rw-r--r-- 3,470 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from ..tl import functions

# Request types that carry another request in their ``query`` attribute.
# ``RPCError._fmt_request`` checks against this tuple with ``isinstance`` and
# follows ``.query`` until it reaches a request that is not one of these, so
# the error text can show the whole chain of wrappers.
_NESTS_QUERY = (
    functions.InvokeAfterMsgRequest,
    functions.InvokeAfterMsgsRequest,
    functions.InitConnectionRequest,
    functions.InvokeWithLayerRequest,
    functions.InvokeWithoutUpdatesRequest,
    functions.InvokeWithMessagesRangeRequest,
    functions.InvokeWithTakeoutRequest,
)

class RPCError(Exception):
    """
    Base class for all Remote Procedure Call errors.

    Subclasses override the class-level ``code`` and ``message`` attributes
    to provide their defaults.

    Args:
        request: The request object that caused the error. It is stored as
            ``self.request`` and its class name (plus the names of any
            wrapping requests) is appended to the exception text.
        message: The error message. Stored as ``self.message``.
        code: Optional numeric error code. When omitted (``None``), the
            class-level ``code`` stays visible through ``self.code``.
    """
    code = None
    message = None

    def __init__(self, request, message, code=None):
        super().__init__('RPCError {}: {}{}'.format(
            code or self.code, message, self._fmt_request(request)))

        self.request = request
        self.message = message
        # Only shadow the class-level default when a code was really given.
        # Assigning ``None`` unconditionally would hide e.g. 400 on a
        # BadRequestError even though the text formatted above reports it,
        # and ``__reduce__`` would then pickle ``None`` as the code.
        if code is not None:
            self.code = code

    @staticmethod
    def _fmt_request(request):
        """
        Return the ``' (caused by ...)'`` suffix naming ``request``.

        Requests whose type is listed in ``_NESTS_QUERY`` are unwrapped
        through their ``query`` attribute, producing nested text such as
        ``Outer(Inner(Actual))``.
        """
        n = 0
        reason = ''
        while isinstance(request, _NESTS_QUERY):
            n += 1
            reason += request.__class__.__name__ + '('
            request = request.query
        # Close one parenthesis for every wrapper that was opened above.
        reason += request.__class__.__name__ + ')' * n

        return ' (caused by {})'.format(reason)

    def __reduce__(self):
        """
        Support pickling by rebuilding the error from the same three values
        that ``__init__`` takes.
        """
        return type(self), (self.request, self.message, self.code)


class InvalidDCError(RPCError):
    """
    The request must be repeated, but directed to a different data center.
    """
    # Class-level defaults; ``code`` is also this class's key in ``base_errors``.
    code = 303
    message = 'ERROR_SEE_OTHER'


class BadRequestError(RPCError):
    """
    The query contains errors. If the request was built from a form and
    contains user-generated data, the user should be notified that the
    data must be corrected before the query is repeated.
    """
    # Class-level defaults; ``code`` is also this class's key in ``base_errors``.
    code = 400
    message = 'BAD_REQUEST'


class UnauthorizedError(RPCError):
    """
    There was an unauthorized attempt to use functionality available only
    to authorized users.
    """
    # Class-level defaults; ``code`` is also this class's key in ``base_errors``.
    code = 401
    message = 'UNAUTHORIZED'


class ForbiddenError(RPCError):
    """
    Privacy violation. For example, an attempt to write a message to
    someone who has blacklisted the current user.
    """
    # Class-level defaults; ``code`` is also this class's key in ``base_errors``.
    code = 403
    message = 'FORBIDDEN'


class NotFoundError(RPCError):
    """
    An attempt to invoke a non-existent object, such as a method.
    """
    # Class-level defaults; ``code`` is also this class's key in ``base_errors``.
    code = 404
    message = 'NOT_FOUND'


class AuthKeyError(RPCError):
    """
    Errors related to an invalid authorization key, like
    AUTH_KEY_DUPLICATED, which can cause the connection to fail.
    """
    # Class-level defaults; ``code`` is also this class's key in ``base_errors``.
    code = 406
    message = 'AUTH_KEY'


class FloodError(RPCError):
    """
    The maximum allowed number of attempts to invoke the given method
    with the given input parameters has been exceeded. For example, in an
    attempt to request a large number of text messages (SMS) for the same
    phone number.
    """
    # Class-level defaults; ``code`` is also this class's key in ``base_errors``.
    code = 420
    message = 'FLOOD'


class ServerError(RPCError):
    """
    An internal server error occurred while a request was being processed;
    for example, there was a disruption while accessing a database or file
    storage.
    """
    # Class-level defaults; the positive ``code`` is this class's key in
    # ``base_errors``.
    code = 500  # Also witnessed as -500
    message = 'INTERNAL'


class TimedOutError(RPCError):
    """
    Clicking the inline buttons of bots that never (or take too long to)
    call ``answerCallbackQuery`` will result in this "special" RPCError.
    """
    # Class-level defaults; the positive ``code`` is this class's key in
    # ``base_errors``.
    code = 503  # Only witnessed as -503
    message = 'Timeout'


# Second name bound to the very same class object as ``TimedOutError``.
# NOTE(review): presumably kept as a backward-compatible alias — confirm.
BotTimeout = TimedOutError


# Lookup table from each base error's class-level ``code`` to the class itself.
# Keys are the positive codes declared on the classes above. The inline
# comments on ServerError and TimedOutError note that negative variants were
# also witnessed; those negative values are not keys here.
# NOTE(review): callers presumably normalize the sign before lookup — confirm.
base_errors = {x.code: x for x in (
    InvalidDCError, BadRequestError, UnauthorizedError, ForbiddenError,
    NotFoundError, AuthKeyError, FloodError, ServerError, TimedOutError
)}