1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
import pytest
from aiohttp_sse_client2 import client as sse_client
from .const import WPT_SERVER
@pytest.mark.asyncio
async def test_eventsource_onmessage():
"""Test EventSource: onmessage.
..seealso: https://github.com/web-platform-tests/wpt/blob/master/
eventsource/eventsource-onmessage.htm
"""
def on_message(event):
"""Callback for message event."""
assert event.data == "data"
source = sse_client.EventSource(WPT_SERVER + 'resources/message.py',
on_message=on_message)
await source.connect()
async for e in source:
assert e.data == "data"
break
await source.close()
|