File: aiohttp_websockets.py

package info (click to toggle)
python-gql 4.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,900 kB
  • sloc: python: 21,677; makefile: 54
file content (170 lines) | stat: -rw-r--r-- 8,540 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
from ssl import SSLContext
from typing import Any, Dict, List, Literal, Mapping, Optional, Union

from aiohttp import BasicAuth, ClientSession, Fingerprint
from aiohttp.typedefs import LooseHeaders, StrOrURL

from .common.adapters.aiohttp import AIOHTTPWebSocketsAdapter
from .websockets_protocol import WebsocketsProtocolTransportBase


class AIOHTTPWebsocketsTransport(WebsocketsProtocolTransportBase):
    """:ref:`Async Transport <async_transports>` used to execute GraphQL queries on
    remote servers with websocket connection.

    This transport uses asyncio and the provided aiohttp adapter library
    in order to send requests on a websocket connection.
    """

    def __init__(
        self,
        url: StrOrURL,
        *,
        subprotocols: Optional[List[str]] = None,
        heartbeat: Optional[float] = None,
        auth: Optional[BasicAuth] = None,
        origin: Optional[str] = None,
        params: Optional[Mapping[str, str]] = None,
        headers: Optional[LooseHeaders] = None,
        proxy: Optional[StrOrURL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        proxy_headers: Optional[LooseHeaders] = None,
        ssl: Optional[Union[SSLContext, Literal[False], Fingerprint]] = None,
        websocket_close_timeout: float = 10.0,
        receive_timeout: Optional[float] = None,
        ssl_close_timeout: Optional[Union[int, float]] = 10,
        connect_timeout: Optional[Union[int, float]] = 10,
        close_timeout: Optional[Union[int, float]] = 10,
        ack_timeout: Optional[Union[int, float]] = 10,
        keep_alive_timeout: Optional[Union[int, float]] = None,
        init_payload: Dict[str, Any] = {},
        ping_interval: Optional[Union[int, float]] = None,
        pong_timeout: Optional[Union[int, float]] = None,
        answer_pings: bool = True,
        session: Optional[ClientSession] = None,
        client_session_args: Optional[Dict[str, Any]] = None,
        connect_args: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Initialize the transport with the given parameters.

        :param url: The GraphQL server URL. Example: 'wss://server.com:PORT/graphql'.
        :param subprotocols: list of subprotocols sent to the
            backend in the 'subprotocols' http header.
            By default: both apollo and graphql-ws subprotocols.
        :param float heartbeat: Send low level `ping` message every `heartbeat`
                                seconds and wait `pong` response, close
                                connection if `pong` response is not
                                received. The timer is reset on any data reception.
        :param auth: An object that represents HTTP Basic Authorization.
                     :class:`~aiohttp.BasicAuth` (optional)
        :param str origin: Origin header to send to server(optional)
        :param params: Mapping, iterable of tuple of *key*/*value* pairs or
                       string to be sent as parameters in the query
                       string of the new request. Ignored for subsequent
                       redirected requests (optional)

                       Allowed values are:

                       - :class:`collections.abc.Mapping` e.g. :class:`dict`,
                         :class:`multidict.MultiDict` or
                         :class:`multidict.MultiDictProxy`
                       - :class:`collections.abc.Iterable` e.g. :class:`tuple` or
                         :class:`list`
                       - :class:`str` with preferably url-encoded content
                         (**Warning:** content will not be encoded by *aiohttp*)
        :param headers: HTTP Headers that sent with every request
                        May be either *iterable of key-value pairs* or
                        :class:`~collections.abc.Mapping`
                        (e.g. :class:`dict`,
                        :class:`~multidict.CIMultiDict`).
        :param proxy: Proxy URL, :class:`str` or :class:`~yarl.URL` (optional)
        :param aiohttp.BasicAuth proxy_auth: an object that represents proxy HTTP
                                             Basic Authorization (optional)
        :param ssl: SSL validation mode. ``True`` for default SSL check
                      (:func:`ssl.create_default_context` is used),
                      ``False`` for skip SSL certificate validation,
                      :class:`aiohttp.Fingerprint` for fingerprint
                      validation, :class:`ssl.SSLContext` for custom SSL
                      certificate validation.
        :param float websocket_close_timeout: Timeout for websocket to close.
                                              ``10`` seconds by default
        :param float receive_timeout: Timeout for websocket to receive
                                      complete message.  ``None`` (unlimited)
                                      seconds by default
        :param ssl_close_timeout: Timeout in seconds to wait for the ssl connection
                                  to close properly
        :param connect_timeout: Timeout in seconds for the establishment
            of the websocket connection. If None is provided this will wait forever.
        :param close_timeout: Timeout in seconds for the close. If None is provided
            this will wait forever.
        :param ack_timeout: Timeout in seconds to wait for the connection_ack message
            from the server. If None is provided this will wait forever.
        :param keep_alive_timeout: Optional Timeout in seconds to receive
            a sign of liveness from the server.
        :param init_payload: Dict of the payload sent in the connection_init message.
        :param ping_interval: Delay in seconds between pings sent by the client to
            the backend for the graphql-ws protocol. None (by default) means that
            we don't send pings. Note: there are also pings sent by the underlying
            websockets protocol. See the
            :ref:`keepalive documentation <websockets_transport_keepalives>`
            for more information about this.
        :param pong_timeout: Delay in seconds to receive a pong from the backend
            after we sent a ping (only for the graphql-ws protocol).
            By default equal to half of the ping_interval.
        :param answer_pings: Whether the client answers the pings from the backend
            (for the graphql-ws protocol).
            By default: True
        :param session: Optional aiohttp.ClientSession instance.
        :param client_session_args: Dict of extra args passed to
                `aiohttp.ClientSession`_
        :param connect_args: Dict of extra args passed to
                `aiohttp.ClientSession.ws_connect`_

        .. _aiohttp.ClientSession.ws_connect:
          https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.ClientSession.ws_connect
        .. _aiohttp.ClientSession:
          https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.ClientSession
        """

        # Instanciate a AIOHTTPWebSocketAdapter to indicate the use
        # of the aiohttp dependency for this transport
        self.adapter: AIOHTTPWebSocketsAdapter = AIOHTTPWebSocketsAdapter(
            url=url,
            headers=headers,
            ssl=ssl,
            session=session,
            client_session_args=client_session_args,
            connect_args=connect_args,
            heartbeat=heartbeat,
            auth=auth,
            origin=origin,
            params=params,
            proxy=proxy,
            proxy_auth=proxy_auth,
            proxy_headers=proxy_headers,
            websocket_close_timeout=websocket_close_timeout,
            receive_timeout=receive_timeout,
            ssl_close_timeout=ssl_close_timeout,
        )

        # Initialize the WebsocketsProtocolTransportBase parent class
        super().__init__(
            adapter=self.adapter,
            init_payload=init_payload,
            connect_timeout=connect_timeout,
            close_timeout=close_timeout,
            ack_timeout=ack_timeout,
            keep_alive_timeout=keep_alive_timeout,
            ping_interval=ping_interval,
            pong_timeout=pong_timeout,
            answer_pings=answer_pings,
            subprotocols=subprotocols,
        )

    @property
    def headers(self) -> Optional[LooseHeaders]:
        return self.adapter.headers

    @property
    def ssl(self) -> Optional[Union[SSLContext, Literal[False], Fingerprint]]:
        return self.adapter.ssl