File: test_client.py

package info (click to toggle)
aiohttp-sse-client2 0.3.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 264 kB
  • sloc: python: 652; makefile: 85
file content (29 lines) | stat: -rw-r--r-- 902 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Tests for `aiohttp_sse_client2` package."""
import json
import pytest

from aiohttp_sse_client2 import client as sse_client


@pytest.mark.asyncio
async def test_basic_usage():
    """Test basic usage against the live Wikimedia recent-change stream.

    Opens an ``EventSource`` on the public stream URL, collects the first
    two events, and checks each event's ``type`` and ``origin`` as well as
    that the two JSON payloads carry different ``meta.id`` values.

    This test performs real network I/O to ``stream.wikimedia.org``.
    """
    wanted = 2
    messages = []
    async with sse_client.EventSource(
        'https://stream.wikimedia.org/v2/stream/recentchange'
    ) as event_source:
        async for message in event_source:
            messages.append(message)
            # Stop as soon as enough events are collected; checking before
            # appending would force a wait for one extra, unused event.
            if len(messages) >= wanted:
                break

    print(messages)
    # Fail with a clear message if the stream closed early, rather than
    # with an IndexError on the lookups below.
    assert len(messages) == wanted
    assert messages[0].type == 'message'
    assert messages[0].origin == 'https://stream.wikimedia.org'
    assert messages[1].type == 'message'
    assert messages[1].origin == 'https://stream.wikimedia.org'
    data_0 = json.loads(messages[0].data)
    data_1 = json.loads(messages[1].data)
    assert data_0['meta']['id'] != data_1['meta']['id']