File: local_schema.py

package info (click to toggle)
python-gql 4.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,900 kB
  • sloc: python: 21,677; makefile: 54
file content (102 lines) | stat: -rw-r--r-- 2,869 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import asyncio
from inspect import isawaitable
from typing import Any, AsyncGenerator, Awaitable, cast

from graphql import ExecutionResult, GraphQLSchema, execute, subscribe

from gql.transport import AsyncTransport

from ..graphql_request import GraphQLRequest


class LocalSchemaTransport(AsyncTransport):
    """A transport for executing GraphQL queries against a local schema.

    Requests are handed directly to graphql-core's ``execute`` and
    ``subscribe`` functions together with the schema stored on the
    instance.
    """

    def __init__(
        self,
        schema: GraphQLSchema,
    ):
        """Initialize the transport with the given local schema.

        :param schema: Local schema as GraphQLSchema object
        """
        self.schema = schema

    async def connect(self):
        """No connection needed on local transport"""
        pass

    async def close(self):
        """No close needed on local transport"""
        pass

    async def execute(
        self,
        request: GraphQLRequest,
        *args: Any,
        **kwargs: Any,
    ) -> ExecutionResult:
        """Execute the provided request on a local GraphQL Schema.

        :param request: request providing the document, the variable values
            and the operation name to execute
        :param args: extra positional arguments forwarded to graphql-core's
            ``execute`` after the schema and the document
        :param kwargs: extra keyword arguments forwarded to graphql-core's
            ``execute``; a key given here overrides the value taken from
            the request
        :return: the ExecutionResult produced by graphql-core
        """

        # kwargs are unpacked last so that an explicit caller-supplied
        # "variable_values" / "operation_name" wins over the request's.
        inner_kwargs = {
            "variable_values": request.variable_values,
            "operation_name": request.operation_name,
            **kwargs,
        }

        result_or_awaitable = execute(
            self.schema,
            request.document,
            *args,
            **inner_kwargs,
        )

        # graphql-core's execute may hand back either the result itself or
        # an awaitable resolving to it; only await in the latter case.
        if isawaitable(result_or_awaitable):
            return await cast(Awaitable[ExecutionResult], result_or_awaitable)

        return cast(ExecutionResult, result_or_awaitable)

    @staticmethod
    async def _await_if_necessary(obj):
        """Return obj, awaiting it first if it is a coroutine.

        This method is necessary to work with
        graphql-core versions < and >= 3.3.0a3

        :param obj: a coroutine or an already available value
        :return: the awaited result of the coroutine, or obj unchanged
        """
        return await obj if asyncio.iscoroutine(obj) else obj

    async def subscribe(
        self,
        request: GraphQLRequest,
        *args: Any,
        **kwargs: Any,
    ) -> AsyncGenerator[ExecutionResult, None]:
        """Send a subscription and receive the results using an async generator

        The results are sent as an ExecutionResult object

        :param request: request providing the document, the variable values
            and the operation name of the subscription
        :param args: extra positional arguments forwarded to graphql-core's
            ``subscribe`` after the schema and the document
        :param kwargs: extra keyword arguments forwarded to graphql-core's
            ``subscribe``; a key given here overrides the value taken from
            the request
        """

        # kwargs are unpacked last so that an explicit caller-supplied
        # "variable_values" / "operation_name" wins over the request's.
        inner_kwargs = {
            "variable_values": request.variable_values,
            "operation_name": request.operation_name,
            **kwargs,
        }

        subscribe_result = await self._await_if_necessary(
            subscribe(
                self.schema,
                request.document,
                *args,
                **inner_kwargs,
            )
        )

        # A bare ExecutionResult (instead of an async iterator) is passed on
        # as the single item of this generator.
        if isinstance(subscribe_result, ExecutionResult):
            yield subscribe_result
            return

        try:
            async for result in subscribe_result:
                yield result
        finally:
            # If the consumer stops early (break, aclose() or cancellation),
            # close the underlying iterator right away instead of leaving
            # its cleanup to the garbage collector.
            aclose = getattr(subscribe_result, "aclose", None)
            if aclose is not None:
                await aclose()