1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
from aiohttp_sse_client2 import client as sse_client
from .const import WPT_SERVER
@pytest.mark.asyncio
async def test_rquest_accept():
    """Test EventSource: Accept header.

    Connects to the ``accept.event_stream`` resource and checks that the
    data of the first received event is ``text/event-stream``.

    .. seealso:: https://github.com/web-platform-tests/wpt/blob/master/
       eventsource/request-accept.htm
    """
    source = sse_client.EventSource(
        WPT_SERVER + 'resources/accept.event_stream?pipe=sub')
    await source.connect()
    try:
        async for e in source:
            assert e.data == "text/event-stream"
            break
        else:
            # The loop ended without yielding anything: without this the
            # test would pass without ever checking an event.
            pytest.fail("event stream ended before any event was received")
    finally:
        # Always release the connection, even when the assertion fails.
        await source.close()
@pytest.mark.asyncio
async def test_rquest_cache_control():
    """Test EventSource: Cache-Control.

    Connects to the ``cache-control.event_stream`` resource and checks that
    the data of the first received event is ``no-cache``.

    .. seealso:: https://github.com/web-platform-tests/wpt/blob/master/
       eventsource/request-cache-control.htm
    """
    source = sse_client.EventSource(
        WPT_SERVER + 'resources/cache-control.event_stream?pipe=sub')
    await source.connect()
    try:
        async for e in source:
            assert e.data == "no-cache"
            break
        else:
            # The loop ended without yielding anything: without this the
            # test would pass without ever checking an event.
            pytest.fail("event stream ended before any event was received")
    finally:
        # Always release the connection, even when the assertion fails.
        await source.close()
@pytest.mark.asyncio
async def test_request_redirect():
    """Test EventSource: redirect.

    For each redirect status code, connects through ``common/redirect.py``
    pointing at the ``message.py`` resource, then closes the source. The
    error callback fails the test; the open callback checks that the source
    reports ``READY_STATE_OPEN``.

    .. seealso:: https://github.com/web-platform-tests/wpt/blob/master/
       eventsource/request-redirect.htm
    """
    async def check_redirect(status_code):
        def fail_on_error():
            assert False

        def verify_open():
            assert source.ready_state == sse_client.READY_STATE_OPEN

        # Swap the 'eventsource' part of the server URL for the redirect
        # endpoint, which is asked to answer with the given status code.
        redirect_url = WPT_SERVER.replace(
            'eventsource',
            'common/redirect.py?'
            'location=/eventsource/resources/message.py&status='
            + str(status_code))
        source = sse_client.EventSource(
            redirect_url,
            on_open=verify_open,
            on_error=fail_on_error)
        await source.connect()
        await source.close()

    for status_code in (301, 302, 303, 307):
        await check_redirect(status_code)
@pytest.mark.asyncio
async def test_request_status_error():
    """Test EventSource: status error.

    For each status code served by ``status-error.py``, ``connect()`` is
    expected to raise ``ConnectionError``. The error callback checks that
    the source reports ``READY_STATE_CLOSED``; the message callback checks
    ``READY_STATE_OPEN``.

    .. seealso:: https://github.com/web-platform-tests/wpt/blob/master/
       eventsource/request-status-error.htm
    """
    async def expect_connection_error(status_code):
        def verify_closed():
            assert source.ready_state == sse_client.READY_STATE_CLOSED

        def verify_open():
            assert source.ready_state == sse_client.READY_STATE_OPEN

        url = (WPT_SERVER + 'resources/status-error.py?status='
               + str(status_code))
        source = sse_client.EventSource(
            url,
            on_message=verify_open,
            on_error=verify_closed)
        with pytest.raises(ConnectionError):
            await source.connect()

    for status_code in (204, 205, 210, 299, 404, 410, 503):
        await expect_connection_error(status_code)
@pytest.mark.asyncio
async def test_request_post_to_connect():
    """Test EventSource option method for connection.

    Creates the source with ``option={'method': "POST"}`` against the
    ``message.py`` resource and checks that the data of the first received
    event is ``data``.
    """
    source = sse_client.EventSource(
        WPT_SERVER + 'resources/message.py',
        option={'method': "POST"}
    )
    await source.connect()
    try:
        async for e in source:
            assert e.data == "data"
            break
        else:
            # The loop ended without yielding anything: without this the
            # test would pass without ever checking an event.
            pytest.fail("event stream ended before any event was received")
    finally:
        # Always release the connection, even when the assertion fails.
        await source.close()
|