File: subscription.py

package info (click to toggle)
python-apischema 0.18.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 1,608 kB
  • sloc: python: 15,266; sh: 7; makefile: 7
file content (41 lines) | stat: -rw-r--r-- 748 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import asyncio
from typing import AsyncIterable

import graphql
from graphql import print_schema

from apischema.graphql import graphql_schema


def hello() -> str:
    """Return the constant greeting served by the ``hello`` query field."""
    greeting = "world"
    return greeting


async def events() -> AsyncIterable[str]:
    """Asynchronously yield the two subscription messages, in order."""
    for message in ("bonjour", "au revoir"):
        yield message


# SDL the generated schema is expected to print: one query field and one
# subscription field, each shown as a non-null String.
schema_str = """\
type Query {
  hello: String!
}

type Subscription {
  events: String!
}"""

# Expose ``hello`` as a query and ``events`` as a subscription.
schema = graphql_schema(query=[hello], subscription=[events])
assert schema_str == print_schema(schema)


async def test():
    """Subscribe to ``events`` and check the payloads arrive in order."""
    document = graphql.parse("subscription {events}")
    stream = await graphql.subscribe(schema, document)

    # Drain the subscription, keeping only each result's ``data`` payload.
    received = []
    async for result in stream:
        received.append(result.data)

    expected = [
        {"events": "bonjour"},
        {"events": "au revoir"},
    ]
    assert received == expected


# Runs the subscription check to completion whenever this module is executed;
# the call sits at module top level, so it also runs on import.
asyncio.run(test())