File: test_eventsource_reconnect.py

package info (click to toggle)
aiohttp-sse-client2 0.3.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 264 kB
  • sloc: python: 652; makefile: 85
file content (56 lines) | stat: -rw-r--r-- 1,601 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
import pytest

from aiohttp_sse_client2 import client as sse_client

from .const import WPT_SERVER


@pytest.mark.asyncio
async def test_eventsource_reconnect():
    """Test EventSource: reconnection.

    ..seealso: https://github.com/web-platform-tests/wpt/blob/master/
    eventsource/eventsource-reconnect.htm
    """
    source = sse_client.EventSource(
        WPT_SERVER + 'resources/status-reconnect.py?status=200')
    await source.connect()
    async for e in source:
        assert e.data == 'data'
        break
    await source.close()


@pytest.mark.asyncio
async def test_eventsource_reconnect_event():
    """Test EventSource: reconnection event.

    ..seealso: https://github.com/web-platform-tests/wpt/blob/master/
    eventsource/eventsource-reconnect.htm
    """
    opened  = False
    reconnected = False

    def on_error():
        nonlocal reconnected
        assert source.ready_state == sse_client.READY_STATE_CONNECTING
        assert opened is True
        reconnected = True

    async with sse_client.EventSource(
        WPT_SERVER + 'resources/status-reconnect.py?status=200&ok_first&id=2',
        reconnection_time=timedelta(milliseconds=2),
        on_error=on_error
    ) as source:
        async for e in source:
            if not opened:
                opened = True
                assert reconnected is False
                assert e.data == "ok"
            else:
                assert reconnected is True
                assert e.data == "data"
                break