1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
|
import json
import logging
from ssl import SSLContext
from typing import Any, Dict, Optional, Tuple, Union, cast
from urllib.parse import urlparse
from graphql import ExecutionResult
from ..graphql_request import GraphQLRequest
from .appsync_auth import AppSyncAuthentication, AppSyncIAMAuthentication
from .common.adapters.websockets import WebSocketsAdapter
from .common.base import SubscriptionTransportBase
from .exceptions import TransportProtocolError, TransportServerError
from .websockets import WebsocketsTransport
# Module-level logger for the AppSync websockets transport.
log = logging.getLogger("gql.transport.appsync")

# Optional dependency guard: the import is attempted so that the
# "botocore.session.Session" string annotation used below can be resolved
# when botocore is installed; a missing package is silently tolerated.
try:
    import botocore
except ImportError:  # pragma: no cover
    # botocore is only needed for the IAM AppSync authentication method
    pass
class AppSyncWebsocketsTransport(SubscriptionTransportBase):
    """:ref:`Async Transport <async_transports>` used to execute GraphQL subscription on
    AWS appsync realtime endpoint.

    This transport uses asyncio and the websockets library in order to send requests
    on a websocket connection.
    """

    auth: AppSyncAuthentication

    def __init__(
        self,
        url: str,
        *,
        auth: Optional[AppSyncAuthentication] = None,
        session: Optional["botocore.session.Session"] = None,
        ssl: Union[SSLContext, bool] = False,
        connect_timeout: int = 10,
        close_timeout: int = 10,
        ack_timeout: Optional[Union[int, float]] = 10,
        keep_alive_timeout: Optional[Union[int, float]] = None,
        connect_args: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Initialize the transport with the given parameters.

        :param url: The GraphQL endpoint URL. Example:
            https://XXXXXXXXXXXXXXXXXXXXXXXXXX.appsync-api.REGION.amazonaws.com/graphql
        :param auth: Optional AWS authentication class which will provide the
                     necessary headers to be correctly authenticated. If this
                     argument is not provided, then we will try to authenticate
                     using IAM.
        :param session: Optional botocore session, only used when ``auth`` is not
                        provided: it is forwarded to the IAM authentication class.
        :param ssl: ssl_context of the connection.
        :param connect_timeout: Timeout in seconds for the establishment
            of the websocket connection. If None is provided this will wait forever.
        :param close_timeout: Timeout in seconds for the close. If None is provided
            this will wait forever.
        :param ack_timeout: Timeout in seconds to wait for the connection_ack message
            from the server. If None is provided this will wait forever.
        :param keep_alive_timeout: Optional Timeout in seconds to receive
            a sign of liveness from the server.
        :param connect_args: Other parameters forwarded to websockets.connect.
            Defaults to an empty dict (a new one for every transport instance).
        """

        # Build a fresh dict per instance instead of sharing one mutable
        # default object between every transport created without connect_args.
        if connect_args is None:
            connect_args = {}

        if not auth:

            # Extract host from url
            host = str(urlparse(url).netloc)

            # May raise NoRegionError or NoCredentialsError or ImportError
            auth = AppSyncIAMAuthentication(host=host, session=session)

        self.auth: AppSyncAuthentication = auth
        self.ack_timeout: Optional[Union[int, float]] = ack_timeout
        self.init_payload: Dict[str, Any] = {}

        # The authentication class decides the final url used for the connection
        url = self.auth.get_auth_url(url)

        # Instantiate a WebSocketAdapter to indicate the use
        # of the websockets dependency for this transport
        self.adapter: WebSocketsAdapter = WebSocketsAdapter(
            url=url,
            ssl=ssl,
            connect_args=connect_args,
        )

        # Initialize the generic SubscriptionTransportBase parent class
        super().__init__(
            adapter=self.adapter,
            connect_timeout=connect_timeout,
            close_timeout=close_timeout,
            keep_alive_timeout=keep_alive_timeout,
        )

        # Using the same 'graphql-ws' protocol as the apollo protocol
        self.adapter.subprotocols = [
            WebsocketsTransport.APOLLO_SUBPROTOCOL,
        ]
        self.subprotocol = WebsocketsTransport.APOLLO_SUBPROTOCOL

    def _parse_answer(
        self, answer: str
    ) -> Tuple[str, Optional[int], Optional[ExecutionResult]]:
        """Parse the answer received from the server.

        Difference between apollo protocol and aws protocol:

        - aws protocol can return an error without an id
        - aws protocol will send start_ack messages

        Returns a tuple consisting of:
            - the answer_type:
                - 'connection_ack',
                - 'connection_error',
                - 'start_ack',
                - 'ka',
                - 'data',
                - 'error',
                - 'complete'
            - the answer id (Integer) if received or None
            - an execution Result if the answer_type is 'data' or None

        :raises TransportServerError: if the server sent an 'error' message
            without an id.
        :raises TransportProtocolError: if the answer is not valid JSON, is not
            a JSON object, or a ValueError is raised while parsing it.
        """

        try:
            json_answer = json.loads(answer)

            # Valid JSON which is not an object (list, string, number, ...)
            # has no "type" key to dispatch on: report it as a protocol error
            # instead of failing with an AttributeError on .get() below.
            if not isinstance(json_answer, dict):
                raise ValueError("The answer is not a JSON object")

            answer_type = str(json_answer.get("type"))

            if answer_type == "start_ack":
                return ("start_ack", None, None)

            elif answer_type == "error" and "id" not in json_answer:
                # AWS-specific: an error which is not tied to any query
                error_payload = json_answer.get("payload")
                raise TransportServerError(f"Server error: '{error_payload!r}'")

            else:
                # Everything else is parsed by the apollo implementation
                return WebsocketsTransport._parse_answer_apollo(
                    cast(WebsocketsTransport, self), json_answer
                )

        except ValueError as e:
            # Keep the original error as the cause to ease debugging
            raise TransportProtocolError(
                f"Server did not return a GraphQL result: {answer}"
            ) from e

    async def _send_query(
        self,
        request: GraphQLRequest,
    ) -> int:
        """Send a 'start' message for the given request and return its query id.

        The request payload is serialized to a compact JSON string which is sent
        in the "data" field of the message payload; the headers returned by the
        authentication class for that same string are sent in the
        "extensions" -> "authorization" field.

        :param request: the GraphQL request to send.
        :return: the id which has been assigned to this query.
        """

        query_id = self.next_query_id
        self.next_query_id += 1

        data: Dict[str, Any] = request.payload

        # The same serialized string is sent to the server and
        # given to the authentication class to compute the headers
        serialized_data = json.dumps(data, separators=(",", ":"))

        payload = {"data": serialized_data}

        message: Dict = {
            "id": str(query_id),
            "type": "start",
            "payload": payload,
        }

        assert self.auth is not None

        message["payload"]["extensions"] = {
            "authorization": self.auth.get_headers(serialized_data)
        }

        await self._send(
            json.dumps(
                message,
                separators=(",", ":"),
            )
        )

        return query_id

    subscribe = SubscriptionTransportBase.subscribe  # type: ignore[assignment]
    """Send a subscription query and receive the results using
    a python async generator.

    Only subscriptions are supported, queries and mutations are forbidden.

    The results are sent as an ExecutionResult object.
    """

    async def execute(
        self,
        request: GraphQLRequest,
    ) -> ExecutionResult:
        """This method is not available.

        Only subscriptions are supported on the AWS realtime endpoint.

        :raise: AssertionError"""
        raise AssertionError(
            "execute method is not allowed for AppSyncWebsocketsTransport "
            "because only subscriptions are allowed on the realtime endpoint."
        )

    # The following methods are reused as-is from WebsocketsTransport
    _initialize = WebsocketsTransport._initialize
    _stop_listener = WebsocketsTransport._send_stop_message  # type: ignore
    _send_init_message_and_wait_ack = (
        WebsocketsTransport._send_init_message_and_wait_ack
    )
    _wait_ack = WebsocketsTransport._wait_ack

    @property
    def ssl(self) -> Union[SSLContext, bool]:
        """Return the ssl setting held by the websockets adapter."""
        return self.adapter.ssl
|