1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
|
from typing import Any, List, Optional
class TransportError(Exception):
"""Base class for all the Transport exceptions"""
pass
class TransportProtocolError(TransportError):
"""Transport protocol error.
The answer received from the server does not correspond to the transport protocol.
"""
class TransportServerError(TransportError):
"""The server returned a global error.
This exception will close the transport connection.
"""
code: Optional[int]
def __init__(self, message: str, code: Optional[int] = None):
super().__init__(message)
self.code = code
class TransportQueryError(TransportError):
"""The server returned an error for a specific query.
This exception should not close the transport connection.
"""
query_id: Optional[int]
errors: Optional[List[Any]]
data: Optional[Any]
extensions: Optional[Any]
def __init__(
self,
msg: str,
query_id: Optional[int] = None,
errors: Optional[List[Any]] = None,
data: Optional[Any] = None,
extensions: Optional[Any] = None,
):
super().__init__(msg)
self.query_id = query_id
self.errors = errors
self.data = data
self.extensions = extensions
class TransportClosed(TransportError):
"""Transport is already closed.
This exception is generated when the client is trying to use the transport
while the transport was previously closed.
"""
class TransportConnectionFailed(TransportError):
"""Transport connection failed.
This exception is by the connection adapter code when a connection closed
or if an unexpected Exception was received when trying to send a request.
"""
class TransportAlreadyConnected(TransportError):
"""Transport is already connected.
Exception generated when the client is trying to connect to the transport
while the transport is already connected.
"""
|