1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Tests for `aiohttp_sse_client` package."""
import json
import pytest
from aiohttp_sse_client2 import client as sse_client
@pytest.mark.asyncio
async def test_basic_usage():
"""Test basic usage."""
messages = []
async with sse_client.EventSource(
'https://stream.wikimedia.org/v2/stream/recentchange'
) as event_source:
async for message in event_source:
if len(messages) > 1:
break
messages.append(message)
print(messages)
assert messages[0].type == 'message'
assert messages[0].origin == 'https://stream.wikimedia.org'
assert messages[1].type == 'message'
assert messages[1].origin == 'https://stream.wikimedia.org'
data_0 = json.loads(messages[0].data)
data_1 = json.loads(messages[1].data)
assert data_0['meta']['id'] != data_1['meta']['id']
|