"""Tests checking that asynchronous resolvers are executed in parallel."""
import asyncio
from typing import Awaitable
from pytest import mark
from graphql.execution import execute
from graphql.language import parse
from graphql.type import (
GraphQLSchema,
GraphQLObjectType,
GraphQLField,
GraphQLList,
GraphQLInterfaceType,
GraphQLBoolean,
GraphQLInt,
GraphQLString,
)
class Barrier:
    """Barrier that makes progress only after a certain number of waits.

    Every call to :meth:`wait` counts as one arrival.  Callers are suspended
    until the configured number of arrivals has been reached, at which point
    all of them (and any later callers) are released.
    """

    def __init__(self, number: int):
        """Create a barrier that opens on the ``number``-th call to ``wait``.

        A ``number`` of zero or less yields a barrier that opens on the very
        first call to ``wait``.
        """
        # Event shared by all waiters; set once enough arrivals were counted.
        self.event = asyncio.Event()
        # Remaining number of arrivals needed before the barrier opens.
        self.number = number

    async def wait(self) -> bool:
        """Register one arrival and suspend until the barrier has opened.

        Returns the result of ``asyncio.Event.wait()``, which is ``True``.
        """
        self.number -= 1
        # Compare with ``<= 0`` instead of testing for exactly zero: a barrier
        # created with a non-positive count would otherwise step past zero
        # without ever setting the event and block all of its waiters forever.
        if self.number <= 0:
            self.event.set()
        return await self.event.wait()
def describe_parallel_execution():
    """Group of tests checking that async work is awaited concurrently.

    Each test makes its coroutines block on a ``Barrier`` that only opens
    once all of them have been entered, so an executor that awaited them
    one after another would never finish and run into the timeout.
    """

    @mark.asyncio
    async def resolve_fields_in_parallel():
        """Two sibling fields with async resolvers are resolved together."""
        gate = Barrier(2)

        async def wait_at_gate(*_args):
            released = await gate.wait()
            return released

        query_fields = {
            "foo": GraphQLField(GraphQLBoolean, resolve=wait_at_gate),
            "bar": GraphQLField(GraphQLBoolean, resolve=wait_at_gate),
        }
        schema = GraphQLSchema(GraphQLObjectType("Query", query_fields))
        document = parse("{foo, bar}")

        pending = execute(schema, document)
        assert isinstance(pending, Awaitable)
        # raises TimeoutError if not parallel
        result = await asyncio.wait_for(pending, 1.0)
        assert result == ({"foo": True, "bar": True}, None)

    @mark.asyncio
    async def resolve_list_in_parallel():
        """Awaitable items of a list field are resolved together."""
        gate = Barrier(2)

        async def wait_at_gate(*_args):
            released = await gate.wait()
            return released

        async def resolve_list(*args):
            # Two not-yet-awaited coroutines; the executor has to await them.
            return [wait_at_gate(*args) for _ in range(2)]

        foo_field = GraphQLField(GraphQLList(GraphQLBoolean), resolve=resolve_list)
        schema = GraphQLSchema(GraphQLObjectType("Query", {"foo": foo_field}))
        document = parse("{foo}")

        pending = execute(schema, document)
        assert isinstance(pending, Awaitable)
        # raises TimeoutError if not parallel
        result = await asyncio.wait_for(pending, 1.0)
        assert result == ({"foo": [True, True]}, None)

    @mark.asyncio
    async def resolve_is_type_of_in_parallel():
        """Async ``is_type_of`` checks of interface implementations overlap."""
        FooType = GraphQLInterfaceType("Foo", {"foo": GraphQLField(GraphQLString)})

        # Two list items are each checked against two object types.
        gate = Barrier(4)

        def make_is_type_of(expected_foo):
            async def is_type_of(obj, *_args):
                await gate.wait()
                return obj["foo"] == expected_foo

            return is_type_of

        BarType = GraphQLObjectType(
            "Bar",
            {"foo": GraphQLField(GraphQLString), "foobar": GraphQLField(GraphQLInt)},
            interfaces=[FooType],
            is_type_of=make_is_type_of("bar"),
        )

        BazType = GraphQLObjectType(
            "Baz",
            {"foo": GraphQLField(GraphQLString), "foobaz": GraphQLField(GraphQLInt)},
            interfaces=[FooType],
            is_type_of=make_is_type_of("baz"),
        )

        def resolve_foo(*_args):
            return [
                {"foo": "bar", "foobar": 1},
                {"foo": "baz", "foobaz": 2},
            ]

        foo_field = GraphQLField(GraphQLList(FooType), resolve=resolve_foo)
        schema = GraphQLSchema(
            GraphQLObjectType("Query", {"foo": foo_field}),
            types=[BarType, BazType],
        )

        # The document text is kept exactly as given (flush-left).
        document = parse(
            """
{
foo {
foo
... on Bar { foobar }
... on Baz { foobaz }
}
}
"""
        )

        pending = execute(schema, document)
        assert isinstance(pending, Awaitable)
        # raises TimeoutError if not parallel
        result = await asyncio.wait_for(pending, 1.0)
        expected_data = {
            "foo": [{"foo": "bar", "foobar": 1}, {"foo": "baz", "foobaz": 2}]
        }
        assert result == (expected_data, None)
|