1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
import asyncio
import logging
from gql import Client, gql
from gql.transport.websockets import WebsocketsTransport
logging.basicConfig(level=logging.INFO)
async def main():
# Note: this example used the test backend from
# https://github.com/slothmanxyz/typegraphql-ws-apollo
transport = WebsocketsTransport(url="ws://localhost:5000/graphql")
client = Client(transport=transport)
session = await client.connect_async(reconnecting=True)
query = gql("subscription {receiveMessage {message}}")
while True:
try:
async for result in session.subscribe(query):
print(result)
except Exception as e:
print(f"Received exception {e}")
await asyncio.sleep(1)
asyncio.run(main())
|