File: relay_connection.py

package info (click to toggle)
python-apischema 0.18.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 1,608 kB
  • sloc: python: 15,266; sh: 7; makefile: 7
file content (92 lines) | stat: -rw-r--r-- 1,899 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from dataclasses import dataclass
from typing import Optional, TypeVar

import graphql
from graphql.utilities import print_schema

from apischema.graphql import graphql_schema, relay, resolver

Cursor = int  # let's use an integer cursor in all our connection
Node = TypeVar("Node")
Connection = relay.Connection[Node, Cursor, relay.Edge[Node, Cursor]]
# Connection can now be used just like Connection[Ship] or Connection[Faction | None]


@dataclass
class Ship:
    name: str


@dataclass
class Faction:
    @resolver
    def ships(
        self, first: int | None, after: Cursor | None
    ) -> Connection[Optional[Ship]] | None:
        edges = [relay.Edge(Ship("X-Wing"), 0), relay.Edge(Ship("B-Wing"), 1)]
        return Connection(edges, relay.PageInfo.from_edges(edges))


def faction() -> Faction | None:
    return Faction()


schema = graphql_schema(query=[faction])
schema_str = """\
type Query {
  faction: Faction
}

type Faction {
  ships(first: Int, after: Int): ShipConnection
}

type ShipConnection {
  edges: [ShipEdge]
  pageInfo: PageInfo!
}

type ShipEdge {
  node: Ship
  cursor: Int!
}

type Ship {
  name: String!
}

type PageInfo {
  hasPreviousPage: Boolean!
  hasNextPage: Boolean!
  startCursor: Int
  endCursor: Int
}"""
assert print_schema(schema) == schema_str
query = """
{
    faction {
        ships {
            pageInfo {
                endCursor
                hasNextPage
            }
            edges {
                cursor
                node {
                    name
                }
            }
        }
    }
}"""
assert graphql.graphql_sync(schema, query).data == {
    "faction": {
        "ships": {
            "pageInfo": {"endCursor": 1, "hasNextPage": False},
            "edges": [
                {"cursor": 0, "node": {"name": "X-Wing"}},
                {"cursor": 1, "node": {"name": "B-Wing"}},
            ],
        }
    }
}